From 67a83435176bf0646bc8ed02c8eaff99367e90ca Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 20 Jun 2025 23:06:18 +0800 Subject: [PATCH] fix --- apps/modbus/src/modbus_device.erl | 66 ++++++++++++++++++++++-------- apps/modbus/src/modbus_service.erl | 19 +++++---- 2 files changed, 60 insertions(+), 25 deletions(-) diff --git a/apps/modbus/src/modbus_device.erl b/apps/modbus/src/modbus_device.erl index a6789b8..97afc8c 100644 --- a/apps/modbus/src/modbus_device.erl +++ b/apps/modbus/src/modbus_device.erl @@ -20,7 +20,11 @@ -record(state, { parent_pid :: pid(), - device :: #modbus_device{} + device :: #modbus_device{}, + %% 重新建立 #{Name => Metric} + metrics = #{}, + %% #{Ref => MetricName} + inflight = #{} }). %%%=================================================================== @@ -42,17 +46,20 @@ start_link(ParentPid, Device = #modbus_device{}) when is_pid(ParentPid) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([ParentPid, Device = #modbus_device{metrics = Metrics, poll_interval = PollInterval}]) -> +init([ParentPid, Device = #modbus_device{metrics = Metrics0, poll_interval = PollInterval}]) -> %% 处理采集项目 + Metrics = maps:from_list(lists:map(fun(Metric0 = #modbus_metric{name = Name}) -> {Name, Metric0} end, Metrics0)), - #modbus_metric{ - name = Name, - address = Address, - type = Type, - unit = Uint - } = hd(Metrics), - - {ok, #state{parent_pid = ParentPid, device = Device}}. + %% 初步启动的过程中按照step提交任务 + Len = length(Metrics0), + case Len > 0 of + true -> + Step = erlang:max(1, PollInterval div Len), + start_ticker(maps:keys(Metrics), Step); + false -> + ok + end, + {ok, #state{parent_pid = ParentPid, metrics = Metrics, device = Device}}. %% @private %% @doc Handling call messages @@ -82,16 +89,30 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, poll_ticker}, State = #state{parent_pid = ParentPid, +handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = ParentPid, inflight = Inflight, device = #modbus_device{slave_id = SlaveId, poll_interval = PollInterval, metrics = Metrics}}) -> - poll_ticker(PollInterval), + %% 开启下一次的循环 + poll_ticker(PollInterval, Name), + + #modbus_metric{address = Address} = maps:get(Name, Metrics), %% 读取采集项目 - lists:foreach(fun(#modbus_metric{address = Address}) -> - ParentPid ! {read, SlaveId, Address} - end, Metrics), + ReceiverPid = self(), + Ref = make_ref(), + ParentPid ! {read, ReceiverPid, Ref, SlaveId, Address}, + + {noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}}; + +handle_info({request_reply, Ref, Val}, State = #state{inflight = Inflight, metrics = Metrics}) -> + case maps:take(Ref, Inflight) of + error -> + {noreply, State}; + {Name, NInflight} -> + #modbus_metric{} = maps:get(Name, Metrics), + lager:debug("[modbus_device] metric: ~p, get value: ~p", [Name, Val]), + {noreply, State#state{inflight = NInflight}} + end; - {noreply, State}; handle_info(_Info, State = #state{}) -> {noreply, State}. @@ -117,5 +138,14 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -poll_ticker(PollInterval) when is_integer(PollInterval) -> - erlang:start_timer(PollInterval, self(), poll_ticker). \ No newline at end of file +%% 启动的时候按照step分散一下轮询的压力 +-spec start_ticker(list(), Step :: integer()) -> no_return(). +start_ticker([], _) -> + ok; +start_ticker([Name|T], Step) -> + poll_ticker(Step, Name), + start_ticker(T, Step + Step). + +-spec poll_ticker(PollInterval :: integer(), Name :: binary()) -> no_return(). +poll_ticker(PollInterval, Name) when is_integer(PollInterval), is_binary(Name) -> + erlang:start_timer(PollInterval, self(), {poll_ticker, Name}). \ No newline at end of file diff --git a/apps/modbus/src/modbus_service.erl b/apps/modbus/src/modbus_service.erl index aee81f1..55bfab9 100644 --- a/apps/modbus/src/modbus_service.erl +++ b/apps/modbus/src/modbus_service.erl @@ -90,7 +90,7 @@ init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = Ac end, Devices), DevicesMap = maps:from_list(DevicesPids), - lager:debug("device pid: ~p", [DevicesMap]), + lager:debug("devices pid: ~p", [DevicesMap]), {ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog), queue = queue:new(), packet_id = 1, devices_map = DevicesMap}}. @@ -135,7 +135,7 @@ handle_event(info, {Port, {data, <>}}, ?CON erlang:is_process_alive(ReceiverPid) andalso ReceiverPid ! {request_reply, Ref, Val}, NMap end, - lager:debug("port data is: ~p", [{PacketId, Val}]), + lager:debug("[modbus_service] port data is: ~p", [{PacketId, Val}]), %% 读取下一条 erlang:start_timer(DelayMs, self(), read_next), @@ -143,25 +143,25 @@ handle_event(info, {Port, {data, <>}}, ?CON %% port退出 handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = rtu, port = Port}) -> - lager:debug("[efka_service] port: ~p, exit with code: ~p", [Port, Code]), + lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Code]), retry_connect(), {next_state, ?DISCONNECTED, State#state{port = undefined}}; %% 处理port的退出消息 handle_event(info, {'EXIT', Port, Reason}, ?CONNECTED, State = #state{mode = rtu, port = Port}) -> - lager:debug("[efka_service] port: ~p, exit with code: ~p", [Port, Reason]), + lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Reason]), retry_connect(), {next_state, ?DISCONNECTED, State#state{port = undefined}}; %% tcp退出 handle_event(info, {tcp_closed, Socket}, ?CONNECTED, State = #state{mode = tcp, socket = Socket}) -> - lager:debug("[efka_service] socket: ~p, exit with tcp_closed", [Socket]), + lager:debug("[modbus_service] socket: ~p, exit with tcp_closed", [Socket]), retry_connect(), {next_state, ?DISCONNECTED, State#state{socket = undefined}}; %% 处理tcp的退出消息 handle_event(info, {tcp_error, Socket, Reason}, ?CONNECTED, State = #state{mode = tcp, socket = Socket}) -> - lager:debug("[efka_service] socket: ~p, exit with code: ~p", [Socket, Reason]), + lager:debug("[modbus_service] socket: ~p, exit with code: ~p", [Socket, Reason]), retry_connect(), {next_state, ?DISCONNECTED, State#state{socket = undefined}}; @@ -207,6 +207,8 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +-spec connect(Transport :: #modbus_transport_rtu{} | #modbus_transport_tcp{}) -> + {ok, {rtu, Port :: port(), DelayMs :: integer()}} | {ok, Socket :: gen_tcp:socket()} | {error, Reason :: any()}. %% databits为8位 connect(#modbus_transport_rtu{port = Port, baudrate = BaudRate, stopbits = StopBits, parity = Parity, timeout = Timeout}) -> RealExecCmd = "", @@ -229,18 +231,21 @@ connect(#modbus_transport_tcp{host = Host, port = Port, timeout = Timeout0}) -> gen_tcp:connect(Host, Port, [binary], Timeout). %% 计算3.5个字符的静默时间 +-spec frame_delay(BaudRate :: integer(), DataBits :: integer(), Parity :: integer(), StopBits :: integer()) -> integer(). frame_delay(BaudRate, DataBits, Parity, StopBits) when is_integer(BaudRate), is_integer(DataBits), is_integer(Parity), is_integer(StopBits) -> %% 计算1个字符的总位数 CharBits = 1 + DataBits + case Parity of 0 -> 0; _ -> 1 end + StopBits, %% 3.5字符时间(毫秒), 预留20%的时间 erlang:ceil((3500 * CharBits) / BaudRate * 1.2). +-spec retry_connect() -> no_return(). retry_connect() -> erlang:start_timer(5000, self(), modbus_connect). +-spec create_log_file(undefined | string()) -> undefined | pid(). create_log_file(undefined) -> undefined; -create_log_file(FileName) -> +create_log_file(FileName) when is_list(FileName) -> case modbus_logger:start_link(FileName) of {ok, FilePid} -> FilePid;