From c8c95b68ce0988b0773139c5b02a07fb71746d67 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 21 Jun 2024 01:14:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86=E6=95=B0=E6=8D=AE=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E5=88=B0=E8=AE=BE=E5=A4=87=E5=86=85=E5=AD=98=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/include/iot.hrl | 7 +++ apps/iot/src/iot_device.erl | 92 +++++++++++++++++++++++++++++++++++-- apps/iot/src/iot_host.erl | 9 +++- apps/iot/src/iot_util.erl | 17 +++++++ docs/jinzhi_http.md | 0 docs/zhongdian_mqtt.md | 2 + 6 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 docs/jinzhi_http.md diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index efeef44..124f60d 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -100,4 +100,11 @@ real_location_code :: binary(), event_type :: integer(), params :: map() +}). + +%% 设备数据 +-record(device_data, { + tags :: map(), + val :: any(), + timestamp :: integer() }). \ No newline at end of file diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 3a09fed..8c0cafc 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -12,8 +12,8 @@ -behaviour(gen_statem). %% API --export([get_name/1, get_pid/1]). --export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2]). +-export([get_name/1, get_pid/1, serialize/1]). +-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/5]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -22,12 +22,17 @@ -define(DEVICE_AUTH_DENIED, 0). -define(DEVICE_AUTH_AUTHED, 1). +%% 存储数据的上限 +-define(MAX_SIZE, 2000). + %% 状态 -define(STATE_DENIED, denied). -define(STATE_ACTIVATED, activated). -record(state, { device_uuid :: binary(), + %% 用来保存数据,作为存储在influxdb里面的数据的备份 + queue = queue:new(), status = ?DEVICE_OFFLINE }). @@ -35,6 +40,26 @@ %%% API %%%=================================================================== +%% 格式化efka上传的数据格式 +%"fields": [ +%{ +% "key": "test" +% "value": 124, +% "unit": "U", +% "type": "AI:遥测值,DI:遥信值,SOE:事件", +% "timestamp": int +%} +%], +-spec serialize(FieldsList :: [map()]) -> [Val :: binary()]. +serialize(FieldsList) when is_list(FieldsList) -> + lists:flatmap(fun serialize0/1, FieldsList). +serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> -> + Values = maps:remove(<<"key">>, Fields), + S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))), + [<<"base64:", S/binary>>]; +serialize0(_) -> + []. + -spec is_alive(DeviceUUID :: binary()) -> error | {ok, Pid :: pid()}. is_alive(DeviceUUID) when is_binary(DeviceUUID) -> case iot_device:get_pid(DeviceUUID) of @@ -77,6 +102,13 @@ reload(Pid) when is_pid(Pid) -> auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> gen_statem:cast(Pid, {auth, Auth}). +-spec data(Pid :: pid(), DataList :: [#device_data{}]) -> no_return(). +data(Pid, DataList) when is_pid(Pid), is_list(DataList) -> + gen_statem:cast(Pid, {data, DataList}). + +query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit) -> + gen_statem:call(Pid, {query, Tags, StartTs, StopTs, Limit}). + %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. @@ -164,6 +196,38 @@ handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) -> {stop, normal, State} end; +%% 向设备中追加数据 +handle_event(cast, {data, DataList}, _, State = #state{queue = Q}) -> + NQ = lists:foldl(fun(Data, Q0) -> + case queue:len(Q0) > ?MAX_SIZE of + true -> + {_, Q1} = queue:out(Q0), + queue:in(Data, Q1); + false -> + queue:in(Data, Q0) + end + end, Q, DataList), + + {keep_state, State#state{queue = NQ}}; + +%% 查询device里面的数据 +handle_event({call, From}, {query, Tags, StartTs, StopTs, Limit}, _StateName, State = #state{queue = Q}) -> + L = queue:to_list(Q), + + %% 过滤 + L1 = filter_timestamp(L, StartTs, StopTs), + L2 = filter_tags(L1, Tags), + + %% 处理数据截取 + L3 = lists:reverse(L2), + L4 = case Limit =:= 0 of + true -> + L3; + false -> + lists:sublist(L3, 1, Limit) + end, + {keep_state, State, [{reply, From, L4}]}; + %% 处理授权 handle_event(cast, {auth, Auth}, StateName, State = #state{device_uuid = DeviceUUID}) -> case {StateName, Auth} of @@ -217,4 +281,26 @@ report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewSt <<"timestamp">> => Timestamp }], iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp), - lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]). \ No newline at end of file + lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]). + +%% 过滤时间 +-spec filter_timestamp(L :: list(), StartTs :: integer(), StopTs :: integer()) -> list(). +filter_timestamp(L, 0, 0) -> + L; +filter_timestamp(_L, StartTs, StopTs) when StartTs > StopTs -> + []; +filter_timestamp(L, StartTs0, StopTs0) -> + %% 时间的比较统一到毫秒级别 + StartTs = StartTs0 * 1000, + StopTs = StopTs0 * 1000, + lists:filter(fun(#device_data{timestamp = Timestamp}) -> Timestamp >= StartTs andalso Timestamp =< StopTs end, L). + +%% 过滤标签 +-spec filter_tags(L :: list(), Tags :: map()) -> list(). +filter_tags(L, Tags) when map_size(Tags) =:= 0 -> + L; +filter_tags(L, Tags) when map_size(Tags) =:= 0 -> + lists:filter(fun(#device_data{tags = Tags0}) -> + lists:all(fun({TagName, TagVal}) -> maps:is_key(TagName, Tags0) andalso maps:get(TagName, Tags0) =:= TagVal end, maps:to_list(Tags)) + end, L). + diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 718a023..f92cec9 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -584,7 +584,14 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => DeviceUUID}, influx_client:write_data(DeviceUUID, NTags, FieldsList, Timestamp), - iot_device:change_status(DevicePid, ?DEVICE_ONLINE); + iot_device:change_status(DevicePid, ?DEVICE_ONLINE), + + %% 将数据写入到设备的内存缓存中 + DeviceVals = iot_device:serialize(FieldsList), + %% 时间同样精确到毫秒 + FormattedTimestamp = iot_util:format_timestamp(Timestamp), + DeviceDataList = lists:map(fun(Val) -> #device_data{tags = NTags, timestamp = FormattedTimestamp, val = Val} end, DeviceVals), + iot_device:data(DevicePid, DeviceDataList); false -> lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags]) end diff --git a/apps/iot/src/iot_util.erl b/apps/iot/src/iot_util.erl index 2b3404b..026816e 100644 --- a/apps/iot/src/iot_util.erl +++ b/apps/iot/src/iot_util.erl @@ -14,6 +14,23 @@ -export([step/3, chunks/2, rand_bytes/1, uuid/0, md5/1, parse_mapper/1]). -export([json_data/1, json_error/2]). -export([queue_limited_in/3, assert_call/2, assert/2]). +-export([format_timestamp/1]). + +%% 格式化时间 +-spec format_timestamp(Timestamp :: integer()) -> integer(). +format_timestamp(Timestamp) when is_integer(Timestamp) -> + case length(integer_to_list(Timestamp)) of + 10 -> + Timestamp * 1000; + 13 -> + Timestamp; + 16 -> + Timestamp div 1000; + 19 -> + Timestamp div 1000_000; + _ -> + timestamp() + end. %% 时间,精确到毫秒 timestamp() -> diff --git a/docs/jinzhi_http.md b/docs/jinzhi_http.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/zhongdian_mqtt.md b/docs/zhongdian_mqtt.md index c055798..010aa02 100644 --- a/docs/zhongdian_mqtt.md +++ b/docs/zhongdian_mqtt.md @@ -21,6 +21,8 @@ { "version": "1.0", "location_code": "string", + // 日期: 2024-06-20, <<新增字段>> + "real_location_code": "string", "ts ": 1688606685, "properties": [ {