diff --git a/apps/efka/include/message.hrl b/apps/efka/include/message.hrl index d3f956a..2f1b985 100644 --- a/apps/efka/include/message.hrl +++ b/apps/efka/include/message.hrl @@ -87,5 +87,6 @@ -record(task_event_stream, { task_id :: integer(), + type :: binary(), stream :: binary() }). \ No newline at end of file diff --git a/apps/efka/src/docker/docker_deployer.erl b/apps/efka/src/docker/docker_deployer.erl index e764bdf..3defaae 100644 --- a/apps/efka/src/docker/docker_deployer.erl +++ b/apps/efka/src/docker/docker_deployer.erl @@ -9,6 +9,7 @@ -module(docker_deployer). -author("anlicheng"). -include("efka_tables.hrl"). +-include("message.hrl"). -behaviour(gen_server). @@ -23,6 +24,11 @@ -define(SERVER, ?MODULE). +-define(STREAM_TYPE_INFO, <<"info">>). +-define(STREAM_TYPE_ERROR, <<"error">>). +-define(STREAM_TYPE_WARNING, <<"warning">>). +-define(STREAM_TYPE_NOTICE, <<"notice">>). + -record(state, { root_dir :: string(), task_id :: integer(), @@ -168,34 +174,34 @@ code_change(_OldVsn, State = #state{}, _Extra) -> do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> %% 尝试拉取镜像 ContainerName = maps:get(<<"container_name">>, Config), - efka_remote_agent:task_event_stream(TaskId, <<"开始部署容器:"/utf8, ContainerName/binary>>), + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"开始部署容器:"/utf8, ContainerName/binary>>), case docker_commands:check_container_exist(ContainerName) of true -> - efka_remote_agent:task_event_stream(TaskId, <<"本地容器已经存在:"/utf8, ContainerName/binary>>); + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"本地容器已经存在:"/utf8, ContainerName/binary>>); false -> Image0 = maps:get(<<"image">>, Config), Image = normalize_image(Image0), - efka_remote_agent:task_event_stream(TaskId, <<"使用镜像:"/utf8, Image/binary>>), + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"使用镜像:"/utf8, Image/binary>>), PullResult = case docker_commands:check_image_exist(Image) of true -> - efka_remote_agent:task_event_stream(TaskId, <<"镜像本地已存在:"/utf8, Image/binary>>), + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"镜像本地已存在:"/utf8, Image/binary>>), ok; false -> - efka_remote_agent:task_event_stream(TaskId, <<"开始拉取镜像:"/utf8, Image/binary>>), + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"开始拉取镜像:"/utf8, Image/binary>>), CB = fun ({message, Msg}) -> - efka_remote_agent:task_event_stream(TaskId, Msg); + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, Msg); ({error, Error}) -> - efka_remote_agent:task_event_stream(TaskId, Error) + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, Error) end, docker_commands:pull_image(Image, CB) end, case PullResult of ok -> - efka_remote_agent:task_event_stream(TaskId, <<"开始创建容器: "/utf8, ContainerName/binary>>), + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"开始创建容器: "/utf8, ContainerName/binary>>), case docker_commands:create_container(ContainerName, ContainerDir, Config) of {ok, ContainerId} -> %% 创建容器对应的配置文件 @@ -206,17 +212,17 @@ do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Contain file:close(FD); {error, Reason} -> Reason1 = list_to_binary(io_lib:format("~p", [Reason])), - efka_remote_agent:task_event_stream(TaskId, <<"创建配置文件失败: "/utf8, Reason1/binary>>) + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_NOTICE, <<"创建配置文件失败: "/utf8, Reason1/binary>>) end, ShortContainerId = binary:part(ContainerId, 1, 12), - efka_remote_agent:task_event_stream(TaskId, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), - efka_remote_agent:task_event_stream(TaskId, <<"任务完成"/utf8>>); + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"任务完成"/utf8>>); {error, Reason} -> - efka_remote_agent:task_event_stream(TaskId, <<"容器创建失败: "/utf8, Reason/binary>>), - efka_remote_agent:task_event_stream(TaskId, <<"任务失败"/utf8>>) + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, <<"容器创建失败: "/utf8, Reason/binary>>), + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, <<"任务失败"/utf8>>) end; {error, Reason} -> - efka_remote_agent:task_event_stream(TaskId, <<"镜像拉取失败: "/utf8, Reason/binary>>) + efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, <<"镜像拉取失败: "/utf8, Reason/binary>>) end end. diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index 350f5e9..369be57 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -15,7 +15,7 @@ %% API -export([start_link/0]). --export([metric_data/4, event/3, ping/13, task_event_stream/2]). +-export([metric_data/4, event/3, ping/13, task_event_stream/3]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -49,9 +49,9 @@ metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId), event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) -> gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}). --spec task_event_stream(TaskId :: integer(), Stream :: binary()) -> no_return(). -task_event_stream(TaskId, Stream) when is_integer(TaskId), is_binary(Stream) -> - gen_statem:cast(?SERVER, {task_event_stream, TaskId, Stream}). +-spec task_event_stream(TaskId :: integer(), Type :: binary(), Stream :: binary()) -> no_return(). +task_event_stream(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) -> + gen_statem:cast(?SERVER, {task_event_stream, TaskId, Type, Stream}). ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). @@ -124,10 +124,11 @@ handle_event(cast, {event, ServiceId, EventType, Params}, _, State) -> ok = cache_model:insert(EventPacket), {keep_state, State}; -handle_event(cast, {task_event_stream, TaskId, Stream}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(cast, {task_event_stream, TaskId, Type, Stream}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> lager:debug("[efka_remote_agent] event_stream task_id: ~p, stream: ~ts", [TaskId, Stream]), EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{ task_id = TaskId, + type = Type, stream = Stream }), efka_transport:send(TransportPid, EventPacket), diff --git a/apps/efka/src/message/message_codec.erl b/apps/efka/src/message/message_codec.erl index ddb5078..eb34af5 100644 --- a/apps/efka/src/message/message_codec.erl +++ b/apps/efka/src/message/message_codec.erl @@ -67,9 +67,10 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) marshal(?I32, EventType), marshal(?Bytes, Params) ]); -encode0(#task_event_stream{task_id = TaskId, stream = Stream}) -> +encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) -> iolist_to_binary([ marshal(?I32, TaskId), + marshal(?Bytes, Type), marshal(?Bytes, Stream) ]). @@ -97,8 +98,8 @@ decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> {ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}}; decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> {ok, #event{service_id = ServiceId, event_type = EventType, params = Params}}; -decode0(?MESSAGE_EVENT_STREAM, [TaskId, Stream]) -> - {ok, #task_event_stream{task_id = TaskId, stream = Stream}}; +decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) -> + {ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}}; decode0(_, _) -> error.