diff --git a/apps/iot/src/iot_mqtt_client_sup.erl b/apps/iot/src/iot_mqtt_client_sup.erl new file mode 100644 index 0000000..ed73b6a --- /dev/null +++ b/apps/iot/src/iot_mqtt_client_sup.erl @@ -0,0 +1,41 @@ +%%%------------------------------------------------------------------- +%% @doc iot top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(iot_mqtt_client_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, + Specs = [ + #{ + id => iot_watchdog, + start => {'iot_watchdog', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_watchdog'] + } + ], + + {ok, {SupFlags, Specs}}. \ No newline at end of file diff --git a/apps/iot/src/iot_mqtt_connection.erl b/apps/iot/src/iot_mqtt_connection.erl new file mode 100644 index 0000000..774f951 --- /dev/null +++ b/apps/iot/src/iot_mqtt_connection.erl @@ -0,0 +1,197 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @doc +%%% MQTT Client based on gen_statem +%%% 自动重连 + 掉线期间 publish 入队 + flush 队列 +%%%------------------------------------------------------------------- +-module(iot_mqtt_connection). +-author("aresei"). +-behaviour(gen_statem). + +-include("iot.hrl"). + +%% API +-export([start_link/1, publish_status/1, publish_msg/2]). + +%% gen_statem callbacks +-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). + +-define(DISCONNECTED, disconnected). +-define(CONNECTED, connected). + +-record(state, { + parent_pid :: pid(), + conn_pid = undefined :: pid() | undefined, + retry = 0 :: non_neg_integer(), + topics = [] :: list(), + %% 掉线期间 publish 缓冲 + queue = queue:new() :: queue:queue() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% 需要订阅的主题信息 +%% Topics: [{<<"CET/NX/upload">>, 2}] +-spec start_link(ParentPid :: pid(), Topics :: list()) -> {ok, pid()} | ignore | {error, term()}. +start_link(ParentPid, Topics) when is_pid(ParentPid), is_list(Topics) -> + gen_statem:start_link(?MODULE, [ParentPid, Topics], []). + +-spec publish_status(ConnPid :: pid()) -> #{state := atom(), retry := non_neg_integer(), queue_len := non_neg_integer()}. +publish_status(ConnPid) when is_pid(ConnPid) -> + gen_statem:call(ConnPid, status). + +-spec publish_msg(ConnPid :: pid(), Msg :: any()) -> ok. +publish_msg(ConnPid, Msg) when is_pid(ConnPid) -> + gen_statem:cast(ConnPid, {publish, Msg}). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +callback_mode() -> + handle_event_function. + +-spec init([]) -> {ok, StateName :: atom(), StateData :: #state{}}. +init([ParentPid, Topics]) -> + %% 尝试建立到服务器的连接 + self() ! start_connect, + {ok, ?DISCONNECTED, #state{parent_pid = ParentPid, topics = Topics}}. + +%%%=================================================================== +%%% handle_event/4 +%%%=================================================================== +handle_event(info, start_connect, ?DISCONNECTED, State = #state{retry = Retry, topics = Topics}) -> + Opts = iot_config:emqt_opts(<<"host-subscriber">>), + lager:debug("[mqtt] connecting, opts: ~p", [Opts]), + case emqtt:start_link(Opts) of + {ok, ConnPid} -> + {ok, _} = emqtt:connect(ConnPid), + case length(Topics) > 0 of + true -> + SubscribeResult = emqtt:subscribe(ConnPid, Topics), + lager:debug("[mqtt] connected pid: ~p subscribed: ~p", [ConnPid, SubscribeResult]); + false -> + ok + end, + NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), + {next_state, ?CONNECTED, NState}; + ignore -> + lager:debug("[mqtt] start_link returned ignore"), + try_reconnect(), + {next_state, ?DISCONNECTED, State#state{retry = Retry + 1}}; + {error, Reason} -> + lager:debug("[mqtt] start_link returned error: ~p", [Reason]), + try_reconnect(), + {next_state, ?DISCONNECTED, State#state{retry = Retry + 1}} + end; + +%% ------------------------------- +%% connected 状态 +%% ------------------------------- +handle_event(cast, {publish, Msg}, ?CONNECTED, State = #state{conn_pid = ConnPid}) -> + do_publish(ConnPid, Msg), + {keep_state, State}; + +handle_event(info, {disconnect, ReasonCode, _Props}, ?CONNECTED, State = #state{conn_pid = _ConnPid}) -> + lager:debug("[mqtt] disconnected ReasonCode: ~p", [ReasonCode]), + try_reconnect(), + {next_state, ?DISCONNECTED, State#state{conn_pid = undefined}}; + +%% ------------------------------- +%% disconnected / reconnect_wait 状态收到 publish +%% ------------------------------- +handle_event(cast, {publish, Msg}, ?DISCONNECTED, State = #state{queue = Queue}) -> + NewQueue = enqueue(Queue, Msg), + {keep_state, State#state{queue = NewQueue}}; + +%% ------------------------------- +%% call +%% ------------------------------- +handle_event({call, From}, status, StateName, State = #state{retry = Retry}) -> + Reply = #{ + state => StateName, + retry => Retry, + queue_len => queue:len(State#state.queue) + }, + {keep_state, State, [{reply, From, Reply}]}; + +%% ------------------------------- +%% 收到推送消息 +%% ------------------------------- +handle_event(info, {publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, _StateName, State = #state{parent_pid = ParentPid}) -> + lager:debug("[iot_mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]), + %% 将消息分发到对应的host进程去处理 + ParentPid ! {mqtt_message, Topic, Payload, Qos}, + {keep_state, State}; +handle_event(info, {puback, Packet = #{packet_id := _PacketId}}, _, State = #state{}) -> + lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]), + {keep_state, State}; + +%% ------------------------------- +%% 默认信息处理 +%% ------------------------------- +handle_event(_Type, Info, StateName, State) -> + lager:debug("[mqtt] unhandled event ~p in state ~p", [Info, StateName]), + {keep_state, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec enqueue(Queue :: queue:queue(), Msg :: any()) -> NQueue :: queue:queue(). +enqueue(Queue, Msg) -> + %% 队列最大长度 1000 + case queue:len(Queue) < 1000 of + true -> + queue:in(Msg, Queue); + false -> + lager:warning("[mqtt] queue full, dropping message ~p", [Msg]), + Queue + end. + +-spec flush_queue(ConnPid :: pid(), State :: #state{}) -> NState :: #state{}. +flush_queue(ConnPid, State = #state{queue = Queue}) -> + lists:foreach(fun(Msg) -> do_publish(ConnPid, Msg) end, queue:to_list(Queue)), + State#state{queue = queue:new()}. + +-spec do_publish(ConnPid :: pid(), Msg :: any()) -> no_return(). +do_publish(ConnPid, Msg) -> + try + emqtt:publish(ConnPid, Msg) + catch + _:Err -> + lager:error("[mqtt] publish failed ~p, requeueing", [Err]), + Pid = self(), + gen_statem:cast(Pid, {publish, Msg}) + end. + +%%%=================================================================== +%%% terminate/3 +%%%=================================================================== +-spec terminate(Reason :: term(), StateName :: atom(), State :: #state{}) -> ok. +terminate(_Reason, _StateName, #state{conn_pid = undefined}) -> + ok; +terminate(_Reason, _StateName, #state{conn_pid = ConnPid, topics = Topics}) -> + case length(Topics) > 0 of + true -> + TopicNames = lists:map(fun({Name,_}) -> Name end, Topics), + {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames); + false -> + ok + end, + ok = emqtt:disconnect(ConnPid), + lager:debug("[mqtt] terminate cleanup done"), + ok. + +%%%=================================================================== +%%% code_change/4 +%%%=================================================================== +-spec code_change(term(), StateName :: atom(), State :: #state{}, Extra :: term()) -> + {ok, State :: #state{}}. +code_change(_OldVsn, _StateName, State, _Extra) -> + {ok, State}. + +try_reconnect() -> + erlang:send_after(5000, self(), start_connect). \ No newline at end of file