From 2062c5612ca31e478d495dd674e0c09965154912 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 12 Aug 2024 22:37:01 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8C=96=E6=8E=A8=E9=80=81=E7=BB=99?= =?UTF-8?q?=E9=87=91=E6=99=BA=E7=9A=84=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/src/endpoint/iot_jinzhi_endpoint.erl | 55 ++++--- apps/iot/src/iot_ai_router.erl | 8 +- apps/iot/src/iot_device.erl | 11 +- apps/iot/src/iot_event_publisher.erl | 136 ------------------ apps/iot/src/iot_host.erl | 1 - 5 files changed, 42 insertions(+), 169 deletions(-) delete mode 100644 apps/iot/src/iot_event_publisher.erl diff --git a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl index 3a74dcb..29cfeb0 100644 --- a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl +++ b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl @@ -14,7 +14,7 @@ %% API -export([start_link/0]). --export([get_pid/0, batch_forward/3]). +-export([get_pid/0, forward/4]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -43,9 +43,9 @@ get_pid() -> whereis(?MODULE). --spec batch_forward(LocationCode :: binary(), DynamicLocationCode :: binary(), Events :: list()) -> no_return(). -batch_forward(LocationCode, DynamicLocationCode, Events) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Events) -> - gen_server:cast(?MODULE, {batch_forward, LocationCode, DynamicLocationCode, Events}). +-spec forward(LocationCode :: binary(), DynamicLocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return(). +forward(LocationCode, DynamicLocationCode, EventType, Params) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_integer(EventType), is_map(Params) -> + gen_server:cast(?MODULE, {forward, LocationCode, DynamicLocationCode, EventType, Params}). %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> @@ -92,8 +92,8 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({batch_forward, LocationCode, DynamicLocationCode, Events}, State = #state{id = Id, timer_map = TimerMap, pri_key = PriKey, url = Url}) -> - ReqBody = format_events(LocationCode, DynamicLocationCode, Events, PriKey), +handle_cast({forward, LocationCode, DynamicLocationCode, EventType, Params}, State = #state{id = Id, timer_map = TimerMap, pri_key = PriKey, url = Url}) -> + ReqBody = format_events(LocationCode, DynamicLocationCode, EventType, Params, PriKey), catch do_post(Url, Id, ReqBody), TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Id, ReqBody}), @@ -174,34 +174,47 @@ do_post(Url, Id, Body) when is_list(Url), is_integer(Id), is_binary(Body) -> end). %% 格式话要发送的数据,避免多次格式化处理 --spec format_events(LocationCode :: binary(), DynamicLocationCode :: binary(), Events :: list(), PriKey :: public_key:private_key()) -> binary(). -format_events(LocationCode, DynamicLocationCode, Events, PriKey) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Events) -> - DeviceInfo = format_event(LocationCode, hd(Events)), - ReqData = #{ - <<"sign">> => sign(DeviceInfo, PriKey), - <<"sysId">> => ?SYS_ID, - <<"taskId">> => <<"">>, - <<"count">> => length(Events), - <<"deviceInfo">> => DeviceInfo - }, - iolist_to_binary(jiffy:encode(ReqData, [force_utf8])). +-spec format_events(LocationCode :: binary(), DynamicLocationCode :: binary(), EventType :: integer(), Params :: map(), PriKey :: public_key:private_key()) -> binary(). +format_events(LocationCode, DynamicLocationCode, EventType, #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}, PriKey) + when is_binary(LocationCode), is_binary(DynamicLocationCode), is_integer(EventType) -> -format_event(LocationCode, {_EventType, #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}}) -> Attachments = lists:map(fun(#{<<"filename">> := Filename}) -> {ok, FileUrl} = iot_util:file_uri(Filename), Name = filename:basename(FileUrl), #{<<"name">> => Name, <<"url">> => FileUrl} - end, Attachments0), + end, Attachments0), % <<"occurrenceTime">> => <<"2023-06-10 12:00:00">>, - #{ + DeviceInfo = #{ <<"location">> => LocationCode, <<"category">> => EventCode, <<"description">> => Description, <<"occurrenceTime">> => Datetime, <<"attachments">> => Attachments - }. + }, + + ReqData = #{ + <<"sign">> => sign(DeviceInfo, PriKey), + <<"sysId">> => ?SYS_ID, + <<"taskId">> => generate_task_id(DynamicLocationCode, EventCode), + <<"count">> => 1, + <<"deviceInfo">> => DeviceInfo + }, + iolist_to_binary(jiffy:encode(ReqData, [force_utf8])). + +-spec generate_task_id(DynamicLocationCode :: binary(), EventCode :: binary()) -> binary(). +generate_task_id(DynamicLocationCode, EventCode) when is_binary(DynamicLocationCode), is_binary(EventCode) -> + case byte_size(DynamicLocationCode) of + 22 -> + LocCode = binary_part(DynamicLocationCode, 0, 17), + <>; + 28 -> + LocCode = binary_part(DynamicLocationCode, 0, 22), + <>; + _ -> + <> + end. -spec generate_private_key(PriFile :: string()) -> public_key:private_key(). generate_private_key(PriFile) when is_list(PriFile) -> diff --git a/apps/iot/src/iot_ai_router.erl b/apps/iot/src/iot_ai_router.erl index 6376840..42075e1 100644 --- a/apps/iot/src/iot_ai_router.erl +++ b/apps/iot/src/iot_ai_router.erl @@ -11,14 +11,14 @@ -include("iot.hrl"). %% API --export([batch_route_uuid/2]). +-export([route_uuid/3]). --spec batch_route_uuid(RouterUUID :: binary(), Events :: list()) -> no_return(). -batch_route_uuid(RouterUUID, Events) when is_binary(RouterUUID), is_list(Events) -> +-spec route_uuid(RouterUUID :: binary(), EventType :: integer(), Params :: map()) -> no_return(). +route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) -> %% 查找终端设备对应的点位信息 case redis_client:hgetall(RouterUUID) of {ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) -> - iot_jinzhi_endpoint:batch_forward(LocationCode, DynamicLocationCode, Events); + iot_jinzhi_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params); {ok, _} -> lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]); {error, Reason} -> diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 4269228..b6c879b 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -30,9 +30,7 @@ queue = queue:new(), %% 设备是否授权 auth_status :: integer(), - status = ?DEVICE_OFFLINE, - %% 事件分组 - publisher_pid :: pid() + status = ?DEVICE_OFFLINE }). %%%=================================================================== @@ -141,8 +139,7 @@ init([DeviceUUID]) when is_binary(DeviceUUID) -> ignore end; init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) -> - {ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID), - {ok, #state{device_uuid = DeviceUUID, publisher_pid = PublisherPid, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}. + {ok, #state{device_uuid = DeviceUUID, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}. %% @private %% @doc Handling call messages @@ -236,8 +233,8 @@ handle_cast({auth, false}, State = #state{device_uuid = DeviceUUID}) -> {noreply, State#state{auth_status = 0}}; %% ai事件的延迟整流逻辑 -handle_cast({ai_event, EventType, Params}, State = #state{publisher_pid = PublisherPid}) -> - iot_event_publisher:publish(PublisherPid, EventType, Params), +handle_cast({ai_event, EventType, Params}, State = #state{device_uuid = DeviceUUID}) -> + iot_ai_router:route_uuid(DeviceUUID, EventType, Params), {noreply, State}. %% @private diff --git a/apps/iot/src/iot_event_publisher.erl b/apps/iot/src/iot_event_publisher.erl deleted file mode 100644 index 37a165e..0000000 --- a/apps/iot/src/iot_event_publisher.erl +++ /dev/null @@ -1,136 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2024, -%%% @doc -%%% 事件发布器,提供基于时间周期的缓冲; -%%% !!! 暂时弃用,改回了原来最简单的推送方式了; 2024-08-12 -%%% @end -%%% Created : 11. 7月 2024 14:40 -%%%------------------------------------------------------------------- --module(iot_event_publisher). --author("anlicheng"). - --behaviour(gen_server). - -%% API --export([start_link/1, publish/3]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --record(state, { - device_uuid :: binary(), - group_buffers = #{} :: map() -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec publish(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return(). -publish(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) -> - gen_server:cast(Pid, {publish, EventType, Params}). - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(DeviceUUID :: binary()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(DeviceUUID) when is_binary(DeviceUUID) -> - gen_server:start_link(?MODULE, [DeviceUUID], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([DeviceUUID]) -> - {ok, #state{device_uuid = DeviceUUID}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -%% 第一次收到消息,则立即发送; 并且重置定时器 -handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, group_buffers = GroupBuffers}) -> - GroupKey = group_by(EventType, Params), - - case maps:find(GroupKey, GroupBuffers) of - {ok, Buffer} -> - NBuffer = [{EventType, Params}|Buffer], - {noreply, State#state{group_buffers = maps:put(GroupKey, NBuffer, GroupBuffers)}}; - error -> - ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), - erlang:start_timer(ThrottleTime * 1000, self(), {throttle_ticker, GroupKey}), - %% 发送消息 - iot_ai_router:batch_route_uuid(DeviceUUID, [{EventType, Params}]), - {noreply, State#state{group_buffers = maps:put(GroupKey, [], GroupBuffers)}} - end. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, {throttle_ticker, GroupKey}}, State = #state{device_uuid = DeviceUUID, group_buffers = GroupBuffers}) -> - ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), - erlang:start_timer(ThrottleTime * 1000, self(), {throttle_ticker, GroupKey}), - - case maps:find(GroupKey, GroupBuffers) of - {ok, Buffer} -> - case length(Buffer) > 0 of - true -> - iot_ai_router:batch_route_uuid(DeviceUUID, Buffer); - false -> - ok - end, - {noreply, State#state{group_buffers = maps:put(GroupKey, [], GroupBuffers)}}; - error -> - {noreply, State} - end. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% 事件分组函数 -group_by(_EventType, #{<<"event_code">> := EventCode}) -> - EventCode. \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index f61ea79..94432f5 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -497,7 +497,6 @@ handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #stat end, iot_device:change_status(DevicePid, ?DEVICE_ONLINE), - %iot_ai_router:route_uuid(DeviceUUID, EventType, Params) iot_device:ai_event(DevicePid, EventType, Params) end; Event when is_map(Event) ->