diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl new file mode 100644 index 0000000..71300cc --- /dev/null +++ b/src/emqtt_client.erl @@ -0,0 +1,201 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @doc +%%% MQTT Client based on gen_statem (state_functions) +%%% 1. 自动重连 + 掉线期间 publish 入队 + flush 队列 +%%% 2. 整合的publish和订阅逻辑 +%%%------------------------------------------------------------------- +-module(emqtt_client). +-author("aresei"). +-behaviour(gen_statem). + +%% API +-export([start_link/3, publish_status/1, publish_msg/4]). +%% gen_statem callbacks +-export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]). + +-define(CONNECT_ACTION(T), {state_timeout, T, start_connect}). + +-record(state, { + parent_pid :: pid(), + opts = [], + conn_pid :: pid() | undefined, + retry :: non_neg_integer(), + topics :: list(), + queue :: queue:queue() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== +-spec start_link(pid(), list(), list()) -> + {ok, pid()} | ignore | {error, term()}. +start_link(ParentPid, Opts, Topics) when is_pid(ParentPid), is_list(Topics) -> + gen_statem:start_link(?MODULE, [ParentPid, Opts, Topics], []). + +-spec publish_status(pid()) -> map(). +publish_status(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, status). + +-spec publish_msg(Pid :: pid(), Topic :: binary(), Payload :: binary(), Qos :: integer()) -> ok. +publish_msg(Pid, Topic, Payload, Qos) when is_pid(Pid), is_binary(Topic), is_binary(Payload), Qos =:= 0 -> + gen_statem:cast(Pid, {publish, Topic, Payload, Qos}). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +callback_mode() -> + state_functions. + +init([ParentPid, Opts0, Topics]) -> + erlang:process_flag(trap_exit, true), + + DefaultOpts = [ + {owner, self()}, + {tcp_opts, []}, + {auto_ack, true}, + {proto_ver, v3} + ], + Opts = Opts0 ++ DefaultOpts, + + {ok, disconnected, #state{parent_pid = ParentPid, opts = Opts, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}, ?CONNECT_ACTION(0)}. + +%%%=================================================================== +%%% disconnected state +%%%=================================================================== + +disconnected(state_timeout, start_connect, State = #state{retry = Retry, opts = Opts, topics = Topics}) -> + case emqtt:start_link(Opts) of + {ok, ConnPid} -> + case emqtt:connect(ConnPid) of + {ok, _} -> + maybe_subscribe(ConnPid, Topics), + NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), + {next_state, connected, NState}; + {error, _Reason} -> + %% 连接错误发生的时候,emqtt进程一定会挂掉,在进程错误的时候处理; 否则连接错误和服务端关闭需要2个分支来处理 + %% 这里保存进程id是为了,错误时候的模式匹配 + {keep_state, State#state{conn_pid = ConnPid}} + end; + _ -> + {keep_state, State#state{retry = Retry + 1}, ?CONNECT_ACTION(5000)} + end; + +%% 掉线期间 publish → 入队 +disconnected(cast, {publish, Topic, Payload, Qos}, State = #state{queue = Q}) -> + {keep_state, State#state{queue = enqueue(Q, {Topic, Payload, Qos})}}; + +%% status +disconnected({call, From}, status, State) -> + reply_status(disconnected, From, State); + +%% 处理mqtt socket断开的问题 +disconnected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> + {keep_state, State, ?CONNECT_ACTION(5000)}; + +%% 兜底 +disconnected(_Type, _Event, State) -> + {keep_state, State}. + +%%%=================================================================== +%%% connected state +%%%=================================================================== + +%% 正常 publish +connected(cast, {publish, Topic, Payload, Qos}, State = #state{conn_pid = ConnPid}) -> + do_publish(ConnPid, Topic, Payload, Qos), + {keep_state, State}; + +%% MQTT 断线 +connected(info, {disconnect, _ReasonCode, _Props}, State) -> + {next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)}; + +%% 处理mqtt socket断开的问题 +connected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> + {next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)}; + +%% 收到订阅消息 +connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) -> + ParentPid ! {mqtt_message, Topic, Payload, Qos}, + {keep_state, State}; + +%% puback +connected(info, {puback, Ack}, #state{parent_pid = ParentPid}) -> + ParentPid ! {mqtt_puback, Ack}, + keep_state_and_data; + +%% status +connected({call, From}, status, State) -> + reply_status(connected, From, State); + +%% 兜底 +connected(_Type, _Event, State) -> + {keep_state, State}. + +%%%=================================================================== +%%% Internal helpers +%%%=================================================================== + +-spec enqueue(queue:queue(), any()) -> queue:queue(). +enqueue(Q, Msg) -> + case queue:len(Q) < 1000 of + true -> + queue:in(Msg, Q); + false -> + Q + end. + +-spec flush_queue(pid(), #state{}) -> #state{}. +flush_queue(ConnPid, State = #state{queue = Q}) -> + lists:foreach( + fun({Topic, Payload, Qos}) -> do_publish(ConnPid, Topic, Payload, Qos) end, + queue:to_list(Q) + ), + State#state{queue = queue:new()}. + +do_publish(ConnPid, Topic, Payload, Qos) -> + try + emqtt:publish(ConnPid, Topic, Payload, Qos), + ok + catch + _ -> + gen_statem:cast(self(), {publish, Topic, Payload, Qos}), + ok + end. + +maybe_subscribe(_ConnPid, []) -> + ok; +maybe_subscribe(ConnPid, Topics) -> + SubResult = emqtt:subscribe(ConnPid, Topics), + lager:debug("[iot_mqtt_connection] subscribe topics: ~p, result: ~p", [Topics, SubResult]). + +reply_status(StateName, From, + #state{retry = Retry, queue = Q} = State) -> + Reply = #{ + state => StateName, + retry => Retry, + queue_len => queue:len(Q) + }, + {keep_state, State, [{reply, From, Reply}]}. + +%%%=================================================================== +%%% terminate / code_change +%%%=================================================================== + +terminate(_Reason, _StateName, #state{conn_pid = undefined}) -> + ok; +terminate(_Reason, _StateName, #state{conn_pid = ConnPid, topics = Topics}) -> + maybe_unsubscribe(ConnPid, Topics), + emqtt:disconnect(ConnPid), + ok. + +maybe_unsubscribe(_ConnPid, []) -> + ok; +maybe_unsubscribe(ConnPid, Topics) -> + TopicNames = [T || {T, _} <- Topics], + emqtt:unsubscribe(ConnPid, #{}, TopicNames), + ok. + +code_change(_OldVsn, _StateName, State, _Extra) -> + {ok, State}. \ No newline at end of file