diff --git a/apps/iot/src/iot_mqtt_connection.erl b/apps/iot/src/iot_mqtt_connection.erl index 7b981b3..267fb88 100644 --- a/apps/iot/src/iot_mqtt_connection.erl +++ b/apps/iot/src/iot_mqtt_connection.erl @@ -9,13 +9,15 @@ -behaviour(gen_statem). %% API --export([start_link/2, publish_status/1, publish_msg/2]). - +-export([start_link/3, publish_status/1, publish_msg/4]). %% gen_statem callbacks -export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]). +-define(CONNECT_ACTION(T), {state_timeout, T, start_connect}). + -record(state, { parent_pid :: pid(), + opts = [], conn_pid :: pid() | undefined, retry :: non_neg_integer(), topics :: list(), @@ -25,20 +27,18 @@ %%%=================================================================== %%% API %%%=================================================================== - --spec start_link(pid(), list()) -> +-spec start_link(pid(), list(), list()) -> {ok, pid()} | ignore | {error, term()}. -start_link(ParentPid, Topics) - when is_pid(ParentPid), is_list(Topics) -> - gen_statem:start_link(?MODULE, [ParentPid, Topics], []). +start_link(ParentPid, Opts, Topics) when is_pid(ParentPid), is_list(Topics) -> + gen_statem:start_link(?MODULE, [ParentPid, Opts, Topics], []). -spec publish_status(pid()) -> map(). publish_status(Pid) when is_pid(Pid) -> gen_statem:call(Pid, status). --spec publish_msg(pid(), any()) -> ok. -publish_msg(Pid, Msg) when is_pid(Pid) -> - gen_statem:cast(Pid, {publish, Msg}). +-spec publish_msg(Pid :: pid(), Topic :: binary(), Payload :: binary(), Qos :: integer()) -> ok. +publish_msg(Pid, Topic, Payload, Qos) when is_pid(Pid), is_binary(Topic), is_binary(Payload), is_integer(Qos) -> + gen_statem:cast(Pid, {publish, Topic, Payload, Qos}). %%%=================================================================== %%% gen_statem callbacks @@ -47,35 +47,52 @@ publish_msg(Pid, Msg) when is_pid(Pid) -> callback_mode() -> state_functions. -init([ParentPid, Topics]) -> - self() ! start_connect, - {ok, disconnected, #state{parent_pid = ParentPid, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}}. +init([ParentPid, Opts0, Topics]) -> + erlang:process_flag(trap_exit, true), + + DefaultOpts = [ + {owner, self()}, + {tcp_opts, []}, + {auto_ack, true}, + {proto_ver, v3} + ], + Opts = Opts0 ++ DefaultOpts, + + {ok, disconnected, #state{parent_pid = ParentPid, opts = Opts, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}, ?CONNECT_ACTION(0)}. %%%=================================================================== %%% disconnected state %%%=================================================================== -disconnected(info, start_connect, State = #state{retry = Retry, topics = Topics}) -> - Opts = iot_config:emqt_opts(<<"host-subscriber">>), - lager:debug("[mqtt] connecting, opts=~p", [Opts]), +disconnected(state_timeout, start_connect, State = #state{retry = Retry, opts = Opts, topics = Topics}) -> case emqtt:start_link(Opts) of {ok, ConnPid} -> - {ok, _} = emqtt:connect(ConnPid), - maybe_subscribe(ConnPid, Topics), - NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), - {next_state, connected, NState}; + case emqtt:connect(ConnPid) of + {ok, _} -> + maybe_subscribe(ConnPid, Topics), + NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), + {next_state, connected, NState}; + {error, _Reason} -> + %% 连接错误发生的时候,emqtt进程一定会挂掉,在进程错误的时候处理; 否则连接错误和服务端关闭需要2个分支来处理 + %% 这里保存进程id是为了,错误时候的模式匹配 + {keep_state, State#state{conn_pid = ConnPid}} + end; _ -> - {keep_state, State#state{retry = Retry + 1}, [{timeout, start_connect, 5000}]} + {keep_state, State#state{retry = Retry + 1}, ?CONNECT_ACTION(5000)} end; %% 掉线期间 publish → 入队 -disconnected(cast, {publish, Msg}, State = #state{queue = Q}) -> - {keep_state, State#state{queue = enqueue(Q, Msg)}}; +disconnected(cast, {publish, Topic, Payload, Qos}, State = #state{queue = Q}) -> + {keep_state, State#state{queue = enqueue(Q, {Topic, Payload, Qos})}}; %% status disconnected({call, From}, status, State) -> reply_status(disconnected, From, State); +%% 处理mqtt socket断开的问题 +disconnected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> + {keep_state, State, ?CONNECT_ACTION(5000)}; + %% 兜底 disconnected(_Type, _Event, State) -> {keep_state, State}. @@ -85,14 +102,17 @@ disconnected(_Type, _Event, State) -> %%%=================================================================== %% 正常 publish -connected(cast, {publish, Msg}, State = #state{conn_pid = ConnPid}) -> - do_publish(ConnPid, Msg), +connected(cast, {publish, Topic, Payload, Qos}, State = #state{conn_pid = ConnPid}) -> + do_publish(ConnPid, Topic, Payload, Qos), {keep_state, State}; %% MQTT 断线 -connected(info, {disconnect, ReasonCode, _Props}, State) -> - lager:warning("[mqtt] disconnected, reason=~p", [ReasonCode]), - {next_state, disconnected, State#state{conn_pid = undefined}, [{timeout, start_connect, 5000}]}; +connected(info, {disconnect, _ReasonCode, _Props}, State) -> + {next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)}; + +%% 处理mqtt socket断开的问题 +connected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> + {next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)}; %% 收到订阅消息 connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) -> @@ -100,8 +120,8 @@ connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, St {keep_state, State}; %% puback -connected(info, {puback, Packet}, _State) -> - lager:debug("[mqtt] puback ~p", [Packet]), +connected(info, {puback, Ack}, #state{parent_pid = ParentPid}) -> + ParentPid ! {mqtt_puback, Ack}, keep_state_and_data; %% status @@ -122,27 +142,24 @@ enqueue(Q, Msg) -> true -> queue:in(Msg, Q); false -> - lager:warning("[mqtt] queue full, drop msg"), Q end. -spec flush_queue(pid(), #state{}) -> #state{}. flush_queue(ConnPid, State = #state{queue = Q}) -> lists:foreach( - fun(Msg) -> do_publish(ConnPid, Msg) end, + fun({Topic, Payload, Qos}) -> do_publish(ConnPid, Topic, Payload, Qos) end, queue:to_list(Q) ), State#state{queue = queue:new()}. --spec do_publish(pid(), any()) -> ok. -do_publish(ConnPid, Msg) -> +do_publish(ConnPid, Topic, Payload, Qos) -> try - emqtt:publish(ConnPid, Msg), + emqtt:publish(ConnPid, Topic, Payload, Qos), ok catch - _:Err -> - lager:error("[mqtt] publish failed ~p, requeue", [Err]), - gen_statem:cast(self(), {publish, Msg}), + _ -> + gen_statem:cast(self(), {publish, Topic, Payload, Qos}), ok end.