diff --git a/apps/iot/src/iot_ai_router.erl b/apps/iot/src/iot_ai_router.erl index 2bcbdd1..6376840 100644 --- a/apps/iot/src/iot_ai_router.erl +++ b/apps/iot/src/iot_ai_router.erl @@ -11,19 +11,7 @@ -include("iot.hrl"). %% API --export([route_uuid/3]). - --spec route_uuid(RouterUUID :: binary(), EventType :: integer(), Params :: map()) -> no_return(). -route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) -> - %% 查找终端设备对应的点位信息 - case redis_client:hgetall(RouterUUID) of - {ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) -> - iot_jinzhi_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params); - {ok, _} -> - lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]); - {error, Reason} -> - lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason]) - end. +-export([batch_route_uuid/2]). -spec batch_route_uuid(RouterUUID :: binary(), Events :: list()) -> no_return(). batch_route_uuid(RouterUUID, Events) when is_binary(RouterUUID), is_list(Events) -> diff --git a/apps/iot/src/iot_event_period_settings.erl b/apps/iot/src/iot_event_period_settings.erl index 37f4881..de06053 100644 --- a/apps/iot/src/iot_event_period_settings.erl +++ b/apps/iot/src/iot_event_period_settings.erl @@ -73,8 +73,9 @@ start_link() -> {stop, Reason :: term()} | ignore). init([]) -> ets:new(?TAB_NAME, [public, set, named_table, {keypos, 2}]), - settings(iot_api:get_event_period_settings()), + catch settings(), erlang:start_timer(?UPDATE_TICKER * 1000, self(), update_ticker), + {ok, #state{}}. %% @private @@ -106,7 +107,7 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_info({timeout, _, update_ticker}, State = #state{}) -> - settings(iot_api:get_event_period_settings()), + catch settings(), erlang:start_timer(?UPDATE_TICKER * 1000, self(), update_ticker), {noreply, State}. @@ -132,20 +133,23 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== --spec settings(tuple()) -> no_return(). -settings({ok, Resp}) when is_binary(Resp) -> - case catch jiffy:decode(Resp, [return_maps]) of - #{<<"code">> := 200, <<"data">> := Settings} when is_list(Settings) -> - lists:foreach(fun(#{<<"event_code">> := GroupKey, <<"time_period">> := Throttle}) -> - case is_integer(Throttle) andalso Throttle > 0 of - true -> - ets:insert(?TAB_NAME, #period{group_key = GroupKey, throttle = Throttle}); - false -> - ok - end - end, Settings); - Error -> - lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Error]) - end; -settings({error, Reason}) -> - lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Reason]). \ No newline at end of file +-spec settings() -> no_return(). +settings() -> + case iot_api:get_event_period_settings() of + {ok, Resp} -> + case catch jiffy:decode(Resp, [return_maps]) of + #{<<"code">> := 200, <<"data">> := Settings} when is_list(Settings) -> + lists:foreach(fun(#{<<"event_code">> := GroupKey, <<"time_period">> := Throttle}) -> + case is_integer(Throttle) andalso Throttle > 0 of + true -> + ets:insert(?TAB_NAME, #period{group_key = GroupKey, throttle = Throttle}); + false -> + ok + end + end, Settings); + Error -> + lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Error]) + end; + {error, Reason} -> + lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Reason]) + end. \ No newline at end of file diff --git a/apps/iot/src/iot_event_publisher.erl b/apps/iot/src/iot_event_publisher.erl index ba41522..4e19835 100644 --- a/apps/iot/src/iot_event_publisher.erl +++ b/apps/iot/src/iot_event_publisher.erl @@ -85,11 +85,10 @@ handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUI Task = Task0#task{counter = Counter + 1, buffer = [{EventType, Params}|Buffer]}, {noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}}; error -> - %% 重置定时器 ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), erlang:start_timer(ThrottleTime * 1000, self(), {throttle_ticker, GroupKey}), %% 发送消息 - iot_ai_router:route_uuid(DeviceUUID, EventType, Params), + iot_ai_router:batch_route_uuid(DeviceUUID, [{EventType, Params}]), Task = #task{buffer = [], counter = 1}, {noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}} end. @@ -100,16 +99,19 @@ handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUI {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). - handle_info({timeout, _, {throttle_ticker, GroupKey}}, State = #state{device_uuid = DeviceUUID, group_tasks = GroupTasks}) -> ThrottleTime = iot_event_period_settings:get_throttle(GroupKey), - erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker), + erlang:start_timer(ThrottleTime * 1000, self(), {throttle_ticker, GroupKey}), case maps:find(GroupKey, GroupTasks) of {ok, Task0 = #task{buffer = Buffer}} -> - Events = lists:reverse(Buffer), - iot_ai_router:route_uuid(DeviceUUID, Events), - + case length(Buffer) > 0 of + true -> + Events = lists:reverse(Buffer), + iot_ai_router:batch_route_uuid(DeviceUUID, Events); + false -> + ok + end, Task = Task0#task{buffer = []}, {noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}}; error -> @@ -139,5 +141,5 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %% 事件分组函数 -group_by(EventType, _Params) -> - EventType. \ No newline at end of file +group_by(_EventType, #{<<"event_code">> := EventCode}) -> + EventCode. \ No newline at end of file