diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index ecff2bd..dba6e6c 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -25,6 +25,9 @@ %% 最大数据缓冲量 -define(MAX_QUEUE_SIZE, 5_000_000). +%% 期望的数据总量 +-define(DESIRED_VALUE, 400_0000). + %% 处理日志信息 -define(log(Msg), north_data:info(Msg)). @@ -37,7 +40,10 @@ %% 定时器 timer_ref :: undefined | reference(), %% 是否繁忙 - is_busy = false :: boolean() + is_busy = false :: boolean(), + + %% 单位时间内的累积值 + acc_num = 0 }). %%%=================================================================== @@ -81,6 +87,9 @@ init([]) -> %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 erlang:start_timer(0, self(), create_postman), + %% 检测每个小时内的数据的异常变化 + erlang:start_timer(3600, self(), check_desired_ticker), + {ok, disconnected, #state{mqtt_opts = Opts, iot_queue = iot_queue:new(?MAX_QUEUE_SIZE), postman_pid = undefined}}. %% @private @@ -132,7 +141,7 @@ handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPi end; %% 收到确认消息 -handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = TimerRef}) -> +handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum}) -> %% 记录日志信息 ?log(iolist_to_binary(AssocMessage)), @@ -145,7 +154,7 @@ handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = Ti Key = get_counter_key(iot_util:date()), mnesia_counter:inc(Key), - {keep_state, State#state{timer_ref = undefined, is_busy = false}, Actions}; + {keep_state, State#state{timer_ref = undefined, is_busy = false, acc_num = AccNum + 1}, Actions}; %% 收到重发过期请求 handle_event(info, {timeout, _, {repost_ticker, Body}}, connected, State = #state{postman_pid = PostmanPid}) -> @@ -171,6 +180,20 @@ handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mq {keep_state, State#state{postman_pid = undefined}} end; +%% 检测每个小时内的数据增量 +handle_event(info, {timeout, _, check_desired_ticker}, _, State = #state{acc_num = AccNum}) -> + HourDesiredNum = ?DESIRED_VALUE div 24, + %% 允许25%的的波动 + case HourDesiredNum >= AccNum orelse HourDesiredNum - AccNum < erlang:ceil(HourDesiredNum * 0.25) of + true -> + ok; + false -> + %% 报警,数据下降异常 + iot_watchdog:warn(iolist_to_binary([<<"中电数据异常:"/utf8>>, integer_to_binary(AccNum), <<"/h">>])) + end, + erlang:start_timer(3600, self(), check_desired_ticker), + {keep_state, State#state{acc_num = 0}}; + %% 获取当前统计信息 handle_event({call, From}, get_stat, StateName, State = #state{iot_queue = Q}) -> Key = get_counter_key(iot_util:date()),