From b70a2a35d1a4a42a5aee617a32761557c7abd68c Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 29 Apr 2025 15:45:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E8=BD=AE=E8=AF=A2=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/src/http_handler/device_handler.erl | 83 ++++--- apps/iot/src/iot_host.erl | 45 +--- apps/iot/src/iot_host_device_poller.erl | 229 ------------------- 3 files changed, 64 insertions(+), 293 deletions(-) delete mode 100644 apps/iot/src/iot_host_device_poller.erl diff --git a/apps/iot/src/http_handler/device_handler.erl b/apps/iot/src/http_handler/device_handler.erl index 50d2290..f41c567 100644 --- a/apps/iot/src/http_handler/device_handler.erl +++ b/apps/iot/src/http_handler/device_handler.erl @@ -73,19 +73,24 @@ handle_request("POST", "/device/query_edge_status", _, #{<<"host_id">> := HostId undefined -> {ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}; HostPid when is_pid(HostPid) -> - Ref = make_ref(), - {ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(), - iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout), - receive - {poll_task_reply, Ref, {ok, EdgeStatus}} -> - {ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus), - {ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})}; - {poll_task_reply, Ref, {error, Reason}} -> - lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]), - {ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)} - after Timeout * 1000 -> - lager:debug("[device_handler] query_edge_status device: ~p, timeout", [DeviceUUID]), - {ok, 200, iot_util:json_error(404, <<"query_edge_status timeout">>)} + case iot_device:get_pid(DeviceUUID) of + undefined -> + ok; + DevicePid when is_pid(DevicePid) -> + case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of + true -> + case do_poll(HostPid, DeviceUUID, Timeout) of + {ok, EdgeStatus} -> + {ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus), + {ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})}; + {error, Reason} -> + lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]), + {ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)} + end; + false -> + lager:debug("[device_handler] query_edge_status device: ~p, not found", [DeviceUUID]), + {ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)} + end end end; @@ -93,7 +98,6 @@ handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. - test(HostUUID, DeviceUUID) when is_binary(HostUUID), is_binary(DeviceUUID) -> Timeout = 10, lager:debug("[device_handler] host_id: ~p, will query_edge_status uuid: ~p", [HostUUID, DeviceUUID]), @@ -101,19 +105,42 @@ test(HostUUID, DeviceUUID) when is_binary(HostUUID), is_binary(DeviceUUID) -> undefined -> {ok, <<"host not found">>}; HostPid when is_pid(HostPid) -> - Ref = make_ref(), - {ok, {TaskPid, MRef}} = iot_device_poll_task:start_monitor(), - iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout), - receive - {poll_task_reply, Ref, {ok, EdgeStatus}} -> - {ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus), - {ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})}; - {poll_task_reply, Ref, {error, Reason}} -> - lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]), - {ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}; - {'DOWN', MRef, process, TaskPid, Reason} -> - {ok, <<"task pid down with reason:">>, Reason} - after Timeout * 1000 -> - {ok, 200, <<"query_edge_status timeout">>} + case iot_device:get_pid(DeviceUUID) of + undefined -> + {error, device_not_found}; + DevicePid when is_pid(DevicePid) -> + case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of + true -> + do_poll(HostPid, DeviceUUID, Timeout); + false -> + {error, device_is_dead} + end end end. + +-spec do_poll(HostPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> {ok, EdgeStatus :: integer()} | {error, Reason :: any()}. +do_poll(HostPid, DeviceUUID, Timeout) when is_pid(HostPid), is_binary(DeviceUUID), is_integer(Timeout) -> + Command = #{ + <<"device_uuid">> => DeviceUUID, + <<"timeout">> => Timeout, + <<"command">> => <<"query_status">> + }, + BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])), + case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of + {ok, Reply} when is_binary(Reply) -> + case catch jiffy:decode(Reply, [return_maps]) of + %% 返回的是一个数组 + #{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 -> + EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList), + case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of + true -> + {ok, 1}; + false -> + {ok, 0} + end; + _ -> + {error, <<"invalid reply json">>} + end; + {error, Reason} -> + {error, Reason} + end. \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index f995d57..6541eaf 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -43,10 +43,7 @@ %% websocket相关 channel_pid :: undefined | pid(), %% 主机的相关信息 - metrics = #{} :: map(), - - %% 设备状态轮询器 - poller_pid :: pid() + metrics = #{} :: map() }). %%%=================================================================== @@ -207,8 +204,7 @@ init([UUID]) -> false -> ?STATE_DENIED end, - {ok, PollerPid} = iot_host_device_poller:start_link(HostId, self()), - {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, poller_pid = PollerPid, has_session = false}}; + {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, has_session = false}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), ignore @@ -311,12 +307,10 @@ handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid = {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}; %% 激活主机 -handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid, poller_pid = PollerPid}) when is_pid(ChannelPid) -> +handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]), ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>), lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]), - %% 启动轮询 - iot_host_device_poller:boot(PollerPid), {next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]}; handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = undefined}) -> @@ -324,22 +318,16 @@ handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, chan {next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]}; %% 关闭授权 -handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, poller_pid = PollerPid}) when is_pid(ChannelPid) -> +handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]), ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>), ws_channel:stop(ChannelPid, closed), lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]), - %% 暂停轮询 - iot_host_device_poller:pause(PollerPid), - {next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]}; -handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, poller_pid = PollerPid, channel_pid = undefined}) -> +handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = undefined}) -> lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID]), - %% 暂停轮询 - iot_host_device_poller:pause(PollerPid), - {next_state, ?STATE_DENIED, State#state{has_session = false}, [{reply, From, ok}]}; %% 绑定channel @@ -353,7 +341,7 @@ handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, c {keep_state, State, [{reply, From, {error, <<"channel existed">>}}]}; %% 授权通过后,才能将主机的状态设置为在线状态 -handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, poller_pid = PollerPid, aes = Aes, name = Name}) -> +handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = Aes, name = Name}) -> Reply = #{<<"a">> => true, <<"aes">> => Aes}, EncReply = iot_cipher_rsa:encode(Reply, PubKey), {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), @@ -364,9 +352,6 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = # report_event(UUID, ?HOST_ONLINE), - %% 启动轮询 - iot_host_device_poller:boot(PollerPid), - lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]), {keep_state, State#state{has_session = true}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; @@ -377,12 +362,10 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_DENIED, State = #sta {keep_state, State#state{has_session = false}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; %% 重新加载设备信息 -handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{poller_pid = PollerPid}) -> +handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{}) -> case iot_device_sup:ensured_device_started(DeviceUUID) of {ok, DevicePid} -> iot_device:reload(DevicePid), - %% 增加设备 - iot_host_device_poller:add_device(PollerPid, DeviceUUID), {keep_state, State, [{reply, From, ok}]}; {error, Reason} -> @@ -390,30 +373,20 @@ handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{poller end; %% 删除设备 -handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{poller_pid = PollerPid}) -> +handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{}) -> case iot_device:get_pid(DeviceUUID) of undefined -> ok; DevicePid when is_pid(DevicePid) -> - %% 删除设备 - iot_host_device_poller:delete_device(PollerPid, DeviceUUID), - iot_device_sup:delete_device(DeviceUUID) end, {keep_state, State, [{reply, From, ok}]}; %% 激活设备 -handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #state{poller_pid = PollerPid}) -> +handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #state{}) -> case iot_device_sup:ensured_device_started(DeviceUUID) of {ok, DevicePid} -> iot_device:auth(DevicePid, Auth), - case Auth of - true -> - iot_host_device_poller:add_device(PollerPid, DeviceUUID); - false -> - iot_host_device_poller:delete_device(PollerPid, DeviceUUID) - end, - {keep_state, State, [{reply, From, ok}]}; {error, Reason} -> {keep_state, State, [{reply, From, {error, Reason}}]} diff --git a/apps/iot/src/iot_host_device_poller.erl b/apps/iot/src/iot_host_device_poller.erl deleted file mode 100644 index 483205c..0000000 --- a/apps/iot/src/iot_host_device_poller.erl +++ /dev/null @@ -1,229 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 29. 4月 2025 12:19 -%%%------------------------------------------------------------------- --module(iot_host_device_poller). --author("anlicheng"). - --behaviour(gen_server). - -%% API --export([start_link/2]). --export([boot/1, pause/1, add_device/2, delete_device/2]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --define(STATUS_BOOTING, booting). --define(STATUS_PAUSED, paused). - --record(state, { - host_id :: integer(), - host_pid :: pid(), - devices :: [binary()], - status = ?STATUS_PAUSED, - %% 当前的任务集合 - tasks = [] -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -boot(Pid) when is_pid(Pid) -> - gen_server:cast(Pid, boot). - -pause(Pid) when is_pid(Pid) -> - gen_server:cast(Pid, pause). - -add_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> - gen_server:cast(Pid, {add_device, DeviceUUID}). - -delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> - gen_server:cast(Pid, {delete_device, DeviceUUID}). - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(HostId :: integer(), HostPid :: pid()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(HostId, HostPid) when is_integer(HostId), is_pid(HostPid) -> - gen_server:start_link(?MODULE, [HostId, HostPid], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([HostId, HostPid]) -> - {ok, Devices} = device_bo:get_host_devices(HostId), - {ok, #state{host_id = HostId, host_pid = HostPid, devices = Devices}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -%% 启动 -handle_cast(boot, State = #state{status = ?STATUS_BOOTING}) -> - {noreply, State}; -handle_cast(boot, State = #state{status = ?STATUS_PAUSED, devices = Devices}) -> - case length(Devices) > 0 of - true -> - trigger_task(0); - false -> - trigger_task(5000) - end, - {noreply, State#state{status = ?STATUS_BOOTING, tasks = Devices}}; - -%% 暂停 -handle_cast(pause, State = #state{status = ?STATUS_PAUSED}) -> - {noreply, State}; -handle_cast(pause, State = #state{status = ?STATUS_BOOTING}) -> - {noreply, State#state{status = ?STATUS_PAUSED, tasks = []}}; - -%% 添加设备 -handle_cast({add_device, DeviceUUID}, State = #state{devices = Devices}) -> - NDevices = case lists:member(DeviceUUID, Devices) of - true -> - Devices; - false -> - [DeviceUUID|Devices] - end, - {noreply, State#state{devices = NDevices}}; - -%% 删除设备 -handle_cast({delete_device, DeviceUUID}, State = #state{devices = Devices}) -> - {noreply, State#state{devices = lists:delete(DeviceUUID, Devices)}}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, task_ticker}, State = #state{host_id = HostId, host_pid = HostPid, status = Status, tasks = [DeviceUUID|Tail]}) -> - case Status of - ?STATUS_BOOTING -> - lager:debug("[iot_host_device_poller] host_id: ~p, start new task for device_uuid: ~p", [HostId, DeviceUUID]), - catch task(HostPid, DeviceUUID, 5), - trigger_task(0), - {noreply, State#state{tasks = Tail}}; - ?STATUS_PAUSED -> - lager:debug("[iot_host_device_poller] host_id: ~p, host status is paused", [HostId]), - {noreply, State#state{tasks = []}} - end; - -handle_info({timeout, _, task_ticker}, State = #state{devices = Devices, status = Status, tasks = []}) -> - case Status of - ?STATUS_BOOTING -> - case length(Devices) > 0 of - true -> - trigger_task(0); - false -> - trigger_task(5000) - end, - {noreply, State#state{tasks = Devices}}; - ?STATUS_PAUSED -> - {noreply, State#state{tasks = []}} - end; - -%% 部分请求可能在task执行超时后才返回 -handle_info({ws_response, _Ref, Response}, State = #state{host_id = HostId}) -> - lager:debug("[iot_host_device_poller] host_id: ~p, get ws_response: ~p, after timeout", [HostId, Response]), - {noreply, State}; - -handle_info(Info, State = #state{host_id = HostId}) -> - lager:debug("[iot_host_device_poller] host_id: ~p, get unknown info: ~p, after timeout", [HostId, Info]), - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - --spec trigger_task(Time :: integer()) -> no_return(). -trigger_task(Time) when is_integer(Time) -> - erlang:start_timer(Time, self(), task_ticker). - --spec task(HostPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> no_return(). -task(HostPid, DeviceUUID, Timeout) when is_pid(HostPid), is_binary(DeviceUUID), is_integer(Timeout) -> - Command = #{ - <<"device_uuid">> => DeviceUUID, - <<"timeout">> => Timeout, - <<"command">> => <<"query_status">> - }, - BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])), - - case iot_device:get_pid(DeviceUUID) of - undefined -> - ok; - DevicePid when is_pid(DevicePid) -> - case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of - true -> - case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of - {ok, Reply} when is_binary(Reply) -> - case catch jiffy:decode(Reply, [return_maps]) of - %% 返回的是一个数组 - #{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 -> - EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList), - case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of - true -> - DevicePid ! {poll_task_reply, {ok, 1}}; - false -> - case lists:all(fun(S) -> S =:= 0 end, EdgeStatusList) of - true -> - DevicePid, {poll_task_reply, {ok, 0}}; - false -> - DevicePid, {poll_task_reply, {ok, lists:min(EdgeStatusList)}} - end - end; - _ -> - DevicePid, {poll_task_reply, {error, <<"invalid reply json">>}} - end; - {error, Reason} -> - DevicePid, {poll_task_reply, {error, Reason}} - end; - false -> - ok - end - end. \ No newline at end of file