diff --git a/apps/endpoint/src/endpoint.app.src b/apps/endpoint/src/endpoint.app.src index d7e7b82..f2ced32 100644 --- a/apps/endpoint/src/endpoint.app.src +++ b/apps/endpoint/src/endpoint.app.src @@ -13,6 +13,8 @@ parse_trans, lager, mnesia, + ssl, + public_key, kernel, stdlib ]}, diff --git a/apps/endpoint/src/endpoint_buffer.erl b/apps/endpoint/src/endpoint_buffer.erl index 3ccc24b..a2f5e6b 100644 --- a/apps/endpoint/src/endpoint_buffer.erl +++ b/apps/endpoint/src/endpoint_buffer.erl @@ -3,7 +3,7 @@ %%% @author aresei %%% @copyright (C) 2023, %%% @doc -%%% +%%% 1. 既然数据整体都没有做到硬盘的持久化,因此当失败的时候,可能的重试 %%% @end %%% Created : 06. 7月 2023 12:02 %%%------------------------------------------------------------------- @@ -14,16 +14,14 @@ %% 消息重发间隔 -define(RETRY_INTERVAL, 5000). --export([new/2, append/2, trigger_next/1, trigger_n/1, cleanup/1, ack/2, stat/1]). +-export([new/2, append/3, trigger_next/1, trigger_n/1, cleanup/1, ack/2, stat/1]). -export_type([buffer/0]). -record(buffer, { endpoint :: #endpoint{}, next_id = 1 :: integer(), - %% 当前数据所在的游标 - cursor = 0 :: integer(), - %% ets存储的引用 - tid :: ets:tid(), + %% 用queue存储 + queue :: queue:queue(), %% 定时器 timer_pid :: pid(), %% 窗口大小,允许最大的未确认消息数 @@ -34,30 +32,18 @@ acc_num = 0 }). --record(north_data, { - id :: integer(), - location_code, - fields, - timestamp :: integer() -}). - -type buffer() :: #buffer{}. -spec new(Endpoint :: #endpoint{}, WindowSize :: integer()) -> Buffer :: #buffer{}. -new(Endpoint = #endpoint{id = Id}, WindowSize) when is_integer(WindowSize), WindowSize > 0 -> - %% 初始化存储 - EtsName = list_to_atom("endpoint_buffer_ets:" ++ integer_to_list(Id)), - Tid = ets:new(EtsName, [ordered_set, private]), +new(Endpoint = #endpoint{}, WindowSize) when is_integer(WindowSize), WindowSize > 0 -> %% 定义重发器 {ok, TimerPid} = endpoint_timer:start_link(?RETRY_INTERVAL), + #buffer{queue = queue:new(), timer_pid = TimerPid, endpoint = Endpoint, window_size = WindowSize}. - #buffer{cursor = 0, tid = Tid, timer_pid = TimerPid, endpoint = Endpoint, window_size = WindowSize}. - --spec append(tuple(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}. -append({LocationCode, Fields, Timestamp}, Buffer = #buffer{tid = Tid, next_id = NextId, window_size = WindowSize, flight_num = FlightNum}) -> - NorthData = #north_data{id = NextId, location_code = LocationCode, fields = Fields, timestamp = Timestamp}, - true = ets:insert(Tid, NorthData), - NBuffer = Buffer#buffer{next_id = NextId + 1}, +-spec append(Metadata :: map(), Metric :: binary(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}. +append(Metadata, Metric, Buffer = #buffer{queue = Q, next_id = NextId, window_size = WindowSize, flight_num = FlightNum}) when is_map(Metadata), is_binary(Metric) -> + NQ = queue:in({NextId, Metadata, Metric}, Q), + NBuffer = Buffer#buffer{queue = NQ, next_id = NextId + 1}, case FlightNum < WindowSize of true -> trigger_next(NBuffer); @@ -72,65 +58,36 @@ trigger_n(Buffer = #buffer{window_size = WindowSize}) -> %% 触发读取下一条数据 -spec trigger_next(Buffer :: #buffer{}) -> NBuffer :: #buffer{}. -trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum, endpoint = #endpoint{title = Title, mapper_fun = MapperFun}}) -> - case ets:next(Tid, Cursor) of - '$end_of_table' -> - Buffer; - NKey -> - [NorthData = #north_data{id = Id, location_code = LocationCode}] = ets:lookup(Tid, NKey), - %lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), +trigger_next(Buffer = #buffer{queue = Q, timer_pid = TimerPid, flight_num = FlightNum, endpoint = #endpoint{}}) -> + case queue:out(Q) of + {empty, Q1} -> + Buffer#buffer{queue = Q1}; + {{value, {Id, Metadata, Metric}}, Q2} -> %% 重发机制, 在发送的过程中mapper可能会改变 - case safe_invoke_mapper(MapperFun, NorthData) of - {ok, Body} -> - ReceiverPid = self(), - ReceiverPid ! {next_data, Id, LocationCode, Body}, - endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! {next_data, Id, LocationCode, Body} end), + ReceiverPid = self(), + Message = {next_data, Id, Metadata, Metric}, - Buffer#buffer{flight_num = FlightNum + 1}; - {error, Error} -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Title, Error]), - ets:delete(Tid, Id), - Buffer; - ignore -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Title]), - ets:delete(Tid, Id), - Buffer - end + ReceiverPid ! Message, + %% 重试机制 + endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! Message end), + + Buffer#buffer{flight_num = FlightNum + 1, queue = Q2} end. -spec ack(Id :: integer(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}. -ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) -> - true = ets:delete(Tid, Id), +ack(Id, Buffer = #buffer{timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) -> endpoint_timer:ack(TimerPid, Id), trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}). %% 获取当前统计信息 -spec stat(Buffer :: #buffer{}) -> map(). -stat(#buffer{acc_num = AccNum, tid = Tid}) -> +stat(#buffer{acc_num = AccNum, queue = Q}) -> #{ <<"acc_num">> => AccNum, - <<"queue_num">> => ets:info(Tid, size) + <<"queue_num">> => queue:len(Q) }. -spec cleanup(Buffer :: #buffer{}) -> ok. cleanup(#buffer{timer_pid = TimerPid}) -> endpoint_timer:cleanup(TimerPid), - ok. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - --spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) -> - {ok, Body :: any()} | ignore | {error, Reason :: any()}. -safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) -> - try - if - is_function(MapperFun, 2) -> - MapperFun(LocationCode, Fields); - is_function(MapperFun, 3) -> - MapperFun(LocationCode, Fields, Timestamp) - end - catch _:Error -> - {error, Error} - end. \ No newline at end of file + ok. \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_http.erl b/apps/endpoint/src/endpoint_http.erl index 8da36da..68581ec 100644 --- a/apps/endpoint/src/endpoint_http.erl +++ b/apps/endpoint/src/endpoint_http.erl @@ -82,11 +82,9 @@ handle_cast(cleanup, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({next_data, Id, _LocationCode, Body}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) -> - Headers = [ - {<<"content-type">>, <<"application/json">>} - ], - case hackney:request(post, Url, Headers, Body) of +handle_info({next_data, Id, Metadata, Metric}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) -> + Headers = request_headers(Metadata), + case hackney:request(post, Url, Headers, Metric, [{pool, false}]) of {ok, 200, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), hackney:close(ClientRef), @@ -125,3 +123,15 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +-spec request_headers(Metadata :: any()) -> list(). +request_headers(Metadata) when is_map(Metadata), map_size(Metadata) > 0 -> + XMetaData = base64:encode(iolist_to_binary(jiffy:encode(Metadata, [force_utf8]))), + [ + {<<"X-Metadata">>, XMetaData}, + {<<"Content-Type">>, <<"application/json">>} + ]; +request_headers(_) -> + [ + {<<"Content-Type">>, <<"application/json">>} + ]. \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_timer.erl b/apps/endpoint/src/endpoint_timer.erl index 1244500..a41ef44 100644 --- a/apps/endpoint/src/endpoint_timer.erl +++ b/apps/endpoint/src/endpoint_timer.erl @@ -29,12 +29,15 @@ %%% API %%%=================================================================== +-spec task(Pid :: pid(), Id :: integer(), Task :: fun(() -> no_return())) -> no_return(). task(Pid, Id, Task) when is_pid(Pid), is_integer(Id), is_function(Task, 0) -> gen_server:cast(Pid, {task, Id, Task}). +-spec ack(Pid :: pid(), Id :: integer()) -> no_return(). ack(Pid, Id) when is_pid(Pid), is_integer(Id) -> gen_server:cast(Pid, {ack, Id}). +-spec cleanup(Pid :: pid()) -> no_return(). cleanup(Pid) when is_pid(Pid) -> gen_server:cast(Pid, cleanup).