From f6cf5647800b9582145b55870842ecd4a6ae122a Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 2 Jul 2025 11:24:33 +0800 Subject: [PATCH] fix modbus_device --- apps/modbus/src/modbus_device.erl | 4 +- apps/modbus/src/modbus_service.erl | 98 ++++++++++++++++++------------ 2 files changed, 60 insertions(+), 42 deletions(-) diff --git a/apps/modbus/src/modbus_device.erl b/apps/modbus/src/modbus_device.erl index b67c171..a290ade 100644 --- a/apps/modbus/src/modbus_device.erl +++ b/apps/modbus/src/modbus_device.erl @@ -47,11 +47,9 @@ start_link(ParentPid, Device = #modbus_device{}) when is_pid(ParentPid) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([ParentPid, Device = #modbus_device{metrics = Metrics0, poll_interval = PollInterval}]) -> - - lager:warning("device metrics is: ~p", [Metrics0]), - %% 处理采集项目 MetricsMap = maps:from_list(lists:map(fun(Metric0 = #modbus_metric{name = Name}) -> {Name, Metric0} end, Metrics0)), + lager:debug("[modbus_device] device metrics is: ~p", [MetricsMap]), %% 初步启动的过程中按照step提交任务 Len = length(Metrics0), case Len > 0 of diff --git a/apps/modbus/src/modbus_service.erl b/apps/modbus/src/modbus_service.erl index 693ee6c..d0c4015 100644 --- a/apps/modbus/src/modbus_service.erl +++ b/apps/modbus/src/modbus_service.erl @@ -27,15 +27,21 @@ -define(DISCONNECTED, disconnected). -define(CONNECTED, connected). +-record(tcp_mode, { + socket :: gen_tcp:socket() +}). + +-record(rtu_mode, { + port :: erlang:port(), + delay_ms = 0 :: integer() +}). + -record(state, { ast :: #ast{}, %% 连接模式 - mode :: rtu | tcp, + mode :: #rtu_mode{} | #tcp_mode{} | undefined, - port :: erlang:port() | undefined, - %% 延迟的毫秒数 - delay_ms = 0, is_pending = true :: boolean(), socket :: gen_tcp:socket() | undefined, @@ -82,33 +88,33 @@ start_link(AST = #ast{}) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = AccessLog}, device_ios = IOs, devices = Devices, alarms = Alarms}]) -> +init([AST = #ast{modbus = Modbus = #modbus{transport = Transport, error_log = ErrorLog, access_log = AccessLog}, device_ios = IOs, devices = Devices, alarms = Alarms}]) -> %lager:debug("[modbus_service] ast is: ~p", [AST]), lager:debug("[modbus_service] io is: ~p", [IOs]), %lager:debug("[modbus_service] devices is: ~p", [Devices]), + DevicesMap = start_devices(Devices), %% 建立连接 - erlang:start_timer(0, self(), modbus_connect), + case connect(Transport) of + {ok, ConnectResult} -> + case ConnectResult of + {rtu, Port, DelayMs} -> - %% 启动设备相关的进程 - DevicesPids = lists:map(fun(Device = #modbus_device{slave_id = SlaveId}) -> - lager:debug("merge result is: ~p", [merge_io(Device, IOs)]), - {ok, DevicePid} = modbus_device:start_link(self(), merge_io(Device, IOs)), - {SlaveId, DevicePid} - end, Devices), - DevicesMap = maps:from_list(DevicesPids), - - lager:debug("[modbus_service] devices pid: ~p", [DevicesMap]), - - {ok, Pid1} = modbus_processor:start_link(self(), hd(AST#ast.processors)), - - AlarmPids = lists:map(fun(Alarm) -> - {ok, AlarmPid} = modbus_alarm:start_link(self(), Alarm), - AlarmPid - end, Alarms), - - {ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), - queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = AlarmPids}}. + {ok, ?CONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), + mode = #rtu_mode{port = Port, delay_ms = DelayMs}, is_pending = false, + queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}}; + Socket -> + {ok, ?CONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), + mode = #tcp_mode{socket = Socket}, is_pending = false, + queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}} + end; + {error, Reason} -> + lager:warning("[modbus_serivce] connect get error: ~p", [Reason]), + %% 5秒后重试 + retry_connect(), + {ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), + queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}} + end. %% @private %% @doc This function is called by a gen_statem when it needs to find out @@ -139,7 +145,9 @@ handle_event(info, {request, ReceiverPid, Ref, SlaveId, Address}, StateName, Sta end; %% rtu 模式下读取下一个任务 -handle_event(info, read_next, ?CONNECTED, State = #state{mode = rtu, queue = Q, port = Port, inflight = Inflight}) -> +handle_event(info, read_next, ?CONNECTED, State = #state{mode = #rtu_mode{port = Port}, queue = Q, inflight = Inflight}) -> + lager:debug("[modbus_service] read next, q: ~p, port is: ~p", [queue:to_list(Q), Port]), + case queue:out(Q) of {{value, {PacketId, ReceiverPid, Ref, SlaveId, Address}}, Q2} -> ReadCmd = <>, @@ -152,7 +160,7 @@ handle_event(info, read_next, ?DISCONNECTED, State = #state{mode = rtu}) -> {keep_state, State}; %% 从port读取数据, todo 获取到的数据表示为32位整数 -handle_event(info, {Port, {data, <>}}, ?CONNECTED, State = #state{mode = rtu, port = Port, delay_ms = DelayMs, inflight = Inflight}) -> +handle_event(info, {Port, {data, <>}}, ?CONNECTED, State = #state{mode = #rtu_mode{port = Port, delay_ms = DelayMs}, inflight = Inflight}) -> NInflight = case maps:take(PacketId, Inflight) of error -> Inflight; @@ -174,16 +182,16 @@ handle_event(info, {broadcast, Key, Val}, ?CONNECTED, State = #state{alarm_pids {keep_state, State}; %% port退出 -handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = rtu, port = Port}) -> +handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = #rtu_mode{port = Port}}) -> lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Code]), retry_connect(), - {next_state, ?DISCONNECTED, State#state{port = undefined, is_pending = true}}; + {next_state, ?DISCONNECTED, State#state{mode = undefined, is_pending = true}}; %% 处理port的退出消息 -handle_event(info, {'EXIT', Port, Reason}, ?CONNECTED, State = #state{mode = rtu, port = Port}) -> +handle_event(info, {'EXIT', Port, Reason}, ?CONNECTED, State = #state{mode = #rtu_mode{port = Port}}) -> lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Reason]), retry_connect(), - {next_state, ?DISCONNECTED, State#state{port = undefined, is_pending = true}}; + {next_state, ?DISCONNECTED, State#state{mode = undefined, is_pending = true}}; %% tcp退出 handle_event(info, {tcp_closed, Socket}, ?CONNECTED, State = #state{mode = tcp, socket = Socket}) -> @@ -202,17 +210,16 @@ handle_event(info, {timeout, _Ref, modbus_connect}, ?DISCONNECTED, State = #stat lager:debug("call me will connect, transport: ~p", [Transport]), case connect(Transport) of {ok, ConnectResult} -> + %% 根据条件判断是否读取下一条数据 Actions = case not queue:is_empty(Q) of true -> [{next_event, info, read_next}]; false -> [] end, case ConnectResult of {rtu, Port, DelayMs} -> - %% todo 触发读取下一条 - % erlang:start_timer(0, self(), read_next), - {next_state, ?CONNECTED, State#state{mode = rtu, port = Port, delay_ms = DelayMs, is_pending = false}, Actions}; + {next_state, ?CONNECTED, State#state{mode = #rtu_mode{port = Port, delay_ms = DelayMs}, is_pending = false}, Actions}; Socket -> - {next_state, ?CONNECTED, State#state{mode = tcp, socket = Socket}, Actions} + {next_state, ?CONNECTED, State#state{mode = #tcp_mode{socket = Socket}}, Actions} end; {error, Reason} -> lager:warning("[modbus_serivce] connect get error: ~p", [Reason]), @@ -238,12 +245,27 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +-spec start_devices(Devices :: list()) -> map(). +start_devices(Devices) when is_list(Devices) -> + %% 启动设备相关的进程 + DevicesPids = lists:map(fun(Device = #modbus_device{slave_id = SlaveId}) -> + {ok, DevicePid} = modbus_device:start_link(self(), Device), + {SlaveId, DevicePid} + end, Devices), + maps:from_list(DevicesPids). + +%% 启动告警 +-spec start_alarms(Alarms :: list()) -> [pid()]. +start_alarms(Alarms) when is_list(Alarms) -> + lists:map(fun(Alarm) -> + {ok, AlarmPid} = modbus_alarm:start_link(self(), Alarm), + AlarmPid + end, Alarms). + -spec connect(Transport :: #modbus_transport_rtu{} | #modbus_transport_tcp{}) -> {ok, {rtu, Port :: port(), DelayMs :: integer()}} | {ok, Socket :: gen_tcp:socket()} | {error, Reason :: any()}. %% databits为8位 connect(#modbus_transport_rtu{port = SerialPort, baudrate = BaudRate, stopbits = StopBits, parity = Parity0, timeout = Timeout}) -> - lager:warning("call me here connect, serial_port: ~p", [SerialPort]), - %% 奇偶校验 Parity = case Parity0 of <<"none">> -> 0; @@ -260,9 +282,7 @@ connect(#modbus_transport_rtu{port = SerialPort, baudrate = BaudRate, stopbits = "--timeout " ++ integer_to_list(Timeout) ], Port = erlang:open_port({spawn_executable, RealExecCmd}, [binary, {packet, 2}, exit_status, {args, Args}]), - lager:debug("args: ~p", [Args]), DelayMs = frame_delay(BaudRate, 8, Parity, StopBits), - lager:debug("port is: ~p", [Port]), %% 建立连接 {ok, {rtu, Port, DelayMs}}; connect(#modbus_transport_tcp{host = Host, port = Port, timeout = Timeout0}) ->