From 450b2f6e298d46d3ac92382db8220b76cd343e72 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 10 Jan 2026 18:40:44 +0800 Subject: [PATCH 1/5] fix mqtt connection --- apps/iot/src/iot_mqtt_client_sup.erl | 41 ++++++ apps/iot/src/iot_mqtt_connection.erl | 197 +++++++++++++++++++++++++++ 2 files changed, 238 insertions(+) create mode 100644 apps/iot/src/iot_mqtt_client_sup.erl create mode 100644 apps/iot/src/iot_mqtt_connection.erl diff --git a/apps/iot/src/iot_mqtt_client_sup.erl b/apps/iot/src/iot_mqtt_client_sup.erl new file mode 100644 index 0000000..ed73b6a --- /dev/null +++ b/apps/iot/src/iot_mqtt_client_sup.erl @@ -0,0 +1,41 @@ +%%%------------------------------------------------------------------- +%% @doc iot top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(iot_mqtt_client_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, + Specs = [ + #{ + id => iot_watchdog, + start => {'iot_watchdog', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_watchdog'] + } + ], + + {ok, {SupFlags, Specs}}. \ No newline at end of file diff --git a/apps/iot/src/iot_mqtt_connection.erl b/apps/iot/src/iot_mqtt_connection.erl new file mode 100644 index 0000000..774f951 --- /dev/null +++ b/apps/iot/src/iot_mqtt_connection.erl @@ -0,0 +1,197 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @doc +%%% MQTT Client based on gen_statem +%%% 自动重连 + 掉线期间 publish 入队 + flush 队列 +%%%------------------------------------------------------------------- +-module(iot_mqtt_connection). +-author("aresei"). +-behaviour(gen_statem). + +-include("iot.hrl"). + +%% API +-export([start_link/1, publish_status/1, publish_msg/2]). + +%% gen_statem callbacks +-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). + +-define(DISCONNECTED, disconnected). +-define(CONNECTED, connected). + +-record(state, { + parent_pid :: pid(), + conn_pid = undefined :: pid() | undefined, + retry = 0 :: non_neg_integer(), + topics = [] :: list(), + %% 掉线期间 publish 缓冲 + queue = queue:new() :: queue:queue() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% 需要订阅的主题信息 +%% Topics: [{<<"CET/NX/upload">>, 2}] +-spec start_link(ParentPid :: pid(), Topics :: list()) -> {ok, pid()} | ignore | {error, term()}. +start_link(ParentPid, Topics) when is_pid(ParentPid), is_list(Topics) -> + gen_statem:start_link(?MODULE, [ParentPid, Topics], []). + +-spec publish_status(ConnPid :: pid()) -> #{state := atom(), retry := non_neg_integer(), queue_len := non_neg_integer()}. +publish_status(ConnPid) when is_pid(ConnPid) -> + gen_statem:call(ConnPid, status). + +-spec publish_msg(ConnPid :: pid(), Msg :: any()) -> ok. +publish_msg(ConnPid, Msg) when is_pid(ConnPid) -> + gen_statem:cast(ConnPid, {publish, Msg}). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +callback_mode() -> + handle_event_function. + +-spec init([]) -> {ok, StateName :: atom(), StateData :: #state{}}. +init([ParentPid, Topics]) -> + %% 尝试建立到服务器的连接 + self() ! start_connect, + {ok, ?DISCONNECTED, #state{parent_pid = ParentPid, topics = Topics}}. + +%%%=================================================================== +%%% handle_event/4 +%%%=================================================================== +handle_event(info, start_connect, ?DISCONNECTED, State = #state{retry = Retry, topics = Topics}) -> + Opts = iot_config:emqt_opts(<<"host-subscriber">>), + lager:debug("[mqtt] connecting, opts: ~p", [Opts]), + case emqtt:start_link(Opts) of + {ok, ConnPid} -> + {ok, _} = emqtt:connect(ConnPid), + case length(Topics) > 0 of + true -> + SubscribeResult = emqtt:subscribe(ConnPid, Topics), + lager:debug("[mqtt] connected pid: ~p subscribed: ~p", [ConnPid, SubscribeResult]); + false -> + ok + end, + NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), + {next_state, ?CONNECTED, NState}; + ignore -> + lager:debug("[mqtt] start_link returned ignore"), + try_reconnect(), + {next_state, ?DISCONNECTED, State#state{retry = Retry + 1}}; + {error, Reason} -> + lager:debug("[mqtt] start_link returned error: ~p", [Reason]), + try_reconnect(), + {next_state, ?DISCONNECTED, State#state{retry = Retry + 1}} + end; + +%% ------------------------------- +%% connected 状态 +%% ------------------------------- +handle_event(cast, {publish, Msg}, ?CONNECTED, State = #state{conn_pid = ConnPid}) -> + do_publish(ConnPid, Msg), + {keep_state, State}; + +handle_event(info, {disconnect, ReasonCode, _Props}, ?CONNECTED, State = #state{conn_pid = _ConnPid}) -> + lager:debug("[mqtt] disconnected ReasonCode: ~p", [ReasonCode]), + try_reconnect(), + {next_state, ?DISCONNECTED, State#state{conn_pid = undefined}}; + +%% ------------------------------- +%% disconnected / reconnect_wait 状态收到 publish +%% ------------------------------- +handle_event(cast, {publish, Msg}, ?DISCONNECTED, State = #state{queue = Queue}) -> + NewQueue = enqueue(Queue, Msg), + {keep_state, State#state{queue = NewQueue}}; + +%% ------------------------------- +%% call +%% ------------------------------- +handle_event({call, From}, status, StateName, State = #state{retry = Retry}) -> + Reply = #{ + state => StateName, + retry => Retry, + queue_len => queue:len(State#state.queue) + }, + {keep_state, State, [{reply, From, Reply}]}; + +%% ------------------------------- +%% 收到推送消息 +%% ------------------------------- +handle_event(info, {publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, _StateName, State = #state{parent_pid = ParentPid}) -> + lager:debug("[iot_mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]), + %% 将消息分发到对应的host进程去处理 + ParentPid ! {mqtt_message, Topic, Payload, Qos}, + {keep_state, State}; +handle_event(info, {puback, Packet = #{packet_id := _PacketId}}, _, State = #state{}) -> + lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]), + {keep_state, State}; + +%% ------------------------------- +%% 默认信息处理 +%% ------------------------------- +handle_event(_Type, Info, StateName, State) -> + lager:debug("[mqtt] unhandled event ~p in state ~p", [Info, StateName]), + {keep_state, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec enqueue(Queue :: queue:queue(), Msg :: any()) -> NQueue :: queue:queue(). +enqueue(Queue, Msg) -> + %% 队列最大长度 1000 + case queue:len(Queue) < 1000 of + true -> + queue:in(Msg, Queue); + false -> + lager:warning("[mqtt] queue full, dropping message ~p", [Msg]), + Queue + end. + +-spec flush_queue(ConnPid :: pid(), State :: #state{}) -> NState :: #state{}. +flush_queue(ConnPid, State = #state{queue = Queue}) -> + lists:foreach(fun(Msg) -> do_publish(ConnPid, Msg) end, queue:to_list(Queue)), + State#state{queue = queue:new()}. + +-spec do_publish(ConnPid :: pid(), Msg :: any()) -> no_return(). +do_publish(ConnPid, Msg) -> + try + emqtt:publish(ConnPid, Msg) + catch + _:Err -> + lager:error("[mqtt] publish failed ~p, requeueing", [Err]), + Pid = self(), + gen_statem:cast(Pid, {publish, Msg}) + end. + +%%%=================================================================== +%%% terminate/3 +%%%=================================================================== +-spec terminate(Reason :: term(), StateName :: atom(), State :: #state{}) -> ok. +terminate(_Reason, _StateName, #state{conn_pid = undefined}) -> + ok; +terminate(_Reason, _StateName, #state{conn_pid = ConnPid, topics = Topics}) -> + case length(Topics) > 0 of + true -> + TopicNames = lists:map(fun({Name,_}) -> Name end, Topics), + {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames); + false -> + ok + end, + ok = emqtt:disconnect(ConnPid), + lager:debug("[mqtt] terminate cleanup done"), + ok. + +%%%=================================================================== +%%% code_change/4 +%%%=================================================================== +-spec code_change(term(), StateName :: atom(), State :: #state{}, Extra :: term()) -> + {ok, State :: #state{}}. +code_change(_OldVsn, _StateName, State, _Extra) -> + {ok, State}. + +try_reconnect() -> + erlang:send_after(5000, self(), start_connect). \ No newline at end of file -- 2.47.2 From 41e88100ec28ece9fe08b70eb23d2046c02d2d67 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 12 Jan 2026 10:15:57 +0800 Subject: [PATCH 2/5] fix mqtt --- apps/iot/src/iot_mqtt_connection.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/iot/src/iot_mqtt_connection.erl b/apps/iot/src/iot_mqtt_connection.erl index 774f951..8ced897 100644 --- a/apps/iot/src/iot_mqtt_connection.erl +++ b/apps/iot/src/iot_mqtt_connection.erl @@ -11,7 +11,7 @@ -include("iot.hrl"). %% API --export([start_link/1, publish_status/1, publish_msg/2]). +-export([start_link/2, publish_status/1, publish_msg/2]). %% gen_statem callbacks -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). -- 2.47.2 From ff1fb5e36384ddc7da0ffabbf0493713df61bcda Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 12 Jan 2026 10:49:47 +0800 Subject: [PATCH 3/5] fix mqtt connection --- apps/iot/src/iot_mqtt_connection.erl | 250 +++++++++++++-------------- 1 file changed, 119 insertions(+), 131 deletions(-) diff --git a/apps/iot/src/iot_mqtt_connection.erl b/apps/iot/src/iot_mqtt_connection.erl index 8ced897..7b981b3 100644 --- a/apps/iot/src/iot_mqtt_connection.erl +++ b/apps/iot/src/iot_mqtt_connection.erl @@ -1,197 +1,185 @@ %%%------------------------------------------------------------------- %%% @author aresei %%% @doc -%%% MQTT Client based on gen_statem +%%% MQTT Client based on gen_statem (state_functions) %%% 自动重连 + 掉线期间 publish 入队 + flush 队列 %%%------------------------------------------------------------------- -module(iot_mqtt_connection). -author("aresei"). -behaviour(gen_statem). --include("iot.hrl"). - %% API -export([start_link/2, publish_status/1, publish_msg/2]). %% gen_statem callbacks --export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). - --define(DISCONNECTED, disconnected). --define(CONNECTED, connected). +-export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]). -record(state, { - parent_pid :: pid(), - conn_pid = undefined :: pid() | undefined, - retry = 0 :: non_neg_integer(), - topics = [] :: list(), - %% 掉线期间 publish 缓冲 - queue = queue:new() :: queue:queue() + parent_pid :: pid(), + conn_pid :: pid() | undefined, + retry :: non_neg_integer(), + topics :: list(), + queue :: queue:queue() }). %%%=================================================================== %%% API %%%=================================================================== -%% 需要订阅的主题信息 -%% Topics: [{<<"CET/NX/upload">>, 2}] --spec start_link(ParentPid :: pid(), Topics :: list()) -> {ok, pid()} | ignore | {error, term()}. -start_link(ParentPid, Topics) when is_pid(ParentPid), is_list(Topics) -> +-spec start_link(pid(), list()) -> + {ok, pid()} | ignore | {error, term()}. +start_link(ParentPid, Topics) + when is_pid(ParentPid), is_list(Topics) -> gen_statem:start_link(?MODULE, [ParentPid, Topics], []). --spec publish_status(ConnPid :: pid()) -> #{state := atom(), retry := non_neg_integer(), queue_len := non_neg_integer()}. -publish_status(ConnPid) when is_pid(ConnPid) -> - gen_statem:call(ConnPid, status). +-spec publish_status(pid()) -> map(). +publish_status(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, status). --spec publish_msg(ConnPid :: pid(), Msg :: any()) -> ok. -publish_msg(ConnPid, Msg) when is_pid(ConnPid) -> - gen_statem:cast(ConnPid, {publish, Msg}). +-spec publish_msg(pid(), any()) -> ok. +publish_msg(Pid, Msg) when is_pid(Pid) -> + gen_statem:cast(Pid, {publish, Msg}). %%%=================================================================== %%% gen_statem callbacks %%%=================================================================== callback_mode() -> - handle_event_function. + state_functions. --spec init([]) -> {ok, StateName :: atom(), StateData :: #state{}}. init([ParentPid, Topics]) -> - %% 尝试建立到服务器的连接 self() ! start_connect, - {ok, ?DISCONNECTED, #state{parent_pid = ParentPid, topics = Topics}}. + {ok, disconnected, #state{parent_pid = ParentPid, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}}. %%%=================================================================== -%%% handle_event/4 +%%% disconnected state %%%=================================================================== -handle_event(info, start_connect, ?DISCONNECTED, State = #state{retry = Retry, topics = Topics}) -> + +disconnected(info, start_connect, State = #state{retry = Retry, topics = Topics}) -> Opts = iot_config:emqt_opts(<<"host-subscriber">>), - lager:debug("[mqtt] connecting, opts: ~p", [Opts]), + lager:debug("[mqtt] connecting, opts=~p", [Opts]), case emqtt:start_link(Opts) of {ok, ConnPid} -> {ok, _} = emqtt:connect(ConnPid), - case length(Topics) > 0 of - true -> - SubscribeResult = emqtt:subscribe(ConnPid, Topics), - lager:debug("[mqtt] connected pid: ~p subscribed: ~p", [ConnPid, SubscribeResult]); - false -> - ok - end, + maybe_subscribe(ConnPid, Topics), NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), - {next_state, ?CONNECTED, NState}; - ignore -> - lager:debug("[mqtt] start_link returned ignore"), - try_reconnect(), - {next_state, ?DISCONNECTED, State#state{retry = Retry + 1}}; - {error, Reason} -> - lager:debug("[mqtt] start_link returned error: ~p", [Reason]), - try_reconnect(), - {next_state, ?DISCONNECTED, State#state{retry = Retry + 1}} + {next_state, connected, NState}; + _ -> + {keep_state, State#state{retry = Retry + 1}, [{timeout, start_connect, 5000}]} end; -%% ------------------------------- -%% connected 状态 -%% ------------------------------- -handle_event(cast, {publish, Msg}, ?CONNECTED, State = #state{conn_pid = ConnPid}) -> - do_publish(ConnPid, Msg), - {keep_state, State}; +%% 掉线期间 publish → 入队 +disconnected(cast, {publish, Msg}, State = #state{queue = Q}) -> + {keep_state, State#state{queue = enqueue(Q, Msg)}}; -handle_event(info, {disconnect, ReasonCode, _Props}, ?CONNECTED, State = #state{conn_pid = _ConnPid}) -> - lager:debug("[mqtt] disconnected ReasonCode: ~p", [ReasonCode]), - try_reconnect(), - {next_state, ?DISCONNECTED, State#state{conn_pid = undefined}}; +%% status +disconnected({call, From}, status, State) -> + reply_status(disconnected, From, State); -%% ------------------------------- -%% disconnected / reconnect_wait 状态收到 publish -%% ------------------------------- -handle_event(cast, {publish, Msg}, ?DISCONNECTED, State = #state{queue = Queue}) -> - NewQueue = enqueue(Queue, Msg), - {keep_state, State#state{queue = NewQueue}}; - -%% ------------------------------- -%% call -%% ------------------------------- -handle_event({call, From}, status, StateName, State = #state{retry = Retry}) -> - Reply = #{ - state => StateName, - retry => Retry, - queue_len => queue:len(State#state.queue) - }, - {keep_state, State, [{reply, From, Reply}]}; - -%% ------------------------------- -%% 收到推送消息 -%% ------------------------------- -handle_event(info, {publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, _StateName, State = #state{parent_pid = ParentPid}) -> - lager:debug("[iot_mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]), - %% 将消息分发到对应的host进程去处理 - ParentPid ! {mqtt_message, Topic, Payload, Qos}, - {keep_state, State}; -handle_event(info, {puback, Packet = #{packet_id := _PacketId}}, _, State = #state{}) -> - lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]), - {keep_state, State}; - -%% ------------------------------- -%% 默认信息处理 -%% ------------------------------- -handle_event(_Type, Info, StateName, State) -> - lager:debug("[mqtt] unhandled event ~p in state ~p", [Info, StateName]), +%% 兜底 +disconnected(_Type, _Event, State) -> {keep_state, State}. %%%=================================================================== -%%% Internal functions +%%% connected state %%%=================================================================== --spec enqueue(Queue :: queue:queue(), Msg :: any()) -> NQueue :: queue:queue(). -enqueue(Queue, Msg) -> - %% 队列最大长度 1000 - case queue:len(Queue) < 1000 of +%% 正常 publish +connected(cast, {publish, Msg}, State = #state{conn_pid = ConnPid}) -> + do_publish(ConnPid, Msg), + {keep_state, State}; + +%% MQTT 断线 +connected(info, {disconnect, ReasonCode, _Props}, State) -> + lager:warning("[mqtt] disconnected, reason=~p", [ReasonCode]), + {next_state, disconnected, State#state{conn_pid = undefined}, [{timeout, start_connect, 5000}]}; + +%% 收到订阅消息 +connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) -> + ParentPid ! {mqtt_message, Topic, Payload, Qos}, + {keep_state, State}; + +%% puback +connected(info, {puback, Packet}, _State) -> + lager:debug("[mqtt] puback ~p", [Packet]), + keep_state_and_data; + +%% status +connected({call, From}, status, State) -> + reply_status(connected, From, State); + +%% 兜底 +connected(_Type, _Event, State) -> + {keep_state, State}. + +%%%=================================================================== +%%% Internal helpers +%%%=================================================================== + +-spec enqueue(queue:queue(), any()) -> queue:queue(). +enqueue(Q, Msg) -> + case queue:len(Q) < 1000 of true -> - queue:in(Msg, Queue); + queue:in(Msg, Q); false -> - lager:warning("[mqtt] queue full, dropping message ~p", [Msg]), - Queue + lager:warning("[mqtt] queue full, drop msg"), + Q end. --spec flush_queue(ConnPid :: pid(), State :: #state{}) -> NState :: #state{}. -flush_queue(ConnPid, State = #state{queue = Queue}) -> - lists:foreach(fun(Msg) -> do_publish(ConnPid, Msg) end, queue:to_list(Queue)), +-spec flush_queue(pid(), #state{}) -> #state{}. +flush_queue(ConnPid, State = #state{queue = Q}) -> + lists:foreach( + fun(Msg) -> do_publish(ConnPid, Msg) end, + queue:to_list(Q) + ), State#state{queue = queue:new()}. --spec do_publish(ConnPid :: pid(), Msg :: any()) -> no_return(). +-spec do_publish(pid(), any()) -> ok. do_publish(ConnPid, Msg) -> try - emqtt:publish(ConnPid, Msg) + emqtt:publish(ConnPid, Msg), + ok catch _:Err -> - lager:error("[mqtt] publish failed ~p, requeueing", [Err]), - Pid = self(), - gen_statem:cast(Pid, {publish, Msg}) + lager:error("[mqtt] publish failed ~p, requeue", [Err]), + gen_statem:cast(self(), {publish, Msg}), + ok end. -%%%=================================================================== -%%% terminate/3 -%%%=================================================================== --spec terminate(Reason :: term(), StateName :: atom(), State :: #state{}) -> ok. -terminate(_Reason, _StateName, #state{conn_pid = undefined}) -> +maybe_subscribe(_ConnPid, []) -> ok; -terminate(_Reason, _StateName, #state{conn_pid = ConnPid, topics = Topics}) -> - case length(Topics) > 0 of - true -> - TopicNames = lists:map(fun({Name,_}) -> Name end, Topics), - {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames); - false -> - ok - end, - ok = emqtt:disconnect(ConnPid), - lager:debug("[mqtt] terminate cleanup done"), +maybe_subscribe(ConnPid, Topics) -> + SubResult = emqtt:subscribe(ConnPid, Topics), + lager:debug("[iot_mqtt_connection] subscribe topics: ~p, result: ~p", [Topics, SubResult]). + +reply_status(StateName, From, + #state{retry = Retry, queue = Q} = State) -> + Reply = #{ + state => StateName, + retry => Retry, + queue_len => queue:len(Q) + }, + {keep_state, State, [{reply, From, Reply}]}. + +%%%=================================================================== +%%% terminate / code_change +%%%=================================================================== + +terminate(_Reason, _StateName, + #state{conn_pid = undefined}) -> + ok; +terminate(_Reason, _StateName, + #state{conn_pid = ConnPid, topics = Topics}) -> + maybe_unsubscribe(ConnPid, Topics), + emqtt:disconnect(ConnPid), ok. -%%%=================================================================== -%%% code_change/4 -%%%=================================================================== --spec code_change(term(), StateName :: atom(), State :: #state{}, Extra :: term()) -> - {ok, State :: #state{}}. -code_change(_OldVsn, _StateName, State, _Extra) -> - {ok, State}. +maybe_unsubscribe(_ConnPid, []) -> + ok; +maybe_unsubscribe(ConnPid, Topics) -> + TopicNames = [T || {T, _} <- Topics], + emqtt:unsubscribe(ConnPid, #{}, TopicNames), + ok. -try_reconnect() -> - erlang:send_after(5000, self(), start_connect). \ No newline at end of file +code_change(_OldVsn, _StateName, State, _Extra) -> + {ok, State}. \ No newline at end of file -- 2.47.2 From 0b0aa2659f1c6465d77ceabfd18ff7ac45264bac Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 12 Jan 2026 12:12:31 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=BA=86mqtt=E7=9A=84?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/src/iot_mqtt_connection.erl | 93 ++++++++++++++++------------ 1 file changed, 55 insertions(+), 38 deletions(-) diff --git a/apps/iot/src/iot_mqtt_connection.erl b/apps/iot/src/iot_mqtt_connection.erl index 7b981b3..267fb88 100644 --- a/apps/iot/src/iot_mqtt_connection.erl +++ b/apps/iot/src/iot_mqtt_connection.erl @@ -9,13 +9,15 @@ -behaviour(gen_statem). %% API --export([start_link/2, publish_status/1, publish_msg/2]). - +-export([start_link/3, publish_status/1, publish_msg/4]). %% gen_statem callbacks -export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]). +-define(CONNECT_ACTION(T), {state_timeout, T, start_connect}). + -record(state, { parent_pid :: pid(), + opts = [], conn_pid :: pid() | undefined, retry :: non_neg_integer(), topics :: list(), @@ -25,20 +27,18 @@ %%%=================================================================== %%% API %%%=================================================================== - --spec start_link(pid(), list()) -> +-spec start_link(pid(), list(), list()) -> {ok, pid()} | ignore | {error, term()}. -start_link(ParentPid, Topics) - when is_pid(ParentPid), is_list(Topics) -> - gen_statem:start_link(?MODULE, [ParentPid, Topics], []). +start_link(ParentPid, Opts, Topics) when is_pid(ParentPid), is_list(Topics) -> + gen_statem:start_link(?MODULE, [ParentPid, Opts, Topics], []). -spec publish_status(pid()) -> map(). publish_status(Pid) when is_pid(Pid) -> gen_statem:call(Pid, status). --spec publish_msg(pid(), any()) -> ok. -publish_msg(Pid, Msg) when is_pid(Pid) -> - gen_statem:cast(Pid, {publish, Msg}). +-spec publish_msg(Pid :: pid(), Topic :: binary(), Payload :: binary(), Qos :: integer()) -> ok. +publish_msg(Pid, Topic, Payload, Qos) when is_pid(Pid), is_binary(Topic), is_binary(Payload), is_integer(Qos) -> + gen_statem:cast(Pid, {publish, Topic, Payload, Qos}). %%%=================================================================== %%% gen_statem callbacks @@ -47,35 +47,52 @@ publish_msg(Pid, Msg) when is_pid(Pid) -> callback_mode() -> state_functions. -init([ParentPid, Topics]) -> - self() ! start_connect, - {ok, disconnected, #state{parent_pid = ParentPid, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}}. +init([ParentPid, Opts0, Topics]) -> + erlang:process_flag(trap_exit, true), + + DefaultOpts = [ + {owner, self()}, + {tcp_opts, []}, + {auto_ack, true}, + {proto_ver, v3} + ], + Opts = Opts0 ++ DefaultOpts, + + {ok, disconnected, #state{parent_pid = ParentPid, opts = Opts, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}, ?CONNECT_ACTION(0)}. %%%=================================================================== %%% disconnected state %%%=================================================================== -disconnected(info, start_connect, State = #state{retry = Retry, topics = Topics}) -> - Opts = iot_config:emqt_opts(<<"host-subscriber">>), - lager:debug("[mqtt] connecting, opts=~p", [Opts]), +disconnected(state_timeout, start_connect, State = #state{retry = Retry, opts = Opts, topics = Topics}) -> case emqtt:start_link(Opts) of {ok, ConnPid} -> - {ok, _} = emqtt:connect(ConnPid), - maybe_subscribe(ConnPid, Topics), - NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), - {next_state, connected, NState}; + case emqtt:connect(ConnPid) of + {ok, _} -> + maybe_subscribe(ConnPid, Topics), + NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), + {next_state, connected, NState}; + {error, _Reason} -> + %% 连接错误发生的时候,emqtt进程一定会挂掉,在进程错误的时候处理; 否则连接错误和服务端关闭需要2个分支来处理 + %% 这里保存进程id是为了,错误时候的模式匹配 + {keep_state, State#state{conn_pid = ConnPid}} + end; _ -> - {keep_state, State#state{retry = Retry + 1}, [{timeout, start_connect, 5000}]} + {keep_state, State#state{retry = Retry + 1}, ?CONNECT_ACTION(5000)} end; %% 掉线期间 publish → 入队 -disconnected(cast, {publish, Msg}, State = #state{queue = Q}) -> - {keep_state, State#state{queue = enqueue(Q, Msg)}}; +disconnected(cast, {publish, Topic, Payload, Qos}, State = #state{queue = Q}) -> + {keep_state, State#state{queue = enqueue(Q, {Topic, Payload, Qos})}}; %% status disconnected({call, From}, status, State) -> reply_status(disconnected, From, State); +%% 处理mqtt socket断开的问题 +disconnected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> + {keep_state, State, ?CONNECT_ACTION(5000)}; + %% 兜底 disconnected(_Type, _Event, State) -> {keep_state, State}. @@ -85,14 +102,17 @@ disconnected(_Type, _Event, State) -> %%%=================================================================== %% 正常 publish -connected(cast, {publish, Msg}, State = #state{conn_pid = ConnPid}) -> - do_publish(ConnPid, Msg), +connected(cast, {publish, Topic, Payload, Qos}, State = #state{conn_pid = ConnPid}) -> + do_publish(ConnPid, Topic, Payload, Qos), {keep_state, State}; %% MQTT 断线 -connected(info, {disconnect, ReasonCode, _Props}, State) -> - lager:warning("[mqtt] disconnected, reason=~p", [ReasonCode]), - {next_state, disconnected, State#state{conn_pid = undefined}, [{timeout, start_connect, 5000}]}; +connected(info, {disconnect, _ReasonCode, _Props}, State) -> + {next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)}; + +%% 处理mqtt socket断开的问题 +connected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> + {next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)}; %% 收到订阅消息 connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) -> @@ -100,8 +120,8 @@ connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, St {keep_state, State}; %% puback -connected(info, {puback, Packet}, _State) -> - lager:debug("[mqtt] puback ~p", [Packet]), +connected(info, {puback, Ack}, #state{parent_pid = ParentPid}) -> + ParentPid ! {mqtt_puback, Ack}, keep_state_and_data; %% status @@ -122,27 +142,24 @@ enqueue(Q, Msg) -> true -> queue:in(Msg, Q); false -> - lager:warning("[mqtt] queue full, drop msg"), Q end. -spec flush_queue(pid(), #state{}) -> #state{}. flush_queue(ConnPid, State = #state{queue = Q}) -> lists:foreach( - fun(Msg) -> do_publish(ConnPid, Msg) end, + fun({Topic, Payload, Qos}) -> do_publish(ConnPid, Topic, Payload, Qos) end, queue:to_list(Q) ), State#state{queue = queue:new()}. --spec do_publish(pid(), any()) -> ok. -do_publish(ConnPid, Msg) -> +do_publish(ConnPid, Topic, Payload, Qos) -> try - emqtt:publish(ConnPid, Msg), + emqtt:publish(ConnPid, Topic, Payload, Qos), ok catch - _:Err -> - lager:error("[mqtt] publish failed ~p, requeue", [Err]), - gen_statem:cast(self(), {publish, Msg}), + _ -> + gen_statem:cast(self(), {publish, Topic, Payload, Qos}), ok end. -- 2.47.2 From 504f82109a3d5aff3b3884c7207c671d48ffd0e3 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 12 Jan 2026 15:36:02 +0800 Subject: [PATCH 5/5] add aircon command --- apps/iot/src/http_handler/host_handler.erl | 21 +++ apps/iot/src/iot_mqtt_aircon_gateway.erl | 177 +++++++++++++++++++++ apps/iot/src/iot_mqtt_client_sup.erl | 41 ----- apps/iot/src/iot_sup.erl | 9 ++ config/sys-dev.config | 8 +- config/sys-prod.config | 10 ++ 6 files changed, 221 insertions(+), 45 deletions(-) create mode 100644 apps/iot/src/iot_mqtt_aircon_gateway.erl delete mode 100644 apps/iot/src/iot_mqtt_client_sup.erl diff --git a/apps/iot/src/http_handler/host_handler.erl b/apps/iot/src/http_handler/host_handler.erl index 39bbf4a..4fba434 100644 --- a/apps/iot/src/http_handler/host_handler.erl +++ b/apps/iot/src/http_handler/host_handler.erl @@ -137,6 +137,27 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := {ok, 200, iot_util:json_data(<<"success">>)} end; +%% 空调控制参数下发 +handle_request("POST", "/host/publish_aircon_command", _, + PostParams = #{<<"uuid">> := UUID, <<"timeout">> := Timeout0, <<"device_id">> := DeviceId, <<"command">> := Command}) + when is_binary(UUID), is_integer(Timeout0) -> + + Timeout = Timeout0 * 1000, + lager:debug("[http_host_handler] publish_aircon_command body is: ~p", [PostParams]), + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + Ref = iot_mqtt_aircon_gateway:send_command(DeviceId, Command), + receive + {send_command_reply, Ref, Reply} -> + {ok, 200, iot_util:json_data(Reply)} + after + Timeout -> + {ok, 200, iot_util:json_error(401, <<"timeout">>)} + end + end; + handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. diff --git a/apps/iot/src/iot_mqtt_aircon_gateway.erl b/apps/iot/src/iot_mqtt_aircon_gateway.erl new file mode 100644 index 0000000..e4f9ebc --- /dev/null +++ b/apps/iot/src/iot_mqtt_aircon_gateway.erl @@ -0,0 +1,177 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2026, +%%% @doc +%%% +%%% @end +%%% Created : 12. 1月 2026 14:30 +%%%------------------------------------------------------------------- +-module(iot_mqtt_aircon_gateway). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([send_command/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + conn_pid :: pid(), + %% 保存请求和响应的对应关系: #{{DeviceId, sessionId} => {Ref, ReceiverPid}} + inflight = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec send_command(DeviceId :: binary(), Command :: map()) -> Ref :: reference(). +send_command(DeviceId, Command) when is_binary(DeviceId), is_map(Command) -> + Ref = make_ref(), + ReceiverPid = self(), + gen_server:cast(?SERVER, {send_command, Ref, ReceiverPid, DeviceId, Command}), + Ref. + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + %% 建立到emqx服务器的连接 + {ok, Props} = application:get_env(iot, aircon_emqx_server), + EMQXHost = proplists:get_value(host, Props), + EMQXPort = proplists:get_value(port, Props, 1883), + Username = proplists:get_value(username, Props), + Password = proplists:get_value(password, Props), + RetryInterval = proplists:get_value(retry_interval, Props, 5), + Keepalive = proplists:get_value(keepalive, Props, 86400), + + ClientId = <<"mqtt-client-iot-aircon-gateway">>, + Opts = [ + {clientid, ClientId}, + {host, EMQXHost}, + {port, EMQXPort}, + {username, Username}, + {password, Password}, + {keepalive, Keepalive}, + {retry_interval, RetryInterval} + ], + + %% 需要订阅的消息 + Topics = [{<<"/aircon/+/command_reply">>, 2}], + {ok, ConnPid} = iot_mqtt_connection:start_link(self(), Opts, Topics), + + {ok, #state{conn_pid = ConnPid}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({send_command, Ref, ReceiverPid, DeviceId, Command}, State = #state{conn_pid = ConnPid, inflight = Inflight}) -> + Topic = <<"/aircon/", DeviceId/binary, "/command">>, + SessionId = new_session_id(), + + NCommand = jiffy:encode(Command#{<<"sessionid">> => SessionId}, [force_utf8]), + iot_mqtt_connection:publish_msg(ConnPid, Topic, NCommand, 2), + + NInflight = maps:put({DeviceId, SessionId}, {Ref, ReceiverPid}, Inflight), + {noreply, State#state{inflight = NInflight}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({mqtt_message, Topic, Payload, _Qos}, State = #state{inflight = Inflight}) -> + case binary:split(Topic, <<"/">>) of + [<<>>, <<"aircon">>, DeviceId, <<"command_reply">>] -> + Reply = catch jiffy:decode(Payload, [return_maps]), + case Reply of + #{<<"sessionid">> := SessionId} -> + case maps:take({DeviceId, SessionId}, Inflight) of + error -> + {noreply, State}; + {{Ref, ReceiverPid}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {send_command_reply, Ref, Reply}; + false -> + ok + end, + {noreply, State#state{inflight = NInflight}} + end; + _ -> + {noreply, State} + end; + _ -> + lager:notice("[~p] get a invalid topic: ~p, message: ~p", [?MODULE, Topic, Payload]), + {noreply, State} + end; + +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% 生成固定长度 ID +-spec new_session_id() -> integer(). +new_session_id() -> + %% 10位时间戳(毫秒取后10位) + {Mega, Seconds, _} = os:timestamp(), + Timestamp = Mega * 1000000 + Seconds, + %% 5位单调序号 + Counter = erlang:unique_integer([monotonic, positive]) rem 100000, + %% 拼接成整数 + Timestamp * 100000 + Counter. \ No newline at end of file diff --git a/apps/iot/src/iot_mqtt_client_sup.erl b/apps/iot/src/iot_mqtt_client_sup.erl deleted file mode 100644 index ed73b6a..0000000 --- a/apps/iot/src/iot_mqtt_client_sup.erl +++ /dev/null @@ -1,41 +0,0 @@ -%%%------------------------------------------------------------------- -%% @doc iot top level supervisor. -%% @end -%%%------------------------------------------------------------------- - --module(iot_mqtt_client_sup). - --behaviour(supervisor). - --export([start_link/0]). - --export([init/1]). - --define(SERVER, ?MODULE). - -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). - -%% sup_flags() = #{strategy => strategy(), % optional -%% intensity => non_neg_integer(), % optional -%% period => pos_integer()} % optional -%% child_spec() = #{id => child_id(), % mandatory -%% start => mfargs(), % mandatory -%% restart => restart(), % optional -%% shutdown => shutdown(), % optional -%% type => worker(), % optional -%% modules => modules()} % optional -init([]) -> - SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - Specs = [ - #{ - id => iot_watchdog, - start => {'iot_watchdog', start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['iot_watchdog'] - } - ], - - {ok, {SupFlags, Specs}}. \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 03834d8..79333eb 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -143,6 +143,15 @@ init([]) -> shutdown => 2000, type => worker, modules => ['iot_jinzhi_endpoint'] + }, + + #{ + id => 'iot_mqtt_aircon_gateway', + start => {'iot_mqtt_aircon_gateway', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_mqtt_aircon_gateway'] } ], diff --git a/config/sys-dev.config b/config/sys-dev.config index ee29974..b00e5e0 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -37,12 +37,12 @@ ]}, %% 目标服务器地址 - {emqx_server, [ - {host, {39, 98, 184, 67}}, + {aircon_emqx_server, [ + {host, "118.178.229.213"}, {port, 1883}, {tcp_opts, []}, - {username, "test"}, - {password, "test1234"}, + {username, "aircon"}, + {password, "A9K2rM8QxL7WZsP5D@B!3"}, {keepalive, 86400}, {retry_interval, 5} ]}, diff --git a/config/sys-prod.config b/config/sys-prod.config index 7410074..d0b4a6e 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -25,6 +25,16 @@ 15 => 300 }}, + {aircon_emqx_server, [ + {host, "172.30.37.212"}, + {port, 1883}, + {tcp_opts, []}, + {username, "aircon"}, + {password, "A9K2rM8QxL7WZsP5D@B!3"}, + {keepalive, 86400}, + {retry_interval, 5} + ]}, + {watchdog, [ {pri_key, "jinzhi_watchdog_pri.key"}, {url, "http://172.30.37.242:8080/hqtaskcenterapp/sys/taskCenter/taskReceive/sendNotice.do"}, -- 2.47.2