diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 7b52175..7d3cf30 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -60,6 +60,8 @@ name :: binary(), %% 当前最新值 last_value = 0 :: number(), + %% 单位, 如电压: a、v + unit :: any(), %% 历史值序列, 队列;保存最近的100数值, 格式为: [{ts, value}] queue = [] :: queus:queue(), %% 最后更新时间 @@ -129,7 +131,7 @@ }). %% 微服务表 --record(microservice, { +-record(service, { %% ID, 基于UUID生成 service_id :: binary(), %% 关联主机 diff --git a/apps/iot/src/iot_message_handler.erl b/apps/iot/src/iot_message_handler.erl index 3ae1e0f..e5edd9d 100644 --- a/apps/iot/src/iot_message_handler.erl +++ b/apps/iot/src/iot_message_handler.erl @@ -88,10 +88,11 @@ handle(<<"server.data">>, #{<<"c_id">> := HostId, <<"d">> := Data}) -> lager:debug("the data is: ~p", [Infos]), %% 一次可能提前多组数据 - lists:foreach(fun(#{<<"service_name">> := ServiceName, <<"time">> := Time, <<"data">> := Items}) -> - case lists:search(fun(#microservice{name = Name}) -> Name =:= ServiceName end, Services) of - {value, Service=#microservice{metrics = Metrics}} -> + lists:foreach(fun(#{<<"service_name">> := ServiceName, <<"data">> := Items}) -> + case lists:search(fun(#service{name = Name}) -> Name =:= ServiceName end, Services) of + {value, Service=#service{metrics = Metrics}} -> %% 更新数据 + NMetrics = lists:foldl(fun(MetricData, MetricsAcc) -> append_metric(MetricsAcc, MetricData) end, Metrics, Items), @@ -119,16 +120,21 @@ handle(<<"server.data">>, #{<<"c_id">> := HostId, <<"d">> := Data}) -> ok. -append_metric(Metrics, MetricName, MetricSymbol, MetricValue, Ts) -> - case lists:any(fun(#service_metric{symbol = Symbol}) -> Symbol =:= MetricSymbol end, Metrics) of +append_metric(Metrics, MetricData) when is_map(MetricData) -> + Name = maps:get(<<"name">>, MetricData, <<"">>), + Symbol = maps:get(<<"symbol">>, MetricData, <<"">>), + Value = maps:get(<<"value">>, MetricData), + Ts = maps:get(<<"time">>, MetricData, 0), + + case lists:any(fun(#service_metric{symbol = Symbol0}) -> Symbol =:= Symbol0 end, Metrics) of true -> - lists:map(fun(Metric=#service_metric{symbol = Symbol, queue = Q}) -> - case Symbol =:= MetricSymbol of + lists:map(fun(Metric=#service_metric{symbol = Symbol0, queue = Q}) -> + case Symbol =:= Symbol0 of true -> - Q1 = iot_util:queue_limited_in({Ts, MetricValue}, Q, 100), + Q1 = iot_util:queue_limited_in({Ts, Value}, Q, 100), Metric#service_metric{ - name = MetricName, - last_value = MetricValue, + name = Name, + last_value = Value, update_ts = Ts, queue = Q1 }; @@ -139,11 +145,11 @@ append_metric(Metrics, MetricName, MetricSymbol, MetricValue, Ts) -> false -> Q0 = queue:new(), Metric = #service_metric{ - name = MetricName, - symbol = MetricSymbol, - last_value = MetricValue, + name = Name, + symbol = Symbol, + last_value = Value, update_ts = Ts, - queue = queue:in({Ts, MetricValue}, Q0) + queue = queue:in({Ts, Value}, Q0) }, Metrics ++ [Metric] end. diff --git a/apps/iot/src/model/microservice_model.erl b/apps/iot/src/model/microservice_model.erl index c968474..35bcaf7 100644 --- a/apps/iot/src/model/microservice_model.erl +++ b/apps/iot/src/model/microservice_model.erl @@ -16,7 +16,7 @@ get_service(HostId, ServiceName) when is_binary(HostId), is_binary(ServiceName) -> Fun = fun() -> - Q = qlc:q([E || E <- mnesia:table(microservice), E#microservice.host_id =:= HostId, E#microservice.name =:= ServiceName]), + Q = qlc:q([E || E <- mnesia:table(microservice), E#service.host_id =:= HostId, E#service.name =:= ServiceName]), qlc:e(Q) end, case mnesia:transaction(Fun) of @@ -28,7 +28,7 @@ get_service(HostId, ServiceName) when is_binary(HostId), is_binary(ServiceName) get_services(HostId) when is_binary(HostId) -> Fun = fun() -> - Q = qlc:q([E || E <- mnesia:table(microservice), E#microservice.host_id =:= HostId]), + Q = qlc:q([E || E <- mnesia:table(microservice), E#service.host_id =:= HostId]), qlc:e(Q) end, case mnesia:transaction(Fun) of @@ -38,7 +38,7 @@ get_services(HostId) when is_binary(HostId) -> [] end. -add_service(Service = #microservice{}) -> +add_service(Service = #service{}) -> case mnesia:transaction(fun() -> mnesia:write(microservice, Service, write) end) of {atomic, _} -> ok; @@ -52,7 +52,7 @@ change_status(ServiceId, Status) when is_binary(ServiceId), is_integer(Status) - [] -> mnesia:abort(<<"host not found">>); [Service] -> - NService = Service#microservice{status = Status}, + NService = Service#service{status = Status}, mnesia:write(microservice, NService, write) end end, @@ -63,7 +63,7 @@ change_status(ServiceId, Status) when is_binary(ServiceId), is_integer(Status) - {error, Reason} end. -update_metric(TerminalId, MetricName, MetricSymbol, MetricValue) when is_binary(TerminalId), is_binary(MetricName), is_binary(MetricSymbol), is_number(MetricValue) -> +update_metric(ServiceId, Metrics) when is_binary(ServiceId), is_list(Metrics) -> Fun = fun() -> case mnesia:read(terminal, TerminalId) of [] ->