diff --git a/apps/endpoint/include/endpoint.hrl b/apps/endpoint/include/endpoint.hrl index b02b241..df99a1b 100644 --- a/apps/endpoint/include/endpoint.hrl +++ b/apps/endpoint/include/endpoint.hrl @@ -62,12 +62,7 @@ created_at = 0 :: integer() }). --record(north_data, { - id :: integer(), - location_code, - fields, - timestamp :: integer() -}). + -record(post_data, { id, diff --git a/apps/endpoint/src/endpoint_buffer.erl b/apps/endpoint/src/endpoint_buffer.erl index 2a3a1b7..dbf2aee 100644 --- a/apps/endpoint/src/endpoint_buffer.erl +++ b/apps/endpoint/src/endpoint_buffer.erl @@ -14,13 +14,14 @@ %% 消息重发间隔 -define(RETRY_INTERVAL, 5000). --export([new/1, append/2]). +-export([new/2, append/2, trigger_next/1, trigger_n/1, cleanup/1, ack/2, stat/1]). -record(buffer, { endpoint, - %% ets存储 next_id = 1 :: integer(), - cursor :: integer(), + %% 当前数据所在的游标 + cursor = 0 :: integer(), + %% ets存储的引用 tid :: reference(), %% 定时器 timer_pid :: pid(), @@ -32,37 +33,42 @@ acc_num = 0 }). -new(Endpoint = #endpoint{id = Id}) -> +-record(north_data, { + id :: integer(), + location_code, + fields, + timestamp :: integer() +}). + +-spec new(Endpoint :: #endpoint{}, WindowSize :: integer()) -> Buffer :: #buffer{}. +new(Endpoint = #endpoint{id = Id}, WindowSize) when is_integer(WindowSize), WindowSize > 0 -> %% 初始化存储 - EtsName = list_to_atom("endpoint_ets:" ++ integer_to_list(Id)), + EtsName = list_to_atom("endpoint_buffer_ets:" ++ integer_to_list(Id)), Tid = ets:new(EtsName, [ordered_set, private]), %% 定义重发器 TimerPid = endpoint_timer:start_link(?RETRY_INTERVAL), - #buffer{tid = Tid, timer_pid = TimerPid}. + #buffer{cursor = 0, tid = Tid, timer_pid = TimerPid, endpoint = Endpoint, window_size = WindowSize}. +-spec append(tuple(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}. append({LocationCode, Fields, Timestamp}, Buffer = #buffer{tid = Tid, next_id = NextId, window_size = WindowSize, flight_num = FlightNum}) -> NorthData = #north_data{id = NextId, location_code = LocationCode, fields = Fields, timestamp = Timestamp}, true = ets:insert(Tid, NorthData), - + NBuffer = Buffer#buffer{next_id = NextId + 1}, case FlightNum < WindowSize of true -> - %% Todo - [{next_event, info, fetch_next}]; + trigger_next(NBuffer); false -> - ok - end, + NBuffer + end. - Buffer#buffer{next_id = NextId + 1}. - -trigger_n(Buffer = #buffer{endpoint = Endpoint = #endpoint{name = Name}, window_size = WindowSize}) -> - lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]), +-spec trigger_n(Buffer :: #buffer{}) -> NBuffer :: #buffer{}. +trigger_n(Buffer = #buffer{window_size = WindowSize}) -> %% 最多允许window_size - lists:map(fun(_) -> - trigger_next(Buffer) - end, lists:seq(1, WindowSize)). + lists:foldl(fun(_, Buffer0) -> trigger_next(Buffer0) end, Buffer, lists:seq(1, WindowSize)). %% 触发读取下一条数据 +-spec trigger_next(Buffer :: #buffer{}) -> NBuffer :: #buffer{}. trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> case ets:next(Tid, Cursor) of '$end_of_table' -> @@ -70,7 +76,6 @@ trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, NKey -> [NorthData = #north_data{id = Id, location_code = LocationCode}] = ets:lookup(Tid, NKey), %lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), - %% 重发机制, 在发送的过程中mapper可能会改变 case safe_invoke_mapper(MapperFun, NorthData) of {ok, Body} -> @@ -90,18 +95,21 @@ trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, end end. -ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) -> +-spec ack(Id :: integer(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}. +ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) -> true = ets:delete(Tid, Id), endpoint_timer:ack(TimerPid, Id), trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}). %% 获取当前统计信息 +-spec stat(Buffer :: #buffer{}) -> map(). stat(#buffer{acc_num = AccNum, tid = Tid}) -> #{ <<"acc_num">> => AccNum, <<"queue_num">> => ets:info(Tid, size) }. +-spec cleanup(Buffer :: #buffer{}) -> ok. cleanup(#buffer{timer_pid = TimerPid}) -> endpoint_timer:cleanup(TimerPid), ok.