ekfa/apps/efka/src/docker/docker_events.erl
2025-09-17 10:15:09 +08:00

152 lines
5.6 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 16. 9月 2025 16:48
%%%-------------------------------------------------------------------
-module(docker_events).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([monitor_container/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
port,
%% 观察者
monitors = #{}
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec monitor_container(ReceiverPid :: pid(), ContainerId :: binary()) -> no_return().
monitor_container(ReceiverPid, ContainerId) when is_pid(ReceiverPid), is_binary(ContainerId) ->
gen_server:cast(?SERVER, {monitor_container, ReceiverPid, ContainerId}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
process_flag(trap_exit, true),
try_attach_events(0),
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({monitor_container, ReceiverPid, ContainerId}, State = #state{monitors = Monitors}) ->
MRef = erlang:monitor(process, ReceiverPid),
{noreply, State#state{monitors = maps:put(ContainerId, {ReceiverPid, MRef}, Monitors)}}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, attach_docker_events}, State = #state{port = undefined}) ->
ExecCmd = "docker events --format \"{{json .}}\"",
case catch erlang:open_port({spawn, ExecCmd}, [exit_status, {line, 10239}, use_stdio, stderr_to_stdout, binary]) of
Port when is_port(Port) ->
{noreply, State#state{port = Port}};
_Error ->
try_attach_events(5000),
{noreply, State}
end;
handle_info({Port, {data, {eol, BinLine}}}, State = #state{port = Port}) ->
Event = catch jiffy:decode(BinLine, [return_maps]),
lager:debug("event: ~p", [Event]),
handle_event(Event, State),
{noreply, State};
%% 进程退出的时候删除掉管理的Pid
handle_info({'DOWN', MRef, process, _Pid, _Reason}, State = #state{monitors = Monitors}) ->
NMonitors = maps:filter(fun(_Key, {_, Ref}) -> MRef =/= Ref end, Monitors),
{noreply, State#state{monitors = NMonitors}};
%% Port退出的时候尝试重启
handle_info({'EXIT', Port, Reason}, State = #state{port = Port}) ->
lager:warning("[efka_docker_events] exit with reason: ~p", [Reason]),
try_attach_events(5000),
{noreply, State#state{port = undefined}}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
handle_event(#{<<"Type">> := <<"container">>, <<"status">> := Status, <<"id">> := Id}, #state{monitors = Monitors}) ->
case maps:find(Id, Monitors) of
error ->
ok;
{ok, {ReceiverPid, _}} ->
case Status of
<<"start">> ->
ReceiverPid ! {docker_events, start};
<<"stop">> ->
ReceiverPid ! {docker_events, stop};
_ ->
ok
end
end;
handle_event(_, _) ->
ok.
try_attach_events(Timeout) ->
erlang:start_timer(Timeout, self(), attach_docker_events).