ekfa/apps/efka/src/efka_service.erl
2025-09-17 11:41:02 +08:00

150 lines
6.5 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%% 1. 需要管理服务的整个生命周期,包括: 启动,停止
%%% 2. 需要监控服务的状态通过port的方式
%%% 3. 服务的启动和关闭,需要在更高的层级控制
%%% @end
%%% Created : 18. 4月 2025 16:50
%%%-------------------------------------------------------------------
-module(efka_service).
-author("anlicheng").
-include("efka_tables.hrl").
-behaviour(gen_server).
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, attach_channel/2]).
-export([metric_data/4, send_event/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {
service_id :: binary(),
%% 通道id信息
channel_pid :: pid() | undefined
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_name(ServiceId :: binary()) -> atom().
get_name(ServiceId) when is_binary(ServiceId) ->
list_to_atom("efka_service:" ++ binary_to_list(ServiceId)).
-spec get_pid(ServiceId :: binary()) -> undefined | pid().
get_pid(ServiceId) when is_binary(ServiceId) ->
whereis(get_name(ServiceId)).
-spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> no_return().
metric_data(Pid, DeviceUUID, RouteKey, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) ->
gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Metric}).
-spec send_event(Pid :: pid(), EventType :: integer(), Params :: binary()) -> no_return().
send_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) ->
gen_server:cast(Pid, {send_event, EventType, Params}).
-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}.
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
gen_server:call(Pid, {attach_channel, ChannelPid}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Name :: atom(), Service :: binary()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) ->
gen_server:start_link({local, Name}, ?MODULE, [ServiceId], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ServiceId]) ->
%% supervisor进程通过exit(ChildPid, shutdown)调用的时候确保terminate函数被调用
lager:debug("[efka_service] service_id: ~p, started", [ServiceId]),
{ok, #state{service_id = ServiceId}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
%% 绑定channel
handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service_id = ServiceId}) ->
case is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid) of
false ->
erlang:monitor(process, ChannelPid),
lager:debug("[efka_service] service_id: ~p, channel attched", [ServiceId]),
{reply, ok, State#state{channel_pid = ChannelPid}};
true ->
{reply, {error, <<"channel exists">>}, State}
end;
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{service_id = ServiceId}) ->
lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p", [ServiceId, DeviceUUID, RouteKey, Metric]),
efka_remote_agent:metric_data(ServiceId, DeviceUUID, RouteKey, Metric),
{noreply, State};
handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
efka_remote_agent:event(ServiceId, EventType, Params),
lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]),
{noreply, State};
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%% 处理channel进程的退出
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) ->
lager:debug("[efka_service] service_id: ~p, channel exited: ~p", [ServiceId, Reason]),
{noreply, State#state{channel_pid = undefined}}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, _State = #state{service_id = ServiceId}) ->
lager:debug("[efka_service] service_id: ~p, terminate with reason: ~p", [ServiceId, Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================