Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 20b1872d22 |
201
src/emqtt_client.erl
Normal file
201
src/emqtt_client.erl
Normal file
@ -0,0 +1,201 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @doc
|
||||
%%% MQTT Client based on gen_statem (state_functions)
|
||||
%%% 1. 自动重连 + 掉线期间 publish 入队 + flush 队列
|
||||
%%% 2. 整合的publish和订阅逻辑
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(emqtt_client).
|
||||
-author("aresei").
|
||||
-behaviour(gen_statem).
|
||||
|
||||
%% API
|
||||
-export([start_link/3, publish_status/1, publish_msg/4]).
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]).
|
||||
|
||||
-define(CONNECT_ACTION(T), {state_timeout, T, start_connect}).
|
||||
|
||||
-record(state, {
|
||||
parent_pid :: pid(),
|
||||
opts = [],
|
||||
conn_pid :: pid() | undefined,
|
||||
retry :: non_neg_integer(),
|
||||
topics :: list(),
|
||||
queue :: queue:queue()
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
-spec start_link(pid(), list(), list()) ->
|
||||
{ok, pid()} | ignore | {error, term()}.
|
||||
start_link(ParentPid, Opts, Topics) when is_pid(ParentPid), is_list(Topics) ->
|
||||
gen_statem:start_link(?MODULE, [ParentPid, Opts, Topics], []).
|
||||
|
||||
-spec publish_status(pid()) -> map().
|
||||
publish_status(Pid) when is_pid(Pid) ->
|
||||
gen_statem:call(Pid, status).
|
||||
|
||||
-spec publish_msg(Pid :: pid(), Topic :: binary(), Payload :: binary(), Qos :: integer()) -> ok.
|
||||
publish_msg(Pid, Topic, Payload, Qos) when is_pid(Pid), is_binary(Topic), is_binary(Payload), Qos =:= 0 ->
|
||||
gen_statem:cast(Pid, {publish, Topic, Payload, Qos}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
%%%===================================================================
|
||||
|
||||
callback_mode() ->
|
||||
state_functions.
|
||||
|
||||
init([ParentPid, Opts0, Topics]) ->
|
||||
erlang:process_flag(trap_exit, true),
|
||||
|
||||
DefaultOpts = [
|
||||
{owner, self()},
|
||||
{tcp_opts, []},
|
||||
{auto_ack, true},
|
||||
{proto_ver, v3}
|
||||
],
|
||||
Opts = Opts0 ++ DefaultOpts,
|
||||
|
||||
{ok, disconnected, #state{parent_pid = ParentPid, opts = Opts, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}, ?CONNECT_ACTION(0)}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% disconnected state
|
||||
%%%===================================================================
|
||||
|
||||
disconnected(state_timeout, start_connect, State = #state{retry = Retry, opts = Opts, topics = Topics}) ->
|
||||
case emqtt:start_link(Opts) of
|
||||
{ok, ConnPid} ->
|
||||
case emqtt:connect(ConnPid) of
|
||||
{ok, _} ->
|
||||
maybe_subscribe(ConnPid, Topics),
|
||||
NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}),
|
||||
{next_state, connected, NState};
|
||||
{error, _Reason} ->
|
||||
%% 连接错误发生的时候,emqtt进程一定会挂掉,在进程错误的时候处理; 否则连接错误和服务端关闭需要2个分支来处理
|
||||
%% 这里保存进程id是为了,错误时候的模式匹配
|
||||
{keep_state, State#state{conn_pid = ConnPid}}
|
||||
end;
|
||||
_ ->
|
||||
{keep_state, State#state{retry = Retry + 1}, ?CONNECT_ACTION(5000)}
|
||||
end;
|
||||
|
||||
%% 掉线期间 publish → 入队
|
||||
disconnected(cast, {publish, Topic, Payload, Qos}, State = #state{queue = Q}) ->
|
||||
{keep_state, State#state{queue = enqueue(Q, {Topic, Payload, Qos})}};
|
||||
|
||||
%% status
|
||||
disconnected({call, From}, status, State) ->
|
||||
reply_status(disconnected, From, State);
|
||||
|
||||
%% 处理mqtt socket断开的问题
|
||||
disconnected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||
{keep_state, State, ?CONNECT_ACTION(5000)};
|
||||
|
||||
%% 兜底
|
||||
disconnected(_Type, _Event, State) ->
|
||||
{keep_state, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% connected state
|
||||
%%%===================================================================
|
||||
|
||||
%% 正常 publish
|
||||
connected(cast, {publish, Topic, Payload, Qos}, State = #state{conn_pid = ConnPid}) ->
|
||||
do_publish(ConnPid, Topic, Payload, Qos),
|
||||
{keep_state, State};
|
||||
|
||||
%% MQTT 断线
|
||||
connected(info, {disconnect, _ReasonCode, _Props}, State) ->
|
||||
{next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)};
|
||||
|
||||
%% 处理mqtt socket断开的问题
|
||||
connected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||
{next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)};
|
||||
|
||||
%% 收到订阅消息
|
||||
connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) ->
|
||||
ParentPid ! {mqtt_message, Topic, Payload, Qos},
|
||||
{keep_state, State};
|
||||
|
||||
%% puback
|
||||
connected(info, {puback, Ack}, #state{parent_pid = ParentPid}) ->
|
||||
ParentPid ! {mqtt_puback, Ack},
|
||||
keep_state_and_data;
|
||||
|
||||
%% status
|
||||
connected({call, From}, status, State) ->
|
||||
reply_status(connected, From, State);
|
||||
|
||||
%% 兜底
|
||||
connected(_Type, _Event, State) ->
|
||||
{keep_state, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal helpers
|
||||
%%%===================================================================
|
||||
|
||||
-spec enqueue(queue:queue(), any()) -> queue:queue().
|
||||
enqueue(Q, Msg) ->
|
||||
case queue:len(Q) < 1000 of
|
||||
true ->
|
||||
queue:in(Msg, Q);
|
||||
false ->
|
||||
Q
|
||||
end.
|
||||
|
||||
-spec flush_queue(pid(), #state{}) -> #state{}.
|
||||
flush_queue(ConnPid, State = #state{queue = Q}) ->
|
||||
lists:foreach(
|
||||
fun({Topic, Payload, Qos}) -> do_publish(ConnPid, Topic, Payload, Qos) end,
|
||||
queue:to_list(Q)
|
||||
),
|
||||
State#state{queue = queue:new()}.
|
||||
|
||||
do_publish(ConnPid, Topic, Payload, Qos) ->
|
||||
try
|
||||
emqtt:publish(ConnPid, Topic, Payload, Qos),
|
||||
ok
|
||||
catch
|
||||
_ ->
|
||||
gen_statem:cast(self(), {publish, Topic, Payload, Qos}),
|
||||
ok
|
||||
end.
|
||||
|
||||
maybe_subscribe(_ConnPid, []) ->
|
||||
ok;
|
||||
maybe_subscribe(ConnPid, Topics) ->
|
||||
SubResult = emqtt:subscribe(ConnPid, Topics),
|
||||
lager:debug("[iot_mqtt_connection] subscribe topics: ~p, result: ~p", [Topics, SubResult]).
|
||||
|
||||
reply_status(StateName, From,
|
||||
#state{retry = Retry, queue = Q} = State) ->
|
||||
Reply = #{
|
||||
state => StateName,
|
||||
retry => Retry,
|
||||
queue_len => queue:len(Q)
|
||||
},
|
||||
{keep_state, State, [{reply, From, Reply}]}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% terminate / code_change
|
||||
%%%===================================================================
|
||||
|
||||
terminate(_Reason, _StateName, #state{conn_pid = undefined}) ->
|
||||
ok;
|
||||
terminate(_Reason, _StateName, #state{conn_pid = ConnPid, topics = Topics}) ->
|
||||
maybe_unsubscribe(ConnPid, Topics),
|
||||
emqtt:disconnect(ConnPid),
|
||||
ok.
|
||||
|
||||
maybe_unsubscribe(_ConnPid, []) ->
|
||||
ok;
|
||||
maybe_unsubscribe(ConnPid, Topics) ->
|
||||
TopicNames = [T || {T, _} <- Topics],
|
||||
emqtt:unsubscribe(ConnPid, #{}, TopicNames),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, _StateName, State, _Extra) ->
|
||||
{ok, State}.
|
||||
Loading…
x
Reference in New Issue
Block a user