fix kafka fetch

This commit is contained in:
anlicheng 2025-11-17 16:46:32 +08:00
parent a3425ffc12
commit 635932e44a
3 changed files with 64 additions and 3 deletions

View File

@ -116,6 +116,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS
{noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}}; {noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}};
{error, Reason} -> {error, Reason} ->
lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]), lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]),
brod:stop_client(ClientId),
retry_connect(), retry_connect(),
{noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}} {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}}
end; end;

View File

@ -0,0 +1,62 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 14. 11 2025 23:29
%%%-------------------------------------------------------------------
-module(endpoint_kafka_test).
-author("anlicheng").
-include("endpoint.hrl").
%% API
-export([start_test/0, test_consumer/0]).
start_test() ->
Name = endpoint:get_name(100),
{ok, Pid} = endpoint_kafka:start_link(Name, #endpoint{
id = 100,
%%
matcher = <<"/dhlr/device/*/*">>,
%%
title = <<"test_kafka_title">>,
%% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
config = #kafka_endpoint{
sasl_config = {plain, <<"test">>, <<"test123">>},
bootstrap_servers = [{"118.178.229.213", 9092}],
topic = <<"dhlr_data">>
}
}),
Json = jiffy:encode(#{
<<"name">> => <<"anlicheng">>,
<<"age">> => 30
}, [force_utf8]),
endpoint:forward(Pid, Json),
ok.
test_consumer() ->
KafkaBootstrapEndpoints = [{"118.178.229.213", 9092}],
Topic = <<"dhlr_data">>,
Partition = 0,
ClientConfig = [
{sasl, {plain, <<"test">>, <<"test123">>}},
{reconnect_cool_down_seconds, 5},
{socket_options, [{keepalive, true}]}
],
ok = brod:start_client(KafkaBootstrapEndpoints, client1, ClientConfig),
SubscriberCallbackFun = fun(Partition, Msg, ShellPid = CallbackState) ->
lager:debug("call here msg: ~p", [Msg]),
ShellPid ! Msg, {ok, ack, CallbackState}
end,
Receive = fun() -> receive Msg -> Msg after 1000 -> timeout end end,
Res = brod_topic_subscriber:start_link(client1, Topic, all,
_ConsumerConfig=[{begin_offset, 0}],
_CommittedOffsets=[], message, SubscriberCallbackFun,
_CallbackState=self()),
lager:debug("start subscriber res: ~p", [Res]).

View File

@ -29,9 +29,7 @@
%% 支持的协议 %% 支持的协议
{endpoints, [ {endpoints, [
{support_protocols, [ {support_protocols, [
http, http
mqtt,
kafka
]} ]}
]}, ]},