From 94d7e5bcd87a628c05a35bc94761f1ce39e96fa5 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 20 Jun 2025 23:27:23 +0800 Subject: [PATCH] fix --- apps/modbus/src/modbus_device.erl | 2 +- apps/modbus/src/modbus_service.erl | 54 ++++++++++++++++++------------ 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/apps/modbus/src/modbus_device.erl b/apps/modbus/src/modbus_device.erl index 97afc8c..a17c2ae 100644 --- a/apps/modbus/src/modbus_device.erl +++ b/apps/modbus/src/modbus_device.erl @@ -99,7 +99,7 @@ handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = Paren %% 读取采集项目 ReceiverPid = self(), Ref = make_ref(), - ParentPid ! {read, ReceiverPid, Ref, SlaveId, Address}, + ParentPid ! {request, ReceiverPid, Ref, SlaveId, Address}, {noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}}; diff --git a/apps/modbus/src/modbus_service.erl b/apps/modbus/src/modbus_service.erl index 55bfab9..1aa97a1 100644 --- a/apps/modbus/src/modbus_service.erl +++ b/apps/modbus/src/modbus_service.erl @@ -36,6 +36,7 @@ port :: erlang:port() | undefined, %% 延迟的毫秒数 delay_ms = 0, + is_pending = true :: boolean(), socket :: gen_tcp:socket() | undefined, @@ -44,7 +45,6 @@ packet_id = 1, queue = queue:new(), - is_pending = false :: boolean(), %% 未确认的消息 inflight = #{}, @@ -78,7 +78,7 @@ start_link(AST = #ast{}) -> %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = AccessLog}, devices = Devices}]) -> - lager:debug("modbus is: ~p", [Modbus]), + lager:debug("[modbus_service] modbus is: ~p", [Modbus]), %% 建立连接 erlang:start_timer(0, self(), modbus_connect), @@ -90,7 +90,7 @@ init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = Ac end, Devices), DevicesMap = maps:from_list(DevicesPids), - lager:debug("devices pid: ~p", [DevicesMap]), + lager:debug("[modbus_service] devices pid: ~p", [DevicesMap]), {ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), queue = queue:new(), packet_id = 1, devices_map = DevicesMap}}. @@ -106,34 +106,42 @@ callback_mode() -> %% gen_statem receives an event from call/2, cast/2, or as a normal %% process message, this function is called. -handle_event(info, {read, SlaveId, Address}, ?CONNECTED, State = #state{packet_id = PacketId, queue = Q, is_pending = IsPending}) -> - %% 基于队列处理, modbus不是全双工的模式 - NQ = queue:in({PacketId, SlaveId, Address}, Q), - Actions = case IsPending of - true -> [{next_event, info, read_next}]; - false -> [] - end, - {keep_state, State#state{queue = NQ, packet_id = PacketId + 1, is_pending = false}, Actions}; +%% 请求的时候必须先加入队列,modbus需要控制请求的频率 +handle_event(info, {request, ReceiverPid, Ref, SlaveId, Address}, StateName, State = #state{mode = rtu, is_pending = IsPending, packet_id = PacketId, queue = Q}) -> + case StateName of + ?CONNECTED -> + Actions = case IsPending of + true -> + [{next_event, info, read_next}]; + false -> + [] + end, + %% 基于队列处理, modbus不是全双工的模式 + NQ = queue:in({PacketId, ReceiverPid, Ref, SlaveId, Address}, Q), + {keep_state, State#state{queue = NQ, packet_id = PacketId + 1}, Actions}; + _ -> + {keep_state, State} + end; %% 读取下一个任务 -handle_event(info, read_next, ?CONNECTED, State = #state{queue = Q, port = Port}) -> +handle_event(info, read_next, ?CONNECTED, State = #state{mode = rtu, queue = Q, port = Port, inflight = Inflight}) -> case queue:out(Q) of - {{value, {PacketId, SlaveId, Address}}, Q2} -> + {{value, {PacketId, ReceiverPid, Ref, SlaveId, Address}}, Q2} -> ReadCmd = <>, Port ! {self(), {command, ReadCmd}}, - {keep_state, State#state{queue = Q2}}; + {keep_state, State#state{queue = Q2, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; {empty, Q1} -> - {keep_state, State#state{queue = Q1}} + {keep_state, State#state{queue = Q1, is_pending = true}} end; %% 从port读取数据, todo 获取到的数据表示为32位整数 -handle_event(info, {Port, {data, <>}}, ?CONNECTED, State = #state{port = Port, delay_ms = DelayMs, inflight = Inflight}) -> +handle_event(info, {Port, {data, <>}}, ?CONNECTED, State = #state{mode = rtu, port = Port, delay_ms = DelayMs, inflight = Inflight}) -> NInflight = case maps:take(PacketId, Inflight) of error -> Inflight; - {{ReceiverPid, Ref}, NMap} -> - erlang:is_process_alive(ReceiverPid) andalso ReceiverPid ! {request_reply, Ref, Val}, - NMap + {{ReceiverPid, Ref}, Inflight0} -> + erlang:is_process_alive(ReceiverPid) andalso erlang:send(ReceiverPid, {request_reply, Ref, Val}), + Inflight0 end, lager:debug("[modbus_service] port data is: ~p", [{PacketId, Val}]), %% 读取下一条 @@ -145,13 +153,13 @@ handle_event(info, {Port, {data, <>}}, ?CON handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = rtu, port = Port}) -> lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Code]), retry_connect(), - {next_state, ?DISCONNECTED, State#state{port = undefined}}; + {next_state, ?DISCONNECTED, State#state{port = undefined, is_pending = true}}; %% 处理port的退出消息 handle_event(info, {'EXIT', Port, Reason}, ?CONNECTED, State = #state{mode = rtu, port = Port}) -> lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Reason]), retry_connect(), - {next_state, ?DISCONNECTED, State#state{port = undefined}}; + {next_state, ?DISCONNECTED, State#state{port = undefined, is_pending = true}}; %% tcp退出 handle_event(info, {tcp_closed, Socket}, ?CONNECTED, State = #state{mode = tcp, socket = Socket}) -> @@ -175,7 +183,9 @@ handle_event(info, modbus_connect, ?DISCONNECTED, State = #state{ast = #ast{modb end, case ConnectResult of {rtu, Port, DelayMs} -> - {next_state, ?CONNECTED, State#state{mode = rtu, port = Port, delay_ms = DelayMs}, Actions}; + %% 触发读取下一条 + erlang:start_timer(0, self(), read_next), + {next_state, ?CONNECTED, State#state{mode = rtu, port = Port, delay_ms = DelayMs, is_pending = false}, Actions}; Socket -> {next_state, ?CONNECTED, State#state{mode = tcp, socket = Socket}, Actions} end;