From 5b5f4b604255733e8cb0b02f44e0197019290cd3 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 11 Jul 2024 16:25:15 +0800 Subject: [PATCH] event peroid --- apps/iot/src/iot_device.erl | 2 +- apps/iot/src/iot_event_period_settings.erl | 123 +++++++++++++++++++++ apps/iot/src/iot_event_publisher.erl | 66 ++++------- apps/iot/src/iot_sup.erl | 9 ++ 4 files changed, 154 insertions(+), 46 deletions(-) create mode 100644 apps/iot/src/iot_event_period_settings.erl diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index e10c0ed..95d5374 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -242,7 +242,7 @@ handle_cast({ai_event, EventType, Params}, State = #state{ai_groups = Groups, de iot_event_publisher:publish(PublisherPid, EventType, Params), {noreply, State}; error -> - {ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID, 1), + {ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID, GroupKey), iot_event_publisher:publish(PublisherPid, EventType, Params), {noreply, State#state{ai_groups = maps:put(GroupKey, PublisherPid, Groups)}} end. diff --git a/apps/iot/src/iot_event_period_settings.erl b/apps/iot/src/iot_event_period_settings.erl new file mode 100644 index 0000000..f0a9275 --- /dev/null +++ b/apps/iot/src/iot_event_period_settings.erl @@ -0,0 +1,123 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% +%%% @end +%%% Created : 11. 7月 2024 15:54 +%%%------------------------------------------------------------------- +-module(iot_event_period_settings). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([get_throttle/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-define(TAB_NAME, iot_ets_event_period). + +-record(state, { + +}). + +-record(period, { + group_key :: any(), + throttle :: integer() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% 获取设置的时间周期 +-spec get_throttle(GroupKey :: any()) -> integer(). +get_throttle(GroupKey) -> + case ets:lookup(?TAB_NAME, GroupKey) of + [] -> + 300; + [#period{throttle = Throttle}|_] -> + Throttle + end. + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + ets:new(?TAB_NAME, [public, set, named_table, {keypos, 2}]), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({get_throttle, GroupKey}, _From, State = #state{settings = Settings}) -> + case maps:find(GroupKey, Settings) of + {ok, Throttle} -> + {reply, {ok, Throttle}, State}; + error -> + {reply, {ok, 300}, State} + end. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_event_publisher.erl b/apps/iot/src/iot_event_publisher.erl index 10edae2..0b65c4c 100644 --- a/apps/iot/src/iot_event_publisher.erl +++ b/apps/iot/src/iot_event_publisher.erl @@ -21,11 +21,11 @@ -record(state, { device_uuid :: binary(), - throttle_time = 0 :: integer(), - timer_ref :: undefined | reference(), + timer_ref = undefined :: undefined | reference(), + group_key :: any(), %% 已经发送的事件的计数器 counter = 0, - last_event :: any(), + last_event = undefined :: any(), %% 最后一次发送数据的时间戳, 如果数据跨越了多个时间跨度时也需要立即发送出去 last_timestamp = 0 :: integer() }). @@ -43,10 +43,10 @@ set_throttle(Pid, ThrottleTime) when is_pid(Pid), is_integer(ThrottleTime), Thro gen_server:cast(Pid, {set_throttle, ThrottleTime}). %% @doc Spawns the server and registers the local name (unique) --spec(start_link(DeviceUUID :: binary(), ThrottleTime :: integer()) -> +-spec(start_link(DeviceUUID :: binary(), GroupKey :: any()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(DeviceUUID, ThrottleTime) when is_binary(DeviceUUID), is_integer(ThrottleTime) -> - gen_server:start_link(?MODULE, [DeviceUUID, ThrottleTime], []). +start_link(DeviceUUID, GroupKey) when is_binary(DeviceUUID) -> + gen_server:start_link(?MODULE, [DeviceUUID, GroupKey], []). %%%=================================================================== %%% gen_server callbacks @@ -57,9 +57,8 @@ start_link(DeviceUUID, ThrottleTime) when is_binary(DeviceUUID), is_integer(Thro -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([DeviceUUID, ThrottleTime]) -> - TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), - {ok, #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, timer_ref = TimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}. +init([DeviceUUID, GroupKey]) -> + {ok, #state{device_uuid = DeviceUUID, group_key = GroupKey, last_timestamp = iot_util:timestamp_of_seconds(), last_event = undefined}}. %% @private %% @doc Handling call messages @@ -80,47 +79,21 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -%% 重置时间周期:新的周期大于之前的 -handle_cast({set_throttle, NThrottleTime}, State = #state{throttle_time = ThrottleTime}) when NThrottleTime > ThrottleTime -> - {noreply, State#state{throttle_time = NThrottleTime}}; - -%% 重置时间周期:新的周期小于之前的 -handle_cast({set_throttle, NThrottleTime}, State = #state{device_uuid = DeviceUUID, counter = Counter, timer_ref = TimerRef, last_timestamp = LastTimestamp, last_event = LastEvent}) -> - %% 重置定时器 - is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - - Timestamp = iot_util:timestamp_of_seconds(), - PassedSeconds = Timestamp - LastTimestamp, - case PassedSeconds >= NThrottleTime of - true -> - NTimerRef = erlang:start_timer(NThrottleTime * 1000, self(), throttle_ticker), - %% 发送消息 - case LastEvent of - {EventType, Params} -> - iot_ai_router:route_uuid(DeviceUUID, EventType, Params), - {noreply, State#state{timer_ref = NTimerRef, counter = Counter + 1, last_timestamp = iot_util:timestamp_of_seconds()}}; - undefined -> - {noreply, State#state{timer_ref = NTimerRef}} - end; - false -> - NTimerRef = erlang:start_timer((NThrottleTime - PassedSeconds) * 1000, self(), throttle_ticker), - {noreply, State#state{timer_ref = NTimerRef}} - end; - %% 第一次收到消息,则立即发送; 并且重置定时器 -handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, timer_ref = TimerRef, counter = 0}) -> +handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, group_key = GroupKey, counter = 0}) -> %% 重置定时器 - is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - NTimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), + TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), %% 发送消息 iot_ai_router:route_uuid(DeviceUUID, EventType, Params), - {noreply, State#state{counter = 1, timer_ref = NTimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; + {noreply, State#state{counter = 1, timer_ref = TimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; %% 缓冲数据,等到时间到了再发送 -handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, timer_ref = TimerRef, counter = Counter, last_timestamp = LastTimestamp, throttle_time = ThrottleTime}) -> +handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, timer_ref = TimerRef, counter = Counter, last_timestamp = LastTimestamp, group_key = GroupKey}) -> + ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), Timestamp = iot_util:timestamp_of_seconds(), %% 如果数据发送间隔已经超过了一个时间跨度,也需要立即发送 - case LastTimestamp > 0 andalso Timestamp > (LastTimestamp + ThrottleTime) of + case Timestamp > LastTimestamp + ThrottleTime of true -> %% 重置定时器 is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), @@ -129,7 +102,7 @@ handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUI iot_ai_router:route_uuid(DeviceUUID, EventType, Params), {noreply, State#state{counter = Counter + 1, timer_ref = NTimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; false -> - {noreply, State#state{counter = Counter + 1, last_event = {EventType, Params}}} + {noreply, State#state{last_event = {EventType, Params}}} end. %% @private @@ -139,11 +112,14 @@ handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUI {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 周期时间内没有任何数据 -handle_info({timeout, _, throttle_ticker}, State = #state{throttle_time = ThrottleTime, last_event = undefined}) -> +handle_info({timeout, _, throttle_ticker}, State = #state{group_key = GroupKey, last_event = undefined}) -> + ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), {noreply, State#state{timer_ref = TimerRef}}; + %% 已经发送的数据也需要重置,避免推送重复的数据 -handle_info({timeout, _, throttle_ticker}, State = #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, last_event = {EventType, Params}}) -> +handle_info({timeout, _, throttle_ticker}, State = #state{device_uuid = DeviceUUID, group_key = GroupKey, last_event = {EventType, Params}}) -> + ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), iot_ai_router:route_uuid(DeviceUUID, EventType, Params), {noreply, State#state{timer_ref = TimerRef, last_event = undefined, last_timestamp = iot_util:timestamp_of_seconds()}}. diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 6cd29d5..85de54d 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -37,6 +37,15 @@ init([]) -> modules => ['iot_api'] }, + #{ + id => 'iot_event_period_settings', + start => {'iot_event_period_settings', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_event_period_settings'] + }, + #{ id => 'iot_device_sup', start => {'iot_device_sup', start_link, []},