From 124db78aa72d747817c406704dc950bd7fbf0819 Mon Sep 17 00:00:00 2001
From: anlicheng <244108715@qq.com>
Date: Wed, 13 Aug 2025 15:52:38 +0800
Subject: [PATCH] fix efka

---
 apps/efka/src/efka_std_modbus_service.erl | 291 ++++++++++++++++++++++
 message_pb.proto                          |   6 +
 2 files changed, 297 insertions(+)
 create mode 100644 apps/efka/src/efka_std_modbus_service.erl

diff --git a/apps/efka/src/efka_std_modbus_service.erl b/apps/efka/src/efka_std_modbus_service.erl
new file mode 100644
index 0000000..15fdb23
--- /dev/null
+++ b/apps/efka/src/efka_std_modbus_service.erl
@@ -0,0 +1,291 @@
+%%%-------------------------------------------------------------------
+%%% @author anlicheng
+%%% @copyright (C) 2025,
+%%% @doc
+%%% 1. Must manage the full lifecycle of the service, including start and stop
+%%% 2. Must monitor the state of the service, by means of a port
+%%% 3. Starting and shutting down the service must be controlled at a higher level
+%%% @end
+%%% Created : 18. Apr 2025 16:50
+%%%-------------------------------------------------------------------
+-module(efka_std_modbus_service).
+-author("anlicheng").
+-include("efka_tables.hrl").
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/2]).
+-export([get_name/1, get_pid/1, attach_channel/2]).
+-export([push_config/3, request_config/1, invoke/3]).
+-export([metric_data/3, send_event/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+%% Server state of one service process.
+-record(state, {
+    service_id :: binary(),
+    %% Channel information: pid of the attached channel process, if any
+    channel_pid :: pid() | undefined,
+    %% Port of the current process, OSPid = erlang:port_info(Port, os_pid)
+    port :: undefined | port(),
+    %% The corresponding operating-system pid
+    os_pid :: undefined | integer(),
+    %% Configuration information
+    manifest :: undefined | efka_manifest:manifest(),
+    %% Requests forwarded to the channel and not yet answered: #{Ref => ReceiverPid}
+    inflight = #{},
+
+    %% Mapping: #{Ref => Fun}
+    callbacks = #{}
+}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+-spec get_name(ServiceId :: binary()) -> atom().
+%% Builds the atom 'efka_service:<ServiceId>' for the given service id.
+%% NOTE(review): list_to_atom/1 creates a new atom for every distinct ServiceId and
+%% atoms are never garbage collected; ServiceId must come from a bounded, trusted set.
+get_name(ServiceId) when is_binary(ServiceId) ->
+    list_to_atom("efka_service:" ++ binary_to_list(ServiceId)).
+
+%% @doc Returns the pid registered under get_name(ServiceId), or undefined if none.
+-spec get_pid(ServiceId :: binary()) -> undefined | pid().
+get_pid(ServiceId) when is_binary(ServiceId) ->
+    whereis(get_name(ServiceId)).
+
+%% @doc Asynchronously asks the service process to push ConfigJson; the calling
+%% process is passed along as the receiver of the later reply for Ref.
+%% gen_server:cast/2 always returns ok, so the spec says ok rather than no_return().
+-spec push_config(Pid :: pid(), Ref :: reference(), ConfigJson :: binary()) -> ok.
+push_config(Pid, Ref, ConfigJson) when is_pid(Pid), is_binary(ConfigJson) ->
+    gen_server:cast(Pid, {push_config, Ref, self(), ConfigJson}).
+
+%% @doc Asynchronously asks the service process to invoke Payload; the calling
+%% process is passed along as the receiver of the later reply for Ref.
+-spec invoke(Pid :: pid(), Ref :: reference(), Payload :: binary()) -> ok.
+invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload) ->
+    gen_server:cast(Pid, {invoke, Ref, self(), Payload}).
+
+%% @doc Synchronously fetches the configuration from the service process.
+-spec request_config(Pid :: pid()) -> {ok, Config :: binary()}.
+request_config(Pid) when is_pid(Pid) ->
+    gen_server:call(Pid, request_config).
+
+%% @doc Asynchronously hands metric data of a device to the service process.
+-spec metric_data(Pid :: pid(), DeviceUUID :: binary(), Data :: binary()) -> ok.
+metric_data(Pid, DeviceUUID, Data) when is_pid(Pid), is_binary(DeviceUUID), is_binary(Data) ->
+    gen_server:cast(Pid, {metric_data, DeviceUUID, Data}).
+
+%% @doc Asynchronously hands an event to the service process.
+-spec send_event(Pid :: pid(), EventType :: integer(), Params :: binary()) -> ok.
+send_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) ->
+    gen_server:cast(Pid, {send_event, EventType, Params}).
+
+%% @doc Synchronously attaches ChannelPid to the service process.
+-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}.
+attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
+    gen_server:call(Pid, {attach_channel, ChannelPid}).
+
+%% @doc Spawns the server and registers the local name (unique)
+-spec(start_link(Name :: atom(), ServiceId :: binary()) ->
+    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) ->
+    gen_server:start_link({local, Name}, ?MODULE, [ServiceId], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%% @private
+%% @doc Initializes the server
+-spec(init(Args :: term()) ->
+    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term()} | ignore).
+init([ServiceId]) ->
+    %% Trap exits so that terminate/2 is called when the supervisor stops this
+    %% process through exit(ChildPid, shutdown)
+    erlang:process_flag(trap_exit, true),
+    case service_model:get_service(ServiceId) of
+        {ok, #service{root_dir = RootDir}} ->
+            load_manifest(ServiceId, RootDir);
+        error ->
+            lager:notice("[efka_service] service_id: ~p, not found", [ServiceId]),
+            ignore
+    end.
+
+%% Reads the manifest found under RootDir and boots the service from it; a
+%% manifest that cannot be read makes init/1 return ignore.
+load_manifest(ServiceId, RootDir) ->
+    case efka_manifest:new(RootDir) of
+        {error, Reason} ->
+            lager:notice("[efka_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]),
+            ignore;
+        {ok, Manifest} ->
+            boot_service(ServiceId, Manifest)
+    end.
+
+%% The first boot is required to succeed; the later restart logic only makes
+%% sense once the first boot has succeeded.
+boot_service(ServiceId, Manifest) ->
+    case efka_manifest:startup(Manifest) of
+        {error, Reason} ->
+            lager:debug("[efka_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
+            {stop, Reason};
+        {ok, Port} ->
+            {os_pid, OSPid} = erlang:port_info(Port, os_pid),
+            lager:debug("[efka_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]),
+            InitialState = #state{service_id = ServiceId, manifest = Manifest, port = Port, os_pid = OSPid},
+            {ok, InitialState}
+    end.
+
+%% @private
+%% @doc Handling call messages
+-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+    State :: #state{}) ->
+    {reply, Reply :: term(), NewState :: #state{}} |
+    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+%% Attach a channel: accepted only while no live channel is attached
+handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = CurrentPid, service_id = ServiceId}) ->
+    Occupied = is_pid(CurrentPid) andalso is_process_alive(CurrentPid),
+    if
+        Occupied ->
+            {reply, {error, <<"channel exists">>}, State};
+        true ->
+            %% Monitor the new channel so that its exit arrives as a 'DOWN' message
+            erlang:monitor(process, ChannelPid),
+            lager:debug("[efka_service] service_id: ~p, channel attched", [ServiceId]),
+            {reply, ok, State#state{channel_pid = ChannelPid}}
+    end;
+
+%% Request the configuration items (done); an empty binary is returned when
+%% service_model has no configuration for this service
+handle_call(request_config, _From, State = #state{service_id = ServiceId}) ->
+    Config =
+        case service_model:get_config_json(ServiceId) of
+            {ok, ConfigJson} -> ConfigJson;
+            error -> <<>>
+        end,
+    {reply, {ok, Config}, State};
+
+%% Any other request is answered with ok
+handle_call(_Request, _From, State = #state{}) ->
+    {reply, ok, State}.
+
+%% @private
+%% @doc Handling cast messages
+-spec(handle_cast(Request :: term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+%% Forward metric data of a device to efka_agent
+handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_id = ServiceId}) ->
+    lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, metric data: ~p", [ServiceId, DeviceUUID, LineProtocolData]),
+    efka_agent:metric_data(ServiceId, DeviceUUID, LineProtocolData),
+    {noreply, State};
+
+%% Forward an event to efka_agent
+handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
+    efka_agent:event(ServiceId, EventType, Params),
+    lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]),
+    {noreply, State};
+
+%% Push configuration items through the attached channel. The receiver is remembered
+%% under Ref so the channel's reply can be relayed to it later; without a live
+%% channel the receiver is answered with an error right away.
+handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{channel_pid = ChannelPid, service_id = ServiceId, inflight = Inflight, callbacks = Callbacks}) ->
+    case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
+        true ->
+            efka_tcp_channel:push_config(ChannelPid, Ref, self(), ConfigJson),
+            %% Once the setting succeeds, the micro-service's configuration must be updated
+            CB = fun() -> service_model:set_config(ServiceId, ConfigJson) end,
+            {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight), callbacks = maps:put(Ref, CB, Callbacks)}};
+        false ->
+            ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}},
+            {noreply, State}
+    end;
+
+%% Invoke the service through the attached channel; same reply relaying as push_config
+handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{channel_pid = ChannelPid, inflight = Inflight}) ->
+    case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
+        true ->
+            efka_tcp_channel:invoke(ChannelPid, Ref, self(), Payload),
+            {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
+        false ->
+            ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}},
+            %% Must be noreply: {reply, State} is not a valid handle_cast/2 return and
+            %% would stop the server with a bad_return_value error
+            {noreply, State}
+    end;
+
+%% Any other cast is ignored
+handle_cast(_Request, State = #state{}) ->
+    {noreply, State}.
+
+%% @private
+%% @doc Handling all non call/cast messages
+-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+%% Restart the service; when the start fails another attempt is scheduled
+handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, manifest = Manifest}) ->
+    case efka_manifest:startup(Manifest) of
+        {ok, Port} ->
+            {os_pid, OSPid} = erlang:port_info(Port, os_pid),
+            lager:debug("[efka_service] service_id: ~p, reboot success, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]),
+            {noreply, State#state{port = Port, os_pid = OSPid}};
+        {error, Reason} ->
+            lager:debug("[efka_service] service_id: ~p, boot_service get error: ~p", [ServiceId, Reason]),
+            try_reboot(),
+            {noreply, State}
+    end;
+
+%% Handle the reply from the channel: relay it to the process that issued the
+%% request and run the callback stored for Ref, if any.
+%% NOTE(review): the callback runs for every reply, whatever Reply contains -
+%% confirm that an error reply should also trigger it.
+handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, callbacks = Callbacks}) ->
+    case maps:take(Ref, Inflight) of
+        error ->
+            {noreply, State};
+        {ReceiverPid, NInflight} ->
+            ReceiverPid ! {service_reply, Ref, Reply},
+
+            {noreply, State#state{inflight = NInflight, callbacks = trigger_callback(Ref, Callbacks)}}
+    end;
+
+%% Data written by the port is only logged
+handle_info({Port, {data, Data}}, State = #state{service_id = ServiceId}) when is_port(Port) ->
+    lager:debug("[efka_service] service_id: ~p, port data: ~p", [ServiceId, Data]),
+    {noreply, State};
+
+%% Handle port messages; triggered when the port is closed passively, so at this
+%% point Port and State.port are equal
+handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId}) when is_port(Port) ->
+    lager:debug("[efka_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]),
+    {noreply, State#state{port = undefined, os_pid = undefined}};
+
+%% Handle the exit message of the port and schedule a restart
+handle_info({'EXIT', Port, Reason}, State = #state{service_id = ServiceId}) when is_port(Port) ->
+    lager:debug("[efka_service] service_id: ~p, port: ~p, exit with reason: ~p", [ServiceId, Port, Reason]),
+    try_reboot(),
+    {noreply, State#state{port = undefined, os_pid = undefined}};
+
+%% Handle the exit of the channel process. Callbacks are dropped together with the
+%% inflight requests: a callback only runs when its Ref is still inflight, so
+%% keeping them would leak them forever.
+handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) ->
+    lager:debug("[efka_service] service_id: ~p, channel exited: ~p", [ServiceId, Reason]),
+    {noreply, State#state{channel_pid = undefined, inflight = #{}, callbacks = #{}}};
+
+%% Anything else (e.g. a 'DOWN' from a channel that is no longer the attached one,
+%% or an 'EXIT' that does not come from a port) is logged and dropped instead of
+%% crashing the server with function_clause.
+handle_info(Info, State = #state{service_id = ServiceId}) ->
+    lager:debug("[efka_service] service_id: ~p, ignore unexpected message: ~p", [ServiceId, Info]),
+    {noreply, State}.
+
+%% @private
+%% @doc This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+    State :: #state{}) -> term()).
+terminate(Reason, _State = #state{service_id = ServiceId, port = Port, os_pid = OSPid}) ->
+    close_port(Port),
+    %% Killing the OS process is best effort; a failure is logged, not raised
+    try
+        kill_os_pid(OSPid)
+    catch
+        Class:Error ->
+            lager:warning("[efka_service] service_id: ~p, kill os_pid: ~p failed: ~p:~p", [ServiceId, OSPid, Class, Error])
+    end,
+    lager:debug("[efka_service] service_id: ~p, terminate with reason: ~p", [ServiceId, Reason]),
+    ok.
+
+%% Closes Port when it is a port. A port that is already closed makes
+%% erlang:port_close/1 raise badarg; that is ignored so the rest of terminate/2
+%% (killing the OS process) still runs.
+close_port(Port) when is_port(Port) ->
+    try erlang:port_close(Port) of
+        true -> ok
+    catch
+        error:badarg -> ok
+    end;
+close_port(_NotAPort) ->
+    ok.
+
+%% @private
+%% @doc Convert process state when code is changed
+-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
+    Extra :: term()) ->
+    {ok, NewState :: #state{}} | {error, Reason :: term()}).
+code_change(_OldVsn, State = #state{}, _Extra) ->
+    {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+%% Kill the OS process with `kill -9`; nothing is done when no OS pid is known.
+%% Returns ok for undefined, otherwise the output of the shell command.
+-spec kill_os_pid(integer() | undefined) -> ok | string().
+kill_os_pid(undefined) ->
+    ok;
+kill_os_pid(OSPid) when is_integer(OSPid) ->
+    %% OSPid is guarded to be an integer, so the command line cannot carry shell input
+    Cmd = "kill -9 " ++ integer_to_list(OSPid),
+    lager:debug("kill cmd is: ~p", [Cmd]),
+    os:cmd(Cmd).
+
+%% Schedule a {timeout, TimerRef, reboot_service} message to this process in 5 seconds.
+-spec try_reboot() -> reference().
+try_reboot() ->
+    erlang:start_timer(5000, self(), reboot_service).
+
+%% Run and remove the callback stored under Ref, if there is one. A failing
+%% callback is logged instead of being silently swallowed; it is removed either way.
+-spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map().
+trigger_callback(Ref, Callbacks) ->
+    case maps:take(Ref, Callbacks) of
+        error ->
+            Callbacks;
+        {Fun, NCallbacks} ->
+            try
+                Fun()
+            catch
+                Class:Error ->
+                    lager:warning("[efka_service] callback of ref: ~p failed: ~p:~p", [Ref, Class, Error])
+            end,
+            NCallbacks
+    end.
\ No newline at end of file
diff --git a/message_pb.proto b/message_pb.proto
index 11507e1..0e8112a 100644
--- a/message_pb.proto
+++ b/message_pb.proto
@@ -111,4 +111,10 @@ message Event {
   string service_id = 1;
   uint32 event_type = 2;
   string params = 3;
+}
+
+// Alarm information
+message Alarm {
+  string service_id = 1;
+  string params = 2;
 }
\ No newline at end of file