iot_cloud/apps/iot/src/endpoint/iot_http_endpoint.erl
2024-01-12 17:25:21 +08:00

170 lines
7.0 KiB
Erlang

%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 7月 2023 12:02
%%%-------------------------------------------------------------------
-module(iot_http_endpoint).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_statem).
%% API
-export([start_link/2]).
-export([get_pid/1, forward/4, get_stat/0]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
%% 消息重发间隔
-define(RETRY_INTERVAL, 5000).
-record(state, {
postman_pid :: undefined | pid(),
pool_size = 0,
flight_num = 0,
id = 1,
queue :: queue:queue(),
%% 定时器对应关系
timer_map = #{},
%% 记录成功处理的消息数
acc_num = 0
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_pid(Name :: atom()) -> undefined | pid().
get_pid(Name) when is_atom(Name) ->
whereis(Name).
-spec forward(Pid :: pid(), LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
forward(Pid, LocationCode, EventType, Params) when is_pid(Pid), is_binary(LocationCode), is_integer(EventType), is_map(Params) ->
gen_statem:cast(Pid, {forward, LocationCode, EventType, Params}).
-spec get_stat() -> {ok, Stat :: #{}}.
get_stat() ->
gen_statem:call(?MODULE, get_stat, 5000).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Name, Opts) when is_atom(Name), is_list(Opts) ->
gen_statem:start_link({local, Name}, ?MODULE, [Opts], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([Opts]) ->
PoolSize = proplists:get_value(pool_size, Opts),
{ok, PostmanPid} = broker_postman:start_link(http_postman, Opts, PoolSize),
{ok, connected, #state{postman_pid = PostmanPid, pool_size = PoolSize, queue = queue:new()}}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @private
%% @doc There should be one instance of this function for each possible
%% state name. If callback_mode is state_functions, one of these
%% functions is called when gen_statem receives and event from
%% call/2, cast/2, or as a normal process message.
handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params},
%% 避免不必要的内部消息
Actions = case FlightNum < PoolSize of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State#state{queue = queue:in(EventData, Q), id = Id + 1}, Actions};
%% 触发读取下一条数据
handle_event(info, fetch_next, _, State = #state{postman_pid = PostmanPid, queue = Q, flight_num = FlightNum, timer_map = TimerMap}) ->
case queue:out(Q) of
{{value, EventData = #event_data{id = Id}}, Q1} ->
lager:debug("[iot_http_endpoint] fetch_next success, event data is: ~p", [EventData]),
do_post(PostmanPid, EventData),
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), queue = Q1, flight_num = FlightNum + 1}};
{empty, Q1} ->
{keep_state, State#state{queue = Q1}}
end;
%% 收到确认消息
handle_event(info, {ack, Id}, _, State = #state{timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) ->
lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]),
case maps:take(Id, TimerMap) of
error ->
{keep_state, State#state{acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]};
{TimerRef, NTimerMap} ->
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
{keep_state, State#state{timer_map = NTimerMap, acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]}
end;
%% 收到重发过期请求
handle_event(info, {timeout, _, {repost_ticker, EventData = #event_data{id = Id}}}, _, State = #state{postman_pid = PostmanPid, timer_map = TimerMap}) ->
lager:debug("[iot_zd_endpoint] repost data: ~p", [EventData]),
do_post(PostmanPid, EventData),
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
%% 获取当前统计信息
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
Stat = #{
<<"acc_num">> => AccNum,
<<"queue_num">> => mnesia_queue:table_size(),
<<"state_name">> => atom_to_binary(StateName)
},
{keep_state, State, [{reply, From, Stat}]};
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event(EventType, Event, StateName, State) ->
lager:warning("[iot_zd_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
{keep_state, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, #state{}) ->
lager:debug("[iot_zd_endpoint] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec do_post(PostmanPid :: pid(), EventData :: #event_data{}) -> no_return().
do_post(PostmanPid, #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}) when is_pid(PostmanPid) ->
Data = #{
<<"version">> => <<"1.0">>,
<<"event_type">> => EventType,
<<"params">> => Params
},
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
ok.