fix alarm
This commit is contained in:
parent
4fec7aec1a
commit
5cf842fa1c
@ -13,7 +13,7 @@
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/2]).
|
||||
-export([start_link/2, push_var/3]).
|
||||
-export([eval_condition/1]).
|
||||
-export([hold/2]).
|
||||
|
||||
@ -33,8 +33,9 @@
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
push_vars(Pid, Vars) when is_pid(Pid), is_list(Vars) ->
|
||||
gen_server:cast(Pid, {push_vars, Vars}).
|
||||
-spec push_var(Pid :: pid(), Key :: binary(), Val :: any()) -> no_return().
|
||||
push_var(Pid, Key, Val) when is_pid(Pid), is_binary(Key) ->
|
||||
gen_server:cast(Pid, {push_var, Key, Val}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(ParentPid :: pid(), Alarm :: #modbus_alarm{}) ->
|
||||
@ -81,17 +82,26 @@ handle_call(_Request, _From, State = #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({push_vars, Vars}, State = #state{alarm = #modbus_alarm{condition = Condition, hold_time = HoldTime}, history = History}) ->
|
||||
case execute(Vars, Condition) of
|
||||
true ->
|
||||
lager:debug("[push_vars] execute result is: true");
|
||||
false ->
|
||||
lager:debug("[push_vars] execute result is: false")
|
||||
end,
|
||||
handle_cast({push_var, Key, Val}, State = #state{alarm = #modbus_alarm{condition = Condition, hold_time = HoldTime}, history = History}) ->
|
||||
case binary:match(Condition, Key) of
|
||||
nomatch ->
|
||||
{noreply, State};
|
||||
_ ->
|
||||
Expr0 = binary:replace(Condition, Key, Val, [global]),
|
||||
Expr = iolist_to_binary(<<Expr0/binary, ".">>),
|
||||
IsSatisfied = eval_condition(binary_to_list(Expr)),
|
||||
|
||||
Timestamp = modbus_util:current_seconds(),
|
||||
NHistory = [{Timestamp, IsSatisfied}|History],
|
||||
case hold(NHistory, HoldTime) of
|
||||
true ->
|
||||
lager:warning("[push_var] hold will trigger");
|
||||
false ->
|
||||
lager:debug("[push_var] IsSatisfied: ~p, hold will not trigger", [IsSatisfied])
|
||||
end,
|
||||
{noreply, State#state{history = NHistory}}
|
||||
end;
|
||||
|
||||
|
||||
{noreply, State};
|
||||
handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
@ -140,21 +150,10 @@ eval_condition(Condition) when is_list(Condition) ->
|
||||
false
|
||||
end.
|
||||
|
||||
execute([], _) ->
|
||||
execute;
|
||||
execute([{Key, Val}|Tail], Condition) ->
|
||||
case binary:match(Condition, Key) of
|
||||
nomatch ->
|
||||
execute(Tail, Condition);
|
||||
_ ->
|
||||
Expr0 = binary:replace(Condition, Key, Val, [global]),
|
||||
Expr = iolist_to_binary(<<Expr0/binary, ".">>),
|
||||
eval_condition(binary_to_list(Expr))
|
||||
end.
|
||||
|
||||
%% 用来做持续性判断
|
||||
-spec hold(History :: list(), HoldTime :: integer()) -> boolean().
|
||||
hold(History, HoldTime) when is_list(History), is_integer(HoldTime) ->
|
||||
PrefixItems = lists:takewhile(fun({_, Bool}) -> Bool end, History),
|
||||
lager:debug("items: ~p", [PrefixItems]),
|
||||
%% 判断是否存在一个元素的时间
|
||||
Current = modbus_util:current_seconds(),
|
||||
lists:any(fun({T0, _}) -> T0 =< Current - HoldTime end, PrefixItems).
|
||||
@ -103,13 +103,19 @@ handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = Paren
|
||||
|
||||
{noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}};
|
||||
|
||||
handle_info({request_reply, Ref, Val}, State = #state{inflight = Inflight, metrics_map = MetricsMap}) ->
|
||||
handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, device = #modbus_device{name = DeviceName}, inflight = Inflight, metrics_map = MetricsMap}) ->
|
||||
case maps:take(Ref, Inflight) of
|
||||
error ->
|
||||
{noreply, State};
|
||||
{Name, NInflight} ->
|
||||
#modbus_metric{} = maps:get(Name, MetricsMap),
|
||||
lager:debug("[modbus_device] metric: ~p, get value: ~p", [Name, Val]),
|
||||
{MetricName, NInflight} ->
|
||||
#modbus_metric{} = maps:get(MetricName, MetricsMap),
|
||||
lager:debug("[modbus_device] metric_name: ~p, get value: ~p", [MetricName, Val]),
|
||||
%% todo 还需要解决数据的上传问题
|
||||
|
||||
%% 解决值的广播问题
|
||||
Key = <<"$", DeviceName/binary, ".", MetricName/binary>>,
|
||||
ParentPid ! {broadcast, Key, Val},
|
||||
|
||||
{noreply, State#state{inflight = NInflight}}
|
||||
end;
|
||||
|
||||
|
||||
@ -43,6 +43,11 @@
|
||||
%% #{slaveId => DevicePid}
|
||||
devices_map = #{},
|
||||
|
||||
%% 所有的处理过程
|
||||
processor_pids = [],
|
||||
%% alarms
|
||||
alarm_pids = [],
|
||||
|
||||
packet_id = 1,
|
||||
queue = queue:new(),
|
||||
|
||||
@ -77,7 +82,7 @@ start_link(AST = #ast{}) ->
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = AccessLog}, devices = Devices}]) ->
|
||||
init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = AccessLog}, devices = Devices, alarms = Alarms}]) ->
|
||||
lager:debug("[modbus_service] modbus is: ~p", [Modbus]),
|
||||
|
||||
%% 建立连接
|
||||
@ -93,11 +98,14 @@ init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = Ac
|
||||
lager:debug("[modbus_service] devices pid: ~p", [DevicesMap]),
|
||||
|
||||
{ok, Pid1} = modbus_processor:start_link(self(), hd(AST#ast.processors)),
|
||||
{ok, Pid2} = modbus_alarm:start_link(self(), hd(AST#ast.alarms)),
|
||||
|
||||
AlarmPids = lists:map(fun(Alarm) ->
|
||||
{ok, AlarmPid} = modbus_alarm:start_link(self(), Alarm),
|
||||
AlarmPid
|
||||
end, Alarms),
|
||||
|
||||
{ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
|
||||
queue = queue:new(), packet_id = 1, devices_map = DevicesMap}}.
|
||||
queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = AlarmPids}}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it needs to find out
|
||||
@ -155,6 +163,13 @@ handle_event(info, {Port, {data, <<?MODBUS_READ:8, PacketId:32, Val:32>>}}, ?CON
|
||||
|
||||
{keep_state, State#state{inflight = NInflight}};
|
||||
|
||||
%% 广播消息
|
||||
handle_event(info, {broadcast, Key, Val}, ?CONNECTED, State = #state{alarm_pids = AlarmPids}) ->
|
||||
%% 推送告警信息
|
||||
[modbus_alarm:push_var(AlarmPid, Key, Val) || AlarmPid <- AlarmPids],
|
||||
|
||||
{keep_state, State};
|
||||
|
||||
%% port退出
|
||||
handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = rtu, port = Port}) ->
|
||||
lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Code]),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user