diff --git a/apps/efka/include/message.hrl b/apps/efka/include/message.hrl index f5f13c9..611b098 100644 --- a/apps/efka/include/message.hrl +++ b/apps/efka/include/message.hrl @@ -28,6 +28,9 @@ -define(MESSAGE_DATA, 16#06). -define(MESSAGE_EVENT, 16#07). +%% efka主动上报的event-stream流, 单向消息,主要是: docker-create的实时处理逻辑上报 +-define(MESSAGE_EVENT_STREAM, 16#08). + %% 响应数据 -define(MESSAGE_RPC_REPLY, 16#FF). @@ -80,4 +83,9 @@ service_id :: binary(), event_type :: integer(), params :: binary() +}). + +-record(event_stream, { + task_id :: integer(), + stream :: binary() }). \ No newline at end of file diff --git a/apps/efka/src/docker/docker_commands.erl b/apps/efka/src/docker/docker_commands.erl index e7809a5..177ce44 100644 --- a/apps/efka/src/docker/docker_commands.erl +++ b/apps/efka/src/docker/docker_commands.erl @@ -13,7 +13,7 @@ -export([pull_image/1, check_image_exist/1]). -export([create_container/3, check_container_exist/1, is_container_running/1, start_container/1, stop_container/1]). --spec pull_image(Image :: binary()) -> ok | {error, Reason :: any()}. +-spec pull_image(Image :: binary()) -> {ok, Output :: binary()} | {error, Reason :: any()}. pull_image(Image) when is_binary(Image) -> %% todo 重定向错误流 {stderr_to_stdout, true} PortSettings = [stream, exit_status, use_stdio, binary], @@ -24,7 +24,7 @@ pull_image(Image) when is_binary(Image) -> case gather_output(Port) of {0, Output} -> lager:debug("docker pull output: ~p", [Output]), - ok; + {ok, Output}; {ExitCode, Error} -> lager:debug("call me here: ~p", [Error]), {error, {ExitCode, Error}} diff --git a/apps/efka/src/docker/docker_deployer.erl b/apps/efka/src/docker/docker_deployer.erl index e9b3813..9ee9f2f 100644 --- a/apps/efka/src/docker/docker_deployer.erl +++ b/apps/efka/src/docker/docker_deployer.erl @@ -164,41 +164,38 @@ code_change(_OldVsn, State = #state{}, _Extra) -> % "command": ["nginx", "-g", "daemon off;"], % "restart": "always" %} --spec do_deploy(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> ok | {error, Reason :: any()}. +-spec do_deploy(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> no_return(). do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> %% 尝试拉取镜像 ContainerName = maps:get(<<"container_name">>, Config), - Image0 = maps:get(<<"image">>, Config), - Image = normalize_image(Image0), + efka_remote_agent:task_event_stream(TaskId, <<"开始部署容器:"/utf8, ContainerName/binary>>), - case try_pull_image(Image) of - ok -> - %% 创建container - %% 如果存在envs参数,则生成环境变量参数文件 - %maybe_create_env_file(ContainerDir, maps:get(<<"envs">>, Config, [])), - %% 创建镜像, 并预留配置文件的绑定位置: "/etc/容器名称/" - case docker_commands:check_container_exist(ContainerName) of - true -> - {error, <<"container exist">>}; - false -> - case docker_commands:create_container(ContainerName, ContainerDir, Config) of - {ok, ContainerId} -> - {ok, ContainerId}; - {error, Reason} -> - {error, Reason} - end - end; - {error, Reason} -> - {error, Reason} - end. - --spec try_pull_image(Image :: binary()) -> ok | {error, Reason :: any()}. -try_pull_image(Image) when is_binary(Image) -> - case docker_commands:check_image_exist(Image) of + case docker_commands:check_container_exist(ContainerName) of true -> - ok; + efka_remote_agent:task_event_stream(TaskId, <<"本地容器已经存在:"/utf8, ContainerName/binary>>); false -> - docker_commands:pull_image(Image) + Image0 = maps:get(<<"image">>, Config), + Image = normalize_image(Image0), + efka_remote_agent:task_event_stream(TaskId, <<"使用镜像:"/utf8, Image/binary>>), + case docker_commands:check_image_exist(Image) of + true -> + efka_remote_agent:task_event_stream(TaskId, <<"镜像本地已存在:"/utf8, Image/binary>>); + false -> + efka_remote_agent:task_event_stream(TaskId, <<"开始拉取镜像:"/utf8, Image/binary>>), + case docker_commands:pull_image(Image) of + {ok, Output} -> + efka_remote_agent:task_event_stream(TaskId, Output), + efka_remote_agent:task_event_stream(TaskId, <<"开始创建容器: "/utf8, ContainerName/binary>>), + case docker_commands:create_container(ContainerName, ContainerDir, Config) of + {ok, ContainerId} -> + efka_remote_agent:task_event_stream(TaskId, <<"容器创建成功: "/utf8, ContainerId/binary>>); + {error, Reason} -> + efka_remote_agent:task_event_stream(TaskId, <<"容器创建失败: "/utf8, Reason/binary>>) + end; + {error, Reason} -> + efka_remote_agent:task_event_stream(TaskId, <<"镜像拉取失败:"/utf8, Reason/binary>>) + end + end end. maybe_create_env_file(_ContainerDir, []) -> diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index d857b7c..5fdd9b0 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -15,7 +15,7 @@ %% API -export([start_link/0]). --export([metric_data/4, event/3, ping/13]). +-export([metric_data/4, event/3, ping/13, task_event_stream/2]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -49,6 +49,10 @@ metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId), event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) -> gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}). +-spec task_event_stream(TaskId :: integer(), Stream :: binary()) -> no_return(). +task_event_stream(TaskId, Stream) when is_integer(TaskId), is_binary(Stream) -> + gen_statem:cast(?SERVER, {task_event_stream, TaskId, Stream}). + ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). @@ -111,7 +115,7 @@ handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, Stat }), efka_transport:send(TransportPid, EventPacket), {keep_state, State}; -handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, State) -> +handle_event(cast, {event, ServiceId, EventType, Params}, _, State) -> EventPacket = message_codec:encode(?MESSAGE_EVENT, #event{ service_id = ServiceId, event_type = EventType, @@ -120,6 +124,18 @@ handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, Stat ok = cache_model:insert(EventPacket), {keep_state, State}; +handle_event(cast, {task_event_stream, TaskId, Stream}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #event_stream{ + task_id = TaskId, + stream = Stream + }), + efka_transport:send(TransportPid, EventPacket), + {keep_state, State}; + +%% 其他情况下直接忽略 +handle_event(cast, {task_event_stream, _TaskId, _Stream}, _, State = #state{}) -> + {keep_state, State}; + %handle_event(cast, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}, ?STATE_ACTIVATED, % State = #state{transport_pid = TransportPid}) -> % diff --git a/apps/efka/src/message/message_codec.erl b/apps/efka/src/message/message_codec.erl index 6e2927b..4283b84 100644 --- a/apps/efka/src/message/message_codec.erl +++ b/apps/efka/src/message/message_codec.erl @@ -66,6 +66,11 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) marshal(?Bytes, ServiceId), marshal(?I32, EventType), marshal(?Bytes, Params) + ]); +encode0(#event_stream{task_id = TaskId, stream = Stream}) -> + iolist_to_binary([ + marshal(?Bytes, TaskId), + marshal(?I32, Stream) ]). -spec decode(Bin :: binary()) -> {ok, Message :: any()} | error. @@ -92,6 +97,8 @@ decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> {ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}}; decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> {ok, #event{service_id = ServiceId, event_type = EventType, params = Params}}; +decode0(?MESSAGE_EVENT_STREAM, [TaskId, Stream]) -> + {ok, #event_stream{task_id = TaskId, stream = Stream}}; decode0(_, _) -> error.