diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index 36c8c09..ea91e4d 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -24,6 +24,8 @@ %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([test/0]). + -record(state, { service :: #micro_service{}, %% 通道id信息 @@ -38,6 +40,10 @@ %%% API %%%=================================================================== +test() -> + Pid = get_pid(<<"test1234">>), + start_service(Pid). + -spec get_name(ServiceId :: binary()) -> atom(). get_name(ServiceId) when is_binary(ServiceId) -> list_to_atom("efka_service:" ++ binary_to_list(ServiceId)). @@ -73,11 +79,21 @@ start_link(Name, Service = #micro_service{}) when is_atom(Name) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([Service = #micro_service{status = Status}]) -> +init([Service = #micro_service{service_id = ServiceId, status = Status}]) -> %% 数据的状态和运行状态是2回事 - Status == 1 andalso erlang:start_timer(0, self(), boot_service), - - {ok, #state{service = Service, running_status = ?STATUS_STOPPED}}. + case Status == 1 of + true -> + case boot_service(Service) of + {ok, Port} -> + {ok, #state{service = Service, running_status = ?STATUS_RUNNING, port = Port}}; + {error, Reason} -> + lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), + {ok, #state{service = Service, running_status = ?STATUS_STOPPED}} + end; + false -> + lager:debug("[efka_micro_service] service: ~p current status is 0, not boot"), + {ok, #state{service = Service, running_status = ?STATUS_STOPPED}} + end. %% @private %% @doc Handling call messages @@ -101,13 +117,12 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol {reply, {error, <<"serivce stopped">>}, State} end; -%% 启动服务: 当前服务如果正常运行,则重启 -handle_call(start_service, _From, State = #state{running_status = RunningStatus, port = Port, service = Service = #micro_service{service_id = ServiceId}}) -> - %% 运行中的服务,先停止 - RunningStatus == ?STATUS_RUNNING andalso stop_port(Port), - +%% 启动服务: 当前服务如果正常运行,则不允许重启 +handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> + {reply, {error, <<"service is running">>}, State}; +handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, service = Service = #micro_service{service_id = ServiceId}}) -> %% 更新数据库状态 - ok = micro_service_model:start_service(ServiceId), + micro_service_model:start_service(ServiceId), %% 异步启动服务 case boot_service(Service) of {ok, Port} -> @@ -117,14 +132,14 @@ handle_call(start_service, _From, State = #state{running_status = RunningStatus, {reply, {error, Reason}, State#state{running_status = ?STATUS_STOPPED}} end; -%% 停止服务 +%% 停止服务, 主动停止的时候会改变服务配置的status字段 handle_call(stop_service, _From, State = #state{running_status = RunningStatus, port = Port, service = Service = #micro_service{service_id = ServiceId}}) -> case RunningStatus of ?STATUS_STOPPED -> {reply, {error, <<"service not running">>}, State}; ?STATUS_RUNNING -> ok = micro_service_model:stop_service(ServiceId), - stop_port(Port), + kill_os_pid(Port), {reply, ok, State#state{port = undefined, service = Service#micro_service{status = 0}}} end; @@ -146,10 +161,11 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, boot_service}, State = #state{service = #micro_service{service_id = ServiceId, status = 0}}) -> +%% 重启服务 +handle_info({timeout, _, reboot_service}, State = #state{service = #micro_service{service_id = ServiceId, status = 0}}) -> lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]), {noreply, State}; -handle_info({timeout, _, boot_service}, State = #state{service = Service = #micro_service{service_id = ServiceId, status = 1}}) -> +handle_info({timeout, _, reboot_service}, State = #state{service = Service = #micro_service{service_id = ServiceId, status = 1}}) -> case boot_service(Service) of {ok, Port} -> {noreply, State#state{running_status = ?STATUS_RUNNING, port = Port}}; @@ -164,7 +180,7 @@ handle_info({Port, {data, Data}}, State = #state{port = Port, service = Service} %% 处理port的消息, Port的被动关闭会触发;因此这个时候的Port和State.port的值是相等的 handle_info({Port, {exit_status, Code}}, State = #state{port = Port, service = Service}) -> lager:debug("[efka_micro_service] service_id: ~p, port exit with code: ~p", [Service#micro_service.service_id, Code]), - erlang:start_timer(5000, self(), boot_service), + erlang:start_timer(5000, self(), reboot_service), {noreply, State#state{port = undefined}}; @@ -182,7 +198,7 @@ handle_info({Port, {exit_status, Code}}, State = #state{service = Service}) when State :: #state{}) -> term()). terminate(Reason, _State = #state{port = Port}) -> lager:debug("[efka_micro_service] terminate with reason: ~p", [Reason]), - stop_port(Port), + kill_os_pid(Port), ok. %% @private @@ -197,15 +213,18 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== --spec stop_port(port() | undefined) -> no_return(). -stop_port(undefined) -> +%% 关闭系统进程 +-spec kill_os_pid(port() | undefined) -> no_return(). +kill_os_pid(undefined) -> ok; -stop_port(Port) when is_port(Port) -> +kill_os_pid(Port) when is_port(Port) -> case erlang:port_info(Port, os_pid) of undefined -> ok; - {os_pid, OsPid} -> - os:cmd(io_lib:format("kill -9 ~p", [OsPid])) + {os_pid, OsPid} when is_integer(OsPid) -> + Cmd = lists:flatten(io_lib:format("kill -9 ~p", [OsPid])), + lager:debug("kill cmd is: ~p", [Cmd]), + os:cmd(Cmd) end. %% 启动微服务