fix alarms
This commit is contained in:
parent
a836b16f15
commit
3561a3e3ad
@ -7,132 +7,49 @@
|
|||||||
%%% Created : 24. 6月 2025 22:43
|
%%% Created : 24. 6月 2025 22:43
|
||||||
%%%-------------------------------------------------------------------
|
%%%-------------------------------------------------------------------
|
||||||
-module(modbus_alarm).
|
-module(modbus_alarm).
|
||||||
-author("anlicheng").
|
|
||||||
-include("modbus_ast.hrl").
|
-include("modbus_ast.hrl").
|
||||||
|
-export([new/1, maybe_alarm/3]).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-record(alarm_mod, {
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_link/2, push_var/3]).
|
|
||||||
-export([eval_condition/1]).
|
|
||||||
-export([hold/2]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
parent_pid :: pid(),
|
|
||||||
alarm :: #modbus_alarm{},
|
alarm :: #modbus_alarm{},
|
||||||
%% 存储历史值,用来做持续性判断, {timestamp, bool}
|
%% 存储历史值,用来做持续性判断, {timestamp, bool}
|
||||||
history = []
|
history = []
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
-spec new(Alarm :: #modbus_alarm{}) -> error | {ok, Key :: binary(), AlarmMod :: #alarm_mod{}}.
|
||||||
%%% API
|
new(Alarm = #modbus_alarm{condition = Condition0}) ->
|
||||||
%%%===================================================================
|
Condition = binary_to_list(Condition0),
|
||||||
|
case re:run(Condition, "\\$(\\w+)", [global, {capture, all}, list]) of
|
||||||
-spec push_var(Pid :: pid(), Key :: binary(), Val :: any()) -> no_return().
|
{match, Captures} ->
|
||||||
push_var(Pid, Key, Val) when is_pid(Pid), is_binary(Key) ->
|
Vars = [Var || [_|Var] <- Captures],
|
||||||
gen_server:cast(Pid, {push_var, Key, Val}).
|
NVars = sets:to_list(sets:from_list(Vars)),
|
||||||
|
case length(NVars) =:= 1 of
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
|
||||||
-spec(start_link(ParentPid :: pid(), Alarm :: #modbus_alarm{}) ->
|
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
|
||||||
start_link(ParentPid, Alarm) ->
|
|
||||||
gen_server:start_link(?MODULE, [ParentPid, Alarm], []).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% gen_server callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Initializes the server
|
|
||||||
-spec(init(Args :: term()) ->
|
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([ParentPid, Alarm = #modbus_alarm{condition = Condition}]) ->
|
|
||||||
lager:debug("alarm is: ~p", [Alarm]),
|
|
||||||
case binary:split(Condition, <<".">>) of
|
|
||||||
[<<$$, DeviceName/binary>>, Metric] ->
|
|
||||||
lager:debug("device_name is: ~p, metric is: ~p", [DeviceName, Metric]);
|
|
||||||
_ ->
|
|
||||||
lager:debug("condition input: ~p", [Condition])
|
|
||||||
end,
|
|
||||||
|
|
||||||
{ok, #state{parent_pid = ParentPid, alarm = Alarm}}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling call messages
|
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
||||||
State :: #state{}) ->
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_call(_Request, _From, State = #state{}) ->
|
|
||||||
{reply, ok, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_cast({push_var, Key, Val}, State = #state{parent_pid = ParentPid, alarm = #modbus_alarm{name = AlarmName, condition = Condition, hold_time = HoldTime}, history = History}) ->
|
|
||||||
case binary:match(Condition, Key) of
|
|
||||||
nomatch ->
|
|
||||||
{noreply, State};
|
|
||||||
_ ->
|
|
||||||
Expr0 = binary:replace(Condition, Key, Val, [global]),
|
|
||||||
Expr = iolist_to_binary(<<Expr0/binary, ".">>),
|
|
||||||
IsSatisfied = eval_condition(binary_to_list(Expr)),
|
|
||||||
|
|
||||||
Timestamp = modbus_util:current_seconds(),
|
|
||||||
NHistory = [{Timestamp, IsSatisfied}|History],
|
|
||||||
case hold(NHistory, HoldTime) of
|
|
||||||
true ->
|
true ->
|
||||||
%% 告警
|
{ok, hd(NVars), #alarm_mod{alarm = Alarm, history = []}};
|
||||||
ParentPid ! {device_alarm, AlarmName, HoldTime, Val},
|
|
||||||
lager:warning("[push_var] hold will trigger");
|
|
||||||
false ->
|
false ->
|
||||||
lager:debug("[push_var] IsSatisfied: ~p, hold will not trigger", [IsSatisfied])
|
error
|
||||||
end,
|
end
|
||||||
{noreply, State#state{history = NHistory}}
|
end.
|
||||||
end;
|
|
||||||
|
|
||||||
handle_cast(_Request, State = #state{}) ->
|
-spec maybe_alarm(Key0 :: binary(), Val :: any(), AlarmMod :: #alarm_mod{}) ->
|
||||||
{noreply, State}.
|
{alarm, {AlarmName :: binary(), HoldTime :: integer()}, AlarmMod :: #alarm_mod{}} | {ignore, AlarmMod :: #alarm_mod{}}.
|
||||||
|
maybe_alarm(Key0, Val, AlarmMod = #alarm_mod{alarm = #modbus_alarm{name = AlarmName, condition = Condition, hold_time = HoldTime}, history = History}) when is_binary(Key0) ->
|
||||||
|
Key = <<$$, Key0/binary>>,
|
||||||
|
|
||||||
%% @private
|
Expr0 = binary:replace(Condition, Key, Val, [global]),
|
||||||
%% @doc Handling all non call/cast messages
|
Expr = iolist_to_binary(<<Expr0/binary, ".">>),
|
||||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
IsSatisfied = eval_condition(binary_to_list(Expr)),
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_info(_Info, State = #state{}) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
Timestamp = modbus_util:current_seconds(),
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
NHistory = [{Timestamp, IsSatisfied}|History],
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
case hold(NHistory, HoldTime) of
|
||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
true ->
|
||||||
%% with Reason. The return value is ignored.
|
%% 告警
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
{alarm, {AlarmName, HoldTime}, AlarmMod#alarm_mod{history = NHistory}};
|
||||||
State :: #state{}) -> term()).
|
false ->
|
||||||
terminate(_Reason, _State = #state{}) ->
|
lager:debug("[push_var] IsSatisfied: ~p, hold will not trigger", [IsSatisfied]),
|
||||||
ok.
|
{ignore, AlarmMod#alarm_mod{history = NHistory}}
|
||||||
|
end.
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
|
||||||
Extra :: term()) ->
|
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
|
|||||||
@ -21,6 +21,8 @@
|
|||||||
-record(state, {
|
-record(state, {
|
||||||
parent_pid :: pid(),
|
parent_pid :: pid(),
|
||||||
device :: #modbus_device{},
|
device :: #modbus_device{},
|
||||||
|
|
||||||
|
alarm_map = #{},
|
||||||
%% #{Ref => MetricName}
|
%% #{Ref => MetricName}
|
||||||
inflight = #{}
|
inflight = #{}
|
||||||
}).
|
}).
|
||||||
@ -44,7 +46,7 @@ start_link(ParentPid, Device = #modbus_device{}) when is_pid(ParentPid) ->
|
|||||||
-spec(init(Args :: term()) ->
|
-spec(init(Args :: term()) ->
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([ParentPid, Device = #modbus_device{metrics = Metrics, poll_interval = PollInterval}]) ->
|
init([ParentPid, Device = #modbus_device{metrics = Metrics, alarms = Alarms0, poll_interval = PollInterval}]) ->
|
||||||
%% 处理采集项目
|
%% 处理采集项目
|
||||||
%% 初步启动的过程中按照step提交任务
|
%% 初步启动的过程中按照step提交任务
|
||||||
Len = length(Metrics),
|
Len = length(Metrics),
|
||||||
@ -56,7 +58,20 @@ init([ParentPid, Device = #modbus_device{metrics = Metrics, poll_interval = Poll
|
|||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
{ok, #state{parent_pid = ParentPid, device = Device}}.
|
|
||||||
|
%% 转化成对应的模块管理对象
|
||||||
|
Alarms = lists:flatmap(fun(Alarm0) ->
|
||||||
|
case modbus_alarm:new(Alarm0) of
|
||||||
|
error ->
|
||||||
|
[];
|
||||||
|
{ok, Val, AlarmMod} ->
|
||||||
|
[{Val, AlarmMod}]
|
||||||
|
end
|
||||||
|
end, Alarms0),
|
||||||
|
|
||||||
|
lager:debug("[modbus_device] alarms is: ~p", [Alarms]),
|
||||||
|
|
||||||
|
{ok, #state{parent_pid = ParentPid, device = Device, alarm_map = maps:from_list(Alarms)}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
@ -113,7 +128,7 @@ handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = Paren
|
|||||||
|
|
||||||
{noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}};
|
{noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}};
|
||||||
|
|
||||||
handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, device = #modbus_device{name = DeviceName, metrics = Metrics}, inflight = Inflight}) ->
|
handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, alarm_map = AlarmMap, device = #modbus_device{name = DeviceName, metrics = Metrics}, inflight = Inflight}) ->
|
||||||
case maps:take(Ref, Inflight) of
|
case maps:take(Ref, Inflight) of
|
||||||
error ->
|
error ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
@ -131,7 +146,21 @@ handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, de
|
|||||||
Key = <<"$", DeviceName/binary, ".", MetricName/binary>>,
|
Key = <<"$", DeviceName/binary, ".", MetricName/binary>>,
|
||||||
ParentPid ! {broadcast, Key, Val},
|
ParentPid ! {broadcast, Key, Val},
|
||||||
|
|
||||||
{noreply, State#state{inflight = NInflight}}
|
NAlarmMap = case maps:take(MetricName, AlarmMap) of
|
||||||
|
error ->
|
||||||
|
AlarmMap;
|
||||||
|
{AlarmMod, Map2} ->
|
||||||
|
case modbus_alarm:maybe_alarm(MetricName, Val, AlarmMod) of
|
||||||
|
{alarm, {AlarmName, HoldTime}, NAlarmMod} ->
|
||||||
|
lager:debug("[modbus_device] alarm_name: ~p, hold_time: ~p", [AlarmName, HoldTime]),
|
||||||
|
%% todo alarm
|
||||||
|
maps:put(MetricName, NAlarmMod, Map2);
|
||||||
|
{ignore, NAlarmMod} ->
|
||||||
|
maps:put(MetricName, NAlarmMod, Map2)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
|
{noreply, State#state{inflight = NInflight, alarm_map = NAlarmMap}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(_Info, State = #state{}) ->
|
handle_info(_Info, State = #state{}) ->
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user