增加带容量限制的缓冲队列,避免内存耗尽

This commit is contained in:
anlicheng 2024-08-01 11:46:41 +08:00
parent 5d0c4a0900
commit e1e2a09b21
2 changed files with 36 additions and 19 deletions

View File

@ -22,13 +22,16 @@
%% %%
-define(RETRY_INTERVAL, 5000). -define(RETRY_INTERVAL, 5000).
%%
-define(MAX_QUEUE_SIZE, 5_000_000).
-record(state, { -record(state, {
mqtt_opts = [], mqtt_opts = [],
postman_pid :: undefined | pid(), postman_pid :: undefined | pid(),
logger_pid :: pid(), logger_pid :: pid(),
%% %%
queue = queue:new(), iot_queue,
%% %%
timer_ref :: undefined | reference(), timer_ref :: undefined | reference(),
@ -78,7 +81,7 @@ init([]) ->
%% %%
{ok, LoggerPid} = iot_logger:start_link("north_data"), {ok, LoggerPid} = iot_logger:start_link("north_data"),
{ok, disconnected, #state{mqtt_opts = Opts, queue = queue:new(), postman_pid = undefined, logger_pid = LoggerPid}}. {ok, disconnected, #state{mqtt_opts = Opts, iot_queue = iot_queue:new(?MAX_QUEUE_SIZE), postman_pid = undefined, logger_pid = LoggerPid}}.
%% @private %% @private
%% @doc This function is called by a gen_statem when it needs to find out %% @doc This function is called by a gen_statem when it needs to find out
@ -92,17 +95,23 @@ callback_mode() ->
%% functions is called when gen_statem receives and event from %% functions is called when gen_statem receives and event from
%% call/2, cast/2, or as a normal process message. %% call/2, cast/2, or as a normal process message.
handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, queue = Q}) -> handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, iot_queue = Q}) ->
case format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) of case iot_queue:is_full(Q) of
{ok, Body} -> true ->
%% lager:debug("[iot_zd_endpoint] queue is full discard data: ~p", [{LocationCode, DynamicLocationCode, Fields, Timestamp}]),
Actions = case StateName =:= connected andalso not IsBusy of {keep_state, State};
true -> [{next_event, info, fetch_next}]; false ->
false -> [] case format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) of
end, {ok, Body} ->
{keep_state, State#state{queue = queue:in(Body, Q)}, Actions}; %%
error -> Actions = case StateName =:= connected andalso not IsBusy of
{keep_state, State} true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State#state{iot_queue = iot_queue:in(Body, Q)}, Actions};
error ->
{keep_state, State}
end
end; end;
%% %%
@ -111,14 +120,14 @@ handle_event(info, fetch_next, disconnected, State) ->
{keep_state, State}; {keep_state, State};
handle_event(info, fetch_next, connected, State = #state{is_busy = true}) -> handle_event(info, fetch_next, connected, State = #state{is_busy = true}) ->
{keep_state, State}; {keep_state, State};
handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, queue = Q}) -> handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, iot_queue = Q}) ->
case queue:out(Q) of case iot_queue:out(Q) of
{{value, Body}, Q1} -> {{value, Body}, Q1} ->
lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [Body]), lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [Body]),
PostmanPid ! {post, self(), Body}, PostmanPid ! {post, self(), Body},
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}), TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}),
{keep_state, State#state{queue = Q1, timer_ref = TimerRef, is_busy = true}}; {keep_state, State#state{iot_queue = Q1, timer_ref = TimerRef, is_busy = true}};
{empty, _} -> {empty, _} ->
{keep_state, State} {keep_state, State}
end; end;
@ -161,10 +170,10 @@ handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mq
end; end;
%% %%
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, queue = Q}) -> handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, iot_queue = Q}) ->
Stat = #{ Stat = #{
<<"acc_num">> => AccNum, <<"acc_num">> => AccNum,
<<"queue_num">> => queue:len(Q), <<"queue_num">> => iot_queue:len(Q),
<<"state_name">> => atom_to_binary(StateName) <<"state_name">> => atom_to_binary(StateName)
}, },
{keep_state, State, [{reply, From, Stat}]}; {keep_state, State, [{reply, From, Stat}]};

View File

@ -10,7 +10,7 @@
-author("anlicheng"). -author("anlicheng").
%% API %% API
-export([new/1, in/2, out/1]). -export([new/1, in/2, out/1, is_full/1, len/1]).
-record(iot_queue, { -record(iot_queue, {
capacity = 1024, capacity = 1024,
@ -26,6 +26,14 @@ new(Capacity) when is_integer(Capacity) ->
queue = queue:new() queue = queue:new()
}. }.
-spec is_full(Iot_queue :: #iot_queue{}) -> boolean().
is_full(#iot_queue{capacity = Capacity, len = Len}) ->
Len >= Capacity.
-spec len(Iot_queue :: #iot_queue{}) -> integer().
len(#iot_queue{len = Len}) ->
Len.
-spec in(Item :: any(), TQ :: #iot_queue{}) -> NQ :: #iot_queue{}. -spec in(Item :: any(), TQ :: #iot_queue{}) -> NQ :: #iot_queue{}.
in(_Item, TQ = #iot_queue{capacity = Capacity, len = Len}) when Len >= Capacity -> in(_Item, TQ = #iot_queue{capacity = Capacity, len = Len}) when Len >= Capacity ->
TQ; TQ;