解决了mqtt的连接问题
This commit is contained in:
parent
ff1fb5e363
commit
0b0aa2659f
@ -9,13 +9,15 @@
|
|||||||
-behaviour(gen_statem).
|
-behaviour(gen_statem).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/2, publish_status/1, publish_msg/2]).
|
-export([start_link/3, publish_status/1, publish_msg/4]).
|
||||||
|
|
||||||
%% gen_statem callbacks
|
%% gen_statem callbacks
|
||||||
-export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]).
|
-export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]).
|
||||||
|
|
||||||
|
-define(CONNECT_ACTION(T), {state_timeout, T, start_connect}).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
parent_pid :: pid(),
|
parent_pid :: pid(),
|
||||||
|
opts = [],
|
||||||
conn_pid :: pid() | undefined,
|
conn_pid :: pid() | undefined,
|
||||||
retry :: non_neg_integer(),
|
retry :: non_neg_integer(),
|
||||||
topics :: list(),
|
topics :: list(),
|
||||||
@ -25,20 +27,18 @@
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% API
|
%%% API
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
-spec start_link(pid(), list(), list()) ->
|
||||||
-spec start_link(pid(), list()) ->
|
|
||||||
{ok, pid()} | ignore | {error, term()}.
|
{ok, pid()} | ignore | {error, term()}.
|
||||||
start_link(ParentPid, Topics)
|
start_link(ParentPid, Opts, Topics) when is_pid(ParentPid), is_list(Topics) ->
|
||||||
when is_pid(ParentPid), is_list(Topics) ->
|
gen_statem:start_link(?MODULE, [ParentPid, Opts, Topics], []).
|
||||||
gen_statem:start_link(?MODULE, [ParentPid, Topics], []).
|
|
||||||
|
|
||||||
-spec publish_status(pid()) -> map().
|
-spec publish_status(pid()) -> map().
|
||||||
publish_status(Pid) when is_pid(Pid) ->
|
publish_status(Pid) when is_pid(Pid) ->
|
||||||
gen_statem:call(Pid, status).
|
gen_statem:call(Pid, status).
|
||||||
|
|
||||||
-spec publish_msg(pid(), any()) -> ok.
|
-spec publish_msg(Pid :: pid(), Topic :: binary(), Payload :: binary(), Qos :: integer()) -> ok.
|
||||||
publish_msg(Pid, Msg) when is_pid(Pid) ->
|
publish_msg(Pid, Topic, Payload, Qos) when is_pid(Pid), is_binary(Topic), is_binary(Payload), is_integer(Qos) ->
|
||||||
gen_statem:cast(Pid, {publish, Msg}).
|
gen_statem:cast(Pid, {publish, Topic, Payload, Qos}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% gen_statem callbacks
|
%%% gen_statem callbacks
|
||||||
@ -47,35 +47,52 @@ publish_msg(Pid, Msg) when is_pid(Pid) ->
|
|||||||
callback_mode() ->
|
callback_mode() ->
|
||||||
state_functions.
|
state_functions.
|
||||||
|
|
||||||
init([ParentPid, Topics]) ->
|
init([ParentPid, Opts0, Topics]) ->
|
||||||
self() ! start_connect,
|
erlang:process_flag(trap_exit, true),
|
||||||
{ok, disconnected, #state{parent_pid = ParentPid, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}}.
|
|
||||||
|
DefaultOpts = [
|
||||||
|
{owner, self()},
|
||||||
|
{tcp_opts, []},
|
||||||
|
{auto_ack, true},
|
||||||
|
{proto_ver, v3}
|
||||||
|
],
|
||||||
|
Opts = Opts0 ++ DefaultOpts,
|
||||||
|
|
||||||
|
{ok, disconnected, #state{parent_pid = ParentPid, opts = Opts, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}, ?CONNECT_ACTION(0)}.
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% disconnected state
|
%%% disconnected state
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
disconnected(info, start_connect, State = #state{retry = Retry, topics = Topics}) ->
|
disconnected(state_timeout, start_connect, State = #state{retry = Retry, opts = Opts, topics = Topics}) ->
|
||||||
Opts = iot_config:emqt_opts(<<"host-subscriber">>),
|
|
||||||
lager:debug("[mqtt] connecting, opts=~p", [Opts]),
|
|
||||||
case emqtt:start_link(Opts) of
|
case emqtt:start_link(Opts) of
|
||||||
{ok, ConnPid} ->
|
{ok, ConnPid} ->
|
||||||
{ok, _} = emqtt:connect(ConnPid),
|
case emqtt:connect(ConnPid) of
|
||||||
maybe_subscribe(ConnPid, Topics),
|
{ok, _} ->
|
||||||
NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}),
|
maybe_subscribe(ConnPid, Topics),
|
||||||
{next_state, connected, NState};
|
NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}),
|
||||||
|
{next_state, connected, NState};
|
||||||
|
{error, _Reason} ->
|
||||||
|
%% 连接错误发生的时候,emqtt进程一定会挂掉,在进程错误的时候处理; 否则连接错误和服务端关闭需要2个分支来处理
|
||||||
|
%% 这里保存进程id是为了,错误时候的模式匹配
|
||||||
|
{keep_state, State#state{conn_pid = ConnPid}}
|
||||||
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
{keep_state, State#state{retry = Retry + 1}, [{timeout, start_connect, 5000}]}
|
{keep_state, State#state{retry = Retry + 1}, ?CONNECT_ACTION(5000)}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% 掉线期间 publish → 入队
|
%% 掉线期间 publish → 入队
|
||||||
disconnected(cast, {publish, Msg}, State = #state{queue = Q}) ->
|
disconnected(cast, {publish, Topic, Payload, Qos}, State = #state{queue = Q}) ->
|
||||||
{keep_state, State#state{queue = enqueue(Q, Msg)}};
|
{keep_state, State#state{queue = enqueue(Q, {Topic, Payload, Qos})}};
|
||||||
|
|
||||||
%% status
|
%% status
|
||||||
disconnected({call, From}, status, State) ->
|
disconnected({call, From}, status, State) ->
|
||||||
reply_status(disconnected, From, State);
|
reply_status(disconnected, From, State);
|
||||||
|
|
||||||
|
%% 处理mqtt socket断开的问题
|
||||||
|
disconnected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
{keep_state, State, ?CONNECT_ACTION(5000)};
|
||||||
|
|
||||||
%% 兜底
|
%% 兜底
|
||||||
disconnected(_Type, _Event, State) ->
|
disconnected(_Type, _Event, State) ->
|
||||||
{keep_state, State}.
|
{keep_state, State}.
|
||||||
@ -85,14 +102,17 @@ disconnected(_Type, _Event, State) ->
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
%% 正常 publish
|
%% 正常 publish
|
||||||
connected(cast, {publish, Msg}, State = #state{conn_pid = ConnPid}) ->
|
connected(cast, {publish, Topic, Payload, Qos}, State = #state{conn_pid = ConnPid}) ->
|
||||||
do_publish(ConnPid, Msg),
|
do_publish(ConnPid, Topic, Payload, Qos),
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% MQTT 断线
|
%% MQTT 断线
|
||||||
connected(info, {disconnect, ReasonCode, _Props}, State) ->
|
connected(info, {disconnect, _ReasonCode, _Props}, State) ->
|
||||||
lager:warning("[mqtt] disconnected, reason=~p", [ReasonCode]),
|
{next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)};
|
||||||
{next_state, disconnected, State#state{conn_pid = undefined}, [{timeout, start_connect, 5000}]};
|
|
||||||
|
%% 处理mqtt socket断开的问题
|
||||||
|
connected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
{next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)};
|
||||||
|
|
||||||
%% 收到订阅消息
|
%% 收到订阅消息
|
||||||
connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) ->
|
connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) ->
|
||||||
@ -100,8 +120,8 @@ connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, St
|
|||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% puback
|
%% puback
|
||||||
connected(info, {puback, Packet}, _State) ->
|
connected(info, {puback, Ack}, #state{parent_pid = ParentPid}) ->
|
||||||
lager:debug("[mqtt] puback ~p", [Packet]),
|
ParentPid ! {mqtt_puback, Ack},
|
||||||
keep_state_and_data;
|
keep_state_and_data;
|
||||||
|
|
||||||
%% status
|
%% status
|
||||||
@ -122,27 +142,24 @@ enqueue(Q, Msg) ->
|
|||||||
true ->
|
true ->
|
||||||
queue:in(Msg, Q);
|
queue:in(Msg, Q);
|
||||||
false ->
|
false ->
|
||||||
lager:warning("[mqtt] queue full, drop msg"),
|
|
||||||
Q
|
Q
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec flush_queue(pid(), #state{}) -> #state{}.
|
-spec flush_queue(pid(), #state{}) -> #state{}.
|
||||||
flush_queue(ConnPid, State = #state{queue = Q}) ->
|
flush_queue(ConnPid, State = #state{queue = Q}) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Msg) -> do_publish(ConnPid, Msg) end,
|
fun({Topic, Payload, Qos}) -> do_publish(ConnPid, Topic, Payload, Qos) end,
|
||||||
queue:to_list(Q)
|
queue:to_list(Q)
|
||||||
),
|
),
|
||||||
State#state{queue = queue:new()}.
|
State#state{queue = queue:new()}.
|
||||||
|
|
||||||
-spec do_publish(pid(), any()) -> ok.
|
do_publish(ConnPid, Topic, Payload, Qos) ->
|
||||||
do_publish(ConnPid, Msg) ->
|
|
||||||
try
|
try
|
||||||
emqtt:publish(ConnPid, Msg),
|
emqtt:publish(ConnPid, Topic, Payload, Qos),
|
||||||
ok
|
ok
|
||||||
catch
|
catch
|
||||||
_:Err ->
|
_ ->
|
||||||
lager:error("[mqtt] publish failed ~p, requeue", [Err]),
|
gen_statem:cast(self(), {publish, Topic, Payload, Qos}),
|
||||||
gen_statem:cast(self(), {publish, Msg}),
|
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user