add kafka client

This commit is contained in:
anlicheng 2025-06-04 19:37:08 +08:00
parent 8e491c0a54
commit 65c7a1b2e6
2 changed files with 28 additions and 35 deletions

View File

@ -28,6 +28,8 @@
-record(kafka_endpoint, {
username = <<>> :: binary(),
password = <<>> :: binary(),
%%
mechanism = plain :: plain | scram_sha_256 | scram_sha_512,
bootstrap_servers = [] :: [binary()],
topic = <<>> :: binary()
}).

View File

@ -95,7 +95,7 @@ handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer =
{stop, Reason :: term(), NewState :: #state{}}).
%% bootstrap_servers的格式为: [<<"localhost:9092">>, <<"localhost:9093">>]
handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId,
endpoint = #endpoint{title = Title, config = #kafka_endpoint{bootstrap_servers = BootstrapServers0, topic = Topic, username = Username, password = Password}}}) ->
endpoint = #endpoint{title = Title, config = #kafka_endpoint{bootstrap_servers = BootstrapServers0, topic = Topic, mechanism = Mechanism, username = Username, password = Password}}}) ->
BootstrapServers = lists:flatmap(fun(S) ->
case binary:split(S, <<":">>, [global, trim]) of
[Host, Port] ->
@ -104,27 +104,29 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status
[]
end
end, BootstrapServers0),
lager:debug("[kafka_client] endpoint: ~p, create postman, bootstap_servers: ~p", [Title, BootstrapServers]),
ClientConfig = [{reconnect_cool_down_seconds, 10}],
case brod:start_client(BootstrapServers, ClientId, ClientConfig) of
KafkaConfig = [
{sasl, {
Mechanism,
Username,
Password
}},
{ssl, false},
{auto_start_producers, true},
{reconnect_cool_down_seconds, 10}
],
lager:debug("[kafka_client] endpoint: ~p, create postman, bootstap_servers: ~p, kafka config: ~p", [Title, BootstrapServers, KafkaConfig]),
case brod:start_client(BootstrapServers, ClientId, KafkaConfig) of
ok ->
case brod:start_producer(ClientId, Topic, []) of
ok ->
ClientPid = whereis(ClientId),
erlang:monitor(process, ClientPid),
ClientPid = whereis(ClientId),
erlang:monitor(process, ClientPid),
lager:debug("[kafka_client] start producer success, topic: ~p", [Topic]),
NBuffer = endpoint_buffer:trigger_n(Buffer),
{noreply, State#state{client_pid = ClientPid, status = ?CONNECTED, buffer = NBuffer}};
{error, Reason} ->
lager:debug("[kafka_client] start producer, get error: ~p", [Reason]),
erlang:start_timer(5000, self(), create_postman),
{noreply, State}
end;
lager:debug("[kafka_client] start producer success, topic: ~p", [Topic]),
NBuffer = endpoint_buffer:trigger_n(Buffer),
{noreply, State#state{client_pid = ClientPid, status = ?CONNECTED, buffer = NBuffer}};
{error, Reason} ->
lager:debug("[kafka_client] start client, get error: ~p", [Reason]),
erlang:start_timer(5000, self(), create_postman),
reconnect_ticker(),
{noreply, State}
end;
@ -135,14 +137,18 @@ handle_info({next_data, _Id, _Metadata, _Metric}, State = #state{status = ?DISCO
handle_info({next_data, Id, Metadata, Metric}, State = #state{client_id = ClientId, status = ?CONNECTED, buffer = Buffer,
endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) ->
XMetadata = base64:encode(iolist_to_binary(jiffy:encode(Metadata, [force_utf8]))),
Len = byte_size(XMetadata),
Payload = <<Len:16, XMetadata/binary, Metric/binary>>,
%% 使service_id左右hash的key
Args = case maps:get(<<"service_id">>, Metadata, undefined) of
undefined ->
[ClientId, Topic, random, <<"">>, Metric];
[ClientId, Topic, random, <<"">>, Payload];
Key ->
[ClientId, Topic, undefined, Key, Metric]
[ClientId, Topic, undefined, Key, Payload]
end,
lager:debug("[kafka_client] will publish topic: ~p, message: ~p", [Topic, Metric]),
lager:debug("[kafka_client] will publish topic: ~p, message: ~p", [Topic, Payload]),
case erlang:apply(brod, produce_sync, Args)of
ok ->
@ -188,21 +194,6 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions
%%%===================================================================
-spec format_props(Props :: [{Key :: any(), Val :: any()}]) -> [{Key1 :: binary(), Val1 :: binary()}].
format_props(Props) when is_list(Props) ->
format_props(Props, []).
format_props([], Acc) ->
Acc;
format_props([{Key, Val}|T], Acc) when is_binary(Key), is_integer(Val) ->
format_props(T, [{Key, integer_to_binary(Val)}|Acc]);
format_props([{Key, Val}|T], Acc) when is_binary(Key), is_float(Val) ->
NVal = list_to_binary(float_to_list(Val, [{decimals, 2}, compact])),
format_props(T, [{Key, NVal}|Acc]);
format_props([{Key, Val}|T], Acc) when is_binary(Key), is_binary(Val) ->
format_props(T, [{Key, Val}|Acc]);
format_props([_|T], Acc) ->
format_props(T, Acc).
-spec reconnect_ticker() -> no_return().
reconnect_ticker() ->
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman).