From 3290bd1be0bb7f6fed64c9bd4132e0527948dedb Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 11 Nov 2025 22:48:37 +0800 Subject: [PATCH] fix kafka --- apps/iot/src/endpoint/endpoint_kafka.erl | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index d30900d..52a577c 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -110,9 +110,15 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of {ok, ClientPid} -> - ok = brod:start_producer(ClientId, Topic, _ProducerConfig = []), - NBuffer = endpoint_buffer:trigger_next(Buffer), - {noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}}; + case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of + ok -> + NBuffer = endpoint_buffer:trigger_next(Buffer), + {noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}}; + {error, Reason} -> + lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]), + retry_connect(), + {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}} + end; {error, Reason} -> lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Reason]), retry_connect(),