增加报警逻辑

This commit is contained in:
anlicheng 2024-09-02 16:47:58 +08:00
parent bdaaf45604
commit 02c48495bf
10 changed files with 381 additions and 44 deletions

View File

@ -0,0 +1 @@
MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAMthPZx2cBn1bhLr0ezb+sju5+eAaaUW2bnVd6A16kZmbK2HqNoTDVN/0Np2CrzLwDzz+es0NlsVSLlU6VgEXLLheunGfVHQSfOtEjuhMYGue8zYLkGPIulVyz3G/HKKJlvEHWg2AOmxGiTFWkWLhEEAcKuWGRXX531Y1lugDhVXAgMBAAECgYAVGJsZdbefGbQ3RjraRN84pH9tpgZPV5VyD1B/T9hjQRCY12+OX1eMh7+USZZHiyL/r5cG7L2OqSCICOTDeoBa5mYuTwjU1GrOWvcM5KFtffvgJh6CLBt8doy6kvGpI1TtDAYLYnC/kXEj1bTctYJSjik4nNxq2hlKOSMjYQCP4QJBAOypsK7hn70nqlMKWsLrqKiQk/SEnAqZVuyF8ijltmyaiXbbYqgb3uqLsPwLglMixbWudpI9WlEZJ31ZSm1EQnkCQQDb/11/4NdPLLn1R602m1O8SIi5iT/klBg/MwbKcPXrwnyhpXYiQ0MuKjvLY+kqaI8U/cDMiZLC0+WLc47SFaJPAkAFg6KpQHqPVrhCPRCGUCVa4BuCbACSwqXi1vAggdGxUBx513zB+/xCrMSP6ti7hSjkJc9/csyC7TodUpJIX3nJAkEApfSndbYI9eMJjKw7UDwwGJKnAT82AZpuCA7YCpLYELcwCUimKwaOgR6+6Jk/5QvnE7CwZhtJqMrsGBxeGsDTJQJBAOO4xzWNMarCxI8e6/PF04FRU1spvPQqCuHUH2W0obA1p00eSJg/srOSNrsZpvYAVsTtvHc+KMq2lTGixHQlZ3k=

View File

