From 8391ace30275147d3423b68d3c404d9852f19662 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 21 Jan 2025 01:05:07 +0800 Subject: [PATCH] mnesia build data --- apps/iot/include/iot_tables.hrl | 7 +++ apps/iot/src/iot_app.erl | 26 ++-------- apps/iot/src/iot_build.erl | 22 ++++---- apps/iot/src/mnesia/mnesia_build_data.erl | 62 +++++++++++++++++++++++ 4 files changed, 83 insertions(+), 34 deletions(-) create mode 100644 apps/iot/src/mnesia/mnesia_build_data.erl diff --git a/apps/iot/include/iot_tables.hrl b/apps/iot/include/iot_tables.hrl index 52f4820..b7aaec3 100644 --- a/apps/iot/include/iot_tables.hrl +++ b/apps/iot/include/iot_tables.hrl @@ -12,4 +12,11 @@ -record(counter, { key, count = 0 :: integer() +}). + +-record(build_data, { + location_code :: binary(), + device_uuid :: binary(), + props = #{}, + timestamp = 0 :: integer() }). \ No newline at end of file diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 92296fe..0890139 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -78,26 +78,6 @@ start_mnesia() -> %% 启动数据库 ok = mnesia:start(), Tables = mnesia:system_info(tables), - - LoadTables = [counter], - case lists:all(fun(Tab) -> lists:member(Tab, Tables) end, LoadTables) of - true -> - lager:debug("[iot_app] waiting for mnesia start: ~p", [LoadTables]), - %% 加载必须等待的数据库表 - mnesia:wait_for_tables(LoadTables, infinity), - lager:debug("[iot_app] waiting for mnesia end"); - false -> - lager:warning("[iot_app] tables: ~p not exists, recreate mnesia schema", [LoadTables]), - %% 清理掉以前的schema - mnesia:stop(), - mnesia:delete_schema([node()]), - - %% 创建schema - ok = mnesia:create_schema([node()]), - ok = mnesia:start(), - - %% 创建数据库表 - - %% 大数据统计表 - mnesia_counter:create_table() - end. \ No newline at end of file + %% 创建数据库表 + not lists:member(build_data, Tables) andalso mnesia_build_data:create_table(), + not lists:member(counter, Tables) andalso mnesia_counter:create_table(). \ No newline at end of file diff --git a/apps/iot/src/iot_build.erl b/apps/iot/src/iot_build.erl index e354a0d..9013d50 100644 --- a/apps/iot/src/iot_build.erl +++ b/apps/iot/src/iot_build.erl @@ -9,6 +9,7 @@ -module(iot_build). -author("anlicheng"). -include("iot.hrl"). +-include("iot_tables.hrl"). -behaviour(gen_server). @@ -24,10 +25,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { - build_location_code :: binary(), - %% 数据累加器, 数据格式: #{device_uuid => #{key0 => props0, key1 => Props1}} - accumulators = #{}, - last_timestamp = 0 + build_location_code :: binary() }). %%%=================================================================== @@ -92,11 +90,10 @@ handle_call(_Request, _From, State) -> %% ] %% 数据采用当前设备的最新数据覆盖来的数据 %% acc的格式为:#{device_uuid => #{key0 => props0, key1 => Props1}} -handle_cast({handle_data, DeviceUUID, Fields, Timestamp}, State = #state{accumulators = Accumulators, last_timestamp = LastTimestamp}) -> - DeviceProps = maps:get(DeviceUUID, Accumulators, #{}), - NDeviceProps = lists:foldl(fun(Prop = #{<<"key">> := Key}, Acc0) -> Acc0#{Key => Prop} end, DeviceProps, Fields), +handle_cast({handle_data, DeviceUUID, Fields, Timestamp}, State = #state{build_location_code = LocationCode}) -> + mnesia_build_data:insert(LocationCode, DeviceUUID, Fields, Timestamp), - {noreply, State#state{accumulators = maps:put(DeviceUUID, NDeviceProps, Accumulators), last_timestamp = max(Timestamp, LastTimestamp)}}. + {noreply, State}. %% @private %% @doc Handling all non call/cast messages @@ -104,14 +101,17 @@ handle_cast({handle_data, DeviceUUID, Fields, Timestamp}, State = #state{accumul {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, report_ticker}, State = #state{accumulators = Accumulators, build_location_code = LocationCode, last_timestamp = Timestamp}) -> +handle_info({timeout, _, report_ticker}, State = #state{build_location_code = LocationCode, build_location_code = LocationCode}) -> %% 需要汇报的keys: total_power, total_runtime, use_times; 这几个key的值是可以直接累加的 - L = maps:values(Accumulators), + %% props的格式为:#{key0 => props0, key1 => Props1} + + Datasets = mnesia_build_data:get_build_dataset(LocationCode), + L = lists:map(fun(#build_data{props = P}) -> P end, Datasets), Props0 = lists:map(fun(Key) -> merge(Key, L) end, [<<"total_power">>, <<"total_runtime">>, <<"use_times">>]), Fields = lists:flatten(Props0), - iot_zd_endpoint:forward(LocationCode, LocationCode, Fields, Timestamp), + iot_zd_endpoint:forward(LocationCode, LocationCode, Fields, iot_util:current_time()), iot_build_logger:write([LocationCode, jiffy:encode(Fields, [force_utf8])]), erlang:start_timer(?REPORT_INTERVAL, self(), report_ticker), diff --git a/apps/iot/src/mnesia/mnesia_build_data.erl b/apps/iot/src/mnesia/mnesia_build_data.erl new file mode 100644 index 0000000..5dce324 --- /dev/null +++ b/apps/iot/src/mnesia/mnesia_build_data.erl @@ -0,0 +1,62 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 04. 7月 2023 12:31 +%%%------------------------------------------------------------------- +-module(mnesia_build_data). +-author("aresei"). +-include("iot_tables.hrl"). +-include_lib("stdlib/include/qlc.hrl"). +-define(TAB, build_data). + +%% API +-export([create_table/0]). +-export([insert/4, get_build_dataset/1]). + +create_table() -> + %% id生成器 + mnesia:create_table(build_data, [ + {attributes, record_info(fields, build_data)}, + {record_name, build_data}, + {disc_copies, [node()]}, + {type, bag} + ]). + +-spec insert(LocationCode :: binary(), DeviceUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> ok | {error, Reason :: any()}. +insert(LocationCode, DeviceUUID, Fields, Timestamp) when is_integer(LocationCode), is_integer(DeviceUUID), is_list(Fields), is_integer(Timestamp) -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB), E#build_data.location_code =:= LocationCode, E#build_data.device_uuid =:= DeviceUUID]), + case qlc:e(Q) of + [] -> + Props = lists:foldl(fun(Prop = #{<<"key">> := Key}, Acc0) -> Acc0#{Key => Prop} end, #{}, Fields), + mnesia:write(?TAB, #build_data{location_code = LocationCode, device_uuid = DeviceUUID, props = Props, timestamp = Timestamp}, write); + [Item = #build_data{props = Props}|_] -> + ok = mnesia:delete_object(?TAB, Item, write), + + NProps = lists:foldl(fun(Prop = #{<<"key">> := Key}, Acc0) -> Acc0#{Key => Prop} end, Props, Fields), + mnesia:write(?TAB, Item#build_data{props = NProps, timestamp = Timestamp}, write) + end + end, + case mnesia:transaction(Fun) of + {'atomic', Res} -> + Res; + {'aborted', Reason} -> + {error, Reason} + end. + +-spec get_build_dataset(LocationCode :: binary()) -> [#build_data{}]. +get_build_dataset(LocationCode) when is_integer(LocationCode) -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB), E#build_data.location_code =:= LocationCode]), + qlc:e(Q) + end, + + case mnesia:transaction(Fun) of + {'atomic', Res} -> + Res; + {'aborted', _} -> + [] + end. \ No newline at end of file