From 0dae29b5a92fd6f1a996a74d10cef922d3445b73 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 19 Aug 2025 00:13:29 +0800 Subject: [PATCH] fix endpoint_buffer --- apps/iot/src/endpoint/endpoint_buffer.erl | 12 +++++------- apps/iot/src/endpoint/endpoint_kafka.erl | 15 +++++++++++---- apps/iot/src/endpoint/endpoint_sup.erl | 4 ++-- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/apps/iot/src/endpoint/endpoint_buffer.erl b/apps/iot/src/endpoint/endpoint_buffer.erl index 0c32f41..4368a53 100644 --- a/apps/iot/src/endpoint/endpoint_buffer.erl +++ b/apps/iot/src/endpoint/endpoint_buffer.erl @@ -71,21 +71,19 @@ trigger_n(Buffer = #buffer{window_size = WindowSize}) -> %% 触发读取下一条数据 -spec trigger_next(Buffer :: #buffer{}) -> NBuffer :: #buffer{}. trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum}) -> - case ets:next(Tid, Cursor) of + case ets:first(Tid) of '$end_of_table' -> Buffer; - NKey -> - lager:debug("nkey: ~p", [NKey]), - [#north_data{id = Id, tuple = Tuple}] = ets:lookup(Tid, NKey), + Key -> + [#north_data{id = Id, tuple = Tuple}|_] = ets:take(Tid, Key), ReceiverPid = self(), ReceiverPid ! {next_data, Id, Tuple}, endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! {next_data, Id, Tuple} end), - Buffer#buffer{flight_num = FlightNum + 1} + Buffer#buffer{flight_num = FlightNum + 1, cursor = Cursor + 1} end. -spec ack(Id :: integer(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}. -ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) -> - true = ets:delete(Tid, Id), +ack(Id, Buffer = #buffer{timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) -> endpoint_timer:ack(TimerPid, Id), trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}). diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index 086b41d..9ed275b 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -97,7 +97,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS ClientConfig = [ {reconnect_cool_down_seconds, 5}, - %{sasl, {Mechanism, Username, Password}}, + {sasl, {Mechanism, Username, Password}}, {socket_options, [ {keepalive, true} ]} @@ -125,8 +125,7 @@ handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONN lager:debug("[endpoint_kafka] ack partion: ~p, offset: ~p", [Partition, BaseOffset]), ReceiverPid ! {ack, Id} end, - Res = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb), - lager:debug("[endpoint_kafka] produce_cb res: ~p", [Res]), + _ = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb), {noreply, State}; @@ -169,4 +168,12 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== retry_connect() -> - erlang:start_timer(?RETRY_INTERVAL, self(), connect). \ No newline at end of file + erlang:start_timer(?RETRY_INTERVAL, self(), connect). + +check_produce_result(ok) -> + true; +check_produce_result({ok, _}) -> + true; +check_produce_result({ok, _}) -> + false. + diff --git a/apps/iot/src/endpoint/endpoint_sup.erl b/apps/iot/src/endpoint/endpoint_sup.erl index fce7e19..9bb964f 100644 --- a/apps/iot/src/endpoint/endpoint_sup.erl +++ b/apps/iot/src/endpoint/endpoint_sup.erl @@ -12,7 +12,7 @@ -export([ensured_endpoint_started/1, delete_endpoint/1]). -export([init/1]). --export([start_kafka_test/0]). +-export([kafka_test/0]). -define(SERVER, ?MODULE). @@ -43,7 +43,7 @@ init([]) -> %% internal functions -start_kafka_test() -> +kafka_test() -> Endpoint = #endpoint{ id = 1, %% 全局唯一,在路由规则中通过名称来指定