This commit is contained in:
anlicheng 2025-06-20 23:06:18 +08:00
parent 18723bdb0d
commit 67a8343517
2 changed files with 60 additions and 25 deletions

View File

@ -20,7 +20,11 @@
-record(state, {
parent_pid :: pid(),
device :: #modbus_device{}
device :: #modbus_device{},
%% #{Name => Metric}
metrics = #{},
%% #{Ref => MetricName}
inflight = #{}
}).
%%%===================================================================
@ -42,17 +46,20 @@ start_link(ParentPid, Device = #modbus_device{}) when is_pid(ParentPid) ->
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ParentPid, Device = #modbus_device{metrics = Metrics, poll_interval = PollInterval}]) ->
init([ParentPid, Device = #modbus_device{metrics = Metrics0, poll_interval = PollInterval}]) ->
%%
Metrics = maps:from_list(lists:map(fun(Metric0 = #modbus_metric{name = Name}) -> {Name, Metric0} end, Metrics0)),
#modbus_metric{
name = Name,
address = Address,
type = Type,
unit = Uint
} = hd(Metrics),
{ok, #state{parent_pid = ParentPid, device = Device}}.
%% step提交任务
Len = length(Metrics0),
case Len > 0 of
true ->
Step = erlang:max(1, PollInterval div Len),
start_ticker(maps:keys(Metrics), Step);
false ->
ok
end,
{ok, #state{parent_pid = ParentPid, metrics = Metrics, device = Device}}.
%% @private
%% @doc Handling call messages
@ -82,16 +89,30 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, poll_ticker}, State = #state{parent_pid = ParentPid,
handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = ParentPid, inflight = Inflight,
device = #modbus_device{slave_id = SlaveId, poll_interval = PollInterval, metrics = Metrics}}) ->
poll_ticker(PollInterval),
%%
poll_ticker(PollInterval, Name),
#modbus_metric{address = Address} = maps:get(Name, Metrics),
%%
lists:foreach(fun(#modbus_metric{address = Address}) ->
ParentPid ! {read, SlaveId, Address}
end, Metrics),
ReceiverPid = self(),
Ref = make_ref(),
ParentPid ! {read, ReceiverPid, Ref, SlaveId, Address},
{noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}};
handle_info({request_reply, Ref, Val}, State = #state{inflight = Inflight, metrics = Metrics}) ->
case maps:take(Ref, Inflight) of
error ->
{noreply, State};
{Name, NInflight} ->
#modbus_metric{} = maps:get(Name, Metrics),
lager:debug("[modbus_device] metric: ~p, get value: ~p", [Name, Val]),
{noreply, State#state{inflight = NInflight}}
end;
{noreply, State};
handle_info(_Info, State = #state{}) ->
{noreply, State}.
@ -117,5 +138,14 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions
%%%===================================================================
poll_ticker(PollInterval) when is_integer(PollInterval) ->
erlang:start_timer(PollInterval, self(), poll_ticker).
%% step分散一下轮询的压力
-spec start_ticker(list(), Step :: integer()) -> no_return().
start_ticker([], _) ->
ok;
start_ticker([Name|T], Step) ->
poll_ticker(Step, Name),
start_ticker(T, Step + Step).
-spec poll_ticker(PollInterval :: integer(), Name :: binary()) -> no_return().
poll_ticker(PollInterval, Name) when is_integer(PollInterval), is_binary(Name) ->
erlang:start_timer(PollInterval, self(), {poll_ticker, Name}).

View File

@ -90,7 +90,7 @@ init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = Ac
end, Devices),
DevicesMap = maps:from_list(DevicesPids),
lager:debug("device pid: ~p", [DevicesMap]),
lager:debug("devices pid: ~p", [DevicesMap]),
{ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
queue = queue:new(), packet_id = 1, devices_map = DevicesMap}}.
@ -135,7 +135,7 @@ handle_event(info, {Port, {data, <<?MODBUS_READ:8, PacketId:32, Val:32>>}}, ?CON
erlang:is_process_alive(ReceiverPid) andalso ReceiverPid ! {request_reply, Ref, Val},
NMap
end,
lager:debug("port data is: ~p", [{PacketId, Val}]),
lager:debug("[modbus_service] port data is: ~p", [{PacketId, Val}]),
%%
erlang:start_timer(DelayMs, self(), read_next),
@ -143,25 +143,25 @@ handle_event(info, {Port, {data, <<?MODBUS_READ:8, PacketId:32, Val:32>>}}, ?CON
%% port退出
handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = rtu, port = Port}) ->
lager:debug("[efka_service] port: ~p, exit with code: ~p", [Port, Code]),
lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Code]),
retry_connect(),
{next_state, ?DISCONNECTED, State#state{port = undefined}};
%% port的退出消息
handle_event(info, {'EXIT', Port, Reason}, ?CONNECTED, State = #state{mode = rtu, port = Port}) ->
lager:debug("[efka_service] port: ~p, exit with code: ~p", [Port, Reason]),
lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Reason]),
retry_connect(),
{next_state, ?DISCONNECTED, State#state{port = undefined}};
%% tcp退出
handle_event(info, {tcp_closed, Socket}, ?CONNECTED, State = #state{mode = tcp, socket = Socket}) ->
lager:debug("[efka_service] socket: ~p, exit with tcp_closed", [Socket]),
lager:debug("[modbus_service] socket: ~p, exit with tcp_closed", [Socket]),
retry_connect(),
{next_state, ?DISCONNECTED, State#state{socket = undefined}};
%% tcp的退出消息
handle_event(info, {tcp_error, Socket, Reason}, ?CONNECTED, State = #state{mode = tcp, socket = Socket}) ->
lager:debug("[efka_service] socket: ~p, exit with code: ~p", [Socket, Reason]),
lager:debug("[modbus_service] socket: ~p, exit with code: ~p", [Socket, Reason]),
retry_connect(),
{next_state, ?DISCONNECTED, State#state{socket = undefined}};
@ -207,6 +207,8 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
%%% Internal functions
%%%===================================================================
-spec connect(Transport :: #modbus_transport_rtu{} | #modbus_transport_tcp{}) ->
{ok, {rtu, Port :: port(), DelayMs :: integer()}} | {ok, Socket :: gen_tcp:socket()} | {error, Reason :: any()}.
%% databits为8位
connect(#modbus_transport_rtu{port = Port, baudrate = BaudRate, stopbits = StopBits, parity = Parity, timeout = Timeout}) ->
RealExecCmd = "",
@ -229,18 +231,21 @@ connect(#modbus_transport_tcp{host = Host, port = Port, timeout = Timeout0}) ->
gen_tcp:connect(Host, Port, [binary], Timeout).
%% 3.5
-spec frame_delay(BaudRate :: integer(), DataBits :: integer(), Parity :: integer(), StopBits :: integer()) -> integer().
frame_delay(BaudRate, DataBits, Parity, StopBits) when is_integer(BaudRate), is_integer(DataBits), is_integer(Parity), is_integer(StopBits) ->
%% 1
CharBits = 1 + DataBits + case Parity of 0 -> 0; _ -> 1 end + StopBits,
%% 3.5), 20%
erlang:ceil((3500 * CharBits) / BaudRate * 1.2).
-spec retry_connect() -> no_return().
retry_connect() ->
erlang:start_timer(5000, self(), modbus_connect).
-spec create_log_file(undefined | string()) -> undefined | pid().
create_log_file(undefined) ->
undefined;
create_log_file(FileName) ->
create_log_file(FileName) when is_list(FileName) ->
case modbus_logger:start_link(FileName) of
{ok, FilePid} ->
FilePid;