diff --git a/apps/efka/src/docker/efka_container.erl b/apps/efka/src/docker/efka_container.erl deleted file mode 100644 index 4805fb3..0000000 --- a/apps/efka/src/docker/efka_container.erl +++ /dev/null @@ -1,214 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% 1. 需要管理服务的整个生命周期,包括: 启动,停止 -%%% 2. 需要监控服务的状态,通过port的方式 -%%% 3. 服务的启动和关闭,需要在更高的层级控制 -%%% @end -%%% Created : 18. 4月 2025 16:50 -%%%------------------------------------------------------------------- --module(efka_container). --author("anlicheng"). --include("efka_tables.hrl"). - --behaviour(gen_server). - --define(STATUS_RUNNING, running). --define(STATUS_STOPPED, stopped). - -%% API --export([start_link/2]). --export([get_name/1, get_pid/1, attach_channel/3]). --export([invoke/3]). --export([metric_data/4, send_event/3]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --record(state, { - container_id :: binary(), - %% 通道id信息 - channel_pid :: pid() | undefined, - %% 数据上传的时候,用来管理容器和微服务 - meta_tag :: binary(), - - inflight = #{}, - %% 容器的运行状态 - status = ?STATUS_STOPPED, - - %% 映射关系: #{Ref => Fun} - callbacks = #{} -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec get_name(ContainerId :: binary()) -> atom(). -get_name(ContainerId) when is_binary(ContainerId) -> - list_to_atom("efka_container:" ++ binary_to_list(ContainerId)). - --spec get_pid(ContainerId :: binary()) -> undefined | pid(). -get_pid(ContainerId) when is_binary(ContainerId) -> - whereis(get_name(ContainerId)). - --spec invoke(Pid :: pid(), Ref :: reference(), Payload :: binary()) -> no_return(). -invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload) -> - gen_server:cast(Pid, {invoke, Ref, self(), Payload}). - --spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). -metric_data(Pid, DeviceUUID, RouteKey, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> - gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Metric}). - --spec send_event(Pid :: pid(), EventType :: integer(), Params :: binary()) -> no_return(). -send_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) -> - gen_server:cast(Pid, {send_event, EventType, Params}). - --spec attach_channel(Pid :: pid(), ChannelPid :: pid(), MetaTag :: binary()) -> ok | {error, Reason :: binary()}. -attach_channel(Pid, ChannelPid, MetaTag) when is_pid(Pid), is_pid(ChannelPid), is_binary(MetaTag) -> - gen_server:call(Pid, {attach_channel, ChannelPid, MetaTag}). - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(Name :: atom(), ContainerId :: binary()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Name, ContainerId) when is_atom(Name), is_binary(ContainerId) -> - gen_server:start_link({local, Name}, ?MODULE, [ContainerId], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([ContainerId]) -> - %% supervisor进程通过exit(ChildPid, shutdown)调用的时候,确保terminate函数被调用 - erlang:process_flag(trap_exit, true), - case efka_docker_command:is_container_running(ContainerId) of - true -> - efka_docker_events:monitor_container(self(), ContainerId), - {ok, #state{container_id = ContainerId, status = ?STATUS_RUNNING}}; - false -> - efka_docker_events:monitor_container(self(), ContainerId), - {ok, #state{container_id = ContainerId, status = ?STATUS_STOPPED}} - end. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -%% 绑定channel -handle_call({attach_channel, ChannelPid, MetaTag}, _From, State = #state{channel_pid = OldChannelPid, container_id = ContainerId}) -> - case is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid) of - false -> - erlang:monitor(process, ChannelPid), - lager:debug("[efka_service] service_id: ~p, channel attched", [ContainerId]), - {reply, ok, State#state{channel_pid = ChannelPid, meta_tag = MetaTag}}; - true -> - {reply, {error, <<"channel exists">>}, State} - end; - -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{container_id = ContainerId, meta_tag = MetaTag}) -> - lager:debug("[efka_service] container_id: ~p, meta: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p", - [ContainerId, MetaTag, DeviceUUID, RouteKey, Metric]), - %% 这里的数据需要转换成和meta相关的数据 - efka_remote_agent:metric_data(MetaTag, DeviceUUID, RouteKey, Metric), - {noreply, State}; - -handle_cast({send_event, EventType, Params}, State = #state{container_id = ContainerId, meta_tag = MetaTag}) -> - efka_remote_agent:event(MetaTag, EventType, Params), - lager:debug("[efka_service] send_event, container_id: ~p, meta: ~p, event_type: ~p, params: ~p", [ContainerId, MetaTag, EventType, Params]), - {noreply, State}; - -%% 推送配置项目 -handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{channel_pid = ChannelPid, inflight = Inflight}) -> - case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of - true -> - ws_channel:invoke(ChannelPid, Ref, self(), Payload), - {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; - false -> - ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}}, - {reply, State} - end; - -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -%% 处理channel的回复 -handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, callbacks = Callbacks}) -> - case maps:take(Ref, Inflight) of - error -> - {noreply, State}; - {ReceiverPid, NInflight} -> - ReceiverPid ! {service_reply, Ref, Reply}, - - {noreply, State#state{inflight = NInflight, callbacks = trigger_callback(Ref, Callbacks)}} - end; - -handle_info({docker_events, start}, State) -> - {noreply, State#state{status = ?STATUS_RUNNING}}; - -handle_info({docker_events, stop}, State) -> - {noreply, State#state{status = ?STATUS_STOPPED}}; - -%% 处理channel进程的退出 -handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, container_id = ContainerId}) -> - lager:debug("[efka_service] service_id: ~p, channel exited: ~p", [ContainerId, Reason]), - {noreply, State#state{channel_pid = undefined, inflight = #{}}}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(Reason, _State = #state{container_id = ContainerId}) -> - lager:debug("[efka_service] service_id: ~p, terminate with reason: ~p", [ContainerId, Reason]), - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - --spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map(). -trigger_callback(Ref, Callbacks) -> - case maps:take(Ref, Callbacks) of - error -> - Callbacks; - {Fun, NCallbacks} -> - catch Fun(), - NCallbacks - end. \ No newline at end of file diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl new file mode 100644 index 0000000..a26a4b2 --- /dev/null +++ b/apps/efka/src/efka_service.erl @@ -0,0 +1,291 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% 1. 需要管理服务的整个生命周期,包括: 启动,停止 +%%% 2. 需要监控服务的状态,通过port的方式 +%%% 3. 服务的启动和关闭,需要在更高的层级控制 +%%% @end +%%% Created : 18. 4月 2025 16:50 +%%%------------------------------------------------------------------- +-module(efka_service). +-author("anlicheng"). +-include("efka_tables.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/2]). +-export([get_name/1, get_pid/1, attach_channel/2]). +-export([push_config/3, request_config/1, invoke/3]). +-export([metric_data/4, send_event/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-record(state, { + service_id :: binary(), + %% 通道id信息 + channel_pid :: pid() | undefined, + %% 当前进程的port信息, OSPid = erlang:port_info(Port, os_pid) + port :: undefined | port(), + %% 系统对应的pid + os_pid :: undefined | integer(), + %% 配置信息 + manifest :: undefined | efka_manifest:manifest(), + inflight = #{}, + + %% 映射关系: #{Ref => Fun} + callbacks = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec get_name(ServiceId :: binary()) -> atom(). +get_name(ServiceId) when is_binary(ServiceId) -> + list_to_atom("efka_service:" ++ binary_to_list(ServiceId)). + +-spec get_pid(ServiceId :: binary()) -> undefined | pid(). +get_pid(ServiceId) when is_binary(ServiceId) -> + whereis(get_name(ServiceId)). + +-spec push_config(Pid :: pid(), Ref :: reference(), ConfigJson :: binary()) -> no_return(). +push_config(Pid, Ref, ConfigJson) when is_pid(Pid), is_binary(ConfigJson) -> + gen_server:cast(Pid, {push_config, Ref, self(), ConfigJson}). + +-spec invoke(Pid :: pid(), Ref :: reference(), Payload :: binary()) -> no_return(). +invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload) -> + gen_server:cast(Pid, {invoke, Ref, self(), Payload}). + +-spec request_config(Pid :: pid()) -> {ok, Config :: binary()}. +request_config(Pid) when is_pid(Pid) -> + gen_server:call(Pid, request_config). + +-spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). +metric_data(Pid, DeviceUUID, RouteKey, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> + gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Metric}). + +-spec send_event(Pid :: pid(), EventType :: integer(), Params :: binary()) -> no_return(). +send_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) -> + gen_server:cast(Pid, {send_event, EventType, Params}). + +-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}. +attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> + gen_server:call(Pid, {attach_channel, ChannelPid}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(Name :: atom(), Service :: binary()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) -> + gen_server:start_link({local, Name}, ?MODULE, [ServiceId], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([ServiceId]) -> + %% supervisor进程通过exit(ChildPid, shutdown)调用的时候,确保terminate函数被调用 + erlang:process_flag(trap_exit, true), + case service_model:get_service(ServiceId) of + error -> + lager:notice("[efka_service] service_id: ~p, not found", [ServiceId]), + ignore; + {ok, #service{root_dir = RootDir}} -> + %% 第一次启动,要求必须成功;只有第一次启动成功,后续的重启逻辑才有意义 + case efka_manifest:new(RootDir) of + {ok, Manifest} -> + case efka_manifest:startup(Manifest) of + {ok, Port} -> + {os_pid, OSPid} = erlang:port_info(Port, os_pid), + lager:debug("[efka_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]), + {ok, #state{service_id = ServiceId, manifest = Manifest, port = Port, os_pid = OSPid}}; + {error, Reason} -> + lager:debug("[efka_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), + {stop, Reason} + end; + {error, Reason} -> + lager:notice("[efka_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]), + ignore + end + end. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 绑定channel +handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service_id = ServiceId}) -> + case is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid) of + false -> + erlang:monitor(process, ChannelPid), + lager:debug("[efka_service] service_id: ~p, channel attched", [ServiceId]), + {reply, ok, State#state{channel_pid = ChannelPid}}; + true -> + {reply, {error, <<"channel exists">>}, State} + end; + +%% 请求参数项 done +handle_call(request_config, _From, State = #state{service_id = ServiceId}) -> + case service_model:get_config_json(ServiceId) of + {ok, ConfigJson} -> + {reply, {ok, ConfigJson}, State}; + error -> + {reply, {ok, <<>>}, State} + end; + +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{service_id = ServiceId}) -> + lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p", [ServiceId, DeviceUUID, RouteKey, Metric]), + efka_remote_agent:metric_data(ServiceId, DeviceUUID, RouteKey, Metric), + {noreply, State}; + +handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) -> + efka_remote_agent:event(ServiceId, EventType, Params), + lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]), + {noreply, State}; + +%% 推送配置项目 +handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{channel_pid = ChannelPid, service_id = ServiceId, inflight = Inflight, callbacks = Callbacks}) -> + case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of + true -> + gen_channel:push_config(ChannelPid, Ref, self(), ConfigJson), + %% 设置成功,需要更新微服务的配置 + CB = fun() -> service_model:set_config(ServiceId, ConfigJson) end, + {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight), callbacks = maps:put(Ref, CB, Callbacks)}}; + false -> + ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}}, + {noreply, State} + end; + +%% 推送配置项目 +handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{channel_pid = ChannelPid, inflight = Inflight}) -> + case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of + true -> + gen_channel:invoke(ChannelPid, Ref, self(), Payload), + {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; + false -> + ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}}, + {reply, State} + end; + +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 重启服务 +handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, manifest = Manifest}) -> + case efka_manifest:startup(Manifest) of + {ok, Port} -> + {os_pid, OSPid} = erlang:port_info(Port, os_pid), + lager:debug("[efka_service] service_id: ~p, reboot success, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]), + {noreply, State#state{port = Port, os_pid = OSPid}}; + {error, Reason} -> + lager:debug("[efka_service] service_id: ~p, boot_service get error: ~p", [ServiceId, Reason]), + try_reboot(), + {noreply, State} + end; + +%% 处理channel的回复 +handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, callbacks = Callbacks}) -> + case maps:take(Ref, Inflight) of + error -> + {noreply, State}; + {ReceiverPid, NInflight} -> + ReceiverPid ! {service_reply, Ref, Reply}, + + {noreply, State#state{inflight = NInflight, callbacks = trigger_callback(Ref, Callbacks)}} + end; + +handle_info({Port, {data, Data}}, State = #state{service_id = ServiceId}) when is_port(Port) -> + lager:debug("[efka_service] service_id: ~p, port data: ~p", [ServiceId, Data]), + {noreply, State}; + +%% 处理port的消息, Port的被动关闭会触发;因此这个时候的Port和State.port的值是相等的 +handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId}) when is_port(Port) -> + lager:debug("[efka_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]), + {noreply, State#state{port = undefined, os_pid = undefined}}; + +%% 处理port的退出消息 +handle_info({'EXIT', Port, Reason}, State = #state{service_id = ServiceId}) when is_port(Port) -> + lager:debug("[efka_service] service_id: ~p, port: ~p, exit with reason: ~p", [ServiceId, Port, Reason]), + try_reboot(), + {noreply, State#state{port = undefined, os_pid = undefined}}; + +%% 处理channel进程的退出 +handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) -> + lager:debug("[efka_service] service_id: ~p, channel exited: ~p", [ServiceId, Reason]), + {noreply, State#state{channel_pid = undefined, inflight = #{}}}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, _State = #state{service_id = ServiceId, port = Port, os_pid = OSPid}) -> + erlang:is_port(Port) andalso erlang:port_close(Port), + catch kill_os_pid(OSPid), + lager:debug("[efka_service] service_id: ~p, terminate with reason: ~p", [ServiceId, Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% 关闭系统进程 +-spec kill_os_pid(port() | undefined) -> no_return(). +kill_os_pid(undefined) -> + ok; +kill_os_pid(OSPid) when is_integer(OSPid) -> + Cmd = lists:flatten(io_lib:format("kill -9 ~p", [OSPid])), + lager:debug("kill cmd is: ~p", [Cmd]), + os:cmd(Cmd). + +-spec try_reboot() -> no_return(). +try_reboot() -> + erlang:start_timer(5000, self(), reboot_service). + +-spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map(). +trigger_callback(Ref, Callbacks) -> + case maps:take(Ref, Callbacks) of + error -> + Callbacks; + {Fun, NCallbacks} -> + catch Fun(), + NCallbacks + end. \ No newline at end of file diff --git a/apps/efka/src/docker/efka_container_sup.erl b/apps/efka/src/efka_service_sup.erl similarity index 67% rename from apps/efka/src/docker/efka_container_sup.erl rename to apps/efka/src/efka_service_sup.erl index 12f3134..a516296 100644 --- a/apps/efka/src/docker/efka_container_sup.erl +++ b/apps/efka/src/efka_service_sup.erl @@ -6,7 +6,7 @@ %%% @end %%% Created : 18. 4月 2025 16:42 %%%------------------------------------------------------------------- --module(efka_container_sup). +-module(efka_service_sup). -author("anlicheng"). -include("efka_tables.hrl"). @@ -14,7 +14,7 @@ %% API -export([start_link/0]). --export([start_container/1, stop_container/1]). +-export([start_service/1, stop_service/1]). %% Supervisor callbacks -export([init/1]). @@ -42,11 +42,11 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, %% 简化逻辑,只启动需要运行的微服务 - %{ok, Services} = service_model:get_running_services(), - %ServiceIds = lists:map(fun(#service{service_id = ServiceId}) -> ServiceId end, Services), - %lager:debug("[efka_service_sup] will start services: ~p", [ServiceIds]), - %Specs = lists:map(fun(ServiceId) -> child_spec(ServiceId) end, Services), - Specs = [], + {ok, Services} = service_model:get_running_services(), + ServiceIds = lists:map(fun(#service{service_id = ServiceId}) -> ServiceId end, Services), + lager:debug("[efka_service_sup] will start services: ~p", [ServiceIds]), + + Specs = lists:map(fun(ServiceId) -> child_spec(ServiceId) end, Services), {ok, {SupFlags, Specs}}. @@ -54,9 +54,9 @@ init([]) -> %%% Internal functions %%%=================================================================== --spec start_container(ContainerId :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}. -start_container(ContainerId) when is_binary(ContainerId) -> - case supervisor:start_child(?MODULE, child_spec(ContainerId)) of +-spec start_service(ServiceId :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +start_service(ServiceId) when is_binary(ServiceId) -> + case supervisor:start_child(?MODULE, child_spec(ServiceId)) of {ok, Pid} when is_pid(Pid) -> {ok, Pid}; {error, {'already_started', Pid}} when is_pid(Pid) -> @@ -65,21 +65,21 @@ start_container(ContainerId) when is_binary(ContainerId) -> {error, Error} end. --spec stop_container(ContainerId :: binary()) -> ok. -stop_container(ContainerId) when is_binary(ContainerId) -> - ChildId = efka_container:get_name(ContainerId), +-spec stop_service(ServiceId :: binary()) -> ok. +stop_service(ServiceId) when is_binary(ServiceId) -> + ChildId = efka_service:get_name(ServiceId), supervisor:terminate_child(?MODULE, ChildId), supervisor:delete_child(?MODULE, ChildId). child_spec(#service{service_id = ServiceId}) when is_binary(ServiceId) -> child_spec(ServiceId); -child_spec(ContainerId) when is_binary(ContainerId) -> - Name = efka_service:get_name(ContainerId), +child_spec(ServiceId) when is_binary(ServiceId) -> + Name = efka_service:get_name(ServiceId), #{ id => Name, - start => {efka_container, start_link, [Name, ContainerId]}, + start => {efka_service, start_link, [Name, ServiceId]}, restart => permanent, shutdown => 5000, type => worker, - modules => ['efka_container'] + modules => ['efka_service'] }. \ No newline at end of file