diff --git a/apps/iot/include/endpoint.hrl b/apps/iot/include/endpoint.hrl index b94405f..225be8f 100644 --- a/apps/iot/include/endpoint.hrl +++ b/apps/iot/include/endpoint.hrl @@ -26,7 +26,8 @@ -record(kafka_endpoint, { username = <<>> :: binary(), password = <<>> :: binary(), - bootstrap_servers = [] :: [binary()], + mechanism :: atom(), + bootstrap_servers = [] :: [{string(), integer()}], topic = <<>> :: binary() }). diff --git a/apps/iot/src/database/endpoint_bo.erl b/apps/iot/src/database/endpoint_bo.erl index 8514147..e05ef97 100644 --- a/apps/iot/src/database/endpoint_bo.erl +++ b/apps/iot/src/database/endpoint_bo.erl @@ -60,10 +60,30 @@ parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) -> url = Url, pool_size = PoolSize }; -parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> +parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0, <<"bootstrap_servers">> := BootstrapServers0, <<"topic">> := Topic}) -> + BootstrapServers = lists:filtermap(fun(S) -> + case binary:split(S, <<":">>) of + [Host0, Port0] -> + {true, {binary_to_list(Host0), binary_to_integer(Port0)}}; + _ -> + false + end + end, BootstrapServers0), + Mechanism = case Mechanism of + <<"sha_256">> -> + scram_sha_256; + <<"sha_512">> -> + scram_sha_512; + <<"plain">> -> + plain; + _ -> + plain + end, + #kafka_endpoint{ username = Username, password = Password, + mechanism = Mechanism, bootstrap_servers = BootstrapServers, topic = Topic }; diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index c3badcd..89740da 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -92,19 +92,16 @@ handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId, - endpoint = #endpoint{title = Title, config = #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers0, topic = Topic}}}) -> + endpoint = #endpoint{title = Title, config = #kafka_endpoint{username = Username, password = Password, mechanism = Mechanism, bootstrap_servers = BootstrapServers, topic = Topic}}}) -> lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]), - BootstrapServers = lists:flatmap(fun(S) -> - case binary:split(S, <<":">>) of - [Host0, Port] -> - [{binary_to_list(Host0), binary_to_integer(Port)}]; - _ -> - [] - end - end, BootstrapServers0), ClientConfig = [ {reconnect_cool_down_seconds, 5}, + {sasl, [ + {mechanism, Mechanism}, + {username, Username}, + {password, Password} + ]}, {socket_options, [{keepalive, true}]} ], case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of