diff --git a/apps/iot/priv/weigang_light.txt b/apps/iot/priv/weigang_light.txt index af6b4b9..7b42fbe 100644 --- a/apps/iot/priv/weigang_light.txt +++ b/apps/iot/priv/weigang_light.txt @@ -4534,4 +4534,4 @@ 30409246922034790417259608594274 30409246927067955217259608594398 30409246932520550417259608594526 -30409246937973145617259608594650 +30409246937973145617259608594650 \ No newline at end of file diff --git a/apps/iot/src/iot_build.erl b/apps/iot/src/iot_build.erl new file mode 100644 index 0000000..5c1c157 --- /dev/null +++ b/apps/iot/src/iot_build.erl @@ -0,0 +1,182 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%% 1. 智慧照明基于建筑物的汇总逻辑处理,每5分钟上报一次 +%%% @end +%%% Created : 11. 7月 2024 11:33 +%%%------------------------------------------------------------------- +-module(iot_build). +-author("anlicheng"). +-include("iot.hrl"). + +-behaviour(gen_server). + +%% 数据汇报周期,默认值为 5分钟 +-define(REPORT_INTERVAL, 300 * 1000). + +%% API +-export([get_name/1, get_pid/1]). +-export([start_link/2]). +-export([handle_data/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-record(state, { + build_location_code :: binary(), + %% 数据累加器, 数据格式: #{device_uuid => #{key0 => props0, key1 => Props1}} + accumulators = #{}, + last_timestamp = 0 +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec get_pid(BuildLocationCode :: binary()) -> Pid :: pid() | undefined. +get_pid(BuildLocationCode) when is_binary(BuildLocationCode) -> + whereis(get_name(BuildLocationCode)). + +-spec get_name(BuildLocationCode :: binary()) -> atom(). +get_name(BuildLocationCode) when is_binary(BuildLocationCode) -> + binary_to_atom(<<"iot_build:", BuildLocationCode/binary>>). + +-spec handle_data(Pid :: pid(), Fields :: list(), Timestamp :: integer()) -> no_return(). +handle_data(Pid, Fields, Timestamp) when is_pid(Pid), is_list(Fields), is_integer(Timestamp) -> + gen_server:cast(Pid, {handle_data, Fields, Timestamp}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(Name :: atom(), BuildLocationCode :: binary()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(Name, BuildLocationCode) when is_atom(Name), is_binary(BuildLocationCode) -> + gen_server:start_link({local, Name}, ?MODULE, [BuildLocationCode], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([BuildLocationCode]) when is_binary(BuildLocationCode) -> + erlang:start_timer(?REPORT_INTERVAL, self(), report_ticker), + {ok, #state{build_location_code = BuildLocationCode}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 判断当前设备是否是激活状态 +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 设备数据转发, Fields是一个数组,格式为: +%% [ +%% {"value":12733,"unit":2,"type":"AI","timestamp":1736251262,"name":"总能耗","label":"W.h","key":"total_power","device_uuid":"33806805232936960017340612696766"}, +%% {"value":244,"unit":16,"type":"AI","timestamp":1736251262,"name":"总运行时间","label":"h","key":"total_runtime","device_uuid":"33806805232936960017340612696766"} +%% ] +%% 数据采用当前设备的最新数据覆盖来的数据 +%% acc的格式为:#{device_uuid => #{key0 => props0, key1 => Props1}} +handle_cast({handle_data, DeviceUUID, Fields, Timestamp}, State = #state{accumulators = Accumulators, last_timestamp = LastTimestamp}) -> + DeviceProps = maps:get(DeviceUUID, Accumulators, #{}), + NDeviceProps = lists:foldl(fun(Prop = #{<<"key">> := Key}, Acc0) -> Acc0#{Key => Prop} end, DeviceProps, Fields), + + {noreply, State#state{accumulators = maps:put(DeviceUUID, NDeviceProps, Accumulators), last_timestamp = max(Timestamp, LastTimestamp)}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, report_ticker}, State = #state{accumulators = Accumulators, build_location_code = LocationCode, last_timestamp = Timestamp}) -> + %% 需要汇报的keys: total_power, total_runtime, use_times; 这几个key的值是可以直接累加的 + L = maps:values(Accumulators), + + Props0 = lists:map(fun(Key) -> merge(Key, L) end, [<<"total_power">>, <<"total_runtime">>, <<"use_times">>]), + Fields = lists:flatten(Props0), + + lager:debug("[iot_build] merged fileds is: ~p", [Fields]), + %iot_zd_endpoint:forward(LocationCode, LocationCode, Fields, Timestamp), + iot_build_logger:write([LocationCode, jiffy:encode(Fields, [force_utf8])]), + + erlang:start_timer(?REPORT_INTERVAL, self(), report_ticker), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, #state{build_location_code = BuildLocationCode}) -> + lager:notice("[iot_build] device_uuid: ~p, terminate with reason: ~p", [BuildLocationCode, Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec merge(Key :: any(), L :: list()) -> [map()]. +merge(Key, L) when is_list(L) -> + Items = extract_key(Key, L), + AccVal = acc_value(Items), + case Items of + [Hd|_] -> + [Hd#{<<"value">> => AccVal, <<"timestamp">> => max_timestamp(Items)}]; + [] -> + [] + end. + +-spec extract_key(Key :: any(), L :: list()) -> [map()]. +extract_key(Key, L) when is_list(L) -> + lists:foldl(fun(Props, L0) -> + case Props of + #{Key := Prop} -> + [Prop|L0]; + _ -> + L0 + end + end, [], L). + +-spec acc_value(L :: list()) -> number(). +acc_value(L) when is_list(L) -> + lists:foldl(fun(Props, Acc) -> + case Props of + #{<<"value">> := Val} when is_number(Val) andalso Val >= 0 -> + Acc + Val; + _ -> + Acc + end + end, 0, L). + +-spec max_timestamp(Props :: list()) -> integer(). +max_timestamp(Props) when is_list(Props) -> + Timestamps = lists:map(fun(#{<<"timestamp">> := Timestamp}) -> Timestamp end, Props), + lists:max(Timestamps). \ No newline at end of file diff --git a/apps/iot/src/iot_build_logger.erl b/apps/iot/src/iot_build_logger.erl new file mode 100644 index 0000000..1fdef4e --- /dev/null +++ b/apps/iot/src/iot_build_logger.erl @@ -0,0 +1,104 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 07. 1月 2025 21:32 +%%%------------------------------------------------------------------- +-module(iot_build_logger). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0, write/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + logger_pid :: pid() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +write(Data) -> + gen_server:cast(?SERVER, {write, Data}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, LoggerPid} = iot_logger:start_link("build_data"), + {ok, #state{logger_pid = LoggerPid}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({write, Data}, State = #state{logger_pid = LoggerPid}) -> + iot_logger:write(LoggerPid, Data), + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_build_sup.erl b/apps/iot/src/iot_build_sup.erl new file mode 100644 index 0000000..35f681f --- /dev/null +++ b/apps/iot/src/iot_build_sup.erl @@ -0,0 +1,46 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% @end +%%%------------------------------------------------------------------- +-module(iot_build_sup). +-include("iot.hrl"). + +-behaviour(supervisor). + +-export([start_link/0, init/1, ensured_build_started/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, []}}. + +-spec ensured_build_started(BuildLocationCode :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +ensured_build_started(BuildLocationCode) when is_binary(BuildLocationCode) -> + case iot_build:get_pid(BuildLocationCode) of + undefined -> + %% 先删除,然后添加 + Id = iot_build:get_name(BuildLocationCode), + supervisor:delete_child(?MODULE, Id), + case supervisor:start_child(?MODULE, child_spec(BuildLocationCode)) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {error, {'already_started', Pid}} when is_pid(Pid) -> + {ok, Pid}; + {error, Error} -> + {error, Error} + end; + Pid when is_pid(Pid) -> + {ok, Pid} + end. + +child_spec(BuildLocationCode) when is_binary(BuildLocationCode) -> + Name = iot_build:get_name(BuildLocationCode), + #{id => Name, + start => {iot_build, start_link, [Name, BuildLocationCode]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_device']}. \ No newline at end of file diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 9ac1c75..e736c86 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -293,13 +293,14 @@ handle_cast({handle_data, Fields, Timestamp}, State = #state{device_uuid = Devic case ModelId =:= 20 of true -> NLocationCode = extract_build_location_code(LocationCode), - NDynamicLocationCode = extract_build_location_code(DynamicLocationCode), - lager:debug("[iot_device] light device: ~p, location_code: ~p", [DeviceUUID, NLocationCode]), - - iot_zd_endpoint:forward(NLocationCode, NDynamicLocationCode, Fields, Timestamp); + lager:debug("[iot_device] light device: ~p, build location_code: ~p", [DeviceUUID, NLocationCode]), + {ok, BuildPid} = iot_build_sup:ensured_build_started(NLocationCode), + iot_build:handle_data(BuildPid, Fields, Timestamp); false -> - iot_zd_endpoint:forward(LocationCode, DynamicLocationCode, Fields, Timestamp) - end; + ok + %iot_zd_endpoint:forward(LocationCode, DynamicLocationCode, Fields, Timestamp) + end, + iot_zd_endpoint:forward(LocationCode, DynamicLocationCode, Fields, Timestamp); {ok, _} -> lager:warning("[iot_device] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [DeviceUUID, Fields]); {error, Reason} -> @@ -396,7 +397,7 @@ as_integer(Val) when is_binary(Val) -> binary_to_integer(Val). -spec extract_build_location_code(binary()) -> binary(). -extract_build_location_code(<>) -> +extract_build_location_code(<>) -> Len = byte_size(Suffix), NSuffix = iolist_to_binary(lists:map(fun(_) -> <<"0">> end, lists:seq(1, Len))), <>; diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 4c32dd3..87e00a9 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -55,6 +55,24 @@ init([]) -> modules => ['iot_task'] }, + #{ + id => 'iot_build_logger', + start => {'iot_build_logger', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_build_logger'] + }, + + #{ + id => 'iot_build_sup', + start => {'iot_build_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_build_sup'] + }, + #{ id => 'iot_device_sup', start => {'iot_device_sup', start_link, []},