diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index 0a2a34b..36c8c09 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -13,6 +13,10 @@ -behaviour(gen_server). +%% 当前微服务的状态 +-define(STATUS_STOPPED, stopped). +-define(STATUS_RUNNING, running). + %% API -export([start_link/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). @@ -25,7 +29,9 @@ %% 通道id信息 channel_pid :: pid() | undefined, %% 当前进程的port信息, OSPid = erlang:port_info(Port, os_pid) - port :: undefined | port() + port :: undefined | port(), + %% 当前服务的运行状态 + running_status = ?STATUS_STOPPED }). %%%=================================================================== @@ -44,9 +50,11 @@ get_pid(ServiceId) when is_binary(ServiceId) -> attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> gen_server:call(Pid, {attach_channel, ChannelPid}). +-spec start_service(Pid :: pid()) -> ok | {error, Reason :: binary()}. start_service(Pid) when is_pid(Pid) -> gen_server:call(Pid, start_service). +-spec stop_service(Pid :: pid()) -> ok | {error, Reason :: binary()}. stop_service(Pid) when is_pid(Pid) -> gen_server:call(Pid, stop_service). @@ -65,9 +73,11 @@ start_link(Name, Service = #micro_service{}) when is_atom(Name) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([Service]) -> - erlang:start_timer(0, self(), boot_service), - {ok, #state{service = Service}}. +init([Service = #micro_service{status = Status}]) -> + %% 数据的状态和运行状态是2回事 + Status == 1 andalso erlang:start_timer(0, self(), boot_service), + + {ok, #state{service = Service, running_status = ?STATUS_STOPPED}}. %% @private %% @doc Handling call messages @@ -91,19 +101,32 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol {reply, {error, <<"serivce stopped">>}, State} end; -%% 启动服务 -handle_call(start_service, _From, State = #state{service = Service = #micro_service{service_id = ServiceId}}) -> +%% 启动服务: 当前服务如果正常运行,则重启 +handle_call(start_service, _From, State = #state{running_status = RunningStatus, port = Port, service = Service = #micro_service{service_id = ServiceId}}) -> + %% 运行中的服务,先停止 + RunningStatus == ?STATUS_RUNNING andalso stop_port(Port), + %% 更新数据库状态 ok = micro_service_model:start_service(ServiceId), %% 异步启动服务 - erlang:start_timer(0, self(), boot_service), - {reply, ok, State#state{service = Service#micro_service{status = 1}}}; + case boot_service(Service) of + {ok, Port} -> + {reply, ok, State#state{running_status = ?STATUS_RUNNING, port = Port, service = Service#micro_service{status = 1}}}; + {error, Reason} -> + %% 启动失败不能更新数据库里面的状态 + {reply, {error, Reason}, State#state{running_status = ?STATUS_STOPPED}} + end; %% 停止服务 -handle_call(stop_service, _From, State = #state{port = Port, service = Service = #micro_service{service_id = ServiceId}}) -> - ok = micro_service_model:stop_service(ServiceId), - stop_port(Port), - {reply, ok, State#state{port = undefined, service = Service#micro_service{status = 0}}}; +handle_call(stop_service, _From, State = #state{running_status = RunningStatus, port = Port, service = Service = #micro_service{service_id = ServiceId}}) -> + case RunningStatus of + ?STATUS_STOPPED -> + {reply, {error, <<"service not running">>}, State}; + ?STATUS_RUNNING -> + ok = micro_service_model:stop_service(ServiceId), + stop_port(Port), + {reply, ok, State#state{port = undefined, service = Service#micro_service{status = 0}}} + end; handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. @@ -129,20 +152,25 @@ handle_info({timeout, _, boot_service}, State = #state{service = #micro_service{ handle_info({timeout, _, boot_service}, State = #state{service = Service = #micro_service{service_id = ServiceId, status = 1}}) -> case boot_service(Service) of {ok, Port} -> - {noreply, State#state{port = Port}}; + {noreply, State#state{running_status = ?STATUS_RUNNING, port = Port}}; {error, Reason} -> lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), - {noreply, State} + {noreply, State#state{running_status = ?STATUS_STOPPED}} end; handle_info({Port, {data, Data}}, State = #state{port = Port, service = Service}) -> lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [Service#micro_service.service_id, Data]), {noreply, State}; -%% 处理port的消息 +%% 处理port的消息, Port的被动关闭会触发;因此这个时候的Port和State.port的值是相等的 handle_info({Port, {exit_status, Code}}, State = #state{port = Port, service = Service}) -> lager:debug("[efka_micro_service] service_id: ~p, port exit with code: ~p", [Service#micro_service.service_id, Code]), erlang:start_timer(5000, self(), boot_service), + {noreply, State#state{port = undefined}}; + +%% 主动关闭的port,不能启动重启的逻辑 +handle_info({Port, {exit_status, Code}}, State = #state{service = Service}) when is_port(Port) -> + lager:debug("[efka_micro_service] service_id: ~p, port close with code: ~p", [Service#micro_service.service_id, Code]), {noreply, State#state{port = undefined}}. %% @private