diff --git a/apps/modbus/src/modbus_service.erl b/apps/modbus/src/modbus_service.erl index abab248..54cfe89 100644 --- a/apps/modbus/src/modbus_service.erl +++ b/apps/modbus/src/modbus_service.erl @@ -28,12 +28,23 @@ -define(CONNECTED, connected). -record(state, { - port, + ast :: #ast{}, + + %% 连接模式 + mode :: rtu | tcp, + + port :: erlang:port() | undefined, + %% 延迟的毫秒数 + delay_ms = 0, + + socket :: gen_tcp:socket() | undefined, + + %% #{slaveId => DevicePid} + devices_map = #{}, + packet_id = 1, queue = queue:new(), is_pending = false :: boolean(), - %% 延迟的毫秒数 - delay_ms = 0, %% 未确认的消息 inflight = #{}, @@ -66,21 +77,29 @@ start_link(AST = #ast{}) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([AST = #ast{modbus = Modbus, devices = Devices}]) -> +init([AST = #ast{modbus = Modbus = #modbus{transport = Transport}, devices = Devices}]) -> lager:debug("modbus is: ~p", [Modbus]), Device = hd(Devices), lager:debug("devices is: ~p", [Device#modbus_device.metrics]), % Res = connect(Transport), % lager:debug("connect res: ~p", [Res]), - {ok, DevicePid} = modbus_device:start_link(hd(Devices)), - lager:debug("device pid: ~p", [DevicePid]), + %% 建立连接 + erlang:start_timer(0, self(), modbus_connect), + %% 启动设备相关的进程 + DevicesPids = lists:map(fun(Device = #modbus_device{slave_id = SlaveId}) -> + {ok, DevicePid} = modbus_device:start_link(self(), Device), + {SlaveId, DevicePid} + end, Devices), + DevicesMap = maps:from_list(DevicesPids), + + lager:debug("device pid: ~p", [DevicesMap]), %{ok, AccessLogPid} = modbus_logger:start_link(AccessLog), %{ok, ErrorLogPid} = modbus_logger:start_link(ErrorLog), - {ok, ?DISCONNECTED, #state{queue = queue:new(), packet_id = 1}}. + {ok, ?DISCONNECTED, #state{ast = AST, queue = queue:new(), packet_id = 1, devices_map = DevicesMap}}. %% @private %% @doc This function is called by a gen_statem when it needs to find out @@ -128,6 +147,51 @@ handle_event(info, {Port, {data, <>}}, ?CON {keep_state, State#state{inflight = NInflight}}; +%% port退出 +handle_event(info, {Port, {exit_status, Code}}, ?CONNECTED, State = #state{mode = rtu, port = Port}) -> + lager:debug("[efka_service] port: ~p, exit with code: ~p", [Port, Code]), + retry_connect(), + {next_state, ?DISCONNECTED, State#state{port = undefined}}; + +%% 处理port的退出消息 +handle_event(info, {'EXIT', Port, Reason}, ?CONNECTED, State = #state{mode = rtu, port = Port}) -> + lager:debug("[efka_service] port: ~p, exit with code: ~p", [Port, Reason]), + retry_connect(), + {next_state, ?DISCONNECTED, State#state{port = undefined}}; + +%% tcp退出 +handle_event(info, {tcp_closed, Socket}, ?CONNECTED, State = #state{mode = tcp, socket = Socket}) -> + lager:debug("[efka_service] socket: ~p, exit with tcp_closed", [Socket]), + retry_connect(), + {next_state, ?DISCONNECTED, State#state{socket = undefined}}; + +%% 处理tcp的退出消息 +handle_event(info, {tcp_error, Socket, Reason}, ?CONNECTED, State = #state{mode = tcp, socket = Socket}) -> + lager:debug("[efka_service] socket: ~p, exit with code: ~p", [Socket, Reason]), + retry_connect(), + {next_state, ?DISCONNECTED, State#state{socket = undefined}}; + +%% 建立连接, todo is_pending的状态要形成闭环 +handle_event(info, modbus_connect, ?DISCONNECTED, State = #state{ast = #ast{modbus = #modbus{transport = Transport}}, queue = Q}) -> + case connect(Transport) of + {ok, ConnectResult} -> + Actions = case not queue:is_empty(Q) of + true -> [{next_event, info, read_next}]; + false -> [] + end, + case ConnectResult of + {rtu, Port, DelayMs} -> + {next_state, ?CONNECTED, State#state{mode = rtu, port = Port, delay_ms = DelayMs}, Actions}; + Socket -> + {next_state, ?CONNECTED, State#state{mode = tcp, socket = Socket}, Actions} + end; + {error, Reason} -> + lager:warning("[modbus_serivce] connect get error: ~p", [Reason]), + %% 5秒后重试 + retry_connect(), + {keep_state, State} + end; + handle_event(_EventType, _EventContent, _StateName, State = #state{}) -> NextStateName = the_next_state_name, {next_state, NextStateName, State}. @@ -150,7 +214,7 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%%=================================================================== %% databits为8位 -connect(#modbus{transport = #modbus_transport_rtu{port = Port, baudrate = BaudRate, stopbits = StopBits, parity = Parity, timeout = Timeout}}) -> +connect(#modbus_transport_rtu{port = Port, baudrate = BaudRate, stopbits = StopBits, parity = Parity, timeout = Timeout}) -> RealExecCmd = "", Port = erlang:open_port({spawn_executable, RealExecCmd}, [binary, {packet, 2}, exit_status]), @@ -160,9 +224,8 @@ connect(#modbus{transport = #modbus_transport_rtu{port = Port, baudrate = BaudRa ConnectCmd = <>, %% 建立连接 Port ! {self(), {command, ConnectCmd}}, - ok; - -connect(#modbus{transport = #modbus_transport_tcp{host = Host, port = Port, timeout = Timeout0}}) -> + {ok, {rtu, Port, DelayMs}}; +connect(#modbus_transport_tcp{host = Host, port = Port, timeout = Timeout0}) -> Timeout = case is_integer(Timeout0) andalso Timeout0 > 0 of true -> Timeout0; @@ -176,4 +239,7 @@ frame_delay(BaudRate, DataBits, Parity, StopBits) when is_integer(BaudRate), is_ %% 计算1个字符的总位数 CharBits = 1 + DataBits + case Parity of 0 -> 0; _ -> 1 end + StopBits, %% 3.5字符时间(毫秒), 预留20%的时间 - erlang:ceil((3500 * CharBits) / BaudRate * 1.2). \ No newline at end of file + erlang:ceil((3500 * CharBits) / BaudRate * 1.2). + +retry_connect() -> + erlang:start_timer(5000, self(), modbus_connect). \ No newline at end of file