fix jinzhi endpoint
This commit is contained in:
parent
d4130f94b4
commit
150de00f3b
@ -11,19 +11,7 @@
|
|||||||
-include("iot.hrl").
|
-include("iot.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([route_uuid/3]).
|
-export([batch_route_uuid/2]).
|
||||||
|
|
||||||
-spec route_uuid(RouterUUID :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
|
||||||
route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) ->
|
|
||||||
%% 查找终端设备对应的点位信息
|
|
||||||
case redis_client:hgetall(RouterUUID) of
|
|
||||||
{ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) ->
|
|
||||||
iot_jinzhi_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params);
|
|
||||||
{ok, _} ->
|
|
||||||
lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]);
|
|
||||||
{error, Reason} ->
|
|
||||||
lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec batch_route_uuid(RouterUUID :: binary(), Events :: list()) -> no_return().
|
-spec batch_route_uuid(RouterUUID :: binary(), Events :: list()) -> no_return().
|
||||||
batch_route_uuid(RouterUUID, Events) when is_binary(RouterUUID), is_list(Events) ->
|
batch_route_uuid(RouterUUID, Events) when is_binary(RouterUUID), is_list(Events) ->
|
||||||
|
|||||||
@ -73,8 +73,9 @@ start_link() ->
|
|||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([]) ->
|
init([]) ->
|
||||||
ets:new(?TAB_NAME, [public, set, named_table, {keypos, 2}]),
|
ets:new(?TAB_NAME, [public, set, named_table, {keypos, 2}]),
|
||||||
settings(iot_api:get_event_period_settings()),
|
catch settings(),
|
||||||
erlang:start_timer(?UPDATE_TICKER * 1000, self(), update_ticker),
|
erlang:start_timer(?UPDATE_TICKER * 1000, self(), update_ticker),
|
||||||
|
|
||||||
{ok, #state{}}.
|
{ok, #state{}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -106,7 +107,7 @@ handle_cast(_Request, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({timeout, _, update_ticker}, State = #state{}) ->
|
handle_info({timeout, _, update_ticker}, State = #state{}) ->
|
||||||
settings(iot_api:get_event_period_settings()),
|
catch settings(),
|
||||||
erlang:start_timer(?UPDATE_TICKER * 1000, self(), update_ticker),
|
erlang:start_timer(?UPDATE_TICKER * 1000, self(), update_ticker),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
@ -132,8 +133,10 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
-spec settings(tuple()) -> no_return().
|
-spec settings() -> no_return().
|
||||||
settings({ok, Resp}) when is_binary(Resp) ->
|
settings() ->
|
||||||
|
case iot_api:get_event_period_settings() of
|
||||||
|
{ok, Resp} ->
|
||||||
case catch jiffy:decode(Resp, [return_maps]) of
|
case catch jiffy:decode(Resp, [return_maps]) of
|
||||||
#{<<"code">> := 200, <<"data">> := Settings} when is_list(Settings) ->
|
#{<<"code">> := 200, <<"data">> := Settings} when is_list(Settings) ->
|
||||||
lists:foreach(fun(#{<<"event_code">> := GroupKey, <<"time_period">> := Throttle}) ->
|
lists:foreach(fun(#{<<"event_code">> := GroupKey, <<"time_period">> := Throttle}) ->
|
||||||
@ -147,5 +150,6 @@ settings({ok, Resp}) when is_binary(Resp) ->
|
|||||||
Error ->
|
Error ->
|
||||||
lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Error])
|
lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Error])
|
||||||
end;
|
end;
|
||||||
settings({error, Reason}) ->
|
{error, Reason} ->
|
||||||
lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Reason]).
|
lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Reason])
|
||||||
|
end.
|
||||||
@ -85,11 +85,10 @@ handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUI
|
|||||||
Task = Task0#task{counter = Counter + 1, buffer = [{EventType, Params}|Buffer]},
|
Task = Task0#task{counter = Counter + 1, buffer = [{EventType, Params}|Buffer]},
|
||||||
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}};
|
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}};
|
||||||
error ->
|
error ->
|
||||||
%% 重置定时器
|
|
||||||
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
||||||
erlang:start_timer(ThrottleTime * 1000, self(), {throttle_ticker, GroupKey}),
|
erlang:start_timer(ThrottleTime * 1000, self(), {throttle_ticker, GroupKey}),
|
||||||
%% 发送消息
|
%% 发送消息
|
||||||
iot_ai_router:route_uuid(DeviceUUID, EventType, Params),
|
iot_ai_router:batch_route_uuid(DeviceUUID, [{EventType, Params}]),
|
||||||
Task = #task{buffer = [], counter = 1},
|
Task = #task{buffer = [], counter = 1},
|
||||||
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}}
|
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}}
|
||||||
end.
|
end.
|
||||||
@ -100,16 +99,19 @@ handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUI
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
|
||||||
handle_info({timeout, _, {throttle_ticker, GroupKey}}, State = #state{device_uuid = DeviceUUID, group_tasks = GroupTasks}) ->
|
handle_info({timeout, _, {throttle_ticker, GroupKey}}, State = #state{device_uuid = DeviceUUID, group_tasks = GroupTasks}) ->
|
||||||
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
||||||
erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker),
|
erlang:start_timer(ThrottleTime * 1000, self(), {throttle_ticker, GroupKey}),
|
||||||
|
|
||||||
case maps:find(GroupKey, GroupTasks) of
|
case maps:find(GroupKey, GroupTasks) of
|
||||||
{ok, Task0 = #task{buffer = Buffer}} ->
|
{ok, Task0 = #task{buffer = Buffer}} ->
|
||||||
|
case length(Buffer) > 0 of
|
||||||
|
true ->
|
||||||
Events = lists:reverse(Buffer),
|
Events = lists:reverse(Buffer),
|
||||||
iot_ai_router:route_uuid(DeviceUUID, Events),
|
iot_ai_router:batch_route_uuid(DeviceUUID, Events);
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
Task = Task0#task{buffer = []},
|
Task = Task0#task{buffer = []},
|
||||||
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}};
|
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}};
|
||||||
error ->
|
error ->
|
||||||
@ -139,5 +141,5 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
%% 事件分组函数
|
%% 事件分组函数
|
||||||
group_by(EventType, _Params) ->
|
group_by(_EventType, #{<<"event_code">> := EventCode}) ->
|
||||||
EventType.
|
EventCode.
|
||||||
Loading…
x
Reference in New Issue
Block a user