add emqtt_client

This commit is contained in:
anlicheng 2026-01-13 11:41:37 +08:00
parent c590588b16
commit 20b1872d22

201
src/emqtt_client.erl Normal file
View File

@ -0,0 +1,201 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @doc
%%% MQTT Client based on gen_statem (state_functions)
%%% 1. + 线 publish + flush
%%% 2. publish和订阅逻辑
%%%-------------------------------------------------------------------
-module(emqtt_client).
-author("aresei").
-behaviour(gen_statem).
%% API
-export([start_link/3, publish_status/1, publish_msg/4]).
%% gen_statem callbacks
-export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]).
-define(CONNECT_ACTION(T), {state_timeout, T, start_connect}).
-record(state, {
parent_pid :: pid(),
opts = [],
conn_pid :: pid() | undefined,
retry :: non_neg_integer(),
topics :: list(),
queue :: queue:queue()
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec start_link(pid(), list(), list()) ->
{ok, pid()} | ignore | {error, term()}.
start_link(ParentPid, Opts, Topics) when is_pid(ParentPid), is_list(Topics) ->
gen_statem:start_link(?MODULE, [ParentPid, Opts, Topics], []).
-spec publish_status(pid()) -> map().
publish_status(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, status).
-spec publish_msg(Pid :: pid(), Topic :: binary(), Payload :: binary(), Qos :: integer()) -> ok.
publish_msg(Pid, Topic, Payload, Qos) when is_pid(Pid), is_binary(Topic), is_binary(Payload), Qos =:= 0 ->
gen_statem:cast(Pid, {publish, Topic, Payload, Qos}).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
callback_mode() ->
state_functions.
init([ParentPid, Opts0, Topics]) ->
erlang:process_flag(trap_exit, true),
DefaultOpts = [
{owner, self()},
{tcp_opts, []},
{auto_ack, true},
{proto_ver, v3}
],
Opts = Opts0 ++ DefaultOpts,
{ok, disconnected, #state{parent_pid = ParentPid, opts = Opts, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}, ?CONNECT_ACTION(0)}.
%%%===================================================================
%%% disconnected state
%%%===================================================================
disconnected(state_timeout, start_connect, State = #state{retry = Retry, opts = Opts, topics = Topics}) ->
case emqtt:start_link(Opts) of
{ok, ConnPid} ->
case emqtt:connect(ConnPid) of
{ok, _} ->
maybe_subscribe(ConnPid, Topics),
NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}),
{next_state, connected, NState};
{error, _Reason} ->
%% emqtt进程一定会挂掉; 2
%% id是为了
{keep_state, State#state{conn_pid = ConnPid}}
end;
_ ->
{keep_state, State#state{retry = Retry + 1}, ?CONNECT_ACTION(5000)}
end;
%% 线 publish
disconnected(cast, {publish, Topic, Payload, Qos}, State = #state{queue = Q}) ->
{keep_state, State#state{queue = enqueue(Q, {Topic, Payload, Qos})}};
%% status
disconnected({call, From}, status, State) ->
reply_status(disconnected, From, State);
%% mqtt socket断开的问题
disconnected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
{keep_state, State, ?CONNECT_ACTION(5000)};
%%
disconnected(_Type, _Event, State) ->
{keep_state, State}.
%%%===================================================================
%%% connected state
%%%===================================================================
%% publish
connected(cast, {publish, Topic, Payload, Qos}, State = #state{conn_pid = ConnPid}) ->
do_publish(ConnPid, Topic, Payload, Qos),
{keep_state, State};
%% MQTT 线
connected(info, {disconnect, _ReasonCode, _Props}, State) ->
{next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)};
%% mqtt socket断开的问题
connected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
{next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)};
%%
connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) ->
ParentPid ! {mqtt_message, Topic, Payload, Qos},
{keep_state, State};
%% puback
connected(info, {puback, Ack}, #state{parent_pid = ParentPid}) ->
ParentPid ! {mqtt_puback, Ack},
keep_state_and_data;
%% status
connected({call, From}, status, State) ->
reply_status(connected, From, State);
%%
connected(_Type, _Event, State) ->
{keep_state, State}.
%%%===================================================================
%%% Internal helpers
%%%===================================================================
-spec enqueue(queue:queue(), any()) -> queue:queue().
enqueue(Q, Msg) ->
case queue:len(Q) < 1000 of
true ->
queue:in(Msg, Q);
false ->
Q
end.
-spec flush_queue(pid(), #state{}) -> #state{}.
flush_queue(ConnPid, State = #state{queue = Q}) ->
lists:foreach(
fun({Topic, Payload, Qos}) -> do_publish(ConnPid, Topic, Payload, Qos) end,
queue:to_list(Q)
),
State#state{queue = queue:new()}.
do_publish(ConnPid, Topic, Payload, Qos) ->
try
emqtt:publish(ConnPid, Topic, Payload, Qos),
ok
catch
_ ->
gen_statem:cast(self(), {publish, Topic, Payload, Qos}),
ok
end.
maybe_subscribe(_ConnPid, []) ->
ok;
maybe_subscribe(ConnPid, Topics) ->
SubResult = emqtt:subscribe(ConnPid, Topics),
lager:debug("[iot_mqtt_connection] subscribe topics: ~p, result: ~p", [Topics, SubResult]).
reply_status(StateName, From,
#state{retry = Retry, queue = Q} = State) ->
Reply = #{
state => StateName,
retry => Retry,
queue_len => queue:len(Q)
},
{keep_state, State, [{reply, From, Reply}]}.
%%%===================================================================
%%% terminate / code_change
%%%===================================================================
terminate(_Reason, _StateName, #state{conn_pid = undefined}) ->
ok;
terminate(_Reason, _StateName, #state{conn_pid = ConnPid, topics = Topics}) ->
maybe_unsubscribe(ConnPid, Topics),
emqtt:disconnect(ConnPid),
ok.
maybe_unsubscribe(_ConnPid, []) ->
ok;
maybe_unsubscribe(ConnPid, Topics) ->
TopicNames = [T || {T, _} <- Topics],
emqtt:unsubscribe(ConnPid, #{}, TopicNames),
ok.
code_change(_OldVsn, _StateName, State, _Extra) ->
{ok, State}.