This commit is contained in:
anlicheng 2025-09-16 17:31:58 +08:00
parent 0b0043d98d
commit d2ab014258
2 changed files with 20 additions and 41 deletions

View File

@ -31,9 +31,7 @@
%% id信息
channel_pid :: pid() | undefined,
%%
meta :: binary(),
%% port信息, OSPid = erlang:port_info(Port, os_pid)
port :: undefined | port(),
meta_tag :: binary(),
inflight = #{},
%%
@ -95,10 +93,10 @@ init([ContainerId]) ->
erlang:process_flag(trap_exit, true),
case efka_docker_command:is_container_running(ContainerId) of
true ->
{ok, Port} = wait_container(ContainerId),
{ok, #state{container_id = ContainerId, status = ?STATUS_RUNNING, port = Port}};
efka_docker_events:monitor_container(self(), ContainerId),
{ok, #state{container_id = ContainerId, status = ?STATUS_RUNNING}};
false ->
lager:debug("[efka_service] container_id: ~p", [ContainerId]),
efka_docker_events:monitor_container(self(), ContainerId),
{ok, #state{container_id = ContainerId, status = ?STATUS_STOPPED}}
end.
@ -113,12 +111,12 @@ init([ContainerId]) ->
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
%% channel
handle_call({attach_channel, ChannelPid, Meta}, _From, State = #state{channel_pid = OldChannelPid, container_id = ContainerId}) ->
handle_call({attach_channel, ChannelPid, MetaTag}, _From, State = #state{channel_pid = OldChannelPid, container_id = ContainerId}) ->
case is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid) of
false ->
erlang:monitor(process, ChannelPid),
lager:debug("[efka_service] service_id: ~p, channel attched", [ContainerId]),
{reply, ok, State#state{channel_pid = ChannelPid, meta = Meta}};
{reply, ok, State#state{channel_pid = ChannelPid, meta_tag = MetaTag}};
true ->
{reply, {error, <<"channel exists">>}, State}
end;
@ -132,16 +130,16 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{container_id = ContainerId, meta = Meta}) ->
handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{container_id = ContainerId, meta_tag = MetaTag}) ->
lager:debug("[efka_service] container_id: ~p, meta: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p",
[ContainerId, Meta, DeviceUUID, RouteKey, Metric]),
[ContainerId, MetaTag, DeviceUUID, RouteKey, Metric]),
%% meta相关的数据
efka_remote_agent:metric_data(Meta, DeviceUUID, RouteKey, Metric),
efka_remote_agent:metric_data(MetaTag, DeviceUUID, RouteKey, Metric),
{noreply, State};
handle_cast({send_event, EventType, Params}, State = #state{container_id = ContainerId, meta = Meta}) ->
efka_remote_agent:event(Meta, EventType, Params),
lager:debug("[efka_service] send_event, container_id: ~p, meta: ~p, event_type: ~p, params: ~p", [ContainerId, Meta, EventType, Params]),
handle_cast({send_event, EventType, Params}, State = #state{container_id = ContainerId, meta_tag = MetaTag}) ->
efka_remote_agent:event(MetaTag, EventType, Params),
lager:debug("[efka_service] send_event, container_id: ~p, meta: ~p, event_type: ~p, params: ~p", [ContainerId, MetaTag, EventType, Params]),
{noreply, State};
%%
@ -188,19 +186,11 @@ handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, cal
{noreply, State#state{inflight = NInflight, callbacks = trigger_callback(Ref, Callbacks)}}
end;
handle_info({Port, {data, Data}}, State = #state{container_id = ContainerId}) when is_port(Port) ->
lager:debug("[efka_service] service_id: ~p, port data: ~p", [ContainerId, Data]),
{noreply, State};
handle_info({docker_events, start}, State) ->
{noreply, State#state{status = ?STATUS_RUNNING}};
%% port的消息, Port的被动关闭会触发Port和State.port的值是相等的
handle_info({Port, {exit_status, Code}}, State = #state{container_id = ContainerId}) when is_port(Port) ->
lager:debug("[efka_service] service_id: ~p, port: ~p, exit with code: ~p", [ContainerId, Port, Code]),
{noreply, State#state{port = undefined}};
%% port的退出消息
handle_info({'EXIT', Port, Reason}, State = #state{container_id = ContainerId}) when is_port(Port) ->
lager:debug("[efka_service] service_id: ~p, port: ~p, exit with reason: ~p", [ContainerId, Port, Reason]),
{noreply, State#state{port = undefined}};
handle_info({docker_events, stop}, State) ->
{noreply, State#state{status = ?STATUS_STOPPED}};
%% channel进程的退出
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, container_id = ContainerId}) ->
@ -214,8 +204,7 @@ handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, _State = #state{container_id = ContainerId, port = Port}) ->
erlang:is_port(Port) andalso erlang:port_close(Port),
terminate(Reason, _State = #state{container_id = ContainerId}) ->
lager:debug("[efka_service] service_id: ~p, terminate with reason: ~p", [ContainerId, Reason]),
ok.
@ -231,17 +220,6 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions
%%%===================================================================
-spec wait_container(ContainerId :: binary()) -> {ok, Port :: erlang:port()} | error.
wait_container(ContainerId) when is_binary(ContainerId) ->
PortSettings = [stream, exit_status, use_stdio, binary],
ExecCmd = "docker wait " ++ binary_to_list(ContainerId),
case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of
Port when is_port(Port) ->
{ok, Port};
_Other ->
error
end.
-spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map().
trigger_callback(Ref, Callbacks) ->
case maps:take(Ref, Callbacks) of

View File

@ -30,6 +30,7 @@
%%% API
%%%===================================================================
-spec monitor_container(ReceiverPid :: pid(), ContainerId :: binary()) -> no_return().
monitor_container(ReceiverPid, ContainerId) when is_pid(ReceiverPid), is_binary(ContainerId) ->
gen_server:cast(?SERVER, {monitor_container, ReceiverPid, ContainerId}).
@ -91,7 +92,7 @@ handle_info({timeout, _, attach_docker_events}, State = #state{port = undefined}
try_attach_events(5000),
{noreply, State}
end;
handle_info({Port, {data, BinLine}}, State = #state{port = Port}) ->
handle_info({Port, {data, {eol, BinLine}}}, State = #state{port = Port}) ->
Event = catch jiffy:decode(BinLine, [return_maps]),
lager:debug("event: ~p", [Event]),
handle_event(Event, State),
@ -129,7 +130,7 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
handle_event(#{type := <<"container">>, status := Status, id := Id}, #state{monitors = Monitors}) ->
handle_event(#{<<"Type">> := <<"container">>, <<"status">> := Status, <<"id">> := Id}, #state{monitors = Monitors}) ->
case maps:find(Id, Monitors) of
error ->
ok;