fix modbus_device

This commit is contained in:
anlicheng 2025-07-02 11:24:33 +08:00
parent 9c38133b1f
commit f6cf564780
2 changed files with 60 additions and 42 deletions

View File

@ -47,11 +47,9 @@ start_link(ParentPid, Device = #modbus_device{}) when is_pid(ParentPid) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ParentPid, Device = #modbus_device{metrics = Metrics0, poll_interval = PollInterval}]) ->
lager:warning("device metrics is: ~p", [Metrics0]),
%%
MetricsMap = maps:from_list(lists:map(fun(Metric0 = #modbus_metric{name = Name}) -> {Name, Metric0} end, Metrics0)),
lager:debug("[modbus_device] device metrics is: ~p", [MetricsMap]),
%% step提交任务
Len = length(Metrics0),
case Len > 0 of

View File

@ -27,15 +27,21 @@
-define(DISCONNECTED, disconnected).
-define(CONNECTED, connected).
-record(tcp_mode, {
socket :: gen_tcp:socket()
}).
-record(rtu_mode, {
port :: erlang:port(),
delay_ms = 0 :: integer()
}).
-record(state, {
ast :: #ast{},
%%
mode :: rtu | tcp,
mode :: #rtu_mode{} | #tcp_mode{} | undefined,
port :: erlang:port() | undefined,
%%
delay_ms = 0,
is_pending = true :: boolean(),
socket :: gen_tcp:socket() | undefined,
@ -82,33 +88,33 @@ start_link(AST = #ast{}) ->
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = AccessLog}, device_ios = IOs, devices = Devices, alarms = Alarms}]) ->
init([AST = #ast{modbus = Modbus = #modbus{transport = Transport, error_log = ErrorLog, access_log = AccessLog}, device_ios = IOs, devices = Devices, alarms = Alarms}]) ->
%lager:debug("[modbus_service] ast is: ~p", [AST]),
lager:debug("[modbus_service] io is: ~p", [IOs]),
%lager:debug("[modbus_service] devices is: ~p", [Devices]),
DevicesMap = start_devices(Devices),
%%
erlang:start_timer(0, self(), modbus_connect),
case connect(Transport) of
{ok, ConnectResult} ->
case ConnectResult of
{rtu, Port, DelayMs} ->
%%
DevicesPids = lists:map(fun(Device = #modbus_device{slave_id = SlaveId}) ->
lager:debug("merge result is: ~p", [merge_io(Device, IOs)]),
{ok, DevicePid} = modbus_device:start_link(self(), merge_io(Device, IOs)),
{SlaveId, DevicePid}
end, Devices),
DevicesMap = maps:from_list(DevicesPids),
lager:debug("[modbus_service] devices pid: ~p", [DevicesMap]),
{ok, Pid1} = modbus_processor:start_link(self(), hd(AST#ast.processors)),
AlarmPids = lists:map(fun(Alarm) ->
{ok, AlarmPid} = modbus_alarm:start_link(self(), Alarm),
AlarmPid
end, Alarms),
{ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = AlarmPids}}.
{ok, ?CONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
mode = #rtu_mode{port = Port, delay_ms = DelayMs}, is_pending = false,
queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}};
Socket ->
{ok, ?CONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
mode = #tcp_mode{socket = Socket}, is_pending = false,
queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}}
end;
{error, Reason} ->
lager:warning("[modbus_serivce] connect get error: ~p", [Reason]),
%% 5
retry_connect(),
{ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}}
end.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
@ -139,7 +145,9 @@ handle_event(info, {request, ReceiverPid, Ref, SlaveId, Address}, StateName, Sta
end;
%% rtu
handle_event(info, read_next, ?CONNECTED, State = #state{mode = rtu, queue = Q, port = Port, inflight = Inflight}) ->
handle_event(info, read_next, ?CONNECTED, State = #state{mode = #rtu_mode{port = Port}, queue = Q, inflight = Inflight}) ->
lager:debug("[modbus_service] read next, q: ~p, port is: ~p", [queue:to_list(Q), Port]),
case queue:out(Q) of
{{value, {PacketId, ReceiverPid, Ref, SlaveId, Address}}, Q2} ->
ReadCmd = <<?MODBUS_READ:8, PacketId:32, SlaveId:8, Address:16>>,
@ -152,7 +160,7 @@ handle_event(info, read_next, ?DISCONNECTED, State = #state{mode = rtu}) ->
{keep_state, State};
%% port读取数据, todo 32
handle_event(info, {Port, {data, <<?MODBUS_READ:8, PacketId:32, Val:32>>}}, ?CONNECTED, State = #state{mode = rtu, port = Port, delay_ms = DelayMs, inflight = Inflight}) ->
handle_event(info, {Port, {data, <<?MODBUS_READ:8, PacketId:32, Val:32>>}}, ?CONNECTED, State = #state{mode = #rtu_mode{port = Port, delay_ms = DelayMs}, inflight = Inflight}) ->
NInflight = case maps:take(PacketId, Inflight) of
error ->
Inflight;
@ -174,16 +182,16 @@ handle_event(info, {broadcast, Key, Val}, ?CONNECTED, State = #state{alarm_pids
{keep_state, State};
%% port退出
handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = rtu, port = Port}) ->
handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = #rtu_mode{port = Port}}) ->
lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Code]),
retry_connect(),
{next_state, ?DISCONNECTED, State#state{port = undefined, is_pending = true}};
{next_state, ?DISCONNECTED, State#state{mode = undefined, is_pending = true}};
%% port的退出消息
handle_event(info, {'EXIT', Port, Reason}, ?CONNECTED, State = #state{mode = rtu, port = Port}) ->
handle_event(info, {'EXIT', Port, Reason}, ?CONNECTED, State = #state{mode = #rtu_mode{port = Port}}) ->
lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Reason]),
retry_connect(),
{next_state, ?DISCONNECTED, State#state{port = undefined, is_pending = true}};
{next_state, ?DISCONNECTED, State#state{mode = undefined, is_pending = true}};
%% tcp退出
handle_event(info, {tcp_closed, Socket}, ?CONNECTED, State = #state{mode = tcp, socket = Socket}) ->
@ -202,17 +210,16 @@ handle_event(info, {timeout, _Ref, modbus_connect}, ?DISCONNECTED, State = #stat
lager:debug("call me will connect, transport: ~p", [Transport]),
case connect(Transport) of
{ok, ConnectResult} ->
%%
Actions = case not queue:is_empty(Q) of
true -> [{next_event, info, read_next}];
false -> []
end,
case ConnectResult of
{rtu, Port, DelayMs} ->
%% todo
% erlang:start_timer(0, self(), read_next),
{next_state, ?CONNECTED, State#state{mode = rtu, port = Port, delay_ms = DelayMs, is_pending = false}, Actions};
{next_state, ?CONNECTED, State#state{mode = #rtu_mode{port = Port, delay_ms = DelayMs}, is_pending = false}, Actions};
Socket ->
{next_state, ?CONNECTED, State#state{mode = tcp, socket = Socket}, Actions}
{next_state, ?CONNECTED, State#state{mode = #tcp_mode{socket = Socket}}, Actions}
end;
{error, Reason} ->
lager:warning("[modbus_serivce] connect get error: ~p", [Reason]),
@ -238,12 +245,27 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
%%% Internal functions
%%%===================================================================
-spec start_devices(Devices :: list()) -> map().
start_devices(Devices) when is_list(Devices) ->
%%
DevicesPids = lists:map(fun(Device = #modbus_device{slave_id = SlaveId}) ->
{ok, DevicePid} = modbus_device:start_link(self(), Device),
{SlaveId, DevicePid}
end, Devices),
maps:from_list(DevicesPids).
%%
-spec start_alarms(Alarms :: list()) -> [pid()].
start_alarms(Alarms) when is_list(Alarms) ->
lists:map(fun(Alarm) ->
{ok, AlarmPid} = modbus_alarm:start_link(self(), Alarm),
AlarmPid
end, Alarms).
-spec connect(Transport :: #modbus_transport_rtu{} | #modbus_transport_tcp{}) ->
{ok, {rtu, Port :: port(), DelayMs :: integer()}} | {ok, Socket :: gen_tcp:socket()} | {error, Reason :: any()}.
%% databits为8位
connect(#modbus_transport_rtu{port = SerialPort, baudrate = BaudRate, stopbits = StopBits, parity = Parity0, timeout = Timeout}) ->
lager:warning("call me here connect, serial_port: ~p", [SerialPort]),
%%
Parity = case Parity0 of
<<"none">> -> 0;
@ -260,9 +282,7 @@ connect(#modbus_transport_rtu{port = SerialPort, baudrate = BaudRate, stopbits =
"--timeout " ++ integer_to_list(Timeout)
],
Port = erlang:open_port({spawn_executable, RealExecCmd}, [binary, {packet, 2}, exit_status, {args, Args}]),
lager:debug("args: ~p", [Args]),
DelayMs = frame_delay(BaudRate, 8, Parity, StopBits),
lager:debug("port is: ~p", [Port]),
%%
{ok, {rtu, Port, DelayMs}};
connect(#modbus_transport_tcp{host = Host, port = Port, timeout = Timeout0}) ->