diff --git a/apps/modbus/include/modbus_ast.hrl b/apps/modbus/include/modbus_ast.hrl index d885280..7bd27a7 100644 --- a/apps/modbus/include/modbus_ast.hrl +++ b/apps/modbus/include/modbus_ast.hrl @@ -62,9 +62,11 @@ retry_timeout :: integer(), %% 数据定义 - metrics = #{} :: map(), - - controls = #{} :: map() + metrics = [] :: list(), + %% 控制 + controls = [] :: list(), + % 告警信息 + alarms = [] :: list() }). -record(modbus_metric, { @@ -82,38 +84,21 @@ precision }). --record(modbus_processor, { - name :: binary(), - input :: binary(), - transform :: any(), - - output :: [] -}). - -record(modbus_alarm, { name :: binary(), condition :: binary(), - %% 持续判定 - hold_time :: binary(), - - %% 动作 - actions :: [], - - %% 恢复动作 - recovery_actions = [] + hold_time :: binary() }). -record(modbus_var, { address :: integer(), type :: atom(), scale = 1.0 :: float(), - unit :: binary() | undefined, - poll = true :: boolean() + unit :: binary() | undefined }). -record(ast, { modbus, - devices = [], - alarms = [] + devices = [] }). \ No newline at end of file diff --git a/apps/modbus/src/modbus_device.erl b/apps/modbus/src/modbus_device.erl index 12f8541..e029831 100644 --- a/apps/modbus/src/modbus_device.erl +++ b/apps/modbus/src/modbus_device.erl @@ -21,8 +21,6 @@ -record(state, { parent_pid :: pid(), device :: #modbus_device{}, - %% 重新建立 #{Name => Metric} - metrics_map = #{}, %% #{Ref => MetricName} inflight = #{} }). @@ -46,21 +44,19 @@ start_link(ParentPid, Device = #modbus_device{}) when is_pid(ParentPid) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([ParentPid, Device = #modbus_device{metrics = Metrics0, poll_interval = PollInterval}]) -> +init([ParentPid, Device = #modbus_device{metrics = Metrics, poll_interval = PollInterval}]) -> %% 处理采集项目 - MetricsMap = maps:from_list(lists:map(fun(Metric0 = #modbus_metric{name = Name}) -> {Name, Metric0} end, Metrics0)), - lager:debug("[modbus_device] device metrics is: ~p", [MetricsMap]), %% 初步启动的过程中按照step提交任务 - Len = length(Metrics0), + Len = length(Metrics), case Len > 0 of true -> Step = erlang:max(1, PollInterval div Len), lager:debug("[modbus_deivce] step is: ~p", [Step]), - start_ticker(maps:keys(MetricsMap), Step); + start_ticker(Metrics, Step); false -> ok end, - {ok, #state{parent_pid = ParentPid, metrics_map = MetricsMap, device = Device}}. + {ok, #state{parent_pid = ParentPid, device = Device}}. %% @private %% @doc Handling call messages @@ -81,8 +77,20 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. +handle_cast({control, ControlName, Val0}, State = #state{parent_pid = ParentPid, device = #modbus_device{slave_id = SlaveId, controls = Controls}}) -> + case lists:search(fun(#modbus_control{name = Name0}) -> Name0 =:= ControlName end, Controls) of + {value, #modbus_control{address = Address, type = Type, precision = Precision}} -> + %% 读取采集项目 + ReceiverPid = self(), + Ref = make_ref(), + Cnt = get_register_count(Type), + Val = Val0 * Precision, + ParentPid ! {control_device, ReceiverPid, Ref, SlaveId, Address, Val, Cnt}, + + {noreply, State}; + false -> + {noreply, State} + end. %% @private %% @doc Handling all non call/cast messages @@ -90,27 +98,28 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = ParentPid, inflight = Inflight, metrics_map = MetricsMap, - device = #modbus_device{slave_id = SlaveId, poll_interval = PollInterval}}) -> +handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = ParentPid, inflight = Inflight, + device = #modbus_device{slave_id = SlaveId, metrics = Metrics, poll_interval = PollInterval}}) -> %% 开启下一次的循环 poll_ticker(PollInterval, Name), + #modbus_metric{address = Address, type = Type} = get_metric(Name, Metrics), - #modbus_metric{address = Address, type = Type} = maps:get(Name, MetricsMap), %% 读取采集项目 ReceiverPid = self(), Ref = make_ref(), Cnt = get_register_count(Type), - ParentPid ! {request, ReceiverPid, Ref, SlaveId, Address, Cnt}, + ParentPid ! {request_device, ReceiverPid, Ref, SlaveId, Address, Cnt}, {noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}}; -handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, device = #modbus_device{name = DeviceName}, inflight = Inflight, metrics_map = MetricsMap}) -> +handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, device = #modbus_device{name = DeviceName, metrics = Metrics}, inflight = Inflight}) -> case maps:take(Ref, Inflight) of error -> {noreply, State}; {MetricName, NInflight} -> - #modbus_metric{name = MetricName, unit = Uint, type = Type, scale = Scale} = maps:get(MetricName, MetricsMap), + #modbus_metric{name = MetricName, unit = Uint, type = Type, scale = Scale} = get_metric(MetricName, Metrics), + lager:debug("[modbus_device] metric_name: ~p, get value: ~p", [MetricName, Val]), RealVal = parse_value(Val, Type, Scale), @@ -154,7 +163,7 @@ code_change(_OldVsn, State = #state{}, _Extra) -> -spec start_ticker(list(), Step :: integer()) -> no_return(). start_ticker([], _) -> ok; -start_ticker([Name|T], Step) -> +start_ticker([#modbus_metric{name = Name}|T], Step) -> poll_ticker(Step, Name), start_ticker(T, Step + Step). @@ -194,4 +203,9 @@ parse_value(<>, <<"uint32">>, Scale) -> parse_value(<>, <<"float32">>, Scale) -> Val * Scale; parse_value(<>, <<"float32">>, Scale) -> - Val * Scale. \ No newline at end of file + Val * Scale. + +-spec get_metric(Name :: binary(), Metrics :: [#modbus_metric{}]) -> Metric :: #modbus_metric{}. +get_metric(Name, Metrics) when is_list(Metrics) -> + {value, Metric} = lists:search(fun(#modbus_metric{name = Name0}) -> Name0 =:= Name end, Metrics), + Metric. \ No newline at end of file diff --git a/apps/modbus/src/modbus_service.erl b/apps/modbus/src/modbus_service.erl index b1964c2..181a4e9 100644 --- a/apps/modbus/src/modbus_service.erl +++ b/apps/modbus/src/modbus_service.erl @@ -13,7 +13,7 @@ -behaviour(gen_statem). %% API --export([start_link/1]). +-export([start_link/2]). -export([test/0, frame_delay/4]). %% gen_statem callbacks @@ -21,6 +21,7 @@ %% rtu指令 -define(MODBUS_READ, 16#01). +-define(MODBUS_CONTROL, 16#02). %% 当前的状态 -define(DISCONNECTED, disconnected). @@ -38,6 +39,9 @@ }). -record(state, { + %% 拥有者的pid + owner_pid :: pid(), + ast :: #ast{}, %% 连接模式 @@ -66,13 +70,21 @@ test() -> {ok, Config} = file:read_file("/usr/local/code/cloudkit/modbus/modbus.conf"), %lager:debug("config is: ~ts", [Config]), {ok, AST} = modbus_parser:parse(Config), - start_link(AST). + + Pid = spawn(fun() -> + receive + Info -> + lager:debug("[onwer] recv: ~p", [Info]) + end + end), + + start_link(Pid, AST). %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. -start_link(AST = #ast{}) -> - gen_statem:start_link(?MODULE, [AST], []). +start_link(OwnerPid, AST = #ast{}) when is_pid(OwnerPid) -> + gen_statem:start_link(?MODULE, [OwnerPid, AST], []). %%%=================================================================== %%% gen_statem callbacks @@ -82,7 +94,7 @@ start_link(AST = #ast{}) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([AST = #ast{modbus = Modbus = #modbus{transport = Transport, error_log = ErrorLog, access_log = AccessLog}, devices = Devices, alarms = Alarms}]) -> +init([OwnerPid, AST = #ast{modbus = Modbus = #modbus{transport = Transport, error_log = ErrorLog, access_log = AccessLog}, devices = Devices, alarms = Alarms}]) -> %lager:debug("[modbus_service] ast is: ~p", [AST]), lager:debug("[modbus_service] devices is: ~p", [Devices]), @@ -93,11 +105,11 @@ init([AST = #ast{modbus = Modbus = #modbus{transport = Transport, error_log = Er case ConnectResult of {rtu, Port, DelayMs} -> - {ok, ?CONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), + {ok, ?CONNECTED, #state{owner_pid = OwnerPid, ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), mode = #rtu_mode{port = Port, delay_ms = DelayMs}, queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}}; {tcp, Socket} -> - {ok, ?CONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), + {ok, ?CONNECTED, #state{owner_pid = OwnerPid, ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), mode = #tcp_mode{socket = Socket}, queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}} end; @@ -105,7 +117,7 @@ init([AST = #ast{modbus = Modbus = #modbus{transport = Transport, error_log = Er lager:warning("[modbus_serivce] connect get error: ~p", [Reason]), %% 5秒后重试 retry_connect(), - {ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), + {ok, ?DISCONNECTED, #state{owner_pid = OwnerPid, ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}} end. @@ -121,11 +133,22 @@ callback_mode() -> %% process message, this function is called. %% 请求的时候必须先加入队列,modbus需要控制请求的频率 -handle_event(info, {request, ReceiverPid, Ref, SlaveId, Address, Cnt}, StateName, State = #state{mode = #rtu_mode{}, packet_id = PacketId, queue = Q}) -> +handle_event(info, {request_device, ReceiverPid, Ref, SlaveId, Address, Cnt}, StateName, State = #state{mode = #rtu_mode{}, packet_id = PacketId, queue = Q}) -> case StateName of ?CONNECTED -> %% 基于队列处理, modbus不是全双工的模式 - NQ = queue:in({PacketId, ReceiverPid, Ref, SlaveId, Address, Cnt}, Q), + NQ = queue:in({request, PacketId, ReceiverPid, Ref, SlaveId, Address, Cnt}, Q), + {keep_state, State#state{queue = NQ, packet_id = PacketId + 1}, [{next_event, info, read_next}]}; + _ -> + {keep_state, State} + end; + +%% 请求的时候必须先加入队列,modbus需要控制请求的频率 +handle_event(info, {control_device, ReceiverPid, Ref, SlaveId, Address, Val, Cnt}, StateName, State = #state{mode = #rtu_mode{}, packet_id = PacketId, queue = Q}) -> + case StateName of + ?CONNECTED -> + %% 基于队列处理, modbus不是全双工的模式 + NQ = queue:in({control, PacketId, ReceiverPid, Ref, SlaveId, Address, Val, Cnt}, Q), {keep_state, State#state{queue = NQ, packet_id = PacketId + 1}, [{next_event, info, read_next}]}; _ -> {keep_state, State} @@ -136,12 +159,18 @@ handle_event(info, read_next, ?CONNECTED, State = #state{mode = #rtu_mode{is_bus {keep_state, State}; handle_event(info, read_next, ?CONNECTED, State = #state{mode = Mode = #rtu_mode{port = Port}, queue = Q, inflight = Inflight}) -> lager:debug("[modbus_service] read next, q: ~p, port is: ~p", [queue:to_list(Q), Port]), - case queue:out(Q) of - {{value, {PacketId, ReceiverPid, Ref, SlaveId, Address, Cnt}}, Q2} -> - ReadCmd = <>, - Port ! {self(), {command, ReadCmd}}, - {keep_state, State#state{queue = Q2, mode = Mode#rtu_mode{is_busy = true}, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + {{value, Item}, Q2} -> + case Item of + {request, PacketId, ReceiverPid, Ref, SlaveId, Address, Cnt} -> + ReadCmd = <>, + Port ! {self(), {command, ReadCmd}}, + {keep_state, State#state{queue = Q2, mode = Mode#rtu_mode{is_busy = true}, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + {control, PacketId, ReceiverPid, Ref, SlaveId, Address, Val, Cnt} -> + ReadCmd = <>, + Port ! {self(), {command, ReadCmd}}, + {keep_state, State#state{queue = Q2, mode = Mode#rtu_mode{is_busy = true}, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}} + end; {empty, Q1} -> {keep_state, State#state{queue = Q1}} end; @@ -172,9 +201,15 @@ handle_event(info, {timeout, _, delay_locking}, ?CONNECTED, State = #state{mode lager:debug("[modbus_service] delay unlock trigger next"), {keep_state, State#state{mode = Mode#rtu_mode{is_busy = false}}, [{next_event, info, read_next}]}; -%% todo 数据上报 -handle_event(info, {device_report, DeviceName, Name, Value, Uint}, ?CONNECTED, State) -> - lager:debug("[modbus_service] get report_data device_name: ~p, name: ~p, value: ~p, uint: ~p", [DeviceName, Name, Value, Uint]), +%% 数据上报 +handle_event(info, {device_report, DeviceName, MetricName, Value, Uint}, ?CONNECTED, State = #state{owner_pid = OwnerPid}) -> + lager:debug("[modbus_service] get report_data device_name: ~p, name: ~p, value: ~p, uint: ~p", [DeviceName, MetricName, Value, Uint]), + OwnerPid ! {metric_data, DeviceName, MetricName, Value, Uint}, + {keep_state, State}; + +handle_event(info, {device_alarm, AlarmName, HoldTime, Val}, ?CONNECTED, State = #state{owner_pid = OwnerPid}) -> + lager:debug("[modbus_service] get alarm device_name: ~p, name: ~p, value: ~p, uint: ~p", [DeviceName, MetricName, Value, Uint]), + % OwnerPid ! {metric_data, DeviceName, MetricName, Value, Uint}, {keep_state, State}; %% 广播消息 diff --git a/modbus.conf b/modbus.conf index 7dbf2c0..5ac076c 100644 --- a/modbus.conf +++ b/modbus.conf @@ -66,6 +66,17 @@ device_io t1 { } } + ## 告警信息 + alarms { + high_temperature { + # 触发条件 + condition $temperature > 90.0; + + # 持续判定 + hold_time 30s; + } + } + } device boiler_controller { @@ -126,18 +137,6 @@ device xyz { scale 0.01; unit "kPa"; } - - # 状态位(线圈) - #alarm_status { - # address 00001; - # type bool; - # bits { - # 0 "overheat"; - # 1 "low_pressure"; - # 2 "pump_failure"; - # } - # poll on; - #} } # 写入控制 @@ -158,12 +157,4 @@ device xyz { precision 0.1; } } -} - -alarm high_temperature { - # 触发条件 - condition $boiler_controller.temperature > 90.0; - - # 持续判定 - hold_time 30s; } \ No newline at end of file