iot/apps/iot/src/iot_mqtt_message_handler.erl
安礼成 d44889aff2 fix
2023-03-06 16:40:13 +08:00

171 lines
6.8 KiB
Erlang

%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
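%%% @doc
%%% Handles decoded MQTT messages, dispatching on the topic name
%%% (server.register, server.data, server.ping).
%%% @end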
%%% Created : 18. 2月 2023 21:39
%%%-------------------------------------------------------------------
-module(iot_mqtt_message_handler).
-author("aresei").
-include("iot.hrl").
%% API
-export([handle/2]).
%% @doc Dispatches one decoded MQTT message according to its topic.
%%
%% Topics handled:
%%   <<"server.register">> - builds a #host{} from the reported memory and disk
%%       totals, stores it with host_model:add_host/1 and encodes a reply with
%%       iot_cipher_rsa:encode/2 using the public key sent by the client.
%%   <<"server.data">>     - decrypts the payload with the aes value stored on
%%       the host and merges every reported sample into the matching service.
%%   <<"server.ping">>     - writes the reported free memory / free disk into
%%       the stored host metric.
%%
%% A topic or payload shape that matches none of the clauses raises
%% function_clause.
handle(<<"server.register">>, Msg = #{<<"c_id">> := ClientId, <<"r">> := PubKey,
    <<"m">> := #{<<"cpu_core">> := _CpuCore, <<"memory">> := Memory, <<"disk">> := Disk,
        <<"boot_time">> := _BootTime, <<"efka_version">> := _EfkaVersion,
        <<"kernel_arch">> := _KernelArch, <<"ipv4_1">> := _Ip1, <<"ipv4_2">> := _Ip2}}) ->
    %% The underscore-prefixed keys are still required for the clause to match;
    %% their values are simply not used yet.
    lager:debug("[iot_message_handler] get server register message: ~p", [Msg]),
    Aes = iot_util:rand_bytes(16),
    %% One clock reading so activated_ts and update_ts cannot drift apart.
    Now = iot_util:current_time(),
    Host = #host{
        host_id = ClientId,
        aes = Aes,
        metric = #host_metric{
            %% CPU section: two placeholder entries (number, load).
            %% NOTE(review): the reported cpu_core value is not used here - confirm
            %% whether the entry count should follow it.
            cpus = [
                #cpu_metric{num = 1, load = 0},
                #cpu_metric{num = 2, load = 0}
            ],
            %% CPU temperature
            cpu_temperature = 0,
            %% Memory status: used amount and total
            memory = #memory_metric{used = 0, total = Memory},
            %% Disk status
            disk = #disk_metric{total = Disk},
            %% Interface status
            interfaces = []
        },
        activated_ts = Now,
        update_ts = Now,
        status = ?HOST_STATUS_INACTIVE
    },
    %% Only the accepted flag and the key differ between success and failure,
    %% so the reply itself is built once below.
    {Accepted, ReplyAes} =
        case host_model:add_host(Host) of
            ok ->
                {true, Aes};
            {error, Reason} ->
                lager:debug("register error is: ~p", [Reason]),
                {false, <<"">>}
        end,
    Reply = #{
        <<"a">> => Accepted,
        <<"aes">> => ReplyAes,
        <<"reply">> => <<"client.reply.", ClientId/binary>>
    },
    EncReply = iot_cipher_rsa:encode(Reply, PubKey),
    %% NOTE(review): the encoded reply is only logged here, never published -
    %% confirm whether sending it back to the client is still to be wired in.
    lager:debug("enc_reply is: ~p", [EncReply]);
handle(<<"server.data">>, #{<<"c_id">> := HostId, <<"d">> := Data}) ->
    case host_model:get_host(HostId) of
        {ok, #host{aes = Aes}} ->
            Services = service_model:get_host_services(HostId),
            PlainData = iot_cipher_aes:decrypt(Aes, Aes, Data),
            case jiffy:decode(PlainData, [return_maps]) of
                Infos when is_list(Infos) ->
                    lager:debug("[iot_message_handler] the data is: ~p", [Infos]),
                    %% One message may carry several groups of data.
                    lists:foreach(
                        fun(Info) -> store_service_data(HostId, Services, Info) end,
                        Infos);
                _ ->
                    lager:debug("[iot_message_handler] the metric is invalid json")
            end;
        undefined ->
            lager:warning("[iot_message_handler] host_id: ~p, not exists", [HostId])
    end;
%% Handle the server's ping
handle(<<"server.ping">>, #{<<"c">> := HostId, <<"at">> := _At,
    <<"h">> := #{<<"fm">> := FreeMemory, <<"fd">> := FreeDisk, <<"cp">> := _CpuLoad}}) ->
    case host_model:get_host(HostId) of
        {ok, Host = #host{metric = Metric = #host_metric{disk = Disk, memory = Memory}}} ->
            NMetric = Metric#host_metric{
                disk = Disk#disk_metric{free = FreeDisk},
                memory = Memory#memory_metric{free = FreeMemory}
            },
            WriteFun = fun() -> mnesia:write(host, Host#host{metric = NMetric}, write) end,
            case mnesia:transaction(WriteFun) of
                {atomic, ok} ->
                    ok;
                %% mnesia:transaction/1 reports failure as {aborted, Reason};
                %% matching {error, _} here would turn every abort into a
                %% case_clause crash.
                {aborted, Reason} ->
                    lager:warning("[iot_message_handler] host_id: ~p, ping get error: ~p", [HostId, Reason])
            end;
        undefined ->
            lager:warning("[iot_message_handler] host_id: ~p, not exists", [HostId])
    end.

%% Merges the samples of one reported service group into the service with the
%% same name among Services and stores the result through
%% service_model:update_metric/2; logs a warning when no such service exists.
store_service_data(HostId, Services, #{<<"service_name">> := ServiceName, <<"data">> := Items}) ->
    case lists:search(fun(#service{name = Name}) -> Name =:= ServiceName end, Services) of
        {value, #service{service_id = ServiceId, metrics = Metrics}} ->
            %% Update the data
            NMetrics = lists:foldl(
                fun(MetricData, MetricsAcc) -> append_metric(MetricsAcc, MetricData) end,
                Metrics, Items),
            case service_model:update_metric(ServiceId, NMetrics) of
                ok ->
                    lager:debug("[iot_message_handler] update metrics success");
                %% NOTE(review): this was {ok, Reason}; assumes update_metric/2
                %% reports failure as {error, Reason} like host_model:add_host/1
                %% does - confirm against service_model.
                {error, Reason} ->
                    lager:debug("[iot_message_handler] update metrics error: ~p", [Reason])
            end;
        false ->
            lager:warning("[iot_message_handler] host_id: ~p, not found service_name: ~p", [HostId, ServiceName])
    end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Merges one reported sample into a list of #service_metric{} records.
%%
%% MetricData must be a map. <<"value">> is read with maps:get/2 and is
%% therefore mandatory; <<"name">>, <<"symbol">> and <<"time">> fall back to
%% <<"">>, <<"">> and 0.
%%
%% When some metric already carries the sample's symbol, every metric with that
%% symbol gets {Ts, Value} pushed through iot_util:queue_limited_in/3 (third
%% argument 100) and its name, last_value and update_ts overwritten; all other
%% metrics are returned untouched. Otherwise a new metric whose queue holds
%% only this sample is appended at the end of the list.
append_metric(Metrics, MetricData) when is_map(MetricData) ->
    Name = maps:get(<<"name">>, MetricData, <<"">>),
    Symbol = maps:get(<<"symbol">>, MetricData, <<"">>),
    Value = maps:get(<<"value">>, MetricData),
    Ts = maps:get(<<"time">>, MetricData, 0),
    Sample = {Ts, Value},
    HasSymbol = fun(#service_metric{symbol = Existing}) -> Existing =:= Symbol end,
    case lists:any(HasSymbol, Metrics) of
        false ->
            %% First sample for this symbol: start a queue with it.
            Fresh = #service_metric{
                name = Name,
                symbol = Symbol,
                last_value = Value,
                update_ts = Ts,
                queue = queue:in(Sample, queue:new())
            },
            Metrics ++ [Fresh];
        true ->
            Refresh = fun
                (Current = #service_metric{symbol = S, queue = Q}) when S =:= Symbol ->
                    Current#service_metric{
                        name = Name,
                        last_value = Value,
                        update_ts = Ts,
                        queue = iot_util:queue_limited_in(Sample, Q, 100)
                    };
                (Other = #service_metric{}) ->
                    Other
            end,
            lists:map(Refresh, Metrics)
    end.