fix endpoint buffer
This commit is contained in:
parent
01ed436a9b
commit
57e8c3e3c1
@ -62,12 +62,7 @@
|
|||||||
created_at = 0 :: integer()
|
created_at = 0 :: integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-record(north_data, {
|
|
||||||
id :: integer(),
|
|
||||||
location_code,
|
|
||||||
fields,
|
|
||||||
timestamp :: integer()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-record(post_data, {
|
-record(post_data, {
|
||||||
id,
|
id,
|
||||||
|
|||||||
@ -14,13 +14,14 @@
|
|||||||
%% 消息重发间隔
|
%% 消息重发间隔
|
||||||
-define(RETRY_INTERVAL, 5000).
|
-define(RETRY_INTERVAL, 5000).
|
||||||
|
|
||||||
-export([new/1, append/2]).
|
-export([new/2, append/2, trigger_next/1, trigger_n/1, cleanup/1, ack/2, stat/1]).
|
||||||
|
|
||||||
-record(buffer, {
|
-record(buffer, {
|
||||||
endpoint,
|
endpoint,
|
||||||
%% ets存储
|
|
||||||
next_id = 1 :: integer(),
|
next_id = 1 :: integer(),
|
||||||
cursor :: integer(),
|
%% 当前数据所在的游标
|
||||||
|
cursor = 0 :: integer(),
|
||||||
|
%% ets存储的引用
|
||||||
tid :: reference(),
|
tid :: reference(),
|
||||||
%% 定时器
|
%% 定时器
|
||||||
timer_pid :: pid(),
|
timer_pid :: pid(),
|
||||||
@ -32,37 +33,42 @@
|
|||||||
acc_num = 0
|
acc_num = 0
|
||||||
}).
|
}).
|
||||||
|
|
||||||
new(Endpoint = #endpoint{id = Id}) ->
|
-record(north_data, {
|
||||||
|
id :: integer(),
|
||||||
|
location_code,
|
||||||
|
fields,
|
||||||
|
timestamp :: integer()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-spec new(Endpoint :: #endpoint{}, WindowSize :: integer()) -> Buffer :: #buffer{}.
|
||||||
|
new(Endpoint = #endpoint{id = Id}, WindowSize) when is_integer(WindowSize), WindowSize > 0 ->
|
||||||
%% 初始化存储
|
%% 初始化存储
|
||||||
EtsName = list_to_atom("endpoint_ets:" ++ integer_to_list(Id)),
|
EtsName = list_to_atom("endpoint_buffer_ets:" ++ integer_to_list(Id)),
|
||||||
Tid = ets:new(EtsName, [ordered_set, private]),
|
Tid = ets:new(EtsName, [ordered_set, private]),
|
||||||
%% 定义重发器
|
%% 定义重发器
|
||||||
TimerPid = endpoint_timer:start_link(?RETRY_INTERVAL),
|
TimerPid = endpoint_timer:start_link(?RETRY_INTERVAL),
|
||||||
|
|
||||||
#buffer{tid = Tid, timer_pid = TimerPid}.
|
#buffer{cursor = 0, tid = Tid, timer_pid = TimerPid, endpoint = Endpoint, window_size = WindowSize}.
|
||||||
|
|
||||||
|
-spec append(tuple(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
|
||||||
append({LocationCode, Fields, Timestamp}, Buffer = #buffer{tid = Tid, next_id = NextId, window_size = WindowSize, flight_num = FlightNum}) ->
|
append({LocationCode, Fields, Timestamp}, Buffer = #buffer{tid = Tid, next_id = NextId, window_size = WindowSize, flight_num = FlightNum}) ->
|
||||||
NorthData = #north_data{id = NextId, location_code = LocationCode, fields = Fields, timestamp = Timestamp},
|
NorthData = #north_data{id = NextId, location_code = LocationCode, fields = Fields, timestamp = Timestamp},
|
||||||
true = ets:insert(Tid, NorthData),
|
true = ets:insert(Tid, NorthData),
|
||||||
|
NBuffer = Buffer#buffer{next_id = NextId + 1},
|
||||||
case FlightNum < WindowSize of
|
case FlightNum < WindowSize of
|
||||||
true ->
|
true ->
|
||||||
%% Todo
|
trigger_next(NBuffer);
|
||||||
[{next_event, info, fetch_next}];
|
|
||||||
false ->
|
false ->
|
||||||
ok
|
NBuffer
|
||||||
end,
|
end.
|
||||||
|
|
||||||
Buffer#buffer{next_id = NextId + 1}.
|
-spec trigger_n(Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
|
||||||
|
trigger_n(Buffer = #buffer{window_size = WindowSize}) ->
|
||||||
trigger_n(Buffer = #buffer{endpoint = Endpoint = #endpoint{name = Name}, window_size = WindowSize}) ->
|
|
||||||
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]),
|
|
||||||
%% 最多允许window_size
|
%% 最多允许window_size
|
||||||
lists:map(fun(_) ->
|
lists:foldl(fun(_, Buffer0) -> trigger_next(Buffer0) end, Buffer, lists:seq(1, WindowSize)).
|
||||||
trigger_next(Buffer)
|
|
||||||
end, lists:seq(1, WindowSize)).
|
|
||||||
|
|
||||||
%% 触发读取下一条数据
|
%% 触发读取下一条数据
|
||||||
|
-spec trigger_next(Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
|
||||||
trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) ->
|
trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) ->
|
||||||
case ets:next(Tid, Cursor) of
|
case ets:next(Tid, Cursor) of
|
||||||
'$end_of_table' ->
|
'$end_of_table' ->
|
||||||
@ -70,7 +76,6 @@ trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid,
|
|||||||
NKey ->
|
NKey ->
|
||||||
[NorthData = #north_data{id = Id, location_code = LocationCode}] = ets:lookup(Tid, NKey),
|
[NorthData = #north_data{id = Id, location_code = LocationCode}] = ets:lookup(Tid, NKey),
|
||||||
%lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
|
%lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
|
||||||
|
|
||||||
%% 重发机制, 在发送的过程中mapper可能会改变
|
%% 重发机制, 在发送的过程中mapper可能会改变
|
||||||
case safe_invoke_mapper(MapperFun, NorthData) of
|
case safe_invoke_mapper(MapperFun, NorthData) of
|
||||||
{ok, Body} ->
|
{ok, Body} ->
|
||||||
@ -90,18 +95,21 @@ trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid,
|
|||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) ->
|
-spec ack(Id :: integer(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
|
||||||
|
ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) ->
|
||||||
true = ets:delete(Tid, Id),
|
true = ets:delete(Tid, Id),
|
||||||
endpoint_timer:ack(TimerPid, Id),
|
endpoint_timer:ack(TimerPid, Id),
|
||||||
trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}).
|
trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}).
|
||||||
|
|
||||||
%% 获取当前统计信息
|
%% 获取当前统计信息
|
||||||
|
-spec stat(Buffer :: #buffer{}) -> map().
|
||||||
stat(#buffer{acc_num = AccNum, tid = Tid}) ->
|
stat(#buffer{acc_num = AccNum, tid = Tid}) ->
|
||||||
#{
|
#{
|
||||||
<<"acc_num">> => AccNum,
|
<<"acc_num">> => AccNum,
|
||||||
<<"queue_num">> => ets:info(Tid, size)
|
<<"queue_num">> => ets:info(Tid, size)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-spec cleanup(Buffer :: #buffer{}) -> ok.
|
||||||
cleanup(#buffer{timer_pid = TimerPid}) ->
|
cleanup(#buffer{timer_pid = TimerPid}) ->
|
||||||
endpoint_timer:cleanup(TimerPid),
|
endpoint_timer:cleanup(TimerPid),
|
||||||
ok.
|
ok.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user