From 375db51b9ce56e79df4cb391b44e2e866047f7dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Wed, 22 Feb 2023 17:17:45 +0800 Subject: [PATCH] fix microservice --- apps/iot/src/iot_message_handler.erl | 61 +++++++++++++++++++++-- apps/iot/src/model/microservice_model.erl | 46 +++++++++++++++++ apps/iot/src/model/terminal_model.erl | 48 +----------------- 3 files changed, 105 insertions(+), 50 deletions(-) diff --git a/apps/iot/src/iot_message_handler.erl b/apps/iot/src/iot_message_handler.erl index 233726a..3ae1e0f 100644 --- a/apps/iot/src/iot_message_handler.erl +++ b/apps/iot/src/iot_message_handler.erl @@ -80,15 +80,70 @@ handle(<<"server.register">>, Msg = #{<<"c_id">> := ClientId, <<"r">> := PubKey, handle(<<"server.data">>, #{<<"c_id">> := HostId, <<"d">> := Data}) -> {ok, Host = #host{aes = Aes}} = host_model:get_host(HostId), + Services = microservice_model:get_services(HostId), PlainData = iot_cipher_aes:decrypt(Aes, Aes, Data), case jiffy:decode(PlainData, [return_maps]) of - Metrics when is_list(Metrics) -> - lager:debug("the metric is: ~p", [Metrics]), + Infos when is_list(Infos) -> + lager:debug("the data is: ~p", [Infos]), + + %% 一次可能提前多组数据 + lists:foreach(fun(#{<<"service_name">> := ServiceName, <<"time">> := Time, <<"data">> := Items}) -> + case lists:search(fun(#microservice{name = Name}) -> Name =:= ServiceName end, Services) of + {value, Service=#microservice{metrics = Metrics}} -> + %% 更新数据 + + + + + ok; + false -> + lager:warning("[iot_message_handler] host_id: ~p, not found service_name: ~p", [HostId, ServiceName]) + + end + + + + end, Infos), + + + + + ok; _ -> lager:debug("the metric is invalid json") end, - ok. \ No newline at end of file + ok. + + +append_metric(Metrics, MetricName, MetricSymbol, MetricValue, Ts) -> + case lists:any(fun(#service_metric{symbol = Symbol}) -> Symbol =:= MetricSymbol end, Metrics) of + true -> + lists:map(fun(Metric=#service_metric{symbol = Symbol, queue = Q}) -> + case Symbol =:= MetricSymbol of + true -> + Q1 = iot_util:queue_limited_in({Ts, MetricValue}, Q, 100), + Metric#service_metric{ + name = MetricName, + last_value = MetricValue, + update_ts = Ts, + queue = Q1 + }; + false -> + Metric + end + end, Metrics); + false -> + Q0 = queue:new(), + Metric = #service_metric{ + name = MetricName, + symbol = MetricSymbol, + last_value = MetricValue, + update_ts = Ts, + queue = queue:in({Ts, MetricValue}, Q0) + }, + Metrics ++ [Metric] + end. diff --git a/apps/iot/src/model/microservice_model.erl b/apps/iot/src/model/microservice_model.erl index 8479cc0..c968474 100644 --- a/apps/iot/src/model/microservice_model.erl +++ b/apps/iot/src/model/microservice_model.erl @@ -63,6 +63,52 @@ change_status(ServiceId, Status) when is_binary(ServiceId), is_integer(Status) - {error, Reason} end. +update_metric(TerminalId, MetricName, MetricSymbol, MetricValue) when is_binary(TerminalId), is_binary(MetricName), is_binary(MetricSymbol), is_number(MetricValue) -> + Fun = fun() -> + case mnesia:read(terminal, TerminalId) of + [] -> + mnesia:abort(<<"terminal not found">>); + [Terminal = #terminal{metrics = Metrics}] -> + Ts = iot_util:current_time(), + NMetrics = case lists:any(fun(#service_metric{symbol = Symbol}) -> Symbol =:= MetricSymbol end, Metrics) of + true -> + lists:map(fun(Metric=#service_metric{symbol = Symbol, queue = Q}) -> + case Symbol =:= MetricSymbol of + true -> + Q1 = iot_util:queue_limited_in({Ts, MetricValue}, Q, 100), + Metric#service_metric{ + name = MetricName, + last_value = MetricValue, + update_ts = Ts, + queue = Q1 + }; + false -> + Metric + end + end, Metrics); + false -> + Q0 = queue:new(), + Metric = #service_metric{ + name = MetricName, + symbol = MetricSymbol, + last_value = MetricValue, + update_ts = Ts, + queue = queue:in({Ts, MetricValue}, Q0) + }, + Metrics ++ [Metric] + end, + mnesia:write(terminal, Terminal#terminal{metrics = NMetrics}, write) + end + end, + + case mnesia:transaction(Fun) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + + -spec delete(HostId :: binary()) -> ok | {error, Reason :: any()}. delete(ServiceId) when is_binary(ServiceId) -> case mnesia:transaction(fun() -> mnesia:delete(microservice, ServiceId, write) end) of diff --git a/apps/iot/src/model/terminal_model.erl b/apps/iot/src/model/terminal_model.erl index 1a25000..b48bb40 100644 --- a/apps/iot/src/model/terminal_model.erl +++ b/apps/iot/src/model/terminal_model.erl @@ -13,7 +13,7 @@ %% API -export([get_all_terminals/0, get_status_stat/0]). --export([change_status/2, delete/1, update_metric/4]). +-export([change_status/2, delete/1]). %% 获取app信息 get_all_terminals() -> @@ -82,52 +82,6 @@ change_status(TerminalId, Status) when is_binary(TerminalId), is_integer(Status) {error, Reason} end. -update_metric(TerminalId, MetricName, MetricSymbol, MetricValue) when is_binary(TerminalId), is_binary(MetricName), is_binary(MetricSymbol), is_number(MetricValue) -> - Fun = fun() -> - case mnesia:read(terminal, TerminalId) of - [] -> - mnesia:abort(<<"terminal not found">>); - [Terminal = #terminal{metrics = Metrics}] -> - Ts = iot_util:current_time(), - NMetrics = case lists:any(fun(#service_metric{symbol = Symbol}) -> Symbol =:= MetricSymbol end, Metrics) of - true -> - lists:map(fun(Metric=#service_metric{symbol = Symbol, queue = Q}) -> - case Symbol =:= MetricSymbol of - true -> - Q1 = iot_util:queue_limited_in({Ts, MetricValue}, Q, 100), - Metric#service_metric{ - name = MetricName, - last_value = MetricValue, - update_ts = Ts, - queue = Q1 - }; - false -> - Metric - end - end, Metrics); - false -> - Q0 = queue:new(), - Metric = #service_metric{ - name = MetricName, - symbol = MetricSymbol, - last_value = MetricValue, - update_ts = Ts, - queue = queue:in({Ts, MetricValue}, Q0) - }, - Metrics ++ [Metric] - end, - mnesia:write(terminal, Terminal#terminal{metrics = NMetrics}, write) - end - end, - - case mnesia:transaction(Fun) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. - - delete(TerminalId) when is_binary(TerminalId) -> case mnesia:transaction(fun() -> mnesia:delete(terminal, TerminalId, write) end) of {atomic, ok} ->