This commit is contained in:
anlicheng 2025-11-11 22:06:43 +08:00
parent 6d0110032f
commit 304b83420d
6 changed files with 28 additions and 72 deletions

View File

@ -33,7 +33,7 @@
-record(endpoint, {
id :: integer(),
%%
name :: binary(),
matcher :: binary(),
%%
title = <<>> :: binary(),
%% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}

View File

@ -21,18 +21,15 @@
%%%===================================================================
-spec start_link(Endpoint :: #endpoint{}) -> {'ok', pid()} | 'ignore' | {'error', term()}.
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #http_endpoint{}}) ->
start_link(Endpoint = #endpoint{id = Id, config = #http_endpoint{}}) ->
LocalName = get_name(Id),
AliasName = get_alias_name(Name),
endpoint_http:start_link(LocalName, AliasName, Endpoint);
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mqtt_endpoint{}}) ->
endpoint_http:start_link(LocalName, Endpoint);
start_link(Endpoint = #endpoint{id = Id, config = #mqtt_endpoint{}}) ->
LocalName = get_name(Id),
AliasName = get_alias_name(Name),
endpoint_mqtt:start_link(LocalName, AliasName, Endpoint);
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #kafka_endpoint{}}) ->
endpoint_mqtt:start_link(LocalName, Endpoint);
start_link(Endpoint = #endpoint{id = Id, config = #kafka_endpoint{}}) ->
LocalName = get_name(Id),
AliasName = get_alias_name(Name),
endpoint_kafka:start_link(LocalName, AliasName, Endpoint).
endpoint_kafka:start_link(LocalName, Endpoint).
-spec get_name(Id :: integer()) -> atom().
get_name(Id) when is_integer(Id) ->
@ -62,13 +59,13 @@ clean_up(Pid) when is_pid(Pid) ->
gen_server:call(Pid, clean_up, 5000).
-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}.
endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson,
endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson,
<<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) ->
try
Config = parse_config(Type, ConfigJson),
{ok, #endpoint {
id = Id,
name = Name,
matcher = Matcher,
title = Title,
config = Config,
status = Status,

View File

@ -13,7 +13,7 @@
-behaviour(gen_server).
%% API
-export([start_link/3]).
-export([start_link/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -30,10 +30,10 @@
%%%===================================================================
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(LocalName :: atom(), AliasName :: atom(), Endpoint :: #endpoint{}) ->
-spec(start_link(LocalName :: atom(), Endpoint :: #endpoint{}) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(LocalName, AliasName, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(LocalName), is_atom(AliasName) ->
gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []).
start_link(LocalName, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(LocalName) ->
gen_server:start_link({local, LocalName}, ?MODULE, [Endpoint], []).
%%%===================================================================
%%% gen_server callbacks
@ -44,9 +44,9 @@ start_link(LocalName, AliasName, Endpoint = #endpoint{config = #http_endpoint{}}
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([AliasName, Endpoint]) ->
init([Endpoint = #endpoint{matcher = Matcher}]) ->
endpoint_subscription:subscribe(Matcher, self()),
Buffer = endpoint_buffer:new(Endpoint, 10),
true = gproc:reg({n, l, AliasName}),
{ok, #state{endpoint = Endpoint, buffer = Buffer}}.
%% @private

View File

@ -13,7 +13,7 @@
-behaviour(gen_server).
%% API
-export([start_link/3]).
-export([start_link/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -39,8 +39,8 @@
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName), is_atom(AliasName) ->
gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []).
start_link(LocalName, Endpoint = #endpoint{}) when is_atom(LocalName) ->
gen_server:start_link({local, LocalName}, ?MODULE, [Endpoint], []).
%%%===================================================================
%%% gen_statem callbacks
@ -50,10 +50,10 @@ start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName)
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([AliasName, Endpoint = #endpoint{id = Id}]) ->
erlang:process_flag(trap_exit, true),
true = gproc:reg({n, l, AliasName}),
init([Endpoint = #endpoint{id = Id, matcher = Matcher}]) ->
endpoint_subscription:subscribe(Matcher, self()),
erlang:process_flag(trap_exit, true),
%% ,
erlang:start_timer(0, self(), connect),
%%

View File

@ -13,7 +13,7 @@
-behaviour(gen_server).
%% API
-export([start_link/3]).
-export([start_link/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -41,8 +41,8 @@
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName), is_atom(AliasName) ->
gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []).
start_link(LocalName, Endpoint = #endpoint{}) when is_atom(LocalName) ->
gen_server:start_link({local, LocalName}, ?MODULE, [Endpoint], []).
%%%===================================================================
%%% gen_statem callbacks
@ -52,9 +52,10 @@ start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName)
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([AliasName, Endpoint]) ->
init([Endpoint = #endpoint{matcher = Matcher}]) ->
erlang:process_flag(trap_exit, true),
true = gproc:reg({n, l, AliasName}),
endpoint_subscription:subscribe(Matcher, self()),
%% ,
erlang:start_timer(0, self(), create_postman),
%%

View File

@ -12,7 +12,6 @@
-export([ensured_endpoint_started/1, delete_endpoint/1]).
-export([init/1]).
-export([kafka_test/0]).
-define(SERVER, ?MODULE).
@ -31,6 +30,7 @@ start_link() ->
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
Endpoints = iot_api:get_all_endpoints(),
lager:debug("ep: ~p", [Endpoints]),
ChildSpecs = lists:flatmap(fun(EndpointInfo) ->
case endpoint:endpoint_record(EndpointInfo) of
error ->
@ -41,48 +41,6 @@ init([]) ->
end, Endpoints),
{ok, {SupFlags, ChildSpecs}}.
%% internal functions
kafka_test() ->
Endpoint = #endpoint{
id = 1,
%%
name = <<"kafka_test">>,
%%
title = <<"kafka测试"/utf8>>,
%% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
config = #kafka_endpoint{
%sasl_config = {
% scram_sha_256,
% <<"admin">>,
% <<"lz4rP5UavRTiGZEZK8G51mxHcM5iPC">>
%},
sasl_config = undefined,
bootstrap_servers = [
{"127.0.0.1", 19092}
],
topic = <<"metric">>
},
status = 0,
updated_at = 0,
created_at = 0
},
{ok, Pid} = ensured_endpoint_started(Endpoint),
ServiceId = <<"service_id_123">>,
Metric = <<"this is a test">>,
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric).
-spec ensured_endpoint_started(Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
ensured_endpoint_started(Endpoint = #endpoint{}) ->
case supervisor:start_child(?MODULE, child_spec(Endpoint)) of