Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 20b1872d22 |
201
src/emqtt_client.erl
Normal file
201
src/emqtt_client.erl
Normal file
@ -0,0 +1,201 @@
|
|||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author aresei
|
||||||
|
%%% @doc
|
||||||
|
%%% MQTT Client based on gen_statem (state_functions)
|
||||||
|
%%% 1. 自动重连 + 掉线期间 publish 入队 + flush 队列
|
||||||
|
%%% 2. 整合的publish和订阅逻辑
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(emqtt_client).
|
||||||
|
-author("aresei").
|
||||||
|
-behaviour(gen_statem).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start_link/3, publish_status/1, publish_msg/4]).
|
||||||
|
%% gen_statem callbacks
|
||||||
|
-export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]).
|
||||||
|
|
||||||
|
-define(CONNECT_ACTION(T), {state_timeout, T, start_connect}).
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
parent_pid :: pid(),
|
||||||
|
opts = [],
|
||||||
|
conn_pid :: pid() | undefined,
|
||||||
|
retry :: non_neg_integer(),
|
||||||
|
topics :: list(),
|
||||||
|
queue :: queue:queue()
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
-spec start_link(pid(), list(), list()) ->
|
||||||
|
{ok, pid()} | ignore | {error, term()}.
|
||||||
|
start_link(ParentPid, Opts, Topics) when is_pid(ParentPid), is_list(Topics) ->
|
||||||
|
gen_statem:start_link(?MODULE, [ParentPid, Opts, Topics], []).
|
||||||
|
|
||||||
|
-spec publish_status(pid()) -> map().
|
||||||
|
publish_status(Pid) when is_pid(Pid) ->
|
||||||
|
gen_statem:call(Pid, status).
|
||||||
|
|
||||||
|
-spec publish_msg(Pid :: pid(), Topic :: binary(), Payload :: binary(), Qos :: integer()) -> ok.
|
||||||
|
publish_msg(Pid, Topic, Payload, Qos) when is_pid(Pid), is_binary(Topic), is_binary(Payload), Qos =:= 0 ->
|
||||||
|
gen_statem:cast(Pid, {publish, Topic, Payload, Qos}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% gen_statem callbacks
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
callback_mode() ->
|
||||||
|
state_functions.
|
||||||
|
|
||||||
|
init([ParentPid, Opts0, Topics]) ->
|
||||||
|
erlang:process_flag(trap_exit, true),
|
||||||
|
|
||||||
|
DefaultOpts = [
|
||||||
|
{owner, self()},
|
||||||
|
{tcp_opts, []},
|
||||||
|
{auto_ack, true},
|
||||||
|
{proto_ver, v3}
|
||||||
|
],
|
||||||
|
Opts = Opts0 ++ DefaultOpts,
|
||||||
|
|
||||||
|
{ok, disconnected, #state{parent_pid = ParentPid, opts = Opts, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}, ?CONNECT_ACTION(0)}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% disconnected state
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
disconnected(state_timeout, start_connect, State = #state{retry = Retry, opts = Opts, topics = Topics}) ->
|
||||||
|
case emqtt:start_link(Opts) of
|
||||||
|
{ok, ConnPid} ->
|
||||||
|
case emqtt:connect(ConnPid) of
|
||||||
|
{ok, _} ->
|
||||||
|
maybe_subscribe(ConnPid, Topics),
|
||||||
|
NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}),
|
||||||
|
{next_state, connected, NState};
|
||||||
|
{error, _Reason} ->
|
||||||
|
%% 连接错误发生的时候,emqtt进程一定会挂掉,在进程错误的时候处理; 否则连接错误和服务端关闭需要2个分支来处理
|
||||||
|
%% 这里保存进程id是为了,错误时候的模式匹配
|
||||||
|
{keep_state, State#state{conn_pid = ConnPid}}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{keep_state, State#state{retry = Retry + 1}, ?CONNECT_ACTION(5000)}
|
||||||
|
end;
|
||||||
|
|
||||||
|
%% 掉线期间 publish → 入队
|
||||||
|
disconnected(cast, {publish, Topic, Payload, Qos}, State = #state{queue = Q}) ->
|
||||||
|
{keep_state, State#state{queue = enqueue(Q, {Topic, Payload, Qos})}};
|
||||||
|
|
||||||
|
%% status
|
||||||
|
disconnected({call, From}, status, State) ->
|
||||||
|
reply_status(disconnected, From, State);
|
||||||
|
|
||||||
|
%% 处理mqtt socket断开的问题
|
||||||
|
disconnected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
{keep_state, State, ?CONNECT_ACTION(5000)};
|
||||||
|
|
||||||
|
%% 兜底
|
||||||
|
disconnected(_Type, _Event, State) ->
|
||||||
|
{keep_state, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% connected state
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
%% 正常 publish
|
||||||
|
connected(cast, {publish, Topic, Payload, Qos}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
do_publish(ConnPid, Topic, Payload, Qos),
|
||||||
|
{keep_state, State};
|
||||||
|
|
||||||
|
%% MQTT 断线
|
||||||
|
connected(info, {disconnect, _ReasonCode, _Props}, State) ->
|
||||||
|
{next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)};
|
||||||
|
|
||||||
|
%% 处理mqtt socket断开的问题
|
||||||
|
connected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
{next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)};
|
||||||
|
|
||||||
|
%% 收到订阅消息
|
||||||
|
connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) ->
|
||||||
|
ParentPid ! {mqtt_message, Topic, Payload, Qos},
|
||||||
|
{keep_state, State};
|
||||||
|
|
||||||
|
%% puback
|
||||||
|
connected(info, {puback, Ack}, #state{parent_pid = ParentPid}) ->
|
||||||
|
ParentPid ! {mqtt_puback, Ack},
|
||||||
|
keep_state_and_data;
|
||||||
|
|
||||||
|
%% status
|
||||||
|
connected({call, From}, status, State) ->
|
||||||
|
reply_status(connected, From, State);
|
||||||
|
|
||||||
|
%% 兜底
|
||||||
|
connected(_Type, _Event, State) ->
|
||||||
|
{keep_state, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal helpers
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
-spec enqueue(queue:queue(), any()) -> queue:queue().
|
||||||
|
enqueue(Q, Msg) ->
|
||||||
|
case queue:len(Q) < 1000 of
|
||||||
|
true ->
|
||||||
|
queue:in(Msg, Q);
|
||||||
|
false ->
|
||||||
|
Q
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec flush_queue(pid(), #state{}) -> #state{}.
|
||||||
|
flush_queue(ConnPid, State = #state{queue = Q}) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun({Topic, Payload, Qos}) -> do_publish(ConnPid, Topic, Payload, Qos) end,
|
||||||
|
queue:to_list(Q)
|
||||||
|
),
|
||||||
|
State#state{queue = queue:new()}.
|
||||||
|
|
||||||
|
do_publish(ConnPid, Topic, Payload, Qos) ->
|
||||||
|
try
|
||||||
|
emqtt:publish(ConnPid, Topic, Payload, Qos),
|
||||||
|
ok
|
||||||
|
catch
|
||||||
|
_ ->
|
||||||
|
gen_statem:cast(self(), {publish, Topic, Payload, Qos}),
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
maybe_subscribe(_ConnPid, []) ->
|
||||||
|
ok;
|
||||||
|
maybe_subscribe(ConnPid, Topics) ->
|
||||||
|
SubResult = emqtt:subscribe(ConnPid, Topics),
|
||||||
|
lager:debug("[iot_mqtt_connection] subscribe topics: ~p, result: ~p", [Topics, SubResult]).
|
||||||
|
|
||||||
|
reply_status(StateName, From,
|
||||||
|
#state{retry = Retry, queue = Q} = State) ->
|
||||||
|
Reply = #{
|
||||||
|
state => StateName,
|
||||||
|
retry => Retry,
|
||||||
|
queue_len => queue:len(Q)
|
||||||
|
},
|
||||||
|
{keep_state, State, [{reply, From, Reply}]}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% terminate / code_change
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
terminate(_Reason, _StateName, #state{conn_pid = undefined}) ->
|
||||||
|
ok;
|
||||||
|
terminate(_Reason, _StateName, #state{conn_pid = ConnPid, topics = Topics}) ->
|
||||||
|
maybe_unsubscribe(ConnPid, Topics),
|
||||||
|
emqtt:disconnect(ConnPid),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
maybe_unsubscribe(_ConnPid, []) ->
|
||||||
|
ok;
|
||||||
|
maybe_unsubscribe(ConnPid, Topics) ->
|
||||||
|
TopicNames = [T || {T, _} <- Topics],
|
||||||
|
emqtt:unsubscribe(ConnPid, #{}, TopicNames),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_OldVsn, _StateName, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
Loading…
x
Reference in New Issue
Block a user