From 0d799a5bc67b4cc4c0829cbe8151a620670f858e Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 11 Jul 2024 14:28:37 +0800 Subject: [PATCH 01/11] change iot_device gen_statem -> gen_server --- apps/iot/src/iot_device.erl | 261 ++++++++++++++++++------------------ 1 file changed, 131 insertions(+), 130 deletions(-) diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index f6ed8d6..0ef05f6 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -1,38 +1,35 @@ %%%------------------------------------------------------------------- -%%% @copyright (C) 2023, +%%% @author anlicheng +%%% @copyright (C) 2024, %%% @doc +%% 1. 终端是否授权 => 1: 授权,0: 未授权 %%% %%% @end -%%% Created : 14. 8月 2023 11:40 +%%% Created : 11. 7月 2024 11:33 %%%------------------------------------------------------------------- --module(iot_device). --author("aresei"). +-module(iot_device1). +-author("anlicheng"). -include("iot.hrl"). --behaviour(gen_statem). +-behaviour(gen_server). %% API -export([get_name/1, get_pid/1, serialize/1]). -export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/5]). -%% gen_statem callbacks --export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% 终端是否授权 --define(DEVICE_AUTH_DENIED, 0). --define(DEVICE_AUTH_AUTHED, 1). %% 存储数据的上限 -define(MAX_SIZE, 2000). -%% 状态 --define(STATE_DENIED, denied). --define(STATE_ACTIVATED, activated). - -record(state, { device_uuid :: binary(), %% 用来保存数据,作为存储在influxdb里面的数据的备份 queue = queue:new(), + %% 设备是否授权 + auth_status :: integer(), status = ?DEVICE_OFFLINE }). @@ -50,6 +47,7 @@ % "timestamp": int %} %], +%% 用来保存在内存中的格式,不需要序列话处理 !!!, 放入到influxdb的数据是基于base64的 -spec serialize(FieldsList :: [map()]) -> [Val :: map()]. serialize(FieldsList) when is_list(FieldsList) -> lists:flatmap(fun serialize0/1, FieldsList). @@ -87,46 +85,47 @@ get_name(DeviceUUID) when is_binary(DeviceUUID) -> is_activated(undefined) -> false; is_activated(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, is_activated). + gen_server:call(Pid, is_activated). -spec change_status(Pid :: pid() | undefined, NewStatus :: integer()) -> no_return(). change_status(undefined, _) -> ok; change_status(Pid, NewStatus) when is_pid(Pid), is_integer(NewStatus) -> - gen_statem:cast(Pid, {change_status, NewStatus}). + gen_server:cast(Pid, {change_status, NewStatus}). -spec reload(Pid :: pid()) -> no_return(). reload(Pid) when is_pid(Pid) -> - gen_statem:cast(Pid, reload). + gen_server:cast(Pid, reload). -spec auth(Pid :: pid(), Auth :: boolean()) -> no_return(). auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> - gen_statem:cast(Pid, {auth, Auth}). + gen_server:cast(Pid, {auth, Auth}). -spec data(Pid :: pid(), DataList :: [#device_data{}]) -> no_return(). data(Pid, DataList) when is_pid(Pid), is_list(DataList) -> - gen_statem:cast(Pid, {data, DataList}). + gen_server:cast(Pid, {data, DataList}). -spec query(Pid :: pid(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) -> {ok, [#device_data{}]}. query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 -> - gen_statem:call(Pid, {query, Tags, StartTs, StopTs, Limit}). + gen_server:call(Pid, {query, Tags, StartTs, StopTs, Limit}). -%% @doc Creates a gen_statem process which calls Module:init/1 to -%% initialize. To ensure a synchronized start-up procedure, this -%% function does not return until Module:init/1 has returned. +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(Name :: atom(), DeviceUUID :: binary()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). start_link(Name, DeviceUUID) when is_atom(Name), is_binary(DeviceUUID) -> - gen_statem:start_link({local, Name}, ?MODULE, [DeviceUUID], []); + gen_server:start_link({local, Name}, ?MODULE, [DeviceUUID], []); start_link(Name, DeviceInfo) when is_atom(Name), is_map(DeviceInfo) -> - gen_statem:start_link({local, Name}, ?MODULE, [DeviceInfo], []). + gen_server:start_link({local, Name}, ?MODULE, [DeviceInfo], []). %%%=================================================================== -%%% gen_statem callbacks +%%% gen_server callbacks %%%=================================================================== %% @private -%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or -%% gen_statem:start_link/[3,4], this function is called by the new -%% process to initialize. +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). init([DeviceUUID]) when is_binary(DeviceUUID) -> case device_bo:get_device_by_uuid(DeviceUUID) of {ok, DeviceInfo} -> @@ -135,85 +134,25 @@ init([DeviceUUID]) when is_binary(DeviceUUID) -> lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), ignore end; -init([DeviceInfo = #{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) when is_map(DeviceInfo) -> - case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of - true -> - {ok, ?STATE_ACTIVATED, #state{device_uuid = DeviceUUID, status = Status}}; - false -> - {ok, ?STATE_DENIED, #state{device_uuid = DeviceUUID, status = Status}} - end. +init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) -> + {ok, #state{device_uuid = DeviceUUID, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}. %% @private -%% @doc This function is called by a gen_statem when it needs to find out -%% the callback mode of the callback module. -callback_mode() -> - handle_event_function. +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 判断当前设备是否是激活状态 +handle_call(is_activated, _From, State = #state{auth_status = AuthStatus}) -> + {reply, AuthStatus =:= 1, State}; -%% @private -%% @doc If callback_mode is handle_event_function, then whenever a -%% gen_statem receives an event from call/2, cast/2, or as a normal -%% process message, this function is called. - -%% 判断是否是激活状态 -handle_event({call, From}, is_activated, StateName, State = #state{}) -> - {keep_state, State, [{reply, From, StateName =:= ?STATE_ACTIVATED}]}; - -%% 改变为在线状态,但是数据库中的状态已经是在线状态,忽略 -handle_event(cast, {change_status, ?DEVICE_ONLINE}, _, State = #state{status = ?DEVICE_ONLINE}) -> - {keep_state, State}; -%% 改变数据库的状态, 其他情况下执行次数都很少 -handle_event(cast, {change_status, ?DEVICE_ONLINE}, _, State = #state{device_uuid = DeviceUUID}) -> - {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_ONLINE), - report_event(DeviceUUID, ?DEVICE_ONLINE), - {keep_state, State#state{status = ?DEVICE_ONLINE}}; - -handle_event(cast, {change_status, ?DEVICE_OFFLINE}, _, State = #state{device_uuid = DeviceUUID}) -> - {ok, #{<<"status">> := Status}} = device_bo:get_device_by_uuid(DeviceUUID), - case Status of - ?DEVICE_NOT_JOINED -> - lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]), - {keep_state, State#state{status = ?DEVICE_NOT_JOINED}}; - ?DEVICE_OFFLINE -> - lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]), - {keep_state, State#state{status = ?DEVICE_OFFLINE}}; - ?DEVICE_ONLINE -> - {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE), - report_event(DeviceUUID, ?DEVICE_OFFLINE), - {keep_state, State#state{status = ?DEVICE_OFFLINE}} - end; - -%% 重新加载数据库数据 -handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) -> - lager:debug("[iot_device] will reload: ~p", [DeviceUUID]), - case device_bo:get_device_by_uuid(DeviceUUID) of - {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} -> - case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of - true -> - {next_state, ?STATE_ACTIVATED, State#state{status = Status}}; - false -> - {next_state, ?STATE_DENIED, State#state{status = Status}} - end; - undefined -> - lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), - {stop, normal, State} - end; - -%% 向设备中追加数据 -handle_event(cast, {data, DataList}, _, State = #state{queue = Q}) -> - NQ = lists:foldl(fun(Data, Q0) -> - case queue:len(Q0) > ?MAX_SIZE of - true -> - {_, Q1} = queue:out(Q0), - queue:in(Data, Q1); - false -> - queue:in(Data, Q0) - end - end, Q, DataList), - - {keep_state, State#state{queue = NQ}}; - -%% 查询device里面的数据 -handle_event({call, From}, {query, Tags, StartTs, StopTs, Limit}, _StateName, State = #state{queue = Q}) -> +%% 查询当前设备中产生的数据,缓存在内存中 +handle_call({query, Tags, StartTs, StopTs, Limit}, _From, State = #state{queue = Q}) -> L = queue:to_list(Q), %% 过滤 @@ -228,38 +167,94 @@ handle_event({call, From}, {query, Tags, StartTs, StopTs, Limit}, _StateName, St false -> lists:sublist(L3, 1, Limit) end, - {keep_state, State, [{reply, From, {ok, L4}}]}; - -%% 处理授权 -handle_event(cast, {auth, Auth}, StateName, State = #state{device_uuid = DeviceUUID}) -> - case {StateName, Auth} of - {?STATE_DENIED, false} -> - lager:debug("[iot_device] device_uuid: ~p, auth: false, will keep state_name: ~p", [DeviceUUID, ?STATE_DENIED]), - {keep_state, State}; - {?STATE_DENIED, true} -> - {next_state, ?STATE_ACTIVATED, State}; - - {?STATE_ACTIVATED, false} -> - lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]), - {next_state, ?STATE_DENIED, State}; - {?STATE_ACTIVATED, true} -> - lager:debug("[iot_device] device_uuid: ~p, auth: true, will keep state_name: ~p", [DeviceUUID, ?STATE_ACTIVATED]), - {keep_state, State} - end. + {reply, {ok, L4}, State}. %% @private -%% @doc This function is called by a gen_statem when it is about to +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({change_status, ?DEVICE_ONLINE}, State = #state{status = ?DEVICE_ONLINE}) -> + {noreply, State}; +handle_cast({change_status, ?DEVICE_ONLINE}, State = #state{device_uuid = DeviceUUID}) -> + {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_ONLINE), + report_event(DeviceUUID, ?DEVICE_ONLINE), + {noreply, State#state{status = ?DEVICE_ONLINE}}; +handle_cast({change_status, ?DEVICE_OFFLINE}, State = #state{device_uuid = DeviceUUID}) -> + {ok, #{<<"status">> := Status}} = device_bo:get_device_by_uuid(DeviceUUID), + case Status of + ?DEVICE_NOT_JOINED -> + lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]), + {noreply, State#state{status = ?DEVICE_NOT_JOINED}}; + ?DEVICE_OFFLINE -> + lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]), + {noreply, State#state{status = ?DEVICE_OFFLINE}}; + ?DEVICE_ONLINE -> + {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE), + report_event(DeviceUUID, ?DEVICE_OFFLINE), + {noreply, State#state{status = ?DEVICE_OFFLINE}} + end; + +%% 重新加载数据库数据 +handle_cast(reload, State = #state{device_uuid = DeviceUUID}) -> + lager:debug("[iot_device] will reload: ~p", [DeviceUUID]), + case device_bo:get_device_by_uuid(DeviceUUID) of + {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} -> + {noreply, State#state{status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}; + undefined -> + lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), + {stop, normal, State} + end; + +%% 向设备中追加数据 +handle_cast({data, DataList}, State = #state{queue = Q}) -> + NQ = lists:foldl(fun(Data, Q0) -> + case queue:len(Q0) > ?MAX_SIZE of + true -> + {_, Q1} = queue:out(Q0), + queue:in(Data, Q1); + false -> + queue:in(Data, Q0) + end + end, Q, DataList), + {noreply, State#state{queue = NQ}}; + +%% 处理授权 +handle_cast({auth, true}, State = #state{device_uuid = DeviceUUID}) -> + lager:debug("[iot_device] device_uuid: ~p, auth: true", [DeviceUUID]), + {noreply, State#state{auth_status = 1}}; +handle_cast({auth, false}, State = #state{device_uuid = DeviceUUID}) -> + lager:debug("[iot_device] device_uuid: ~p, auth: false", [DeviceUUID]), + {noreply, State#state{auth_status = 0}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to %% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_statem terminates with -%% Reason. The return value is ignored. -terminate(Reason, StateName, #state{device_uuid = DeviceUUID}) -> - lager:notice("[iot_device] device_uuid: ~p, state_name: ~p, terminate with reason: ~p", [DeviceUUID, StateName, Reason]), +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, #state{device_uuid = DeviceUUID}) -> + lager:notice("[iot_device] device_uuid: ~p, terminate with reason: ~p", [DeviceUUID, Reason]), ok. %% @private %% @doc Convert process state when code is changed -code_change(_OldVsn, StateName, State = #state{}, _Extra) -> - {ok, StateName, State}. +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. %%%=================================================================== %%% Internal functions @@ -306,3 +301,9 @@ filter_tags(L, Tags) when map_size(Tags) =:= 0 -> lists:all(fun({TagName, TagVal}) -> maps:is_key(TagName, Tags0) andalso maps:get(TagName, Tags0) =:= TagVal end, maps:to_list(Tags)) end, L). +%% 转换成整数,从数据读取的数据有时候不一定都是整数 +-spec as_integer(Val :: integer() | binary()) -> integer(). +as_integer(Val) when is_integer(Val) -> + Val; +as_integer(Val) when is_binary(Val) -> + binary_to_integer(Val). \ No newline at end of file From feaa19b42eb27e13e4aa6e39cf37100d1eba1eb4 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 11 Jul 2024 15:49:01 +0800 Subject: [PATCH 02/11] add event group --- apps/iot/src/iot_device.erl | 33 +++++- apps/iot/src/iot_event_publisher.erl | 171 +++++++++++++++++++++++++++ 2 files changed, 199 insertions(+), 5 deletions(-) create mode 100644 apps/iot/src/iot_event_publisher.erl diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 0ef05f6..e10c0ed 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -7,7 +7,7 @@ %%% @end %%% Created : 11. 7月 2024 11:33 %%%------------------------------------------------------------------- --module(iot_device1). +-module(iot_device). -author("anlicheng"). -include("iot.hrl"). @@ -16,11 +16,11 @@ %% API -export([get_name/1, get_pid/1, serialize/1]). -export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/5]). +-export([ai_event/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - %% 存储数据的上限 -define(MAX_SIZE, 2000). @@ -30,7 +30,9 @@ queue = queue:new(), %% 设备是否授权 auth_status :: integer(), - status = ?DEVICE_OFFLINE + status = ?DEVICE_OFFLINE, + %% 事件分组 + ai_groups = #{} }). %%%=================================================================== @@ -109,6 +111,10 @@ data(Pid, DataList) when is_pid(Pid), is_list(DataList) -> query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 -> gen_server:call(Pid, {query, Tags, StartTs, StopTs, Limit}). +-spec ai_event(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return(). +ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) -> + gen_server:cast(Pid, {ai_event, EventType, Params}). + %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Name :: atom(), DeviceUUID :: binary()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -226,7 +232,20 @@ handle_cast({auth, true}, State = #state{device_uuid = DeviceUUID}) -> {noreply, State#state{auth_status = 1}}; handle_cast({auth, false}, State = #state{device_uuid = DeviceUUID}) -> lager:debug("[iot_device] device_uuid: ~p, auth: false", [DeviceUUID]), - {noreply, State#state{auth_status = 0}}. + {noreply, State#state{auth_status = 0}}; + +%% ai事件的延迟整流逻辑 +handle_cast({ai_event, EventType, Params}, State = #state{ai_groups = Groups, device_uuid = DeviceUUID}) -> + GroupKey = group_by(EventType, Params), + case maps:find(GroupKey, Groups) of + {ok, PublisherPid} -> + iot_event_publisher:publish(PublisherPid, EventType, Params), + {noreply, State}; + error -> + {ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID, 1), + iot_event_publisher:publish(PublisherPid, EventType, Params), + {noreply, State#state{ai_groups = maps:put(GroupKey, PublisherPid, Groups)}} + end. %% @private %% @doc Handling all non call/cast messages @@ -306,4 +325,8 @@ filter_tags(L, Tags) when map_size(Tags) =:= 0 -> as_integer(Val) when is_integer(Val) -> Val; as_integer(Val) when is_binary(Val) -> - binary_to_integer(Val). \ No newline at end of file + binary_to_integer(Val). + +%% 事件分组函数 +group_by(EventType, _Params) -> + EventType. \ No newline at end of file diff --git a/apps/iot/src/iot_event_publisher.erl b/apps/iot/src/iot_event_publisher.erl new file mode 100644 index 0000000..10edae2 --- /dev/null +++ b/apps/iot/src/iot_event_publisher.erl @@ -0,0 +1,171 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% 事件发布器,提供基于时间周期的缓冲 +%%% @end +%%% Created : 11. 7月 2024 14:40 +%%%------------------------------------------------------------------- +-module(iot_event_publisher). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/2, publish/3, set_throttle/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + device_uuid :: binary(), + throttle_time = 0 :: integer(), + timer_ref :: undefined | reference(), + %% 已经发送的事件的计数器 + counter = 0, + last_event :: any(), + %% 最后一次发送数据的时间戳, 如果数据跨越了多个时间跨度时也需要立即发送出去 + last_timestamp = 0 :: integer() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec publish(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return(). +publish(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) -> + gen_server:cast(Pid, {publish, EventType, Params}). + +-spec set_throttle(Pid :: pid(), ThrottleTime :: integer()) -> no_return(). +set_throttle(Pid, ThrottleTime) when is_pid(Pid), is_integer(ThrottleTime), ThrottleTime > 0 -> + gen_server:cast(Pid, {set_throttle, ThrottleTime}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(DeviceUUID :: binary(), ThrottleTime :: integer()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(DeviceUUID, ThrottleTime) when is_binary(DeviceUUID), is_integer(ThrottleTime) -> + gen_server:start_link(?MODULE, [DeviceUUID, ThrottleTime], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([DeviceUUID, ThrottleTime]) -> + TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + {ok, #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, timer_ref = TimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 重置时间周期:新的周期大于之前的 +handle_cast({set_throttle, NThrottleTime}, State = #state{throttle_time = ThrottleTime}) when NThrottleTime > ThrottleTime -> + {noreply, State#state{throttle_time = NThrottleTime}}; + +%% 重置时间周期:新的周期小于之前的 +handle_cast({set_throttle, NThrottleTime}, State = #state{device_uuid = DeviceUUID, counter = Counter, timer_ref = TimerRef, last_timestamp = LastTimestamp, last_event = LastEvent}) -> + %% 重置定时器 + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + + Timestamp = iot_util:timestamp_of_seconds(), + PassedSeconds = Timestamp - LastTimestamp, + case PassedSeconds >= NThrottleTime of + true -> + NTimerRef = erlang:start_timer(NThrottleTime * 1000, self(), throttle_ticker), + %% 发送消息 + case LastEvent of + {EventType, Params} -> + iot_ai_router:route_uuid(DeviceUUID, EventType, Params), + {noreply, State#state{timer_ref = NTimerRef, counter = Counter + 1, last_timestamp = iot_util:timestamp_of_seconds()}}; + undefined -> + {noreply, State#state{timer_ref = NTimerRef}} + end; + false -> + NTimerRef = erlang:start_timer((NThrottleTime - PassedSeconds) * 1000, self(), throttle_ticker), + {noreply, State#state{timer_ref = NTimerRef}} + end; + +%% 第一次收到消息,则立即发送; 并且重置定时器 +handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, timer_ref = TimerRef, counter = 0}) -> + %% 重置定时器 + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + NTimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + %% 发送消息 + iot_ai_router:route_uuid(DeviceUUID, EventType, Params), + {noreply, State#state{counter = 1, timer_ref = NTimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; + +%% 缓冲数据,等到时间到了再发送 +handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, timer_ref = TimerRef, counter = Counter, last_timestamp = LastTimestamp, throttle_time = ThrottleTime}) -> + Timestamp = iot_util:timestamp_of_seconds(), + %% 如果数据发送间隔已经超过了一个时间跨度,也需要立即发送 + case LastTimestamp > 0 andalso Timestamp > (LastTimestamp + ThrottleTime) of + true -> + %% 重置定时器 + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + NTimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + %% 发送消息 + iot_ai_router:route_uuid(DeviceUUID, EventType, Params), + {noreply, State#state{counter = Counter + 1, timer_ref = NTimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; + false -> + {noreply, State#state{counter = Counter + 1, last_event = {EventType, Params}}} + end. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 周期时间内没有任何数据 +handle_info({timeout, _, throttle_ticker}, State = #state{throttle_time = ThrottleTime, last_event = undefined}) -> + TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + {noreply, State#state{timer_ref = TimerRef}}; +%% 已经发送的数据也需要重置,避免推送重复的数据 +handle_info({timeout, _, throttle_ticker}, State = #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, last_event = {EventType, Params}}) -> + TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + iot_ai_router:route_uuid(DeviceUUID, EventType, Params), + {noreply, State#state{timer_ref = TimerRef, last_event = undefined, last_timestamp = iot_util:timestamp_of_seconds()}}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== From 5b5f4b604255733e8cb0b02f44e0197019290cd3 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 11 Jul 2024 16:25:15 +0800 Subject: [PATCH 03/11] event peroid --- apps/iot/src/iot_device.erl | 2 +- apps/iot/src/iot_event_period_settings.erl | 123 +++++++++++++++++++++ apps/iot/src/iot_event_publisher.erl | 66 ++++------- apps/iot/src/iot_sup.erl | 9 ++ 4 files changed, 154 insertions(+), 46 deletions(-) create mode 100644 apps/iot/src/iot_event_period_settings.erl diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index e10c0ed..95d5374 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -242,7 +242,7 @@ handle_cast({ai_event, EventType, Params}, State = #state{ai_groups = Groups, de iot_event_publisher:publish(PublisherPid, EventType, Params), {noreply, State}; error -> - {ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID, 1), + {ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID, GroupKey), iot_event_publisher:publish(PublisherPid, EventType, Params), {noreply, State#state{ai_groups = maps:put(GroupKey, PublisherPid, Groups)}} end. diff --git a/apps/iot/src/iot_event_period_settings.erl b/apps/iot/src/iot_event_period_settings.erl new file mode 100644 index 0000000..f0a9275 --- /dev/null +++ b/apps/iot/src/iot_event_period_settings.erl @@ -0,0 +1,123 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% +%%% @end +%%% Created : 11. 7月 2024 15:54 +%%%------------------------------------------------------------------- +-module(iot_event_period_settings). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([get_throttle/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-define(TAB_NAME, iot_ets_event_period). + +-record(state, { + +}). + +-record(period, { + group_key :: any(), + throttle :: integer() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% 获取设置的时间周期 +-spec get_throttle(GroupKey :: any()) -> integer(). +get_throttle(GroupKey) -> + case ets:lookup(?TAB_NAME, GroupKey) of + [] -> + 300; + [#period{throttle = Throttle}|_] -> + Throttle + end. + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + ets:new(?TAB_NAME, [public, set, named_table, {keypos, 2}]), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({get_throttle, GroupKey}, _From, State = #state{settings = Settings}) -> + case maps:find(GroupKey, Settings) of + {ok, Throttle} -> + {reply, {ok, Throttle}, State}; + error -> + {reply, {ok, 300}, State} + end. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_event_publisher.erl b/apps/iot/src/iot_event_publisher.erl index 10edae2..0b65c4c 100644 --- a/apps/iot/src/iot_event_publisher.erl +++ b/apps/iot/src/iot_event_publisher.erl @@ -21,11 +21,11 @@ -record(state, { device_uuid :: binary(), - throttle_time = 0 :: integer(), - timer_ref :: undefined | reference(), + timer_ref = undefined :: undefined | reference(), + group_key :: any(), %% 已经发送的事件的计数器 counter = 0, - last_event :: any(), + last_event = undefined :: any(), %% 最后一次发送数据的时间戳, 如果数据跨越了多个时间跨度时也需要立即发送出去 last_timestamp = 0 :: integer() }). @@ -43,10 +43,10 @@ set_throttle(Pid, ThrottleTime) when is_pid(Pid), is_integer(ThrottleTime), Thro gen_server:cast(Pid, {set_throttle, ThrottleTime}). %% @doc Spawns the server and registers the local name (unique) --spec(start_link(DeviceUUID :: binary(), ThrottleTime :: integer()) -> +-spec(start_link(DeviceUUID :: binary(), GroupKey :: any()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(DeviceUUID, ThrottleTime) when is_binary(DeviceUUID), is_integer(ThrottleTime) -> - gen_server:start_link(?MODULE, [DeviceUUID, ThrottleTime], []). +start_link(DeviceUUID, GroupKey) when is_binary(DeviceUUID) -> + gen_server:start_link(?MODULE, [DeviceUUID, GroupKey], []). %%%=================================================================== %%% gen_server callbacks @@ -57,9 +57,8 @@ start_link(DeviceUUID, ThrottleTime) when is_binary(DeviceUUID), is_integer(Thro -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([DeviceUUID, ThrottleTime]) -> - TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), - {ok, #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, timer_ref = TimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}. +init([DeviceUUID, GroupKey]) -> + {ok, #state{device_uuid = DeviceUUID, group_key = GroupKey, last_timestamp = iot_util:timestamp_of_seconds(), last_event = undefined}}. %% @private %% @doc Handling call messages @@ -80,47 +79,21 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -%% 重置时间周期:新的周期大于之前的 -handle_cast({set_throttle, NThrottleTime}, State = #state{throttle_time = ThrottleTime}) when NThrottleTime > ThrottleTime -> - {noreply, State#state{throttle_time = NThrottleTime}}; - -%% 重置时间周期:新的周期小于之前的 -handle_cast({set_throttle, NThrottleTime}, State = #state{device_uuid = DeviceUUID, counter = Counter, timer_ref = TimerRef, last_timestamp = LastTimestamp, last_event = LastEvent}) -> - %% 重置定时器 - is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - - Timestamp = iot_util:timestamp_of_seconds(), - PassedSeconds = Timestamp - LastTimestamp, - case PassedSeconds >= NThrottleTime of - true -> - NTimerRef = erlang:start_timer(NThrottleTime * 1000, self(), throttle_ticker), - %% 发送消息 - case LastEvent of - {EventType, Params} -> - iot_ai_router:route_uuid(DeviceUUID, EventType, Params), - {noreply, State#state{timer_ref = NTimerRef, counter = Counter + 1, last_timestamp = iot_util:timestamp_of_seconds()}}; - undefined -> - {noreply, State#state{timer_ref = NTimerRef}} - end; - false -> - NTimerRef = erlang:start_timer((NThrottleTime - PassedSeconds) * 1000, self(), throttle_ticker), - {noreply, State#state{timer_ref = NTimerRef}} - end; - %% 第一次收到消息,则立即发送; 并且重置定时器 -handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, timer_ref = TimerRef, counter = 0}) -> +handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, group_key = GroupKey, counter = 0}) -> %% 重置定时器 - is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - NTimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), + TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), %% 发送消息 iot_ai_router:route_uuid(DeviceUUID, EventType, Params), - {noreply, State#state{counter = 1, timer_ref = NTimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; + {noreply, State#state{counter = 1, timer_ref = TimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; %% 缓冲数据,等到时间到了再发送 -handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, timer_ref = TimerRef, counter = Counter, last_timestamp = LastTimestamp, throttle_time = ThrottleTime}) -> +handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, timer_ref = TimerRef, counter = Counter, last_timestamp = LastTimestamp, group_key = GroupKey}) -> + ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), Timestamp = iot_util:timestamp_of_seconds(), %% 如果数据发送间隔已经超过了一个时间跨度,也需要立即发送 - case LastTimestamp > 0 andalso Timestamp > (LastTimestamp + ThrottleTime) of + case Timestamp > LastTimestamp + ThrottleTime of true -> %% 重置定时器 is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), @@ -129,7 +102,7 @@ handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUI iot_ai_router:route_uuid(DeviceUUID, EventType, Params), {noreply, State#state{counter = Counter + 1, timer_ref = NTimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; false -> - {noreply, State#state{counter = Counter + 1, last_event = {EventType, Params}}} + {noreply, State#state{last_event = {EventType, Params}}} end. %% @private @@ -139,11 +112,14 @@ handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUI {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 周期时间内没有任何数据 -handle_info({timeout, _, throttle_ticker}, State = #state{throttle_time = ThrottleTime, last_event = undefined}) -> +handle_info({timeout, _, throttle_ticker}, State = #state{group_key = GroupKey, last_event = undefined}) -> + ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), {noreply, State#state{timer_ref = TimerRef}}; + %% 已经发送的数据也需要重置,避免推送重复的数据 -handle_info({timeout, _, throttle_ticker}, State = #state{device_uuid = DeviceUUID, throttle_time = ThrottleTime, last_event = {EventType, Params}}) -> +handle_info({timeout, _, throttle_ticker}, State = #state{device_uuid = DeviceUUID, group_key = GroupKey, last_event = {EventType, Params}}) -> + ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), iot_ai_router:route_uuid(DeviceUUID, EventType, Params), {noreply, State#state{timer_ref = TimerRef, last_event = undefined, last_timestamp = iot_util:timestamp_of_seconds()}}. diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 6cd29d5..85de54d 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -37,6 +37,15 @@ init([]) -> modules => ['iot_api'] }, + #{ + id => 'iot_event_period_settings', + start => {'iot_event_period_settings', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_event_period_settings'] + }, + #{ id => 'iot_device_sup', start => {'iot_device_sup', start_link, []}, From 9667909b278d34c9c10b809609482f2918d30247 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 11 Jul 2024 17:05:52 +0800 Subject: [PATCH 04/11] fix iot_api --- apps/iot/src/iot_api.erl | 108 +++------------------------------------ apps/iot/src/iot_sup.erl | 9 ---- 2 files changed, 6 insertions(+), 111 deletions(-) diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index a5fed49..5ed9f07 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -1,76 +1,21 @@ %%%------------------------------------------------------------------- %%% @author anlicheng -%%% @copyright (C) 2023, +%%% @copyright (C) 2024, %%% @doc %%% %%% @end -%%% Created : 24. 12月 2023 15:42 +%%% Created : 11. 7月 2024 17:01 %%%------------------------------------------------------------------- -module(iot_api). -author("anlicheng"). --behaviour(gen_server). - -%% API --export([start_link/0]). --export([ai_event/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). -define(API_TOKEN, <<"wv6fGyBhl*7@AsD9">>). --record(state, { - -}). - -%%%=================================================================== -%%% API -%%%=================================================================== +%% API +-export([ai_event/1]). ai_event(Id) when is_integer(Id) -> - gen_server:cast(?MODULE, {ai_event, Id}). - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - {ok, #state{}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({ai_event, Id}, State = #state{}) -> - spawn_monitor(fun() -> + spawn(fun() -> Token = iot_util:md5(<>), {ok, Url} = application:get_env(iot, api_url), @@ -94,45 +39,4 @@ handle_cast({ai_event, Id}, State = #state{}) -> {error, Reason} -> lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason]) end - end), - - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -%% Task进程挂掉 -handle_info({'DOWN', _MRef, process, _Pid, normal}, State) -> - {noreply, State}; - -handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) -> - lager:notice("[iot_api] task process down with reason: ~p", [Reason]), - {noreply, State}; - -handle_info(_Info, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== + end). \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 85de54d..6a8255e 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -28,15 +28,6 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, Specs = [ - #{ - id => 'iot_api', - start => {'iot_api', start_link, []}, - restart => permanent, - shutdown => 2000, - type => supervisor, - modules => ['iot_api'] - }, - #{ id => 'iot_event_period_settings', start => {'iot_event_period_settings', start_link, []}, From 686187a9ccdc26601d94428a03befedb850c8806 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 11 Jul 2024 17:10:07 +0800 Subject: [PATCH 05/11] add iot_task --- apps/iot/iot_task.erl | 106 ++++++++++++++++++++++++++++++++++++++ apps/iot/src/iot_host.erl | 2 +- 2 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 apps/iot/iot_task.erl diff --git a/apps/iot/iot_task.erl b/apps/iot/iot_task.erl new file mode 100644 index 0000000..8a34c6c --- /dev/null +++ b/apps/iot/iot_task.erl @@ -0,0 +1,106 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% +%%% @end +%%% Created : 11. 7月 2024 17:06 +%%%------------------------------------------------------------------- +-module(iot_task). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([submit/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% 提交异步任务 +-spec submit(Task :: fun()) -> no_return(). +submit(Task) when is_function(Task, 0) -> + gen_server:cast(?SERVER, Task). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({submit, Task}, State = #state{}) -> + spawn_monitor(fun() -> Task() end), + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index f92cec9..90bcede 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -491,7 +491,7 @@ handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #stat Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])), case ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message) of {ok, LogId} -> - iot_api:ai_event(LogId); + catch iot_api:ai_event(LogId); _ -> ok end, From 9119aaff699cb6cfd2eccd361c40924b2fe4c3b8 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 11 Jul 2024 17:15:03 +0800 Subject: [PATCH 06/11] add iot_task --- apps/iot/src/iot_api.erl | 6 ++++-- apps/iot/src/iot_sup.erl | 9 +++++++++ apps/iot/{ => src}/iot_task.erl | 17 +++++++++++++++-- 3 files changed, 28 insertions(+), 4 deletions(-) rename apps/iot/{ => src}/iot_task.erl (86%) diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index 5ed9f07..abf5f34 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -14,8 +14,9 @@ %% API -export([ai_event/1]). +-spec ai_event(Id :: integer()) -> no_return(). ai_event(Id) when is_integer(Id) -> - spawn(fun() -> + Task = fun() -> Token = iot_util:md5(<>), {ok, Url} = application:get_env(iot, api_url), @@ -39,4 +40,5 @@ ai_event(Id) when is_integer(Id) -> {error, Reason} -> lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason]) end - end). \ No newline at end of file + end, + iot_task:submit(Task). \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 6a8255e..4d2fea6 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -28,6 +28,15 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, Specs = [ + #{ + id => 'iot_task', + start => {'iot_task', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_task'] + }, + #{ id => 'iot_event_period_settings', start => {'iot_event_period_settings', start_link, []}, diff --git a/apps/iot/iot_task.erl b/apps/iot/src/iot_task.erl similarity index 86% rename from apps/iot/iot_task.erl rename to apps/iot/src/iot_task.erl index 8a34c6c..0a151e3 100644 --- a/apps/iot/iot_task.erl +++ b/apps/iot/src/iot_task.erl @@ -13,7 +13,7 @@ %% API -export([start_link/0]). --export([submit/1]). +-export([submit/1, debug_info/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -21,7 +21,7 @@ -define(SERVER, ?MODULE). -record(state, { - + counter = 0 }). %%%=================================================================== @@ -33,6 +33,9 @@ submit(Task) when is_function(Task, 0) -> gen_server:cast(?SERVER, Task). +debug_info() -> + gen_server:cast(?SERVER, debug_info). + %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -72,6 +75,9 @@ handle_call(_Request, _From, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). handle_cast({submit, Task}, State = #state{}) -> spawn_monitor(fun() -> Task() end), + {noreply, State}; +handle_cast(debug_info, State = #state{counter = Counter}) -> + lager:debug("[iot_task] execute task_num: ~p", [Counter]), {noreply, State}. %% @private @@ -80,6 +86,13 @@ handle_cast({submit, Task}, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). +%% Task进程挂掉 +handle_info({'DOWN', _MRef, process, _Pid, normal}, State=#state{counter = Counter}) -> + {noreply, State#state{counter = Counter + 1}}; +handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) -> + lager:notice("[iot_task] task process down with reason: ~p", [Reason]), + {noreply, State}; + handle_info(_Info, State = #state{}) -> {noreply, State}. From 2e1ea10a49b8c8896f54a86a1cec92ae590efbed Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 11 Jul 2024 17:37:01 +0800 Subject: [PATCH 07/11] fix event_period_settings --- apps/iot/src/iot_api.erl | 33 +++++++++++++++-- apps/iot/src/iot_event_period_settings.erl | 41 +++++++++++++++++----- config/sys-dev.config | 2 +- config/sys-prod.config | 2 +- 4 files changed, 64 insertions(+), 14 deletions(-) diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index abf5f34..4985683 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -12,13 +12,14 @@ -define(API_TOKEN, <<"wv6fGyBhl*7@AsD9">>). %% API --export([ai_event/1]). +-export([ai_event/1, get_event_period_settings/0]). -spec ai_event(Id :: integer()) -> no_return(). ai_event(Id) when is_integer(Id) -> Task = fun() -> Token = iot_util:md5(<>), - {ok, Url} = application:get_env(iot, api_url), + {ok, Url0} = application:get_env(iot, api_url), + Url = Url0 ++ "/api/v1/taskLog", Headers = [ {<<"content-type">>, <<"application/json">>} @@ -41,4 +42,30 @@ ai_event(Id) when is_integer(Id) -> lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason]) end end, - iot_task:submit(Task). \ No newline at end of file + iot_task:submit(Task). + +-spec get_event_period_settings() -> {ok, Resp :: binary()} | {error, Reason :: any()}. +get_event_period_settings() -> + Token = iot_util:md5(<>), + {ok, Url0} = application:get_env(iot, api_url), + Url = Url0 ++ "/api/v1/eventPeriodSettings", + + Headers = [ + {<<"content-type">>, <<"application/json">>} + ], + ReqData = #{ + <<"token">> => Token + }, + Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])), + case hackney:request(post, Url, Headers, Body, [{pool, false}]) of + {ok, 200, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + {ok, RespBody}; + {ok, HttpCode, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + {error, {http_error, HttpCode}}; + {error, Reason} -> + {error, Reason} + end. \ No newline at end of file diff --git a/apps/iot/src/iot_event_period_settings.erl b/apps/iot/src/iot_event_period_settings.erl index f0a9275..47cfff3 100644 --- a/apps/iot/src/iot_event_period_settings.erl +++ b/apps/iot/src/iot_event_period_settings.erl @@ -22,6 +22,12 @@ -define(TAB_NAME, iot_ets_event_period). +%% 更新周期, 单位:秒 +-define(UPDATE_TICKER, 300). + +%% 默认周期, 单位:秒 +-define(DEFAULT_THROTTLE, 300). + -record(state, { }). @@ -40,7 +46,7 @@ get_throttle(GroupKey) -> case ets:lookup(?TAB_NAME, GroupKey) of [] -> - 300; + ?DEFAULT_THROTTLE; [#period{throttle = Throttle}|_] -> Throttle end. @@ -62,6 +68,8 @@ start_link() -> {stop, Reason :: term()} | ignore). init([]) -> ets:new(?TAB_NAME, [public, set, named_table, {keypos, 2}]), + settings(iot_api:get_event_period_settings()), + erlang:start_timer(?UPDATE_TICKER * 1000, self(), update_ticker), {ok, #state{}}. %% @private @@ -74,13 +82,8 @@ init([]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call({get_throttle, GroupKey}, _From, State = #state{settings = Settings}) -> - case maps:find(GroupKey, Settings) of - {ok, Throttle} -> - {reply, {ok, Throttle}, State}; - error -> - {reply, {ok, 300}, State} - end. +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. %% @private %% @doc Handling cast messages @@ -97,7 +100,9 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info(_Info, State = #state{}) -> +handle_info({timeout, _, update_ticker}, State = #state{}) -> + settings(iot_api:get_event_period_settings()), + erlang:start_timer(?UPDATE_TICKER * 1000, self(), update_ticker), {noreply, State}. %% @private @@ -121,3 +126,21 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +-spec settings(tuple()) -> no_return(). +settings({ok, Resp}) when is_binary(Resp) -> + case catch jiffy:decode(Resp, [return_maps]) of + Settings when is_map(Settings) -> + lists:foreach(fun({GroupKey, Throttle}) -> + case is_integer(Throttle) andalso Throttle > 0 of + true -> + ets:insert(?TAB_NAME, #period{group_key = GroupKey, throttle = Throttle}); + false -> + ok + end + end, maps:to_list(Settings)); + Error -> + lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Error]) + end; +settings({error, Reason}) -> + lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Reason]). \ No newline at end of file diff --git a/config/sys-dev.config b/config/sys-dev.config index 2fb8c42..0919faf 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -18,7 +18,7 @@ {port, 18080} ]}, - {api_url, "http://39.98.184.67:8800/api/v1/taskLog"}, + {api_url, "http://39.98.184.67:8800"}, %% 目标服务器地址 {emqx_server, [ diff --git a/config/sys-prod.config b/config/sys-prod.config index e7f331d..ecbad58 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -23,7 +23,7 @@ {<<"test">>, <<"iot2023">>} ]}, - {api_url, "https://lgsiot.njau.edu.cn/api/v1/taskLog"}, + {api_url, "https://lgsiot.njau.edu.cn"}, %% 配置中电的数据转发, mqtt协议 {zhongdian, [ From a3354f314584e51ec77ee2efcccb5989b21e00f1 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 12 Jul 2024 10:25:34 +0800 Subject: [PATCH 08/11] task worker pool --- apps/iot/src/iot_api.erl | 1 - apps/iot/src/iot_event_period_settings.erl | 7 +- apps/iot/src/iot_host.erl | 3 +- apps/iot/src/iot_task.erl | 19 ++-- apps/iot/src/iot_task_worker.erl | 110 +++++++++++++++++++++ 5 files changed, 126 insertions(+), 14 deletions(-) create mode 100644 apps/iot/src/iot_task_worker.erl diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index 4985683..f00fd34 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -63,7 +63,6 @@ get_event_period_settings() -> hackney:close(ClientRef), {ok, RespBody}; {ok, HttpCode, _, ClientRef} -> - {ok, RespBody} = hackney:body(ClientRef), hackney:close(ClientRef), {error, {http_error, HttpCode}}; {error, Reason} -> diff --git a/apps/iot/src/iot_event_period_settings.erl b/apps/iot/src/iot_event_period_settings.erl index 47cfff3..159b15b 100644 --- a/apps/iot/src/iot_event_period_settings.erl +++ b/apps/iot/src/iot_event_period_settings.erl @@ -8,12 +8,13 @@ %%%------------------------------------------------------------------- -module(iot_event_period_settings). -author("anlicheng"). +-include_lib("stdlib/include/qlc.hrl"). -behaviour(gen_server). %% API -export([start_link/0]). --export([get_throttle/1]). +-export([get_throttle/1, debug_info/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,6 +42,10 @@ %%% API %%%=================================================================== +debug_info() -> + Q = qlc:q([E || E <- ets:table(?TAB_NAME)]), + qlc:e(Q). + %% 获取设置的时间周期 -spec get_throttle(GroupKey :: any()) -> integer(). get_throttle(GroupKey) -> diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 90bcede..f61ea79 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -497,7 +497,8 @@ handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #stat end, iot_device:change_status(DevicePid, ?DEVICE_ONLINE), - iot_ai_router:route_uuid(DeviceUUID, EventType, Params) + %iot_ai_router:route_uuid(DeviceUUID, EventType, Params) + iot_device:ai_event(DevicePid, EventType, Params) end; Event when is_map(Event) -> lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]); diff --git a/apps/iot/src/iot_task.erl b/apps/iot/src/iot_task.erl index 0a151e3..fa175b7 100644 --- a/apps/iot/src/iot_task.erl +++ b/apps/iot/src/iot_task.erl @@ -21,7 +21,8 @@ -define(SERVER, ?MODULE). -record(state, { - counter = 0 + counter = 0, + pool_pid :: pid() }). %%%=================================================================== @@ -52,7 +53,9 @@ start_link() -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([]) -> - {ok, #state{}}. + %% 启动工作的线程池 + {ok, PoolPid} = poolboy:start_link([{size, 10}, {max_overflow, 50}, {worker_module, iot_task_worker}], []), + {ok, #state{pool_pid = PoolPid}}. %% @private %% @doc Handling call messages @@ -73,8 +76,9 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({submit, Task}, State = #state{}) -> - spawn_monitor(fun() -> Task() end), +handle_cast({submit, Task}, State = #state{pool_pid = PoolPid}) -> + %% 限制异步任务的最大并发数, 避免进程被耗尽 + poolboy:transaction(PoolPid, fun(WorkerPid) -> iot_task_worker:execute(WorkerPid, Task) end), {noreply, State}; handle_cast(debug_info, State = #state{counter = Counter}) -> lager:debug("[iot_task] execute task_num: ~p", [Counter]), @@ -86,13 +90,6 @@ handle_cast(debug_info, State = #state{counter = Counter}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -%% Task进程挂掉 -handle_info({'DOWN', _MRef, process, _Pid, normal}, State=#state{counter = Counter}) -> - {noreply, State#state{counter = Counter + 1}}; -handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) -> - lager:notice("[iot_task] task process down with reason: ~p", [Reason]), - {noreply, State}; - handle_info(_Info, State = #state{}) -> {noreply, State}. diff --git a/apps/iot/src/iot_task_worker.erl b/apps/iot/src/iot_task_worker.erl new file mode 100644 index 0000000..47e2dfd --- /dev/null +++ b/apps/iot/src/iot_task_worker.erl @@ -0,0 +1,110 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% +%%% @end +%%% Created : 12. 7月 2024 10:16 +%%%------------------------------------------------------------------- +-module(iot_task_worker). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([execute/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec execute(Pid :: pid(), Task :: fun()) -> ok. +execute(Pid, Task) when is_pid(Pid), is_function(Task, 0) -> + gen_server:call(Pid, {execute, Task}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({execute, Task}, _From, State = #state{}) -> + case catch Task() of + {error, Reason} -> + lager:warning("[iot_task_worker] execute task get error: ~p", Reason); + _ -> + ok + end, + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== From 99bb1aa1d0ba21b56078935a3b8030c1f445862a Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 12 Jul 2024 11:25:59 +0800 Subject: [PATCH 09/11] fix --- apps/iot/src/iot_task.erl | 2 +- apps/iot/src/iot_task_worker.erl | 17 ++++++----------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/apps/iot/src/iot_task.erl b/apps/iot/src/iot_task.erl index fa175b7..a9aa0b9 100644 --- a/apps/iot/src/iot_task.erl +++ b/apps/iot/src/iot_task.erl @@ -32,7 +32,7 @@ %% 提交异步任务 -spec submit(Task :: fun()) -> no_return(). submit(Task) when is_function(Task, 0) -> - gen_server:cast(?SERVER, Task). + gen_server:cast(?SERVER, {submit, Task}). debug_info() -> gen_server:cast(?SERVER, debug_info). diff --git a/apps/iot/src/iot_task_worker.erl b/apps/iot/src/iot_task_worker.erl index 47e2dfd..2243375 100644 --- a/apps/iot/src/iot_task_worker.erl +++ b/apps/iot/src/iot_task_worker.erl @@ -12,7 +12,7 @@ -behaviour(gen_server). %% API --export([start_link/0]). +-export([start_link/1]). -export([execute/2]). %% gen_server callbacks @@ -33,10 +33,10 @@ execute(Pid, Task) when is_pid(Pid), is_function(Task, 0) -> gen_server:call(Pid, {execute, Task}). %% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> +-spec(start_link(Args :: list()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +start_link(Args) when is_list(Args) -> + gen_server:start_link(?MODULE, Args, []). %%%=================================================================== %%% gen_server callbacks @@ -47,7 +47,7 @@ start_link() -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([]) -> +init(_) -> {ok, #state{}}. %% @private @@ -61,12 +61,7 @@ init([]) -> {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). handle_call({execute, Task}, _From, State = #state{}) -> - case catch Task() of - {error, Reason} -> - lager:warning("[iot_task_worker] execute task get error: ~p", Reason); - _ -> - ok - end, + catch Task(), {reply, ok, State}. %% @private From 60d5ccb13d7936a1dd7c4b9fe78fb26eb7a3c764 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 12 Jul 2024 17:02:38 +0800 Subject: [PATCH 10/11] fix period --- apps/iot/src/iot_api.erl | 2 +- apps/iot/src/iot_event_period_settings.erl | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index f00fd34..4d92f02 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -48,7 +48,7 @@ ai_event(Id) when is_integer(Id) -> get_event_period_settings() -> Token = iot_util:md5(<>), {ok, Url0} = application:get_env(iot, api_url), - Url = Url0 ++ "/api/v1/eventPeriodSettings", + Url = Url0 ++ "/api/v1/alertPeriod", Headers = [ {<<"content-type">>, <<"application/json">>} diff --git a/apps/iot/src/iot_event_period_settings.erl b/apps/iot/src/iot_event_period_settings.erl index 159b15b..37f4881 100644 --- a/apps/iot/src/iot_event_period_settings.erl +++ b/apps/iot/src/iot_event_period_settings.erl @@ -135,15 +135,15 @@ code_change(_OldVsn, State = #state{}, _Extra) -> -spec settings(tuple()) -> no_return(). settings({ok, Resp}) when is_binary(Resp) -> case catch jiffy:decode(Resp, [return_maps]) of - Settings when is_map(Settings) -> - lists:foreach(fun({GroupKey, Throttle}) -> + #{<<"code">> := 200, <<"data">> := Settings} when is_list(Settings) -> + lists:foreach(fun(#{<<"event_code">> := GroupKey, <<"time_period">> := Throttle}) -> case is_integer(Throttle) andalso Throttle > 0 of true -> ets:insert(?TAB_NAME, #period{group_key = GroupKey, throttle = Throttle}); false -> ok end - end, maps:to_list(Settings)); + end, Settings); Error -> lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Error]) end; From 1f2fcfea2deaeeb719e59a88abfb97962a742d75 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 23 Jul 2024 15:38:08 +0800 Subject: [PATCH 11/11] simple jinzhi_endpoint --- apps/iot/src/endpoint/iot_jinzhi_endpoint.erl | 256 +++++++++--------- apps/iot/src/iot_ai_router.erl | 12 + apps/iot/src/iot_device.erl | 25 +- apps/iot/src/iot_event_publisher.erl | 88 +++--- 4 files changed, 182 insertions(+), 199 deletions(-) diff --git a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl index e11fd00..5f5fc8c 100644 --- a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl +++ b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl @@ -1,41 +1,45 @@ %%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, +%%% @author anlicheng +%%% @copyright (C) 2024, %%% @doc %%% %%% @end -%%% Created : 06. 7月 2023 12:02 +%%% Created : 23. 7月 2024 14:51 %%%------------------------------------------------------------------- -module(iot_jinzhi_endpoint). --author("aresei"). --include("iot.hrl"). +-author("anlicheng"). --behaviour(gen_statem). +-behaviour(gen_server). %% API -export([start_link/0]). --export([get_pid/0, forward/4, get_stat/0]). -%% gen_statem callbacks --export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). +-export([get_pid/0, batch_forward/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). %% 消息重发间隔 -define(RETRY_INTERVAL, 5000). %% 系统id -define(SYS_ID, <<"ZNWLZJJKXT">>). +-record(task, { + id :: integer(), + location_code :: binary(), + dynamic_location_code :: binary(), + events =[] :: list() +}). + -record(state, { + id = 1, url :: string(), logger_pid :: pid(), - pool_size = 0, - flight_num = 0, pri_key :: public_key:private_key(), - id = 1, - queue :: queue:queue(), %% 定时器对应关系 - timer_map = #{}, - %% 记录成功处理的消息数 - acc_num = 0 + timer_map = #{} }). %%%=================================================================== @@ -46,166 +50,116 @@ get_pid() -> whereis(?MODULE). --spec forward(LocationCode :: binary(), DynamicLocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return(). -forward(LocationCode, DynamicLocationCode, EventType, Params) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_integer(EventType), is_map(Params) -> - gen_statem:cast(?MODULE, {forward, LocationCode, DynamicLocationCode, EventType, Params}). +-spec batch_forward(LocationCode :: binary(), DynamicLocationCode :: binary(), Events :: list()) -> no_return(). +batch_forward(LocationCode, DynamicLocationCode, Events) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Events) -> + gen_server:cast(?MODULE, {batch_forward, LocationCode, DynamicLocationCode, Events}). --spec get_stat() -> {ok, Stat :: #{}}. -get_stat() -> - gen_statem:call(?MODULE, get_stat, 5000). - -%% @doc Creates a gen_statem process which calls Module:init/1 to -%% initialize. To ensure a synchronized start-up procedure, this -%% function does not return until Module:init/1 has returned. +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). start_link() -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%%=================================================================== -%%% gen_statem callbacks +%%% gen_server callbacks %%%=================================================================== %% @private -%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or -%% gen_statem:start_link/[3,4], this function is called by the new -%% process to initialize. +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). init([]) -> {ok, Opts} = application:get_env(iot, jinzhi), - PoolSize = proplists:get_value(pool_size, Opts), PriFile = proplists:get_value(pri_key, Opts), Url = proplists:get_value(url, Opts), {ok, LoggerPid} = iot_logger:start_link("ai_event_data"), PriKey = generate_private_key(PriFile), - {ok, connected, #state{url = Url, logger_pid = LoggerPid, pri_key = PriKey, pool_size = PoolSize, queue = queue:new()}}. + {ok, #state{url = Url, logger_pid = LoggerPid, pri_key = PriKey}}. %% @private -%% @doc This function is called by a gen_statem when it needs to find out -%% the callback mode of the callback module. -callback_mode() -> - handle_event_function. +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. %% @private -%% @doc There should be one instance of this function for each possible -%% state name. If callback_mode is state_functions, one of these -%% functions is called when gen_statem receives and event from -%% call/2, cast/2, or as a normal process message. +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({batch_forward, LocationCode, DynamicLocationCode, Events}, State = #state{id = Id, timer_map = TimerMap, pri_key = PriKey, url = Url}) -> + ReqBody = format_events(LocationCode, DynamicLocationCode, Events, PriKey), + catch do_post(Url, Id, ReqBody), + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Id, ReqBody}), -handle_event(cast, {forward, LocationCode, DynamicLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> - EventData = #event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType, params = Params}, - %% 避免不必要的内部消息 - Actions = case FlightNum < PoolSize of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State#state{queue = queue:in(EventData, Q), id = Id + 1}, Actions}; + {noreply, State#state{id = Id + 1, timer_map = maps:put(Id, TimerRef, TimerMap)}}. -%% 触发读取下一条数据 -handle_event(info, fetch_next, _, State = #state{queue = Q, flight_num = FlightNum, timer_map = TimerMap}) -> - case queue:out(Q) of - {{value, EventData = #event_data{id = Id}}, Q1} -> - lager:debug("[iot_http_endpoint] fetch_next success, event data is: ~p", [EventData]), - catch do_post(EventData, State), - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}), - - {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), queue = Q1, flight_num = FlightNum + 1}}; - {empty, Q1} -> - {keep_state, State#state{queue = Q1}} - end; - -%% 收到确认消息 -handle_event(info, {ack, Id}, _, State = #state{timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) -> +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({ack, Id, Body, RespBody}, State = #state{timer_map = TimerMap, logger_pid = LoggerPid}) -> + %% 记录日志 + iot_logger:write(LoggerPid, [Body, RespBody]), case maps:take(Id, TimerMap) of error -> - {keep_state, State#state{acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]}; + {noreply, State}; {TimerRef, NTimerMap} -> is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - {keep_state, State#state{timer_map = NTimerMap, acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]} + {noreply, State#state{timer_map = NTimerMap}} end; -%% 收到重发过期请求 -handle_event(info, {timeout, _, {repost_ticker, EventData = #event_data{id = Id}}}, _, State = #state{timer_map = TimerMap}) -> - lager:debug("[iot_jinzhi_endpoint] repost data: ~p", [EventData]), - catch do_post(EventData, State), - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}), +handle_info({timeout, _, {repost_ticker, Id, ReqBody}}, State = #state{timer_map = TimerMap, url = Url}) -> + lager:debug("[iot_jinzhi_endpoint] repost data: ~p", [ReqBody]), + catch do_post(Url, Id, ReqBody), + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Id, ReqBody}), {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}; -%% Task进程挂掉 -handle_event(info, {'DOWN', _MRef, process, _Pid, normal}, _, State) -> - {keep_state, State}; +handle_info({'DOWN', _MRef, process, _Pid, normal}, State) -> + {noreply, State}; -handle_event(info, {'DOWN', _MRef, process, _Pid, Reason}, _, State) -> +handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) -> lager:notice("[iot_jinzhi_endpoint] task process down with reason: ~p", [Reason]), - {keep_state, State}; - -%% 获取当前统计信息 -handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) -> - Stat = #{ - <<"acc_num">> => AccNum, - <<"queue_num">> => mnesia_queue:table_size(), - <<"state_name">> => atom_to_binary(StateName) - }, - {keep_state, State, [{reply, From, Stat}]}; + {noreply, State}. %% @private -%% @doc If callback_mode is handle_event_function, then whenever a -%% gen_statem receives an event from call/2, cast/2, or as a normal -%% process message, this function is called. -handle_event(EventType, Event, StateName, State) -> - lager:warning("[iot_jinzhi_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]), - {keep_state, State}. - -%% @private -%% @doc This function is called by a gen_statem when it is about to +%% @doc This function is called by a gen_server when it is about to %% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_statem terminates with -%% Reason. The return value is ignored. -terminate(Reason, _StateName, #state{}) -> - lager:debug("[iot_jinzhi_endpoint] terminate with reason: ~p", [Reason]), +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> ok. %% @private %% @doc Convert process state when code is changed -code_change(_OldVsn, StateName, State = #state{}, _Extra) -> - {ok, StateName, State}. +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. %%%=================================================================== %%% Internal functions %%%=================================================================== - --spec do_post(EventData :: #event_data{}, State :: #state{}) -> no_return(). -do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType, - params = Params = #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}}, - #state{pri_key = PriKey, url = Url, logger_pid = LoggerPid}) -> - - % <<"occurrenceTime">> => <<"2023-06-10 12:00:00">>, - - Attachments = lists:map(fun(#{<<"filename">> := Filename}) -> - {ok, FileUrl} = iot_util:file_uri(Filename), - Name = filename:basename(FileUrl), - #{<<"name">> => Name, <<"url">> => FileUrl} - end, Attachments0), - - DeviceInfo = #{ - <<"location">> => LocationCode, - <<"dynamic_location">> => DynamicLocationCode, - <<"category">> => EventCode, - <<"description">> => Description, - <<"occurrenceTime">> => Datetime, - <<"attachments">> => Attachments - }, - - ReqData = #{ - <<"sign">> => sign(DeviceInfo, PriKey), - <<"sysId">> => ?SYS_ID, - <<"deviceInfo">> => DeviceInfo - }, - Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])), - - lager:debug("[iot_jinzhi_endpoint] do_post url: ~p, event_type: ~p, params: ~p, location_code: ~p, body: ~p", [Url, EventType, Params, LocationCode, Body]), - +-spec do_post(Url :: string(), Id :: integer(), Body :: binary()) -> no_return(). +do_post(Url, Id, Body) when is_list(Url), is_integer(Id), is_binary(Body) -> ReceiverPid = self(), %% 异步提交任务 spawn_monitor(fun() -> @@ -216,8 +170,7 @@ do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code {ok, 200, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), hackney:close(ClientRef), - iot_logger:write(LoggerPid, [Body, RespBody]), - ReceiverPid ! {ack, Id}; + ReceiverPid ! {ack, Id, Body, RespBody}; {ok, HttpCode, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), hackney:close(ClientRef), @@ -227,6 +180,39 @@ do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code end end). +%% 格式话要发送的数据,避免多次格式化处理 +-spec format_events(LocationCode :: binary(), DynamicLocationCode :: binary(), Events :: list(), PriKey :: public_key:private_key()) -> binary(). +format_events(LocationCode, DynamicLocationCode, Events, PriKey) -> + % <<"occurrenceTime">> => <<"2023-06-10 12:00:00">>, + %% 格式转换 + TextEvents = lists:map(fun({_EventType, #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}}) -> + Attachments = lists:map(fun(#{<<"filename">> := Filename}) -> + {ok, FileUrl} = iot_util:file_uri(Filename), + Name = filename:basename(FileUrl), + #{<<"name">> => Name, <<"url">> => FileUrl} + end, Attachments0), + + #{ + <<"category">> => EventCode, + <<"description">> => Description, + <<"occurrenceTime">> => Datetime, + <<"attachments">> => Attachments + } + end, Events), + + DeviceInfo = #{ + <<"location">> => LocationCode, + <<"dynamic_location">> => DynamicLocationCode, + <<"events">> => TextEvents + }, + + ReqData = #{ + <<"sign">> => sign(DeviceInfo, PriKey), + <<"sysId">> => ?SYS_ID, + <<"deviceInfo">> => DeviceInfo + }, + iolist_to_binary(jiffy:encode(ReqData, [force_utf8])). + -spec generate_private_key(PriFile :: string()) -> public_key:private_key(). generate_private_key(PriFile) when is_list(PriFile) -> PriKeyFile = code:priv_dir(iot) ++ "/" ++ PriFile, @@ -271,4 +257,4 @@ serialize([{K, V}|T], Target) -> <<$[, V0/binary, $]>> end, Item = <<$", K/binary, $", $:, V1/binary>>, - serialize(T, [Item|Target]). + serialize(T, [Item|Target]). \ No newline at end of file diff --git a/apps/iot/src/iot_ai_router.erl b/apps/iot/src/iot_ai_router.erl index 42075e1..2bcbdd1 100644 --- a/apps/iot/src/iot_ai_router.erl +++ b/apps/iot/src/iot_ai_router.erl @@ -23,4 +23,16 @@ route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]); {error, Reason} -> lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason]) + end. + +-spec batch_route_uuid(RouterUUID :: binary(), Events :: list()) -> no_return(). +batch_route_uuid(RouterUUID, Events) when is_binary(RouterUUID), is_list(Events) -> + %% 查找终端设备对应的点位信息 + case redis_client:hgetall(RouterUUID) of + {ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) -> + iot_jinzhi_endpoint:batch_forward(LocationCode, DynamicLocationCode, Events); + {ok, _} -> + lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]); + {error, Reason} -> + lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason]) end. \ No newline at end of file diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 95d5374..4269228 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -32,7 +32,7 @@ auth_status :: integer(), status = ?DEVICE_OFFLINE, %% 事件分组 - ai_groups = #{} + publisher_pid :: pid() }). %%%=================================================================== @@ -141,7 +141,8 @@ init([DeviceUUID]) when is_binary(DeviceUUID) -> ignore end; init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) -> - {ok, #state{device_uuid = DeviceUUID, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}. + {ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID), + {ok, #state{device_uuid = DeviceUUID, publisher_pid = PublisherPid, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}. %% @private %% @doc Handling call messages @@ -235,17 +236,9 @@ handle_cast({auth, false}, State = #state{device_uuid = DeviceUUID}) -> {noreply, State#state{auth_status = 0}}; %% ai事件的延迟整流逻辑 -handle_cast({ai_event, EventType, Params}, State = #state{ai_groups = Groups, device_uuid = DeviceUUID}) -> - GroupKey = group_by(EventType, Params), - case maps:find(GroupKey, Groups) of - {ok, PublisherPid} -> - iot_event_publisher:publish(PublisherPid, EventType, Params), - {noreply, State}; - error -> - {ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID, GroupKey), - iot_event_publisher:publish(PublisherPid, EventType, Params), - {noreply, State#state{ai_groups = maps:put(GroupKey, PublisherPid, Groups)}} - end. +handle_cast({ai_event, EventType, Params}, State = #state{publisher_pid = PublisherPid}) -> + iot_event_publisher:publish(PublisherPid, EventType, Params), + {noreply, State}. %% @private %% @doc Handling all non call/cast messages @@ -325,8 +318,4 @@ filter_tags(L, Tags) when map_size(Tags) =:= 0 -> as_integer(Val) when is_integer(Val) -> Val; as_integer(Val) when is_binary(Val) -> - binary_to_integer(Val). - -%% 事件分组函数 -group_by(EventType, _Params) -> - EventType. \ No newline at end of file + binary_to_integer(Val). \ No newline at end of file diff --git a/apps/iot/src/iot_event_publisher.erl b/apps/iot/src/iot_event_publisher.erl index 0b65c4c..ba41522 100644 --- a/apps/iot/src/iot_event_publisher.erl +++ b/apps/iot/src/iot_event_publisher.erl @@ -12,22 +12,23 @@ -behaviour(gen_server). %% API --export([start_link/2, publish/3, set_throttle/2]). +-export([start_link/1, publish/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(SERVER, ?MODULE). +%% 单个任务 +-record(task, { + counter = 0, + %% 缓存周期内的数据 + buffer = [] +}). + -record(state, { device_uuid :: binary(), - timer_ref = undefined :: undefined | reference(), - group_key :: any(), - %% 已经发送的事件的计数器 - counter = 0, - last_event = undefined :: any(), - %% 最后一次发送数据的时间戳, 如果数据跨越了多个时间跨度时也需要立即发送出去 - last_timestamp = 0 :: integer() + group_tasks = #{} :: map() }). %%%=================================================================== @@ -38,15 +39,11 @@ publish(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) -> gen_server:cast(Pid, {publish, EventType, Params}). --spec set_throttle(Pid :: pid(), ThrottleTime :: integer()) -> no_return(). -set_throttle(Pid, ThrottleTime) when is_pid(Pid), is_integer(ThrottleTime), ThrottleTime > 0 -> - gen_server:cast(Pid, {set_throttle, ThrottleTime}). - %% @doc Spawns the server and registers the local name (unique) --spec(start_link(DeviceUUID :: binary(), GroupKey :: any()) -> +-spec(start_link(DeviceUUID :: binary()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(DeviceUUID, GroupKey) when is_binary(DeviceUUID) -> - gen_server:start_link(?MODULE, [DeviceUUID, GroupKey], []). +start_link(DeviceUUID) when is_binary(DeviceUUID) -> + gen_server:start_link(?MODULE, [DeviceUUID], []). %%%=================================================================== %%% gen_server callbacks @@ -57,8 +54,8 @@ start_link(DeviceUUID, GroupKey) when is_binary(DeviceUUID) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([DeviceUUID, GroupKey]) -> - {ok, #state{device_uuid = DeviceUUID, group_key = GroupKey, last_timestamp = iot_util:timestamp_of_seconds(), last_event = undefined}}. +init([DeviceUUID]) -> + {ok, #state{device_uuid = DeviceUUID}}. %% @private %% @doc Handling call messages @@ -80,29 +77,21 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 第一次收到消息,则立即发送; 并且重置定时器 -handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, group_key = GroupKey, counter = 0}) -> - %% 重置定时器 - ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), - TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), - %% 发送消息 - iot_ai_router:route_uuid(DeviceUUID, EventType, Params), - {noreply, State#state{counter = 1, timer_ref = TimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; +handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, group_tasks = GroupTasks}) -> + GroupKey = group_by(EventType, Params), -%% 缓冲数据,等到时间到了再发送 -handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, timer_ref = TimerRef, counter = Counter, last_timestamp = LastTimestamp, group_key = GroupKey}) -> - ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), - Timestamp = iot_util:timestamp_of_seconds(), - %% 如果数据发送间隔已经超过了一个时间跨度,也需要立即发送 - case Timestamp > LastTimestamp + ThrottleTime of - true -> + case maps:find(GroupKey, GroupTasks) of + {ok, Task0 = #task{counter = Counter, buffer = Buffer}} -> + Task = Task0#task{counter = Counter + 1, buffer = [{EventType, Params}|Buffer]}, + {noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}}; + error -> %% 重置定时器 - is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - NTimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), + erlang:start_timer(ThrottleTime * 1000, self(), {throttle_ticker, GroupKey}), %% 发送消息 iot_ai_router:route_uuid(DeviceUUID, EventType, Params), - {noreply, State#state{counter = Counter + 1, timer_ref = NTimerRef, last_timestamp = iot_util:timestamp_of_seconds()}}; - false -> - {noreply, State#state{last_event = {EventType, Params}}} + Task = #task{buffer = [], counter = 1}, + {noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}} end. %% @private @@ -111,18 +100,21 @@ handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUI {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -%% 周期时间内没有任何数据 -handle_info({timeout, _, throttle_ticker}, State = #state{group_key = GroupKey, last_event = undefined}) -> - ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), - TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), - {noreply, State#state{timer_ref = TimerRef}}; -%% 已经发送的数据也需要重置,避免推送重复的数据 -handle_info({timeout, _, throttle_ticker}, State = #state{device_uuid = DeviceUUID, group_key = GroupKey, last_event = {EventType, Params}}) -> +handle_info({timeout, _, {throttle_ticker, GroupKey}}, State = #state{device_uuid = DeviceUUID, group_tasks = GroupTasks}) -> ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), - TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), - iot_ai_router:route_uuid(DeviceUUID, EventType, Params), - {noreply, State#state{timer_ref = TimerRef, last_event = undefined, last_timestamp = iot_util:timestamp_of_seconds()}}. + erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + + case maps:find(GroupKey, GroupTasks) of + {ok, Task0 = #task{buffer = Buffer}} -> + Events = lists:reverse(Buffer), + iot_ai_router:route_uuid(DeviceUUID, Events), + + Task = Task0#task{buffer = []}, + {noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}}; + error -> + {noreply, State} + end. %% @private %% @doc This function is called by a gen_server when it is about to @@ -145,3 +137,7 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +%% 事件分组函数 +group_by(EventType, _Params) -> + EventType. \ No newline at end of file