Merge pull request 'ai_group_mst' (#3) from ai_group_mst into master
Reviewed-on: http://101.43.184.190:3000/alc/iot/pulls/3
This commit is contained in:
commit
d4130f94b4
@ -1,41 +1,45 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2024, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 06. 7月 2023 12:02
|
||||
%%% Created : 23. 7月 2024 14:51
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_jinzhi_endpoint).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_statem).
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([get_pid/0, forward/4, get_stat/0]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
-export([get_pid/0, batch_forward/3]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
%% 消息重发间隔
|
||||
-define(RETRY_INTERVAL, 5000).
|
||||
%% 系统id
|
||||
-define(SYS_ID, <<"ZNWLZJJKXT">>).
|
||||
|
||||
-record(task, {
|
||||
id :: integer(),
|
||||
location_code :: binary(),
|
||||
dynamic_location_code :: binary(),
|
||||
events =[] :: list()
|
||||
}).
|
||||
|
||||
-record(state, {
|
||||
id = 1,
|
||||
url :: string(),
|
||||
logger_pid :: pid(),
|
||||
pool_size = 0,
|
||||
flight_num = 0,
|
||||
pri_key :: public_key:private_key(),
|
||||
id = 1,
|
||||
queue :: queue:queue(),
|
||||
%% 定时器对应关系
|
||||
timer_map = #{},
|
||||
%% 记录成功处理的消息数
|
||||
acc_num = 0
|
||||
timer_map = #{}
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
@ -46,166 +50,116 @@
|
||||
get_pid() ->
|
||||
whereis(?MODULE).
|
||||
|
||||
-spec forward(LocationCode :: binary(), DynamicLocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
forward(LocationCode, DynamicLocationCode, EventType, Params) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_integer(EventType), is_map(Params) ->
|
||||
gen_statem:cast(?MODULE, {forward, LocationCode, DynamicLocationCode, EventType, Params}).
|
||||
-spec batch_forward(LocationCode :: binary(), DynamicLocationCode :: binary(), Events :: list()) -> no_return().
|
||||
batch_forward(LocationCode, DynamicLocationCode, Events) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Events) ->
|
||||
gen_server:cast(?MODULE, {batch_forward, LocationCode, DynamicLocationCode, Events}).
|
||||
|
||||
-spec get_stat() -> {ok, Stat :: #{}}.
|
||||
get_stat() ->
|
||||
gen_statem:call(?MODULE, get_stat, 5000).
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link() ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link() ->
|
||||
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([]) ->
|
||||
{ok, Opts} = application:get_env(iot, jinzhi),
|
||||
|
||||
PoolSize = proplists:get_value(pool_size, Opts),
|
||||
PriFile = proplists:get_value(pri_key, Opts),
|
||||
Url = proplists:get_value(url, Opts),
|
||||
|
||||
{ok, LoggerPid} = iot_logger:start_link("ai_event_data"),
|
||||
PriKey = generate_private_key(PriFile),
|
||||
|
||||
{ok, connected, #state{url = Url, logger_pid = LoggerPid, pri_key = PriKey, pool_size = PoolSize, queue = queue:new()}}.
|
||||
{ok, #state{url = Url, logger_pid = LoggerPid, pri_key = PriKey}}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it needs to find out
|
||||
%% the callback mode of the callback module.
|
||||
callback_mode() ->
|
||||
handle_event_function.
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc There should be one instance of this function for each possible
|
||||
%% state name. If callback_mode is state_functions, one of these
|
||||
%% functions is called when gen_statem receives and event from
|
||||
%% call/2, cast/2, or as a normal process message.
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({batch_forward, LocationCode, DynamicLocationCode, Events}, State = #state{id = Id, timer_map = TimerMap, pri_key = PriKey, url = Url}) ->
|
||||
ReqBody = format_events(LocationCode, DynamicLocationCode, Events, PriKey),
|
||||
catch do_post(Url, Id, ReqBody),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Id, ReqBody}),
|
||||
|
||||
handle_event(cast, {forward, LocationCode, DynamicLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
|
||||
EventData = #event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType, params = Params},
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case FlightNum < PoolSize of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
false -> []
|
||||
end,
|
||||
{keep_state, State#state{queue = queue:in(EventData, Q), id = Id + 1}, Actions};
|
||||
{noreply, State#state{id = Id + 1, timer_map = maps:put(Id, TimerRef, TimerMap)}}.
|
||||
|
||||
%% 触发读取下一条数据
|
||||
handle_event(info, fetch_next, _, State = #state{queue = Q, flight_num = FlightNum, timer_map = TimerMap}) ->
|
||||
case queue:out(Q) of
|
||||
{{value, EventData = #event_data{id = Id}}, Q1} ->
|
||||
lager:debug("[iot_http_endpoint] fetch_next success, event data is: ~p", [EventData]),
|
||||
catch do_post(EventData, State),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
|
||||
|
||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), queue = Q1, flight_num = FlightNum + 1}};
|
||||
{empty, Q1} ->
|
||||
{keep_state, State#state{queue = Q1}}
|
||||
end;
|
||||
|
||||
%% 收到确认消息
|
||||
handle_event(info, {ack, Id}, _, State = #state{timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) ->
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({ack, Id, Body, RespBody}, State = #state{timer_map = TimerMap, logger_pid = LoggerPid}) ->
|
||||
%% 记录日志
|
||||
iot_logger:write(LoggerPid, [Body, RespBody]),
|
||||
case maps:take(Id, TimerMap) of
|
||||
error ->
|
||||
{keep_state, State#state{acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]};
|
||||
{noreply, State};
|
||||
{TimerRef, NTimerMap} ->
|
||||
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
|
||||
{keep_state, State#state{timer_map = NTimerMap, acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]}
|
||||
{noreply, State#state{timer_map = NTimerMap}}
|
||||
end;
|
||||
|
||||
%% 收到重发过期请求
|
||||
handle_event(info, {timeout, _, {repost_ticker, EventData = #event_data{id = Id}}}, _, State = #state{timer_map = TimerMap}) ->
|
||||
lager:debug("[iot_jinzhi_endpoint] repost data: ~p", [EventData]),
|
||||
catch do_post(EventData, State),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
|
||||
handle_info({timeout, _, {repost_ticker, Id, ReqBody}}, State = #state{timer_map = TimerMap, url = Url}) ->
|
||||
lager:debug("[iot_jinzhi_endpoint] repost data: ~p", [ReqBody]),
|
||||
catch do_post(Url, Id, ReqBody),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Id, ReqBody}),
|
||||
|
||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
|
||||
|
||||
%% Task进程挂掉
|
||||
handle_event(info, {'DOWN', _MRef, process, _Pid, normal}, _, State) ->
|
||||
{keep_state, State};
|
||||
handle_info({'DOWN', _MRef, process, _Pid, normal}, State) ->
|
||||
{noreply, State};
|
||||
|
||||
handle_event(info, {'DOWN', _MRef, process, _Pid, Reason}, _, State) ->
|
||||
handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) ->
|
||||
lager:notice("[iot_jinzhi_endpoint] task process down with reason: ~p", [Reason]),
|
||||
{keep_state, State};
|
||||
|
||||
%% 获取当前统计信息
|
||||
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
|
||||
Stat = #{
|
||||
<<"acc_num">> => AccNum,
|
||||
<<"queue_num">> => mnesia_queue:table_size(),
|
||||
<<"state_name">> => atom_to_binary(StateName)
|
||||
},
|
||||
{keep_state, State, [{reply, From, Stat}]};
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||
%% process message, this function is called.
|
||||
handle_event(EventType, Event, StateName, State) ->
|
||||
lager:warning("[iot_jinzhi_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
|
||||
{keep_state, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it is about to
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||
%% Reason. The return value is ignored.
|
||||
terminate(Reason, _StateName, #state{}) ->
|
||||
lager:debug("[iot_jinzhi_endpoint] terminate with reason: ~p", [Reason]),
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
{ok, StateName, State}.
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
-spec do_post(EventData :: #event_data{}, State :: #state{}) -> no_return().
|
||||
do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType,
|
||||
params = Params = #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}},
|
||||
#state{pri_key = PriKey, url = Url, logger_pid = LoggerPid}) ->
|
||||
|
||||
% <<"occurrenceTime">> => <<"2023-06-10 12:00:00">>,
|
||||
|
||||
Attachments = lists:map(fun(#{<<"filename">> := Filename}) ->
|
||||
{ok, FileUrl} = iot_util:file_uri(Filename),
|
||||
Name = filename:basename(FileUrl),
|
||||
#{<<"name">> => Name, <<"url">> => FileUrl}
|
||||
end, Attachments0),
|
||||
|
||||
DeviceInfo = #{
|
||||
<<"location">> => LocationCode,
|
||||
<<"dynamic_location">> => DynamicLocationCode,
|
||||
<<"category">> => EventCode,
|
||||
<<"description">> => Description,
|
||||
<<"occurrenceTime">> => Datetime,
|
||||
<<"attachments">> => Attachments
|
||||
},
|
||||
|
||||
ReqData = #{
|
||||
<<"sign">> => sign(DeviceInfo, PriKey),
|
||||
<<"sysId">> => ?SYS_ID,
|
||||
<<"deviceInfo">> => DeviceInfo
|
||||
},
|
||||
Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])),
|
||||
|
||||
lager:debug("[iot_jinzhi_endpoint] do_post url: ~p, event_type: ~p, params: ~p, location_code: ~p, body: ~p", [Url, EventType, Params, LocationCode, Body]),
|
||||
|
||||
-spec do_post(Url :: string(), Id :: integer(), Body :: binary()) -> no_return().
|
||||
do_post(Url, Id, Body) when is_list(Url), is_integer(Id), is_binary(Body) ->
|
||||
ReceiverPid = self(),
|
||||
%% 异步提交任务
|
||||
spawn_monitor(fun() ->
|
||||
@ -216,8 +170,7 @@ do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code
|
||||
{ok, 200, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
iot_logger:write(LoggerPid, [Body, RespBody]),
|
||||
ReceiverPid ! {ack, Id};
|
||||
ReceiverPid ! {ack, Id, Body, RespBody};
|
||||
{ok, HttpCode, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
@ -227,6 +180,39 @@ do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code
|
||||
end
|
||||
end).
|
||||
|
||||
%% 格式话要发送的数据,避免多次格式化处理
|
||||
-spec format_events(LocationCode :: binary(), DynamicLocationCode :: binary(), Events :: list(), PriKey :: public_key:private_key()) -> binary().
|
||||
format_events(LocationCode, DynamicLocationCode, Events, PriKey) ->
|
||||
% <<"occurrenceTime">> => <<"2023-06-10 12:00:00">>,
|
||||
%% 格式转换
|
||||
TextEvents = lists:map(fun({_EventType, #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}}) ->
|
||||
Attachments = lists:map(fun(#{<<"filename">> := Filename}) ->
|
||||
{ok, FileUrl} = iot_util:file_uri(Filename),
|
||||
Name = filename:basename(FileUrl),
|
||||
#{<<"name">> => Name, <<"url">> => FileUrl}
|
||||
end, Attachments0),
|
||||
|
||||
#{
|
||||
<<"category">> => EventCode,
|
||||
<<"description">> => Description,
|
||||
<<"occurrenceTime">> => Datetime,
|
||||
<<"attachments">> => Attachments
|
||||
}
|
||||
end, Events),
|
||||
|
||||
DeviceInfo = #{
|
||||
<<"location">> => LocationCode,
|
||||
<<"dynamic_location">> => DynamicLocationCode,
|
||||
<<"events">> => TextEvents
|
||||
},
|
||||
|
||||
ReqData = #{
|
||||
<<"sign">> => sign(DeviceInfo, PriKey),
|
||||
<<"sysId">> => ?SYS_ID,
|
||||
<<"deviceInfo">> => DeviceInfo
|
||||
},
|
||||
iolist_to_binary(jiffy:encode(ReqData, [force_utf8])).
|
||||
|
||||
-spec generate_private_key(PriFile :: string()) -> public_key:private_key().
|
||||
generate_private_key(PriFile) when is_list(PriFile) ->
|
||||
PriKeyFile = code:priv_dir(iot) ++ "/" ++ PriFile,
|
||||
@ -271,4 +257,4 @@ serialize([{K, V}|T], Target) ->
|
||||
<<$[, V0/binary, $]>>
|
||||
end,
|
||||
Item = <<$", K/binary, $", $:, V1/binary>>,
|
||||
serialize(T, [Item|Target]).
|
||||
serialize(T, [Item|Target]).
|
||||
@ -23,4 +23,16 @@ route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer
|
||||
lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]);
|
||||
{error, Reason} ->
|
||||
lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
|
||||
end.
|
||||
|
||||
-spec batch_route_uuid(RouterUUID :: binary(), Events :: list()) -> no_return().
|
||||
batch_route_uuid(RouterUUID, Events) when is_binary(RouterUUID), is_list(Events) ->
|
||||
%% 查找终端设备对应的点位信息
|
||||
case redis_client:hgetall(RouterUUID) of
|
||||
{ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) ->
|
||||
iot_jinzhi_endpoint:batch_forward(LocationCode, DynamicLocationCode, Events);
|
||||
{ok, _} ->
|
||||
lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]);
|
||||
{error, Reason} ->
|
||||
lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
|
||||
end.
|
||||
@ -1,78 +1,25 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @copyright (C) 2024, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 24. 12月 2023 15:42
|
||||
%%% Created : 11. 7月 2024 17:01
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_api).
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([ai_event/1]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
-define(API_TOKEN, <<"wv6fGyBhl*7@AsD9">>).
|
||||
|
||||
-record(state, {
|
||||
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
%% API
|
||||
-export([ai_event/1, get_event_period_settings/0]).
|
||||
|
||||
-spec ai_event(Id :: integer()) -> no_return().
|
||||
ai_event(Id) when is_integer(Id) ->
|
||||
gen_server:cast(?MODULE, {ai_event, Id}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link() ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([]) ->
|
||||
{ok, #state{}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({ai_event, Id}, State = #state{}) ->
|
||||
spawn_monitor(fun() ->
|
||||
Task = fun() ->
|
||||
Token = iot_util:md5(<<?API_TOKEN/binary, (integer_to_binary(Id))/binary, ?API_TOKEN/binary>>),
|
||||
{ok, Url} = application:get_env(iot, api_url),
|
||||
{ok, Url0} = application:get_env(iot, api_url),
|
||||
Url = Url0 ++ "/api/v1/taskLog",
|
||||
|
||||
Headers = [
|
||||
{<<"content-type">>, <<"application/json">>}
|
||||
@ -94,45 +41,30 @@ handle_cast({ai_event, Id}, State = #state{}) ->
|
||||
{error, Reason} ->
|
||||
lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason])
|
||||
end
|
||||
end),
|
||||
end,
|
||||
iot_task:submit(Task).
|
||||
|
||||
{noreply, State}.
|
||||
-spec get_event_period_settings() -> {ok, Resp :: binary()} | {error, Reason :: any()}.
|
||||
get_event_period_settings() ->
|
||||
Token = iot_util:md5(<<?API_TOKEN/binary, ?API_TOKEN/binary>>),
|
||||
{ok, Url0} = application:get_env(iot, api_url),
|
||||
Url = Url0 ++ "/api/v1/alertPeriod",
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
%% Task进程挂掉
|
||||
handle_info({'DOWN', _MRef, process, _Pid, normal}, State) ->
|
||||
{noreply, State};
|
||||
|
||||
handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) ->
|
||||
lager:notice("[iot_api] task process down with reason: ~p", [Reason]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(_Info, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
Headers = [
|
||||
{<<"content-type">>, <<"application/json">>}
|
||||
],
|
||||
ReqData = #{
|
||||
<<"token">> => Token
|
||||
},
|
||||
Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])),
|
||||
case hackney:request(post, Url, Headers, Body, [{pool, false}]) of
|
||||
{ok, 200, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
{ok, RespBody};
|
||||
{ok, HttpCode, _, ClientRef} ->
|
||||
hackney:close(ClientRef),
|
||||
{error, {http_error, HttpCode}};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
@ -1,39 +1,38 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2024, <COMPANY>
|
||||
%%% @doc
|
||||
%% 1. 终端是否授权 => 1: 授权,0: 未授权
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 14. 8月 2023 11:40
|
||||
%%% Created : 11. 7月 2024 11:33
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_device).
|
||||
-author("aresei").
|
||||
-author("anlicheng").
|
||||
-include("iot.hrl").
|
||||
|
||||
-behaviour(gen_statem).
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([get_name/1, get_pid/1, serialize/1]).
|
||||
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/5]).
|
||||
-export([ai_event/3]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
|
||||
%% 终端是否授权
|
||||
-define(DEVICE_AUTH_DENIED, 0).
|
||||
-define(DEVICE_AUTH_AUTHED, 1).
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
%% 存储数据的上限
|
||||
-define(MAX_SIZE, 2000).
|
||||
|
||||
%% 状态
|
||||
-define(STATE_DENIED, denied).
|
||||
-define(STATE_ACTIVATED, activated).
|
||||
|
||||
-record(state, {
|
||||
device_uuid :: binary(),
|
||||
%% 用来保存数据,作为存储在influxdb里面的数据的备份
|
||||
queue = queue:new(),
|
||||
status = ?DEVICE_OFFLINE
|
||||
%% 设备是否授权
|
||||
auth_status :: integer(),
|
||||
status = ?DEVICE_OFFLINE,
|
||||
%% 事件分组
|
||||
publisher_pid :: pid()
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
@ -50,6 +49,7 @@
|
||||
% "timestamp": int
|
||||
%}
|
||||
%],
|
||||
%% 用来保存在内存中的格式,不需要序列话处理 !!!, 放入到influxdb的数据是基于base64的
|
||||
-spec serialize(FieldsList :: [map()]) -> [Val :: map()].
|
||||
serialize(FieldsList) when is_list(FieldsList) ->
|
||||
lists:flatmap(fun serialize0/1, FieldsList).
|
||||
@ -87,46 +87,51 @@ get_name(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
is_activated(undefined) ->
|
||||
false;
|
||||
is_activated(Pid) when is_pid(Pid) ->
|
||||
gen_statem:call(Pid, is_activated).
|
||||
gen_server:call(Pid, is_activated).
|
||||
|
||||
-spec change_status(Pid :: pid() | undefined, NewStatus :: integer()) -> no_return().
|
||||
change_status(undefined, _) ->
|
||||
ok;
|
||||
change_status(Pid, NewStatus) when is_pid(Pid), is_integer(NewStatus) ->
|
||||
gen_statem:cast(Pid, {change_status, NewStatus}).
|
||||
gen_server:cast(Pid, {change_status, NewStatus}).
|
||||
|
||||
-spec reload(Pid :: pid()) -> no_return().
|
||||
reload(Pid) when is_pid(Pid) ->
|
||||
gen_statem:cast(Pid, reload).
|
||||
gen_server:cast(Pid, reload).
|
||||
|
||||
-spec auth(Pid :: pid(), Auth :: boolean()) -> no_return().
|
||||
auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
||||
gen_statem:cast(Pid, {auth, Auth}).
|
||||
gen_server:cast(Pid, {auth, Auth}).
|
||||
|
||||
-spec data(Pid :: pid(), DataList :: [#device_data{}]) -> no_return().
|
||||
data(Pid, DataList) when is_pid(Pid), is_list(DataList) ->
|
||||
gen_statem:cast(Pid, {data, DataList}).
|
||||
gen_server:cast(Pid, {data, DataList}).
|
||||
|
||||
-spec query(Pid :: pid(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) -> {ok, [#device_data{}]}.
|
||||
query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 ->
|
||||
gen_statem:call(Pid, {query, Tags, StartTs, StopTs, Limit}).
|
||||
gen_server:call(Pid, {query, Tags, StartTs, StopTs, Limit}).
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
-spec ai_event(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) ->
|
||||
gen_server:cast(Pid, {ai_event, EventType, Params}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(Name :: atom(), DeviceUUID :: binary()) ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link(Name, DeviceUUID) when is_atom(Name), is_binary(DeviceUUID) ->
|
||||
gen_statem:start_link({local, Name}, ?MODULE, [DeviceUUID], []);
|
||||
gen_server:start_link({local, Name}, ?MODULE, [DeviceUUID], []);
|
||||
start_link(Name, DeviceInfo) when is_atom(Name), is_map(DeviceInfo) ->
|
||||
gen_statem:start_link({local, Name}, ?MODULE, [DeviceInfo], []).
|
||||
gen_server:start_link({local, Name}, ?MODULE, [DeviceInfo], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([DeviceUUID]) when is_binary(DeviceUUID) ->
|
||||
case device_bo:get_device_by_uuid(DeviceUUID) of
|
||||
{ok, DeviceInfo} ->
|
||||
@ -135,85 +140,26 @@ init([DeviceUUID]) when is_binary(DeviceUUID) ->
|
||||
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
|
||||
ignore
|
||||
end;
|
||||
init([DeviceInfo = #{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) when is_map(DeviceInfo) ->
|
||||
case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of
|
||||
true ->
|
||||
{ok, ?STATE_ACTIVATED, #state{device_uuid = DeviceUUID, status = Status}};
|
||||
false ->
|
||||
{ok, ?STATE_DENIED, #state{device_uuid = DeviceUUID, status = Status}}
|
||||
end.
|
||||
init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}]) ->
|
||||
{ok, PublisherPid} = iot_event_publisher:start_link(DeviceUUID),
|
||||
{ok, #state{device_uuid = DeviceUUID, publisher_pid = PublisherPid, status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it needs to find out
|
||||
%% the callback mode of the callback module.
|
||||
callback_mode() ->
|
||||
handle_event_function.
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
%% 判断当前设备是否是激活状态
|
||||
handle_call(is_activated, _From, State = #state{auth_status = AuthStatus}) ->
|
||||
{reply, AuthStatus =:= 1, State};
|
||||
|
||||
%% @private
|
||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||
%% process message, this function is called.
|
||||
|
||||
%% 判断是否是激活状态
|
||||
handle_event({call, From}, is_activated, StateName, State = #state{}) ->
|
||||
{keep_state, State, [{reply, From, StateName =:= ?STATE_ACTIVATED}]};
|
||||
|
||||
%% 改变为在线状态,但是数据库中的状态已经是在线状态,忽略
|
||||
handle_event(cast, {change_status, ?DEVICE_ONLINE}, _, State = #state{status = ?DEVICE_ONLINE}) ->
|
||||
{keep_state, State};
|
||||
%% 改变数据库的状态, 其他情况下执行次数都很少
|
||||
handle_event(cast, {change_status, ?DEVICE_ONLINE}, _, State = #state{device_uuid = DeviceUUID}) ->
|
||||
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_ONLINE),
|
||||
report_event(DeviceUUID, ?DEVICE_ONLINE),
|
||||
{keep_state, State#state{status = ?DEVICE_ONLINE}};
|
||||
|
||||
handle_event(cast, {change_status, ?DEVICE_OFFLINE}, _, State = #state{device_uuid = DeviceUUID}) ->
|
||||
{ok, #{<<"status">> := Status}} = device_bo:get_device_by_uuid(DeviceUUID),
|
||||
case Status of
|
||||
?DEVICE_NOT_JOINED ->
|
||||
lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]),
|
||||
{keep_state, State#state{status = ?DEVICE_NOT_JOINED}};
|
||||
?DEVICE_OFFLINE ->
|
||||
lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]),
|
||||
{keep_state, State#state{status = ?DEVICE_OFFLINE}};
|
||||
?DEVICE_ONLINE ->
|
||||
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE),
|
||||
report_event(DeviceUUID, ?DEVICE_OFFLINE),
|
||||
{keep_state, State#state{status = ?DEVICE_OFFLINE}}
|
||||
end;
|
||||
|
||||
%% 重新加载数据库数据
|
||||
handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) ->
|
||||
lager:debug("[iot_device] will reload: ~p", [DeviceUUID]),
|
||||
case device_bo:get_device_by_uuid(DeviceUUID) of
|
||||
{ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} ->
|
||||
case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of
|
||||
true ->
|
||||
{next_state, ?STATE_ACTIVATED, State#state{status = Status}};
|
||||
false ->
|
||||
{next_state, ?STATE_DENIED, State#state{status = Status}}
|
||||
end;
|
||||
undefined ->
|
||||
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
|
||||
{stop, normal, State}
|
||||
end;
|
||||
|
||||
%% 向设备中追加数据
|
||||
handle_event(cast, {data, DataList}, _, State = #state{queue = Q}) ->
|
||||
NQ = lists:foldl(fun(Data, Q0) ->
|
||||
case queue:len(Q0) > ?MAX_SIZE of
|
||||
true ->
|
||||
{_, Q1} = queue:out(Q0),
|
||||
queue:in(Data, Q1);
|
||||
false ->
|
||||
queue:in(Data, Q0)
|
||||
end
|
||||
end, Q, DataList),
|
||||
|
||||
{keep_state, State#state{queue = NQ}};
|
||||
|
||||
%% 查询device里面的数据
|
||||
handle_event({call, From}, {query, Tags, StartTs, StopTs, Limit}, _StateName, State = #state{queue = Q}) ->
|
||||
%% 查询当前设备中产生的数据,缓存在内存中
|
||||
handle_call({query, Tags, StartTs, StopTs, Limit}, _From, State = #state{queue = Q}) ->
|
||||
L = queue:to_list(Q),
|
||||
|
||||
%% 过滤
|
||||
@ -228,38 +174,99 @@ handle_event({call, From}, {query, Tags, StartTs, StopTs, Limit}, _StateName, St
|
||||
false ->
|
||||
lists:sublist(L3, 1, Limit)
|
||||
end,
|
||||
{keep_state, State, [{reply, From, {ok, L4}}]};
|
||||
|
||||
%% 处理授权
|
||||
handle_event(cast, {auth, Auth}, StateName, State = #state{device_uuid = DeviceUUID}) ->
|
||||
case {StateName, Auth} of
|
||||
{?STATE_DENIED, false} ->
|
||||
lager:debug("[iot_device] device_uuid: ~p, auth: false, will keep state_name: ~p", [DeviceUUID, ?STATE_DENIED]),
|
||||
{keep_state, State};
|
||||
{?STATE_DENIED, true} ->
|
||||
{next_state, ?STATE_ACTIVATED, State};
|
||||
|
||||
{?STATE_ACTIVATED, false} ->
|
||||
lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]),
|
||||
{next_state, ?STATE_DENIED, State};
|
||||
{?STATE_ACTIVATED, true} ->
|
||||
lager:debug("[iot_device] device_uuid: ~p, auth: true, will keep state_name: ~p", [DeviceUUID, ?STATE_ACTIVATED]),
|
||||
{keep_state, State}
|
||||
end.
|
||||
{reply, {ok, L4}, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it is about to
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({change_status, ?DEVICE_ONLINE}, State = #state{status = ?DEVICE_ONLINE}) ->
|
||||
{noreply, State};
|
||||
handle_cast({change_status, ?DEVICE_ONLINE}, State = #state{device_uuid = DeviceUUID}) ->
|
||||
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_ONLINE),
|
||||
report_event(DeviceUUID, ?DEVICE_ONLINE),
|
||||
{noreply, State#state{status = ?DEVICE_ONLINE}};
|
||||
handle_cast({change_status, ?DEVICE_OFFLINE}, State = #state{device_uuid = DeviceUUID}) ->
|
||||
{ok, #{<<"status">> := Status}} = device_bo:get_device_by_uuid(DeviceUUID),
|
||||
case Status of
|
||||
?DEVICE_NOT_JOINED ->
|
||||
lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]),
|
||||
{noreply, State#state{status = ?DEVICE_NOT_JOINED}};
|
||||
?DEVICE_OFFLINE ->
|
||||
lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]),
|
||||
{noreply, State#state{status = ?DEVICE_OFFLINE}};
|
||||
?DEVICE_ONLINE ->
|
||||
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE),
|
||||
report_event(DeviceUUID, ?DEVICE_OFFLINE),
|
||||
{noreply, State#state{status = ?DEVICE_OFFLINE}}
|
||||
end;
|
||||
|
||||
%% 重新加载数据库数据
|
||||
handle_cast(reload, State = #state{device_uuid = DeviceUUID}) ->
|
||||
lager:debug("[iot_device] will reload: ~p", [DeviceUUID]),
|
||||
case device_bo:get_device_by_uuid(DeviceUUID) of
|
||||
{ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} ->
|
||||
{noreply, State#state{status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}};
|
||||
undefined ->
|
||||
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
|
||||
{stop, normal, State}
|
||||
end;
|
||||
|
||||
%% 向设备中追加数据
|
||||
handle_cast({data, DataList}, State = #state{queue = Q}) ->
|
||||
NQ = lists:foldl(fun(Data, Q0) ->
|
||||
case queue:len(Q0) > ?MAX_SIZE of
|
||||
true ->
|
||||
{_, Q1} = queue:out(Q0),
|
||||
queue:in(Data, Q1);
|
||||
false ->
|
||||
queue:in(Data, Q0)
|
||||
end
|
||||
end, Q, DataList),
|
||||
{noreply, State#state{queue = NQ}};
|
||||
|
||||
%% 处理授权
|
||||
handle_cast({auth, true}, State = #state{device_uuid = DeviceUUID}) ->
|
||||
lager:debug("[iot_device] device_uuid: ~p, auth: true", [DeviceUUID]),
|
||||
{noreply, State#state{auth_status = 1}};
|
||||
handle_cast({auth, false}, State = #state{device_uuid = DeviceUUID}) ->
|
||||
lager:debug("[iot_device] device_uuid: ~p, auth: false", [DeviceUUID]),
|
||||
{noreply, State#state{auth_status = 0}};
|
||||
|
||||
%% ai事件的延迟整流逻辑
|
||||
handle_cast({ai_event, EventType, Params}, State = #state{publisher_pid = PublisherPid}) ->
|
||||
iot_event_publisher:publish(PublisherPid, EventType, Params),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info(_Info, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||
%% Reason. The return value is ignored.
|
||||
terminate(Reason, StateName, #state{device_uuid = DeviceUUID}) ->
|
||||
lager:notice("[iot_device] device_uuid: ~p, state_name: ~p, terminate with reason: ~p", [DeviceUUID, StateName, Reason]),
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(Reason, #state{device_uuid = DeviceUUID}) ->
|
||||
lager:notice("[iot_device] device_uuid: ~p, terminate with reason: ~p", [DeviceUUID, Reason]),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
{ok, StateName, State}.
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
@ -306,3 +313,9 @@ filter_tags(L, Tags) when map_size(Tags) =:= 0 ->
|
||||
lists:all(fun({TagName, TagVal}) -> maps:is_key(TagName, Tags0) andalso maps:get(TagName, Tags0) =:= TagVal end, maps:to_list(Tags))
|
||||
end, L).
|
||||
|
||||
%% 转换成整数,从数据读取的数据有时候不一定都是整数
|
||||
-spec as_integer(Val :: integer() | binary()) -> integer().
|
||||
as_integer(Val) when is_integer(Val) ->
|
||||
Val;
|
||||
as_integer(Val) when is_binary(Val) ->
|
||||
binary_to_integer(Val).
|
||||
151
apps/iot/src/iot_event_period_settings.erl
Normal file
151
apps/iot/src/iot_event_period_settings.erl
Normal file
@ -0,0 +1,151 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2024, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 11. 7月 2024 15:54
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_event_period_settings).
|
||||
-author("anlicheng").
|
||||
-include_lib("stdlib/include/qlc.hrl").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([get_throttle/1, debug_info/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-define(TAB_NAME, iot_ets_event_period).
|
||||
|
||||
%% 更新周期, 单位:秒
|
||||
-define(UPDATE_TICKER, 300).
|
||||
|
||||
%% 默认周期, 单位:秒
|
||||
-define(DEFAULT_THROTTLE, 300).
|
||||
|
||||
-record(state, {
|
||||
|
||||
}).
|
||||
|
||||
-record(period, {
|
||||
group_key :: any(),
|
||||
throttle :: integer()
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
debug_info() ->
|
||||
Q = qlc:q([E || E <- ets:table(?TAB_NAME)]),
|
||||
qlc:e(Q).
|
||||
|
||||
%% 获取设置的时间周期
|
||||
-spec get_throttle(GroupKey :: any()) -> integer().
|
||||
get_throttle(GroupKey) ->
|
||||
case ets:lookup(?TAB_NAME, GroupKey) of
|
||||
[] ->
|
||||
?DEFAULT_THROTTLE;
|
||||
[#period{throttle = Throttle}|_] ->
|
||||
Throttle
|
||||
end.
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link() ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([]) ->
|
||||
ets:new(?TAB_NAME, [public, set, named_table, {keypos, 2}]),
|
||||
settings(iot_api:get_event_period_settings()),
|
||||
erlang:start_timer(?UPDATE_TICKER * 1000, self(), update_ticker),
|
||||
{ok, #state{}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({timeout, _, update_ticker}, State = #state{}) ->
|
||||
settings(iot_api:get_event_period_settings()),
|
||||
erlang:start_timer(?UPDATE_TICKER * 1000, self(), update_ticker),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
-spec settings(tuple()) -> no_return().
|
||||
settings({ok, Resp}) when is_binary(Resp) ->
|
||||
case catch jiffy:decode(Resp, [return_maps]) of
|
||||
#{<<"code">> := 200, <<"data">> := Settings} when is_list(Settings) ->
|
||||
lists:foreach(fun(#{<<"event_code">> := GroupKey, <<"time_period">> := Throttle}) ->
|
||||
case is_integer(Throttle) andalso Throttle > 0 of
|
||||
true ->
|
||||
ets:insert(?TAB_NAME, #period{group_key = GroupKey, throttle = Throttle});
|
||||
false ->
|
||||
ok
|
||||
end
|
||||
end, Settings);
|
||||
Error ->
|
||||
lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Error])
|
||||
end;
|
||||
settings({error, Reason}) ->
|
||||
lager:debug("[iot_event_period_settings] get event_period_settings from api get error: ~p", [Reason]).
|
||||
143
apps/iot/src/iot_event_publisher.erl
Normal file
143
apps/iot/src/iot_event_publisher.erl
Normal file
@ -0,0 +1,143 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2024, <COMPANY>
|
||||
%%% @doc
|
||||
%%% 事件发布器,提供基于时间周期的缓冲
|
||||
%%% @end
|
||||
%%% Created : 11. 7月 2024 14:40
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_event_publisher).
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/1, publish/3]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
%% 单个任务
|
||||
-record(task, {
|
||||
counter = 0,
|
||||
%% 缓存周期内的数据
|
||||
buffer = []
|
||||
}).
|
||||
|
||||
-record(state, {
|
||||
device_uuid :: binary(),
|
||||
group_tasks = #{} :: map()
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec publish(Pid :: pid(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
publish(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) ->
|
||||
gen_server:cast(Pid, {publish, EventType, Params}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(DeviceUUID :: binary()) ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
gen_server:start_link(?MODULE, [DeviceUUID], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([DeviceUUID]) ->
|
||||
{ok, #state{device_uuid = DeviceUUID}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
%% 第一次收到消息,则立即发送; 并且重置定时器
|
||||
handle_cast({publish, EventType, Params}, State = #state{device_uuid = DeviceUUID, group_tasks = GroupTasks}) ->
|
||||
GroupKey = group_by(EventType, Params),
|
||||
|
||||
case maps:find(GroupKey, GroupTasks) of
|
||||
{ok, Task0 = #task{counter = Counter, buffer = Buffer}} ->
|
||||
Task = Task0#task{counter = Counter + 1, buffer = [{EventType, Params}|Buffer]},
|
||||
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}};
|
||||
error ->
|
||||
%% 重置定时器
|
||||
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
||||
erlang:start_timer(ThrottleTime * 1000, self(), {throttle_ticker, GroupKey}),
|
||||
%% 发送消息
|
||||
iot_ai_router:route_uuid(DeviceUUID, EventType, Params),
|
||||
Task = #task{buffer = [], counter = 1},
|
||||
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
|
||||
handle_info({timeout, _, {throttle_ticker, GroupKey}}, State = #state{device_uuid = DeviceUUID, group_tasks = GroupTasks}) ->
|
||||
ThrottleTime = iot_event_period_settings:get_throttle(GroupKey),
|
||||
erlang:start_timer(ThrottleTime * 1000, self(), throttle_ticker),
|
||||
|
||||
case maps:find(GroupKey, GroupTasks) of
|
||||
{ok, Task0 = #task{buffer = Buffer}} ->
|
||||
Events = lists:reverse(Buffer),
|
||||
iot_ai_router:route_uuid(DeviceUUID, Events),
|
||||
|
||||
Task = Task0#task{buffer = []},
|
||||
{noreply, State#state{group_tasks = maps:put(GroupKey, Task, GroupTasks)}};
|
||||
error ->
|
||||
{noreply, State}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
%% 事件分组函数
|
||||
group_by(EventType, _Params) ->
|
||||
EventType.
|
||||
@ -491,13 +491,14 @@ handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #stat
|
||||
Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])),
|
||||
case ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message) of
|
||||
{ok, LogId} ->
|
||||
iot_api:ai_event(LogId);
|
||||
catch iot_api:ai_event(LogId);
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
|
||||
|
||||
iot_ai_router:route_uuid(DeviceUUID, EventType, Params)
|
||||
%iot_ai_router:route_uuid(DeviceUUID, EventType, Params)
|
||||
iot_device:ai_event(DevicePid, EventType, Params)
|
||||
end;
|
||||
Event when is_map(Event) ->
|
||||
lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]);
|
||||
|
||||
@ -29,12 +29,21 @@ init([]) ->
|
||||
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
|
||||
Specs = [
|
||||
#{
|
||||
id => 'iot_api',
|
||||
start => {'iot_api', start_link, []},
|
||||
id => 'iot_task',
|
||||
start => {'iot_task', start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 2000,
|
||||
type => supervisor,
|
||||
modules => ['iot_api']
|
||||
type => worker,
|
||||
modules => ['iot_task']
|
||||
},
|
||||
|
||||
#{
|
||||
id => 'iot_event_period_settings',
|
||||
start => {'iot_event_period_settings', start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 2000,
|
||||
type => worker,
|
||||
modules => ['iot_event_period_settings']
|
||||
},
|
||||
|
||||
#{
|
||||
|
||||
116
apps/iot/src/iot_task.erl
Normal file
116
apps/iot/src/iot_task.erl
Normal file
@ -0,0 +1,116 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2024, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 11. 7月 2024 17:06
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_task).
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([submit/1, debug_info/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-record(state, {
|
||||
counter = 0,
|
||||
pool_pid :: pid()
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
%% 提交异步任务
|
||||
-spec submit(Task :: fun()) -> no_return().
|
||||
submit(Task) when is_function(Task, 0) ->
|
||||
gen_server:cast(?SERVER, {submit, Task}).
|
||||
|
||||
debug_info() ->
|
||||
gen_server:cast(?SERVER, debug_info).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link() ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([]) ->
|
||||
%% 启动工作的线程池
|
||||
{ok, PoolPid} = poolboy:start_link([{size, 10}, {max_overflow, 50}, {worker_module, iot_task_worker}], []),
|
||||
{ok, #state{pool_pid = PoolPid}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({submit, Task}, State = #state{pool_pid = PoolPid}) ->
|
||||
%% 限制异步任务的最大并发数, 避免进程被耗尽
|
||||
poolboy:transaction(PoolPid, fun(WorkerPid) -> iot_task_worker:execute(WorkerPid, Task) end),
|
||||
{noreply, State};
|
||||
handle_cast(debug_info, State = #state{counter = Counter}) ->
|
||||
lager:debug("[iot_task] execute task_num: ~p", [Counter]),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info(_Info, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
105
apps/iot/src/iot_task_worker.erl
Normal file
105
apps/iot/src/iot_task_worker.erl
Normal file
@ -0,0 +1,105 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2024, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 12. 7月 2024 10:16
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_task_worker).
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/1]).
|
||||
-export([execute/2]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-record(state, {
|
||||
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec execute(Pid :: pid(), Task :: fun()) -> ok.
|
||||
execute(Pid, Task) when is_pid(Pid), is_function(Task, 0) ->
|
||||
gen_server:call(Pid, {execute, Task}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(Args :: list()) ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link(Args) when is_list(Args) ->
|
||||
gen_server:start_link(?MODULE, Args, []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init(_) ->
|
||||
{ok, #state{}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call({execute, Task}, _From, State = #state{}) ->
|
||||
catch Task(),
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info(_Info, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
@ -18,7 +18,7 @@
|
||||
{port, 18080}
|
||||
]},
|
||||
|
||||
{api_url, "http://39.98.184.67:8800/api/v1/taskLog"},
|
||||
{api_url, "http://39.98.184.67:8800"},
|
||||
|
||||
%% 目标服务器地址
|
||||
{emqx_server, [
|
||||
|
||||
@ -23,7 +23,7 @@
|
||||
{<<"test">>, <<"iot2023">>}
|
||||
]},
|
||||
|
||||
{api_url, "https://lgsiot.njau.edu.cn/api/v1/taskLog"},
|
||||
{api_url, "https://lgsiot.njau.edu.cn"},
|
||||
|
||||
%% 配置中电的数据转发, mqtt协议
|
||||
{zhongdian, [
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user