diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index 254cb5f..743b7bc 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -22,13 +22,16 @@ %% 消息重发间隔 -define(RETRY_INTERVAL, 5000). +%% 最大数据缓冲量 +-define(MAX_QUEUE_SIZE, 5_000_000). + -record(state, { mqtt_opts = [], postman_pid :: undefined | pid(), logger_pid :: pid(), %% 数据缓存队列 - queue = queue:new(), + iot_queue, %% 定时器 timer_ref :: undefined | reference(), @@ -78,7 +81,7 @@ init([]) -> %% 启动日志记录器 {ok, LoggerPid} = iot_logger:start_link("north_data"), - {ok, disconnected, #state{mqtt_opts = Opts, queue = queue:new(), postman_pid = undefined, logger_pid = LoggerPid}}. + {ok, disconnected, #state{mqtt_opts = Opts, iot_queue = iot_queue:new(?MAX_QUEUE_SIZE), postman_pid = undefined, logger_pid = LoggerPid}}. %% @private %% @doc This function is called by a gen_statem when it needs to find out @@ -92,17 +95,23 @@ callback_mode() -> %% functions is called when gen_statem receives and event from %% call/2, cast/2, or as a normal process message. -handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, queue = Q}) -> - case format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) of - {ok, Body} -> - %% 避免不必要的内部消息 - Actions = case StateName =:= connected andalso not IsBusy of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State#state{queue = queue:in(Body, Q)}, Actions}; - error -> - {keep_state, State} +handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, iot_queue = Q}) -> + case iot_queue:is_full(Q) of + true -> + lager:debug("[iot_zd_endpoint] queue is full discard data: ~p", [{LocationCode, DynamicLocationCode, Fields, Timestamp}]), + {keep_state, State}; + false -> + case format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) of + {ok, Body} -> + %% 避免不必要的内部消息 + Actions = case StateName =:= connected andalso not IsBusy of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State#state{iot_queue = iot_queue:in(Body, Q)}, Actions}; + error -> + {keep_state, State} + end end; %% 触发读取下一条数据 @@ -111,14 +120,14 @@ handle_event(info, fetch_next, disconnected, State) -> {keep_state, State}; handle_event(info, fetch_next, connected, State = #state{is_busy = true}) -> {keep_state, State}; -handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, queue = Q}) -> - case queue:out(Q) of +handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, iot_queue = Q}) -> + case iot_queue:out(Q) of {{value, Body}, Q1} -> lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [Body]), PostmanPid ! {post, self(), Body}, TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}), - {keep_state, State#state{queue = Q1, timer_ref = TimerRef, is_busy = true}}; + {keep_state, State#state{iot_queue = Q1, timer_ref = TimerRef, is_busy = true}}; {empty, _} -> {keep_state, State} end; @@ -161,10 +170,10 @@ handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mq end; %% 获取当前统计信息 -handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, queue = Q}) -> +handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, iot_queue = Q}) -> Stat = #{ <<"acc_num">> => AccNum, - <<"queue_num">> => queue:len(Q), + <<"queue_num">> => iot_queue:len(Q), <<"state_name">> => atom_to_binary(StateName) }, {keep_state, State, [{reply, From, Stat}]}; diff --git a/apps/iot/src/struct/iot_queue.erl b/apps/iot/src/struct/iot_queue.erl index 517b022..968d42d 100644 --- a/apps/iot/src/struct/iot_queue.erl +++ b/apps/iot/src/struct/iot_queue.erl @@ -10,7 +10,7 @@ -author("anlicheng"). %% API --export([new/1, in/2, out/1]). +-export([new/1, in/2, out/1, is_full/1, len/1]). -record(iot_queue, { capacity = 1024, @@ -26,6 +26,14 @@ new(Capacity) when is_integer(Capacity) -> queue = queue:new() }. +-spec is_full(Iot_queue :: #iot_queue{}) -> boolean(). +is_full(#iot_queue{capacity = Capacity, len = Len}) -> + Len >= Capacity. + +-spec len(Iot_queue :: #iot_queue{}) -> integer(). +len(#iot_queue{len = Len}) -> + Len. + -spec in(Item :: any(), TQ :: #iot_queue{}) -> NQ :: #iot_queue{}. in(_Item, TQ = #iot_queue{capacity = Capacity, len = Len}) when Len >= Capacity -> TQ;