From bc8517dc04c67c43a4c8fcbeba172b0eb432e742 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 30 Apr 2025 16:12:35 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_micro_service.erl | 73 +++++++++++++++------------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index f2da91e..888e61a 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -24,14 +24,18 @@ %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([test/0]). +-export([test/0, test1/0]). -record(state, { service :: #micro_service{}, %% 通道id信息 channel_pid :: pid() | undefined, + %% 当前进程的port信息, OSPid = erlang:port_info(Port, os_pid) port :: undefined | port(), + %% 系统对应的pid + os_pid :: undefined | integer(), + %% 当前服务的运行状态 running_status = ?STATUS_STOPPED }). @@ -42,7 +46,10 @@ test() -> Pid = get_pid(<<"test1234">>), - stop_service(Pid), + stop_service(Pid). + +test1() -> + Pid = get_pid(<<"test1234">>), start_service(Pid). -spec get_name(ServiceId :: binary()) -> atom(). @@ -86,14 +93,16 @@ init([Service = #micro_service{service_id = ServiceId, status = Status}]) -> true -> case boot_service(Service) of {ok, Port} -> - {ok, #state{service = Service, running_status = ?STATUS_RUNNING, port = Port}}; + {os_pid, OSPid} = port_info(Port, os_pid), + lager:debug("[efka_micro_service] service: ~p, boot_service success os_pid: ~p", [ServiceId, OSPid]), + {ok, #state{service = Service, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; {error, Reason} -> lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), - {ok, #state{service = Service, running_status = ?STATUS_STOPPED}} + {ok, #state{service = Service, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} end; false -> lager:debug("[efka_micro_service] service: ~p current status is 0, not boot"), - {ok, #state{service = Service, running_status = ?STATUS_STOPPED}} + {ok, #state{service = Service, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} end. %% @private @@ -122,25 +131,26 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> {reply, {error, <<"service is running">>}, State}; handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, service = Service = #micro_service{service_id = ServiceId}}) -> - %% 更新数据库状态 - micro_service_model:start_service(ServiceId), %% 异步启动服务 case boot_service(Service) of {ok, Port} -> - {reply, ok, State#state{running_status = ?STATUS_RUNNING, port = Port, service = Service#micro_service{status = 1}}}; + %% 更新数据库状态 + micro_service_model:start_service(ServiceId), + {os_pid, OSPid} = port_info(Port, os_pid), + {reply, ok, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service = Service#micro_service{status = 1}}}; {error, Reason} -> %% 启动失败不能更新数据库里面的状态 - {reply, {error, Reason}, State#state{running_status = ?STATUS_STOPPED}} + {reply, {error, Reason}, State} end; %% 停止服务, 主动停止的时候会改变服务配置的status字段 handle_call(stop_service, _From, State = #state{running_status = ?STATUS_STOPPED}) -> {reply, {error, <<"service not running">>}, State}; -handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING, port = Port, service = Service = #micro_service{service_id = ServiceId}}) when is_port(Port) -> +handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service = Service = #micro_service{service_id = ServiceId}}) when is_port(Port) -> micro_service_model:stop_service(ServiceId), - kill_os_pid(Port), - {reply, ok, State#state{port = undefined, running_status = ?STATUS_STOPPED, service = Service#micro_service{status = 0}}}; + kill_os_pid(OSPid), + {reply, ok, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED, service = Service#micro_service{status = 0}}}; handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. @@ -167,26 +177,28 @@ handle_info({timeout, _, reboot_service}, State = #state{service = #micro_servic handle_info({timeout, _, reboot_service}, State = #state{service = Service = #micro_service{service_id = ServiceId, status = 1}}) -> case boot_service(Service) of {ok, Port} -> - {noreply, State#state{running_status = ?STATUS_RUNNING, port = Port}}; + {os_pid, OSPid} = erlang:port_info(Port, os_pid), + lager:debug("[efka_micro_service] reboot service success: ~p, os_pid: ~p", [ServiceId, OSPid]), + {noreply, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; {error, Reason} -> lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), {noreply, State#state{running_status = ?STATUS_STOPPED}} end; -handle_info({Port, {data, Data}}, State = #state{port = Port, service = Service}) -> - lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [Service#micro_service.service_id, Data]), +handle_info({Port, {data, Data}}, State = #state{port = Port, service = #micro_service{service_id = ServiceId}}) -> + lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [ServiceId, Data]), {noreply, State}; + %% 处理port的消息, Port的被动关闭会触发;因此这个时候的Port和State.port的值是相等的 -handle_info({Port, {exit_status, Code}}, State = #state{port = Port, service = Service}) -> - lager:debug("[efka_micro_service] service_id: ~p, port exit with code: ~p", [Service#micro_service.service_id, Code]), - % erlang:start_timer(5000, self(), reboot_service), +handle_info({Port, {exit_status, Code}}, State = #state{port = Port, service = #micro_service{service_id = ServiceId}}) -> + lager:debug("[efka_micro_service] service_id: ~p, port exit with code: ~p", [ServiceId, Code]), + erlang:start_timer(5000, self(), reboot_service), + {noreply, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}}; - {noreply, State#state{port = undefined}}; - -%% 主动关闭的port,不能启动重启的逻辑 -handle_info({Port, {exit_status, Code}}, State = #state{service = Service}) when is_port(Port) -> - lager:debug("[efka_micro_service] service_id: ~p, port close with code: ~p", [Service#micro_service.service_id, Code]), - {noreply, State#state{port = undefined}}. +%% 处理channel进程的退出 +handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service = #micro_service{service_id = ServiceId}}) -> + lager:debug("[efka_micro_service] service: ~p, channel exited: ~p", [ServiceId, Reason]), + {noreply, State#state{channel_pid = undefined}}. %% @private %% @doc This function is called by a gen_server when it is about to @@ -216,15 +228,10 @@ code_change(_OldVsn, State = #state{}, _Extra) -> -spec kill_os_pid(port() | undefined) -> no_return(). kill_os_pid(undefined) -> ok; -kill_os_pid(Port) when is_port(Port) -> - case erlang:port_info(Port, os_pid) of - undefined -> - ok; - {os_pid, OsPid} when is_integer(OsPid) -> - Cmd = lists:flatten(io_lib:format("kill -9 ~p", [OsPid])), - lager:debug("kill cmd is: ~p", [Cmd]), - os:cmd(Cmd) - end. +kill_os_pid(OSPid) when is_integer(OSPid) -> + Cmd = lists:flatten(io_lib:format("kill -9 ~p", [OSPid])), + lager:debug("kill cmd is: ~p", [Cmd]), + os:cmd(Cmd). %% 启动微服务 -spec boot_service(MicroService :: #micro_service{}) -> {ok, Port :: port()} | {error, Reason :: binary()}.