mnesia build data
This commit is contained in:
parent
3eac69633e
commit
8391ace302
@ -13,3 +13,10 @@
|
|||||||
key,
|
key,
|
||||||
count = 0 :: integer()
|
count = 0 :: integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-record(build_data, {
|
||||||
|
location_code :: binary(),
|
||||||
|
device_uuid :: binary(),
|
||||||
|
props = #{},
|
||||||
|
timestamp = 0 :: integer()
|
||||||
|
}).
|
||||||
@ -78,26 +78,6 @@ start_mnesia() ->
|
|||||||
%% 启动数据库
|
%% 启动数据库
|
||||||
ok = mnesia:start(),
|
ok = mnesia:start(),
|
||||||
Tables = mnesia:system_info(tables),
|
Tables = mnesia:system_info(tables),
|
||||||
|
|
||||||
LoadTables = [counter],
|
|
||||||
case lists:all(fun(Tab) -> lists:member(Tab, Tables) end, LoadTables) of
|
|
||||||
true ->
|
|
||||||
lager:debug("[iot_app] waiting for mnesia start: ~p", [LoadTables]),
|
|
||||||
%% 加载必须等待的数据库表
|
|
||||||
mnesia:wait_for_tables(LoadTables, infinity),
|
|
||||||
lager:debug("[iot_app] waiting for mnesia end");
|
|
||||||
false ->
|
|
||||||
lager:warning("[iot_app] tables: ~p not exists, recreate mnesia schema", [LoadTables]),
|
|
||||||
%% 清理掉以前的schema
|
|
||||||
mnesia:stop(),
|
|
||||||
mnesia:delete_schema([node()]),
|
|
||||||
|
|
||||||
%% 创建schema
|
|
||||||
ok = mnesia:create_schema([node()]),
|
|
||||||
ok = mnesia:start(),
|
|
||||||
|
|
||||||
%% 创建数据库表
|
%% 创建数据库表
|
||||||
|
not lists:member(build_data, Tables) andalso mnesia_build_data:create_table(),
|
||||||
%% 大数据统计表
|
not lists:member(counter, Tables) andalso mnesia_counter:create_table().
|
||||||
mnesia_counter:create_table()
|
|
||||||
end.
|
|
||||||
@ -9,6 +9,7 @@
|
|||||||
-module(iot_build).
|
-module(iot_build).
|
||||||
-author("anlicheng").
|
-author("anlicheng").
|
||||||
-include("iot.hrl").
|
-include("iot.hrl").
|
||||||
|
-include("iot_tables.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
@ -24,10 +25,7 @@
|
|||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
build_location_code :: binary(),
|
build_location_code :: binary()
|
||||||
%% 数据累加器, 数据格式: #{device_uuid => #{key0 => props0, key1 => Props1}}
|
|
||||||
accumulators = #{},
|
|
||||||
last_timestamp = 0
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -92,11 +90,10 @@ handle_call(_Request, _From, State) ->
|
|||||||
%% ]
|
%% ]
|
||||||
%% 数据采用当前设备的最新数据覆盖来的数据
|
%% 数据采用当前设备的最新数据覆盖来的数据
|
||||||
%% acc的格式为:#{device_uuid => #{key0 => props0, key1 => Props1}}
|
%% acc的格式为:#{device_uuid => #{key0 => props0, key1 => Props1}}
|
||||||
handle_cast({handle_data, DeviceUUID, Fields, Timestamp}, State = #state{accumulators = Accumulators, last_timestamp = LastTimestamp}) ->
|
handle_cast({handle_data, DeviceUUID, Fields, Timestamp}, State = #state{build_location_code = LocationCode}) ->
|
||||||
DeviceProps = maps:get(DeviceUUID, Accumulators, #{}),
|
mnesia_build_data:insert(LocationCode, DeviceUUID, Fields, Timestamp),
|
||||||
NDeviceProps = lists:foldl(fun(Prop = #{<<"key">> := Key}, Acc0) -> Acc0#{Key => Prop} end, DeviceProps, Fields),
|
|
||||||
|
|
||||||
{noreply, State#state{accumulators = maps:put(DeviceUUID, NDeviceProps, Accumulators), last_timestamp = max(Timestamp, LastTimestamp)}}.
|
{noreply, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling all non call/cast messages
|
%% @doc Handling all non call/cast messages
|
||||||
@ -104,14 +101,17 @@ handle_cast({handle_data, DeviceUUID, Fields, Timestamp}, State = #state{accumul
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({timeout, _, report_ticker}, State = #state{accumulators = Accumulators, build_location_code = LocationCode, last_timestamp = Timestamp}) ->
|
handle_info({timeout, _, report_ticker}, State = #state{build_location_code = LocationCode, build_location_code = LocationCode}) ->
|
||||||
%% 需要汇报的keys: total_power, total_runtime, use_times; 这几个key的值是可以直接累加的
|
%% 需要汇报的keys: total_power, total_runtime, use_times; 这几个key的值是可以直接累加的
|
||||||
L = maps:values(Accumulators),
|
%% props的格式为:#{key0 => props0, key1 => Props1}
|
||||||
|
|
||||||
|
Datasets = mnesia_build_data:get_build_dataset(LocationCode),
|
||||||
|
L = lists:map(fun(#build_data{props = P}) -> P end, Datasets),
|
||||||
|
|
||||||
Props0 = lists:map(fun(Key) -> merge(Key, L) end, [<<"total_power">>, <<"total_runtime">>, <<"use_times">>]),
|
Props0 = lists:map(fun(Key) -> merge(Key, L) end, [<<"total_power">>, <<"total_runtime">>, <<"use_times">>]),
|
||||||
Fields = lists:flatten(Props0),
|
Fields = lists:flatten(Props0),
|
||||||
|
|
||||||
iot_zd_endpoint:forward(LocationCode, LocationCode, Fields, Timestamp),
|
iot_zd_endpoint:forward(LocationCode, LocationCode, Fields, iot_util:current_time()),
|
||||||
iot_build_logger:write([LocationCode, jiffy:encode(Fields, [force_utf8])]),
|
iot_build_logger:write([LocationCode, jiffy:encode(Fields, [force_utf8])]),
|
||||||
|
|
||||||
erlang:start_timer(?REPORT_INTERVAL, self(), report_ticker),
|
erlang:start_timer(?REPORT_INTERVAL, self(), report_ticker),
|
||||||
|
|||||||
62
apps/iot/src/mnesia/mnesia_build_data.erl
Normal file
62
apps/iot/src/mnesia/mnesia_build_data.erl
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author aresei
|
||||||
|
%%% @copyright (C) 2023, <COMPANY>
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 04. 7月 2023 12:31
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(mnesia_build_data).
|
||||||
|
-author("aresei").
|
||||||
|
-include("iot_tables.hrl").
|
||||||
|
-include_lib("stdlib/include/qlc.hrl").
|
||||||
|
-define(TAB, build_data).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([create_table/0]).
|
||||||
|
-export([insert/4, get_build_dataset/1]).
|
||||||
|
|
||||||
|
create_table() ->
|
||||||
|
%% id生成器
|
||||||
|
mnesia:create_table(build_data, [
|
||||||
|
{attributes, record_info(fields, build_data)},
|
||||||
|
{record_name, build_data},
|
||||||
|
{disc_copies, [node()]},
|
||||||
|
{type, bag}
|
||||||
|
]).
|
||||||
|
|
||||||
|
-spec insert(LocationCode :: binary(), DeviceUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> ok | {error, Reason :: any()}.
|
||||||
|
insert(LocationCode, DeviceUUID, Fields, Timestamp) when is_integer(LocationCode), is_integer(DeviceUUID), is_list(Fields), is_integer(Timestamp) ->
|
||||||
|
Fun = fun() ->
|
||||||
|
Q = qlc:q([E || E <- mnesia:table(?TAB), E#build_data.location_code =:= LocationCode, E#build_data.device_uuid =:= DeviceUUID]),
|
||||||
|
case qlc:e(Q) of
|
||||||
|
[] ->
|
||||||
|
Props = lists:foldl(fun(Prop = #{<<"key">> := Key}, Acc0) -> Acc0#{Key => Prop} end, #{}, Fields),
|
||||||
|
mnesia:write(?TAB, #build_data{location_code = LocationCode, device_uuid = DeviceUUID, props = Props, timestamp = Timestamp}, write);
|
||||||
|
[Item = #build_data{props = Props}|_] ->
|
||||||
|
ok = mnesia:delete_object(?TAB, Item, write),
|
||||||
|
|
||||||
|
NProps = lists:foldl(fun(Prop = #{<<"key">> := Key}, Acc0) -> Acc0#{Key => Prop} end, Props, Fields),
|
||||||
|
mnesia:write(?TAB, Item#build_data{props = NProps, timestamp = Timestamp}, write)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
case mnesia:transaction(Fun) of
|
||||||
|
{'atomic', Res} ->
|
||||||
|
Res;
|
||||||
|
{'aborted', Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec get_build_dataset(LocationCode :: binary()) -> [#build_data{}].
|
||||||
|
get_build_dataset(LocationCode) when is_integer(LocationCode) ->
|
||||||
|
Fun = fun() ->
|
||||||
|
Q = qlc:q([E || E <- mnesia:table(?TAB), E#build_data.location_code =:= LocationCode]),
|
||||||
|
qlc:e(Q)
|
||||||
|
end,
|
||||||
|
|
||||||
|
case mnesia:transaction(Fun) of
|
||||||
|
{'atomic', Res} ->
|
||||||
|
Res;
|
||||||
|
{'aborted', _} ->
|
||||||
|
[]
|
||||||
|
end.
|
||||||
Loading…
x
Reference in New Issue
Block a user