fix microservice
This commit is contained in:
parent
9baa0e7d4d
commit
375db51b9c
@ -80,15 +80,70 @@ handle(<<"server.register">>, Msg = #{<<"c_id">> := ClientId, <<"r">> := PubKey,
|
|||||||
|
|
||||||
handle(<<"server.data">>, #{<<"c_id">> := HostId, <<"d">> := Data}) ->
|
handle(<<"server.data">>, #{<<"c_id">> := HostId, <<"d">> := Data}) ->
|
||||||
{ok, Host = #host{aes = Aes}} = host_model:get_host(HostId),
|
{ok, Host = #host{aes = Aes}} = host_model:get_host(HostId),
|
||||||
|
Services = microservice_model:get_services(HostId),
|
||||||
|
|
||||||
PlainData = iot_cipher_aes:decrypt(Aes, Aes, Data),
|
PlainData = iot_cipher_aes:decrypt(Aes, Aes, Data),
|
||||||
case jiffy:decode(PlainData, [return_maps]) of
|
case jiffy:decode(PlainData, [return_maps]) of
|
||||||
Metrics when is_list(Metrics) ->
|
Infos when is_list(Infos) ->
|
||||||
lager:debug("the metric is: ~p", [Metrics]),
|
lager:debug("the data is: ~p", [Infos]),
|
||||||
|
|
||||||
|
%% 一次可能提前多组数据
|
||||||
|
lists:foreach(fun(#{<<"service_name">> := ServiceName, <<"time">> := Time, <<"data">> := Items}) ->
|
||||||
|
case lists:search(fun(#microservice{name = Name}) -> Name =:= ServiceName end, Services) of
|
||||||
|
{value, Service=#microservice{metrics = Metrics}} ->
|
||||||
|
%% 更新数据
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
ok;
|
||||||
|
false ->
|
||||||
|
lager:warning("[iot_message_handler] host_id: ~p, not found service_name: ~p", [HostId, ServiceName])
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
end, Infos),
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
lager:debug("the metric is invalid json")
|
lager:debug("the metric is invalid json")
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
||||||
|
append_metric(Metrics, MetricName, MetricSymbol, MetricValue, Ts) ->
|
||||||
|
case lists:any(fun(#service_metric{symbol = Symbol}) -> Symbol =:= MetricSymbol end, Metrics) of
|
||||||
|
true ->
|
||||||
|
lists:map(fun(Metric=#service_metric{symbol = Symbol, queue = Q}) ->
|
||||||
|
case Symbol =:= MetricSymbol of
|
||||||
|
true ->
|
||||||
|
Q1 = iot_util:queue_limited_in({Ts, MetricValue}, Q, 100),
|
||||||
|
Metric#service_metric{
|
||||||
|
name = MetricName,
|
||||||
|
last_value = MetricValue,
|
||||||
|
update_ts = Ts,
|
||||||
|
queue = Q1
|
||||||
|
};
|
||||||
|
false ->
|
||||||
|
Metric
|
||||||
|
end
|
||||||
|
end, Metrics);
|
||||||
|
false ->
|
||||||
|
Q0 = queue:new(),
|
||||||
|
Metric = #service_metric{
|
||||||
|
name = MetricName,
|
||||||
|
symbol = MetricSymbol,
|
||||||
|
last_value = MetricValue,
|
||||||
|
update_ts = Ts,
|
||||||
|
queue = queue:in({Ts, MetricValue}, Q0)
|
||||||
|
},
|
||||||
|
Metrics ++ [Metric]
|
||||||
|
end.
|
||||||
|
|||||||
@ -63,6 +63,52 @@ change_status(ServiceId, Status) when is_binary(ServiceId), is_integer(Status) -
|
|||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
update_metric(TerminalId, MetricName, MetricSymbol, MetricValue) when is_binary(TerminalId), is_binary(MetricName), is_binary(MetricSymbol), is_number(MetricValue) ->
|
||||||
|
Fun = fun() ->
|
||||||
|
case mnesia:read(terminal, TerminalId) of
|
||||||
|
[] ->
|
||||||
|
mnesia:abort(<<"terminal not found">>);
|
||||||
|
[Terminal = #terminal{metrics = Metrics}] ->
|
||||||
|
Ts = iot_util:current_time(),
|
||||||
|
NMetrics = case lists:any(fun(#service_metric{symbol = Symbol}) -> Symbol =:= MetricSymbol end, Metrics) of
|
||||||
|
true ->
|
||||||
|
lists:map(fun(Metric=#service_metric{symbol = Symbol, queue = Q}) ->
|
||||||
|
case Symbol =:= MetricSymbol of
|
||||||
|
true ->
|
||||||
|
Q1 = iot_util:queue_limited_in({Ts, MetricValue}, Q, 100),
|
||||||
|
Metric#service_metric{
|
||||||
|
name = MetricName,
|
||||||
|
last_value = MetricValue,
|
||||||
|
update_ts = Ts,
|
||||||
|
queue = Q1
|
||||||
|
};
|
||||||
|
false ->
|
||||||
|
Metric
|
||||||
|
end
|
||||||
|
end, Metrics);
|
||||||
|
false ->
|
||||||
|
Q0 = queue:new(),
|
||||||
|
Metric = #service_metric{
|
||||||
|
name = MetricName,
|
||||||
|
symbol = MetricSymbol,
|
||||||
|
last_value = MetricValue,
|
||||||
|
update_ts = Ts,
|
||||||
|
queue = queue:in({Ts, MetricValue}, Q0)
|
||||||
|
},
|
||||||
|
Metrics ++ [Metric]
|
||||||
|
end,
|
||||||
|
mnesia:write(terminal, Terminal#terminal{metrics = NMetrics}, write)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
|
case mnesia:transaction(Fun) of
|
||||||
|
{atomic, ok} ->
|
||||||
|
ok;
|
||||||
|
{aborted, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
-spec delete(HostId :: binary()) -> ok | {error, Reason :: any()}.
|
-spec delete(HostId :: binary()) -> ok | {error, Reason :: any()}.
|
||||||
delete(ServiceId) when is_binary(ServiceId) ->
|
delete(ServiceId) when is_binary(ServiceId) ->
|
||||||
case mnesia:transaction(fun() -> mnesia:delete(microservice, ServiceId, write) end) of
|
case mnesia:transaction(fun() -> mnesia:delete(microservice, ServiceId, write) end) of
|
||||||
|
|||||||
@ -13,7 +13,7 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([get_all_terminals/0, get_status_stat/0]).
|
-export([get_all_terminals/0, get_status_stat/0]).
|
||||||
-export([change_status/2, delete/1, update_metric/4]).
|
-export([change_status/2, delete/1]).
|
||||||
|
|
||||||
%% 获取app信息
|
%% 获取app信息
|
||||||
get_all_terminals() ->
|
get_all_terminals() ->
|
||||||
@ -82,52 +82,6 @@ change_status(TerminalId, Status) when is_binary(TerminalId), is_integer(Status)
|
|||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
update_metric(TerminalId, MetricName, MetricSymbol, MetricValue) when is_binary(TerminalId), is_binary(MetricName), is_binary(MetricSymbol), is_number(MetricValue) ->
|
|
||||||
Fun = fun() ->
|
|
||||||
case mnesia:read(terminal, TerminalId) of
|
|
||||||
[] ->
|
|
||||||
mnesia:abort(<<"terminal not found">>);
|
|
||||||
[Terminal = #terminal{metrics = Metrics}] ->
|
|
||||||
Ts = iot_util:current_time(),
|
|
||||||
NMetrics = case lists:any(fun(#service_metric{symbol = Symbol}) -> Symbol =:= MetricSymbol end, Metrics) of
|
|
||||||
true ->
|
|
||||||
lists:map(fun(Metric=#service_metric{symbol = Symbol, queue = Q}) ->
|
|
||||||
case Symbol =:= MetricSymbol of
|
|
||||||
true ->
|
|
||||||
Q1 = iot_util:queue_limited_in({Ts, MetricValue}, Q, 100),
|
|
||||||
Metric#service_metric{
|
|
||||||
name = MetricName,
|
|
||||||
last_value = MetricValue,
|
|
||||||
update_ts = Ts,
|
|
||||||
queue = Q1
|
|
||||||
};
|
|
||||||
false ->
|
|
||||||
Metric
|
|
||||||
end
|
|
||||||
end, Metrics);
|
|
||||||
false ->
|
|
||||||
Q0 = queue:new(),
|
|
||||||
Metric = #service_metric{
|
|
||||||
name = MetricName,
|
|
||||||
symbol = MetricSymbol,
|
|
||||||
last_value = MetricValue,
|
|
||||||
update_ts = Ts,
|
|
||||||
queue = queue:in({Ts, MetricValue}, Q0)
|
|
||||||
},
|
|
||||||
Metrics ++ [Metric]
|
|
||||||
end,
|
|
||||||
mnesia:write(terminal, Terminal#terminal{metrics = NMetrics}, write)
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
|
|
||||||
case mnesia:transaction(Fun) of
|
|
||||||
{atomic, ok} ->
|
|
||||||
ok;
|
|
||||||
{aborted, Reason} ->
|
|
||||||
{error, Reason}
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
delete(TerminalId) when is_binary(TerminalId) ->
|
delete(TerminalId) when is_binary(TerminalId) ->
|
||||||
case mnesia:transaction(fun() -> mnesia:delete(terminal, TerminalId, write) end) of
|
case mnesia:transaction(fun() -> mnesia:delete(terminal, TerminalId, write) end) of
|
||||||
{atomic, ok} ->
|
{atomic, ok} ->
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user