diff --git a/apps/endpoint/src/endpoint_buffer.erl b/apps/endpoint/src/endpoint_buffer.erl index dbf2aee..20c0108 100644 --- a/apps/endpoint/src/endpoint_buffer.erl +++ b/apps/endpoint/src/endpoint_buffer.erl @@ -15,6 +15,7 @@ -define(RETRY_INTERVAL, 5000). -export([new/2, append/2, trigger_next/1, trigger_n/1, cleanup/1, ack/2, stat/1]). +-export_type([buffer/0]). -record(buffer, { endpoint, @@ -40,6 +41,8 @@ timestamp :: integer() }). +-type buffer() :: #buffer{}. + -spec new(Endpoint :: #endpoint{}, WindowSize :: integer()) -> Buffer :: #buffer{}. new(Endpoint = #endpoint{id = Id}, WindowSize) when is_integer(WindowSize), WindowSize > 0 -> %% 初始化存储 diff --git a/apps/endpoint/src/endpoint_for_http.erl b/apps/endpoint/src/endpoint_for_http.erl new file mode 100644 index 0000000..597d2c0 --- /dev/null +++ b/apps/endpoint/src/endpoint_for_http.erl @@ -0,0 +1,150 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% +%%% @end +%%% Created : 07. 5月 2024 11:17 +%%%------------------------------------------------------------------- +-module(endpoint_for_http). +-author("anlicheng"). +-include("endpoint.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/2]). +-export([get_name/1, get_pid/1, forward/4, get_stat/1, cleanup/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + http_endpoint :: #http_endpoint{}, + buffer :: endpoint_buffer:buffer() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec get_name(Name :: binary() | #endpoint{}) -> atom(). +get_name(Id) when is_integer(Id) -> + list_to_atom("endpoint:" ++ integer_to_list(Id)). + +-spec get_pid(Id :: integer()) -> undefined | pid(). +get_pid(Id) when is_integer(Id) -> + whereis(get_name(Id)). + +-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(undefined, _, _, _) -> + ok; +forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_server:cast(Pid, {forward, LocationCode, Fields, Timestamp}). + +-spec get_stat(Pid :: pid()) -> {ok, Stat :: #{}}. +get_stat(Pid) when is_pid(Pid) -> + gen_server:call(Pid, get_stat, 5000). + +-spec cleanup(Pid :: pid()) -> ok. +cleanup(Pid) when is_pid(Pid) -> + gen_server:cast(Pid, cleanup). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(Name, Endpoint :: #http_endpoint{}) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(Name, Endpoint = #http_endpoint{}) when is_atom(Name) -> + gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([Endpoint]) -> + Buffer = endpoint_buffer:new(Endpoint, 10), + {ok, #state{http_endpoint = Endpoint, buffer = Buffer}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> + Stat = endpoint_buffer:stat(Buffer), + {reply, {ok, Stat}, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer), + {noreply, State#state{buffer = NBuffer}}; + +handle_cast(cleanup, State = #state{buffer = Buffer}) -> + endpoint_buffer:cleanup(Buffer), + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({next_data, Id, _LocationCode, Body}, State = #state{buffer = Buffer, http_endpoint = #http_endpoint{url = Url}}) -> + Headers = [ + {<<"content-type">>, <<"application/json">>} + ], + case hackney:request(post, Url, Headers, Body) of + {ok, 200, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + lager:debug("[http_postman] url: ~p, response is: ~p", [Url, RespBody]), + NBuffer = endpoint_buffer:ack(Buffer, Id), + + {noreply, State#state{buffer = NBuffer}}; + {ok, HttpCode, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + lager:debug("[http_postman] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]), + {noreply, State}; + {error, Reason} -> + lager:warning("[http_postman] url: ~p, get error: ~p", [Url, Reason]), + {noreply, State} + end. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%===================================================================