将数据存储到设备内存中
This commit is contained in:
parent
bab0089190
commit
c8c95b68ce
@ -101,3 +101,10 @@
|
||||
event_type :: integer(),
|
||||
params :: map()
|
||||
}).
|
||||
|
||||
%% 设备数据
|
||||
-record(device_data, {
|
||||
tags :: map(),
|
||||
val :: any(),
|
||||
timestamp :: integer()
|
||||
}).
|
||||
@ -12,8 +12,8 @@
|
||||
-behaviour(gen_statem).
|
||||
|
||||
%% API
|
||||
-export([get_name/1, get_pid/1]).
|
||||
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2]).
|
||||
-export([get_name/1, get_pid/1, serialize/1]).
|
||||
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/5]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
@ -22,12 +22,17 @@
|
||||
-define(DEVICE_AUTH_DENIED, 0).
|
||||
-define(DEVICE_AUTH_AUTHED, 1).
|
||||
|
||||
%% 存储数据的上限
|
||||
-define(MAX_SIZE, 2000).
|
||||
|
||||
%% 状态
|
||||
-define(STATE_DENIED, denied).
|
||||
-define(STATE_ACTIVATED, activated).
|
||||
|
||||
-record(state, {
|
||||
device_uuid :: binary(),
|
||||
%% 用来保存数据,作为存储在influxdb里面的数据的备份
|
||||
queue = queue:new(),
|
||||
status = ?DEVICE_OFFLINE
|
||||
}).
|
||||
|
||||
@ -35,6 +40,26 @@
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
%% 格式化efka上传的数据格式
|
||||
%"fields": [
|
||||
%{
|
||||
% "key": "test"
|
||||
% "value": 124,
|
||||
% "unit": "U",
|
||||
% "type": "AI:遥测值,DI:遥信值,SOE:事件",
|
||||
% "timestamp": int
|
||||
%}
|
||||
%],
|
||||
-spec serialize(FieldsList :: [map()]) -> [Val :: binary()].
|
||||
serialize(FieldsList) when is_list(FieldsList) ->
|
||||
lists:flatmap(fun serialize0/1, FieldsList).
|
||||
serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> ->
|
||||
Values = maps:remove(<<"key">>, Fields),
|
||||
S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))),
|
||||
[<<"base64:", S/binary>>];
|
||||
serialize0(_) ->
|
||||
[].
|
||||
|
||||
-spec is_alive(DeviceUUID :: binary()) -> error | {ok, Pid :: pid()}.
|
||||
is_alive(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
@ -77,6 +102,13 @@ reload(Pid) when is_pid(Pid) ->
|
||||
auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
||||
gen_statem:cast(Pid, {auth, Auth}).
|
||||
|
||||
-spec data(Pid :: pid(), DataList :: [#device_data{}]) -> no_return().
|
||||
data(Pid, DataList) when is_pid(Pid), is_list(DataList) ->
|
||||
gen_statem:cast(Pid, {data, DataList}).
|
||||
|
||||
query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit) ->
|
||||
gen_statem:call(Pid, {query, Tags, StartTs, StopTs, Limit}).
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
@ -164,6 +196,38 @@ handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) ->
|
||||
{stop, normal, State}
|
||||
end;
|
||||
|
||||
%% 向设备中追加数据
|
||||
handle_event(cast, {data, DataList}, _, State = #state{queue = Q}) ->
|
||||
NQ = lists:foldl(fun(Data, Q0) ->
|
||||
case queue:len(Q0) > ?MAX_SIZE of
|
||||
true ->
|
||||
{_, Q1} = queue:out(Q0),
|
||||
queue:in(Data, Q1);
|
||||
false ->
|
||||
queue:in(Data, Q0)
|
||||
end
|
||||
end, Q, DataList),
|
||||
|
||||
{keep_state, State#state{queue = NQ}};
|
||||
|
||||
%% 查询device里面的数据
|
||||
handle_event({call, From}, {query, Tags, StartTs, StopTs, Limit}, _StateName, State = #state{queue = Q}) ->
|
||||
L = queue:to_list(Q),
|
||||
|
||||
%% 过滤
|
||||
L1 = filter_timestamp(L, StartTs, StopTs),
|
||||
L2 = filter_tags(L1, Tags),
|
||||
|
||||
%% 处理数据截取
|
||||
L3 = lists:reverse(L2),
|
||||
L4 = case Limit =:= 0 of
|
||||
true ->
|
||||
L3;
|
||||
false ->
|
||||
lists:sublist(L3, 1, Limit)
|
||||
end,
|
||||
{keep_state, State, [{reply, From, L4}]};
|
||||
|
||||
%% 处理授权
|
||||
handle_event(cast, {auth, Auth}, StateName, State = #state{device_uuid = DeviceUUID}) ->
|
||||
case {StateName, Auth} of
|
||||
@ -218,3 +282,25 @@ report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewSt
|
||||
}],
|
||||
iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp),
|
||||
lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]).
|
||||
|
||||
%% 过滤时间
|
||||
-spec filter_timestamp(L :: list(), StartTs :: integer(), StopTs :: integer()) -> list().
|
||||
filter_timestamp(L, 0, 0) ->
|
||||
L;
|
||||
filter_timestamp(_L, StartTs, StopTs) when StartTs > StopTs ->
|
||||
[];
|
||||
filter_timestamp(L, StartTs0, StopTs0) ->
|
||||
%% 时间的比较统一到毫秒级别
|
||||
StartTs = StartTs0 * 1000,
|
||||
StopTs = StopTs0 * 1000,
|
||||
lists:filter(fun(#device_data{timestamp = Timestamp}) -> Timestamp >= StartTs andalso Timestamp =< StopTs end, L).
|
||||
|
||||
%% 过滤标签
|
||||
-spec filter_tags(L :: list(), Tags :: map()) -> list().
|
||||
filter_tags(L, Tags) when map_size(Tags) =:= 0 ->
|
||||
L;
|
||||
filter_tags(L, Tags) when map_size(Tags) =:= 0 ->
|
||||
lists:filter(fun(#device_data{tags = Tags0}) ->
|
||||
lists:all(fun({TagName, TagVal}) -> maps:is_key(TagName, Tags0) andalso maps:get(TagName, Tags0) =:= TagVal end, maps:to_list(Tags))
|
||||
end, L).
|
||||
|
||||
|
||||
@ -584,7 +584,14 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName
|
||||
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => DeviceUUID},
|
||||
influx_client:write_data(DeviceUUID, NTags, FieldsList, Timestamp),
|
||||
|
||||
iot_device:change_status(DevicePid, ?DEVICE_ONLINE);
|
||||
iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
|
||||
|
||||
%% 将数据写入到设备的内存缓存中
|
||||
DeviceVals = iot_device:serialize(FieldsList),
|
||||
%% 时间同样精确到毫秒
|
||||
FormattedTimestamp = iot_util:format_timestamp(Timestamp),
|
||||
DeviceDataList = lists:map(fun(Val) -> #device_data{tags = NTags, timestamp = FormattedTimestamp, val = Val} end, DeviceVals),
|
||||
iot_device:data(DevicePid, DeviceDataList);
|
||||
false ->
|
||||
lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags])
|
||||
end
|
||||
|
||||
@ -14,6 +14,23 @@
|
||||
-export([step/3, chunks/2, rand_bytes/1, uuid/0, md5/1, parse_mapper/1]).
|
||||
-export([json_data/1, json_error/2]).
|
||||
-export([queue_limited_in/3, assert_call/2, assert/2]).
|
||||
-export([format_timestamp/1]).
|
||||
|
||||
%% 格式化时间
|
||||
-spec format_timestamp(Timestamp :: integer()) -> integer().
|
||||
format_timestamp(Timestamp) when is_integer(Timestamp) ->
|
||||
case length(integer_to_list(Timestamp)) of
|
||||
10 ->
|
||||
Timestamp * 1000;
|
||||
13 ->
|
||||
Timestamp;
|
||||
16 ->
|
||||
Timestamp div 1000;
|
||||
19 ->
|
||||
Timestamp div 1000_000;
|
||||
_ ->
|
||||
timestamp()
|
||||
end.
|
||||
|
||||
%% 时间,精确到毫秒
|
||||
timestamp() ->
|
||||
|
||||
0
docs/jinzhi_http.md
Normal file
0
docs/jinzhi_http.md
Normal file
@ -21,6 +21,8 @@
|
||||
{
|
||||
"version": "1.0",
|
||||
"location_code": "string",
|
||||
// 日期: 2024-06-20, <<新增字段>>
|
||||
"real_location_code": "string",
|
||||
"ts ": 1688606685,
|
||||
"properties": [
|
||||
{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user