基于建筑物聚合数据

This commit is contained in:
anlicheng 2025-01-07 21:45:52 +08:00
parent 2861667377
commit a4125f3093
6 changed files with 359 additions and 8 deletions

View File

@ -4534,4 +4534,4 @@
30409246922034790417259608594274
30409246927067955217259608594398
30409246932520550417259608594526
30409246937973145617259608594650
30409246937973145617259608594650

182
apps/iot/src/iot_build.erl Normal file
View File

@ -0,0 +1,182 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%% 1. 5
%%% @end
%%% Created : 11. 7 2024 11:33
%%%-------------------------------------------------------------------
-module(iot_build).
-author("anlicheng").
-include("iot.hrl").
-behaviour(gen_server).
%% 5
-define(REPORT_INTERVAL, 300 * 1000).
%% API
-export([get_name/1, get_pid/1]).
-export([start_link/2]).
-export([handle_data/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {
build_location_code :: binary(),
%% , : #{device_uuid => #{key0 => props0, key1 => Props1}}
accumulators = #{},
last_timestamp = 0
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_pid(BuildLocationCode :: binary()) -> Pid :: pid() | undefined.
get_pid(BuildLocationCode) when is_binary(BuildLocationCode) ->
whereis(get_name(BuildLocationCode)).
-spec get_name(BuildLocationCode :: binary()) -> atom().
get_name(BuildLocationCode) when is_binary(BuildLocationCode) ->
binary_to_atom(<<"iot_build:", BuildLocationCode/binary>>).
-spec handle_data(Pid :: pid(), Fields :: list(), Timestamp :: integer()) -> no_return().
handle_data(Pid, Fields, Timestamp) when is_pid(Pid), is_list(Fields), is_integer(Timestamp) ->
gen_server:cast(Pid, {handle_data, Fields, Timestamp}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Name :: atom(), BuildLocationCode :: binary()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Name, BuildLocationCode) when is_atom(Name), is_binary(BuildLocationCode) ->
gen_server:start_link({local, Name}, ?MODULE, [BuildLocationCode], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([BuildLocationCode]) when is_binary(BuildLocationCode) ->
erlang:start_timer(?REPORT_INTERVAL, self(), report_ticker),
{ok, #state{build_location_code = BuildLocationCode}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
%%
handle_call(_Request, _From, State) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%% , Fields是一个数组:
%% [
%% {"value":12733,"unit":2,"type":"AI","timestamp":1736251262,"name":"总能耗","label":"W.h","key":"total_power","device_uuid":"33806805232936960017340612696766"},
%% {"value":244,"unit":16,"type":"AI","timestamp":1736251262,"name":"总运行时间","label":"h","key":"total_runtime","device_uuid":"33806805232936960017340612696766"}
%% ]
%%
%% acc的格式为#{device_uuid => #{key0 => props0, key1 => Props1}}
handle_cast({handle_data, DeviceUUID, Fields, Timestamp}, State = #state{accumulators = Accumulators, last_timestamp = LastTimestamp}) ->
DeviceProps = maps:get(DeviceUUID, Accumulators, #{}),
NDeviceProps = lists:foldl(fun(Prop = #{<<"key">> := Key}, Acc0) -> Acc0#{Key => Prop} end, DeviceProps, Fields),
{noreply, State#state{accumulators = maps:put(DeviceUUID, NDeviceProps, Accumulators), last_timestamp = max(Timestamp, LastTimestamp)}}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, report_ticker}, State = #state{accumulators = Accumulators, build_location_code = LocationCode, last_timestamp = Timestamp}) ->
%% keys: total_power, total_runtime, use_times; key的值是可以直接累加的
L = maps:values(Accumulators),
Props0 = lists:map(fun(Key) -> merge(Key, L) end, [<<"total_power">>, <<"total_runtime">>, <<"use_times">>]),
Fields = lists:flatten(Props0),
lager:debug("[iot_build] merged fileds is: ~p", [Fields]),
%iot_zd_endpoint:forward(LocationCode, LocationCode, Fields, Timestamp),
iot_build_logger:write([LocationCode, jiffy:encode(Fields, [force_utf8])]),
erlang:start_timer(?REPORT_INTERVAL, self(), report_ticker),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, #state{build_location_code = BuildLocationCode}) ->
lager:notice("[iot_build] device_uuid: ~p, terminate with reason: ~p", [BuildLocationCode, Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec merge(Key :: any(), L :: list()) -> [map()].
merge(Key, L) when is_list(L) ->
Items = extract_key(Key, L),
AccVal = acc_value(Items),
case Items of
[Hd|_] ->
[Hd#{<<"value">> => AccVal, <<"timestamp">> => max_timestamp(Items)}];
[] ->
[]
end.
-spec extract_key(Key :: any(), L :: list()) -> [map()].
extract_key(Key, L) when is_list(L) ->
lists:foldl(fun(Props, L0) ->
case Props of
#{Key := Prop} ->
[Prop|L0];
_ ->
L0
end
end, [], L).
-spec acc_value(L :: list()) -> number().
acc_value(L) when is_list(L) ->
lists:foldl(fun(Props, Acc) ->
case Props of
#{<<"value">> := Val} when is_number(Val) andalso Val >= 0 ->
Acc + Val;
_ ->
Acc
end
end, 0, L).
-spec max_timestamp(Props :: list()) -> integer().
max_timestamp(Props) when is_list(Props) ->
Timestamps = lists:map(fun(#{<<"timestamp">> := Timestamp}) -> Timestamp end, Props),
lists:max(Timestamps).

View File

@ -0,0 +1,104 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 07. 1 2025 21:32
%%%-------------------------------------------------------------------
-module(iot_build_logger).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0, write/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
logger_pid :: pid()
}).
%%%===================================================================
%%% API
%%%===================================================================
write(Data) ->
gen_server:cast(?SERVER, {write, Data}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, LoggerPid} = iot_logger:start_link("build_data"),
{ok, #state{logger_pid = LoggerPid}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({write, Data}, State = #state{logger_pid = LoggerPid}) ->
iot_logger:write(LoggerPid, Data),
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -0,0 +1,46 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%% @end
%%%-------------------------------------------------------------------
-module(iot_build_sup).
-include("iot.hrl").
-behaviour(supervisor).
-export([start_link/0, init/1, ensured_build_started/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, []}}.
-spec ensured_build_started(BuildLocationCode :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
ensured_build_started(BuildLocationCode) when is_binary(BuildLocationCode) ->
case iot_build:get_pid(BuildLocationCode) of
undefined ->
%%
Id = iot_build:get_name(BuildLocationCode),
supervisor:delete_child(?MODULE, Id),
case supervisor:start_child(?MODULE, child_spec(BuildLocationCode)) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
{error, {'already_started', Pid}} when is_pid(Pid) ->
{ok, Pid};
{error, Error} ->
{error, Error}
end;
Pid when is_pid(Pid) ->
{ok, Pid}
end.
child_spec(BuildLocationCode) when is_binary(BuildLocationCode) ->
Name = iot_build:get_name(BuildLocationCode),
#{id => Name,
start => {iot_build, start_link, [Name, BuildLocationCode]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['iot_device']}.

View File

@ -293,13 +293,14 @@ handle_cast({handle_data, Fields, Timestamp}, State = #state{device_uuid = Devic
case ModelId =:= 20 of
true ->
NLocationCode = extract_build_location_code(LocationCode),
NDynamicLocationCode = extract_build_location_code(DynamicLocationCode),
lager:debug("[iot_device] light device: ~p, location_code: ~p", [DeviceUUID, NLocationCode]),
iot_zd_endpoint:forward(NLocationCode, NDynamicLocationCode, Fields, Timestamp);
lager:debug("[iot_device] light device: ~p, build location_code: ~p", [DeviceUUID, NLocationCode]),
{ok, BuildPid} = iot_build_sup:ensured_build_started(NLocationCode),
iot_build:handle_data(BuildPid, Fields, Timestamp);
false ->
iot_zd_endpoint:forward(LocationCode, DynamicLocationCode, Fields, Timestamp)
end;
ok
%iot_zd_endpoint:forward(LocationCode, DynamicLocationCode, Fields, Timestamp)
end,
iot_zd_endpoint:forward(LocationCode, DynamicLocationCode, Fields, Timestamp);
{ok, _} ->
lager:warning("[iot_device] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [DeviceUUID, Fields]);
{error, Reason} ->
@ -396,7 +397,7 @@ as_integer(Val) when is_binary(Val) ->
binary_to_integer(Val).
-spec extract_build_location_code(binary()) -> binary().
extract_build_location_code(<<BuildLocationCode:17/binary, Suffix/binary>>) ->
extract_build_location_code(<<BuildLocationCode:9/binary, Suffix/binary>>) ->
Len = byte_size(Suffix),
NSuffix = iolist_to_binary(lists:map(fun(_) -> <<"0">> end, lists:seq(1, Len))),
<<BuildLocationCode/binary, NSuffix/binary>>;

View File

@ -55,6 +55,24 @@ init([]) ->
modules => ['iot_task']
},
#{
id => 'iot_build_logger',
start => {'iot_build_logger', start_link, []},
restart => permanent,
shutdown => 2000,
type => supervisor,
modules => ['iot_build_logger']
},
#{
id => 'iot_build_sup',
start => {'iot_build_sup', start_link, []},
restart => permanent,
shutdown => 2000,
type => supervisor,
modules => ['iot_build_sup']
},
#{
id => 'iot_device_sup',
start => {'iot_device_sup', start_link, []},