remove data from device

This commit is contained in:
anlicheng 2025-03-06 16:49:03 +08:00
parent 159b923698
commit 98746b99ac
3 changed files with 4 additions and 124 deletions

View File

@ -63,39 +63,6 @@ handle_request("POST", "/device/activate", _, #{<<"host_id">> := HostId, <<"devi
end
end;
%%
handle_request("POST", "/device/query", _, Params = #{<<"device_uuid">> := DeviceUUID}) when is_binary(DeviceUUID) ->
lager:debug("[device_handler] query device uuid: ~p, params: ~p", [DeviceUUID, Params]),
case iot_device:get_pid(DeviceUUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"device not found">>)};
DevicePid when is_pid(DevicePid) ->
Tags = maps:get(<<"tags">>, Params, #{}),
StartTs = maps:get(<<"start_ts">>, Params, 0),
StopTs = maps:get(<<"stop_ts">>, Params, 0),
Limit = maps:get(<<"limit">>, Params, 0),
%% fields的过滤
Fields = maps:get(<<"fields">>, Params, []),
case is_map(Tags) andalso is_integer(StartTs) andalso is_integer(StopTs) andalso is_integer(Limit)
andalso StartTs >= 0 andalso StopTs >= 0 andalso Limit >= 0 of
true ->
{ok, DeviceDataList} = iot_device:query(DevicePid, Fields, Tags, StartTs, StopTs, Limit),
DataItems = lists:map(fun({Field, Items}) ->
NItems = lists:map(fun as_map/1, Items),
{Field, NItems}
end, DeviceDataList),
{ok, 200, iot_util:json_data(maps:from_list(DataItems))};
false ->
{ok, 200, iot_util:json_error(404, <<"invalid params">>)}
end
end;
handle_request(_, Path, _, _) ->
Path1 = list_to_binary(Path),
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
as_map(#device_data{tags = Tags0, val = Val, timestamp = Timestamp}) ->
#{<<"tags">> => Tags0, <<"val">> => Val, <<"timestamp">> => Timestamp}.
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.

View File

@ -15,7 +15,7 @@
%% API
-export([get_name/1, get_pid/1, serialize/1]).
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/6]).
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2]).
-export([ai_event/3, handle_data/3]).
%% gen_server callbacks
@ -23,10 +23,6 @@
-record(state, {
device_uuid :: binary(),
%% influxdb里面的数据的备份; <$field>, : {field => queue:queue()}
metrics = #{},
%%
cache_size = 200,
%%
auth_status :: integer(),
%%
@ -108,14 +104,6 @@ reload(Pid) when is_pid(Pid) ->
auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
gen_server:cast(Pid, {auth, Auth}).
-spec data(Pid :: pid(), DataList :: [#device_data{}]) -> no_return().
data(Pid, DataList) when is_pid(Pid), is_list(DataList) ->
gen_server:cast(Pid, {data, DataList}).
-spec query(Pid :: pid(), Fields :: list(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) -> {ok, [{Field :: binary(), #device_data{}}]}.
query(Pid, Fields, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_list(Fields), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 ->
gen_server:call(Pid, {query, Fields, Tags, StartTs, StopTs, Limit}).
-spec ai_event(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return().
ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) ->
gen_server:cast(Pid, {ai_event, EventType, Params}).
@ -150,10 +138,9 @@ init([DeviceUUID]) when is_binary(DeviceUUID) ->
ignore
end;
init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status, <<"model_id">> := ModelId}]) ->
{ok, CacheSize} = application:get_env(iot, device_cache_size),
{ok, AiEventThrottle} = application:get_env(iot, ai_event_throttle),
{ok, #state{device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, cache_size = CacheSize,
{ok, #state{device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle,
status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId)}}.
%% @private
@ -168,37 +155,7 @@ init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStat
{stop, Reason :: term(), NewState :: #state{}}).
%%
handle_call(is_activated, _From, State = #state{auth_status = AuthStatus}) ->
{reply, AuthStatus =:= 1, State};
%%
handle_call({query, Fields, Tags, StartTs, StopTs, Limit}, _From, State = #state{metrics = Metrics}) ->
LMetrics = maps:to_list(Metrics),
NLMetrics = case length(Fields) > 0 of
true ->
lists:filter(fun({F0, _}) -> lists:member(F0, Fields) end, LMetrics);
false ->
LMetrics
end,
DataList = lists:map(fun({Field, Q}) ->
L = queue:to_list(Q),
%%
L1 = filter_timestamp(L, StartTs, StopTs),
L2 = filter_tags(L1, Tags),
%%
L3 = lists:reverse(L2),
L4 = case Limit =:= 0 of
true ->
L3;
false ->
lists:sublist(L3, 1, Limit)
end,
{Field, L4}
end, NLMetrics),
{reply, {ok, DataList}, State}.
{reply, AuthStatus =:= 1, State}.
%% @private
%% @doc Handling cast messages
@ -238,21 +195,6 @@ handle_cast(reload, State = #state{device_uuid = DeviceUUID}) ->
{stop, normal, State}
end;
%%
handle_cast({data, DataList}, State = #state{metrics = Metrics, cache_size = CacheSize}) ->
NMetrics = lists:foldl(fun(Data = #device_data{field = Field}, Metrics0) ->
Q0 = maps:get(Field, Metrics0, queue:new()),
NQ = case queue:len(Q0) > CacheSize of
true ->
{_, Q1} = queue:out(Q0),
queue:in(Data, Q1);
false ->
queue:in(Data, Q0)
end,
Metrics0#{Field => NQ}
end, Metrics, DataList),
{noreply, State#state{metrics = NMetrics}};
%%
handle_cast({auth, true}, State = #state{device_uuid = DeviceUUID}) ->
lager:debug("[iot_device] device_uuid: ~p, auth: true", [DeviceUUID]),
@ -365,27 +307,6 @@ report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewSt
end,
lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]).
%%
-spec filter_timestamp(L :: list(), StartTs :: integer(), StopTs :: integer()) -> list().
filter_timestamp(L, 0, 0) ->
L;
filter_timestamp(_L, StartTs, StopTs) when StartTs > StopTs ->
[];
filter_timestamp(L, StartTs0, StopTs0) ->
%%
StartTs = StartTs0 * 1000,
StopTs = StopTs0 * 1000,
lists:filter(fun(#device_data{timestamp = Timestamp}) -> Timestamp >= StartTs andalso Timestamp =< StopTs end, L).
%%
-spec filter_tags(L :: list(), Tags :: map()) -> list().
filter_tags(L, Tags) when map_size(Tags) =:= 0 ->
L;
filter_tags(L, Tags) when map_size(Tags) =:= 0 ->
lists:filter(fun(#device_data{tags = Tags0}) ->
lists:all(fun({TagName, TagVal}) -> maps:is_key(TagName, Tags0) andalso maps:get(TagName, Tags0) =:= TagVal end, maps:to_list(Tags))
end, L).
%%
-spec as_integer(Val :: integer() | binary()) -> integer().
as_integer(Val) when is_integer(Val) ->

View File

@ -602,14 +602,6 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName
influx_client_pool:write_data(DeviceUUID, NTags, FieldsList, Timestamp),
iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
%%
%FormattedTimestamp = iot_util:format_timestamp(Timestamp),
%DeviceDataList = lists:map(fun({Field, Val}) ->
% #device_data{tags = NTags, field = Field, val = Val, timestamp = FormattedTimestamp}
% end, iot_device:serialize(FieldsList)),
% iot_device:data(DevicePid, DeviceDataList);
ok;
false ->
lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags])