ekfa/apps/efka/src/efka_service.erl
2025-05-08 11:21:10 +08:00

320 lines
14 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%% 1. 需要管理服务的整个生命周期,包括: 启动,停止
%%% 2. 需要监控服务的状态通过port的方式
%%% @end
%%% Created : 18. 4月 2025 16:50
%%%-------------------------------------------------------------------
-module(efka_service).
-author("anlicheng").
-include("efka_tables.hrl").
-behaviour(gen_server).
%% 当前微服务的状态
-define(STATUS_STOPPED, stopped).
-define(STATUS_RUNNING, running).
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]).
-export([push_config/3, request_config/1]).
-export([metric_data/3, send_event/3, send_ai_event/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([test/0, test1/0]).
%% Server state for one micro-service instance managed by this gen_server.
-record(state, {
service_id :: binary(),
%% Pid of the attached channel process (set by attach_channel), if any
channel_pid :: pid() | undefined,
%% Port of the running external program; its OS pid is OSPid = erlang:port_info(Port, os_pid)
port :: undefined | port(),
%% OS-level pid of the external program behind the port
os_pid :: undefined | integer(),
%% Service manifest (configuration) loaded via efka_manifest:new/1
manifest :: undefined | efka_manifest:manifest(),
%% Pending push_config requests: Ref => ReceiverPid waiting for {ems_reply, Ref, Reply}
inflight = #{},
%% Current runtime status of the service: ?STATUS_STOPPED or ?STATUS_RUNNING
running_status = ?STATUS_STOPPED
}).
%%%===================================================================
%%% API
%%%===================================================================
%% Developer helper: stops the locally registered service <<"test1234">>.
%% Crashes with function_clause when that service is not registered
%% (get_pid/1 then returns undefined).
test() ->
    stop_service(get_pid(<<"test1234">>)).
%% Developer helper: starts the locally registered service <<"test1234">>.
%% Crashes with function_clause when that service is not registered.
test1() ->
    start_service(get_pid(<<"test1234">>)).
%% @doc Builds the locally registered name of a service process, e.g.
%% <<"abc">> -> 'efka_service:abc'. Each byte of ServiceId becomes one
%% (latin-1) character of the atom.
%% NOTE: every distinct ServiceId creates a new atom and atoms are never
%% garbage collected, so ServiceId should come from a bounded, trusted set.
-spec get_name(ServiceId :: binary()) -> atom().
get_name(ServiceId) when is_binary(ServiceId) ->
    binary_to_atom(<<"efka_service:", ServiceId/binary>>, latin1).
%% @doc Looks up the pid registered under get_name(ServiceId);
%% returns undefined when no such process is registered.
-spec get_pid(ServiceId :: binary()) -> undefined | pid().
get_pid(ServiceId) when is_binary(ServiceId) ->
    RegName = get_name(ServiceId),
    erlang:whereis(RegName).
%% @doc Asks the service to push ConfigJson through its attached channel.
%% The call returns immediately. The outcome is delivered later to the calling
%% process as {ems_reply, Ref, Reply}.
-spec push_config(pid(), term(), binary()) -> ok.
push_config(Pid, Ref, ConfigJson) when is_pid(Pid), is_binary(ConfigJson) ->
    Caller = self(),
    gen_server:cast(Pid, {push_config, Ref, Caller, ConfigJson}).
%% @doc Synchronously fetches the service's stored parameters;
%% a running service replies {ok, Params}.
request_config(Pid) when is_pid(Pid) ->
    Request = request_config,
    gen_server:call(Pid, Request).
%% @doc Asynchronously hands a metric payload for DeviceUUID to the service.
%% The service forwards it to efka_agent:metric_data/3, tagged with its service id.
-spec metric_data(pid(), binary(), binary()) -> ok.
metric_data(Pid, DeviceUUID, Data) when is_pid(Pid), is_binary(DeviceUUID), is_binary(Data) ->
    Msg = {metric_data, DeviceUUID, Data},
    gen_server:cast(Pid, Msg).
%% @doc Asynchronously hands an event (integer type + binary params) to the
%% service. The service forwards it to efka_agent:event/3.
-spec send_event(pid(), integer(), binary()) -> ok.
send_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) ->
    Msg = {send_event, EventType, Params},
    gen_server:cast(Pid, Msg).
%% @doc Asynchronously hands an AI event (integer type + binary params) to the
%% service. The service forwards it to efka_agent:ai_event/3.
-spec send_ai_event(pid(), integer(), binary()) -> ok.
send_ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) ->
    Msg = {send_ai_event, EventType, Params},
    gen_server:cast(Pid, Msg).
%% @doc Binds ChannelPid as the service's channel.
%% Returns ok, or {error, Reason} when a live channel is already attached or the
%% service is stopped.
-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}.
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
    Request = {attach_channel, ChannelPid},
    gen_server:call(Pid, Request).
%% @doc Synchronously boots the service's external program.
%% Returns ok, {error, <<"service is running">>} when it already runs, or the
%% error reported by the boot attempt.
-spec start_service(Pid :: pid()) -> ok | {error, Reason :: binary()}.
start_service(Pid) when is_pid(Pid) ->
    Request = start_service,
    gen_server:call(Pid, Request).
%% @doc Synchronously stops the service's external program.
%% Returns ok, or {error, <<"service not running">>} when it is not running.
-spec stop_service(Pid :: pid()) -> ok | {error, Reason :: binary()}.
stop_service(Pid) when is_pid(Pid) ->
    Request = stop_service,
    gen_server:call(Pid, Request).
%% @doc Spawns the service server linked to the caller and registers it
%% locally under Name. Returns ignore when init/1 cannot load the service
%% record or its manifest.
-spec(start_link(Name :: atom(), ServiceId :: binary()) ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) ->
    InitArgs = [ServiceId],
    gen_server:start_link({local, Name}, ?MODULE, InitArgs, []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server for ServiceId.
%% Loads the service record via service_model:get_service/1 and its manifest
%% via efka_manifest:new/1 from the record's root_dir. Returns ignore (so
%% start_link/2 returns ignore) when either step fails.
-spec(init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore).
init([ServiceId]) ->
    case service_model:get_service(ServiceId) of
        {ok, #micro_service{root_dir = RootDir} = Service} ->
            init_from_root_dir(ServiceId, RootDir, Service);
        error ->
            lager:notice("[efka_micro_service] service_id: ~p, not found", [ServiceId]),
            ignore
    end.

%% Reads the manifest found under RootDir and continues with init0/2.
init_from_root_dir(ServiceId, RootDir, Service) ->
    case efka_manifest:new(RootDir) of
        {error, Reason} ->
            lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]),
            ignore;
        {ok, Manifest} ->
            init0(Service, Manifest)
    end.
%% Finishes initialisation once the manifest is loaded.
%% The persisted `status` field (set to 1 by start_service, 0 by stop_service)
%% is separate from the runtime `running_status`. Status 1 triggers a boot
%% attempt, but a failed boot still yields a started server in the stopped
%% state. Status 0 starts the server without booting the program.
init0(#micro_service{service_id = ServiceId, status = 1}, Manifest) ->
    case efka_manifest:startup(Manifest) of
        {ok, Port} ->
            {os_pid, OSPid} = erlang:port_info(Port, os_pid),
            lager:debug("[efka_micro_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]),
            {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
        {error, Reason} ->
            lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
            {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}
    end;
init0(#micro_service{service_id = ServiceId, status = 0}, Manifest) ->
    %% The format string contains a ~p placeholder, so ServiceId must be supplied
    %% as its argument; without it lager reports a format error instead of the message.
    lager:debug("[efka_micro_service] service: ~p current status is 0, not boot", [ServiceId]),
    {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}.
%% @private
%% @doc Handles synchronous requests.
%%   {attach_channel, ChannelPid}: monitors and stores ChannelPid.
%%     This only happens when the persisted service status is 1 and no live
%%     channel is attached yet.
%%   request_config: replies {ok, Params} (from service_model:get_params/1)
%%     while running, and {error, <<"service not running">>} while stopped.
%%   start_service: boots the external program via efka_manifest:startup/1 and
%%     persists status 1. It is refused while already running.
%%   stop_service: kills the external OS process, closes the port and persists
%%     status 0. It is refused while not running.
%% Any other request is answered with ok.
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Attach a channel
handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service_id = ServiceId}) ->
    Status = service_model:get_status(ServiceId),
    case {Status, is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid)} of
        {1, false} ->
            %% The resulting 'DOWN' message is handled in handle_info/2.
            erlang:monitor(process, ChannelPid),
            {reply, ok, State#state{channel_pid = ChannelPid}};
        {1, true} ->
            {reply, {error, <<"channel exists">>}, State};
        {0, _} ->
            {reply, {error, <<"serivce stopped">>}, State}
    end;
%% Request the service's configuration parameters
handle_call(request_config, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) ->
    Params = service_model:get_params(ServiceId),
    {reply, {ok, Params}, State};
handle_call(request_config, _From, State = #state{running_status = ?STATUS_STOPPED}) ->
    %% Without this clause the request fell through to the catch-all and the
    %% caller got a bare `ok` instead of {ok, Params} | {error, Reason}.
    {reply, {error, <<"service not running">>}, State};
%% Start the service: a service that is already running is not restarted
handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) ->
    {reply, {error, <<"service is running">>}, State};
handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, manifest = Manifest, service_id = ServiceId}) ->
    case efka_manifest:startup(Manifest) of
        {ok, Port} ->
            {os_pid, OSPid} = erlang:port_info(Port, os_pid),
            lager:debug("start_service port: ~p, os_pid: ~p", [Port, OSPid]),
            %% Persist the "should run" status
            ok = service_model:change_status(ServiceId, 1),
            {reply, ok, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
        {error, Reason} ->
            %% A failed start must not change the persisted status
            {reply, {error, Reason}, State}
    end;
%% Stop the service; an explicit stop also updates the persisted status field
handle_call(stop_service, _From, State = #state{running_status = ?STATUS_STOPPED, port = Port, os_pid = OSPid}) ->
    lager:debug("stop service port: ~p, os_pid: ~p", [Port, OSPid]),
    {reply, {error, <<"service not running">>}, State};
handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service_id = ServiceId}) when is_port(Port) ->
    %% NOTE(review): the intent is to prefer a stop command provided by the
    %% micro-service and fall back to kill; currently only kill -9 is used.
    kill_os_pid(OSPid),
    %% port_close/1 raises badarg when the port is already closed (e.g. the
    %% program exited and its exit_status message is still queued); that must
    %% not crash the server.
    try erlang:port_close(Port) catch error:badarg -> true end,
    lager:debug("port: ~p, os_pid: ~p, will closed", [Port, OSPid]),
    ok = service_model:change_status(ServiceId, 0),
    {reply, ok, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}};
handle_call(_Request, _From, State = #state{}) ->
    {reply, ok, State}.
%% @private
%% @doc Handles asynchronous requests.
%%   metric_data / send_event / send_ai_event: forwarded to efka_agent, tagged
%%     with this service's id.
%%   {push_config, Ref, ReceiverPid, ConfigJson}: relayed to the attached channel.
%%     ReceiverPid later receives {ems_reply, Ref, Reply}. When the config cannot
%%     be relayed (no live channel, or the service is not running), ReceiverPid
%%     is answered immediately with {ems_reply, Ref, {error, Reason}}.
%% Anything else is ignored.
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_id = ServiceId}) ->
    efka_agent:metric_data(ServiceId, DeviceUUID, LineProtocolData),
    {noreply, State};
handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
    efka_agent:event(ServiceId, EventType, Params),
    {noreply, State};
handle_cast({send_ai_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
    efka_agent:ai_event(ServiceId, EventType, Params),
    {noreply, State};
%% Push configuration to the service through its channel
handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) ->
    case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
        true ->
            efka_tcp_channel:push_config(ChannelPid, Ref, self(), ConfigJson),
            %% Remember who is waiting; the reply arrives as {channel_reply, Ref, Reply}.
            {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
        false ->
            ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}},
            %% handle_cast must return {noreply, _}; {reply, _} crashed the server
            %% with bad_return_value.
            {noreply, State}
    end;
handle_cast({push_config, Ref, ReceiverPid, _ConfigJson}, State = #state{running_status = ?STATUS_STOPPED}) ->
    %% Answer right away instead of silently dropping the request, which left
    %% the receiver waiting for an ems_reply that never came.
    ReceiverPid ! {ems_reply, Ref, {error, <<"service not running">>}},
    {noreply, State};
handle_cast(_Request, State = #state{}) ->
    {noreply, State}.
%% @private
%% @doc Handles all non call/cast messages.
%%   {timeout, _, reboot_service}: reboots the program if the persisted status is 1.
%%     The only visible start_timer call for it is commented out.
%%   {channel_reply, Ref, Reply}: forwarded to the process waiting on Ref.
%%   {Port, {data, _}} / {Port, {exit_status, _}}: output / exit of the current port.
%%   {'DOWN', ...} for the attached channel: detaches it and fails pending requests.
%% Any other message is logged and dropped instead of crashing the server with
%% function_clause. Examples are output or exit_status from a port that
%% stop_service already closed, or a 'DOWN' from a previously attached channel.
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Reboot the service
handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, manifest = Manifest}) ->
    case service_model:get_status(ServiceId) of
        0 ->
            lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]),
            {noreply, State};
        1 ->
            case efka_manifest:startup(Manifest) of
                {ok, Port} ->
                    {os_pid, OSPid} = erlang:port_info(Port, os_pid),
                    lager:debug("[efka_micro_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]),
                    {noreply, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
                {error, Reason} ->
                    lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
                    {noreply, State#state{running_status = ?STATUS_STOPPED}}
            end
    end;
%% Replies from the channel to earlier push_config requests
handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight}) ->
    case maps:take(Ref, Inflight) of
        error ->
            {noreply, State};
        {ReceiverPid, NInflight} ->
            ReceiverPid ! {ems_reply, Ref, Reply},
            {noreply, State#state{inflight = NInflight}}
    end;
handle_info({Port, {data, Data}}, State = #state{port = Port, service_id = ServiceId}) ->
    lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [ServiceId, Data]),
    {noreply, State};
%% The external program exited on its own. Only the current port is accepted, so a
%% late exit_status from a port already closed by stop_service cannot reset the
%% state of a service that has since been started again.
handle_info({Port, {exit_status, Code}}, State = #state{port = Port, service_id = ServiceId}) ->
    lager:debug("[efka_micro_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]),
    %% Automatic reboot is currently disabled:
    % erlang:start_timer(5000, self(), reboot_service),
    {noreply, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}};
%% The attached channel process exited
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId, inflight = Inflight}) ->
    lager:debug("[efka_micro_service] service: ~p, channel exited: ~p", [ServiceId, Reason]),
    %% No channel_reply can arrive any more; fail pending requests now rather than
    %% leaving their receivers waiting.
    lists:foreach(fun({Ref, ReceiverPid}) ->
        ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}}
    end, maps:to_list(Inflight)),
    {noreply, State#state{channel_pid = undefined, inflight = #{}}};
handle_info(Info, State = #state{service_id = ServiceId}) ->
    lager:debug("[efka_micro_service] service_id: ~p, ignore unexpected message: ~p", [ServiceId, Info]),
    {noreply, State}.
%% @private
%% @doc Called by gen_server when it is about to terminate.
%% Kills the external OS process recorded in the state so it does not outlive
%% this server. kill_os_pid/1 is a no-op when os_pid is undefined, which is why
%% a single clause covers both the running and the stopped state.
%% The return value is ignored by gen_server.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(Reason, #state{os_pid = OSPid}) ->
    lager:debug("[efka_micro_service] terminate with reason: ~p", [Reason]),
    kill_os_pid(OSPid),
    ok.
%% @private
%% @doc Hot code upgrade hook. No state migration is performed; the current
%% #state{} is kept as-is.
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, #state{} = CurrentState, _Extra) ->
    {ok, CurrentState}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Forcefully terminates an OS process with `kill -9`.
%%
%% OSPid is the OS pid obtained from erlang:port_info(Port, os_pid), or
%% undefined when no external program is running (then this is a no-op).
%% Always returns ok; the output of the shell command is discarded.
-spec kill_os_pid(OSPid :: integer() | undefined) -> ok.
kill_os_pid(undefined) ->
    ok;
kill_os_pid(OSPid) when is_integer(OSPid) ->
    %% OSPid is guarded to be an integer, so the command string cannot carry
    %% shell metacharacters.
    Cmd = "kill -9 " ++ integer_to_list(OSPid),
    lager:debug("kill cmd is: ~p", [Cmd]),
    _ = os:cmd(Cmd),
    ok.