From dcd23dd6dafa699ab9638443a271948b11cc6778 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 11 Sep 2024 14:07:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=BB=E6=9C=BA=E7=9A=84?= =?UTF-8?q?=E7=A6=BB=E7=BA=BF=E5=92=8C=E5=9C=A8=E7=BA=BF=E5=91=8A=E8=AD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/src/iot_host.erl | 22 ++++++++++++--------- apps/iot/src/iot_watchdog.erl | 36 +++++++++++++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index e4cf309..63e375d 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -32,7 +32,7 @@ -record(state, { host_id :: integer(), - name :: binary(), + name = <<"">> :: binary(), %% 从数据库里面读取到的数据 uuid :: binary(), %% aes的key, 后续通讯需要基于这个加密 @@ -341,7 +341,11 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = # Reply = #{<<"a">> => true, <<"aes">> => Aes}, EncReply = iot_cipher_rsa:encode(Reply, PubKey), {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), - warn_status(Name, <<"在线"/utf8>>), + + %% 上线提示 + Warn = format_warn(Name, <<"在线"/utf8>>), + catch iot_watchdog:delay_warn(Warn), + report_event(UUID, ?HOST_ONLINE), lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]), @@ -526,8 +530,11 @@ handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{uuid = UUID lager:debug("[iot_host] host: ~p, host_maybe_offline, host now is offline, do nothing", [UUID]); ?HOST_ONLINE -> {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), - warn_status(Name, <<"离线"/utf8>>), - report_event(UUID, ?HOST_OFFLINE) + %% 离线告警 + Warn = format_warn(Name, <<"离线"/utf8>>), + iot_watchdog:warn(Warn), + + catch report_event(UUID, ?HOST_OFFLINE) end, %% 关闭channel,主机需要重新连接,才能保存状态的一致 @@ -635,11 +642,8 @@ report_event(UUID, NewStatus) 
when is_binary(UUID), is_integer(NewStatus) ->
     lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]).
 
 %% 报警主机的状态
-warn_status(Name, Status) when is_binary(Name), is_binary(Status) ->
-    Warn = iolist_to_binary([<<"主机: "/utf8>>, Name, <<" || ">>, Status]),
-    iot_watchdog:warn(Warn);
-warn_status(_, _) ->
-    ok.
+format_warn(Name, Status) when is_binary(Name), is_binary(Status) ->
+    iolist_to_binary([<<"主机: "/utf8>>, Name, <<" || ">>, Status]).
 
 %% 将当前的state转换成map
 state_map(#state{host_id = HostId, uuid = UUID, aes = Aes, has_session = HasSession, heartbeat_counter = HeartbeatCounter, channel_pid = ChannelPid, metrics = Metrics}) ->
diff --git a/apps/iot/src/iot_watchdog.erl b/apps/iot/src/iot_watchdog.erl
index 83310cd..3f6b500 100644
--- a/apps/iot/src/iot_watchdog.erl
+++ b/apps/iot/src/iot_watchdog.erl
@@ -13,7 +13,7 @@
 
 %% API
 -export([start_link/0]).
--export([detection/3, warn/1]).
+-export([detection/3, warn/1, delay_warn/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -23,6 +23,9 @@
 %% 系统id
 -define(SYS_ID, <<"ZNWLZJJKXT1">>).
 
+%% Warn ticker period in milliseconds: buffered warn messages are sent at most once every 10 minutes
+-define(WARN_TICKER, 600_000).
+
 %% 限制器
 -record(limiter, {
     cpu_temperature = 0 :: integer(),
@@ -38,7 +41,8 @@
     users :: binary(), %% 格式: "S123, S1234"
     pri_key :: public_key:private_key(),
     %% 消息发送的频率限制,格式: #{uuid => #limiter{}}
-    limiters = #{}
+    limiters = #{},
+    warn_buf = []
 }).
 
 %%%===================================================================
@@ -55,6 +59,10 @@ detection(HostUUID, Name, _) when is_binary(HostUUID), is_binary(Name) ->
 
 warn(Warn) when is_binary(Warn) ->
     gen_server:cast(?SERVER, {warn, Warn}).
 
+-spec delay_warn(Warn :: binary()) -> ok.
+delay_warn(Warn) when is_binary(Warn) ->
+    gen_server:cast(?SERVER, {delay_warn, Warn}).
+
 %% @doc Spawns the server and registers the local name (unique)
 -spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
@@ -84,6 +92,8 @@ init([]) ->
     {ok, LoggerPid} = iot_logger:start_link("watchdog_data"),
     PriKey = generate_private_key(PriFile),
 
+    erlang:start_timer(?WARN_TICKER, self(), warn_ticker),
+
     {ok, #state{logger_pid = LoggerPid, pri_key = PriKey, report_interval = ReportInterval, url = Url, users = Users, limiters = #{}, guard_items = GuardItems}}.
 
 %% @private
@@ -134,6 +144,10 @@ handle_cast({detection, HostUUID, Name, Metric},
 
     {noreply, State#state{limiters = maps:put(HostUUID, NLimiter, Limiters)}};
 
+%% Deferrable warning: only buffered here (newest first); it is sent later by the warn_ticker handler
+handle_cast({delay_warn, Warn}, State = #state{warn_buf = WarnBuf}) ->
+    {noreply, State#state{warn_buf = [Warn | WarnBuf]}};
+
 handle_cast({warn, Warn}, State = #state{url = Url, users = Users, pri_key = PriKey, logger_pid = LoggerPid}) ->
     Body = format_warn(Warn, Users, PriKey),
     case catch do_post(Url, Body) of
@@ -150,6 +164,32 @@ handle_cast({warn, Warn}, State = #state{url = Url, users = Users, pri_key = Pri
     {noreply, NewState :: #state{}} |
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
+%% Periodic flush of the delayed-warning buffer, driven by the warn_ticker timer.
+%% Re-arms the timer, then posts only the most recently buffered warning,
+%% suffixed with the number of warnings buffered since the previous tick.
+%% The buffer is cleared whether or not the post succeeds.
+handle_info({timeout, _, warn_ticker}, State = #state{warn_buf = WarnBuf, url = Url, users = Users, pri_key = PriKey, logger_pid = LoggerPid}) ->
+    erlang:start_timer(?WARN_TICKER, self(), warn_ticker),
+    case WarnBuf of
+        [Latest | _] ->
+            %% The count must be rendered as text: a bare integer inside an iolist is a raw byte (badarg above 255)
+            Count = integer_to_binary(length(WarnBuf)),
+            Warn = iolist_to_binary([Latest, <<"(累计: "/utf8>>, Count, <<")">>]),
+            Body = format_warn(Warn, Users, PriKey),
+            case catch do_post(Url, Body) of
+                {ok, Resp} ->
+                    iot_logger:write(LoggerPid, [Body, Resp]);
+                {error, Reason} ->
+                    lager:warning("[iot_watchdog] url: ~p, send body: ~ts, get error: ~p", [Url, Body, Reason]);
+                Other ->
+                    %% {'EXIT', _} or a thrown term captured by `catch`: log it instead of crashing with case_clause
+                    lager:warning("[iot_watchdog] url: ~p, send body: ~ts, get error: ~p", [Url, Body, Other])
+            end;
+        [] ->
+            ok
+    end,
+    {noreply, State#state{warn_buf = []}};
+
 handle_info(_Info, State = #state{}) ->
     {noreply, State}.
 