From 98746b99ac1602813d79e41dfd95948816fde22c Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 6 Mar 2025 16:49:03 +0800 Subject: [PATCH] remove data from device --- apps/iot/src/http_handler/device_handler.erl | 35 +------- apps/iot/src/iot_device.erl | 85 +------------------- apps/iot/src/iot_host.erl | 8 -- 3 files changed, 4 insertions(+), 124 deletions(-) diff --git a/apps/iot/src/http_handler/device_handler.erl b/apps/iot/src/http_handler/device_handler.erl index 60c34e9..3c4308c 100644 --- a/apps/iot/src/http_handler/device_handler.erl +++ b/apps/iot/src/http_handler/device_handler.erl @@ -63,39 +63,6 @@ handle_request("POST", "/device/activate", _, #{<<"host_id">> := HostId, <<"devi end end; -%% 重新加载对应的主机信息 -handle_request("POST", "/device/query", _, Params = #{<<"device_uuid">> := DeviceUUID}) when is_binary(DeviceUUID) -> - lager:debug("[device_handler] query device uuid: ~p, params: ~p", [DeviceUUID, Params]), - case iot_device:get_pid(DeviceUUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"device not found">>)}; - DevicePid when is_pid(DevicePid) -> - Tags = maps:get(<<"tags">>, Params, #{}), - StartTs = maps:get(<<"start_ts">>, Params, 0), - StopTs = maps:get(<<"stop_ts">>, Params, 0), - Limit = maps:get(<<"limit">>, Params, 0), - %% 增加对fields的过滤 - Fields = maps:get(<<"fields">>, Params, []), - - case is_map(Tags) andalso is_integer(StartTs) andalso is_integer(StopTs) andalso is_integer(Limit) - andalso StartTs >= 0 andalso StopTs >= 0 andalso Limit >= 0 of - - true -> - {ok, DeviceDataList} = iot_device:query(DevicePid, Fields, Tags, StartTs, StopTs, Limit), - DataItems = lists:map(fun({Field, Items}) -> - NItems = lists:map(fun as_map/1, Items), - {Field, NItems} - end, DeviceDataList), - - {ok, 200, iot_util:json_data(maps:from_list(DataItems))}; - false -> - {ok, 200, iot_util:json_error(404, <<"invalid params">>)} - end - end; - handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), - {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. - -as_map(#device_data{tags = Tags0, val = Val, timestamp = Timestamp}) -> - #{<<"tags">> => Tags0, <<"val">> => Val, <<"timestamp">> => Timestamp}. \ No newline at end of file + {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. \ No newline at end of file diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index acf8db4..e75cee0 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -15,7 +15,7 @@ %% API -export([get_name/1, get_pid/1, serialize/1]). --export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/6]). +-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2]). -export([ai_event/3, handle_data/3]). %% gen_server callbacks @@ -23,10 +23,6 @@ -record(state, { device_uuid :: binary(), - %% 用来保存数据,作为存储在influxdb里面的数据的备份; 数据是按照<$field>来分组的, 格式: {field => queue:queue()} - metrics = #{}, - %% 最大数据缓存量 - cache_size = 200, %% 设备是否授权 auth_status :: integer(), %% 事件触发周期 @@ -108,14 +104,6 @@ reload(Pid) when is_pid(Pid) -> auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> gen_server:cast(Pid, {auth, Auth}). --spec data(Pid :: pid(), DataList :: [#device_data{}]) -> no_return(). -data(Pid, DataList) when is_pid(Pid), is_list(DataList) -> - gen_server:cast(Pid, {data, DataList}). - --spec query(Pid :: pid(), Fields :: list(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) -> {ok, [{Field :: binary(), #device_data{}}]}. -query(Pid, Fields, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_list(Fields), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 -> - gen_server:call(Pid, {query, Fields, Tags, StartTs, StopTs, Limit}). - -spec ai_event(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return(). ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) -> gen_server:cast(Pid, {ai_event, EventType, Params}). @@ -150,10 +138,9 @@ init([DeviceUUID]) when is_binary(DeviceUUID) -> ignore end; init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status, <<"model_id">> := ModelId}]) -> - {ok, CacheSize} = application:get_env(iot, device_cache_size), {ok, AiEventThrottle} = application:get_env(iot, ai_event_throttle), - {ok, #state{device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, cache_size = CacheSize, + {ok, #state{device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId)}}. %% @private @@ -168,37 +155,7 @@ init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStat {stop, Reason :: term(), NewState :: #state{}}). %% 判断当前设备是否是激活状态 handle_call(is_activated, _From, State = #state{auth_status = AuthStatus}) -> - {reply, AuthStatus =:= 1, State}; - -%% 查询当前设备中产生的数据,缓存在内存中 -handle_call({query, Fields, Tags, StartTs, StopTs, Limit}, _From, State = #state{metrics = Metrics}) -> - LMetrics = maps:to_list(Metrics), - NLMetrics = case length(Fields) > 0 of - true -> - lists:filter(fun({F0, _}) -> lists:member(F0, Fields) end, LMetrics); - false -> - LMetrics - end, - - DataList = lists:map(fun({Field, Q}) -> - L = queue:to_list(Q), - - %% 过滤 - L1 = filter_timestamp(L, StartTs, StopTs), - L2 = filter_tags(L1, Tags), - - %% 处理数据截取 - L3 = lists:reverse(L2), - L4 = case Limit =:= 0 of - true -> - L3; - false -> - lists:sublist(L3, 1, Limit) - end, - {Field, L4} - end, NLMetrics), - - {reply, {ok, DataList}, State}. + {reply, AuthStatus =:= 1, State}. %% @private %% @doc Handling cast messages @@ -238,21 +195,6 @@ handle_cast(reload, State = #state{device_uuid = DeviceUUID}) -> {stop, normal, State} end; -%% 向设备中追加数据 -handle_cast({data, DataList}, State = #state{metrics = Metrics, cache_size = CacheSize}) -> - NMetrics = lists:foldl(fun(Data = #device_data{field = Field}, Metrics0) -> - Q0 = maps:get(Field, Metrics0, queue:new()), - NQ = case queue:len(Q0) > CacheSize of - true -> - {_, Q1} = queue:out(Q0), - queue:in(Data, Q1); - false -> - queue:in(Data, Q0) - end, - Metrics0#{Field => NQ} - end, Metrics, DataList), - {noreply, State#state{metrics = NMetrics}}; - %% 处理授权 handle_cast({auth, true}, State = #state{device_uuid = DeviceUUID}) -> lager:debug("[iot_device] device_uuid: ~p, auth: true", [DeviceUUID]), @@ -365,27 +307,6 @@ report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewSt end, lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]). -%% 过滤时间 --spec filter_timestamp(L :: list(), StartTs :: integer(), StopTs :: integer()) -> list(). -filter_timestamp(L, 0, 0) -> - L; -filter_timestamp(_L, StartTs, StopTs) when StartTs > StopTs -> - []; -filter_timestamp(L, StartTs0, StopTs0) -> - %% 时间的比较统一到毫秒级别 - StartTs = StartTs0 * 1000, - StopTs = StopTs0 * 1000, - lists:filter(fun(#device_data{timestamp = Timestamp}) -> Timestamp >= StartTs andalso Timestamp =< StopTs end, L). - -%% 过滤标签 --spec filter_tags(L :: list(), Tags :: map()) -> list(). -filter_tags(L, Tags) when map_size(Tags) =:= 0 -> - L; -filter_tags(L, Tags) when map_size(Tags) =:= 0 -> - lists:filter(fun(#device_data{tags = Tags0}) -> - lists:all(fun({TagName, TagVal}) -> maps:is_key(TagName, Tags0) andalso maps:get(TagName, Tags0) =:= TagVal end, maps:to_list(Tags)) - end, L). - %% 转换成整数,从数据读取的数据有时候不一定都是整数 -spec as_integer(Val :: integer() | binary()) -> integer(). as_integer(Val) when is_integer(Val) -> diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index fc942a6..a709772 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -602,14 +602,6 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName influx_client_pool:write_data(DeviceUUID, NTags, FieldsList, Timestamp), iot_device:change_status(DevicePid, ?DEVICE_ONLINE), - - %% 时间同样精确到毫秒 - %FormattedTimestamp = iot_util:format_timestamp(Timestamp), - %DeviceDataList = lists:map(fun({Field, Val}) -> - % #device_data{tags = NTags, field = Field, val = Val, timestamp = FormattedTimestamp} - % end, iot_device:serialize(FieldsList)), - - % iot_device:data(DevicePid, DeviceDataList); ok; false -> lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags])