simple jinzhi_endpoint
This commit is contained in:
parent
60d5ccb13d
commit
1f2fcfea2d
@ -1,41 +1,45 @@
|
|||||||
%%%-------------------------------------------------------------------
|
%%%-------------------------------------------------------------------
|
||||||
%%% @author aresei
|
%%% @author anlicheng
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
%%% @copyright (C) 2024, <COMPANY>
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%% Created : 06. 7月 2023 12:02
|
%%% Created : 23. 7月 2024 14:51
|
||||||
%%%-------------------------------------------------------------------
|
%%%-------------------------------------------------------------------
|
||||||
-module(iot_jinzhi_endpoint).
|
-module(iot_jinzhi_endpoint).
|
||||||
-author("aresei").
|
-author("anlicheng").
|
||||||
-include("iot.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_statem).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
-export([get_pid/0, forward/4, get_stat/0]).
|
|
||||||
|
|
||||||
%% gen_statem callbacks
|
-export([get_pid/0, batch_forward/3]).
|
||||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% 消息重发间隔
|
%% 消息重发间隔
|
||||||
-define(RETRY_INTERVAL, 5000).
|
-define(RETRY_INTERVAL, 5000).
|
||||||
%% 系统id
|
%% 系统id
|
||||||
-define(SYS_ID, <<"ZNWLZJJKXT">>).
|
-define(SYS_ID, <<"ZNWLZJJKXT">>).
|
||||||
|
|
||||||
|
-record(task, {
|
||||||
|
id :: integer(),
|
||||||
|
location_code :: binary(),
|
||||||
|
dynamic_location_code :: binary(),
|
||||||
|
events =[] :: list()
|
||||||
|
}).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
|
id = 1,
|
||||||
url :: string(),
|
url :: string(),
|
||||||
logger_pid :: pid(),
|
logger_pid :: pid(),
|
||||||
pool_size = 0,
|
|
||||||
flight_num = 0,
|
|
||||||
pri_key :: public_key:private_key(),
|
pri_key :: public_key:private_key(),
|
||||||
id = 1,
|
|
||||||
queue :: queue:queue(),
|
|
||||||
%% 定时器对应关系
|
%% 定时器对应关系
|
||||||
timer_map = #{},
|
timer_map = #{}
|
||||||
%% 记录成功处理的消息数
|
|
||||||
acc_num = 0
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -46,166 +50,116 @@
|
|||||||
get_pid() ->
|
get_pid() ->
|
||||||
whereis(?MODULE).
|
whereis(?MODULE).
|
||||||
|
|
||||||
-spec forward(LocationCode :: binary(), DynamicLocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
-spec batch_forward(LocationCode :: binary(), DynamicLocationCode :: binary(), Events :: list()) -> no_return().
|
||||||
forward(LocationCode, DynamicLocationCode, EventType, Params) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_integer(EventType), is_map(Params) ->
|
batch_forward(LocationCode, DynamicLocationCode, Events) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Events) ->
|
||||||
gen_statem:cast(?MODULE, {forward, LocationCode, DynamicLocationCode, EventType, Params}).
|
gen_server:cast(?MODULE, {batch_forward, LocationCode, DynamicLocationCode, Events}).
|
||||||
|
|
||||||
-spec get_stat() -> {ok, Stat :: #{}}.
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
get_stat() ->
|
-spec(start_link() ->
|
||||||
gen_statem:call(?MODULE, get_stat, 5000).
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
|
|
||||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
|
||||||
%% initialize. To ensure a synchronized start-up procedure, this
|
|
||||||
%% function does not return until Module:init/1 has returned.
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% gen_statem callbacks
|
%%% gen_server callbacks
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
%% @doc Initializes the server
|
||||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
-spec(init(Args :: term()) ->
|
||||||
%% process to initialize.
|
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term()} | ignore).
|
||||||
init([]) ->
|
init([]) ->
|
||||||
{ok, Opts} = application:get_env(iot, jinzhi),
|
{ok, Opts} = application:get_env(iot, jinzhi),
|
||||||
|
|
||||||
PoolSize = proplists:get_value(pool_size, Opts),
|
|
||||||
PriFile = proplists:get_value(pri_key, Opts),
|
PriFile = proplists:get_value(pri_key, Opts),
|
||||||
Url = proplists:get_value(url, Opts),
|
Url = proplists:get_value(url, Opts),
|
||||||
|
|
||||||
{ok, LoggerPid} = iot_logger:start_link("ai_event_data"),
|
{ok, LoggerPid} = iot_logger:start_link("ai_event_data"),
|
||||||
PriKey = generate_private_key(PriFile),
|
PriKey = generate_private_key(PriFile),
|
||||||
|
|
||||||
{ok, connected, #state{url = Url, logger_pid = LoggerPid, pri_key = PriKey, pool_size = PoolSize, queue = queue:new()}}.
|
{ok, #state{url = Url, logger_pid = LoggerPid, pri_key = PriKey}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc This function is called by a gen_statem when it needs to find out
|
%% @doc Handling call messages
|
||||||
%% the callback mode of the callback module.
|
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||||
callback_mode() ->
|
State :: #state{}) ->
|
||||||
handle_event_function.
|
{reply, Reply :: term(), NewState :: #state{}} |
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_call(_Request, _From, State = #state{}) ->
|
||||||
|
{reply, ok, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc There should be one instance of this function for each possible
|
%% @doc Handling cast messages
|
||||||
%% state name. If callback_mode is state_functions, one of these
|
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||||
%% functions is called when gen_statem receives and event from
|
{noreply, NewState :: #state{}} |
|
||||||
%% call/2, cast/2, or as a normal process message.
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_cast({batch_forward, LocationCode, DynamicLocationCode, Events}, State = #state{id = Id, timer_map = TimerMap, pri_key = PriKey, url = Url}) ->
|
||||||
|
ReqBody = format_events(LocationCode, DynamicLocationCode, Events, PriKey),
|
||||||
|
catch do_post(Url, Id, ReqBody),
|
||||||
|
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Id, ReqBody}),
|
||||||
|
|
||||||
handle_event(cast, {forward, LocationCode, DynamicLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
|
{noreply, State#state{id = Id + 1, timer_map = maps:put(Id, TimerRef, TimerMap)}}.
|
||||||
EventData = #event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType, params = Params},
|
|
||||||
%% 避免不必要的内部消息
|
|
||||||
Actions = case FlightNum < PoolSize of
|
|
||||||
true -> [{next_event, info, fetch_next}];
|
|
||||||
false -> []
|
|
||||||
end,
|
|
||||||
{keep_state, State#state{queue = queue:in(EventData, Q), id = Id + 1}, Actions};
|
|
||||||
|
|
||||||
%% 触发读取下一条数据
|
%% @private
|
||||||
handle_event(info, fetch_next, _, State = #state{queue = Q, flight_num = FlightNum, timer_map = TimerMap}) ->
|
%% @doc Handling all non call/cast messages
|
||||||
case queue:out(Q) of
|
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||||
{{value, EventData = #event_data{id = Id}}, Q1} ->
|
{noreply, NewState :: #state{}} |
|
||||||
lager:debug("[iot_http_endpoint] fetch_next success, event data is: ~p", [EventData]),
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
catch do_post(EventData, State),
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
|
handle_info({ack, Id, Body, RespBody}, State = #state{timer_map = TimerMap, logger_pid = LoggerPid}) ->
|
||||||
|
%% 记录日志
|
||||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), queue = Q1, flight_num = FlightNum + 1}};
|
iot_logger:write(LoggerPid, [Body, RespBody]),
|
||||||
{empty, Q1} ->
|
|
||||||
{keep_state, State#state{queue = Q1}}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% 收到确认消息
|
|
||||||
handle_event(info, {ack, Id}, _, State = #state{timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) ->
|
|
||||||
case maps:take(Id, TimerMap) of
|
case maps:take(Id, TimerMap) of
|
||||||
error ->
|
error ->
|
||||||
{keep_state, State#state{acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]};
|
{noreply, State};
|
||||||
{TimerRef, NTimerMap} ->
|
{TimerRef, NTimerMap} ->
|
||||||
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
|
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
|
||||||
{keep_state, State#state{timer_map = NTimerMap, acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]}
|
{noreply, State#state{timer_map = NTimerMap}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% 收到重发过期请求
|
handle_info({timeout, _, {repost_ticker, Id, ReqBody}}, State = #state{timer_map = TimerMap, url = Url}) ->
|
||||||
handle_event(info, {timeout, _, {repost_ticker, EventData = #event_data{id = Id}}}, _, State = #state{timer_map = TimerMap}) ->
|
lager:debug("[iot_jinzhi_endpoint] repost data: ~p", [ReqBody]),
|
||||||
lager:debug("[iot_jinzhi_endpoint] repost data: ~p", [EventData]),
|
catch do_post(Url, Id, ReqBody),
|
||||||
catch do_post(EventData, State),
|
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Id, ReqBody}),
|
||||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
|
|
||||||
|
|
||||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
|
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
|
||||||
|
|
||||||
%% Task进程挂掉
|
handle_info({'DOWN', _MRef, process, _Pid, normal}, State) ->
|
||||||
handle_event(info, {'DOWN', _MRef, process, _Pid, normal}, _, State) ->
|
{noreply, State};
|
||||||
{keep_state, State};
|
|
||||||
|
|
||||||
handle_event(info, {'DOWN', _MRef, process, _Pid, Reason}, _, State) ->
|
handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) ->
|
||||||
lager:notice("[iot_jinzhi_endpoint] task process down with reason: ~p", [Reason]),
|
lager:notice("[iot_jinzhi_endpoint] task process down with reason: ~p", [Reason]),
|
||||||
{keep_state, State};
|
{noreply, State}.
|
||||||
|
|
||||||
%% 获取当前统计信息
|
|
||||||
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
|
|
||||||
Stat = #{
|
|
||||||
<<"acc_num">> => AccNum,
|
|
||||||
<<"queue_num">> => mnesia_queue:table_size(),
|
|
||||||
<<"state_name">> => atom_to_binary(StateName)
|
|
||||||
},
|
|
||||||
{keep_state, State, [{reply, From, Stat}]};
|
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
%% @doc This function is called by a gen_server when it is about to
|
||||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
|
||||||
%% process message, this function is called.
|
|
||||||
handle_event(EventType, Event, StateName, State) ->
|
|
||||||
lager:warning("[iot_jinzhi_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
|
|
||||||
{keep_state, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_statem when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||||
%% Reason. The return value is ignored.
|
%% with Reason. The return value is ignored.
|
||||||
terminate(Reason, _StateName, #state{}) ->
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
lager:debug("[iot_jinzhi_endpoint] terminate with reason: ~p", [Reason]),
|
State :: #state{}) -> term()).
|
||||||
|
terminate(_Reason, _State = #state{}) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Convert process state when code is changed
|
%% @doc Convert process state when code is changed
|
||||||
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||||
{ok, StateName, State}.
|
Extra :: term()) ->
|
||||||
|
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||||
|
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
-spec do_post(Url :: string(), Id :: integer(), Body :: binary()) -> no_return().
|
||||||
-spec do_post(EventData :: #event_data{}, State :: #state{}) -> no_return().
|
do_post(Url, Id, Body) when is_list(Url), is_integer(Id), is_binary(Body) ->
|
||||||
do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType,
|
|
||||||
params = Params = #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}},
|
|
||||||
#state{pri_key = PriKey, url = Url, logger_pid = LoggerPid}) ->
|
|
||||||
|
|
||||||
% <<"occurrenceTime">> => <<"2023-06-10 12:00:00">>,
|
|
||||||
|
|
||||||
Attachments = lists:map(fun(#{<<"filename">> := Filename}) ->
|
|
||||||
{ok, FileUrl} = iot_util:file_uri(Filename),
|
|
||||||
Name = filename:basename(FileUrl),
|
|
||||||
#{<<"name">> => Name, <<"url">> => FileUrl}
|
|
||||||
end, Attachments0),
|
|
||||||
|
|
||||||
DeviceInfo = #{
|
|
||||||
<<"location">> => LocationCode,
|
|
||||||
<<"dynamic_location">> => DynamicLocationCode,
|
|
||||||
<<"category">> => EventCode,
|
|
||||||
<<"description">> => Description,
|
|
||||||
<<"occurrenceTime">> => Datetime,
|
|
||||||
<<"attachments">> => Attachments
|
|
||||||
},
|
|
||||||
|
|
||||||
ReqData = #{
|
|
||||||
<<"sign">> => sign(DeviceInfo, PriKey),
|
|
||||||
<<"sysId">> => ?SYS_ID,
|
|
||||||
<<"deviceInfo">> => DeviceInfo
|
|
||||||
},
|
|
||||||
Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])),
|
|
||||||
|
|
||||||
lager:debug("[iot_jinzhi_endpoint] do_post url: ~p, event_type: ~p, params: ~p, location_code: ~p, body: ~p", [Url, EventType, Params, LocationCode, Body]),
|
|
||||||
|
|
||||||
ReceiverPid = self(),
|
ReceiverPid = self(),
|
||||||
%% 异步提交任务
|
%% 异步提交任务
|
||||||
spawn_monitor(fun() ->
|
spawn_monitor(fun() ->
|
||||||
@ -216,8 +170,7 @@ do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code
|
|||||||
{ok, 200, _, ClientRef} ->
|
{ok, 200, _, ClientRef} ->
|
||||||
{ok, RespBody} = hackney:body(ClientRef),
|
{ok, RespBody} = hackney:body(ClientRef),
|
||||||
hackney:close(ClientRef),
|
hackney:close(ClientRef),
|
||||||
iot_logger:write(LoggerPid, [Body, RespBody]),
|
ReceiverPid ! {ack, Id, Body, RespBody};
|
||||||
ReceiverPid ! {ack, Id};
|
|
||||||
{ok, HttpCode, _, ClientRef} ->
|
{ok, HttpCode, _, ClientRef} ->
|
||||||
{ok, RespBody} = hackney:body(ClientRef),
|
{ok, RespBody} = hackney:body(ClientRef),
|
||||||
hackney:close(ClientRef),
|
hackney:close(ClientRef),
|
||||||
@ -227,6 +180,39 @@ do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code
|
|||||||
end
|
end
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
%% 格式话要发送的数据,避免多次格式化处理
|
||||||
|
-spec format_events(LocationCode :: binary(), DynamicLocationCode :: binary(), Events :: list(), PriKey :: public_key:private_key()) -> binary().
|
||||||
|
format_events(LocationCode, DynamicLocationCode, Events, PriKey) ->
|
||||||
|
% <<"occurrenceTime">> => <<"2023-06-10 12:00:00">>,
|
||||||
|
%% 格式转换
|
||||||
|
TextEvents = lists:map(fun({_EventType, #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}}) ->
|
||||||
|
Attachments = lists:map(fun(#{<<"filename">> := Filename}) ->
|
||||||
|
{ok, FileUrl} = iot_util:file_uri(Filename),
|
||||||
|
Name = filename:basename(FileUrl),
|
||||||
|
#{<<"name">> => Name, <<"url">> => FileUrl}
|
||||||
|
end, Attachments0),
|
||||||
|
|
||||||
|
#{
|
||||||
|
<<"category">> => EventCode,
|
||||||
|
<<"description">> => Description,
|
||||||
|
<<"occurrenceTime">> => Datetime,
|
||||||
|
<<"attachments">> => Attachments
|
||||||
|
}
|
||||||
|
end, Events),
|
||||||
|
|
||||||
|
DeviceInfo = #{
|
||||||
|
<<"location">> => LocationCode,
|
||||||
|
<<"dynamic_location">> => DynamicLocationCode,
|
||||||
|
<<"events">> => TextEvents
|
||||||
|
},
|
||||||
|
|
||||||
|
ReqData = #{
|
||||||
|
<<"sign">> => sign(DeviceInfo, PriKey),
|
||||||
|
<<"sysId">> => ?SYS_ID,
|
||||||
|
<<"deviceInfo">> => DeviceInfo
|
||||||
|
},
|
||||||
|
iolist_to_binary(jiffy:encode(ReqData, [force_utf8])).
|
||||||
|
|
||||||
-spec generate_private_key(PriFile :: string()) -> public_key:private_key().
|
-spec generate_private_key(PriFile :: string()) -> public_key:private_key().
|
||||||
generate_private_key(PriFile) when is_list(PriFile) ->
|
generate_private_key(PriFile) when is_list(PriFile) ->
|
||||||
PriKeyFile = code:priv_dir(iot) ++ "/" ++ PriFile,
|
PriKeyFile = code:priv_dir(iot) ++ "/" ++ PriFile,
|
||||||
|
|||||||
@ -24,3 +24,15 @@ route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer
|
|||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
|
lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec batch_route_uuid(RouterUUID :: binary(), Events :: list()) -> no_return().
|
||||||
|
batch_route_uuid(RouterUUID, Events) when is_binary(RouterUUID), is_list(Events) ->
|
||||||
|
%% 查找终端设备对应的点位信息
|
||||||
|
case redis_client:hgetall(RouterUUID) of
|
||||||
|
{ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) ->
|
||||||
|
iot_jinzhi_endpoint:batch_forward(LocationCode, DynamicLocationCode, Events);
|
||||||
|
{ok, _} ->
|
||||||
|
lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]);
|
||||||
|
{error, Reason} ->
|
||||||
|
lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
|
||||||
|
end.
|
||||||
@ -32,7 +32,7 @@
|
|||||||
auth_status :: integer(),
|
auth_status :: integer(),
|
||||||
status = ?DEVICE_OFFLINE,
|
status = ?DEVICE_OFFLINE,
|
||||||
%% 事件分组
|
%% 事件分组
|
||||||
ai_groups = #{}
|
publisher_pid :: pid()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -141,7 +141,8 @@ init([DeviceUUID]) when is_binary(DeviceUUID) ->
|
|||||||
ignore
|
ignore
|
||||||
end;
|
end;
|
||||||
init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) ->
|
init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) ->
|
||||||
{ok, #state{device_uuid = DeviceUUID, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}.
|
{ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID),
|
||||||
|
{ok, #state{device_uuid = DeviceUUID, publisher_pid = PublisherPid, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
@ -235,17 +236,9 @@ handle_cast({auth, false}, State = #state{device_uuid = DeviceUUID}) ->
|
|||||||
{noreply, State#state{auth_status = 0}};
|
{noreply, State#state{auth_status = 0}};
|
||||||
|
|
||||||
%% ai事件的延迟整流逻辑
|
%% ai事件的延迟整流逻辑
|
||||||
handle_cast({ai_event, EventType, Params}, State = #state{ai_groups = Groups, device_uuid = DeviceUUID}) ->
|
handle_cast({ai_event, EventType, Params}, State = #state{publisher_pid = PublisherPid}) ->
|
||||||
GroupKey = group_by(EventType, Params),
|
|
||||||
case maps:find(GroupKey, Groups) of
|
|
||||||
{ok, PublisherPid} ->
|
|
||||||
iot_event_publisher:publish(PublisherPid, EventType, Params),
|
iot_event_publisher:publish(PublisherPid, EventType, Params),
|
||||||
{noreply, State};
|
{noreply, State}.
|
||||||
error ->
|
|
||||||
{ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID, GroupKey),
|
|
||||||
iot_event_publisher:publish(PublisherPid, EventType, Params),
|
|
||||||
{noreply, State#state{ai_groups = maps:put(GroupKey, PublisherPid, Groups)}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling all non call/cast messages
|
%% @doc Handling all non call/cast messages
|
||||||
@ -326,7 +319,3 @@ as_integer(Val) when is_integer(Val) ->
|
|||||||
Val;
|
Val;
|
||||||
as_integer(Val) when is_binary(Val) ->
|
as_integer(Val) when is_binary(Val) ->
|
||||||
binary_to_integer(Val).
|
binary_to_integer(Val).
|
||||||
|
|
||||||
%% 事件分组函数
|
|
||||||
group_by(EventType, _Params) ->
|
|
||||||
EventType.
|
|
||||||
@ -12,22 +12,23 @@
|
|||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/2, publish/3, set_throttle/2]).
|
-export([start_link/1, publish/3]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
%% 单个任务
|
||||||
|
-record(task, {
|
||||||
|
counter = 0,
|
||||||
|
%% 缓存周期内的数据
|
||||||
|
buffer = []
|
||||||
|
}).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
device_uuid :: binary(),
|
device_uuid :: binary(),
|
||||||
timer_ref = undefined :: undefined | reference(),
|
group_tasks = #{} :: map()
|
||||||
group_key :: any(),
|
|
||||||
%% 已经发送的事件的计数器
|
|
||||||
counter = 0,
|
|
||||||
last_event = undefined :: any(),
|
|
||||||
%% 最后一次发送数据的时间戳, 如果数据跨越了多个时间跨度时也需要立即发送出去
|
|
||||||
last_timestamp = 0 :: integer()
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -38,15 +39,11 @@
|
|||||||
publish(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) ->
|
publish(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) ->
|
||||||
gen_server:cast(Pid, {publish, EventType, Params}).
|
gen_server:cast(Pid, {publish, EventType, Params}).
|
||||||
|
|
||||||
-spec set_throttle(Pid :: pid(), ThrottleTime :: integer()) -> no_return().
|
|
||||||
set_throttle(Pid, ThrottleTime) when is_pid(Pid), is_integer(ThrottleTime), ThrottleTime > 0 ->
|
|
||||||
gen_server:cast(Pid, {set_throttle, ThrottleTime}).
|
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
-spec(start_link(DeviceUUID :: binary(), GroupKey :: any()) ->
|
-spec(start_link(DeviceUUID :: binary()) ->
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
start_link(DeviceUUID, GroupKey) when is_binary(DeviceUUID) ->
|
start_link(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||||
gen_server:start_link(?MODULE, [DeviceUUID, GroupKey], []).
|
gen_server:start_link(?MODULE, [DeviceUUID], []).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
@ -57,8 +54,8 @@ start_link(DeviceUUID, GroupKey) when is_binary(DeviceUUID) ->
|
|||||||
-spec(init(Args :: term()) ->
|
-spec(init(Args :: term()) ->
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([DeviceUUID, GroupKey]) ->
|
init([DeviceUUID]) ->
|
||||||
{ok, #state{device_uuid = DeviceUUID, group_key = GroupKey, last_timestamp = iot_util:timestamp_of_seconds(), last_event = undefined}}.
|
{ok, #state{device_uuid = DeviceUUID}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
@ -80,29 +77,21 @@ handle_call(_Request, _From, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
%% 第一次收到消息,则立即发送; 并且重置定时器
|
%% 第一次收到消息,则立即发送; 并且重置定时器
|
||||||
handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, group_key = GroupKey, counter = 0}) ->
|
handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, group_tasks = GroupTasks}) ->
|
||||||
%% 重置定时器
|
GroupKey = group_by(EventType, Params),
|
||||||
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
|
||||||
TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker),
|
|
||||||
%% 发送消息
|
|
||||||
iot_ai_router:route_uuid(DeviceUUID, EventType, Params),
|
|
||||||
{noreply, State#state{counter = 1, timer_ref = TimerRef, last_timestamp = iot_util:timestamp_of_seconds()}};
|
|
||||||
|
|
||||||
%% 缓冲数据,等到时间到了再发送
|
case maps:find(GroupKey, GroupTasks) of
|
||||||
handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, timer_ref = TimerRef, counter = Counter, last_timestamp = LastTimestamp, group_key = GroupKey}) ->
|
{ok, Task0 = #task{counter = Counter, buffer = Buffer}} ->
|
||||||
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
Task = Task0#task{counter = Counter + 1, buffer = [{EventType, Params}|Buffer]},
|
||||||
Timestamp = iot_util:timestamp_of_seconds(),
|
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}};
|
||||||
%% 如果数据发送间隔已经超过了一个时间跨度,也需要立即发送
|
error ->
|
||||||
case Timestamp > LastTimestamp + ThrottleTime of
|
|
||||||
true ->
|
|
||||||
%% 重置定时器
|
%% 重置定时器
|
||||||
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
|
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
||||||
NTimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker),
|
erlang:start_timer(ThrottleTime * 1000, self(), {throttle_ticker, GroupKey}),
|
||||||
%% 发送消息
|
%% 发送消息
|
||||||
iot_ai_router:route_uuid(DeviceUUID, EventType, Params),
|
iot_ai_router:route_uuid(DeviceUUID, EventType, Params),
|
||||||
{noreply, State#state{counter = Counter + 1, timer_ref = NTimerRef, last_timestamp = iot_util:timestamp_of_seconds()}};
|
Task = #task{buffer = [], counter = 1},
|
||||||
false ->
|
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}}
|
||||||
{noreply, State#state{last_event = {EventType, Params}}}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -111,18 +100,21 @@ handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUI
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
%% 周期时间内没有任何数据
|
|
||||||
handle_info({timeout, _, throttle_ticker}, State = #state{group_key = GroupKey, last_event = undefined}) ->
|
|
||||||
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
|
||||||
TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker),
|
|
||||||
{noreply, State#state{timer_ref = TimerRef}};
|
|
||||||
|
|
||||||
%% 已经发送的数据也需要重置,避免推送重复的数据
|
handle_info({timeout, _, {throttle_ticker, GroupKey}}, State = #state{device_uuid = DeviceUUID, group_tasks = GroupTasks}) ->
|
||||||
handle_info({timeout, _, throttle_ticker}, State = #state{device_uuid = DeviceUUID, group_key = GroupKey, last_event = {EventType, Params}}) ->
|
|
||||||
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
||||||
TimerRef = erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker),
|
erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker),
|
||||||
iot_ai_router:route_uuid(DeviceUUID, EventType, Params),
|
|
||||||
{noreply, State#state{timer_ref = TimerRef, last_event = undefined, last_timestamp = iot_util:timestamp_of_seconds()}}.
|
case maps:find(GroupKey, GroupTasks) of
|
||||||
|
{ok, Task0 = #task{buffer = Buffer}} ->
|
||||||
|
Events = lists:reverse(Buffer),
|
||||||
|
iot_ai_router:route_uuid(DeviceUUID, Events),
|
||||||
|
|
||||||
|
Task = Task0#task{buffer = []},
|
||||||
|
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}};
|
||||||
|
error ->
|
||||||
|
{noreply, State}
|
||||||
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
%% @doc This function is called by a gen_server when it is about to
|
||||||
@ -145,3 +137,7 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
|
%% 事件分组函数
|
||||||
|
group_by(EventType, _Params) ->
|
||||||
|
EventType.
|
||||||
Loading…
x
Reference in New Issue
Block a user