@ -195,7 +195,7 @@ format_events(LocationCode, DynamicLocationCode, EventType, #{<<"event_code">> :
},
ReqData = #{
<<"sign">> => sign(DeviceInfo, PriKey),
<<"sign">> => iot_jinzhi_signer:sign(DeviceInfo, PriKey),
<<"sysId">> => ?SYS_ID,
<<"taskId">> => generate_task_id(DynamicLocationCode, EventCode),
<<"count">> => 1,
@ -223,41 +223,3 @@ generate_private_key(PriFile) when is_list(PriFile) ->
{ok, PriKeyData} = file:read_file(PriKeyFile),
PriDerData = base64:decode(PriKeyData),
public_key:der_decode('PrivateKeyInfo', PriDerData).
%%
-spec sign(M :: #{}, PrivateKey :: public_key:private_key()) -> binary().
sign(M, PrivateKey) when is_map(M) ->
Json = serialize(M),
Hash = iolist_to_binary(io_lib:format("~64.16.0b", [binary:decode_unsigned(crypto:hash(sha256, Json))])),
RsaEncoded = public_key:encrypt_private(Hash, PrivateKey),
base64:encode(RsaEncoded).
%% sign签名
-spec serialize(M :: map()) -> JsonString :: binary().
serialize(M) when is_map(M) ->
L = maps:to_list(M),
L1 = lists:sort(fun({K, _}, {K1, _}) -> K < K1 end, L),
serialize(L1, []).
serialize([], Target) ->
B = iolist_to_binary(lists:join(<<$,>>, lists:reverse(Target))),
<<${, B/binary, $}>>;
serialize([{K, V}|T], Target) ->
V1 = if
is_integer(V) ->
integer_to_binary(V);
is_float(V) ->
float_to_binary(V);
is_binary(V) ->
<<$", V/binary, $">>;
is_boolean(V) andalso V ->
<<"true">>;
is_boolean(V) andalso not V ->
<<"false">>;
is_list(V) ->
Items = lists:map(fun(E) -> serialize(E) end, V),
V0 = iolist_to_binary(lists:join(<<$,>>, Items)),
<<$[, V0/binary, $]>>
end,
Item = <<$", K/binary, $", $:, V1/binary>>,
serialize(T, [Item|Target]).

View File

@ -29,7 +29,13 @@ start(_StartType, _StartArgs) ->
%% buckets
SuperDeviceUUIDs = iot_env:get_value(super_device_uuids, []),
influx_client:create_all_buckets(SuperDeviceUUIDs),
case is_list(SuperDeviceUUIDs) andalso length(SuperDeviceUUIDs) > 0 of
true ->
Result = catch influx_client:create_all_buckets(SuperDeviceUUIDs),
lager:debug("[iot_app] create_all_buckets: ~p, result: ~p", [SuperDeviceUUIDs, Result]);
false ->
ok
end,
{ok, SupPid}.

View File

@ -32,6 +32,7 @@
-record(state, {
host_id :: integer(),
name :: binary(),
%%
uuid :: binary(),
%% aes的key,
@ -188,7 +189,7 @@ start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
%% process to initialize.
init([UUID]) ->
case host_bo:get_host_by_uuid(UUID) of
{ok, #{<<"id">> := HostId, <<"authorize_status">> := AuthorizeStatus}} ->
{ok, #{<<"id">> := HostId, <<"authorize_status">> := AuthorizeStatus, <<"name">> := Name}} ->
%% host_id注册别名, HostPid
AliasName = get_alias_name(HostId),
global:register_name(AliasName, self()),
@ -201,7 +202,7 @@ init([UUID]) ->
true -> ?STATE_ACTIVATED;
false -> ?STATE_DENIED
end,
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, has_session = false}};
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, has_session = false}};
undefined ->
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
ignore
@ -393,11 +394,13 @@ handle_event(cast, {handle, {data, Data}}, ?STATE_ACTIVATED, State = #state{aes
{keep_state, State};
%% ping的数据是通过aes加密后的
handle_event(cast, {handle, {ping, CipherMetric}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) ->
handle_event(cast, {handle, {ping, CipherMetric}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, name = Name, aes = AES, has_session = true}) ->
MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric),
case catch jiffy:decode(MetricsInfo, [return_maps]) of
Metrics when is_map(Metrics) ->
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
%%
iot_watchdog:detection(UUID, Name, Metrics),
{keep_state, State#state{metrics = Metrics}};
Other ->
lager:warning("[iot_host] host_id: ~p, ping is invalid json: ~p", [UUID, Other]),

View File

@ -28,6 +28,15 @@ start_link() ->
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
Specs = [
#{
id => iot_watchdog,
start => {'iot_watchdog', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['iot_watchdog']
},
#{
id => 'iot_task',
start => {'iot_task', start_link, []},

View File

@ -0,0 +1,271 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 30. 8 2024 17:45
%%%-------------------------------------------------------------------
-module(iot_watchdog).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([detection/3]).
-export([test/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%% id
-define(SYS_ID, <<"ZNWLZJJKXT1">>).
%%
-record(limiter, {
cpu_temperature = 0 :: integer(),
disk = 0 :: integer(),
memory = 0 :: integer()
}).
-record(state, {
guard_items = [],
logger_pid :: pid(),
report_interval = 60,
url :: string(),
users :: binary(), %% : "S123, S1234"
pri_key :: public_key:private_key(),
%% : #{uuid => #limiter{}}
limiters = #{}
}).
test() ->
Metric = #{
<<"cpu_temperature">> => 90.5,
<<"disk">> => #{
<<"total">> => 7129,
<<"used">> => 345
},
<<"memory">> => #{
<<"total">> => 990,
<<"used">> => 900
}
},
detection(<<"1234">>, <<"test1234">>, Metric).
%%%===================================================================
%%% API
%%%===================================================================
-spec detection(HostUUID :: binary(), Name :: binary(), Metric :: map()) -> no_return().
detection(HostUUID, Name, Metric) when is_binary(HostUUID), is_binary(Name), is_map(Metric) ->
gen_server:cast(?SERVER, {detection, HostUUID, Name, Metric});
detection(HostUUID, Name, _) when is_binary(HostUUID), is_binary(Name) ->
ok.
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, Opts} = application:get_env(iot, watchdog),
PriFile = proplists:get_value(pri_key, Opts),
Url = proplists:get_value(url, Opts),
ReportInterval = proplists:get_value(report_interval, Opts, 60),
GuardItems = proplists:get_value(guard_items, Opts),
Users0 = proplists:get_value(users, Opts),
Users = iolist_to_binary(lists:join(",", Users0)),
{ok, LoggerPid} = iot_logger:start_link("watchdog_data"),
PriKey = generate_private_key(PriFile),
{ok, #state{logger_pid = LoggerPid, pri_key = PriKey, report_interval = ReportInterval, url = Url, users = Users, limiters = #{}, guard_items = GuardItems}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({detection, HostUUID, Name, Metric},
State = #state{url = Url, users = Users, pri_key = PriKey, limiters = Limiters, guard_items = Config, logger_pid = LoggerPid, report_interval = ReportInterval}) ->
Limiter = maps:get(HostUUID, Limiters, #limiter{cpu_temperature = 0, disk = 0, memory = 0}),
{Warnings, NLimiter} = lists:foldl(fun(F, {W0, L0}) ->
case F(Metric, Config, ReportInterval, L0) of
ok ->
{W0, L0};
{error, Msg0, L1} ->
{[Msg0|W0], L1}
end
end, {[], Limiter}, [fun check_cpu_temperature/4, fun check_disk/4, fun check_memory/4]),
case length(Warnings) > 0 of
true ->
Subject = iolist_to_binary([<<"主机: "/utf8>>, Name, <<"(">>, HostUUID, <<") ">>, lists:join(<<"|">>, lists:reverse(Warnings))]),
Body = format_warn(Subject, Users, PriKey),
case catch do_post(Url, Body) of
{ok, Resp} ->
lager:debug("[iot_watchdog] url: ~p, body: ~ts, resp: ~ts", [Url, Body, Resp]),
iot_logger:write(LoggerPid, [Body, Resp]);
{error, Reason} ->
lager:warning("[iot_watchdog] url: ~p, send body: ~ts, get error: ~p", [Url, Body, Reason])
end;
false ->
ok
end,
{noreply, State#state{limiters = maps:put(HostUUID, NLimiter, Limiters)}}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec format_warn(Subject :: binary(), Users :: binary(), PriKey :: public_key:private_key()) -> binary().
format_warn(Subject, Users, PriKey) when is_binary(Subject), is_binary(Users) ->
PostParams = #{
<<"target">> => <<"3">>,
<<"bizId">> => <<"">>,
<<"userId">> => Users,
<<"subject">> => Subject,
<<"mobileUrl">> => <<"">>,
<<"url">> => <<"">>,
<<"applier">> => <<"李睿"/utf8>>,
<<"creatorId">> => <<"S20232026">>,
<<"submitTime">> => format_date(),
<<"sysName">> => <<"后勤监管"/utf8>>,
<<"nodeName">> => <<"主机监控"/utf8>>,
<<"sysId">> => ?SYS_ID
},
Sign = iot_jinzhi_signer:sign(PostParams, PriKey),
jiffy:encode(PostParams#{<<"sign">> => Sign}, [force_utf8]).
-spec do_post(Url :: string(), Body :: binary()) -> {ok, Resp :: any()} | {error, Reason :: binary()}.
do_post(Url, Body) when is_list(Url), is_binary(Body) ->
Headers = [
{<<"content-type">>, <<"application/json">>}
],
case hackney:request(post, Url, Headers, Body, [{pool, false}, {connect_timeout, 5000}]) of
{ok, 200, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
hackney:close(ClientRef),
{ok, RespBody};
{ok, HttpCode, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
hackney:close(ClientRef),
{error, {HttpCode, RespBody}};
{error, Reason} ->
{error, Reason}
end.
-spec check_cpu_temperature(map(), Config :: list(), ReportInterval :: integer(), Limiter :: #limiter{}) -> ok | {error, Warn :: binary()}.
check_cpu_temperature(#{<<"cpu_temperature">> := CpuTemperature}, Config, ReportInterval, Limiter = #limiter{cpu_temperature = TTL0}) ->
Threshold = proplists:get_value(cpu_temperature, Config),
Timestamp = iot_util:timestamp_of_seconds(),
case is_number(CpuTemperature) andalso CpuTemperature > Threshold of
true when TTL0 == 0 orelse TTL0 < Timestamp ->
Msg = unicode:characters_to_binary(io_lib:format("CPU温度超过:~p°C", [CpuTemperature])),
{error, Msg, Limiter#limiter{cpu_temperature = Timestamp + ReportInterval}};
_ ->
ok
end.
-spec check_disk(map(), Config :: list(), ReportInterval :: integer(), Limiter :: #limiter{}) -> ok | {error, Warn :: binary()}.
check_disk(#{<<"disk">> := #{<<"total">> := Total, <<"used">> := Free}}, Config, ReportInterval, Limiter = #limiter{disk = TTL0}) ->
Threshold = proplists:get_value(disk_used_percentage, Config),
Timestamp = iot_util:timestamp_of_seconds(),
Used = Total - Free,
case Total > 0 andalso (Used / Total) * 100 > Threshold of
true when TTL0 == 0 orelse TTL0 < Timestamp ->
Val = erlang:trunc((Used / Total) * 100),
Msg = unicode:characters_to_binary(io_lib:format("硬盘使用超过:~p%", [Val])),
{error, Msg, Limiter#limiter{disk = Timestamp + ReportInterval}};
_ ->
ok
end.
-spec check_memory(map(), Config :: list(), ReportInterval :: integer(), Limiter :: #limiter{}) -> ok | {error, Warn :: binary()}.
check_memory(#{<<"memory">> := #{<<"total">> := Total, <<"used">> := Used}}, Config, ReportInterval, Limiter = #limiter{memory = TTL0}) ->
Threshold = proplists:get_value(memory_used_percentage, Config),
Timestamp = iot_util:timestamp_of_seconds(),
case Total > 0 andalso (Used / Total) * 100 > Threshold of
true when TTL0 == 0 orelse TTL0 < Timestamp ->
Val = erlang:trunc((Used / Total) * 100),
Msg = unicode:characters_to_binary(io_lib:format("内存使用超过:~p%", [Val])),
{error, Msg, Limiter#limiter{memory = Timestamp + ReportInterval}};
_ ->
ok
end.
-spec generate_private_key(PriFile :: string()) -> public_key:private_key().
generate_private_key(PriFile) when is_list(PriFile) ->
PriKeyFile = code:priv_dir(iot) ++ "/" ++ PriFile,
%%
{ok, PriKeyData} = file:read_file(PriKeyFile),
PriDerData = base64:decode(PriKeyData),
public_key:der_decode('PrivateKeyInfo', PriDerData).
-spec format_date() -> binary().
format_date() ->
{Date, Time} = calendar:now_to_local_time(os:timestamp()),
{{Year, Month, Day}, {Hour, Minute, Second}} = {Date, Time},
DateTime = io_lib:format("~4..0B-~2..0B-~2..0B ~2..0B:~2..0B:~2..0B", [Year, Month, Day, Hour, Minute, Second]),
list_to_binary(DateTime).

View File

@ -0,0 +1,51 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 02. 9 2024 11:28
%%%-------------------------------------------------------------------
-module(iot_jinzhi_signer).
-author("anlicheng").
%% API
-export([sign/2]).
%%
-spec sign(M :: #{}, PrivateKey :: public_key:private_key()) -> binary().
sign(M, PrivateKey) when is_map(M) ->
Json = serialize(M),
Hash = iolist_to_binary(io_lib:format("~64.16.0b", [binary:decode_unsigned(crypto:hash(sha256, Json))])),
RsaEncoded = public_key:encrypt_private(Hash, PrivateKey),
base64:encode(RsaEncoded).
%% sign签名
-spec serialize(M :: map()) -> JsonString :: binary().
serialize(M) when is_map(M) ->
L = maps:to_list(M),
L1 = lists:sort(fun({K, _}, {K1, _}) -> K < K1 end, L),
serialize(L1, []).
serialize([], Target) ->
B = iolist_to_binary(lists:join(<<$,>>, lists:reverse(Target))),
<<${, B/binary, $}>>;
serialize([{K, V}|T], Target) ->
V1 = if
is_integer(V) ->
integer_to_binary(V);
is_float(V) ->
float_to_binary(V);
is_binary(V) ->
<<$", V/binary, $">>;
is_boolean(V) andalso V ->
<<"true">>;
is_boolean(V) andalso not V ->
<<"false">>;
is_list(V) ->
Items = lists:map(fun(E) -> serialize(E) end, V),
V0 = iolist_to_binary(lists:join(<<$,>>, Items)),
<<$[, V0/binary, $]>>
end,
Item = <<$", K/binary, $", $:, V1/binary>>,
serialize(T, [Item|Target]).

View File

@ -16,6 +16,18 @@
{api_url, "http://39.98.184.67:8800"},
{watchdog, [
{pri_key, "jinzhi_watchdog_pri.key"},
{url, "http://172.30.37.242:8080/hqtaskcenterapp/sys/taskCenter/taskReceive/sendNotice.do"},
{users, ["S20232026", "S20232321", "S20232323"]},
{report_interval, 60},
{guard_items, [
{cpu_temperature, 60},
{disk_used_percentage, 60},
{memory_used_percentage, 60}
]}
]},
%% 目标服务器地址
{emqx_server, [
{host, {39, 98, 184, 67}},

View File

@ -14,6 +14,17 @@
%% 数据的最大缓存量
{device_cache_size, 200},
{watchdog, [
{pri_key, "jinzhi_watchdog_pri.key"},
{url, "http://172.30.37.242:8080/hqtaskcenterapp/sys/taskCenter/taskReceive/sendNotice.do"},
{users, ["S20232026", "S20232321", "S20232323"]},
{guard_items, [
{cpu_temperature, 60},
{disk_used_percentage, 60},
{memory_used_percentage, 60}
]}
]},
%% 权限检验时的预埋token
{pre_tokens, [
{<<"test">>, <<"iot2023">>}

View File

@ -14,6 +14,17 @@
%% 数据的最大缓存量
{device_cache_size, 200},
{watchdog, [
{pri_key, "jinzhi_watchdog_pri.key"},
{url, "http://172.30.37.242:8080/hqtaskcenterapp/sys/taskCenter/taskReceive/sendNotice.do"},
{users, ["S20232026", "S20232321", "S20232323"]},
{guard_items, [
{cpu_temperature, 60},
{disk_used_percentage, 60},
{memory_used_percentage, 60}
]}
]},
%% 权限检验时的预埋token
{pre_tokens, [
{<<"test">>, <<"iot2023">>}