diff --git a/apps/modbus/src/modbus_alarm.erl b/apps/modbus/src/modbus_alarm.erl index 4d304c2..7ad8a3d 100644 --- a/apps/modbus/src/modbus_alarm.erl +++ b/apps/modbus/src/modbus_alarm.erl @@ -13,7 +13,7 @@ -behaviour(gen_server). %% API --export([start_link/2]). +-export([start_link/2, push_var/3]). -export([eval_condition/1]). -export([hold/2]). @@ -33,8 +33,9 @@ %%% API %%%=================================================================== -push_vars(Pid, Vars) when is_pid(Pid), is_list(Vars) -> - gen_server:cast(Pid, {push_vars, Vars}). +-spec push_var(Pid :: pid(), Key :: binary(), Val :: any()) -> no_return(). +push_var(Pid, Key, Val) when is_pid(Pid), is_binary(Key) -> + gen_server:cast(Pid, {push_var, Key, Val}). %% @doc Spawns the server and registers the local name (unique) -spec(start_link(ParentPid :: pid(), Alarm :: #modbus_alarm{}) -> @@ -81,17 +82,26 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({push_vars, Vars}, State = #state{alarm = #modbus_alarm{condition = Condition, hold_time = HoldTime}, history = History}) -> - case execute(Vars, Condition) of - true -> - lager:debug("[push_vars] execute result is: true"); - false -> - lager:debug("[push_vars] execute result is: false") - end, +handle_cast({push_var, Key, Val}, State = #state{alarm = #modbus_alarm{condition = Condition, hold_time = HoldTime}, history = History}) -> + case binary:match(Condition, Key) of + nomatch -> + {noreply, State}; + _ -> + Expr0 = binary:replace(Condition, Key, Val, [global]), + Expr = iolist_to_binary(<>), + IsSatisfied = eval_condition(binary_to_list(Expr)), + Timestamp = modbus_util:current_seconds(), + NHistory = [{Timestamp, IsSatisfied}|History], + case hold(NHistory, HoldTime) of + true -> + lager:warning("[push_var] hold will trigger"); + false -> + lager:debug("[push_var] IsSatisfied: ~p, hold will not trigger", [IsSatisfied]) + end, + {noreply, State#state{history = NHistory}} + end; - - {noreply, State}; handle_cast(_Request, State = #state{}) -> {noreply, State}. @@ -140,21 +150,10 @@ eval_condition(Condition) when is_list(Condition) -> false end. -execute([], _) -> - execute; -execute([{Key, Val}|Tail], Condition) -> - case binary:match(Condition, Key) of - nomatch -> - execute(Tail, Condition); - _ -> - Expr0 = binary:replace(Condition, Key, Val, [global]), - Expr = iolist_to_binary(<>), - eval_condition(binary_to_list(Expr)) - end. - +%% 用来做持续性判断 +-spec hold(History :: list(), HoldTime :: integer()) -> boolean(). hold(History, HoldTime) when is_list(History), is_integer(HoldTime) -> PrefixItems = lists:takewhile(fun({_, Bool}) -> Bool end, History), - lager:debug("items: ~p", [PrefixItems]), %% 判断是否存在一个元素的时间 Current = modbus_util:current_seconds(), lists:any(fun({T0, _}) -> T0 =< Current - HoldTime end, PrefixItems). \ No newline at end of file diff --git a/apps/modbus/src/modbus_device.erl b/apps/modbus/src/modbus_device.erl index 7175183..f3eb04a 100644 --- a/apps/modbus/src/modbus_device.erl +++ b/apps/modbus/src/modbus_device.erl @@ -103,13 +103,19 @@ handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = Paren {noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}}; -handle_info({request_reply, Ref, Val}, State = #state{inflight = Inflight, metrics_map = MetricsMap}) -> +handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, device = #modbus_device{name = DeviceName}, inflight = Inflight, metrics_map = MetricsMap}) -> case maps:take(Ref, Inflight) of error -> {noreply, State}; - {Name, NInflight} -> - #modbus_metric{} = maps:get(Name, MetricsMap), - lager:debug("[modbus_device] metric: ~p, get value: ~p", [Name, Val]), + {MetricName, NInflight} -> + #modbus_metric{} = maps:get(MetricName, MetricsMap), + lager:debug("[modbus_device] metric_name: ~p, get value: ~p", [MetricName, Val]), + %% todo 还需要解决数据的上传问题 + + %% 解决值的广播问题 + Key = <<"$", DeviceName/binary, ".", MetricName/binary>>, + ParentPid ! {broadcast, Key, Val}, + {noreply, State#state{inflight = NInflight}} end; diff --git a/apps/modbus/src/modbus_service.erl b/apps/modbus/src/modbus_service.erl index 6c52c84..fd6d313 100644 --- a/apps/modbus/src/modbus_service.erl +++ b/apps/modbus/src/modbus_service.erl @@ -43,6 +43,11 @@ %% #{slaveId => DevicePid} devices_map = #{}, + %% 所有的处理过程 + processor_pids = [], + %% alarms + alarm_pids = [], + packet_id = 1, queue = queue:new(), @@ -77,7 +82,7 @@ start_link(AST = #ast{}) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = AccessLog}, devices = Devices}]) -> +init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = AccessLog}, devices = Devices, alarms = Alarms}]) -> lager:debug("[modbus_service] modbus is: ~p", [Modbus]), %% 建立连接 @@ -93,11 +98,14 @@ init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = Ac lager:debug("[modbus_service] devices pid: ~p", [DevicesMap]), {ok, Pid1} = modbus_processor:start_link(self(), hd(AST#ast.processors)), - {ok, Pid2} = modbus_alarm:start_link(self(), hd(AST#ast.alarms)), + AlarmPids = lists:map(fun(Alarm) -> + {ok, AlarmPid} = modbus_alarm:start_link(self(), Alarm), + AlarmPid + end, Alarms), {ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), - queue = queue:new(), packet_id = 1, devices_map = DevicesMap}}. + queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = AlarmPids}}. %% @private %% @doc This function is called by a gen_statem when it needs to find out @@ -155,6 +163,13 @@ handle_event(info, {Port, {data, <>}}, ?CON {keep_state, State#state{inflight = NInflight}}; +%% 广播消息 +handle_event(info, {broadcast, Key, Val}, ?CONNECTED, State = #state{alarm_pids = AlarmPids}) -> + %% 推送告警信息 + [modbus_alarm:push_var(AlarmPid, Key, Val) || AlarmPid <- AlarmPids], + + {keep_state, State}; + %% port退出 handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = rtu, port = Port}) -> lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Code]),