diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 0ef05f6..e10c0ed 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -7,7 +7,7 @@ %%% @end %%% Created : 11. 7月 2024 11:33 %%%------------------------------------------------------------------- --module(iot_device1). +-module(iot_device). -author("anlicheng"). -include("iot.hrl"). @@ -16,11 +16,11 @@ %% API -export([get_name/1, get_pid/1, serialize/1]). -export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/5]). +-export([ai_event/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - %% 存储数据的上限 -define(MAX_SIZE, 2000). @@ -30,7 +30,9 @@ queue = queue:new(), %% 设备是否授权 auth_status :: integer(), - status = ?DEVICE_OFFLINE + status = ?DEVICE_OFFLINE, + %% 事件分组 + ai_groups = #{} }). %%%=================================================================== @@ -109,6 +111,10 @@ data(Pid, DataList) when is_pid(Pid), is_list(DataList) -> query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 -> gen_server:call(Pid, {query, Tags, StartTs, StopTs, Limit}). +-spec ai_event(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return(). +ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) -> + gen_server:cast(Pid, {ai_event, EventType, Params}). + %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Name :: atom(), DeviceUUID :: binary()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -226,7 +232,20 @@ handle_cast({auth, true}, State = #state{device_uuid = DeviceUUID}) -> {noreply, State#state{auth_status = 1}}; handle_cast({auth, false}, State = #state{device_uuid = DeviceUUID}) -> lager:debug("[iot_device] device_uuid: ~p, auth: false", [DeviceUUID]), - {noreply, State#state{auth_status = 0}}. + {noreply, State#state{auth_status = 0}}; + +%% ai事件的延迟整流逻辑 +handle_cast({ai_event, EventType, Params}, State = #state{ai_groups = Groups, device_uuid = DeviceUUID}) -> + GroupKey = group_by(EventType, Params), + case maps:find(GroupKey, Groups) of + {ok, PublisherPid} -> + iot_event_publisher:publish(PublisherPid, EventType, Params), + {noreply, State}; + error -> + {ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID, 1), + iot_event_publisher:publish(PublisherPid, EventType, Params), + {noreply, State#state{ai_groups = maps:put(GroupKey, PublisherPid, Groups)}} + end. %% @private %% @doc Handling all non call/cast messages @@ -306,4 +325,8 @@ filter_tags(L, Tags) when map_size(Tags) =:= 0 -> as_integer(Val) when is_integer(Val) -> Val; as_integer(Val) when is_binary(Val) -> - binary_to_integer(Val). \ No newline at end of file + binary_to_integer(Val). + +%% 事件分组函数 +group_by(EventType, _Params) -> + EventType. \ No newline at end of file diff --git a/apps/iot/src/iot_event_publisher.erl b/apps/iot/src/iot_event_publisher.erl new file mode 100644 index 0000000..10edae2 --- /dev/null +++ b/apps/iot/src/iot_event_publisher.erl @@ -0,0 +1,171 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% 事件发布器,提供基于时间周期的缓冲 +%%% @end +%%% Created : 11. 7月 2024 14:40 +%%%------------------------------------------------------------------- +-module(iot_event_publisher). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/2, publish/3, set_throttle/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + device_uuid :: binary(), + throttle_time = 0 :: integer(), + timer_ref :: undefined | reference(), + %% 已经发送的事件的计数器 + counter = 0, + last_event :: any(), + %% 最后一次发送数据的时间戳, 如果数据跨越了多个时间跨度时也需要立即发送出去 + last_timestamp = 0 :: integer() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec publish(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return(). +publish(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) -> + gen_server:cast(Pid, {publish, EventType, Params}). + +-spec set_throttle(Pid :: pid(), ThrottleTime :: integer()) -> no_return(). +set_throttle(Pid, ThrottleTime) when is_pid(Pid), is_integer(ThrottleTime), ThrottleTime > 0 -> + gen_server:cast(Pid, {set_throttle, ThrottleTime}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(DeviceUUID :: binary(), ThrottleTime :: integer()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(DeviceUUID, ThrottleTime) when is_binary(DeviceUUID), is_integer(ThrottleTime) -> + gen_server:start_link(?MODULE, [DeviceUUID, ThrottleTime], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([DeviceUUID, ThrottleTime]) -> + TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + {ok, #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, timer_ref = TimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 重置时间周期:新的周期大于之前的 +handle_cast({set_throttle, NThrottleTime}, State = #state{throttle_time = ThrottleTime}) when NThrottleTime > ThrottleTime -> + {noreply, State#state{throttle_time = NThrottleTime}}; + +%% 重置时间周期:新的周期小于之前的 +handle_cast({set_throttle, NThrottleTime}, State = #state{device_uuid = DeviceUUID, counter = Counter, timer_ref = TimerRef, last_timestamp = LastTimestamp, last_event = LastEvent}) -> + %% 重置定时器 + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + + Timestamp = iot_util:timestamp_of_seconds(), + PassedSeconds = Timestamp - LastTimestamp, + case PassedSeconds >= NThrottleTime of + true -> + NTimerRef = erlang:start_timer(NThrottleTime * 1000, self(), throttle_ticker), + %% 发送消息 + case LastEvent of + {EventType, Params} -> + iot_ai_router:route_uuid(DeviceUUID, EventType, Params), + {noreply, State#state{timer_ref = NTimerRef, counter = Counter + 1, last_timestamp = iot_util:timestamp_of_seconds()}}; + undefined -> + {noreply, State#state{timer_ref = NTimerRef}} + end; + false -> + NTimerRef = erlang:start_timer((NThrottleTime - PassedSeconds) * 1000, self(), throttle_ticker), + {noreply, State#state{timer_ref = NTimerRef}} + end; + +%% 第一次收到消息,则立即发送; 并且重置定时器 +handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, timer_ref = TimerRef, counter = 0}) -> + %% 重置定时器 + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + NTimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + %% 发送消息 + iot_ai_router:route_uuid(DeviceUUID, EventType, Params), + {noreply, State#state{counter = 1, timer_ref = NTimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; + +%% 缓冲数据,等到时间到了再发送 +handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, timer_ref = TimerRef, counter = Counter, last_timestamp = LastTimestamp, throttle_time = ThrottleTime}) -> + Timestamp = iot_util:timestamp_of_seconds(), + %% 如果数据发送间隔已经超过了一个时间跨度,也需要立即发送 + case LastTimestamp > 0 andalso Timestamp > (LastTimestamp + ThrottleTime) of + true -> + %% 重置定时器 + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + NTimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + %% 发送消息 + iot_ai_router:route_uuid(DeviceUUID, EventType, Params), + {noreply, State#state{counter = Counter + 1, timer_ref = NTimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; + false -> + {noreply, State#state{counter = Counter + 1, last_event = {EventType, Params}}} + end. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 周期时间内没有任何数据 +handle_info({timeout, _, throttle_ticker}, State = #state{throttle_time = ThrottleTime, last_event = undefined}) -> + TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + {noreply, State#state{timer_ref = TimerRef}}; +%% 已经发送的数据也需要重置,避免推送重复的数据 +handle_info({timeout, _, throttle_ticker}, State = #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, last_event = {EventType, Params}}) -> + TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + iot_ai_router:route_uuid(DeviceUUID, EventType, Params), + {noreply, State#state{timer_ref = TimerRef, last_event = undefined, last_timestamp = iot_util:timestamp_of_seconds()}}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%===================================================================