diff --git a/apps/efka/src/docker/efka_container.erl b/apps/efka/src/docker/efka_container.erl index 7851dc4..af2a682 100644 --- a/apps/efka/src/docker/efka_container.erl +++ b/apps/efka/src/docker/efka_container.erl @@ -31,9 +31,7 @@ %% 通道id信息 channel_pid :: pid() | undefined, %% 数据上传的时候,用来管理容器和微服务 - meta :: binary(), - %% 当前进程的port信息, OSPid = erlang:port_info(Port, os_pid) - port :: undefined | port(), + meta_tag :: binary(), inflight = #{}, %% 容器的运行状态 @@ -95,10 +93,10 @@ init([ContainerId]) -> erlang:process_flag(trap_exit, true), case efka_docker_command:is_container_running(ContainerId) of true -> - {ok, Port} = wait_container(ContainerId), - {ok, #state{container_id = ContainerId, status = ?STATUS_RUNNING, port = Port}}; + efka_docker_events:monitor_container(self(), ContainerId), + {ok, #state{container_id = ContainerId, status = ?STATUS_RUNNING}}; false -> - lager:debug("[efka_service] container_id: ~p", [ContainerId]), + efka_docker_events:monitor_container(self(), ContainerId), {ok, #state{container_id = ContainerId, status = ?STATUS_STOPPED}} end. @@ -113,12 +111,12 @@ init([ContainerId]) -> {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). %% 绑定channel -handle_call({attach_channel, ChannelPid, Meta}, _From, State = #state{channel_pid = OldChannelPid, container_id = ContainerId}) -> +handle_call({attach_channel, ChannelPid, MetaTag}, _From, State = #state{channel_pid = OldChannelPid, container_id = ContainerId}) -> case is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid) of false -> erlang:monitor(process, ChannelPid), lager:debug("[efka_service] service_id: ~p, channel attched", [ContainerId]), - {reply, ok, State#state{channel_pid = ChannelPid, meta = Meta}}; + {reply, ok, State#state{channel_pid = ChannelPid, meta_tag = MetaTag}}; true -> {reply, {error, <<"channel exists">>}, State} end; @@ -132,16 +130,16 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{container_id = ContainerId, meta = Meta}) -> +handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{container_id = ContainerId, meta_tag = MetaTag}) -> lager:debug("[efka_service] container_id: ~p, meta: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p", - [ContainerId, Meta, DeviceUUID, RouteKey, Metric]), + [ContainerId, MetaTag, DeviceUUID, RouteKey, Metric]), %% 这里的数据需要转换成和meta相关的数据 - efka_remote_agent:metric_data(Meta, DeviceUUID, RouteKey, Metric), + efka_remote_agent:metric_data(MetaTag, DeviceUUID, RouteKey, Metric), {noreply, State}; -handle_cast({send_event, EventType, Params}, State = #state{container_id = ContainerId, meta = Meta}) -> - efka_remote_agent:event(Meta, EventType, Params), - lager:debug("[efka_service] send_event, container_id: ~p, meta: ~p, event_type: ~p, params: ~p", [ContainerId, Meta, EventType, Params]), +handle_cast({send_event, EventType, Params}, State = #state{container_id = ContainerId, meta_tag = MetaTag}) -> + efka_remote_agent:event(MetaTag, EventType, Params), + lager:debug("[efka_service] send_event, container_id: ~p, meta: ~p, event_type: ~p, params: ~p", [ContainerId, MetaTag, EventType, Params]), {noreply, State}; %% 推送配置项目 @@ -188,19 +186,11 @@ handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, cal {noreply, State#state{inflight = NInflight, callbacks = trigger_callback(Ref, Callbacks)}} end; -handle_info({Port, {data, Data}}, State = #state{container_id = ContainerId}) when is_port(Port) -> - lager:debug("[efka_service] service_id: ~p, port data: ~p", [ContainerId, Data]), - {noreply, State}; +handle_info({docker_events, start}, State) -> + {noreply, State#state{status = ?STATUS_RUNNING}}; -%% 处理port的消息, Port的被动关闭会触发;因此这个时候的Port和State.port的值是相等的 -handle_info({Port, {exit_status, Code}}, State = #state{container_id = ContainerId}) when is_port(Port) -> - lager:debug("[efka_service] service_id: ~p, port: ~p, exit with code: ~p", [ContainerId, Port, Code]), - {noreply, State#state{port = undefined}}; - -%% 处理port的退出消息 -handle_info({'EXIT', Port, Reason}, State = #state{container_id = ContainerId}) when is_port(Port) -> - lager:debug("[efka_service] service_id: ~p, port: ~p, exit with reason: ~p", [ContainerId, Port, Reason]), - {noreply, State#state{port = undefined}}; +handle_info({docker_events, stop}, State) -> + {noreply, State#state{status = ?STATUS_STOPPED}}; %% 处理channel进程的退出 handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, container_id = ContainerId}) -> @@ -214,8 +204,7 @@ handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_ %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). -terminate(Reason, _State = #state{container_id = ContainerId, port = Port}) -> - erlang:is_port(Port) andalso erlang:port_close(Port), +terminate(Reason, _State = #state{container_id = ContainerId}) -> lager:debug("[efka_service] service_id: ~p, terminate with reason: ~p", [ContainerId, Reason]), ok. @@ -231,17 +220,6 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== --spec wait_container(ContainerId :: binary()) -> {ok, Port :: erlang:port()} | error. -wait_container(ContainerId) when is_binary(ContainerId) -> - PortSettings = [stream, exit_status, use_stdio, binary], - ExecCmd = "docker wait " ++ binary_to_list(ContainerId), - case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of - Port when is_port(Port) -> - {ok, Port}; - _Other -> - error - end. - -spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map(). trigger_callback(Ref, Callbacks) -> case maps:take(Ref, Callbacks) of diff --git a/apps/efka/src/docker/efka_docker_events.erl b/apps/efka/src/docker/efka_docker_events.erl index 923f230..0b9bd5f 100644 --- a/apps/efka/src/docker/efka_docker_events.erl +++ b/apps/efka/src/docker/efka_docker_events.erl @@ -30,6 +30,7 @@ %%% API %%%=================================================================== +-spec monitor_container(ReceiverPid :: pid(), ContainerId :: binary()) -> no_return(). monitor_container(ReceiverPid, ContainerId) when is_pid(ReceiverPid), is_binary(ContainerId) -> gen_server:cast(?SERVER, {monitor_container, ReceiverPid, ContainerId}). @@ -91,7 +92,7 @@ handle_info({timeout, _, attach_docker_events}, State = #state{port = undefined} try_attach_events(5000), {noreply, State} end; -handle_info({Port, {data, BinLine}}, State = #state{port = Port}) -> +handle_info({Port, {data, {eol, BinLine}}}, State = #state{port = Port}) -> Event = catch jiffy:decode(BinLine, [return_maps]), lager:debug("event: ~p", [Event]), handle_event(Event, State), @@ -129,7 +130,7 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -handle_event(#{type := <<"container">>, status := Status, id := Id}, #state{monitors = Monitors}) -> +handle_event(#{<<"Type">> := <<"container">>, <<"status">> := Status, <<"id">> := Id}, #state{monitors = Monitors}) -> case maps:find(Id, Monitors) of error -> ok;