add kafka endpoint

This commit is contained in:
anlicheng 2025-08-18 17:24:50 +08:00
parent 3db0159573
commit 11fe527135
3 changed files with 29 additions and 11 deletions

View File

@ -26,7 +26,8 @@
-record(kafka_endpoint, {
username = <<>> :: binary(),
password = <<>> :: binary(),
bootstrap_servers = [] :: [binary()],
mechanism :: atom(),
bootstrap_servers = [] :: [{string(), integer()}],
topic = <<>> :: binary()
}).

View File

@ -60,10 +60,30 @@ parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) ->
url = Url,
pool_size = PoolSize
};
parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0, <<"bootstrap_servers">> := BootstrapServers0, <<"topic">> := Topic}) ->
BootstrapServers = lists:filtermap(fun(S) ->
case binary:split(S, <<":">>) of
[Host0, Port0] ->
{true, {binary_to_list(Host0), binary_to_integer(Port0)}};
_ ->
false
end
end, BootstrapServers0),
Mechanism = case Mechanism of
<<"sha_256">> ->
scram_sha_256;
<<"sha_512">> ->
scram_sha_512;
<<"plain">> ->
plain;
_ ->
plain
end,
#kafka_endpoint{
username = Username,
password = Password,
mechanism = Mechanism,
bootstrap_servers = BootstrapServers,
topic = Topic
};

View File

@ -92,19 +92,16 @@ handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId,
endpoint = #endpoint{title = Title, config = #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers0, topic = Topic}}}) ->
endpoint = #endpoint{title = Title, config = #kafka_endpoint{username = Username, password = Password, mechanism = Mechanism, bootstrap_servers = BootstrapServers, topic = Topic}}}) ->
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]),
BootstrapServers = lists:flatmap(fun(S) ->
case binary:split(S, <<":">>) of
[Host0, Port] ->
[{binary_to_list(Host0), binary_to_integer(Port)}];
_ ->
[]
end
end, BootstrapServers0),
ClientConfig = [
{reconnect_cool_down_seconds, 5},
{sasl, [
{mechanism, Mechanism},
{username, Username},
{password, Password}
]},
{socket_options, [{keepalive, true}]}
],
case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of