fix mqtt connection
This commit is contained in:
parent
d4e1e8ea8b
commit
450b2f6e29
41
apps/iot/src/iot_mqtt_client_sup.erl
Normal file
41
apps/iot/src/iot_mqtt_client_sup.erl
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%% @doc iot top level supervisor.
|
||||||
|
%% @end
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(iot_mqtt_client_sup).
|
||||||
|
|
||||||
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
start_link() ->
|
||||||
|
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
|
||||||
|
|
||||||
|
%% sup_flags() = #{strategy => strategy(), % optional
|
||||||
|
%% intensity => non_neg_integer(), % optional
|
||||||
|
%% period => pos_integer()} % optional
|
||||||
|
%% child_spec() = #{id => child_id(), % mandatory
|
||||||
|
%% start => mfargs(), % mandatory
|
||||||
|
%% restart => restart(), % optional
|
||||||
|
%% shutdown => shutdown(), % optional
|
||||||
|
%% type => worker(), % optional
|
||||||
|
%% modules => modules()} % optional
|
||||||
|
init([]) ->
|
||||||
|
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
|
||||||
|
Specs = [
|
||||||
|
#{
|
||||||
|
id => iot_watchdog,
|
||||||
|
start => {'iot_watchdog', start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => 2000,
|
||||||
|
type => worker,
|
||||||
|
modules => ['iot_watchdog']
|
||||||
|
}
|
||||||
|
],
|
||||||
|
|
||||||
|
{ok, {SupFlags, Specs}}.
|
||||||
197
apps/iot/src/iot_mqtt_connection.erl
Normal file
197
apps/iot/src/iot_mqtt_connection.erl
Normal file
@ -0,0 +1,197 @@
|
|||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author aresei
|
||||||
|
%%% @doc
|
||||||
|
%%% MQTT Client based on gen_statem
|
||||||
|
%%% 自动重连 + 掉线期间 publish 入队 + flush 队列
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(iot_mqtt_connection).
|
||||||
|
-author("aresei").
|
||||||
|
-behaviour(gen_statem).
|
||||||
|
|
||||||
|
-include("iot.hrl").
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start_link/1, publish_status/1, publish_msg/2]).
|
||||||
|
|
||||||
|
%% gen_statem callbacks
|
||||||
|
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
|
||||||
|
|
||||||
|
-define(DISCONNECTED, disconnected).
|
||||||
|
-define(CONNECTED, connected).
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
parent_pid :: pid(),
|
||||||
|
conn_pid = undefined :: pid() | undefined,
|
||||||
|
retry = 0 :: non_neg_integer(),
|
||||||
|
topics = [] :: list(),
|
||||||
|
%% 掉线期间 publish 缓冲
|
||||||
|
queue = queue:new() :: queue:queue()
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
%% 需要订阅的主题信息
|
||||||
|
%% Topics: [{<<"CET/NX/upload">>, 2}]
|
||||||
|
-spec start_link(ParentPid :: pid(), Topics :: list()) -> {ok, pid()} | ignore | {error, term()}.
|
||||||
|
start_link(ParentPid, Topics) when is_pid(ParentPid), is_list(Topics) ->
|
||||||
|
gen_statem:start_link(?MODULE, [ParentPid, Topics], []).
|
||||||
|
|
||||||
|
-spec publish_status(ConnPid :: pid()) -> #{state := atom(), retry := non_neg_integer(), queue_len := non_neg_integer()}.
|
||||||
|
publish_status(ConnPid) when is_pid(ConnPid) ->
|
||||||
|
gen_statem:call(ConnPid, status).
|
||||||
|
|
||||||
|
-spec publish_msg(ConnPid :: pid(), Msg :: any()) -> ok.
|
||||||
|
publish_msg(ConnPid, Msg) when is_pid(ConnPid) ->
|
||||||
|
gen_statem:cast(ConnPid, {publish, Msg}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% gen_statem callbacks
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
callback_mode() ->
|
||||||
|
handle_event_function.
|
||||||
|
|
||||||
|
-spec init([]) -> {ok, StateName :: atom(), StateData :: #state{}}.
|
||||||
|
init([ParentPid, Topics]) ->
|
||||||
|
%% 尝试建立到服务器的连接
|
||||||
|
self() ! start_connect,
|
||||||
|
{ok, ?DISCONNECTED, #state{parent_pid = ParentPid, topics = Topics}}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% handle_event/4
|
||||||
|
%%%===================================================================
|
||||||
|
handle_event(info, start_connect, ?DISCONNECTED, State = #state{retry = Retry, topics = Topics}) ->
|
||||||
|
Opts = iot_config:emqt_opts(<<"host-subscriber">>),
|
||||||
|
lager:debug("[mqtt] connecting, opts: ~p", [Opts]),
|
||||||
|
case emqtt:start_link(Opts) of
|
||||||
|
{ok, ConnPid} ->
|
||||||
|
{ok, _} = emqtt:connect(ConnPid),
|
||||||
|
case length(Topics) > 0 of
|
||||||
|
true ->
|
||||||
|
SubscribeResult = emqtt:subscribe(ConnPid, Topics),
|
||||||
|
lager:debug("[mqtt] connected pid: ~p subscribed: ~p", [ConnPid, SubscribeResult]);
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}),
|
||||||
|
{next_state, ?CONNECTED, NState};
|
||||||
|
ignore ->
|
||||||
|
lager:debug("[mqtt] start_link returned ignore"),
|
||||||
|
try_reconnect(),
|
||||||
|
{next_state, ?DISCONNECTED, State#state{retry = Retry + 1}};
|
||||||
|
{error, Reason} ->
|
||||||
|
lager:debug("[mqtt] start_link returned error: ~p", [Reason]),
|
||||||
|
try_reconnect(),
|
||||||
|
{next_state, ?DISCONNECTED, State#state{retry = Retry + 1}}
|
||||||
|
end;
|
||||||
|
|
||||||
|
%% -------------------------------
|
||||||
|
%% connected 状态
|
||||||
|
%% -------------------------------
|
||||||
|
handle_event(cast, {publish, Msg}, ?CONNECTED, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
do_publish(ConnPid, Msg),
|
||||||
|
{keep_state, State};
|
||||||
|
|
||||||
|
handle_event(info, {disconnect, ReasonCode, _Props}, ?CONNECTED, State = #state{conn_pid = _ConnPid}) ->
|
||||||
|
lager:debug("[mqtt] disconnected ReasonCode: ~p", [ReasonCode]),
|
||||||
|
try_reconnect(),
|
||||||
|
{next_state, ?DISCONNECTED, State#state{conn_pid = undefined}};
|
||||||
|
|
||||||
|
%% -------------------------------
|
||||||
|
%% disconnected / reconnect_wait 状态收到 publish
|
||||||
|
%% -------------------------------
|
||||||
|
handle_event(cast, {publish, Msg}, ?DISCONNECTED, State = #state{queue = Queue}) ->
|
||||||
|
NewQueue = enqueue(Queue, Msg),
|
||||||
|
{keep_state, State#state{queue = NewQueue}};
|
||||||
|
|
||||||
|
%% -------------------------------
|
||||||
|
%% call
|
||||||
|
%% -------------------------------
|
||||||
|
handle_event({call, From}, status, StateName, State = #state{retry = Retry}) ->
|
||||||
|
Reply = #{
|
||||||
|
state => StateName,
|
||||||
|
retry => Retry,
|
||||||
|
queue_len => queue:len(State#state.queue)
|
||||||
|
},
|
||||||
|
{keep_state, State, [{reply, From, Reply}]};
|
||||||
|
|
||||||
|
%% -------------------------------
|
||||||
|
%% 收到推送消息
|
||||||
|
%% -------------------------------
|
||||||
|
handle_event(info, {publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, _StateName, State = #state{parent_pid = ParentPid}) ->
|
||||||
|
lager:debug("[iot_mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]),
|
||||||
|
%% 将消息分发到对应的host进程去处理
|
||||||
|
ParentPid ! {mqtt_message, Topic, Payload, Qos},
|
||||||
|
{keep_state, State};
|
||||||
|
handle_event(info, {puback, Packet = #{packet_id := _PacketId}}, _, State = #state{}) ->
|
||||||
|
lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]),
|
||||||
|
{keep_state, State};
|
||||||
|
|
||||||
|
%% -------------------------------
|
||||||
|
%% 默认信息处理
|
||||||
|
%% -------------------------------
|
||||||
|
handle_event(_Type, Info, StateName, State) ->
|
||||||
|
lager:debug("[mqtt] unhandled event ~p in state ~p", [Info, StateName]),
|
||||||
|
{keep_state, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
-spec enqueue(Queue :: queue:queue(), Msg :: any()) -> NQueue :: queue:queue().
|
||||||
|
enqueue(Queue, Msg) ->
|
||||||
|
%% 队列最大长度 1000
|
||||||
|
case queue:len(Queue) < 1000 of
|
||||||
|
true ->
|
||||||
|
queue:in(Msg, Queue);
|
||||||
|
false ->
|
||||||
|
lager:warning("[mqtt] queue full, dropping message ~p", [Msg]),
|
||||||
|
Queue
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec flush_queue(ConnPid :: pid(), State :: #state{}) -> NState :: #state{}.
|
||||||
|
flush_queue(ConnPid, State = #state{queue = Queue}) ->
|
||||||
|
lists:foreach(fun(Msg) -> do_publish(ConnPid, Msg) end, queue:to_list(Queue)),
|
||||||
|
State#state{queue = queue:new()}.
|
||||||
|
|
||||||
|
-spec do_publish(ConnPid :: pid(), Msg :: any()) -> no_return().
|
||||||
|
do_publish(ConnPid, Msg) ->
|
||||||
|
try
|
||||||
|
emqtt:publish(ConnPid, Msg)
|
||||||
|
catch
|
||||||
|
_:Err ->
|
||||||
|
lager:error("[mqtt] publish failed ~p, requeueing", [Err]),
|
||||||
|
Pid = self(),
|
||||||
|
gen_statem:cast(Pid, {publish, Msg})
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% terminate/3
|
||||||
|
%%%===================================================================
|
||||||
|
-spec terminate(Reason :: term(), StateName :: atom(), State :: #state{}) -> ok.
|
||||||
|
terminate(_Reason, _StateName, #state{conn_pid = undefined}) ->
|
||||||
|
ok;
|
||||||
|
terminate(_Reason, _StateName, #state{conn_pid = ConnPid, topics = Topics}) ->
|
||||||
|
case length(Topics) > 0 of
|
||||||
|
true ->
|
||||||
|
TopicNames = lists:map(fun({Name,_}) -> Name end, Topics),
|
||||||
|
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames);
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
ok = emqtt:disconnect(ConnPid),
|
||||||
|
lager:debug("[mqtt] terminate cleanup done"),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% code_change/4
|
||||||
|
%%%===================================================================
|
||||||
|
-spec code_change(term(), StateName :: atom(), State :: #state{}, Extra :: term()) ->
|
||||||
|
{ok, State :: #state{}}.
|
||||||
|
code_change(_OldVsn, _StateName, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
try_reconnect() ->
|
||||||
|
erlang:send_after(5000, self(), start_connect).
|
||||||
Loading…
x
Reference in New Issue
Block a user