fix endpoint_buffer

This commit is contained in:
anlicheng 2025-08-19 00:13:29 +08:00
parent d54e3852bb
commit 0dae29b5a9
3 changed files with 18 additions and 13 deletions

View File

@ -71,21 +71,19 @@ trigger_n(Buffer = #buffer{window_size = WindowSize}) ->
%% %%
-spec trigger_next(Buffer :: #buffer{}) -> NBuffer :: #buffer{}. -spec trigger_next(Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum}) -> trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum}) ->
case ets:next(Tid, Cursor) of case ets:first(Tid) of
'$end_of_table' -> '$end_of_table' ->
Buffer; Buffer;
NKey -> Key ->
lager:debug("nkey: ~p", [NKey]), [#north_data{id = Id, tuple = Tuple}|_] = ets:take(Tid, Key),
[#north_data{id = Id, tuple = Tuple}] = ets:lookup(Tid, NKey),
ReceiverPid = self(), ReceiverPid = self(),
ReceiverPid ! {next_data, Id, Tuple}, ReceiverPid ! {next_data, Id, Tuple},
endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! {next_data, Id, Tuple} end), endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! {next_data, Id, Tuple} end),
Buffer#buffer{flight_num = FlightNum + 1} Buffer#buffer{flight_num = FlightNum + 1, cursor = Cursor + 1}
end. end.
-spec ack(Id :: integer(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}. -spec ack(Id :: integer(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) -> ack(Id, Buffer = #buffer{timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) ->
true = ets:delete(Tid, Id),
endpoint_timer:ack(TimerPid, Id), endpoint_timer:ack(TimerPid, Id),
trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}). trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}).

View File

@ -97,7 +97,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS
ClientConfig = [ ClientConfig = [
{reconnect_cool_down_seconds, 5}, {reconnect_cool_down_seconds, 5},
%{sasl, {Mechanism, Username, Password}}, {sasl, {Mechanism, Username, Password}},
{socket_options, [ {socket_options, [
{keepalive, true} {keepalive, true}
]} ]}
@ -125,8 +125,7 @@ handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONN
lager:debug("[endpoint_kafka] ack partion: ~p, offset: ~p", [Partition, BaseOffset]), lager:debug("[endpoint_kafka] ack partion: ~p, offset: ~p", [Partition, BaseOffset]),
ReceiverPid ! {ack, Id} ReceiverPid ! {ack, Id}
end, end,
Res = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb), _ = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb),
lager:debug("[endpoint_kafka] produce_cb res: ~p", [Res]),
{noreply, State}; {noreply, State};
@ -169,4 +168,12 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%%=================================================================== %%%===================================================================
retry_connect() -> retry_connect() ->
erlang:start_timer(?RETRY_INTERVAL, self(), connect). erlang:start_timer(?RETRY_INTERVAL, self(), connect).
check_produce_result(ok) ->
true;
check_produce_result({ok, _}) ->
true;
check_produce_result({ok, _}) ->
false.

View File

@ -12,7 +12,7 @@
-export([ensured_endpoint_started/1, delete_endpoint/1]). -export([ensured_endpoint_started/1, delete_endpoint/1]).
-export([init/1]). -export([init/1]).
-export([start_kafka_test/0]). -export([kafka_test/0]).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
@ -43,7 +43,7 @@ init([]) ->
%% internal functions %% internal functions
start_kafka_test() -> kafka_test() ->
Endpoint = #endpoint{ Endpoint = #endpoint{
id = 1, id = 1,
%% %%