fix kafka

This commit is contained in:
anlicheng 2025-11-11 22:48:37 +08:00
parent b7c745a41e
commit 3290bd1be0

View File

@ -110,9 +110,15 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS
case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of
{ok, ClientPid} -> {ok, ClientPid} ->
ok = brod:start_producer(ClientId, Topic, _ProducerConfig = []), case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of
ok ->
NBuffer = endpoint_buffer:trigger_next(Buffer), NBuffer = endpoint_buffer:trigger_next(Buffer),
{noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}}; {noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}};
{error, Reason} ->
lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]),
retry_connect(),
{noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}}
end;
{error, Reason} -> {error, Reason} ->
lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Reason]), lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Reason]),
retry_connect(), retry_connect(),