sasl config

This commit is contained in:
anlicheng 2025-08-19 00:32:03 +08:00
parent 0dae29b5a9
commit 42e3a77d82
4 changed files with 15 additions and 4 deletions

View File

@ -24,6 +24,7 @@
}). }).
-record(kafka_endpoint, { -record(kafka_endpoint, {
enable_sasl = false :: boolean(),
username = <<>> :: binary(), username = <<>> :: binary(),
password = <<>> :: binary(), password = <<>> :: binary(),
mechanism :: atom(), mechanism :: atom(),

View File

@ -60,7 +60,7 @@ parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) ->
url = Url, url = Url,
pool_size = PoolSize pool_size = PoolSize
}; };
parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0, <<"bootstrap_servers">> := BootstrapServers0, <<"topic">> := Topic}) -> parse_config(<<"kafka">>, #{<<"enable_sasl">> := EnableSasl, <<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0, <<"bootstrap_servers">> := BootstrapServers0, <<"topic">> := Topic}) ->
BootstrapServers = lists:filtermap(fun(S) -> BootstrapServers = lists:filtermap(fun(S) ->
case binary:split(S, <<":">>) of case binary:split(S, <<":">>) of
[Host0, Port0] -> [Host0, Port0] ->
@ -81,6 +81,7 @@ parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Passwo
end, end,
#kafka_endpoint{ #kafka_endpoint{
enable_sasl = EnableSasl,
username = Username, username = Username,
password = Password, password = Password,
mechanism = Mechanism, mechanism = Mechanism,

View File

@ -92,16 +92,24 @@ handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId, handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId,
endpoint = #endpoint{title = Title, config = #kafka_endpoint{username = Username, password = Password, mechanism = Mechanism, bootstrap_servers = BootstrapServers, topic = Topic}}}) -> endpoint = #endpoint{title = Title, config = #kafka_endpoint{enable_sasl = EnableSasl, username = Username, password = Password, mechanism = Mechanism, bootstrap_servers = BootstrapServers, topic = Topic}}}) ->
lager:debug("[endpoint_kafka] endpoint: ~p, create postman", [Title]), lager:debug("[endpoint_kafka] endpoint: ~p, create postman", [Title]),
ClientConfig = [ ClientConfig0 = [
{reconnect_cool_down_seconds, 5}, {reconnect_cool_down_seconds, 5},
{sasl, {Mechanism, Username, Password}},
{socket_options, [ {socket_options, [
{keepalive, true} {keepalive, true}
]} ]}
], ],
ClientConfig = case EnableSasl of
true ->
SaslConfig = {sasl, {Mechanism, Username, Password}},
[SaslConfig|ClientConfig0];
false ->
ClientConfig0
end,
case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of
{ok, ClientPid} -> {ok, ClientPid} ->
ok = brod:start_producer(ClientId, Topic, _ProducerConfig = []), ok = brod:start_producer(ClientId, Topic, _ProducerConfig = []),

View File

@ -52,6 +52,7 @@ kafka_test() ->
title = <<"kafka测试"/utf8>>, title = <<"kafka测试"/utf8>>,
%% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} %% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
config = #kafka_endpoint{ config = #kafka_endpoint{
enable_sasl = true,
username = <<"admin">>, username = <<"admin">>,
password = <<"lz4rP5UavRTiGZEZK8G51mxHcM5iPC">>, password = <<"lz4rP5UavRTiGZEZK8G51mxHcM5iPC">>,
mechanism = scram_sha_256, mechanism = scram_sha_256,