From 76edfcd11f4fbb3216d87c54f149aa0f385f1373 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 29 Apr 2025 15:05:12 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8C=96poll=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/src/iot_device.erl | 34 +--- apps/iot/src/iot_device_poll_task.erl | 135 --------------- apps/iot/src/iot_host.erl | 53 ++++-- apps/iot/src/iot_host_device_poller.erl | 220 ++++++++++++++++++++++++ 4 files changed, 266 insertions(+), 176 deletions(-) delete mode 100644 apps/iot/src/iot_device_poll_task.erl create mode 100644 apps/iot/src/iot_host_device_poller.erl diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index c89f1fa..e44fdb1 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -37,9 +37,6 @@ %% 设备类型 model_id = 0, - %% 设备状态轮询周期 - poll_interval = 0 :: integer(), - poll_ref :: undefined | reference(), %% 设备的最后状态,避免重复更新数据库 edge_status = 0 :: integer(), @@ -131,13 +128,8 @@ init([DeviceInfo = #{<<"id">> := DeviceId, <<"device_uuid">> := DeviceUUID, <<"h EdgeStatus = maps:get(<<"edge_status">>, DeviceInfo, 0), - %% 轮询机制 - {ok, PollInterval0} = application:get_env(iot, device_poll_interval), - PollInterval = PollInterval0 * 1000, - erlang:start_timer(PollInterval, self(), poll_ticker), - {ok, #state{device_id = DeviceId, device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, host_id = as_integer(HostId), edge_status = EdgeStatus, - status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId), poll_interval = PollInterval}}. + status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId)}}. %% @private %% @doc Handling call messages @@ -260,26 +252,8 @@ handle_cast({handle_data, Fields, Timestamp}, State = #state{device_uuid = Devic {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, poll_ticker}, State = #state{poll_interval = PollInterval, device_uuid = DeviceUUID, host_id = HostId}) -> - %% 开启下一次轮询 - erlang:start_timer(PollInterval, self(), poll_ticker), - - AliasName = iot_host:get_alias_name(HostId), - case global:whereis_name(AliasName) of - undefined -> - lager:warning("[iot_device] device_uuid: ~p, host not found", [DeviceUUID]), - {noreply, State}; - HostPid when is_pid(HostPid) -> - %% 建立临时任务,不阻塞当前进程 - Ref = make_ref(), - {ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(), - iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, 10), - - {noreply, State#state{poll_ref = Ref}} - end; - %% 处理响应 -handle_info({poll_task_reply, Ref, {ok, EdgeStatus}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) -> +handle_info({poll_task_reply, {ok, EdgeStatus}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status}) -> case EdgeStatus =/= LastEdgeStatus of true -> lager:debug("[iot_device] device_uuid: ~p, poll edge_status is: ~p", [DeviceUUID, EdgeStatus]), @@ -291,7 +265,7 @@ handle_info({poll_task_reply, Ref, {ok, EdgeStatus}}, State = #state{device_uuid end; %% 通讯超时逻辑需要处理 -handle_info({poll_task_reply, Ref, {error, timeout}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) -> +handle_info({poll_task_reply, {error, timeout}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status}) -> lager:debug("[iot_device] device_uuid: ~p, poll timeout", [DeviceUUID]), EdgeStatus = -1, case EdgeStatus =/= LastEdgeStatus of @@ -304,7 +278,7 @@ handle_info({poll_task_reply, Ref, {error, timeout}}, State = #state{device_uuid end; %% 其他类型的错误暂时不更新数据库状态 -handle_info({poll_task_reply, Ref, {error, Reason}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, poll_ref = Ref}) -> +handle_info({poll_task_reply, {error, Reason}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus}) -> lager:notice("[iot_device] device_uuid: ~p, poll get error: ~p, last edge_status: ~p", [DeviceUUID, Reason, LastEdgeStatus]), {noreply, State}; diff --git a/apps/iot/src/iot_device_poll_task.erl b/apps/iot/src/iot_device_poll_task.erl deleted file mode 100644 index 585f80d..0000000 --- a/apps/iot/src/iot_device_poll_task.erl +++ /dev/null @@ -1,135 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 10. 3月 2025 21:09 -%%%------------------------------------------------------------------- --module(iot_device_poll_task). --author("anlicheng"). - --behaviour(gen_server). - -%% API --export([start_monitor/0]). --export([poll_task/6]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --record(state, {}). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec poll_task(Pid :: pid(), Ref :: reference(), HostPid :: pid(), ReceiverPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> no_return(). -poll_task(Pid, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout) - when is_pid(Pid), is_reference(Ref), is_pid(HostPid), is_pid(ReceiverPid), is_binary(DeviceUUID), is_integer(Timeout) -> - gen_server:cast(Pid, {poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}). - -%% @doc Spawns the server and registers the local name (unique) --spec(start_monitor() -> - {'ok', {Pid :: pid(), MonRef :: reference()}} | 'ignore' | {'error', Reason :: term()}). -start_monitor() -> - gen_server:start_monitor(?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - {ok, #state{}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}, State = #state{}) -> - Command = #{ - <<"device_uuid">> => DeviceUUID, - <<"timeout">> => Timeout, - <<"command">> => <<"query_status">> - }, - BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])), - - case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of - {ok, Reply} when is_binary(Reply) -> - case catch jiffy:decode(Reply, [return_maps]) of - %% 返回的是一个数组 - #{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 -> - EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList), - case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of - true -> - ReceiverPid ! {poll_task_reply, Ref, {ok, 1}}; - false -> - case lists:all(fun(S) -> S =:= 0 end, EdgeStatusList) of - true -> - ReceiverPid ! {poll_task_reply, Ref, {ok, 0}}; - false -> - ReceiverPid ! {poll_task_reply, Ref, {ok, lists:min(EdgeStatusList)}} - end - end; - _ -> - ReceiverPid ! {poll_task_reply, Ref, {error, <<"invalid reply json">>}} - end; - {error, Reason} -> - ReceiverPid ! {poll_task_reply, Ref, {error, Reason}} - end, - {stop, normal, State}; -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info(_Info, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 26c26a1..f995d57 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -43,7 +43,10 @@ %% websocket相关 channel_pid :: undefined | pid(), %% 主机的相关信息 - metrics = #{} :: map() + metrics = #{} :: map(), + + %% 设备状态轮询器 + poller_pid :: pid() }). %%%=================================================================== @@ -199,10 +202,13 @@ init([UUID]) -> erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), StateName = case AuthorizeStatus =:= 1 of - true -> ?STATE_ACTIVATED; - false -> ?STATE_DENIED + true -> + ?STATE_ACTIVATED; + false -> + ?STATE_DENIED end, - {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, has_session = false}}; + {ok, PollerPid} = iot_host_device_poller:start_link(HostId, self()), + {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, poller_pid = PollerPid, has_session = false}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), ignore @@ -305,25 +311,35 @@ handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid = {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}; %% 激活主机 -handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> +handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid, poller_pid = PollerPid}) when is_pid(ChannelPid) -> BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]), ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>), lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]), + %% 启动轮询 + iot_host_device_poller:boot(PollerPid), + {next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]}; handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = undefined}) -> lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID]), {next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]}; %% 关闭授权 -handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> +handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, poller_pid = PollerPid}) when is_pid(ChannelPid) -> BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]), ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>), ws_channel:stop(ChannelPid, closed), lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]), + + %% 暂停轮询 + iot_host_device_poller:pause(PollerPid), + {next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]}; -handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = undefined}) -> +handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, poller_pid = PollerPid, channel_pid = undefined}) -> lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID]), + %% 暂停轮询 + iot_host_device_poller:pause(PollerPid), + {next_state, ?STATE_DENIED, State#state{has_session = false}, [{reply, From, ok}]}; %% 绑定channel @@ -337,7 +353,7 @@ handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, c {keep_state, State, [{reply, From, {error, <<"channel existed">>}}]}; %% 授权通过后,才能将主机的状态设置为在线状态 -handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = Aes, name = Name}) -> +handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, poller_pid = PollerPid, aes = Aes, name = Name}) -> Reply = #{<<"a">> => true, <<"aes">> => Aes}, EncReply = iot_cipher_rsa:encode(Reply, PubKey), {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), @@ -348,6 +364,9 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = # report_event(UUID, ?HOST_ONLINE), + %% 启动轮询 + iot_host_device_poller:boot(PollerPid), + lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]), {keep_state, State#state{has_session = true}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; @@ -358,30 +377,42 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_DENIED, State = #sta {keep_state, State#state{has_session = false}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; %% 重新加载设备信息 -handle_event({call, From}, {reload_device, DeviceUUID}, _, State) -> +handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{poller_pid = PollerPid}) -> case iot_device_sup:ensured_device_started(DeviceUUID) of {ok, DevicePid} -> iot_device:reload(DevicePid), + %% 增加设备 + iot_host_device_poller:add_device(PollerPid, DeviceUUID), + {keep_state, State, [{reply, From, ok}]}; {error, Reason} -> {keep_state, State, [{reply, From, {error, Reason}}]} end; %% 删除设备 -handle_event({call, From}, {delete_device, DeviceUUID}, _, State) -> +handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{poller_pid = PollerPid}) -> case iot_device:get_pid(DeviceUUID) of undefined -> ok; DevicePid when is_pid(DevicePid) -> + %% 删除设备 + iot_host_device_poller:delete_device(PollerPid, DeviceUUID), + iot_device_sup:delete_device(DeviceUUID) end, {keep_state, State, [{reply, From, ok}]}; %% 激活设备 -handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State) -> +handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #state{poller_pid = PollerPid}) -> case iot_device_sup:ensured_device_started(DeviceUUID) of {ok, DevicePid} -> iot_device:auth(DevicePid, Auth), + case Auth of + true -> + iot_host_device_poller:add_device(PollerPid, DeviceUUID); + false -> + iot_host_device_poller:delete_device(PollerPid, DeviceUUID) + end, {keep_state, State, [{reply, From, ok}]}; {error, Reason} -> diff --git a/apps/iot/src/iot_host_device_poller.erl b/apps/iot/src/iot_host_device_poller.erl new file mode 100644 index 0000000..1996550 --- /dev/null +++ b/apps/iot/src/iot_host_device_poller.erl @@ -0,0 +1,220 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 29. 4月 2025 12:19 +%%%------------------------------------------------------------------- +-module(iot_host_device_poller). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/2]). +-export([boot/1, pause/1, add_device/2, delete_device/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-define(STATUS_BOOTING, booting). +-define(STATUS_PAUSED, paused). + +-record(state, { + host_id :: integer(), + host_pid :: pid(), + devices :: [binary()], + status = ?STATUS_PAUSED, + %% 当前的任务集合 + tasks = [] +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +boot(Pid) when is_pid(Pid) -> + gen_server:cast(Pid, boot). + +pause(Pid) when is_pid(Pid) -> + gen_server:cast(Pid, pause). + +add_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> + gen_server:cast(Pid, {add_device, DeviceUUID}). + +delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> + gen_server:cast(Pid, {delete_device, DeviceUUID}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(HostId :: integer(), HostPid :: pid()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(HostId, HostPid) when is_integer(HostId), is_pid(HostPid) -> + gen_server:start_link(?MODULE, [HostId, HostPid], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([HostId, HostPid]) -> + {ok, Devices} = device_bo:get_host_devices(HostId), + {ok, #state{host_id = HostId, host_pid = HostPid, devices = Devices}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 启动 +handle_cast(boot, State = #state{status = ?STATUS_BOOTING}) -> + {noreply, State}; +handle_cast(boot, State = #state{status = ?STATUS_PAUSED, devices = Devices}) -> + case length(Devices) > 0 of + true -> + trigger_task(0); + false -> + trigger_task(5000) + end, + {noreply, State#state{status = ?STATUS_BOOTING, tasks = Devices}}; + +%% 暂停 +handle_cast(pause, State = #state{status = ?STATUS_PAUSED}) -> + {noreply, State}; +handle_cast(pause, State = #state{status = ?STATUS_BOOTING}) -> + {noreply, State#state{status = ?STATUS_PAUSED, tasks = []}}; + +%% 添加设备 +handle_cast({add_device, DeviceUUID}, State = #state{devices = Devices}) -> + NDevices = case lists:member(DeviceUUID, Devices) of + true -> + Devices; + false -> + [DeviceUUID|Devices] + end, + {noreply, State#state{devices = NDevices}}; + +%% 删除设备 +handle_cast({delete_device, DeviceUUID}, State = #state{devices = Devices}) -> + {noreply, State#state{devices = lists:delete(DeviceUUID, Devices)}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, task_ticker}, State = #state{host_id = HostId, host_pid = HostPid, status = Status, tasks = [DeviceUUID|Tail]}) -> + case Status of + ?STATUS_BOOTING -> + lager:debug("[iot_host_device_poller] host_id: ~p, start new task for device_uuid: ~p", [HostId, DeviceUUID]), + catch task(HostPid, DeviceUUID, 5), + trigger_task(0), + {noreply, State#state{tasks = Tail}}; + ?STATUS_PAUSED -> + lager:debug("[iot_host_device_poller] host_id: ~p, host status is paused", [HostId]), + {noreply, State#state{tasks = []}} + end; + +handle_info(next_task, State = #state{devices = Devices, status = Status, tasks = []}) -> + case Status of + ?STATUS_BOOTING -> + case length(Devices) > 0 of + true -> + trigger_task(0); + false -> + trigger_task(5000) + end, + {noreply, State#state{tasks = Devices}}; + ?STATUS_PAUSED -> + {noreply, State#state{tasks = []}} + end. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec trigger_task(Time :: integer()) -> no_return(). +trigger_task(Time) when is_integer(Time) -> + erlang:start_timer(Time, self(), task_ticker). + +-spec task(HostPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> no_return(). +task(HostPid, DeviceUUID, Timeout) when is_pid(HostPid), is_binary(DeviceUUID), is_integer(Timeout) -> + Command = #{ + <<"device_uuid">> => DeviceUUID, + <<"timeout">> => Timeout, + <<"command">> => <<"query_status">> + }, + BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])), + + case iot_device:get_pid(DeviceUUID) of + undefined -> + ok; + DevicePid when is_pid(DevicePid) -> + case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of + true -> + case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of + {ok, Reply} when is_binary(Reply) -> + case catch jiffy:decode(Reply, [return_maps]) of + %% 返回的是一个数组 + #{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 -> + EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList), + case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of + true -> + DevicePid ! {poll_task_reply, {ok, 1}}; + false -> + case lists:all(fun(S) -> S =:= 0 end, EdgeStatusList) of + true -> + DevicePid, {poll_task_reply, {ok, 0}}; + false -> + DevicePid, {poll_task_reply, {ok, lists:min(EdgeStatusList)}} + end + end; + _ -> + DevicePid, {poll_task_reply, {error, <<"invalid reply json">>}} + end; + {error, Reason} -> + DevicePid, {poll_task_reply, {error, Reason}} + end; + false -> + ok + end + end. \ No newline at end of file