This commit is contained in:
anlicheng 2025-11-12 15:25:04 +08:00
parent 3290bd1be0
commit 4405483b9d
5 changed files with 36 additions and 7 deletions

View File

@ -13,7 +13,7 @@
%% API %% API
-export([start_link/1]). -export([start_link/1]).
-export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]). -export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]).
-export([get_alias_pid/1]). -export([get_alias_pid/1, is_support/1, get_protocol/1]).
-export([endpoint_record/1]). -export([endpoint_record/1]).
%%%=================================================================== %%%===================================================================
@ -58,6 +58,20 @@ reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
clean_up(Pid) when is_pid(Pid) -> clean_up(Pid) when is_pid(Pid) ->
gen_server:call(Pid, clean_up, 5000). gen_server:call(Pid, clean_up, 5000).
-spec get_protocol(Endpoint :: #endpoint{}) -> atom().
get_protocol(#endpoint{config = #http_endpoint{}}) ->
http;
get_protocol(#endpoint{config = #mqtt_endpoint{}}) ->
mqtt;
get_protocol(#endpoint{config = #kafka_endpoint{}}) ->
kafka.
-spec is_support(Protocol :: atom()) -> boolean().
is_support(Protocol) when is_atom(Protocol) ->
{ok, Props} = application:get_env(iot, endpoints),
SupportProtocols = proplists:get_value(support_protocols, Props, []),
lists:member(Protocol, SupportProtocols).
-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}. -spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}.
endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config">> := ConfigJson, endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config">> := ConfigJson,
<<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) -> <<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) ->

View File

@ -108,7 +108,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS
BaseConfig BaseConfig
end, end,
case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of case catch brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of
{ok, ClientPid} -> {ok, ClientPid} ->
case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of
ok -> ok ->
@ -119,8 +119,8 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS
retry_connect(), retry_connect(),
{noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}} {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}}
end; end;
{error, Reason} -> Error ->
lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Reason]), lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Error]),
retry_connect(), retry_connect(),
{noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}} {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}}
end; end;

View File

@ -53,7 +53,7 @@ start_link(LocalName, Endpoint = #endpoint{}) when is_atom(LocalName) ->
%% gen_statem:start_link/[3,4], this function is called by the new %% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize. %% process to initialize.
init([Endpoint = #endpoint{matcher = Matcher}]) -> init([Endpoint = #endpoint{matcher = Matcher}]) ->
erlang:process_flag(trap_exit, true), % erlang:process_flag(trap_exit, true),
endpoint_subscription:subscribe(Matcher, self()), endpoint_subscription:subscribe(Matcher, self()),
%% , %% ,
@ -113,7 +113,7 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status
{ok, ConnPid} = emqtt:start_link(Opts), {ok, ConnPid} = emqtt:start_link(Opts),
lager:debug("[endpoint_mqtt] start connect, options: ~p", [Opts]), lager:debug("[endpoint_mqtt] start connect, options: ~p", [Opts]),
case emqtt:connect(ConnPid, 5000) of case catch emqtt:connect(ConnPid, 5000) of
{ok, _} -> {ok, _} ->
lager:debug("[endpoint_mqtt] connect success, pid: ~p", [ConnPid]), lager:debug("[endpoint_mqtt] connect success, pid: ~p", [ConnPid]),
NBuffer = endpoint_buffer:trigger_n(Buffer), NBuffer = endpoint_buffer:trigger_n(Buffer),
@ -121,6 +121,10 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status
{error, Reason} -> {error, Reason} ->
lager:warning("[endpoint_mqtt] connect get error: ~p", [Reason]), lager:warning("[endpoint_mqtt] connect get error: ~p", [Reason]),
erlang:start_timer(5000, self(), create_postman), erlang:start_timer(5000, self(), create_postman),
{noreply, State};
Error ->
lager:warning("[endpoint_mqtt] connect get error: ~p", [Error]),
erlang:start_timer(5000, self(), create_postman),
{noreply, State} {noreply, State}
end; end;

View File

@ -35,7 +35,12 @@ init([]) ->
error -> error ->
[]; [];
{ok, Endpoint} -> {ok, Endpoint} ->
[child_spec(Endpoint)] case endpoint:is_support(endpoint:get_protocol(Endpoint)) of
true ->
[child_spec(Endpoint)];
false ->
[]
end
end end
end, Endpoints), end, Endpoints),
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.

View File

@ -26,6 +26,12 @@
]}, ]},
{api_url, "http://100.123.0.4/api/v1"}, {api_url, "http://100.123.0.4/api/v1"},
%% 支持的协议
{endpoints, [
{support_protocols, [
http
]}
]},
%% 目标服务器地址 %% 目标服务器地址
{emqx_server, [ {emqx_server, [