From 4fec7aec1ae05914058c49381a31b8b644ea6aba Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 25 Jun 2025 00:35:42 +0800 Subject: [PATCH] fix alarm --- apps/modbus/src/modbus_alarm.erl | 160 +++++++++++++++++++++++++++ apps/modbus/src/modbus_device.erl | 20 ++-- apps/modbus/src/modbus_parser.erl | 24 ++-- apps/modbus/src/modbus_processor.erl | 113 +++++++++++++++++++ apps/modbus/src/modbus_service.erl | 12 +- apps/modbus/src/modbus_util.erl | 8 +- 6 files changed, 311 insertions(+), 26 deletions(-) create mode 100644 apps/modbus/src/modbus_alarm.erl create mode 100644 apps/modbus/src/modbus_processor.erl diff --git a/apps/modbus/src/modbus_alarm.erl b/apps/modbus/src/modbus_alarm.erl new file mode 100644 index 0000000..4d304c2 --- /dev/null +++ b/apps/modbus/src/modbus_alarm.erl @@ -0,0 +1,160 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 24. 6月 2025 22:43 +%%%------------------------------------------------------------------- +-module(modbus_alarm). +-author("anlicheng"). +-include("modbus_ast.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/2]). +-export([eval_condition/1]). +-export([hold/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + parent_pid :: pid(), + alarm :: #modbus_alarm{}, + %% 存储历史值,用来做持续性判断, {timestamp, bool} + history = [] +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +push_vars(Pid, Vars) when is_pid(Pid), is_list(Vars) -> + gen_server:cast(Pid, {push_vars, Vars}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(ParentPid :: pid(), Alarm :: #modbus_alarm{}) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(ParentPid, Alarm) -> + gen_server:start_link(?MODULE, [ParentPid, Alarm], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([ParentPid, Alarm = #modbus_alarm{condition = Condition}]) -> + lager:debug("alarm is: ~p", [Alarm]), + case binary:split(Condition, <<".">>) of + [<<$$, DeviceName/binary>>, Metric] -> + lager:debug("device_name is: ~p, metric is: ~p", [DeviceName, Metric]); + _ -> + lager:debug("condition input: ~p", [Condition]) + end, + + {ok, #state{parent_pid = ParentPid, alarm = Alarm}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({push_vars, Vars}, State = #state{alarm = #modbus_alarm{condition = Condition, hold_time = HoldTime}, history = History}) -> + case execute(Vars, Condition) of + true -> + lager:debug("[push_vars] execute result is: true"); + false -> + lager:debug("[push_vars] execute result is: false") + end, + + + + {noreply, State}; +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec eval_condition(Condition :: binary() | string()) -> boolean(). +eval_condition(Condition) when is_binary(Condition) -> + eval_condition(binary_to_list(Condition)); +eval_condition(Condition) when is_list(Condition) -> + {ok, Tokens, _} = erl_scan:string(Condition), + {ok, ExprList} = erl_parse:parse_exprs(Tokens), + {value, Bool, _} = erl_eval:exprs(ExprList, []), + case is_boolean(Bool) of + true -> + Bool; + false -> + false + end. + +execute([], _) -> + execute; +execute([{Key, Val}|Tail], Condition) -> + case binary:match(Condition, Key) of + nomatch -> + execute(Tail, Condition); + _ -> + Expr0 = binary:replace(Condition, Key, Val, [global]), + Expr = iolist_to_binary(<>), + eval_condition(binary_to_list(Expr)) + end. + +hold(History, HoldTime) when is_list(History), is_integer(HoldTime) -> + PrefixItems = lists:takewhile(fun({_, Bool}) -> Bool end, History), + lager:debug("items: ~p", [PrefixItems]), + %% 判断是否存在一个元素的时间 + Current = modbus_util:current_seconds(), + lists:any(fun({T0, _}) -> T0 =< Current - HoldTime end, PrefixItems). \ No newline at end of file diff --git a/apps/modbus/src/modbus_device.erl b/apps/modbus/src/modbus_device.erl index a17c2ae..7175183 100644 --- a/apps/modbus/src/modbus_device.erl +++ b/apps/modbus/src/modbus_device.erl @@ -22,7 +22,7 @@ parent_pid :: pid(), device :: #modbus_device{}, %% 重新建立 #{Name => Metric} - metrics = #{}, + metrics_map = #{}, %% #{Ref => MetricName} inflight = #{} }). @@ -48,18 +48,18 @@ start_link(ParentPid, Device = #modbus_device{}) when is_pid(ParentPid) -> {stop, Reason :: term()} | ignore). init([ParentPid, Device = #modbus_device{metrics = Metrics0, poll_interval = PollInterval}]) -> %% 处理采集项目 - Metrics = maps:from_list(lists:map(fun(Metric0 = #modbus_metric{name = Name}) -> {Name, Metric0} end, Metrics0)), - + MetricsMap = maps:from_list(lists:map(fun(Metric0 = #modbus_metric{name = Name}) -> {Name, Metric0} end, Metrics0)), %% 初步启动的过程中按照step提交任务 Len = length(Metrics0), case Len > 0 of true -> Step = erlang:max(1, PollInterval div Len), - start_ticker(maps:keys(Metrics), Step); + lager:debug("[modbus_deivce] step is: ~p", [Step]), + start_ticker(maps:keys(MetricsMap), Step); false -> ok end, - {ok, #state{parent_pid = ParentPid, metrics = Metrics, device = Device}}. + {ok, #state{parent_pid = ParentPid, metrics_map = MetricsMap, device = Device}}. %% @private %% @doc Handling call messages @@ -89,13 +89,13 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = ParentPid, inflight = Inflight, - device = #modbus_device{slave_id = SlaveId, poll_interval = PollInterval, metrics = Metrics}}) -> +handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = ParentPid, inflight = Inflight, metrics_map = MetricsMap, + device = #modbus_device{slave_id = SlaveId, poll_interval = PollInterval}}) -> %% 开启下一次的循环 poll_ticker(PollInterval, Name), - #modbus_metric{address = Address} = maps:get(Name, Metrics), + #modbus_metric{address = Address} = maps:get(Name, MetricsMap), %% 读取采集项目 ReceiverPid = self(), Ref = make_ref(), @@ -103,12 +103,12 @@ handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = Paren {noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}}; -handle_info({request_reply, Ref, Val}, State = #state{inflight = Inflight, metrics = Metrics}) -> +handle_info({request_reply, Ref, Val}, State = #state{inflight = Inflight, metrics_map = MetricsMap}) -> case maps:take(Ref, Inflight) of error -> {noreply, State}; {Name, NInflight} -> - #modbus_metric{} = maps:get(Name, Metrics), + #modbus_metric{} = maps:get(Name, MetricsMap), lager:debug("[modbus_device] metric: ~p, get value: ~p", [Name, Val]), {noreply, State#state{inflight = NInflight}} end; diff --git a/apps/modbus/src/modbus_parser.erl b/apps/modbus/src/modbus_parser.erl index 49cf9c2..54a41f7 100644 --- a/apps/modbus/src/modbus_parser.erl +++ b/apps/modbus/src/modbus_parser.erl @@ -155,8 +155,8 @@ parse_ast0(#block{ident = <<"modbus">>, props = Props}) -> MapProps = parse_ast1(Props), #modbus { transport = maps:get(<<"transport">>, MapProps, undefined), - access_log = map_of_string(<<"access_log">>, MapProps, undefined), - error_log = map_of_string(<<"error_log">>, MapProps, undefined) + access_log = map_of_binary(<<"access_log">>, MapProps, undefined), + error_log = map_of_binary(<<"error_log">>, MapProps, undefined) }; parse_ast0(#block{ident = <<"device", Name0/binary>>, props = Props}) -> MapProps = parse_ast1(Props), @@ -165,7 +165,7 @@ parse_ast0(#block{ident = <<"device", Name0/binary>>, props = Props}) -> slave_id = map_of_integer(<<"slave_id">>, MapProps, 0), model = maps:get(<<"model">>, MapProps, undefined), description = maps:get(<<"description">>, MapProps, undefined), - poll_interval = maps:get(<<"poll_interval">>, MapProps, undefined), + poll_interval = map_of_time(<<"poll_interval">>, MapProps, 0), retries = map_of_integer(<<"retries">>, MapProps, 0), retry_timeout = maps:get(<<"retry_timeout">>, MapProps, undefined), metrics = maps:get(<<"metrics">>, MapProps, undefined), @@ -175,15 +175,15 @@ parse_ast0(#block{ident = <<"processor", Name0/binary>>, props = Props}) -> MapProps = parse_ast1(Props), #modbus_processor { name = string:trim(Name0), - input = maps:get(<<"input">>, MapProps, undefined), + input = map_of_binary(<<"input">>, MapProps, <<"">>), transform = maps:get(<<"transform">>, MapProps, undefined) }; parse_ast0(#block{ident = <<"alarm", Name0/binary>>, props = Props}) -> MapProps = parse_ast1(Props, []), #modbus_alarm { name = string:trim(Name0), - condition = maps:get(<<"condition">>, MapProps, undefined), - hold_time = maps:get(<<"hold_time">>, MapProps, undefined), + condition = map_of_binary(<<"condition">>, MapProps, <<>>), + hold_time = map_of_time(<<"hold_time">>, MapProps, 0), actions = maps:get(<<"actions">>, MapProps, undefined), recovery_actions = maps:get(<<"recovery_actions">>, MapProps, undefined) }. @@ -211,8 +211,8 @@ parse_ast1([#block{ident = <<"metrics">>, props = Metrics0}|T], Acc) -> #modbus_metric{ name = MetricName, address = map_of_integer(<<"address">>, PropsMap, 0), - type = map_of_string(<<"type">>, PropsMap, ""), - unit = strip_quotes(map_of_string(<<"unit">>, PropsMap, "")) + type = map_of_binary(<<"type">>, PropsMap, <<"">>), + unit = strip_quotes(map_of_binary(<<"unit">>, PropsMap, <<"">>)) } end, Metrics0), parse_ast1(T, [{<<"metrics">>, Metrics}|Acc]); @@ -232,7 +232,7 @@ parse_ast1([#block{ident = <<"transport", Name0/binary>>, props = Props}|T], Acc }; <<"tcp">> -> #modbus_transport_tcp { - host = map_of_string(<<"host">>, PropsMap, undefined), + host = map_of_binary(<<"host">>, PropsMap, undefined), port = map_of_integer(<<"port">>, PropsMap, undefined), timeout = map_of_time(<<"timeout">>, PropsMap, undefined) } @@ -279,11 +279,11 @@ map_of_integer(Key, M, Def) -> Def end. -map_of_string(Key, M, Def) -> +map_of_binary(Key, M, Def) -> case maps:is_key(Key, M) of true -> [Val0] = maps:get(Key, M), - binary_to_list(Val0); + iolist_to_binary(Val0); false -> Def end. @@ -308,6 +308,8 @@ map_of_time(Key, M, Def) -> Def end. +strip_quotes(Str) when is_binary(Str) -> + list_to_binary(strip_quotes(binary_to_list(Str))); strip_quotes(Str) when is_list(Str) -> Str1 = string:trim(Str, leading, "\""), Str2 = string:trim(Str1, trailing, "\""), diff --git a/apps/modbus/src/modbus_processor.erl b/apps/modbus/src/modbus_processor.erl new file mode 100644 index 0000000..603b612 --- /dev/null +++ b/apps/modbus/src/modbus_processor.erl @@ -0,0 +1,113 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 24. 6月 2025 10:49 +%%%------------------------------------------------------------------- +-module(modbus_processor). +-author("anlicheng"). +-include("modbus_ast.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + parent_pid :: pid(), + processor :: #modbus_processor{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(ParentPid :: pid(), Processor :: #modbus_processor{}) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(ParentPid, Processor = #modbus_processor{}) when is_pid(ParentPid) -> + gen_server:start_link(?MODULE, [ParentPid, Processor], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([ParentPid, Processor = #modbus_processor{input = Input}]) -> + %% 处理input + lager:debug("processor is: ~p", [Processor]), + + case binary:split(Input, <<".">>) of + [<<$$, DeviceName/binary>>, Metric] -> + lager:debug("device_name is: ~p, metric is: ~p", [DeviceName, Metric]); + _ -> + lager:debug("invalid input: ~p", [Input]) + end, + + + + {ok, #state{parent_pid = ParentPid, processor = Processor}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/modbus/src/modbus_service.erl b/apps/modbus/src/modbus_service.erl index 1aa97a1..6c52c84 100644 --- a/apps/modbus/src/modbus_service.erl +++ b/apps/modbus/src/modbus_service.erl @@ -92,6 +92,10 @@ init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = Ac lager:debug("[modbus_service] devices pid: ~p", [DevicesMap]), + {ok, Pid1} = modbus_processor:start_link(self(), hd(AST#ast.processors)), + {ok, Pid2} = modbus_alarm:start_link(self(), hd(AST#ast.alarms)), + + {ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), queue = queue:new(), packet_id = 1, devices_map = DevicesMap}}. @@ -123,7 +127,7 @@ handle_event(info, {request, ReceiverPid, Ref, SlaveId, Address}, StateName, Sta {keep_state, State} end; -%% 读取下一个任务 +%% rtu 模式下读取下一个任务 handle_event(info, read_next, ?CONNECTED, State = #state{mode = rtu, queue = Q, port = Port, inflight = Inflight}) -> case queue:out(Q) of {{value, {PacketId, ReceiverPid, Ref, SlaveId, Address}}, Q2} -> @@ -133,6 +137,8 @@ handle_event(info, read_next, ?CONNECTED, State = #state{mode = rtu, queue = Q, {empty, Q1} -> {keep_state, State#state{queue = Q1, is_pending = true}} end; +handle_event(info, read_next, ?DISCONNECTED, State = #state{mode = rtu}) -> + {keep_state, State}; %% 从port读取数据, todo 获取到的数据表示为32位整数 handle_event(info, {Port, {data, <>}}, ?CONNECTED, State = #state{mode = rtu, port = Port, delay_ms = DelayMs, inflight = Inflight}) -> @@ -255,8 +261,8 @@ retry_connect() -> -spec create_log_file(undefined | string()) -> undefined | pid(). create_log_file(undefined) -> undefined; -create_log_file(FileName) when is_list(FileName) -> - case modbus_logger:start_link(FileName) of +create_log_file(FileName) when is_binary(FileName) -> + case modbus_logger:start_link(binary_to_list(FileName)) of {ok, FilePid} -> FilePid; _ -> diff --git a/apps/modbus/src/modbus_util.erl b/apps/modbus/src/modbus_util.erl index 9a5a30f..a5dd7c7 100644 --- a/apps/modbus/src/modbus_util.erl +++ b/apps/modbus/src/modbus_util.erl @@ -10,7 +10,7 @@ -author("anlicheng"). %% API --export([split/1, read_until/2]). +-export([split/1, read_until/2, current_seconds/0]). %% 按照空格分割字符串, 需要将引号里面的字符看成一个整体 -spec split(Bin :: binary()) -> [binary()]. @@ -38,4 +38,8 @@ read_until(Bin, Delim) when is_binary(Bin), is_binary(Delim) -> {binary:part(Bin, 0, Pos), binary:part(Bin, Pos + 1, byte_size(Bin) - Pos - 1)}; nomatch -> {Bin, <<>>} - end. \ No newline at end of file + end. + +current_seconds() -> + {Mega, Seconds, _Micro} = os:timestamp(), + Mega * 1000000 + Seconds. \ No newline at end of file