From 3561a3e3ad8afb3985e90cea45706a63138f7013 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 4 Jul 2025 00:42:38 +0800 Subject: [PATCH] fix alarms --- apps/modbus/src/modbus_alarm.erl | 145 +++++++----------------------- apps/modbus/src/modbus_device.erl | 37 +++++++- 2 files changed, 64 insertions(+), 118 deletions(-) diff --git a/apps/modbus/src/modbus_alarm.erl b/apps/modbus/src/modbus_alarm.erl index 89c6f7d..4d9d574 100644 --- a/apps/modbus/src/modbus_alarm.erl +++ b/apps/modbus/src/modbus_alarm.erl @@ -7,132 +7,49 @@ %%% Created : 24. 6月 2025 22:43 %%%------------------------------------------------------------------- -module(modbus_alarm). --author("anlicheng"). -include("modbus_ast.hrl"). +-export([new/1, maybe_alarm/3]). --behaviour(gen_server). - -%% API --export([start_link/2, push_var/3]). --export([eval_condition/1]). --export([hold/2]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --record(state, { - parent_pid :: pid(), +-record(alarm_mod, { alarm :: #modbus_alarm{}, %% 存储历史值,用来做持续性判断, {timestamp, bool} history = [] }). -%%%=================================================================== -%%% API -%%%=================================================================== - --spec push_var(Pid :: pid(), Key :: binary(), Val :: any()) -> no_return(). -push_var(Pid, Key, Val) when is_pid(Pid), is_binary(Key) -> - gen_server:cast(Pid, {push_var, Key, Val}). - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(ParentPid :: pid(), Alarm :: #modbus_alarm{}) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(ParentPid, Alarm) -> - gen_server:start_link(?MODULE, [ParentPid, Alarm], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([ParentPid, Alarm = #modbus_alarm{condition = Condition}]) -> - lager:debug("alarm is: ~p", [Alarm]), - case binary:split(Condition, <<".">>) of - [<<$$, DeviceName/binary>>, Metric] -> - lager:debug("device_name is: ~p, metric is: ~p", [DeviceName, Metric]); - _ -> - lager:debug("condition input: ~p", [Condition]) - end, - - {ok, #state{parent_pid = ParentPid, alarm = Alarm}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({push_var, Key, Val}, State = #state{parent_pid = ParentPid, alarm = #modbus_alarm{name = AlarmName, condition = Condition, hold_time = HoldTime}, history = History}) -> - case binary:match(Condition, Key) of - nomatch -> - {noreply, State}; - _ -> - Expr0 = binary:replace(Condition, Key, Val, [global]), - Expr = iolist_to_binary(<>), - IsSatisfied = eval_condition(binary_to_list(Expr)), - - Timestamp = modbus_util:current_seconds(), - NHistory = [{Timestamp, IsSatisfied}|History], - case hold(NHistory, HoldTime) of +-spec new(Alarm :: #modbus_alarm{}) -> error | {ok, Key :: binary(), AlarmMod :: #alarm_mod{}}. +new(Alarm = #modbus_alarm{condition = Condition0}) -> + Condition = binary_to_list(Condition0), + case re:run(Condition, "\\$(\\w+)", [global, {capture, all}, list]) of + {match, Captures} -> + Vars = [Var || [_|Var] <- Captures], + NVars = sets:to_list(sets:from_list(Vars)), + case length(NVars) =:= 1 of true -> - %% 告警 - ParentPid ! {device_alarm, AlarmName, HoldTime, Val}, - lager:warning("[push_var] hold will trigger"); + {ok, hd(NVars), #alarm_mod{alarm = Alarm, history = []}}; false -> - lager:debug("[push_var] IsSatisfied: ~p, hold will not trigger", [IsSatisfied]) - end, - {noreply, State#state{history = NHistory}} - end; + error + end + end. -handle_cast(_Request, State = #state{}) -> - {noreply, State}. +-spec maybe_alarm(Key0 :: binary(), Val :: any(), AlarmMod :: #alarm_mod{}) -> + {alarm, {AlarmName :: binary(), HoldTime :: integer()}, AlarmMod :: #alarm_mod{}} | {ignore, AlarmMod :: #alarm_mod{}}. +maybe_alarm(Key0, Val, AlarmMod = #alarm_mod{alarm = #modbus_alarm{name = AlarmName, condition = Condition, hold_time = HoldTime}, history = History}) when is_binary(Key0) -> + Key = <<$$, Key0/binary>>, -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info(_Info, State = #state{}) -> - {noreply, State}. + Expr0 = binary:replace(Condition, Key, Val, [global]), + Expr = iolist_to_binary(<>), + IsSatisfied = eval_condition(binary_to_list(Expr)), -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. + Timestamp = modbus_util:current_seconds(), + NHistory = [{Timestamp, IsSatisfied}|History], + case hold(NHistory, HoldTime) of + true -> + %% 告警 + {alarm, {AlarmName, HoldTime}, AlarmMod#alarm_mod{history = NHistory}}; + false -> + lager:debug("[push_var] IsSatisfied: ~p, hold will not trigger", [IsSatisfied]), + {ignore, AlarmMod#alarm_mod{history = NHistory}} + end. %%%=================================================================== %%% Internal functions diff --git a/apps/modbus/src/modbus_device.erl b/apps/modbus/src/modbus_device.erl index e029831..424c1a1 100644 --- a/apps/modbus/src/modbus_device.erl +++ b/apps/modbus/src/modbus_device.erl @@ -21,6 +21,8 @@ -record(state, { parent_pid :: pid(), device :: #modbus_device{}, + + alarm_map = #{}, %% #{Ref => MetricName} inflight = #{} }). @@ -44,7 +46,7 @@ start_link(ParentPid, Device = #modbus_device{}) when is_pid(ParentPid) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([ParentPid, Device = #modbus_device{metrics = Metrics, poll_interval = PollInterval}]) -> +init([ParentPid, Device = #modbus_device{metrics = Metrics, alarms = Alarms0, poll_interval = PollInterval}]) -> %% 处理采集项目 %% 初步启动的过程中按照step提交任务 Len = length(Metrics), @@ -56,7 +58,20 @@ init([ParentPid, Device = #modbus_device{metrics = Metrics, poll_interval = Poll false -> ok end, - {ok, #state{parent_pid = ParentPid, device = Device}}. + + %% 转化成对应的模块管理对象 + Alarms = lists:flatmap(fun(Alarm0) -> + case modbus_alarm:new(Alarm0) of + error -> + []; + {ok, Val, AlarmMod} -> + [{Val, AlarmMod}] + end + end, Alarms0), + + lager:debug("[modbus_device] alarms is: ~p", [Alarms]), + + {ok, #state{parent_pid = ParentPid, device = Device, alarm_map = maps:from_list(Alarms)}}. %% @private %% @doc Handling call messages @@ -113,7 +128,7 @@ handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = Paren {noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}}; -handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, device = #modbus_device{name = DeviceName, metrics = Metrics}, inflight = Inflight}) -> +handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, alarm_map = AlarmMap, device = #modbus_device{name = DeviceName, metrics = Metrics}, inflight = Inflight}) -> case maps:take(Ref, Inflight) of error -> {noreply, State}; @@ -131,7 +146,21 @@ handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, de Key = <<"$", DeviceName/binary, ".", MetricName/binary>>, ParentPid ! {broadcast, Key, Val}, - {noreply, State#state{inflight = NInflight}} + NAlarmMap = case maps:take(MetricName, AlarmMap) of + error -> + AlarmMap; + {AlarmMod, Map2} -> + case modbus_alarm:maybe_alarm(MetricName, Val, AlarmMod) of + {alarm, {AlarmName, HoldTime}, NAlarmMod} -> + lager:debug("[modbus_device] alarm_name: ~p, hold_time: ~p", [AlarmName, HoldTime]), + %% todo alarm + maps:put(MetricName, NAlarmMod, Map2); + {ignore, NAlarmMod} -> + maps:put(MetricName, NAlarmMod, Map2) + end + end, + + {noreply, State#state{inflight = NInflight, alarm_map = NAlarmMap}} end; handle_info(_Info, State = #state{}) ->