diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index d30900d..52a577c 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -110,9 +110,15 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of {ok, ClientPid} -> - ok = brod:start_producer(ClientId, Topic, _ProducerConfig = []), - NBuffer = endpoint_buffer:trigger_next(Buffer), - {noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}}; + case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of + ok -> + NBuffer = endpoint_buffer:trigger_next(Buffer), + {noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}}; + {error, Reason} -> + lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]), + retry_connect(), + {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}} + end; {error, Reason} -> lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Reason]), retry_connect(),