fix alarm

This commit is contained in:
anlicheng 2025-06-25 00:35:42 +08:00
parent 94d7e5bcd8
commit 4fec7aec1a
6 changed files with 311 additions and 26 deletions

View File

@ -0,0 +1,160 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 24. 6 2025 22:43
%%%-------------------------------------------------------------------
-module(modbus_alarm).
-author("anlicheng").
-include("modbus_ast.hrl").
-behaviour(gen_server).
%% API
-export([start_link/2]).
-export([eval_condition/1]).
-export([hold/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
parent_pid :: pid(),
alarm :: #modbus_alarm{},
%% , {timestamp, bool}
history = []
}).
%%%===================================================================
%%% API
%%%===================================================================
push_vars(Pid, Vars) when is_pid(Pid), is_list(Vars) ->
gen_server:cast(Pid, {push_vars, Vars}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(ParentPid :: pid(), Alarm :: #modbus_alarm{}) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(ParentPid, Alarm) ->
gen_server:start_link(?MODULE, [ParentPid, Alarm], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ParentPid, Alarm = #modbus_alarm{condition = Condition}]) ->
lager:debug("alarm is: ~p", [Alarm]),
case binary:split(Condition, <<".">>) of
[<<$$, DeviceName/binary>>, Metric] ->
lager:debug("device_name is: ~p, metric is: ~p", [DeviceName, Metric]);
_ ->
lager:debug("condition input: ~p", [Condition])
end,
{ok, #state{parent_pid = ParentPid, alarm = Alarm}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({push_vars, Vars}, State = #state{alarm = #modbus_alarm{condition = Condition, hold_time = HoldTime}, history = History}) ->
case execute(Vars, Condition) of
true ->
lager:debug("[push_vars] execute result is: true");
false ->
lager:debug("[push_vars] execute result is: false")
end,
{noreply, State};
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec eval_condition(Condition :: binary() | string()) -> boolean().
eval_condition(Condition) when is_binary(Condition) ->
eval_condition(binary_to_list(Condition));
eval_condition(Condition) when is_list(Condition) ->
{ok, Tokens, _} = erl_scan:string(Condition),
{ok, ExprList} = erl_parse:parse_exprs(Tokens),
{value, Bool, _} = erl_eval:exprs(ExprList, []),
case is_boolean(Bool) of
true ->
Bool;
false ->
false
end.
execute([], _) ->
execute;
execute([{Key, Val}|Tail], Condition) ->
case binary:match(Condition, Key) of
nomatch ->
execute(Tail, Condition);
_ ->
Expr0 = binary:replace(Condition, Key, Val, [global]),
Expr = iolist_to_binary(<<Expr0/binary, ".">>),
eval_condition(binary_to_list(Expr))
end.
hold(History, HoldTime) when is_list(History), is_integer(HoldTime) ->
PrefixItems = lists:takewhile(fun({_, Bool}) -> Bool end, History),
lager:debug("items: ~p", [PrefixItems]),
%%
Current = modbus_util:current_seconds(),
lists:any(fun({T0, _}) -> T0 =< Current - HoldTime end, PrefixItems).

View File

@ -22,7 +22,7 @@
parent_pid :: pid(),
device :: #modbus_device{},
%% #{Name => Metric}
metrics = #{},
metrics_map = #{},
%% #{Ref => MetricName}
inflight = #{}
}).
@ -48,18 +48,18 @@ start_link(ParentPid, Device = #modbus_device{}) when is_pid(ParentPid) ->
{stop, Reason :: term()} | ignore).
init([ParentPid, Device = #modbus_device{metrics = Metrics0, poll_interval = PollInterval}]) ->
%%
Metrics = maps:from_list(lists:map(fun(Metric0 = #modbus_metric{name = Name}) -> {Name, Metric0} end, Metrics0)),
MetricsMap = maps:from_list(lists:map(fun(Metric0 = #modbus_metric{name = Name}) -> {Name, Metric0} end, Metrics0)),
%% step提交任务
Len = length(Metrics0),
case Len > 0 of
true ->
Step = erlang:max(1, PollInterval div Len),
start_ticker(maps:keys(Metrics), Step);
lager:debug("[modbus_deivce] step is: ~p", [Step]),
start_ticker(maps:keys(MetricsMap), Step);
false ->
ok
end,
{ok, #state{parent_pid = ParentPid, metrics = Metrics, device = Device}}.
{ok, #state{parent_pid = ParentPid, metrics_map = MetricsMap, device = Device}}.
%% @private
%% @doc Handling call messages
@ -89,13 +89,13 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = ParentPid, inflight = Inflight,
device = #modbus_device{slave_id = SlaveId, poll_interval = PollInterval, metrics = Metrics}}) ->
handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = ParentPid, inflight = Inflight, metrics_map = MetricsMap,
device = #modbus_device{slave_id = SlaveId, poll_interval = PollInterval}}) ->
%%
poll_ticker(PollInterval, Name),
#modbus_metric{address = Address} = maps:get(Name, Metrics),
#modbus_metric{address = Address} = maps:get(Name, MetricsMap),
%%
ReceiverPid = self(),
Ref = make_ref(),
@ -103,12 +103,12 @@ handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = Paren
{noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}};
handle_info({request_reply, Ref, Val}, State = #state{inflight = Inflight, metrics = Metrics}) ->
handle_info({request_reply, Ref, Val}, State = #state{inflight = Inflight, metrics_map = MetricsMap}) ->
case maps:take(Ref, Inflight) of
error ->
{noreply, State};
{Name, NInflight} ->
#modbus_metric{} = maps:get(Name, Metrics),
#modbus_metric{} = maps:get(Name, MetricsMap),
lager:debug("[modbus_device] metric: ~p, get value: ~p", [Name, Val]),
{noreply, State#state{inflight = NInflight}}
end;

View File

@ -155,8 +155,8 @@ parse_ast0(#block{ident = <<"modbus">>, props = Props}) ->
MapProps = parse_ast1(Props),
#modbus {
transport = maps:get(<<"transport">>, MapProps, undefined),
access_log = map_of_string(<<"access_log">>, MapProps, undefined),
error_log = map_of_string(<<"error_log">>, MapProps, undefined)
access_log = map_of_binary(<<"access_log">>, MapProps, undefined),
error_log = map_of_binary(<<"error_log">>, MapProps, undefined)
};
parse_ast0(#block{ident = <<"device", Name0/binary>>, props = Props}) ->
MapProps = parse_ast1(Props),
@ -165,7 +165,7 @@ parse_ast0(#block{ident = <<"device", Name0/binary>>, props = Props}) ->
slave_id = map_of_integer(<<"slave_id">>, MapProps, 0),
model = maps:get(<<"model">>, MapProps, undefined),
description = maps:get(<<"description">>, MapProps, undefined),
poll_interval = maps:get(<<"poll_interval">>, MapProps, undefined),
poll_interval = map_of_time(<<"poll_interval">>, MapProps, 0),
retries = map_of_integer(<<"retries">>, MapProps, 0),
retry_timeout = maps:get(<<"retry_timeout">>, MapProps, undefined),
metrics = maps:get(<<"metrics">>, MapProps, undefined),
@ -175,15 +175,15 @@ parse_ast0(#block{ident = <<"processor", Name0/binary>>, props = Props}) ->
MapProps = parse_ast1(Props),
#modbus_processor {
name = string:trim(Name0),
input = maps:get(<<"input">>, MapProps, undefined),
input = map_of_binary(<<"input">>, MapProps, <<"">>),
transform = maps:get(<<"transform">>, MapProps, undefined)
};
parse_ast0(#block{ident = <<"alarm", Name0/binary>>, props = Props}) ->
MapProps = parse_ast1(Props, []),
#modbus_alarm {
name = string:trim(Name0),
condition = maps:get(<<"condition">>, MapProps, undefined),
hold_time = maps:get(<<"hold_time">>, MapProps, undefined),
condition = map_of_binary(<<"condition">>, MapProps, <<>>),
hold_time = map_of_time(<<"hold_time">>, MapProps, 0),
actions = maps:get(<<"actions">>, MapProps, undefined),
recovery_actions = maps:get(<<"recovery_actions">>, MapProps, undefined)
}.
@ -211,8 +211,8 @@ parse_ast1([#block{ident = <<"metrics">>, props = Metrics0}|T], Acc) ->
#modbus_metric{
name = MetricName,
address = map_of_integer(<<"address">>, PropsMap, 0),
type = map_of_string(<<"type">>, PropsMap, ""),
unit = strip_quotes(map_of_string(<<"unit">>, PropsMap, ""))
type = map_of_binary(<<"type">>, PropsMap, <<"">>),
unit = strip_quotes(map_of_binary(<<"unit">>, PropsMap, <<"">>))
}
end, Metrics0),
parse_ast1(T, [{<<"metrics">>, Metrics}|Acc]);
@ -232,7 +232,7 @@ parse_ast1([#block{ident = <<"transport", Name0/binary>>, props = Props}|T], Acc
};
<<"tcp">> ->
#modbus_transport_tcp {
host = map_of_string(<<"host">>, PropsMap, undefined),
host = map_of_binary(<<"host">>, PropsMap, undefined),
port = map_of_integer(<<"port">>, PropsMap, undefined),
timeout = map_of_time(<<"timeout">>, PropsMap, undefined)
}
@ -279,11 +279,11 @@ map_of_integer(Key, M, Def) ->
Def
end.
map_of_string(Key, M, Def) ->
map_of_binary(Key, M, Def) ->
case maps:is_key(Key, M) of
true ->
[Val0] = maps:get(Key, M),
binary_to_list(Val0);
iolist_to_binary(Val0);
false ->
Def
end.
@ -308,6 +308,8 @@ map_of_time(Key, M, Def) ->
Def
end.
strip_quotes(Str) when is_binary(Str) ->
list_to_binary(strip_quotes(binary_to_list(Str)));
strip_quotes(Str) when is_list(Str) ->
Str1 = string:trim(Str, leading, "\""),
Str2 = string:trim(Str1, trailing, "\""),

View File

@ -0,0 +1,113 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 24. 6 2025 10:49
%%%-------------------------------------------------------------------
-module(modbus_processor).
-author("anlicheng").
-include("modbus_ast.hrl").
-behaviour(gen_server).
%% API
-export([start_link/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
parent_pid :: pid(),
processor :: #modbus_processor{}
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(ParentPid :: pid(), Processor :: #modbus_processor{}) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(ParentPid, Processor = #modbus_processor{}) when is_pid(ParentPid) ->
gen_server:start_link(?MODULE, [ParentPid, Processor], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ParentPid, Processor = #modbus_processor{input = Input}]) ->
%% input
lager:debug("processor is: ~p", [Processor]),
case binary:split(Input, <<".">>) of
[<<$$, DeviceName/binary>>, Metric] ->
lager:debug("device_name is: ~p, metric is: ~p", [DeviceName, Metric]);
_ ->
lager:debug("invalid input: ~p", [Input])
end,
{ok, #state{parent_pid = ParentPid, processor = Processor}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -92,6 +92,10 @@ init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = Ac
lager:debug("[modbus_service] devices pid: ~p", [DevicesMap]),
{ok, Pid1} = modbus_processor:start_link(self(), hd(AST#ast.processors)),
{ok, Pid2} = modbus_alarm:start_link(self(), hd(AST#ast.alarms)),
{ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
queue = queue:new(), packet_id = 1, devices_map = DevicesMap}}.
@ -123,7 +127,7 @@ handle_event(info, {request, ReceiverPid, Ref, SlaveId, Address}, StateName, Sta
{keep_state, State}
end;
%%
%% rtu
handle_event(info, read_next, ?CONNECTED, State = #state{mode = rtu, queue = Q, port = Port, inflight = Inflight}) ->
case queue:out(Q) of
{{value, {PacketId, ReceiverPid, Ref, SlaveId, Address}}, Q2} ->
@ -133,6 +137,8 @@ handle_event(info, read_next, ?CONNECTED, State = #state{mode = rtu, queue = Q,
{empty, Q1} ->
{keep_state, State#state{queue = Q1, is_pending = true}}
end;
handle_event(info, read_next, ?DISCONNECTED, State = #state{mode = rtu}) ->
{keep_state, State};
%% port读取数据, todo 32
handle_event(info, {Port, {data, <<?MODBUS_READ:8, PacketId:32, Val:32>>}}, ?CONNECTED, State = #state{mode = rtu, port = Port, delay_ms = DelayMs, inflight = Inflight}) ->
@ -255,8 +261,8 @@ retry_connect() ->
-spec create_log_file(undefined | string()) -> undefined | pid().
create_log_file(undefined) ->
undefined;
create_log_file(FileName) when is_list(FileName) ->
case modbus_logger:start_link(FileName) of
create_log_file(FileName) when is_binary(FileName) ->
case modbus_logger:start_link(binary_to_list(FileName)) of
{ok, FilePid} ->
FilePid;
_ ->

View File

@ -10,7 +10,7 @@
-author("anlicheng").
%% API
-export([split/1, read_until/2]).
-export([split/1, read_until/2, current_seconds/0]).
%% ,
-spec split(Bin :: binary()) -> [binary()].
@ -38,4 +38,8 @@ read_until(Bin, Delim) when is_binary(Bin), is_binary(Delim) ->
{binary:part(Bin, 0, Pos), binary:part(Bin, Pos + 1, byte_size(Bin) - Pos - 1)};
nomatch ->
{Bin, <<>>}
end.
end.
current_seconds() ->
{Mega, Seconds, _Micro} = os:timestamp(),
Mega * 1000000 + Seconds.