From 42e3a77d8233bcd9d138430b92702519d8d37db8 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 19 Aug 2025 00:32:03 +0800 Subject: [PATCH] sasl config --- apps/iot/include/endpoint.hrl | 1 + apps/iot/src/database/endpoint_bo.erl | 3 ++- apps/iot/src/endpoint/endpoint_kafka.erl | 14 +++++++++++--- apps/iot/src/endpoint/endpoint_sup.erl | 1 + 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/apps/iot/include/endpoint.hrl b/apps/iot/include/endpoint.hrl index 225be8f..c49830b 100644 --- a/apps/iot/include/endpoint.hrl +++ b/apps/iot/include/endpoint.hrl @@ -24,6 +24,7 @@ }). -record(kafka_endpoint, { + enable_sasl = false :: boolean(), username = <<>> :: binary(), password = <<>> :: binary(), mechanism :: atom(), diff --git a/apps/iot/src/database/endpoint_bo.erl b/apps/iot/src/database/endpoint_bo.erl index 34d4b28..54637c1 100644 --- a/apps/iot/src/database/endpoint_bo.erl +++ b/apps/iot/src/database/endpoint_bo.erl @@ -60,7 +60,7 @@ parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) -> url = Url, pool_size = PoolSize }; -parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0, <<"bootstrap_servers">> := BootstrapServers0, <<"topic">> := Topic}) -> +parse_config(<<"kafka">>, #{<<"enable_sasl">> := EnableSasl, <<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0, <<"bootstrap_servers">> := BootstrapServers0, <<"topic">> := Topic}) -> BootstrapServers = lists:filtermap(fun(S) -> case binary:split(S, <<":">>) of [Host0, Port0] -> @@ -81,6 +81,7 @@ parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Passwo end, #kafka_endpoint{ + enable_sasl = EnableSasl, username = Username, password = Password, mechanism = Mechanism, diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index 9ed275b..d77b024 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -92,16 +92,24 @@ handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId, - endpoint = #endpoint{title = Title, config = #kafka_endpoint{username = Username, password = Password, mechanism = Mechanism, bootstrap_servers = BootstrapServers, topic = Topic}}}) -> + endpoint = #endpoint{title = Title, config = #kafka_endpoint{enable_sasl = EnableSasl, username = Username, password = Password, mechanism = Mechanism, bootstrap_servers = BootstrapServers, topic = Topic}}}) -> lager:debug("[endpoint_kafka] endpoint: ~p, create postman", [Title]), - ClientConfig = [ + ClientConfig0 = [ {reconnect_cool_down_seconds, 5}, - {sasl, {Mechanism, Username, Password}}, {socket_options, [ {keepalive, true} ]} ], + + ClientConfig = case EnableSasl of + true -> + SaslConfig = {sasl, {Mechanism, Username, Password}}, + [SaslConfig|ClientConfig0]; + false -> + ClientConfig0 + end, + case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of {ok, ClientPid} -> ok = brod:start_producer(ClientId, Topic, _ProducerConfig = []), diff --git a/apps/iot/src/endpoint/endpoint_sup.erl b/apps/iot/src/endpoint/endpoint_sup.erl index 9bb964f..dece399 100644 --- a/apps/iot/src/endpoint/endpoint_sup.erl +++ b/apps/iot/src/endpoint/endpoint_sup.erl @@ -52,6 +52,7 @@ kafka_test() -> title = <<"kafka测试"/utf8>>, %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} config = #kafka_endpoint{ + enable_sasl = true, username = <<"admin">>, password = <<"lz4rP5UavRTiGZEZK8G51mxHcM5iPC">>, mechanism = scram_sha_256,