调整内存数据缓存
This commit is contained in:
parent
b15520eefd
commit
5bbeff4246
@ -105,6 +105,7 @@
|
|||||||
%% 设备数据
|
%% 设备数据
|
||||||
-record(device_data, {
|
-record(device_data, {
|
||||||
tags :: map(),
|
tags :: map(),
|
||||||
|
field :: any(),
|
||||||
val :: any(),
|
val :: any(),
|
||||||
timestamp :: integer()
|
timestamp :: integer()
|
||||||
}).
|
}).
|
||||||
@ -74,16 +74,20 @@ handle_request("POST", "/device/query", _, Params = #{<<"device_uuid">> := Devic
|
|||||||
StartTs = maps:get(<<"start_ts">>, Params, 0),
|
StartTs = maps:get(<<"start_ts">>, Params, 0),
|
||||||
StopTs = maps:get(<<"stop_ts">>, Params, 0),
|
StopTs = maps:get(<<"stop_ts">>, Params, 0),
|
||||||
Limit = maps:get(<<"limit">>, Params, 0),
|
Limit = maps:get(<<"limit">>, Params, 0),
|
||||||
|
%% 增加对fields的过滤
|
||||||
|
Fields = maps:get(<<"fields">>, Params, []),
|
||||||
|
|
||||||
case is_map(Tags) andalso is_integer(StartTs) andalso is_integer(StopTs) andalso is_integer(Limit)
|
case is_map(Tags) andalso is_integer(StartTs) andalso is_integer(StopTs) andalso is_integer(Limit)
|
||||||
andalso StartTs >= 0 andalso StopTs >= 0 andalso Limit >= 0 of
|
andalso StartTs >= 0 andalso StopTs >= 0 andalso Limit >= 0 of
|
||||||
|
|
||||||
true ->
|
true ->
|
||||||
{ok, DeviceDataList} = iot_device:query(DevicePid, Tags, StartTs, StopTs, Limit),
|
{ok, DeviceDataList} = iot_device:query(DevicePid, Fields, Tags, StartTs, StopTs, Limit),
|
||||||
DataItems = lists:map(fun(#device_data{tags = Tags0, val = Val, timestamp = Timestamp}) ->
|
DataItems = lists:map(fun({Field, Items}) ->
|
||||||
#{<<"tags">> => Tags0, <<"val">> => Val, <<"timestamp">> => Timestamp}
|
NItems = lists:map(fun as_map/1, Items),
|
||||||
|
{Field, NItems}
|
||||||
end, DeviceDataList),
|
end, DeviceDataList),
|
||||||
{ok, 200, iot_util:json_data(DataItems)};
|
|
||||||
|
{ok, 200, iot_util:json_data(maps:from_list(DataItems))};
|
||||||
false ->
|
false ->
|
||||||
{ok, 200, iot_util:json_error(404, <<"invalid params">>)}
|
{ok, 200, iot_util:json_error(404, <<"invalid params">>)}
|
||||||
end
|
end
|
||||||
@ -92,3 +96,6 @@ handle_request("POST", "/device/query", _, Params = #{<<"device_uuid">> := Devic
|
|||||||
handle_request(_, Path, _, _) ->
|
handle_request(_, Path, _, _) ->
|
||||||
Path1 = list_to_binary(Path),
|
Path1 = list_to_binary(Path),
|
||||||
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
|
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
|
||||||
|
|
||||||
|
as_map(#device_data{tags = Tags0, val = Val, timestamp = Timestamp}) ->
|
||||||
|
#{<<"tags">> => Tags0, <<"val">> => Val, <<"timestamp">> => Timestamp}.
|
||||||
@ -15,19 +15,18 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([get_name/1, get_pid/1, serialize/1]).
|
-export([get_name/1, get_pid/1, serialize/1]).
|
||||||
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/5]).
|
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/6]).
|
||||||
-export([ai_event/3]).
|
-export([ai_event/3]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
%% 存储数据的上限
|
|
||||||
-define(MAX_SIZE, 2000).
|
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
device_uuid :: binary(),
|
device_uuid :: binary(),
|
||||||
%% 用来保存数据,作为存储在influxdb里面的数据的备份
|
%% 用来保存数据,作为存储在influxdb里面的数据的备份; 数据是按照<$field>来分组的, 格式: {field => queue:queue()}
|
||||||
queue = queue:new(),
|
metrics = #{},
|
||||||
|
%% 最大数据缓存量
|
||||||
|
cache_size = 200,
|
||||||
%% 设备是否授权
|
%% 设备是否授权
|
||||||
auth_status :: integer(),
|
auth_status :: integer(),
|
||||||
status = ?DEVICE_OFFLINE
|
status = ?DEVICE_OFFLINE
|
||||||
@ -48,14 +47,14 @@
|
|||||||
%}
|
%}
|
||||||
%],
|
%],
|
||||||
%% 用来保存在内存中的格式,不需要序列话处理 !!!, 放入到influxdb的数据是基于base64的
|
%% 用来保存在内存中的格式,不需要序列话处理 !!!, 放入到influxdb的数据是基于base64的
|
||||||
-spec serialize(FieldsList :: [map()]) -> [Val :: map()].
|
-spec serialize(FieldsList :: [map()]) -> [{Key :: binary(), Values :: map()}].
|
||||||
serialize(FieldsList) when is_list(FieldsList) ->
|
serialize(FieldsList) when is_list(FieldsList) ->
|
||||||
lists:flatmap(fun serialize0/1, FieldsList).
|
lists:flatmap(fun serialize0/1, FieldsList).
|
||||||
serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> ->
|
serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> ->
|
||||||
Values = maps:remove(<<"key">>, Fields),
|
Values = maps:remove(<<"key">>, Fields),
|
||||||
%S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))),
|
%S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))),
|
||||||
%[<<"base64:", S/binary>>];
|
%[<<"base64:", S/binary>>];
|
||||||
[#{Key => Values}];
|
[{Key, Values}];
|
||||||
serialize0(_) ->
|
serialize0(_) ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
@ -105,9 +104,9 @@ auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
|||||||
data(Pid, DataList) when is_pid(Pid), is_list(DataList) ->
|
data(Pid, DataList) when is_pid(Pid), is_list(DataList) ->
|
||||||
gen_server:cast(Pid, {data, DataList}).
|
gen_server:cast(Pid, {data, DataList}).
|
||||||
|
|
||||||
-spec query(Pid :: pid(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) -> {ok, [#device_data{}]}.
|
-spec query(Pid :: pid(), Fields :: list(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) -> {ok, [{Field :: binary(), #device_data{}}]}.
|
||||||
query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 ->
|
query(Pid, Fields, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_list(Fields), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 ->
|
||||||
gen_server:call(Pid, {query, Tags, StartTs, StopTs, Limit}).
|
gen_server:call(Pid, {query, Fields, Tags, StartTs, StopTs, Limit}).
|
||||||
|
|
||||||
-spec ai_event(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return().
|
-spec ai_event(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return().
|
||||||
ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) ->
|
ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) ->
|
||||||
@ -139,7 +138,9 @@ init([DeviceUUID]) when is_binary(DeviceUUID) ->
|
|||||||
ignore
|
ignore
|
||||||
end;
|
end;
|
||||||
init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) ->
|
init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) ->
|
||||||
{ok, #state{device_uuid = DeviceUUID, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}.
|
{ok, CacheSize} = application:get_env(iot, device_cache_size),
|
||||||
|
|
||||||
|
{ok, #state{device_uuid = DeviceUUID, cache_size = CacheSize, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
@ -156,22 +157,34 @@ handle_call(is_activated, _From, State = #state{auth_status = AuthStatus}) ->
|
|||||||
{reply, AuthStatus =:= 1, State};
|
{reply, AuthStatus =:= 1, State};
|
||||||
|
|
||||||
%% 查询当前设备中产生的数据,缓存在内存中
|
%% 查询当前设备中产生的数据,缓存在内存中
|
||||||
handle_call({query, Tags, StartTs, StopTs, Limit}, _From, State = #state{queue = Q}) ->
|
handle_call({query, Fields, Tags, StartTs, StopTs, Limit}, _From, State = #state{metrics = Metrics}) ->
|
||||||
L = queue:to_list(Q),
|
LMetrics = maps:to_list(Metrics),
|
||||||
|
NLMetrics = case length(Fields) > 0 of
|
||||||
|
true ->
|
||||||
|
lists:filter(fun({F0, _}) -> lists:member(F0, Fields) end, LMetrics);
|
||||||
|
false ->
|
||||||
|
LMetrics
|
||||||
|
end,
|
||||||
|
|
||||||
%% 过滤
|
DataList = lists:map(fun({Field, Q}) ->
|
||||||
L1 = filter_timestamp(L, StartTs, StopTs),
|
L = queue:to_list(Q),
|
||||||
L2 = filter_tags(L1, Tags),
|
|
||||||
|
|
||||||
%% 处理数据截取
|
%% 过滤
|
||||||
L3 = lists:reverse(L2),
|
L1 = filter_timestamp(L, StartTs, StopTs),
|
||||||
L4 = case Limit =:= 0 of
|
L2 = filter_tags(L1, Tags),
|
||||||
true ->
|
|
||||||
L3;
|
%% 处理数据截取
|
||||||
false ->
|
L3 = lists:reverse(L2),
|
||||||
lists:sublist(L3, 1, Limit)
|
L4 = case Limit =:= 0 of
|
||||||
end,
|
true ->
|
||||||
{reply, {ok, L4}, State}.
|
L3;
|
||||||
|
false ->
|
||||||
|
lists:sublist(L3, 1, Limit)
|
||||||
|
end,
|
||||||
|
{Field, L4}
|
||||||
|
end, NLMetrics),
|
||||||
|
|
||||||
|
{reply, {ok, DataList}, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling cast messages
|
%% @doc Handling cast messages
|
||||||
@ -212,17 +225,19 @@ handle_cast(reload, State = #state{device_uuid = DeviceUUID}) ->
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
%% 向设备中追加数据
|
%% 向设备中追加数据
|
||||||
handle_cast({data, DataList}, State = #state{queue = Q}) ->
|
handle_cast({data, DataList}, State = #state{metrics = Metrics, cache_size = CacheSize}) ->
|
||||||
NQ = lists:foldl(fun(Data, Q0) ->
|
NMetrics = lists:foldl(fun(Data = #device_data{field = Field}, Metrics0) ->
|
||||||
case queue:len(Q0) > ?MAX_SIZE of
|
Q0 = maps:get(Field, Metrics0, queue:new()),
|
||||||
true ->
|
NQ = case queue:len(Q0) > CacheSize of
|
||||||
{_, Q1} = queue:out(Q0),
|
true ->
|
||||||
queue:in(Data, Q1);
|
{_, Q1} = queue:out(Q0),
|
||||||
false ->
|
queue:in(Data, Q1);
|
||||||
queue:in(Data, Q0)
|
false ->
|
||||||
end
|
queue:in(Data, Q0)
|
||||||
end, Q, DataList),
|
end,
|
||||||
{noreply, State#state{queue = NQ}};
|
Metrics0#{Field => NQ}
|
||||||
|
end, Metrics, DataList),
|
||||||
|
{noreply, State#state{metrics = NMetrics}};
|
||||||
|
|
||||||
%% 处理授权
|
%% 处理授权
|
||||||
handle_cast({auth, true}, State = #state{device_uuid = DeviceUUID}) ->
|
handle_cast({auth, true}, State = #state{device_uuid = DeviceUUID}) ->
|
||||||
|
|||||||
@ -586,11 +586,12 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName
|
|||||||
|
|
||||||
iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
|
iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
|
||||||
|
|
||||||
%% 将数据写入到设备的内存缓存中
|
|
||||||
DeviceVals = iot_device:serialize(FieldsList),
|
|
||||||
%% 时间同样精确到毫秒
|
%% 时间同样精确到毫秒
|
||||||
FormattedTimestamp = iot_util:format_timestamp(Timestamp),
|
FormattedTimestamp = iot_util:format_timestamp(Timestamp),
|
||||||
DeviceDataList = lists:map(fun(Val) -> #device_data{tags = NTags, timestamp = FormattedTimestamp, val = Val} end, DeviceVals),
|
DeviceDataList = lists:map(fun({Field, Val}) ->
|
||||||
|
#device_data{tags = NTags, field = Field, val = Val, timestamp = FormattedTimestamp}
|
||||||
|
end, iot_device:serialize(FieldsList)),
|
||||||
|
|
||||||
iot_device:data(DevicePid, DeviceDataList);
|
iot_device:data(DevicePid, DeviceDataList);
|
||||||
false ->
|
false ->
|
||||||
lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags])
|
lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags])
|
||||||
|
|||||||
@ -11,6 +11,9 @@
|
|||||||
{port, 18080}
|
{port, 18080}
|
||||||
]},
|
]},
|
||||||
|
|
||||||
|
%% 数据的最大缓存量
|
||||||
|
{device_cache_size, 200},
|
||||||
|
|
||||||
{api_url, "http://39.98.184.67:8800"},
|
{api_url, "http://39.98.184.67:8800"},
|
||||||
|
|
||||||
%% 目标服务器地址
|
%% 目标服务器地址
|
||||||
|
|||||||
@ -11,6 +11,9 @@
|
|||||||
{port, 18080}
|
{port, 18080}
|
||||||
]},
|
]},
|
||||||
|
|
||||||
|
%% 数据的最大缓存量
|
||||||
|
{device_cache_size, 200},
|
||||||
|
|
||||||
%% 权限检验时的预埋token
|
%% 权限检验时的预埋token
|
||||||
{pre_tokens, [
|
{pre_tokens, [
|
||||||
{<<"test">>, <<"iot2023">>}
|
{<<"test">>, <<"iot2023">>}
|
||||||
|
|||||||
@ -11,6 +11,9 @@
|
|||||||
{port, 18080}
|
{port, 18080}
|
||||||
]},
|
]},
|
||||||
|
|
||||||
|
%% 数据的最大缓存量
|
||||||
|
{device_cache_size, 200},
|
||||||
|
|
||||||
%% 权限检验时的预埋token
|
%% 权限检验时的预埋token
|
||||||
{pre_tokens, [
|
{pre_tokens, [
|
||||||
{<<"test">>, <<"iot2023">>}
|
{<<"test">>, <<"iot2023">>}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user