解决数据转发逻辑

This commit is contained in:
anlicheng 2025-06-04 16:15:39 +08:00
parent 0c28b67a07
commit 21c254b8b1
4 changed files with 46 additions and 74 deletions

View File

@ -13,6 +13,8 @@
parse_trans,
lager,
mnesia,
ssl,
public_key,
kernel,
stdlib
]},

View File

@ -3,7 +3,7 @@
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% 1.
%%% @end
%%% Created : 06. 7 2023 12:02
%%%-------------------------------------------------------------------
@ -14,16 +14,14 @@
%%
-define(RETRY_INTERVAL, 5000).
-export([new/2, append/2, trigger_next/1, trigger_n/1, cleanup/1, ack/2, stat/1]).
-export([new/2, append/3, trigger_next/1, trigger_n/1, cleanup/1, ack/2, stat/1]).
-export_type([buffer/0]).
-record(buffer, {
endpoint :: #endpoint{},
next_id = 1 :: integer(),
%%
cursor = 0 :: integer(),
%% ets存储的引用
tid :: ets:tid(),
%% queue存储
queue :: queue:queue(),
%%
timer_pid :: pid(),
%%
@ -34,30 +32,18 @@
acc_num = 0
}).
-record(north_data, {
id :: integer(),
location_code,
fields,
timestamp :: integer()
}).
-type buffer() :: #buffer{}.
-spec new(Endpoint :: #endpoint{}, WindowSize :: integer()) -> Buffer :: #buffer{}.
new(Endpoint = #endpoint{id = Id}, WindowSize) when is_integer(WindowSize), WindowSize > 0 ->
%%
EtsName = list_to_atom("endpoint_buffer_ets:" ++ integer_to_list(Id)),
Tid = ets:new(EtsName, [ordered_set, private]),
new(Endpoint = #endpoint{}, WindowSize) when is_integer(WindowSize), WindowSize > 0 ->
%%
{ok, TimerPid} = endpoint_timer:start_link(?RETRY_INTERVAL),
#buffer{queue = queue:new(), timer_pid = TimerPid, endpoint = Endpoint, window_size = WindowSize}.
#buffer{cursor = 0, tid = Tid, timer_pid = TimerPid, endpoint = Endpoint, window_size = WindowSize}.
-spec append(tuple(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
append({LocationCode, Fields, Timestamp}, Buffer = #buffer{tid = Tid, next_id = NextId, window_size = WindowSize, flight_num = FlightNum}) ->
NorthData = #north_data{id = NextId, location_code = LocationCode, fields = Fields, timestamp = Timestamp},
true = ets:insert(Tid, NorthData),
NBuffer = Buffer#buffer{next_id = NextId + 1},
-spec append(Metadata :: map(), Metric :: binary(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
append(Metadata, Metric, Buffer = #buffer{queue = Q, next_id = NextId, window_size = WindowSize, flight_num = FlightNum}) when is_map(Metadata), is_binary(Metric) ->
NQ = queue:in({NextId, Metadata, Metric}, Q),
NBuffer = Buffer#buffer{queue = NQ, next_id = NextId + 1},
case FlightNum < WindowSize of
true ->
trigger_next(NBuffer);
@ -72,65 +58,36 @@ trigger_n(Buffer = #buffer{window_size = WindowSize}) ->
%%
-spec trigger_next(Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum, endpoint = #endpoint{title = Title, mapper_fun = MapperFun}}) ->
case ets:next(Tid, Cursor) of
'$end_of_table' ->
Buffer;
NKey ->
[NorthData = #north_data{id = Id, location_code = LocationCode}] = ets:lookup(Tid, NKey),
%lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
trigger_next(Buffer = #buffer{queue = Q, timer_pid = TimerPid, flight_num = FlightNum, endpoint = #endpoint{}}) ->
case queue:out(Q) of
{empty, Q1} ->
Buffer#buffer{queue = Q1};
{{value, {Id, Metadata, Metric}}, Q2} ->
%% , mapper可能会改变
case safe_invoke_mapper(MapperFun, NorthData) of
{ok, Body} ->
ReceiverPid = self(),
ReceiverPid ! {next_data, Id, LocationCode, Body},
endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! {next_data, Id, LocationCode, Body} end),
ReceiverPid = self(),
Message = {next_data, Id, Metadata, Metric},
Buffer#buffer{flight_num = FlightNum + 1};
{error, Error} ->
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Title, Error]),
ets:delete(Tid, Id),
Buffer;
ignore ->
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Title]),
ets:delete(Tid, Id),
Buffer
end
ReceiverPid ! Message,
%%
endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! Message end),
Buffer#buffer{flight_num = FlightNum + 1, queue = Q2}
end.
-spec ack(Id :: integer(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) ->
true = ets:delete(Tid, Id),
ack(Id, Buffer = #buffer{timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) ->
endpoint_timer:ack(TimerPid, Id),
trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}).
%%
-spec stat(Buffer :: #buffer{}) -> map().
stat(#buffer{acc_num = AccNum, tid = Tid}) ->
stat(#buffer{acc_num = AccNum, queue = Q}) ->
#{
<<"acc_num">> => AccNum,
<<"queue_num">> => ets:info(Tid, size)
<<"queue_num">> => queue:len(Q)
}.
-spec cleanup(Buffer :: #buffer{}) -> ok.
cleanup(#buffer{timer_pid = TimerPid}) ->
endpoint_timer:cleanup(TimerPid),
ok.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) ->
{ok, Body :: any()} | ignore | {error, Reason :: any()}.
safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) ->
try
if
is_function(MapperFun, 2) ->
MapperFun(LocationCode, Fields);
is_function(MapperFun, 3) ->
MapperFun(LocationCode, Fields, Timestamp)
end
catch _:Error ->
{error, Error}
end.
ok.

View File

@ -82,11 +82,9 @@ handle_cast(cleanup, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({next_data, Id, _LocationCode, Body}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) ->
Headers = [
{<<"content-type">>, <<"application/json">>}
],
case hackney:request(post, Url, Headers, Body) of
handle_info({next_data, Id, Metadata, Metric}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) ->
Headers = request_headers(Metadata),
case hackney:request(post, Url, Headers, Metric, [{pool, false}]) of
{ok, 200, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
hackney:close(ClientRef),
@ -125,3 +123,15 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec request_headers(Metadata :: any()) -> list().
request_headers(Metadata) when is_map(Metadata), map_size(Metadata) > 0 ->
XMetaData = base64:encode(iolist_to_binary(jiffy:encode(Metadata, [force_utf8]))),
[
{<<"X-Metadata">>, XMetaData},
{<<"Content-Type">>, <<"application/json">>}
];
request_headers(_) ->
[
{<<"Content-Type">>, <<"application/json">>}
].

View File

@ -29,12 +29,15 @@
%%% API
%%%===================================================================
-spec task(Pid :: pid(), Id :: integer(), Task :: fun(() -> no_return())) -> no_return().
task(Pid, Id, Task) when is_pid(Pid), is_integer(Id), is_function(Task, 0) ->
gen_server:cast(Pid, {task, Id, Task}).
-spec ack(Pid :: pid(), Id :: integer()) -> no_return().
ack(Pid, Id) when is_pid(Pid), is_integer(Id) ->
gen_server:cast(Pid, {ack, Id}).
-spec cleanup(Pid :: pid()) -> no_return().
cleanup(Pid) when is_pid(Pid) ->
gen_server:cast(Pid, cleanup).