From 74d68340b502ccbb21369ff986ca8eb22aacdd4f Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 26 Sep 2025 13:44:04 +0800 Subject: [PATCH] fix deployer --- apps/efka/src/docker/docker_deployer.erl | 37 ++++++++++++------------ apps/efka/src/efka_remote_agent.erl | 15 +++++++++- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/apps/efka/src/docker/docker_deployer.erl b/apps/efka/src/docker/docker_deployer.erl index 3defaae..050687e 100644 --- a/apps/efka/src/docker/docker_deployer.erl +++ b/apps/efka/src/docker/docker_deployer.erl @@ -24,11 +24,6 @@ -define(SERVER, ?MODULE). --define(STREAM_TYPE_INFO, <<"info">>). --define(STREAM_TYPE_ERROR, <<"error">>). --define(STREAM_TYPE_WARNING, <<"warning">>). --define(STREAM_TYPE_NOTICE, <<"notice">>). - -record(state, { root_dir :: string(), task_id :: integer(), @@ -174,34 +169,35 @@ code_change(_OldVsn, State = #state{}, _Extra) -> do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> %% 尝试拉取镜像 ContainerName = maps:get(<<"container_name">>, Config), - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"开始部署容器:"/utf8, ContainerName/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始部署容器:"/utf8, ContainerName/binary>>), case docker_commands:check_container_exist(ContainerName) of true -> - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"本地容器已经存在:"/utf8, ContainerName/binary>>); + efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"本地容器已经存在:"/utf8, ContainerName/binary>>), + efka_remote_agent:task_event_stream_close(TaskId); false -> Image0 = maps:get(<<"image">>, Config), Image = normalize_image(Image0), - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"使用镜像:"/utf8, Image/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"使用镜像:"/utf8, Image/binary>>), PullResult = case docker_commands:check_image_exist(Image) of true -> - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"镜像本地已存在:"/utf8, Image/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"镜像本地已存在:"/utf8, Image/binary>>), ok; false -> - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"开始拉取镜像:"/utf8, Image/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始拉取镜像:"/utf8, Image/binary>>), CB = fun ({message, Msg}) -> - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, Msg); + efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg); ({error, Error}) -> - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, Error) + efka_remote_agent:task_event_stream(TaskId, <<"error">>, Error) end, docker_commands:pull_image(Image, CB) end, case PullResult of ok -> - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"开始创建容器: "/utf8, ContainerName/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始创建容器: "/utf8, ContainerName/binary>>), case docker_commands:create_container(ContainerName, ContainerDir, Config) of {ok, ContainerId} -> %% 创建容器对应的配置文件 @@ -212,17 +208,20 @@ do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Contain file:close(FD); {error, Reason} -> Reason1 = list_to_binary(io_lib:format("~p", [Reason])), - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_NOTICE, <<"创建配置文件失败: "/utf8, Reason1/binary>>) + efka_remote_agent:task_event_stream(TaskId, <<"notice">>, <<"创建配置文件失败: "/utf8, Reason1/binary>>) end, ShortContainerId = binary:part(ContainerId, 1, 12), - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"任务完成"/utf8>>); + efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"任务完成"/utf8>>), + efka_remote_agent:task_event_stream_close(TaskId); {error, Reason} -> - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, <<"容器创建失败: "/utf8, Reason/binary>>), - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, <<"任务失败"/utf8>>) + efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"容器创建失败: "/utf8, Reason/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"任务失败"/utf8>>), + efka_remote_agent:task_event_stream_close(TaskId) end; {error, Reason} -> - efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, <<"镜像拉取失败: "/utf8, Reason/binary>>) + efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"镜像拉取失败: "/utf8, Reason/binary>>), + efka_remote_agent:task_event_stream_close(TaskId) end end. diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index 369be57..ced0c7f 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -15,7 +15,7 @@ %% API -export([start_link/0]). --export([metric_data/4, event/3, ping/13, task_event_stream/3]). +-export([metric_data/4, event/3, ping/13, task_event_stream/3, task_event_stream_close/1]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -53,6 +53,10 @@ event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventT task_event_stream(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) -> gen_statem:cast(?SERVER, {task_event_stream, TaskId, Type, Stream}). +-spec task_event_stream_close(TaskId :: integer()) -> no_return(). +task_event_stream_close(TaskId) when is_integer(TaskId) -> + gen_statem:cast(?SERVER, {task_event_stream_close, TaskId}). + ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). @@ -134,6 +138,15 @@ handle_event(cast, {task_event_stream, TaskId, Type, Stream}, ?STATE_ACTIVATED, efka_transport:send(TransportPid, EventPacket), {keep_state, State}; +handle_event(cast, {task_event_stream_close, TaskId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{ + task_id = TaskId, + type = <<"close">>, + stream = <<>> + }), + efka_transport:send(TransportPid, EventPacket), + {keep_state, State}; + %% 其他情况下直接忽略 handle_event(cast, {task_event_stream, _TaskId, _Stream}, _, State = #state{}) -> {keep_state, State};