diff --git a/apps/iot/src/endpoint/endpoint.erl b/apps/iot/src/endpoint/endpoint.erl index 445b48c..89d1a1e 100644 --- a/apps/iot/src/endpoint/endpoint.erl +++ b/apps/iot/src/endpoint/endpoint.erl @@ -13,7 +13,7 @@ %% API -export([start_link/1]). -export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]). --export([get_alias_pid/1]). +-export([get_alias_pid/1, is_support/1, get_protocol/1]). -export([endpoint_record/1]). %%%=================================================================== @@ -58,6 +58,20 @@ reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> clean_up(Pid) when is_pid(Pid) -> gen_server:call(Pid, clean_up, 5000). +-spec get_protocol(Endpoint :: #endpoint{}) -> atom(). +get_protocol(#endpoint{config = #http_endpoint{}}) -> + http; +get_protocol(#endpoint{config = #mqtt_endpoint{}}) -> + mqtt; +get_protocol(#endpoint{config = #kafka_endpoint{}}) -> + kafka. + +-spec is_support(Protocol :: atom()) -> boolean(). +is_support(Protocol) when is_atom(Protocol) -> + {ok, Props} = application:get_env(iot, endpoints), + SupportProtocols = proplists:get_value(support_protocols, Props, []), + lists:member(Protocol, SupportProtocols). + -spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}. endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config">> := ConfigJson, <<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) -> diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index 52a577c..7c16a8e 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -108,7 +108,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS BaseConfig end, - case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of + case catch brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of {ok, ClientPid} -> case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of ok -> @@ -119,8 +119,8 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS retry_connect(), {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}} end; - {error, Reason} -> - lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Reason]), + Error -> + lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Error]), retry_connect(), {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}} end; diff --git a/apps/iot/src/endpoint/endpoint_mqtt.erl b/apps/iot/src/endpoint/endpoint_mqtt.erl index fe99c20..87eabec 100644 --- a/apps/iot/src/endpoint/endpoint_mqtt.erl +++ b/apps/iot/src/endpoint/endpoint_mqtt.erl @@ -53,7 +53,7 @@ start_link(LocalName, Endpoint = #endpoint{}) when is_atom(LocalName) -> %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. init([Endpoint = #endpoint{matcher = Matcher}]) -> - erlang:process_flag(trap_exit, true), + % erlang:process_flag(trap_exit, true), endpoint_subscription:subscribe(Matcher, self()), %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 @@ -113,7 +113,7 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status {ok, ConnPid} = emqtt:start_link(Opts), lager:debug("[endpoint_mqtt] start connect, options: ~p", [Opts]), - case emqtt:connect(ConnPid, 5000) of + case catch emqtt:connect(ConnPid, 5000) of {ok, _} -> lager:debug("[endpoint_mqtt] connect success, pid: ~p", [ConnPid]), NBuffer = endpoint_buffer:trigger_n(Buffer), @@ -121,6 +121,10 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status {error, Reason} -> lager:warning("[endpoint_mqtt] connect get error: ~p", [Reason]), erlang:start_timer(5000, self(), create_postman), + {noreply, State}; + Error -> + lager:warning("[endpoint_mqtt] connect get error: ~p", [Error]), + erlang:start_timer(5000, self(), create_postman), {noreply, State} end; diff --git a/apps/iot/src/endpoint/endpoint_sup.erl b/apps/iot/src/endpoint/endpoint_sup.erl index 4a87c42..59bcec0 100644 --- a/apps/iot/src/endpoint/endpoint_sup.erl +++ b/apps/iot/src/endpoint/endpoint_sup.erl @@ -35,7 +35,12 @@ init([]) -> error -> []; {ok, Endpoint} -> - [child_spec(Endpoint)] + case endpoint:is_support(endpoint:get_protocol(Endpoint)) of + true -> + [child_spec(Endpoint)]; + false -> + [] + end end end, Endpoints), {ok, {SupFlags, ChildSpecs}}. diff --git a/config/sys-dev.config b/config/sys-dev.config index ed3fcd2..2d3a5df 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -26,6 +26,12 @@ ]}, {api_url, "http://100.123.0.4/api/v1"}, + %% 支持的协议 + {endpoints, [ + {support_protocols, [ + http + ]} + ]}, %% 目标服务器地址 {emqx_server, [