diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 6ff7095..f9dfa82 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -105,6 +105,7 @@ %% 设备数据 -record(device_data, { tags :: map(), + field :: any(), val :: any(), timestamp :: integer() }). \ No newline at end of file diff --git a/apps/iot/src/http_handler/device_handler.erl b/apps/iot/src/http_handler/device_handler.erl index 2252058..60c34e9 100644 --- a/apps/iot/src/http_handler/device_handler.erl +++ b/apps/iot/src/http_handler/device_handler.erl @@ -74,16 +74,20 @@ handle_request("POST", "/device/query", _, Params = #{<<"device_uuid">> := Devic StartTs = maps:get(<<"start_ts">>, Params, 0), StopTs = maps:get(<<"stop_ts">>, Params, 0), Limit = maps:get(<<"limit">>, Params, 0), + %% 增加对fields的过滤 + Fields = maps:get(<<"fields">>, Params, []), case is_map(Tags) andalso is_integer(StartTs) andalso is_integer(StopTs) andalso is_integer(Limit) andalso StartTs >= 0 andalso StopTs >= 0 andalso Limit >= 0 of true -> - {ok, DeviceDataList} = iot_device:query(DevicePid, Tags, StartTs, StopTs, Limit), - DataItems = lists:map(fun(#device_data{tags = Tags0, val = Val, timestamp = Timestamp}) -> - #{<<"tags">> => Tags0, <<"val">> => Val, <<"timestamp">> => Timestamp} + {ok, DeviceDataList} = iot_device:query(DevicePid, Fields, Tags, StartTs, StopTs, Limit), + DataItems = lists:map(fun({Field, Items}) -> + NItems = lists:map(fun as_map/1, Items), + {Field, NItems} end, DeviceDataList), - {ok, 200, iot_util:json_data(DataItems)}; + + {ok, 200, iot_util:json_data(maps:from_list(DataItems))}; false -> {ok, 200, iot_util:json_error(404, <<"invalid params">>)} end @@ -92,3 +96,6 @@ handle_request("POST", "/device/query", _, Params = #{<<"device_uuid">> := Devic handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. + +as_map(#device_data{tags = Tags0, val = Val, timestamp = Timestamp}) -> + #{<<"tags">> => Tags0, <<"val">> => Val, <<"timestamp">> => Timestamp}. \ No newline at end of file diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index b6c879b..02c6bbd 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -15,19 +15,18 @@ %% API -export([get_name/1, get_pid/1, serialize/1]). --export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/5]). +-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/6]). -export([ai_event/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% 存储数据的上限 --define(MAX_SIZE, 2000). - -record(state, { device_uuid :: binary(), - %% 用来保存数据,作为存储在influxdb里面的数据的备份 - queue = queue:new(), + %% 用来保存数据,作为存储在influxdb里面的数据的备份; 数据是按照<$field>来分组的, 格式: {field => queue:queue()} + metrics = #{}, + %% 最大数据缓存量 + cache_size = 200, %% 设备是否授权 auth_status :: integer(), status = ?DEVICE_OFFLINE @@ -48,14 +47,14 @@ %} %], %% 用来保存在内存中的格式,不需要序列话处理 !!!, 放入到influxdb的数据是基于base64的 --spec serialize(FieldsList :: [map()]) -> [Val :: map()]. +-spec serialize(FieldsList :: [map()]) -> [{Key :: binary(), Values :: map()}]. serialize(FieldsList) when is_list(FieldsList) -> lists:flatmap(fun serialize0/1, FieldsList). serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> -> Values = maps:remove(<<"key">>, Fields), %S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))), %[<<"base64:", S/binary>>]; - [#{Key => Values}]; + [{Key, Values}]; serialize0(_) -> []. @@ -105,9 +104,9 @@ auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> data(Pid, DataList) when is_pid(Pid), is_list(DataList) -> gen_server:cast(Pid, {data, DataList}). --spec query(Pid :: pid(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) -> {ok, [#device_data{}]}. -query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 -> - gen_server:call(Pid, {query, Tags, StartTs, StopTs, Limit}). +-spec query(Pid :: pid(), Fields :: list(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) -> {ok, [{Field :: binary(), #device_data{}}]}. +query(Pid, Fields, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_list(Fields), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 -> + gen_server:call(Pid, {query, Fields, Tags, StartTs, StopTs, Limit}). -spec ai_event(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return(). ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) -> @@ -139,7 +138,9 @@ init([DeviceUUID]) when is_binary(DeviceUUID) -> ignore end; init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) -> - {ok, #state{device_uuid = DeviceUUID, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}. + {ok, CacheSize} = application:get_env(iot, device_cache_size), + + {ok, #state{device_uuid = DeviceUUID, cache_size = CacheSize, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}. %% @private %% @doc Handling call messages @@ -156,22 +157,34 @@ handle_call(is_activated, _From, State = #state{auth_status = AuthStatus}) -> {reply, AuthStatus =:= 1, State}; %% 查询当前设备中产生的数据,缓存在内存中 -handle_call({query, Tags, StartTs, StopTs, Limit}, _From, State = #state{queue = Q}) -> - L = queue:to_list(Q), +handle_call({query, Fields, Tags, StartTs, StopTs, Limit}, _From, State = #state{metrics = Metrics}) -> + LMetrics = maps:to_list(Metrics), + NLMetrics = case length(Fields) > 0 of + true -> + lists:filter(fun({F0, _}) -> lists:member(F0, Fields) end, LMetrics); + false -> + LMetrics + end, - %% 过滤 - L1 = filter_timestamp(L, StartTs, StopTs), - L2 = filter_tags(L1, Tags), + DataList = lists:map(fun({Field, Q}) -> + L = queue:to_list(Q), - %% 处理数据截取 - L3 = lists:reverse(L2), - L4 = case Limit =:= 0 of - true -> - L3; - false -> - lists:sublist(L3, 1, Limit) - end, - {reply, {ok, L4}, State}. + %% 过滤 + L1 = filter_timestamp(L, StartTs, StopTs), + L2 = filter_tags(L1, Tags), + + %% 处理数据截取 + L3 = lists:reverse(L2), + L4 = case Limit =:= 0 of + true -> + L3; + false -> + lists:sublist(L3, 1, Limit) + end, + {Field, L4} + end, NLMetrics), + + {reply, {ok, DataList}, State}. %% @private %% @doc Handling cast messages @@ -212,17 +225,19 @@ handle_cast(reload, State = #state{device_uuid = DeviceUUID}) -> end; %% 向设备中追加数据 -handle_cast({data, DataList}, State = #state{queue = Q}) -> - NQ = lists:foldl(fun(Data, Q0) -> - case queue:len(Q0) > ?MAX_SIZE of - true -> - {_, Q1} = queue:out(Q0), - queue:in(Data, Q1); - false -> - queue:in(Data, Q0) - end - end, Q, DataList), - {noreply, State#state{queue = NQ}}; +handle_cast({data, DataList}, State = #state{metrics = Metrics, cache_size = CacheSize}) -> + NMetrics = lists:foldl(fun(Data = #device_data{field = Field}, Metrics0) -> + Q0 = maps:get(Field, Metrics0, queue:new()), + NQ = case queue:len(Q0) > CacheSize of + true -> + {_, Q1} = queue:out(Q0), + queue:in(Data, Q1); + false -> + queue:in(Data, Q0) + end, + Metrics0#{Field => NQ} + end, Metrics, DataList), + {noreply, State#state{metrics = NMetrics}}; %% 处理授权 handle_cast({auth, true}, State = #state{device_uuid = DeviceUUID}) -> diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 94432f5..cf88c7a 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -586,11 +586,12 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName iot_device:change_status(DevicePid, ?DEVICE_ONLINE), - %% 将数据写入到设备的内存缓存中 - DeviceVals = iot_device:serialize(FieldsList), %% 时间同样精确到毫秒 FormattedTimestamp = iot_util:format_timestamp(Timestamp), - DeviceDataList = lists:map(fun(Val) -> #device_data{tags = NTags, timestamp = FormattedTimestamp, val = Val} end, DeviceVals), + DeviceDataList = lists:map(fun({Field, Val}) -> + #device_data{tags = NTags, field = Field, val = Val, timestamp = FormattedTimestamp} + end, iot_device:serialize(FieldsList)), + iot_device:data(DevicePid, DeviceDataList); false -> lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags]) diff --git a/config/sys-dev.config b/config/sys-dev.config index 93e9eb8..87fc07d 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -11,6 +11,9 @@ {port, 18080} ]}, + %% 数据的最大缓存量 + {device_cache_size, 200}, + {api_url, "http://39.98.184.67:8800"}, %% 目标服务器地址 diff --git a/config/sys-prod.config b/config/sys-prod.config index a3ce37c..682dff3 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -11,6 +11,9 @@ {port, 18080} ]}, + %% 数据的最大缓存量 + {device_cache_size, 200}, + %% 权限检验时的预埋token {pre_tokens, [ {<<"test">>, <<"iot2023">>} diff --git a/config/sys-test.config b/config/sys-test.config index 506d350..ec53f2e 100644 --- a/config/sys-test.config +++ b/config/sys-test.config @@ -11,6 +11,9 @@ {port, 18080} ]}, + %% 数据的最大缓存量 + {device_cache_size, 200}, + %% 权限检验时的预埋token {pre_tokens, [ {<<"test">>, <<"iot2023">>}