From 01ed436a9b9910148a5e285567ea32f34b808903 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 7 May 2024 10:56:54 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3buffer=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/endpoint/src/endpoint_buffer.erl | 125 +++++++++++++++++++++++++ apps/endpoint/src/endpoint_timer.erl | 128 ++++++++++++++++++++++++++ 2 files changed, 253 insertions(+) create mode 100644 apps/endpoint/src/endpoint_buffer.erl create mode 100644 apps/endpoint/src/endpoint_timer.erl diff --git a/apps/endpoint/src/endpoint_buffer.erl b/apps/endpoint/src/endpoint_buffer.erl new file mode 100644 index 0000000..2a3a1b7 --- /dev/null +++ b/apps/endpoint/src/endpoint_buffer.erl @@ -0,0 +1,125 @@ + +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(endpoint_buffer). + +-include("endpoint.hrl"). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +-export([new/1, append/2]). + +-record(buffer, { + endpoint, + %% ets存储 + next_id = 1 :: integer(), + cursor :: integer(), + tid :: reference(), + %% 定时器 + timer_pid :: pid(), + %% 窗口大小,允许最大的未确认消息数 + window_size = 10, + %% 未确认的消息数 + flight_num = 0, + %% 记录成功处理的消息数 + acc_num = 0 +}). + +new(Endpoint = #endpoint{id = Id}) -> + %% 初始化存储 + EtsName = list_to_atom("endpoint_ets:" ++ integer_to_list(Id)), + Tid = ets:new(EtsName, [ordered_set, private]), + %% 定义重发器 + TimerPid = endpoint_timer:start_link(?RETRY_INTERVAL), + + #buffer{tid = Tid, timer_pid = TimerPid}. + +append({LocationCode, Fields, Timestamp}, Buffer = #buffer{tid = Tid, next_id = NextId, window_size = WindowSize, flight_num = FlightNum}) -> + NorthData = #north_data{id = NextId, location_code = LocationCode, fields = Fields, timestamp = Timestamp}, + true = ets:insert(Tid, NorthData), + + case FlightNum < WindowSize of + true -> + %% Todo + [{next_event, info, fetch_next}]; + false -> + ok + end, + + Buffer#buffer{next_id = NextId + 1}. + +trigger_n(Buffer = #buffer{endpoint = Endpoint = #endpoint{name = Name}, window_size = WindowSize}) -> + lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]), + %% 最多允许window_size + lists:map(fun(_) -> + trigger_next(Buffer) + end, lists:seq(1, WindowSize)). + +%% 触发读取下一条数据 +trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> + case ets:next(Tid, Cursor) of + '$end_of_table' -> + Buffer; + NKey -> + [NorthData = #north_data{id = Id, location_code = LocationCode}] = ets:lookup(Tid, NKey), + %lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), + + %% 重发机制, 在发送的过程中mapper可能会改变 + case safe_invoke_mapper(MapperFun, NorthData) of + {ok, Body} -> + ReceiverPid = self(), + ReceiverPid ! {next_data, Id, LocationCode, Body}, + endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! {next_data, Id, LocationCode, Body} end), + + Buffer#buffer{flight_num = FlightNum + 1}; + {error, Error} -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]), + ets:delete(Tid, Id), + Buffer; + ignore -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]), + ets:delete(Tid, Id), + Buffer + end + end. + +ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) -> + true = ets:delete(Tid, Id), + endpoint_timer:ack(TimerPid, Id), + trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}). + +%% 获取当前统计信息 +stat(#buffer{acc_num = AccNum, tid = Tid}) -> + #{ + <<"acc_num">> => AccNum, + <<"queue_num">> => ets:info(Tid, size) + }. + +cleanup(#buffer{timer_pid = TimerPid}) -> + endpoint_timer:cleanup(TimerPid), + ok. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) -> + {ok, Body :: any()} | ignore | {error, Reason :: any()}. +safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) -> + try + if + is_function(MapperFun, 2) -> + MapperFun(LocationCode, Fields); + is_function(MapperFun, 3) -> + MapperFun(LocationCode, Fields, Timestamp) + end + catch _:Error -> + {error, Error} + end. \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_timer.erl b/apps/endpoint/src/endpoint_timer.erl new file mode 100644 index 0000000..1244500 --- /dev/null +++ b/apps/endpoint/src/endpoint_timer.erl @@ -0,0 +1,128 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% +%%% @end +%%% Created : 07. 5月 2024 10:30 +%%%------------------------------------------------------------------- +-module(endpoint_timer). +-author("anlicheng"). +-include("endpoint.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/1]). +-export([task/3, ack/2, cleanup/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-record(state, { + retry_interval = 0, + %% 定时器 + timer_map = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +task(Pid, Id, Task) when is_pid(Pid), is_integer(Id), is_function(Task, 0) -> + gen_server:cast(Pid, {task, Id, Task}). + +ack(Pid, Id) when is_pid(Pid), is_integer(Id) -> + gen_server:cast(Pid, {ack, Id}). + +cleanup(Pid) when is_pid(Pid) -> + gen_server:cast(Pid, cleanup). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(RetryInterval :: integer()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(RetryInterval) when is_integer(RetryInterval) -> + gen_server:start_link(?MODULE, [RetryInterval], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([RetryInterval]) -> + {ok, #state{retry_interval = RetryInterval}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({task, Id, Task}, State = #state{retry_interval = RetryInterval, timer_map = TimerMap}) -> + TimerRef = erlang:start_timer(RetryInterval, self(), {repost_ticker, {Id, Task}}), + {noreply, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}; + +%% 取消 +handle_cast({ack, Id}, State = #state{timer_map = TimerMap}) -> + case maps:take(Id, TimerMap) of + error -> + {noreply, State}; + {TimerRef, NTimerMap} -> + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + {noreply, State#state{timer_map = NTimerMap}} + end; + +handle_cast(cleanup, State = #state{timer_map = TimerMap}) -> + lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), + {noreply, State#state{timer_map = #{}}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, {repost_ticker, {Id, Task}}}, State = #state{retry_interval = RetryInterval, timer_map = TimerMap}) -> + Task(), + TimerRef = erlang:start_timer(RetryInterval, self(), {repost_ticker, {Id, Task}}), + + {noreply, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%===================================================================