fix mqtt connection

This commit is contained in:
anlicheng 2026-01-12 10:49:47 +08:00
parent 41e88100ec
commit ff1fb5e363

View File

@ -1,197 +1,185 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
%%% @author aresei %%% @author aresei
%%% @doc %%% @doc
%%% MQTT Client based on gen_statem %%% MQTT Client based on gen_statem (state_functions)
%%% + 线 publish + flush %%% + 线 publish + flush
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(iot_mqtt_connection). -module(iot_mqtt_connection).
-author("aresei"). -author("aresei").
-behaviour(gen_statem). -behaviour(gen_statem).
-include("iot.hrl").
%% API %% API
-export([start_link/2, publish_status/1, publish_msg/2]). -export([start_link/2, publish_status/1, publish_msg/2]).
%% gen_statem callbacks %% gen_statem callbacks
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). -export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]).
-define(DISCONNECTED, disconnected).
-define(CONNECTED, connected).
-record(state, { -record(state, {
parent_pid :: pid(), parent_pid :: pid(),
conn_pid = undefined :: pid() | undefined, conn_pid :: pid() | undefined,
retry = 0 :: non_neg_integer(), retry :: non_neg_integer(),
topics = [] :: list(), topics :: list(),
%% 线 publish queue :: queue:queue()
queue = queue:new() :: queue:queue()
}). }).
%%%=================================================================== %%%===================================================================
%%% API %%% API
%%%=================================================================== %%%===================================================================
%% -spec start_link(pid(), list()) ->
%% Topics: [{<<"CET/NX/upload">>, 2}] {ok, pid()} | ignore | {error, term()}.
-spec start_link(ParentPid :: pid(), Topics :: list()) -> {ok, pid()} | ignore | {error, term()}. start_link(ParentPid, Topics)
start_link(ParentPid, Topics) when is_pid(ParentPid), is_list(Topics) -> when is_pid(ParentPid), is_list(Topics) ->
gen_statem:start_link(?MODULE, [ParentPid, Topics], []). gen_statem:start_link(?MODULE, [ParentPid, Topics], []).
-spec publish_status(ConnPid :: pid()) -> #{state := atom(), retry := non_neg_integer(), queue_len := non_neg_integer()}. -spec publish_status(pid()) -> map().
publish_status(ConnPid) when is_pid(ConnPid) -> publish_status(Pid) when is_pid(Pid) ->
gen_statem:call(ConnPid, status). gen_statem:call(Pid, status).
-spec publish_msg(ConnPid :: pid(), Msg :: any()) -> ok. -spec publish_msg(pid(), any()) -> ok.
publish_msg(ConnPid, Msg) when is_pid(ConnPid) -> publish_msg(Pid, Msg) when is_pid(Pid) ->
gen_statem:cast(ConnPid, {publish, Msg}). gen_statem:cast(Pid, {publish, Msg}).
%%%=================================================================== %%%===================================================================
%%% gen_statem callbacks %%% gen_statem callbacks
%%%=================================================================== %%%===================================================================
callback_mode() -> callback_mode() ->
handle_event_function. state_functions.
-spec init([]) -> {ok, StateName :: atom(), StateData :: #state{}}.
init([ParentPid, Topics]) -> init([ParentPid, Topics]) ->
%%
self() ! start_connect, self() ! start_connect,
{ok, ?DISCONNECTED, #state{parent_pid = ParentPid, topics = Topics}}. {ok, disconnected, #state{parent_pid = ParentPid, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}}.
%%%=================================================================== %%%===================================================================
%%% handle_event/4 %%% disconnected state
%%%=================================================================== %%%===================================================================
handle_event(info, start_connect, ?DISCONNECTED, State = #state{retry = Retry, topics = Topics}) ->
disconnected(info, start_connect, State = #state{retry = Retry, topics = Topics}) ->
Opts = iot_config:emqt_opts(<<"host-subscriber">>), Opts = iot_config:emqt_opts(<<"host-subscriber">>),
lager:debug("[mqtt] connecting, opts: ~p", [Opts]), lager:debug("[mqtt] connecting, opts=~p", [Opts]),
case emqtt:start_link(Opts) of case emqtt:start_link(Opts) of
{ok, ConnPid} -> {ok, ConnPid} ->
{ok, _} = emqtt:connect(ConnPid), {ok, _} = emqtt:connect(ConnPid),
case length(Topics) > 0 of maybe_subscribe(ConnPid, Topics),
true ->
SubscribeResult = emqtt:subscribe(ConnPid, Topics),
lager:debug("[mqtt] connected pid: ~p subscribed: ~p", [ConnPid, SubscribeResult]);
false ->
ok
end,
NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}),
{next_state, ?CONNECTED, NState}; {next_state, connected, NState};
ignore -> _ ->
lager:debug("[mqtt] start_link returned ignore"), {keep_state, State#state{retry = Retry + 1}, [{timeout, start_connect, 5000}]}
try_reconnect(),
{next_state, ?DISCONNECTED, State#state{retry = Retry + 1}};
{error, Reason} ->
lager:debug("[mqtt] start_link returned error: ~p", [Reason]),
try_reconnect(),
{next_state, ?DISCONNECTED, State#state{retry = Retry + 1}}
end; end;
%% ------------------------------- %% 线 publish
%% connected disconnected(cast, {publish, Msg}, State = #state{queue = Q}) ->
%% ------------------------------- {keep_state, State#state{queue = enqueue(Q, Msg)}};
handle_event(cast, {publish, Msg}, ?CONNECTED, State = #state{conn_pid = ConnPid}) ->
do_publish(ConnPid, Msg),
{keep_state, State};
handle_event(info, {disconnect, ReasonCode, _Props}, ?CONNECTED, State = #state{conn_pid = _ConnPid}) -> %% status
lager:debug("[mqtt] disconnected ReasonCode: ~p", [ReasonCode]), disconnected({call, From}, status, State) ->
try_reconnect(), reply_status(disconnected, From, State);
{next_state, ?DISCONNECTED, State#state{conn_pid = undefined}};
%% ------------------------------- %%
%% disconnected / reconnect_wait publish disconnected(_Type, _Event, State) ->
%% -------------------------------
handle_event(cast, {publish, Msg}, ?DISCONNECTED, State = #state{queue = Queue}) ->
NewQueue = enqueue(Queue, Msg),
{keep_state, State#state{queue = NewQueue}};
%% -------------------------------
%% call
%% -------------------------------
handle_event({call, From}, status, StateName, State = #state{retry = Retry}) ->
Reply = #{
state => StateName,
retry => Retry,
queue_len => queue:len(State#state.queue)
},
{keep_state, State, [{reply, From, Reply}]};
%% -------------------------------
%%
%% -------------------------------
handle_event(info, {publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, _StateName, State = #state{parent_pid = ParentPid}) ->
lager:debug("[iot_mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]),
%% host进程去处理
ParentPid ! {mqtt_message, Topic, Payload, Qos},
{keep_state, State};
handle_event(info, {puback, Packet = #{packet_id := _PacketId}}, _, State = #state{}) ->
lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]),
{keep_state, State};
%% -------------------------------
%%
%% -------------------------------
handle_event(_Type, Info, StateName, State) ->
lager:debug("[mqtt] unhandled event ~p in state ~p", [Info, StateName]),
{keep_state, State}. {keep_state, State}.
%%%=================================================================== %%%===================================================================
%%% Internal functions %%% connected state
%%%=================================================================== %%%===================================================================
-spec enqueue(Queue :: queue:queue(), Msg :: any()) -> NQueue :: queue:queue(). %% publish
enqueue(Queue, Msg) -> connected(cast, {publish, Msg}, State = #state{conn_pid = ConnPid}) ->
%% 1000 do_publish(ConnPid, Msg),
case queue:len(Queue) < 1000 of {keep_state, State};
%% MQTT 线
connected(info, {disconnect, ReasonCode, _Props}, State) ->
lager:warning("[mqtt] disconnected, reason=~p", [ReasonCode]),
{next_state, disconnected, State#state{conn_pid = undefined}, [{timeout, start_connect, 5000}]};
%%
connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) ->
ParentPid ! {mqtt_message, Topic, Payload, Qos},
{keep_state, State};
%% puback
connected(info, {puback, Packet}, _State) ->
lager:debug("[mqtt] puback ~p", [Packet]),
keep_state_and_data;
%% status
connected({call, From}, status, State) ->
reply_status(connected, From, State);
%%
connected(_Type, _Event, State) ->
{keep_state, State}.
%%%===================================================================
%%% Internal helpers
%%%===================================================================
-spec enqueue(queue:queue(), any()) -> queue:queue().
enqueue(Q, Msg) ->
case queue:len(Q) < 1000 of
true -> true ->
queue:in(Msg, Queue); queue:in(Msg, Q);
false -> false ->
lager:warning("[mqtt] queue full, dropping message ~p", [Msg]), lager:warning("[mqtt] queue full, drop msg"),
Queue Q
end. end.
-spec flush_queue(ConnPid :: pid(), State :: #state{}) -> NState :: #state{}. -spec flush_queue(pid(), #state{}) -> #state{}.
flush_queue(ConnPid, State = #state{queue = Queue}) -> flush_queue(ConnPid, State = #state{queue = Q}) ->
lists:foreach(fun(Msg) -> do_publish(ConnPid, Msg) end, queue:to_list(Queue)), lists:foreach(
fun(Msg) -> do_publish(ConnPid, Msg) end,
queue:to_list(Q)
),
State#state{queue = queue:new()}. State#state{queue = queue:new()}.
-spec do_publish(ConnPid :: pid(), Msg :: any()) -> no_return(). -spec do_publish(pid(), any()) -> ok.
do_publish(ConnPid, Msg) -> do_publish(ConnPid, Msg) ->
try try
emqtt:publish(ConnPid, Msg) emqtt:publish(ConnPid, Msg),
ok
catch catch
_:Err -> _:Err ->
lager:error("[mqtt] publish failed ~p, requeueing", [Err]), lager:error("[mqtt] publish failed ~p, requeue", [Err]),
Pid = self(), gen_statem:cast(self(), {publish, Msg}),
gen_statem:cast(Pid, {publish, Msg}) ok
end. end.
%%%=================================================================== maybe_subscribe(_ConnPid, []) ->
%%% terminate/3
%%%===================================================================
-spec terminate(Reason :: term(), StateName :: atom(), State :: #state{}) -> ok.
terminate(_Reason, _StateName, #state{conn_pid = undefined}) ->
ok; ok;
terminate(_Reason, _StateName, #state{conn_pid = ConnPid, topics = Topics}) -> maybe_subscribe(ConnPid, Topics) ->
case length(Topics) > 0 of SubResult = emqtt:subscribe(ConnPid, Topics),
true -> lager:debug("[iot_mqtt_connection] subscribe topics: ~p, result: ~p", [Topics, SubResult]).
TopicNames = lists:map(fun({Name,_}) -> Name end, Topics),
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames); reply_status(StateName, From,
false -> #state{retry = Retry, queue = Q} = State) ->
ok Reply = #{
end, state => StateName,
ok = emqtt:disconnect(ConnPid), retry => Retry,
lager:debug("[mqtt] terminate cleanup done"), queue_len => queue:len(Q)
},
{keep_state, State, [{reply, From, Reply}]}.
%%%===================================================================
%%% terminate / code_change
%%%===================================================================
terminate(_Reason, _StateName,
#state{conn_pid = undefined}) ->
ok;
terminate(_Reason, _StateName,
#state{conn_pid = ConnPid, topics = Topics}) ->
maybe_unsubscribe(ConnPid, Topics),
emqtt:disconnect(ConnPid),
ok. ok.
%%%=================================================================== maybe_unsubscribe(_ConnPid, []) ->
%%% code_change/4 ok;
%%%=================================================================== maybe_unsubscribe(ConnPid, Topics) ->
-spec code_change(term(), StateName :: atom(), State :: #state{}, Extra :: term()) -> TopicNames = [T || {T, _} <- Topics],
{ok, State :: #state{}}. emqtt:unsubscribe(ConnPid, #{}, TopicNames),
code_change(_OldVsn, _StateName, State, _Extra) -> ok.
{ok, State}.
try_reconnect() -> code_change(_OldVsn, _StateName, State, _Extra) ->
erlang:send_after(5000, self(), start_connect). {ok, State}.