diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index 934b33b..0a2a34b 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -24,8 +24,6 @@ service :: #micro_service{}, %% 通道id信息 channel_pid :: pid() | undefined, - %% 当前服务的状态 - status = 0, %% 当前进程的port信息, OSPid = erlang:port_info(Port, os_pid) port :: undefined | port() }). @@ -82,23 +80,30 @@ init([Service]) -> {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). %% 绑定channel -handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = undefined}) -> - erlang:monitor(process, ChannelPid), - {reply, ok, State#state{channel_pid = ChannelPid}}; - -handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service = Service}) when is_pid(OldChannelPid) -> - lager:notice("[efka_micro_service] service_id: ~p, attach_channel old channel exists: ~p", [ Service#micro_service.service_id, OldChannelPid]), - {reply, {error, <<"channel exists">>}, State#state{channel_pid = ChannelPid}}; +handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service = #micro_service{status = Status}}) -> + case {Status, is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid)} of + {1, false} -> + erlang:monitor(process, ChannelPid), + {reply, ok, State#state{channel_pid = ChannelPid}}; + {1, true} -> + {reply, {error, <<"channel exists">>}, State}; + {0, _} -> + {reply, {error, <<"serivce stopped">>}, State} + end; %% 启动服务 handle_call(start_service, _From, State = #state{service = Service = #micro_service{service_id = ServiceId}}) -> + %% 更新数据库状态 ok = micro_service_model:start_service(ServiceId), - {reply, ok, State#state{service = Service#micro_service{status = 0}}}; + %% 异步启动服务 + erlang:start_timer(0, self(), boot_service), + {reply, ok, State#state{service = Service#micro_service{status = 1}}}; %% 停止服务 -handle_call(stop_service, _From, State = #state{service = Service = #micro_service{service_id = ServiceId}}) -> +handle_call(stop_service, _From, State = #state{port = Port, service = Service = #micro_service{service_id = ServiceId}}) -> ok = micro_service_model:stop_service(ServiceId), - {reply, ok, State#state{service = Service#micro_service{status = 1}}}; + stop_port(Port), + {reply, ok, State#state{port = undefined, service = Service#micro_service{status = 0}}}; handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. @@ -118,35 +123,16 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, boot_service}, State = #state{service = #micro_service{work_dir = WorkDir0}}) -> - WorkDir = binary_to_list(WorkDir0), - {ok, ManifestInfo} = file:read_file(WorkDir ++ "manifest.json"), - Manifest = jiffy:decode(ManifestInfo, [return_maps]), - - lager:debug("Manifest: ~p", [Manifest]), - case maps:find(<<"exec">>, Manifest) of - error -> - lager:debug("not start command"), - {noreply, State}; - {ok, ExecCmd0} -> - PortSettings = [ - {cd, WorkDir}, - exit_status - ], - PortSettings1 = case maps:find(<<"args">>, Manifest) of - error -> - PortSettings; - {ok, Args} -> - [{args, [binary_to_list(A) || A <- Args]}|PortSettings] - end, - - ExecCmd = binary_to_list(ExecCmd0), - RealExecCmd = filename:absname_join(WorkDir, ExecCmd), - lager:debug("real: ~p", [RealExecCmd]), - - Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings1), - lager:debug("port is: ~p", [Port]), - {noreply, State#state{port = Port}} +handle_info({timeout, _, boot_service}, State = #state{service = #micro_service{service_id = ServiceId, status = 0}}) -> + lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]), + {noreply, State}; +handle_info({timeout, _, boot_service}, State = #state{service = Service = #micro_service{service_id = ServiceId, status = 1}}) -> + case boot_service(Service) of + {ok, Port} -> + {noreply, State#state{port = Port}}; + {error, Reason} -> + lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), + {noreply, State} end; handle_info({Port, {data, Data}}, State = #state{port = Port, service = Service}) -> @@ -166,17 +152,9 @@ handle_info({Port, {exit_status, Code}}, State = #state{port = Port, service = S %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). -terminate(Reason, _State = #state{port = Port}) when is_port(Port) -> - lager:debug("[efka_micro_service] terminate with reason: ~p", [Reason]), - case erlang:port_info(Port, os_pid) of - undefined -> - ok; - {os_pid, OsPid} -> - os:cmd(io_lib:format("kill -9 ~p", [OsPid])) - end, - ok; -terminate(Reason, _State = #state{}) -> +terminate(Reason, _State = #state{port = Port}) -> lager:debug("[efka_micro_service] terminate with reason: ~p", [Reason]), + stop_port(Port), ok. %% @private @@ -190,3 +168,49 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +-spec stop_port(port() | undefined) -> no_return(). +stop_port(undefined) -> + ok; +stop_port(Port) when is_port(Port) -> + case erlang:port_info(Port, os_pid) of + undefined -> + ok; + {os_pid, OsPid} -> + os:cmd(io_lib:format("kill -9 ~p", [OsPid])) + end. + +%% 启动微服务 +-spec boot_service(MicroService :: #micro_service{}) -> {ok, Port :: port()} | {error, Reason :: binary()}. +boot_service(#micro_service{work_dir = WorkDir0, service_id = ServiceId}) -> + WorkDir = binary_to_list(WorkDir0), + case file:read_file(WorkDir ++ "manifest.json") of + {ok, ManifestInfo} -> + Manifest = jiffy:decode(ManifestInfo, [return_maps]), + case maps:find(<<"exec">>, Manifest) of + error -> + {error, <<"manifest `exec` not found">>}; + {ok, ExecCmd0} -> + PortSettings = [ + {cd, WorkDir}, + exit_status + ], + PortSettings1 = case maps:find(<<"args">>, Manifest) of + error -> + PortSettings; + {ok, Args} -> + [{args, [binary_to_list(A) || A <- Args]}|PortSettings] + end, + + ExecCmd = binary_to_list(ExecCmd0), + RealExecCmd = filename:absname_join(WorkDir, ExecCmd), + lager:debug("[efka_micro_service] service: ~p, real_exec: ~p", [ServiceId, RealExecCmd]), + + Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings1), + {ok, Port} + + end; + {error, Reason} -> + lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]), + {error, <<"manifest.json not found">>} + end. \ No newline at end of file diff --git a/apps/efka/src/efka_micro_service_sup.erl b/apps/efka/src/efka_micro_service_sup.erl index 361e53e..87fb5c9 100644 --- a/apps/efka/src/efka_micro_service_sup.erl +++ b/apps/efka/src/efka_micro_service_sup.erl @@ -47,7 +47,7 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - MicroServices = micro_service_model:get_running_services(), + MicroServices = micro_service_model:get_all_services(), Specs = lists:map(fun(Service) -> child_spec(Service) end, MicroServices), Spec1 = child_spec(#micro_service{ diff --git a/apps/efka/src/mnesia/micro_service_model.erl b/apps/efka/src/mnesia/micro_service_model.erl index f44cdbe..1afc194 100644 --- a/apps/efka/src/mnesia/micro_service_model.erl +++ b/apps/efka/src/mnesia/micro_service_model.erl @@ -15,7 +15,7 @@ %% API -export([create_table/0]). --export([insert/1, start_service/1, stop_service/1, get_running_services/0]). +-export([insert/1, start_service/1, stop_service/1, get_all_services/0]). create_table() -> %% id生成器 @@ -66,10 +66,10 @@ start_service(ServiceId) when is_binary(ServiceId) -> {error, Reason} end. --spec get_running_services() -> [#micro_service{}]. -get_running_services() -> +-spec get_all_services() -> [#micro_service{}]. +get_all_services() -> Fun = fun() -> - Q = qlc:q([E || E <- mnesia:table(?TAB), E#micro_service.status =:= 1]), + Q = qlc:q([E || E <- mnesia:table(?TAB)]), qlc:e(Q) end,