From ff1fb5e36384ddc7da0ffabbf0493713df61bcda Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 12 Jan 2026 10:49:47 +0800 Subject: [PATCH] fix mqtt connection --- apps/iot/src/iot_mqtt_connection.erl | 250 +++++++++++++-------------- 1 file changed, 119 insertions(+), 131 deletions(-) diff --git a/apps/iot/src/iot_mqtt_connection.erl b/apps/iot/src/iot_mqtt_connection.erl index 8ced897..7b981b3 100644 --- a/apps/iot/src/iot_mqtt_connection.erl +++ b/apps/iot/src/iot_mqtt_connection.erl @@ -1,197 +1,185 @@ %%%------------------------------------------------------------------- %%% @author aresei %%% @doc -%%% MQTT Client based on gen_statem +%%% MQTT Client based on gen_statem (state_functions) %%% 自动重连 + 掉线期间 publish 入队 + flush 队列 %%%------------------------------------------------------------------- -module(iot_mqtt_connection). -author("aresei"). -behaviour(gen_statem). --include("iot.hrl"). - %% API -export([start_link/2, publish_status/1, publish_msg/2]). %% gen_statem callbacks --export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). - --define(DISCONNECTED, disconnected). --define(CONNECTED, connected). +-export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]). -record(state, { - parent_pid :: pid(), - conn_pid = undefined :: pid() | undefined, - retry = 0 :: non_neg_integer(), - topics = [] :: list(), - %% 掉线期间 publish 缓冲 - queue = queue:new() :: queue:queue() + parent_pid :: pid(), + conn_pid :: pid() | undefined, + retry :: non_neg_integer(), + topics :: list(), + queue :: queue:queue() }). %%%=================================================================== %%% API %%%=================================================================== -%% 需要订阅的主题信息 -%% Topics: [{<<"CET/NX/upload">>, 2}] --spec start_link(ParentPid :: pid(), Topics :: list()) -> {ok, pid()} | ignore | {error, term()}. -start_link(ParentPid, Topics) when is_pid(ParentPid), is_list(Topics) -> +-spec start_link(pid(), list()) -> + {ok, pid()} | ignore | {error, term()}. +start_link(ParentPid, Topics) + when is_pid(ParentPid), is_list(Topics) -> gen_statem:start_link(?MODULE, [ParentPid, Topics], []). --spec publish_status(ConnPid :: pid()) -> #{state := atom(), retry := non_neg_integer(), queue_len := non_neg_integer()}. -publish_status(ConnPid) when is_pid(ConnPid) -> - gen_statem:call(ConnPid, status). +-spec publish_status(pid()) -> map(). +publish_status(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, status). --spec publish_msg(ConnPid :: pid(), Msg :: any()) -> ok. -publish_msg(ConnPid, Msg) when is_pid(ConnPid) -> - gen_statem:cast(ConnPid, {publish, Msg}). +-spec publish_msg(pid(), any()) -> ok. +publish_msg(Pid, Msg) when is_pid(Pid) -> + gen_statem:cast(Pid, {publish, Msg}). %%%=================================================================== %%% gen_statem callbacks %%%=================================================================== callback_mode() -> - handle_event_function. + state_functions. --spec init([]) -> {ok, StateName :: atom(), StateData :: #state{}}. init([ParentPid, Topics]) -> - %% 尝试建立到服务器的连接 self() ! start_connect, - {ok, ?DISCONNECTED, #state{parent_pid = ParentPid, topics = Topics}}. + {ok, disconnected, #state{parent_pid = ParentPid, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}}. %%%=================================================================== -%%% handle_event/4 +%%% disconnected state %%%=================================================================== -handle_event(info, start_connect, ?DISCONNECTED, State = #state{retry = Retry, topics = Topics}) -> + +disconnected(info, start_connect, State = #state{retry = Retry, topics = Topics}) -> Opts = iot_config:emqt_opts(<<"host-subscriber">>), - lager:debug("[mqtt] connecting, opts: ~p", [Opts]), + lager:debug("[mqtt] connecting, opts=~p", [Opts]), case emqtt:start_link(Opts) of {ok, ConnPid} -> {ok, _} = emqtt:connect(ConnPid), - case length(Topics) > 0 of - true -> - SubscribeResult = emqtt:subscribe(ConnPid, Topics), - lager:debug("[mqtt] connected pid: ~p subscribed: ~p", [ConnPid, SubscribeResult]); - false -> - ok - end, + maybe_subscribe(ConnPid, Topics), NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), - {next_state, ?CONNECTED, NState}; - ignore -> - lager:debug("[mqtt] start_link returned ignore"), - try_reconnect(), - {next_state, ?DISCONNECTED, State#state{retry = Retry + 1}}; - {error, Reason} -> - lager:debug("[mqtt] start_link returned error: ~p", [Reason]), - try_reconnect(), - {next_state, ?DISCONNECTED, State#state{retry = Retry + 1}} + {next_state, connected, NState}; + _ -> + {keep_state, State#state{retry = Retry + 1}, [{timeout, start_connect, 5000}]} end; -%% ------------------------------- -%% connected 状态 -%% ------------------------------- -handle_event(cast, {publish, Msg}, ?CONNECTED, State = #state{conn_pid = ConnPid}) -> - do_publish(ConnPid, Msg), - {keep_state, State}; +%% 掉线期间 publish → 入队 +disconnected(cast, {publish, Msg}, State = #state{queue = Q}) -> + {keep_state, State#state{queue = enqueue(Q, Msg)}}; -handle_event(info, {disconnect, ReasonCode, _Props}, ?CONNECTED, State = #state{conn_pid = _ConnPid}) -> - lager:debug("[mqtt] disconnected ReasonCode: ~p", [ReasonCode]), - try_reconnect(), - {next_state, ?DISCONNECTED, State#state{conn_pid = undefined}}; +%% status +disconnected({call, From}, status, State) -> + reply_status(disconnected, From, State); -%% ------------------------------- -%% disconnected / reconnect_wait 状态收到 publish -%% ------------------------------- -handle_event(cast, {publish, Msg}, ?DISCONNECTED, State = #state{queue = Queue}) -> - NewQueue = enqueue(Queue, Msg), - {keep_state, State#state{queue = NewQueue}}; - -%% ------------------------------- -%% call -%% ------------------------------- -handle_event({call, From}, status, StateName, State = #state{retry = Retry}) -> - Reply = #{ - state => StateName, - retry => Retry, - queue_len => queue:len(State#state.queue) - }, - {keep_state, State, [{reply, From, Reply}]}; - -%% ------------------------------- -%% 收到推送消息 -%% ------------------------------- -handle_event(info, {publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, _StateName, State = #state{parent_pid = ParentPid}) -> - lager:debug("[iot_mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]), - %% 将消息分发到对应的host进程去处理 - ParentPid ! {mqtt_message, Topic, Payload, Qos}, - {keep_state, State}; -handle_event(info, {puback, Packet = #{packet_id := _PacketId}}, _, State = #state{}) -> - lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]), - {keep_state, State}; - -%% ------------------------------- -%% 默认信息处理 -%% ------------------------------- -handle_event(_Type, Info, StateName, State) -> - lager:debug("[mqtt] unhandled event ~p in state ~p", [Info, StateName]), +%% 兜底 +disconnected(_Type, _Event, State) -> {keep_state, State}. %%%=================================================================== -%%% Internal functions +%%% connected state %%%=================================================================== --spec enqueue(Queue :: queue:queue(), Msg :: any()) -> NQueue :: queue:queue(). -enqueue(Queue, Msg) -> - %% 队列最大长度 1000 - case queue:len(Queue) < 1000 of +%% 正常 publish +connected(cast, {publish, Msg}, State = #state{conn_pid = ConnPid}) -> + do_publish(ConnPid, Msg), + {keep_state, State}; + +%% MQTT 断线 +connected(info, {disconnect, ReasonCode, _Props}, State) -> + lager:warning("[mqtt] disconnected, reason=~p", [ReasonCode]), + {next_state, disconnected, State#state{conn_pid = undefined}, [{timeout, start_connect, 5000}]}; + +%% 收到订阅消息 +connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) -> + ParentPid ! {mqtt_message, Topic, Payload, Qos}, + {keep_state, State}; + +%% puback +connected(info, {puback, Packet}, _State) -> + lager:debug("[mqtt] puback ~p", [Packet]), + keep_state_and_data; + +%% status +connected({call, From}, status, State) -> + reply_status(connected, From, State); + +%% 兜底 +connected(_Type, _Event, State) -> + {keep_state, State}. + +%%%=================================================================== +%%% Internal helpers +%%%=================================================================== + +-spec enqueue(queue:queue(), any()) -> queue:queue(). +enqueue(Q, Msg) -> + case queue:len(Q) < 1000 of true -> - queue:in(Msg, Queue); + queue:in(Msg, Q); false -> - lager:warning("[mqtt] queue full, dropping message ~p", [Msg]), - Queue + lager:warning("[mqtt] queue full, drop msg"), + Q end. --spec flush_queue(ConnPid :: pid(), State :: #state{}) -> NState :: #state{}. -flush_queue(ConnPid, State = #state{queue = Queue}) -> - lists:foreach(fun(Msg) -> do_publish(ConnPid, Msg) end, queue:to_list(Queue)), +-spec flush_queue(pid(), #state{}) -> #state{}. +flush_queue(ConnPid, State = #state{queue = Q}) -> + lists:foreach( + fun(Msg) -> do_publish(ConnPid, Msg) end, + queue:to_list(Q) + ), State#state{queue = queue:new()}. --spec do_publish(ConnPid :: pid(), Msg :: any()) -> no_return(). +-spec do_publish(pid(), any()) -> ok. do_publish(ConnPid, Msg) -> try - emqtt:publish(ConnPid, Msg) + emqtt:publish(ConnPid, Msg), + ok catch _:Err -> - lager:error("[mqtt] publish failed ~p, requeueing", [Err]), - Pid = self(), - gen_statem:cast(Pid, {publish, Msg}) + lager:error("[mqtt] publish failed ~p, requeue", [Err]), + gen_statem:cast(self(), {publish, Msg}), + ok end. -%%%=================================================================== -%%% terminate/3 -%%%=================================================================== --spec terminate(Reason :: term(), StateName :: atom(), State :: #state{}) -> ok. -terminate(_Reason, _StateName, #state{conn_pid = undefined}) -> +maybe_subscribe(_ConnPid, []) -> ok; -terminate(_Reason, _StateName, #state{conn_pid = ConnPid, topics = Topics}) -> - case length(Topics) > 0 of - true -> - TopicNames = lists:map(fun({Name,_}) -> Name end, Topics), - {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames); - false -> - ok - end, - ok = emqtt:disconnect(ConnPid), - lager:debug("[mqtt] terminate cleanup done"), +maybe_subscribe(ConnPid, Topics) -> + SubResult = emqtt:subscribe(ConnPid, Topics), + lager:debug("[iot_mqtt_connection] subscribe topics: ~p, result: ~p", [Topics, SubResult]). + +reply_status(StateName, From, + #state{retry = Retry, queue = Q} = State) -> + Reply = #{ + state => StateName, + retry => Retry, + queue_len => queue:len(Q) + }, + {keep_state, State, [{reply, From, Reply}]}. + +%%%=================================================================== +%%% terminate / code_change +%%%=================================================================== + +terminate(_Reason, _StateName, + #state{conn_pid = undefined}) -> + ok; +terminate(_Reason, _StateName, + #state{conn_pid = ConnPid, topics = Topics}) -> + maybe_unsubscribe(ConnPid, Topics), + emqtt:disconnect(ConnPid), ok. -%%%=================================================================== -%%% code_change/4 -%%%=================================================================== --spec code_change(term(), StateName :: atom(), State :: #state{}, Extra :: term()) -> - {ok, State :: #state{}}. -code_change(_OldVsn, _StateName, State, _Extra) -> - {ok, State}. +maybe_unsubscribe(_ConnPid, []) -> + ok; +maybe_unsubscribe(ConnPid, Topics) -> + TopicNames = [T || {T, _} <- Topics], + emqtt:unsubscribe(ConnPid, #{}, TopicNames), + ok. -try_reconnect() -> - erlang:send_after(5000, self(), start_connect). \ No newline at end of file +code_change(_OldVsn, _StateName, State, _Extra) -> + {ok, State}. \ No newline at end of file