fix
This commit is contained in:
parent
67a8343517
commit
94d7e5bcd8
@ -99,7 +99,7 @@ handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = Paren
|
||||
%% 读取采集项目
|
||||
ReceiverPid = self(),
|
||||
Ref = make_ref(),
|
||||
ParentPid ! {read, ReceiverPid, Ref, SlaveId, Address},
|
||||
ParentPid ! {request, ReceiverPid, Ref, SlaveId, Address},
|
||||
|
||||
{noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}};
|
||||
|
||||
|
||||
@ -36,6 +36,7 @@
|
||||
port :: erlang:port() | undefined,
|
||||
%% 延迟的毫秒数
|
||||
delay_ms = 0,
|
||||
is_pending = true :: boolean(),
|
||||
|
||||
socket :: gen_tcp:socket() | undefined,
|
||||
|
||||
@ -44,7 +45,6 @@
|
||||
|
||||
packet_id = 1,
|
||||
queue = queue:new(),
|
||||
is_pending = false :: boolean(),
|
||||
|
||||
%% 未确认的消息
|
||||
inflight = #{},
|
||||
@ -78,7 +78,7 @@ start_link(AST = #ast{}) ->
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = AccessLog}, devices = Devices}]) ->
|
||||
lager:debug("modbus is: ~p", [Modbus]),
|
||||
lager:debug("[modbus_service] modbus is: ~p", [Modbus]),
|
||||
|
||||
%% 建立连接
|
||||
erlang:start_timer(0, self(), modbus_connect),
|
||||
@ -90,7 +90,7 @@ init([AST = #ast{modbus = Modbus = #modbus{error_log = ErrorLog, access_log = Ac
|
||||
end, Devices),
|
||||
DevicesMap = maps:from_list(DevicesPids),
|
||||
|
||||
lager:debug("devices pid: ~p", [DevicesMap]),
|
||||
lager:debug("[modbus_service] devices pid: ~p", [DevicesMap]),
|
||||
|
||||
{ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
|
||||
queue = queue:new(), packet_id = 1, devices_map = DevicesMap}}.
|
||||
@ -106,34 +106,42 @@ callback_mode() ->
|
||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||
%% process message, this function is called.
|
||||
|
||||
handle_event(info, {read, SlaveId, Address}, ?CONNECTED, State = #state{packet_id = PacketId, queue = Q, is_pending = IsPending}) ->
|
||||
%% 基于队列处理, modbus不是全双工的模式
|
||||
NQ = queue:in({PacketId, SlaveId, Address}, Q),
|
||||
%% 请求的时候必须先加入队列,modbus需要控制请求的频率
|
||||
handle_event(info, {request, ReceiverPid, Ref, SlaveId, Address}, StateName, State = #state{mode = rtu, is_pending = IsPending, packet_id = PacketId, queue = Q}) ->
|
||||
case StateName of
|
||||
?CONNECTED ->
|
||||
Actions = case IsPending of
|
||||
true -> [{next_event, info, read_next}];
|
||||
false -> []
|
||||
true ->
|
||||
[{next_event, info, read_next}];
|
||||
false ->
|
||||
[]
|
||||
end,
|
||||
{keep_state, State#state{queue = NQ, packet_id = PacketId + 1, is_pending = false}, Actions};
|
||||
%% 基于队列处理, modbus不是全双工的模式
|
||||
NQ = queue:in({PacketId, ReceiverPid, Ref, SlaveId, Address}, Q),
|
||||
{keep_state, State#state{queue = NQ, packet_id = PacketId + 1}, Actions};
|
||||
_ ->
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
%% 读取下一个任务
|
||||
handle_event(info, read_next, ?CONNECTED, State = #state{queue = Q, port = Port}) ->
|
||||
handle_event(info, read_next, ?CONNECTED, State = #state{mode = rtu, queue = Q, port = Port, inflight = Inflight}) ->
|
||||
case queue:out(Q) of
|
||||
{{value, {PacketId, SlaveId, Address}}, Q2} ->
|
||||
{{value, {PacketId, ReceiverPid, Ref, SlaveId, Address}}, Q2} ->
|
||||
ReadCmd = <<?MODBUS_READ:8, PacketId:32, SlaveId:8, Address:16>>,
|
||||
Port ! {self(), {command, ReadCmd}},
|
||||
{keep_state, State#state{queue = Q2}};
|
||||
{keep_state, State#state{queue = Q2, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
|
||||
{empty, Q1} ->
|
||||
{keep_state, State#state{queue = Q1}}
|
||||
{keep_state, State#state{queue = Q1, is_pending = true}}
|
||||
end;
|
||||
|
||||
%% 从port读取数据, todo 获取到的数据表示为32位整数
|
||||
handle_event(info, {Port, {data, <<?MODBUS_READ:8, PacketId:32, Val:32>>}}, ?CONNECTED, State = #state{port = Port, delay_ms = DelayMs, inflight = Inflight}) ->
|
||||
handle_event(info, {Port, {data, <<?MODBUS_READ:8, PacketId:32, Val:32>>}}, ?CONNECTED, State = #state{mode = rtu, port = Port, delay_ms = DelayMs, inflight = Inflight}) ->
|
||||
NInflight = case maps:take(PacketId, Inflight) of
|
||||
error ->
|
||||
Inflight;
|
||||
{{ReceiverPid, Ref}, NMap} ->
|
||||
erlang:is_process_alive(ReceiverPid) andalso ReceiverPid ! {request_reply, Ref, Val},
|
||||
NMap
|
||||
{{ReceiverPid, Ref}, Inflight0} ->
|
||||
erlang:is_process_alive(ReceiverPid) andalso erlang:send(ReceiverPid, {request_reply, Ref, Val}),
|
||||
Inflight0
|
||||
end,
|
||||
lager:debug("[modbus_service] port data is: ~p", [{PacketId, Val}]),
|
||||
%% 读取下一条
|
||||
@ -145,13 +153,13 @@ handle_event(info, {Port, {data, <<?MODBUS_READ:8, PacketId:32, Val:32>>}}, ?CON
|
||||
handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = rtu, port = Port}) ->
|
||||
lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Code]),
|
||||
retry_connect(),
|
||||
{next_state, ?DISCONNECTED, State#state{port = undefined}};
|
||||
{next_state, ?DISCONNECTED, State#state{port = undefined, is_pending = true}};
|
||||
|
||||
%% 处理port的退出消息
|
||||
handle_event(info, {'EXIT', Port, Reason}, ?CONNECTED, State = #state{mode = rtu, port = Port}) ->
|
||||
lager:debug("[modbus_service] port: ~p, exit with code: ~p", [Port, Reason]),
|
||||
retry_connect(),
|
||||
{next_state, ?DISCONNECTED, State#state{port = undefined}};
|
||||
{next_state, ?DISCONNECTED, State#state{port = undefined, is_pending = true}};
|
||||
|
||||
%% tcp退出
|
||||
handle_event(info, {tcp_closed, Socket}, ?CONNECTED, State = #state{mode = tcp, socket = Socket}) ->
|
||||
@ -175,7 +183,9 @@ handle_event(info, modbus_connect, ?DISCONNECTED, State = #state{ast = #ast{modb
|
||||
end,
|
||||
case ConnectResult of
|
||||
{rtu, Port, DelayMs} ->
|
||||
{next_state, ?CONNECTED, State#state{mode = rtu, port = Port, delay_ms = DelayMs}, Actions};
|
||||
%% 触发读取下一条
|
||||
erlang:start_timer(0, self(), read_next),
|
||||
{next_state, ?CONNECTED, State#state{mode = rtu, port = Port, delay_ms = DelayMs, is_pending = false}, Actions};
|
||||
Socket ->
|
||||
{next_state, ?CONNECTED, State#state{mode = tcp, socket = Socket}, Actions}
|
||||
end;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user