diff --git a/apps/efka/src/docker/docker_deployer.erl b/apps/efka/src/docker/docker_deployer.erl index c772eec..964315e 100644 --- a/apps/efka/src/docker/docker_deployer.erl +++ b/apps/efka/src/docker/docker_deployer.erl @@ -14,6 +14,9 @@ -export([start_monitor/3]). -export([deploy/3]). +-define(TASK_SUCCESS, <<"success">>). +-define(TASK_FAIL, <<"fail">>). + %%%=================================================================== %%% API %%%=================================================================== @@ -46,7 +49,7 @@ deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerD case docker_commands:check_container_exist(ContainerName) of true -> trace_log(TaskId, <<"info">>, <<"本地容器已经存在:"/utf8, ContainerName/binary>>), - efka_remote_agent:close_task_event_stream(TaskId); + efka_remote_agent:close_task_event_stream(TaskId, ?TASK_FAIL); false -> Image0 = maps:get(<<"image">>, Config), Image = normalize_image(Image0), @@ -85,15 +88,15 @@ deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerD ShortContainerId = binary:part(ContainerId, 1, 12), trace_log(TaskId, <<"info">>, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), trace_log(TaskId, <<"info">>, <<"任务完成"/utf8>>), - efka_remote_agent:close_task_event_stream(TaskId); + efka_remote_agent:close_task_event_stream(TaskId, ?TASK_SUCCESS); {error, Reason} -> trace_log(TaskId, <<"error">>, <<"容器创建失败: "/utf8, Reason/binary>>), trace_log(TaskId, <<"error">>, <<"任务失败"/utf8>>), - efka_remote_agent:close_task_event_stream(TaskId) + efka_remote_agent:close_task_event_stream(TaskId, ?TASK_FAIL) end; {error, Reason} -> trace_log(TaskId, <<"error">>, <<"镜像拉取失败: "/utf8, Reason/binary>>), - efka_remote_agent:close_task_event_stream(TaskId) + efka_remote_agent:close_task_event_stream(TaskId, ?TASK_FAIL) end end. diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index fc01238..20f4663 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -15,7 +15,7 @@ %% API -export([start_link/0]). --export([metric_data/4, event/3, ping/13, task_event_stream/3, close_task_event_stream/1]). +-export([metric_data/4, event/3, ping/13, task_event_stream/3, close_task_event_stream/2]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -53,9 +53,9 @@ event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventT task_event_stream(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) -> gen_statem:cast(?SERVER, {task_event_stream, TaskId, Type, Stream}). --spec close_task_event_stream(TaskId :: integer()) -> no_return(). -close_task_event_stream(TaskId) when is_integer(TaskId) -> - gen_statem:cast(?SERVER, {close_task_event_stream, TaskId}). +-spec close_task_event_stream(TaskId :: integer(), Reason :: binary()) -> no_return(). +close_task_event_stream(TaskId, Reason) when is_integer(TaskId), is_binary(Reason) -> + gen_statem:cast(?SERVER, {close_task_event_stream, TaskId, Reason}). ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). @@ -138,11 +138,11 @@ handle_event(cast, {task_event_stream, TaskId, Type, Stream}, ?STATE_ACTIVATED, efka_transport:send(TransportPid, EventPacket), {keep_state, State}; -handle_event(cast, {close_task_event_stream, TaskId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(cast, {close_task_event_stream, TaskId, Reason}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{ task_id = TaskId, type = <<"close">>, - stream = <<>> + stream = Reason }), efka_transport:send(TransportPid, EventPacket), {keep_state, State};