From 635932e44a7964d84e987b5a8c6e95627f81b153 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 17 Nov 2025 16:46:32 +0800 Subject: [PATCH] fix kafka fetch --- apps/iot/src/endpoint/endpoint_kafka.erl | 1 + apps/iot/src/mocker/endpoint_kafka_test.erl | 62 +++++++++++++++++++++ config/sys-dev.config | 4 +- 3 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 apps/iot/src/mocker/endpoint_kafka_test.erl diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index 3bd2aaf..5f451fe 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -116,6 +116,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS {noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}}; {error, Reason} -> lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]), + brod:stop_client(ClientId), retry_connect(), {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}} end; diff --git a/apps/iot/src/mocker/endpoint_kafka_test.erl b/apps/iot/src/mocker/endpoint_kafka_test.erl new file mode 100644 index 0000000..b9d566b --- /dev/null +++ b/apps/iot/src/mocker/endpoint_kafka_test.erl @@ -0,0 +1,62 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 14. 11月 2025 23:29 +%%%------------------------------------------------------------------- +-module(endpoint_kafka_test). +-author("anlicheng"). +-include("endpoint.hrl"). + +%% API +-export([start_test/0, test_consumer/0]). + +start_test() -> + Name = endpoint:get_name(100), + {ok, Pid} = endpoint_kafka:start_link(Name, #endpoint{ + id = 100, + %% 全局唯一,在路由规则中通过名称来指定 + matcher = <<"/dhlr/device/*/*">>, + %% 标题描述 + title = <<"test_kafka_title">>, + %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} + config = #kafka_endpoint{ + sasl_config = {plain, <<"test">>, <<"test123">>}, + bootstrap_servers = [{"118.178.229.213", 9092}], + topic = <<"dhlr_data">> + } + }), + + Json = jiffy:encode(#{ + <<"name">> => <<"anlicheng">>, + <<"age">> => 30 + }, [force_utf8]), + + endpoint:forward(Pid, Json), + ok. + +test_consumer() -> + KafkaBootstrapEndpoints = [{"118.178.229.213", 9092}], + Topic = <<"dhlr_data">>, + Partition = 0, + + ClientConfig = [ + {sasl, {plain, <<"test">>, <<"test123">>}}, + {reconnect_cool_down_seconds, 5}, + {socket_options, [{keepalive, true}]} + ], + + ok = brod:start_client(KafkaBootstrapEndpoints, client1, ClientConfig), + SubscriberCallbackFun = fun(Partition, Msg, ShellPid = CallbackState) -> + lager:debug("call here msg: ~p", [Msg]), + ShellPid ! Msg, {ok, ack, CallbackState} + end, + Receive = fun() -> receive Msg -> Msg after 1000 -> timeout end end, + Res = brod_topic_subscriber:start_link(client1, Topic, all, + _ConsumerConfig=[{begin_offset, 0}], + _CommittedOffsets=[], message, SubscriberCallbackFun, + _CallbackState=self()), + + lager:debug("start subscriber res: ~p", [Res]). \ No newline at end of file diff --git a/config/sys-dev.config b/config/sys-dev.config index d70d7d8..2d3a5df 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -29,9 +29,7 @@ %% 支持的协议 {endpoints, [ {support_protocols, [ - http, - mqtt, - kafka + http ]} ]},