diff --git a/apps/iot/include/endpoint.hrl b/apps/iot/include/endpoint.hrl index 5676be8..49a33aa 100644 --- a/apps/iot/include/endpoint.hrl +++ b/apps/iot/include/endpoint.hrl @@ -33,7 +33,7 @@ -record(endpoint, { id :: integer(), %% 全局唯一,在路由规则中通过名称来指定 - name :: binary(), + matcher :: binary(), %% 标题描述 title = <<>> :: binary(), %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} diff --git a/apps/iot/src/endpoint/endpoint.erl b/apps/iot/src/endpoint/endpoint.erl index 2d16266..48dbc7c 100644 --- a/apps/iot/src/endpoint/endpoint.erl +++ b/apps/iot/src/endpoint/endpoint.erl @@ -21,18 +21,15 @@ %%%=================================================================== -spec start_link(Endpoint :: #endpoint{}) -> {'ok', pid()} | 'ignore' | {'error', term()}. -start_link(Endpoint = #endpoint{id = Id, name = Name, config = #http_endpoint{}}) -> +start_link(Endpoint = #endpoint{id = Id, config = #http_endpoint{}}) -> LocalName = get_name(Id), - AliasName = get_alias_name(Name), - endpoint_http:start_link(LocalName, AliasName, Endpoint); -start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mqtt_endpoint{}}) -> + endpoint_http:start_link(LocalName, Endpoint); +start_link(Endpoint = #endpoint{id = Id, config = #mqtt_endpoint{}}) -> LocalName = get_name(Id), - AliasName = get_alias_name(Name), - endpoint_mqtt:start_link(LocalName, AliasName, Endpoint); -start_link(Endpoint = #endpoint{id = Id, name = Name, config = #kafka_endpoint{}}) -> + endpoint_mqtt:start_link(LocalName, Endpoint); +start_link(Endpoint = #endpoint{id = Id, config = #kafka_endpoint{}}) -> LocalName = get_name(Id), - AliasName = get_alias_name(Name), - endpoint_kafka:start_link(LocalName, AliasName, Endpoint). + endpoint_kafka:start_link(LocalName, Endpoint). -spec get_name(Id :: integer()) -> atom(). get_name(Id) when is_integer(Id) -> @@ -62,13 +59,13 @@ clean_up(Pid) when is_pid(Pid) -> gen_server:call(Pid, clean_up, 5000). -spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}. -endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson, +endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson, <<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) -> try Config = parse_config(Type, ConfigJson), {ok, #endpoint { id = Id, - name = Name, + matcher = Matcher, title = Title, config = Config, status = Status, diff --git a/apps/iot/src/endpoint/endpoint_http.erl b/apps/iot/src/endpoint/endpoint_http.erl index ecfd4fc..585af1e 100644 --- a/apps/iot/src/endpoint/endpoint_http.erl +++ b/apps/iot/src/endpoint/endpoint_http.erl @@ -13,7 +13,7 @@ -behaviour(gen_server). %% API --export([start_link/3]). +-export([start_link/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -30,10 +30,10 @@ %%%=================================================================== %% @doc Spawns the server and registers the local name (unique) --spec(start_link(LocalName :: atom(), AliasName :: atom(), Endpoint :: #endpoint{}) -> +-spec(start_link(LocalName :: atom(), Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(LocalName, AliasName, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(LocalName), is_atom(AliasName) -> - gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []). +start_link(LocalName, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(LocalName) -> + gen_server:start_link({local, LocalName}, ?MODULE, [Endpoint], []). %%%=================================================================== %%% gen_server callbacks @@ -44,9 +44,9 @@ start_link(LocalName, AliasName, Endpoint = #endpoint{config = #http_endpoint{}} -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([AliasName, Endpoint]) -> +init([Endpoint = #endpoint{matcher = Matcher}]) -> + endpoint_subscription:subscribe(Matcher, self()), Buffer = endpoint_buffer:new(Endpoint, 10), - true = gproc:reg({n, l, AliasName}), {ok, #state{endpoint = Endpoint, buffer = Buffer}}. %% @private diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index aecf5ab..d30900d 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -13,7 +13,7 @@ -behaviour(gen_server). %% API --export([start_link/3]). +-export([start_link/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -39,8 +39,8 @@ %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. -start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName), is_atom(AliasName) -> - gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []). +start_link(LocalName, Endpoint = #endpoint{}) when is_atom(LocalName) -> + gen_server:start_link({local, LocalName}, ?MODULE, [Endpoint], []). %%%=================================================================== %%% gen_statem callbacks @@ -50,10 +50,10 @@ start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName) %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([AliasName, Endpoint = #endpoint{id = Id}]) -> - erlang:process_flag(trap_exit, true), - true = gproc:reg({n, l, AliasName}), +init([Endpoint = #endpoint{id = Id, matcher = Matcher}]) -> + endpoint_subscription:subscribe(Matcher, self()), + erlang:process_flag(trap_exit, true), %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 erlang:start_timer(0, self(), connect), %% 初始化存储 diff --git a/apps/iot/src/endpoint/endpoint_mqtt.erl b/apps/iot/src/endpoint/endpoint_mqtt.erl index 59dfc43..150e6ec 100644 --- a/apps/iot/src/endpoint/endpoint_mqtt.erl +++ b/apps/iot/src/endpoint/endpoint_mqtt.erl @@ -13,7 +13,7 @@ -behaviour(gen_server). %% API --export([start_link/3]). +-export([start_link/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,8 +41,8 @@ %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. -start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName), is_atom(AliasName) -> - gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []). +start_link(LocalName, Endpoint = #endpoint{}) when is_atom(LocalName) -> + gen_server:start_link({local, LocalName}, ?MODULE, [Endpoint], []). %%%=================================================================== %%% gen_statem callbacks @@ -52,9 +52,10 @@ start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName) %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([AliasName, Endpoint]) -> +init([Endpoint = #endpoint{matcher = Matcher}]) -> erlang:process_flag(trap_exit, true), - true = gproc:reg({n, l, AliasName}), + endpoint_subscription:subscribe(Matcher, self()), + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 erlang:start_timer(0, self(), create_postman), %% 初始化存储 diff --git a/apps/iot/src/endpoint/endpoint_sup.erl b/apps/iot/src/endpoint/endpoint_sup.erl index 271f31a..54ceaf5 100644 --- a/apps/iot/src/endpoint/endpoint_sup.erl +++ b/apps/iot/src/endpoint/endpoint_sup.erl @@ -12,7 +12,6 @@ -export([ensured_endpoint_started/1, delete_endpoint/1]). -export([init/1]). --export([kafka_test/0]). -define(SERVER, ?MODULE). @@ -31,6 +30,7 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, Endpoints = iot_api:get_all_endpoints(), + lager:debug("ep: ~p", [Endpoints]), ChildSpecs = lists:flatmap(fun(EndpointInfo) -> case endpoint:endpoint_record(EndpointInfo) of error -> @@ -41,48 +41,6 @@ init([]) -> end, Endpoints), {ok, {SupFlags, ChildSpecs}}. -%% internal functions - -kafka_test() -> - Endpoint = #endpoint{ - id = 1, - %% 全局唯一,在路由规则中通过名称来指定 - name = <<"kafka_test">>, - %% 标题描述 - title = <<"kafka测试"/utf8>>, - %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} - config = #kafka_endpoint{ - %sasl_config = { - % scram_sha_256, - % <<"admin">>, - % <<"lz4rP5UavRTiGZEZK8G51mxHcM5iPC">> - %}, - - sasl_config = undefined, - - bootstrap_servers = [ - {"127.0.0.1", 19092} - ], - topic = <<"metric">> - }, - status = 0, - updated_at = 0, - created_at = 0 - }, - {ok, Pid} = ensured_endpoint_started(Endpoint), - ServiceId = <<"service_id_123">>, - Metric = <<"this is a test">>, - endpoint:forward(Pid, ServiceId, Metric), - endpoint:forward(Pid, ServiceId, Metric), - endpoint:forward(Pid, ServiceId, Metric), - endpoint:forward(Pid, ServiceId, Metric), - endpoint:forward(Pid, ServiceId, Metric), - endpoint:forward(Pid, ServiceId, Metric), - endpoint:forward(Pid, ServiceId, Metric), - endpoint:forward(Pid, ServiceId, Metric), - endpoint:forward(Pid, ServiceId, Metric), - endpoint:forward(Pid, ServiceId, Metric). - -spec ensured_endpoint_started(Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. ensured_endpoint_started(Endpoint = #endpoint{}) -> case supervisor:start_child(?MODULE, child_spec(Endpoint)) of