diff --git a/apps/endpoint/src/endpoint_for_http.erl b/apps/endpoint/src/endpoint_for_http.erl index 597d2c0..6faeabe 100644 --- a/apps/endpoint/src/endpoint_for_http.erl +++ b/apps/endpoint/src/endpoint_for_http.erl @@ -22,7 +22,7 @@ -define(SERVER, ?MODULE). -record(state, { - http_endpoint :: #http_endpoint{}, + endpoint :: #endpoint{}, buffer :: endpoint_buffer:buffer() }). @@ -55,7 +55,7 @@ cleanup(Pid) when is_pid(Pid) -> %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Name, Endpoint :: #http_endpoint{}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Name, Endpoint = #http_endpoint{}) when is_atom(Name) -> +start_link(Name, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(Name) -> gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []). %%%=================================================================== @@ -69,7 +69,7 @@ start_link(Name, Endpoint = #http_endpoint{}) when is_atom(Name) -> {stop, Reason :: term()} | ignore). init([Endpoint]) -> Buffer = endpoint_buffer:new(Endpoint, 10), - {ok, #state{http_endpoint = Endpoint, buffer = Buffer}}. + {ok, #state{endpoint = Endpoint, buffer = Buffer}}. %% @private %% @doc Handling call messages diff --git a/apps/endpoint/src/endpoint_for_mqtt.erl b/apps/endpoint/src/endpoint_for_mqtt.erl new file mode 100644 index 0000000..ce2992d --- /dev/null +++ b/apps/endpoint/src/endpoint_for_mqtt.erl @@ -0,0 +1,223 @@ + +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(endpoint_for_mqtt). + +-include("endpoint.hrl"). +-behaviour(gen_statem). + +%% API +-export([start_link/2]). +-export([get_name/1, get_pid/1, forward/4, get_stat/1, reload/2, clean_up/1, get_mapper_fun/1]). + +%% gen_statem callbacks +-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +-record(state, { + endpoint :: #endpoint{}, + buffer :: endpoint_buffer:buffer(), + conn_pid :: undefined | pid(), + %% 待确认的数据, #{PacketId :: integer() => Id :: integer()} + inflight = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec get_name(Id :: integer()) -> atom(). +get_name(Id) when is_integer(Id) -> + list_to_atom("endpoint:" ++ integer_to_list(Id)). + +-spec get_pid(Name :: binary()) -> undefined | pid(). +get_pid(Name) when is_binary(Name) -> + whereis(get_name(Name)). + +-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(undefined, _, _, _) -> + ok; +forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_statem:cast(Pid, {forward, LocationCode, Fields, Timestamp}). + +reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> + gen_statem:cast(Pid, {reload, NEndpoint}). + +-spec get_stat(Pid :: pid()) -> {ok, Stat :: #{}}. +get_stat(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, get_stat, 5000). + +-spec clean_up(Pid :: pid()) -> ok. +clean_up(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, clean_up, 5000). + +-spec get_mapper_fun(Pid :: pid()) -> fun(). +get_mapper_fun(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, get_mapper_fun). + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) -> + gen_statem:start_link({local, Name}, ?MODULE, [Endpoint], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([Endpoint]) -> + erlang:process_flag(trap_exit, true), + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), create_postman), + %% 初始化存储 + Buffer = endpoint_buffer:new(Endpoint, 10), + + {ok, disconnected, #state{endpoint = Endpoint, buffer = Buffer}}. + +%% @private +%% @doc This function is called by a gen_statem when it needs to find out +%% the callback mode of the callback module. +callback_mode() -> + handle_event_function. + +%% @private +%% @doc There should be one instance of this function for each possible +%% state name. If callback_mode is state_functions, one of these +%% functions is called when gen_statem receives and event from +%% call/2, cast/2, or as a normal process message. + +handle_event(cast, {forward, LocationCode, Fields, Timestamp}, _StateName, State = #state{buffer = Buffer, endpoint = #endpoint{name = Name}}) -> + lager:debug("[iot_endpoint] name: ~p, match location_code: ~p", [Name, LocationCode]), + NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer), + + {keep_state, State#state{buffer = NBuffer}}; + + + +handle_event(info, {timeout, _, create_postman}, disconnected, + State = #state{buffer = Buffer, endpoint = #endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}}) -> + + lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]), + + Node = atom_to_binary(node()), + ClientId = <<"mqtt-client-", Node/binary, "-", Name/binary>>, + Opts = [ + {owner, self()}, + {clientid, ClientId}, + {host, binary_to_list(Host)}, + {port, Port}, + {tcp_opts, []}, + {username, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {keepalive, 86400}, + {auto_ack, true}, + {connect_timeout, 5000}, + {proto_ver, v5}, + {retry_interval, 5000} + ], + + {ok, ConnPid} = emqtt:start_link(Opts), + lager:debug("[mqtt_postman] start connect, options: ~p", [Opts]), + case emqtt:connect(ConnPid, 5000) of + {ok, _} -> + lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]), + NBuffer = endpoint_buffer:trigger_n(Buffer), + {next_state, connected, State#state{conn_pid = ConnPid, buffer = NBuffer}}; + {error, Reason} -> + lager:warning("[mqtt_postman] connect get error: ~p", [Reason]), + erlang:start_timer(5000, self(), create_postman), + {keep_state, State} + end; + +handle_event({call, From}, get_mapper_fun, _, State = #state{endpoint = #endpoint{mapper_fun = F}}) -> + {keep_state, State, [{reply, From, F}]}; + +%% 获取当前统计信息 +handle_event({call, From}, get_stat, _StateName, State = #state{buffer = Buffer}) -> + Stat = endpoint_buffer:stat(Buffer), + + {keep_state, State, [{reply, From, Stat}]}; + +%% 离线时,忽略数据发送逻辑 +handle_event(info, {next_data, _Id, _LocationCode, _Message}, disconnected, State) -> + {keep_state, State}; +%% 发送数据到mqtt服务器 +handle_event(info, {next_data, Id, LocationCode, Message}, connected, + State = #state{conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) -> + + Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), + lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]), + case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of + ok -> + NBuffer = endpoint_buffer:ack(Id, Buffer), + {keep_state, State#state{buffer = NBuffer}}; + {ok, PacketId} -> + {keep_state, State#state{inflight = maps:put(PacketId, Id, InFlight)}}; + {error, Reason} -> + lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]), + {stop, Reason, State} + end; + +handle_event(info, {disconnected, ReasonCode, Properties}, connected, State) -> + lager:debug("[mqtt_postman] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + {next_state, disconnected, State#state{conn_pid = undefined}}; + +handle_event(info, {publish, Message = #{packet_id := _PacketId, payload := Payload}}, connected, State) -> + lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]), + {keep_state, State}; + +%% 收到确认的消息 +handle_event(info, {puback, #{packet_id := PacketId}}, connected, State = #state{inflight = Inflight, buffer = Buffer}) -> + case maps:take(PacketId, Inflight) of + {Id, RestInflight} -> + NBuffer = endpoint_buffer:ack(Id, Buffer), + {keep_state, State#state{buffer = NBuffer, inflight = RestInflight}}; + error -> + {keep_state, State} + end; + +%% postman进程挂掉时,重新建立新的 +handle_event(info, {'EXIT', ConnPid, Reason}, connected, State = #state{endpoint = #endpoint{name = Name}, conn_pid = ConnPid}) -> + lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Name, Reason]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + {next_state, disconnected, State#state{conn_pid = undefined}}; + +%% @private +%% @doc If callback_mode is handle_event_function, then whenever a +%% gen_statem receives an event from call/2, cast/2, or as a normal +%% process message, this function is called. +handle_event(EventType, Event, StateName, State) -> + lager:warning("[iot_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]), + {keep_state, State}. + +%% @private +%% @doc This function is called by a gen_statem when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_statem terminates with +%% Reason. The return value is ignored. +terminate(Reason, _StateName, #state{endpoint = #endpoint{name = Name}, buffer = Buffer}) -> + lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Name, Reason]), + endpoint_buffer:cleanup(Buffer), + ok. + +%% @private +%% @doc Convert process state when code is changed +code_change(_OldVsn, StateName, State = #state{}, _Extra) -> + {ok, StateName, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== \ No newline at end of file