ekfa/apps/efka/src/docker/efka_container.erl
2025-09-16 11:40:25 +08:00

284 lines
12 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
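%%% 1. Manages the whole lifecycle of the service, including start and stop
%%% 2. Monitors the state of the service through an Erlang port
%%% 3. Starting and stopping the service has to be controlled at a higher level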
%%% @end
%%% Created : 18. 4月 2025 16:50
%%%-------------------------------------------------------------------
-module(efka_container).
-author("anlicheng").
-include("efka_tables.hrl").
-behaviour(gen_server).
%% API
-export([start_link/3]).
-export([get_name/1, get_pid/1, attach_channel/3]).
-export([push_envs/3, invoke/3]).
-export([metric_data/4, send_event/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%% Server state of one efka_container process.
-record(state, {
    %service_id :: binary(),
    %% Identifier of the container handled by this process
    container_id :: binary(),
    %% Pid of the attached channel process; undefined while no channel is attached
    channel_pid :: pid() | undefined,
    %% Used to associate the container with its micro service when data is uploaded.
    %% Stays undefined until attach_channel/3 supplies it, so the type must allow that.
    meta :: undefined | binary(),
    %% Port of the spawned program, OSPid = erlang:port_info(Port, os_pid)
    port :: undefined | port(),
    %% Operating system pid that belongs to the port
    os_pid :: undefined | integer(),
    %% Configuration information
    manifest :: undefined | efka_manifest:manifest(),
    %% Pending requests: #{Ref => ReceiverPid}
    inflight = #{},
    %% Mapping: #{Ref => Fun}
    callbacks = #{}
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Builds the atom `efka_container:<ContainerId>' used as the registered
%% name of the process handling the given container.
%% NOTE(review): every distinct ContainerId creates a new atom and atoms are
%% never garbage collected - confirm that container ids come from a bounded,
%% trusted set.
-spec get_name(ContainerId :: binary()) -> atom().
get_name(ContainerId) when is_binary(ContainerId) ->
    %% latin1 keeps the byte-per-character mapping of the container id
    erlang:binary_to_atom(<<"efka_container:", ContainerId/binary>>, latin1).
%% @doc Looks up the process registered under the name derived from
%% ContainerId; returns `undefined' when nothing is registered.
-spec get_pid(ContainerId :: binary()) -> undefined | pid().
get_pid(ContainerId) when is_binary(ContainerId) ->
    RegisteredName = get_name(ContainerId),
    erlang:whereis(RegisteredName).
%% @doc Asynchronously asks the container process to push Envs to its channel.
%% The calling process is recorded as the receiver; the outcome is delivered
%% to it later as `{service_reply, Ref, Reply}'.
%% Returns `ok' immediately (gen_server:cast/2 never blocks).
-spec push_envs(Pid :: pid(), Ref :: reference(), Envs :: binary()) -> ok.
push_envs(Pid, Ref, Envs) when is_pid(Pid), is_binary(Envs) ->
    gen_server:cast(Pid, {push_envs, Ref, self(), Envs}).
%% @doc Asynchronously asks the container process to forward Payload to its
%% channel. The calling process is recorded as the receiver; the outcome is
%% delivered to it later as `{service_reply, Ref, Reply}'.
%% Returns `ok' immediately (gen_server:cast/2 never blocks).
-spec invoke(Pid :: pid(), Ref :: reference(), Payload :: binary()) -> ok.
invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload) ->
    gen_server:cast(Pid, {invoke, Ref, self(), Payload}).
%% @doc Asynchronously hands a metric sample to the container process, which
%% forwards it to efka_remote_agent together with its container id and meta.
%% Returns `ok' immediately (gen_server:cast/2 never blocks).
-spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> ok.
metric_data(Pid, DeviceUUID, RouteKey, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) ->
    gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Metric}).
%% @doc Asynchronously hands an event to the container process, which
%% forwards it to efka_remote_agent together with its container id and meta.
%% Returns `ok' immediately (gen_server:cast/2 never blocks).
-spec send_event(Pid :: pid(), EventType :: integer(), Params :: binary()) -> ok.
send_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) ->
    gen_server:cast(Pid, {send_event, EventType, Params}).
%% @doc Synchronously binds ChannelPid (and its Meta) to the container
%% process. Replies `ok' when the channel was attached, or
%% `{error, Reason}' when the server refuses it.
-spec attach_channel(Pid :: pid(), ChannelPid :: pid(), Meta :: binary()) -> ok | {error, Reason :: binary()}.
attach_channel(Pid, ChannelPid, Meta) when is_pid(Pid), is_pid(ChannelPid), is_binary(Meta) ->
    Request = {attach_channel, ChannelPid, Meta},
    gen_server:call(Pid, Request).
%% @doc Starts the container server linked to the caller and registers it
%% locally under Name. ContainerId and Args are handed to init/1.
-spec start_link(Name :: atom(), ContainerId :: binary(), Args :: [binary()]) ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link(Name, ContainerId, Args) when is_atom(Name), is_binary(ContainerId), is_list(Args) ->
    ServerName = {local, Name},
    InitArgs = [ContainerId, Args],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server: traps exits, spawns the external program
%% through startup/2 and remembers its port and OS pid. Stops with the
%% startup error when the program cannot be spawned.
-spec(init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore).
init([ContainerId, Args]) ->
    %% Make sure terminate/2 runs when the supervisor stops this process
    %% with exit(ChildPid, shutdown)
    erlang:process_flag(trap_exit, true),
    %% NOTE(review): the executable path passed here is the empty binary, so
    %% startup/2 presumably always fails - confirm which command is intended.
    case startup(<<>>, Args) of
        {ok, Port} ->
            {os_pid, OSPid} = erlang:port_info(Port, os_pid),
            lager:debug("[efka_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ContainerId, Port, OSPid]),
            {ok, #state{container_id = ContainerId, port = Port, os_pid = OSPid}};
        {error, Reason} ->
            %% Log the reason as well; without it a failed boot cannot be diagnosed
            lager:debug("[efka_service] service: ~p, boot_service get error: ~p", [ContainerId, Reason]),
            {stop, Reason}
    end.
%% @private
%% @doc Handles synchronous requests. `attach_channel' binds a channel
%% process unless a live one is already bound; every other request is
%% answered with `ok'.
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Bind the channel
handle_call({attach_channel, ChannelPid, Meta}, _From,
            State = #state{channel_pid = CurrentPid, container_id = ContainerId}) ->
    Occupied = is_pid(CurrentPid) andalso is_process_alive(CurrentPid),
    case Occupied of
        true ->
            %% A live channel is already attached; refuse the new one
            {reply, {error, <<"channel exists">>}, State};
        false ->
            %% Monitor the new channel so that its exit is reported as 'DOWN'
            erlang:monitor(process, ChannelPid),
            lager:debug("[efka_service] service_id: ~p, channel attched", [ContainerId]),
            Attached = State#state{channel_pid = ChannelPid, meta = Meta},
            {reply, ok, Attached}
    end;
handle_call(_Request, _From, #state{} = State) ->
    {reply, ok, State}.
%% @private
%% @doc Handles asynchronous requests:
%% metric_data / send_event are forwarded to efka_remote_agent;
%% push_envs / invoke are forwarded to the attached channel and the caller is
%% remembered in `inflight' so the channel reply can be routed back. When no
%% live channel is attached the caller immediately gets
%% `{service_reply, Ref, {error, <<"channel is not alive">>}}'.
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{container_id = ContainerId, meta = Meta}) ->
    lager:debug("[efka_service] container_id: ~p, meta: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p",
        [ContainerId, Meta, DeviceUUID, RouteKey, Metric]),
    %% The data has to be converted into meta-related data.
    %% TODO: decide whether the container_id data needs to be added as well
    efka_remote_agent:metric_data(ContainerId, Meta, DeviceUUID, RouteKey, Metric),
    {noreply, State};
handle_cast({send_event, EventType, Params}, State = #state{container_id = ContainerId, meta = Meta}) ->
    efka_remote_agent:event(ContainerId, Meta, EventType, Params),
    lager:debug("[efka_service] send_event, container_id: ~p, meta: ~p, event_type: ~p, params: ~p", [ContainerId, Meta, EventType, Params]),
    {noreply, State};
%% Push configuration items to the channel
handle_cast({push_envs, Ref, ReceiverPid, Envs}, State = #state{channel_pid = ChannelPid, container_id = ContainerId, inflight = Inflight, callbacks = Callbacks}) ->
    case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
        true ->
            gen_channel:push_config(ChannelPid, Ref, self(), Envs),
            %% Stored under Ref and run when the channel reply for Ref arrives:
            %% updates the configuration of the micro service
            CB = fun() -> service_model:set_config(ContainerId, Envs) end,
            {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight), callbacks = maps:put(Ref, CB, Callbacks)}};
        false ->
            ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}},
            {noreply, State}
    end;
%% Forward an invocation to the channel
handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{channel_pid = ChannelPid, inflight = Inflight}) ->
    case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
        true ->
            gen_channel:invoke(ChannelPid, Ref, self(), Payload),
            {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
        false ->
            ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}},
            %% handle_cast/2 must answer with noreply; a `{reply, State}' tuple
            %% is a bad return value and would terminate the server
            {noreply, State}
    end;
handle_cast(_Request, State = #state{}) ->
    {noreply, State}.
%% @private
%% @doc Handles all non call/cast messages: the reboot timer, channel
%% replies, port data / exit notifications and the 'DOWN' message of the
%% attached channel. Anything else is logged and ignored.
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Restart the service
handle_info({timeout, _, reboot_service}, State = #state{container_id = ContainerId, manifest = Manifest}) ->
    %% NOTE(review): nothing in the visible code ever sets #state.manifest and
    %% startup/2 works on a binary executable path, so this call presumably
    %% crashes while manifest is undefined - confirm where the command comes from.
    case startup(Manifest, []) of
        {ok, Port} ->
            {os_pid, OSPid} = erlang:port_info(Port, os_pid),
            lager:debug("[efka_service] service_id: ~p, reboot success, port: ~p, os_pid: ~p", [ContainerId, Port, OSPid]),
            {noreply, State#state{port = Port, os_pid = OSPid}};
        {error, Reason} ->
            lager:debug("[efka_service] service_id: ~p, boot_service get error: ~p", [ContainerId, Reason]),
            %% Schedule another attempt
            try_reboot(),
            {noreply, State}
    end;
%% Handle the reply of the channel: route it to the waiting receiver and run
%% the callback registered for the same Ref (if any)
handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, callbacks = Callbacks}) ->
    case maps:take(Ref, Inflight) of
        error ->
            {noreply, State};
        {ReceiverPid, NInflight} ->
            ReceiverPid ! {service_reply, Ref, Reply},
            {noreply, State#state{inflight = NInflight, callbacks = trigger_callback(Ref, Callbacks)}}
    end;
handle_info({Port, {data, Data}}, State = #state{container_id = ContainerId}) when is_port(Port) ->
    lager:debug("[efka_service] service_id: ~p, port data: ~p", [ContainerId, Data]),
    {noreply, State};
%% Handle port messages; this is triggered when the port is closed passively,
%% so at this point Port is equal to State.port
handle_info({Port, {exit_status, Code}}, State = #state{container_id = ContainerId}) when is_port(Port) ->
    lager:debug("[efka_service] service_id: ~p, port: ~p, exit with code: ~p", [ContainerId, Port, Code]),
    {noreply, State#state{port = undefined, os_pid = undefined}};
%% Handle the exit message of the port and schedule a reboot
handle_info({'EXIT', Port, Reason}, State = #state{container_id = ContainerId}) when is_port(Port) ->
    lager:debug("[efka_service] service_id: ~p, port: ~p, exit with reason: ~p", [ContainerId, Port, Reason]),
    try_reboot(),
    {noreply, State#state{port = undefined, os_pid = undefined}};
%% Handle the exit of the attached channel process
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, container_id = ContainerId}) ->
    lager:debug("[efka_service] service_id: ~p, channel exited: ~p", [ContainerId, Reason]),
    %% Callbacks are only ever triggered through an inflight entry, so they
    %% have to be dropped together with it; otherwise they would leak.
    {noreply, State#state{channel_pid = undefined, inflight = #{}, callbacks = #{}}};
%% Anything else (e.g. the 'DOWN' of a channel that has already been replaced,
%% or an 'EXIT' from a linked process that is not a port) must not crash the
%% server with function_clause.
handle_info(Info, State = #state{container_id = ContainerId}) ->
    lager:debug("[efka_service] service_id: ~p, ignore unexpected message: ~p", [ContainerId, Info]),
    {noreply, State}.
%% @private
%% @doc Called by the gen_server right before it terminates. Closes the port
%% when one is open, makes a best-effort attempt to kill the OS process and
%% logs the termination reason. The return value is ignored by gen_server.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(Reason, #state{container_id = ContainerId, port = Port, os_pid = OSPid}) ->
    case erlang:is_port(Port) of
        true -> erlang:port_close(Port);
        false -> false
    end,
    %% Best effort: a failure to kill the OS process must not abort terminate/2
    try
        kill_os_pid(OSPid)
    catch
        _:_ -> ok
    end,
    lager:debug("[efka_service] service_id: ~p, terminate with reason: ~p", [ContainerId, Reason]),
    ok.
%% @private
%% @doc Code upgrade hook; the state record is carried over unchanged.
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, #state{} = State, _Extra) ->
    {ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Spawns the executable ExecCmd with the argument list Args as an Erlang
%% port (with exit_status reporting).
%% Returns {ok, Port}, or {error, <<"exec command startup failed">>} when the
%% port cannot be opened.
%%
%% The binaries are handed to open_port/2 unchanged: converting them with
%% binary_to_list/1 turns every UTF-8 byte into a separate code point, which
%% corrupts non-ASCII paths and arguments. open_port/2 accepts binaries for
%% both the executable name and the args and passes them on untranslated.
startup(ExecCmd, Args) when is_binary(ExecCmd), is_list(Args) ->
    PortSettings = [
        {args, Args},
        exit_status
    ],
    try
        {ok, erlang:open_port({spawn_executable, ExecCmd}, PortSettings)}
    catch
        %% open_port/2 raises (enoent, eacces, badarg, ...) when the program
        %% cannot be started; report that as an error tuple
        _:_ ->
            {error, <<"exec command startup failed">>}
    end.
%% Kills the operating system process with the given pid via `kill -9'.
%% Does nothing (returns ok) when no pid is known; otherwise returns the
%% output of the kill command as produced by os:cmd/1.
-spec kill_os_pid(OSPid :: integer() | undefined) -> ok | string().
kill_os_pid(undefined) ->
    ok;
kill_os_pid(OSPid) when is_integer(OSPid) ->
    %% OSPid is guaranteed to be an integer here, so the shell command cannot
    %% be influenced by arbitrary text
    Cmd = lists:flatten(io_lib:format("kill -9 ~p", [OSPid])),
    lager:debug("kill cmd is: ~p", [Cmd]),
    os:cmd(Cmd).
%% Schedules a reboot attempt: after 5000 ms the calling process receives
%% `{timeout, TimerRef, reboot_service}'. Returns the timer reference.
-spec try_reboot() -> reference().
try_reboot() ->
    erlang:start_timer(5000, self(), reboot_service).
%% Runs and removes the callback registered under Ref.
%% Returns Callbacks unchanged when nothing is registered for Ref, otherwise
%% the map without that entry. A failing callback never crashes the caller;
%% the failure is logged instead of being swallowed silently.
-spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map().
trigger_callback(Ref, Callbacks) ->
    case maps:take(Ref, Callbacks) of
        error ->
            Callbacks;
        {Fun, NCallbacks} ->
            try
                Fun()
            catch
                Class:Reason ->
                    lager:warning("[efka_service] callback failed: ~p:~p", [Class, Reason])
            end,
            NCallbacks
    end.