iot_cloud/apps/endpoint/src/endpoint_buffer.erl
2024-05-07 22:09:13 +08:00

136 lines
5.1 KiB
Erlang

%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 7月 2023 12:02
%%%-------------------------------------------------------------------
-module(endpoint_buffer).
-include("endpoint.hrl").
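%% Message resend interval, handed to endpoint_timer:start_link/1 (presumably milliseconds - confirm against endpoint_timer)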
-define(RETRY_INTERVAL, 5000).
-export([new/2, append/2, trigger_next/1, trigger_n/1, cleanup/1, ack/2, stat/1]).
-export_type([buffer/0]).
%% State of one endpoint's outgoing message buffer. Built by new/2 and
%% threaded through every other exported function of this module.
-record(buffer, {
%% Endpoint this buffer serves; trigger_next/1 reads its title and mapper_fun.
endpoint :: #endpoint{},
%% Id given to the next appended entry; append/2 increments it after each insert.
next_id = 1 :: integer(),
%% Cursor marking the current position in the data; trigger_next/1 passes it to ets:next/2.
cursor = 0 :: integer(),
%% Reference to the ETS table that stores the entries.
tid :: ets:tid(),
%% Timer process, started through endpoint_timer:start_link/1 in new/2.
timer_pid :: pid(),
%% Window size: the maximum number of unacknowledged messages allowed.
window_size = 10,
%% Number of unacknowledged messages.
flight_num = 0,
%% Number of successfully processed messages; ack/2 increments it.
acc_num = 0
}).
%% One buffered entry, as inserted into the ETS table by append/2.
-record(north_data, {
%% Sequence id, taken from the buffer's next_id when the entry is appended.
id :: integer(),
%% location_code, fields and timestamp are copied from the tuple given to append/2.
location_code,
fields,
timestamp :: integer()
}).
%% Exported alias for the buffer state record returned by new/2.
-type buffer() :: #buffer{}.
%% @doc Creates an empty buffer for `Endpoint'.
%%
%% Allocates a private ordered_set ETS table for the queued entries and starts
%% the resend timer process. `WindowSize' is the maximum number of
%% unacknowledged messages and must be a positive integer; any other value
%% fails with `function_clause'.
-spec new(Endpoint :: #endpoint{}, WindowSize :: integer()) -> Buffer :: #buffer{}.
new(Endpoint = #endpoint{id = Id}, WindowSize) when is_integer(WindowSize), WindowSize > 0 ->
    %% Initialise the storage. The table is not a named_table, so this atom is
    %% only the informational table name.
    EtsName = list_to_atom("endpoint_buffer_ets:" ++ integer_to_list(Id)),
    %% Entries are #north_data{} records. Without an explicit keypos the key
    %% would be tuple element 1, i.e. the record tag `north_data', so every
    %% insert would overwrite the previous entry and delete-by-id would never
    %% match. Key the table on the id field instead.
    Tid = ets:new(EtsName, [ordered_set, private, {keypos, #north_data.id}]),
    %% Start the resender.
    {ok, TimerPid} = endpoint_timer:start_link(?RETRY_INTERVAL),
    #buffer{cursor = 0, tid = Tid, timer_pid = TimerPid, endpoint = Endpoint, window_size = WindowSize}.
%% @doc Stores a `{LocationCode, Fields, Timestamp}' tuple as a new entry under
%% the buffer's next id, and immediately triggers a send when the number of
%% unacknowledged messages is still below the window size.
-spec append(tuple(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
append({LocationCode, Fields, Timestamp}, Buffer = #buffer{tid = Table, next_id = EntryId}) ->
    Entry = #north_data{
        id = EntryId,
        location_code = LocationCode,
        fields = Fields,
        timestamp = Timestamp
    },
    true = ets:insert(Table, Entry),
    send_if_window_open(Buffer#buffer{next_id = EntryId + 1}).

%% Triggers one send while the in-flight count is below the window size;
%% otherwise returns the buffer untouched.
send_if_window_open(Buffer = #buffer{flight_num = InFlight, window_size = Limit}) when InFlight < Limit ->
    trigger_next(Buffer);
send_if_window_open(Buffer = #buffer{}) ->
    Buffer.
%% @doc Calls trigger_next/1 `window_size' times in a row, threading the
%% buffer through each call, so at most `window_size' entries are triggered.
-spec trigger_n(Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
trigger_n(Buffer = #buffer{window_size = WindowSize}) ->
    trigger_times(WindowSize, Buffer).

%% Applies trigger_next/1 to the buffer Remaining times.
trigger_times(0, Buffer) ->
    Buffer;
trigger_times(Remaining, Buffer) when is_integer(Remaining), Remaining > 0 ->
    trigger_times(Remaining - 1, trigger_next(Buffer)).
%% @doc Triggers reading and sending of the next buffered entry.
%%
%% Looks up the first entry after the cursor and runs the endpoint's mapper
%% on it:
%%   * `{ok, Body}'  - `{next_data, Id, LocationCode, Body}' is sent to the
%%     calling process, the same send is registered with the timer process as
%%     the resend task for `Id', and `flight_num' is incremented.
%%   * `{error, _}' or `ignore' - the entry is deleted from the table and the
%%     following entry is tried, so one discarded entry does not use up the
%%     trigger.
%% Returns the buffer unchanged when there is no entry after the cursor.
-spec trigger_next(Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor}) ->
    case ets:next(Tid, Cursor) of
        '$end_of_table' ->
            Buffer;
        NKey ->
            [NorthData = #north_data{}] = ets:lookup(Tid, NKey),
            %% Move the cursor onto the entry just read. If it stayed put,
            %% every call would return the head of the table again and
            %% re-send an entry that is already in flight. ets:next/2 on an
            %% ordered_set still works after the cursor key has been deleted.
            dispatch_entry(NKey, NorthData, Buffer#buffer{cursor = NKey})
    end.

%% Runs the mapper on one entry and either sends it or discards it.
%% The mapper is evaluated at send time because it may change while entries
%% are waiting in the buffer.
dispatch_entry(Key, NorthData = #north_data{id = Id, location_code = LocationCode},
               Buffer = #buffer{tid = Tid, timer_pid = TimerPid, flight_num = FlightNum,
                                endpoint = #endpoint{title = Title, mapper_fun = MapperFun}}) ->
    case safe_invoke_mapper(MapperFun, NorthData) of
        {ok, Body} ->
            ReceiverPid = self(),
            ReceiverPid ! {next_data, Id, LocationCode, Body},
            %% Resend mechanism: the timer process repeats the same message
            %% for this id.
            endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! {next_data, Id, LocationCode, Body} end),
            Buffer#buffer{flight_num = FlightNum + 1};
        {error, Error} ->
            lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Title, Error]),
            ets:delete(Tid, Key),
            %% The cursor only moves forward, so this recursion terminates.
            trigger_next(Buffer);
        ignore ->
            lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Title]),
            ets:delete(Tid, Key),
            trigger_next(Buffer)
    end.
%% @doc Acknowledges entry `Id': deletes it from the table, tells the timer
%% process about the ack, bumps `acc_num', lowers `flight_num' by one and then
%% triggers the next entry.
-spec ack(Id :: integer(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
ack(Id, Buffer = #buffer{}) when is_integer(Id) ->
    true = ets:delete(Buffer#buffer.tid, Id),
    endpoint_timer:ack(Buffer#buffer.timer_pid, Id),
    Acked = Buffer#buffer{
        acc_num = Buffer#buffer.acc_num + 1,
        flight_num = Buffer#buffer.flight_num - 1
    },
    trigger_next(Acked).
%% @doc Returns the current statistics: the number of acknowledged messages
%% under `<<"acc_num">>' and the ETS table size under `<<"queue_num">>'.
-spec stat(Buffer :: #buffer{}) -> map().
stat(Buffer = #buffer{}) ->
    Processed = Buffer#buffer.acc_num,
    Queued = ets:info(Buffer#buffer.tid, size),
    #{<<"acc_num">> => Processed, <<"queue_num">> => Queued}.
%% @doc Forwards the cleanup request to the buffer's timer process via
%% endpoint_timer:cleanup/1. Always returns `ok'; the ETS table is left as is.
-spec cleanup(Buffer :: #buffer{}) -> ok.
cleanup(Buffer = #buffer{}) ->
    Timer = Buffer#buffer.timer_pid,
    endpoint_timer:cleanup(Timer),
    ok.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Invokes the endpoint's mapper on one entry without letting it crash the
%% caller.
%%
%% A 2-arity mapper receives (LocationCode, Fields); a 3-arity mapper also
%% receives Timestamp. The result is always one of the shapes in the spec:
%%   * the mapper's own `{ok, Body}', `ignore' or `{error, Reason}';
%%   * `{error, {bad_mapper_result, Other}}' when the mapper returns anything
%%     else, so the caller's case expression cannot fail with case_clause;
%%   * `{error, {bad_mapper, MapperFun}}' when MapperFun is not a fun of
%%     arity 2 or 3;
%%   * `{error, Error}' when the mapper raises an exception of any class.
-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) ->
    {ok, Body :: any()} | ignore | {error, Reason :: any()}.
safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) ->
    try invoke_mapper(MapperFun, LocationCode, Fields, Timestamp) of
        {ok, _Body} = Ok ->
            Ok;
        ignore ->
            ignore;
        {error, _Reason} = Failure ->
            Failure;
        Other ->
            {error, {bad_mapper_result, Other}}
    catch
        _:Error ->
            {error, Error}
    end.

%% Calls the mapper with the argument list that matches its arity.
invoke_mapper(MapperFun, LocationCode, Fields, _Timestamp) when is_function(MapperFun, 2) ->
    MapperFun(LocationCode, Fields);
invoke_mapper(MapperFun, LocationCode, Fields, Timestamp) when is_function(MapperFun, 3) ->
    MapperFun(LocationCode, Fields, Timestamp);
invoke_mapper(MapperFun, _LocationCode, _Fields, _Timestamp) ->
    error({bad_mapper, MapperFun}).