fix mqtt
This commit is contained in:
parent
5ad875d3fe
commit
9d6f80232e
@ -22,7 +22,7 @@
|
|||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
http_endpoint :: #http_endpoint{},
|
endpoint :: #endpoint{},
|
||||||
buffer :: endpoint_buffer:buffer()
|
buffer :: endpoint_buffer:buffer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
@ -55,7 +55,7 @@ cleanup(Pid) when is_pid(Pid) ->
|
|||||||
%% @doc Spawns the server and registers the local name (unique)
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
-spec(start_link(Name, Endpoint :: #http_endpoint{}) ->
|
-spec(start_link(Name, Endpoint :: #http_endpoint{}) ->
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
start_link(Name, Endpoint = #http_endpoint{}) when is_atom(Name) ->
|
start_link(Name, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(Name) ->
|
||||||
gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []).
|
gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -69,7 +69,7 @@ start_link(Name, Endpoint = #http_endpoint{}) when is_atom(Name) ->
|
|||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([Endpoint]) ->
|
init([Endpoint]) ->
|
||||||
Buffer = endpoint_buffer:new(Endpoint, 10),
|
Buffer = endpoint_buffer:new(Endpoint, 10),
|
||||||
{ok, #state{http_endpoint = Endpoint, buffer = Buffer}}.
|
{ok, #state{endpoint = Endpoint, buffer = Buffer}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
|
|||||||
223
apps/endpoint/src/endpoint_for_mqtt.erl
Normal file
223
apps/endpoint/src/endpoint_for_mqtt.erl
Normal file
@ -0,0 +1,223 @@
|
|||||||
|
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author aresei
|
||||||
|
%%% @copyright (C) 2023, <COMPANY>
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 06. 7月 2023 12:02
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(endpoint_for_mqtt).
|
||||||
|
|
||||||
|
-include("endpoint.hrl").
|
||||||
|
-behaviour(gen_statem).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start_link/2]).
|
||||||
|
-export([get_name/1, get_pid/1, forward/4, get_stat/1, reload/2, clean_up/1, get_mapper_fun/1]).
|
||||||
|
|
||||||
|
%% gen_statem callbacks
|
||||||
|
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||||
|
|
||||||
|
%% 消息重发间隔
|
||||||
|
-define(RETRY_INTERVAL, 5000).
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
endpoint :: #endpoint{},
|
||||||
|
buffer :: endpoint_buffer:buffer(),
|
||||||
|
conn_pid :: undefined | pid(),
|
||||||
|
%% 待确认的数据, #{PacketId :: integer() => Id :: integer()}
|
||||||
|
inflight = #{}
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
-spec get_name(Id :: integer()) -> atom().
|
||||||
|
get_name(Id) when is_integer(Id) ->
|
||||||
|
list_to_atom("endpoint:" ++ integer_to_list(Id)).
|
||||||
|
|
||||||
|
-spec get_pid(Name :: binary()) -> undefined | pid().
|
||||||
|
get_pid(Name) when is_binary(Name) ->
|
||||||
|
whereis(get_name(Name)).
|
||||||
|
|
||||||
|
-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
|
||||||
|
forward(undefined, _, _, _) ->
|
||||||
|
ok;
|
||||||
|
forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
|
||||||
|
gen_statem:cast(Pid, {forward, LocationCode, Fields, Timestamp}).
|
||||||
|
|
||||||
|
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
|
||||||
|
gen_statem:cast(Pid, {reload, NEndpoint}).
|
||||||
|
|
||||||
|
-spec get_stat(Pid :: pid()) -> {ok, Stat :: #{}}.
|
||||||
|
get_stat(Pid) when is_pid(Pid) ->
|
||||||
|
gen_statem:call(Pid, get_stat, 5000).
|
||||||
|
|
||||||
|
-spec clean_up(Pid :: pid()) -> ok.
|
||||||
|
clean_up(Pid) when is_pid(Pid) ->
|
||||||
|
gen_statem:call(Pid, clean_up, 5000).
|
||||||
|
|
||||||
|
-spec get_mapper_fun(Pid :: pid()) -> fun().
|
||||||
|
get_mapper_fun(Pid) when is_pid(Pid) ->
|
||||||
|
gen_statem:call(Pid, get_mapper_fun).
|
||||||
|
|
||||||
|
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||||
|
%% initialize. To ensure a synchronized start-up procedure, this
|
||||||
|
%% function does not return until Module:init/1 has returned.
|
||||||
|
start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) ->
|
||||||
|
gen_statem:start_link({local, Name}, ?MODULE, [Endpoint], []).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% gen_statem callbacks
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||||
|
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||||
|
%% process to initialize.
|
||||||
|
init([Endpoint]) ->
|
||||||
|
erlang:process_flag(trap_exit, true),
|
||||||
|
%% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制
|
||||||
|
erlang:start_timer(0, self(), create_postman),
|
||||||
|
%% 初始化存储
|
||||||
|
Buffer = endpoint_buffer:new(Endpoint, 10),
|
||||||
|
|
||||||
|
{ok, disconnected, #state{endpoint = Endpoint, buffer = Buffer}}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc This function is called by a gen_statem when it needs to find out
|
||||||
|
%% the callback mode of the callback module.
|
||||||
|
callback_mode() ->
|
||||||
|
handle_event_function.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc There should be one instance of this function for each possible
|
||||||
|
%% state name. If callback_mode is state_functions, one of these
|
||||||
|
%% functions is called when gen_statem receives and event from
|
||||||
|
%% call/2, cast/2, or as a normal process message.
|
||||||
|
|
||||||
|
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, _StateName, State = #state{buffer = Buffer, endpoint = #endpoint{name = Name}}) ->
|
||||||
|
lager:debug("[iot_endpoint] name: ~p, match location_code: ~p", [Name, LocationCode]),
|
||||||
|
NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer),
|
||||||
|
|
||||||
|
{keep_state, State#state{buffer = NBuffer}};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
handle_event(info, {timeout, _, create_postman}, disconnected,
|
||||||
|
State = #state{buffer = Buffer, endpoint = #endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}}) ->
|
||||||
|
|
||||||
|
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]),
|
||||||
|
|
||||||
|
Node = atom_to_binary(node()),
|
||||||
|
ClientId = <<"mqtt-client-", Node/binary, "-", Name/binary>>,
|
||||||
|
Opts = [
|
||||||
|
{owner, self()},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{host, binary_to_list(Host)},
|
||||||
|
{port, Port},
|
||||||
|
{tcp_opts, []},
|
||||||
|
{username, binary_to_list(Username)},
|
||||||
|
{password, binary_to_list(Password)},
|
||||||
|
{keepalive, 86400},
|
||||||
|
{auto_ack, true},
|
||||||
|
{connect_timeout, 5000},
|
||||||
|
{proto_ver, v5},
|
||||||
|
{retry_interval, 5000}
|
||||||
|
],
|
||||||
|
|
||||||
|
{ok, ConnPid} = emqtt:start_link(Opts),
|
||||||
|
lager:debug("[mqtt_postman] start connect, options: ~p", [Opts]),
|
||||||
|
case emqtt:connect(ConnPid, 5000) of
|
||||||
|
{ok, _} ->
|
||||||
|
lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]),
|
||||||
|
NBuffer = endpoint_buffer:trigger_n(Buffer),
|
||||||
|
{next_state, connected, State#state{conn_pid = ConnPid, buffer = NBuffer}};
|
||||||
|
{error, Reason} ->
|
||||||
|
lager:warning("[mqtt_postman] connect get error: ~p", [Reason]),
|
||||||
|
erlang:start_timer(5000, self(), create_postman),
|
||||||
|
{keep_state, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_event({call, From}, get_mapper_fun, _, State = #state{endpoint = #endpoint{mapper_fun = F}}) ->
|
||||||
|
{keep_state, State, [{reply, From, F}]};
|
||||||
|
|
||||||
|
%% 获取当前统计信息
|
||||||
|
handle_event({call, From}, get_stat, _StateName, State = #state{buffer = Buffer}) ->
|
||||||
|
Stat = endpoint_buffer:stat(Buffer),
|
||||||
|
|
||||||
|
{keep_state, State, [{reply, From, Stat}]};
|
||||||
|
|
||||||
|
%% 离线时,忽略数据发送逻辑
|
||||||
|
handle_event(info, {next_data, _Id, _LocationCode, _Message}, disconnected, State) ->
|
||||||
|
{keep_state, State};
|
||||||
|
%% 发送数据到mqtt服务器
|
||||||
|
handle_event(info, {next_data, Id, LocationCode, Message}, connected,
|
||||||
|
State = #state{conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) ->
|
||||||
|
|
||||||
|
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
|
||||||
|
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]),
|
||||||
|
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
||||||
|
ok ->
|
||||||
|
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
||||||
|
{keep_state, State#state{buffer = NBuffer}};
|
||||||
|
{ok, PacketId} ->
|
||||||
|
{keep_state, State#state{inflight = maps:put(PacketId, Id, InFlight)}};
|
||||||
|
{error, Reason} ->
|
||||||
|
lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]),
|
||||||
|
{stop, Reason, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_event(info, {disconnected, ReasonCode, Properties}, connected, State) ->
|
||||||
|
lager:debug("[mqtt_postman] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
|
||||||
|
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
||||||
|
{next_state, disconnected, State#state{conn_pid = undefined}};
|
||||||
|
|
||||||
|
handle_event(info, {publish, Message = #{packet_id := _PacketId, payload := Payload}}, connected, State) ->
|
||||||
|
lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]),
|
||||||
|
{keep_state, State};
|
||||||
|
|
||||||
|
%% 收到确认的消息
|
||||||
|
handle_event(info, {puback, #{packet_id := PacketId}}, connected, State = #state{inflight = Inflight, buffer = Buffer}) ->
|
||||||
|
case maps:take(PacketId, Inflight) of
|
||||||
|
{Id, RestInflight} ->
|
||||||
|
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
||||||
|
{keep_state, State#state{buffer = NBuffer, inflight = RestInflight}};
|
||||||
|
error ->
|
||||||
|
{keep_state, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
%% postman进程挂掉时,重新建立新的
|
||||||
|
handle_event(info, {'EXIT', ConnPid, Reason}, connected, State = #state{endpoint = #endpoint{name = Name}, conn_pid = ConnPid}) ->
|
||||||
|
lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Name, Reason]),
|
||||||
|
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
||||||
|
{next_state, disconnected, State#state{conn_pid = undefined}};
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||||
|
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||||
|
%% process message, this function is called.
|
||||||
|
handle_event(EventType, Event, StateName, State) ->
|
||||||
|
lager:warning("[iot_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
|
||||||
|
{keep_state, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc This function is called by a gen_statem when it is about to
|
||||||
|
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||||
|
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||||
|
%% Reason. The return value is ignored.
|
||||||
|
terminate(Reason, _StateName, #state{endpoint = #endpoint{name = Name}, buffer = Buffer}) ->
|
||||||
|
lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Name, Reason]),
|
||||||
|
endpoint_buffer:cleanup(Buffer),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Convert process state when code is changed
|
||||||
|
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||||
|
{ok, StateName, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
Loading…
x
Reference in New Issue
Block a user