删除轮询机制
This commit is contained in:
parent
089463e11f
commit
b70a2a35d1
@ -73,19 +73,24 @@ handle_request("POST", "/device/query_edge_status", _, #{<<"host_id">> := HostId
|
||||
undefined ->
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)};
|
||||
HostPid when is_pid(HostPid) ->
|
||||
Ref = make_ref(),
|
||||
{ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(),
|
||||
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout),
|
||||
receive
|
||||
{poll_task_reply, Ref, {ok, EdgeStatus}} ->
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
ok;
|
||||
DevicePid when is_pid(DevicePid) ->
|
||||
case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of
|
||||
true ->
|
||||
case do_poll(HostPid, DeviceUUID, Timeout) of
|
||||
{ok, EdgeStatus} ->
|
||||
{ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
|
||||
{ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})};
|
||||
{poll_task_reply, Ref, {error, Reason}} ->
|
||||
{error, Reason} ->
|
||||
lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]),
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}
|
||||
after Timeout * 1000 ->
|
||||
lager:debug("[device_handler] query_edge_status device: ~p, timeout", [DeviceUUID]),
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status timeout">>)}
|
||||
end;
|
||||
false ->
|
||||
lager:debug("[device_handler] query_edge_status device: ~p, not found", [DeviceUUID]),
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}
|
||||
end
|
||||
end
|
||||
end;
|
||||
|
||||
@ -93,7 +98,6 @@ handle_request(_, Path, _, _) ->
|
||||
Path1 = list_to_binary(Path),
|
||||
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
|
||||
|
||||
|
||||
test(HostUUID, DeviceUUID) when is_binary(HostUUID), is_binary(DeviceUUID) ->
|
||||
Timeout = 10,
|
||||
lager:debug("[device_handler] host_id: ~p, will query_edge_status uuid: ~p", [HostUUID, DeviceUUID]),
|
||||
@ -101,19 +105,42 @@ test(HostUUID, DeviceUUID) when is_binary(HostUUID), is_binary(DeviceUUID) ->
|
||||
undefined ->
|
||||
{ok, <<"host not found">>};
|
||||
HostPid when is_pid(HostPid) ->
|
||||
Ref = make_ref(),
|
||||
{ok, {TaskPid, MRef}} = iot_device_poll_task:start_monitor(),
|
||||
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout),
|
||||
receive
|
||||
{poll_task_reply, Ref, {ok, EdgeStatus}} ->
|
||||
{ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
|
||||
{ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})};
|
||||
{poll_task_reply, Ref, {error, Reason}} ->
|
||||
lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]),
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)};
|
||||
{'DOWN', MRef, process, TaskPid, Reason} ->
|
||||
{ok, <<"task pid down with reason:">>, Reason}
|
||||
after Timeout * 1000 ->
|
||||
{ok, 200, <<"query_edge_status timeout">>}
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
{error, device_not_found};
|
||||
DevicePid when is_pid(DevicePid) ->
|
||||
case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of
|
||||
true ->
|
||||
do_poll(HostPid, DeviceUUID, Timeout);
|
||||
false ->
|
||||
{error, device_is_dead}
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
-spec do_poll(HostPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> {ok, EdgeStatus :: integer()} | {error, Reason :: any()}.
|
||||
do_poll(HostPid, DeviceUUID, Timeout) when is_pid(HostPid), is_binary(DeviceUUID), is_integer(Timeout) ->
|
||||
Command = #{
|
||||
<<"device_uuid">> => DeviceUUID,
|
||||
<<"timeout">> => Timeout,
|
||||
<<"command">> => <<"query_status">>
|
||||
},
|
||||
BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])),
|
||||
case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of
|
||||
{ok, Reply} when is_binary(Reply) ->
|
||||
case catch jiffy:decode(Reply, [return_maps]) of
|
||||
%% 返回的是一个数组
|
||||
#{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 ->
|
||||
EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList),
|
||||
case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of
|
||||
true ->
|
||||
{ok, 1};
|
||||
false ->
|
||||
{ok, 0}
|
||||
end;
|
||||
_ ->
|
||||
{error, <<"invalid reply json">>}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
@ -43,10 +43,7 @@
|
||||
%% websocket相关
|
||||
channel_pid :: undefined | pid(),
|
||||
%% 主机的相关信息
|
||||
metrics = #{} :: map(),
|
||||
|
||||
%% 设备状态轮询器
|
||||
poller_pid :: pid()
|
||||
metrics = #{} :: map()
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
@ -207,8 +204,7 @@ init([UUID]) ->
|
||||
false ->
|
||||
?STATE_DENIED
|
||||
end,
|
||||
{ok, PollerPid} = iot_host_device_poller:start_link(HostId, self()),
|
||||
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, poller_pid = PollerPid, has_session = false}};
|
||||
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, has_session = false}};
|
||||
undefined ->
|
||||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
||||
ignore
|
||||
@ -311,12 +307,10 @@ handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid =
|
||||
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
|
||||
|
||||
%% 激活主机
|
||||
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid, poller_pid = PollerPid}) when is_pid(ChannelPid) ->
|
||||
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
||||
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
|
||||
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
||||
lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]),
|
||||
%% 启动轮询
|
||||
iot_host_device_poller:boot(PollerPid),
|
||||
|
||||
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
|
||||
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
||||
@ -324,22 +318,16 @@ handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, chan
|
||||
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
|
||||
|
||||
%% 关闭授权
|
||||
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, poller_pid = PollerPid}) when is_pid(ChannelPid) ->
|
||||
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
||||
BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]),
|
||||
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
||||
ws_channel:stop(ChannelPid, closed),
|
||||
lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]),
|
||||
|
||||
%% 暂停轮询
|
||||
iot_host_device_poller:pause(PollerPid),
|
||||
|
||||
{next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]};
|
||||
|
||||
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, poller_pid = PollerPid, channel_pid = undefined}) ->
|
||||
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
||||
lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID]),
|
||||
%% 暂停轮询
|
||||
iot_host_device_poller:pause(PollerPid),
|
||||
|
||||
{next_state, ?STATE_DENIED, State#state{has_session = false}, [{reply, From, ok}]};
|
||||
|
||||
%% 绑定channel
|
||||
@ -353,7 +341,7 @@ handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, c
|
||||
{keep_state, State, [{reply, From, {error, <<"channel existed">>}}]};
|
||||
|
||||
%% 授权通过后,才能将主机的状态设置为在线状态
|
||||
handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, poller_pid = PollerPid, aes = Aes, name = Name}) ->
|
||||
handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = Aes, name = Name}) ->
|
||||
Reply = #{<<"a">> => true, <<"aes">> => Aes},
|
||||
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
||||
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
|
||||
@ -364,9 +352,6 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #
|
||||
|
||||
report_event(UUID, ?HOST_ONLINE),
|
||||
|
||||
%% 启动轮询
|
||||
iot_host_device_poller:boot(PollerPid),
|
||||
|
||||
lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]),
|
||||
{keep_state, State#state{has_session = true}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||
|
||||
@ -377,12 +362,10 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_DENIED, State = #sta
|
||||
{keep_state, State#state{has_session = false}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||
|
||||
%% 重新加载设备信息
|
||||
handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{poller_pid = PollerPid}) ->
|
||||
handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{}) ->
|
||||
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
||||
{ok, DevicePid} ->
|
||||
iot_device:reload(DevicePid),
|
||||
%% 增加设备
|
||||
iot_host_device_poller:add_device(PollerPid, DeviceUUID),
|
||||
|
||||
{keep_state, State, [{reply, From, ok}]};
|
||||
{error, Reason} ->
|
||||
@ -390,30 +373,20 @@ handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{poller
|
||||
end;
|
||||
|
||||
%% 删除设备
|
||||
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{poller_pid = PollerPid}) ->
|
||||
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{}) ->
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
ok;
|
||||
DevicePid when is_pid(DevicePid) ->
|
||||
%% 删除设备
|
||||
iot_host_device_poller:delete_device(PollerPid, DeviceUUID),
|
||||
|
||||
iot_device_sup:delete_device(DeviceUUID)
|
||||
end,
|
||||
{keep_state, State, [{reply, From, ok}]};
|
||||
|
||||
%% 激活设备
|
||||
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #state{poller_pid = PollerPid}) ->
|
||||
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #state{}) ->
|
||||
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
||||
{ok, DevicePid} ->
|
||||
iot_device:auth(DevicePid, Auth),
|
||||
case Auth of
|
||||
true ->
|
||||
iot_host_device_poller:add_device(PollerPid, DeviceUUID);
|
||||
false ->
|
||||
iot_host_device_poller:delete_device(PollerPid, DeviceUUID)
|
||||
end,
|
||||
|
||||
{keep_state, State, [{reply, From, ok}]};
|
||||
{error, Reason} ->
|
||||
{keep_state, State, [{reply, From, {error, Reason}}]}
|
||||
|
||||
@ -1,229 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2025, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 29. 4月 2025 12:19
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_host_device_poller).
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/2]).
|
||||
-export([boot/1, pause/1, add_device/2, delete_device/2]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-define(STATUS_BOOTING, booting).
|
||||
-define(STATUS_PAUSED, paused).
|
||||
|
||||
-record(state, {
|
||||
host_id :: integer(),
|
||||
host_pid :: pid(),
|
||||
devices :: [binary()],
|
||||
status = ?STATUS_PAUSED,
|
||||
%% 当前的任务集合
|
||||
tasks = []
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
boot(Pid) when is_pid(Pid) ->
|
||||
gen_server:cast(Pid, boot).
|
||||
|
||||
pause(Pid) when is_pid(Pid) ->
|
||||
gen_server:cast(Pid, pause).
|
||||
|
||||
add_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
||||
gen_server:cast(Pid, {add_device, DeviceUUID}).
|
||||
|
||||
delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
||||
gen_server:cast(Pid, {delete_device, DeviceUUID}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(HostId :: integer(), HostPid :: pid()) ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link(HostId, HostPid) when is_integer(HostId), is_pid(HostPid) ->
|
||||
gen_server:start_link(?MODULE, [HostId, HostPid], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([HostId, HostPid]) ->
|
||||
{ok, Devices} = device_bo:get_host_devices(HostId),
|
||||
{ok, #state{host_id = HostId, host_pid = HostPid, devices = Devices}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
%% 启动
|
||||
handle_cast(boot, State = #state{status = ?STATUS_BOOTING}) ->
|
||||
{noreply, State};
|
||||
handle_cast(boot, State = #state{status = ?STATUS_PAUSED, devices = Devices}) ->
|
||||
case length(Devices) > 0 of
|
||||
true ->
|
||||
trigger_task(0);
|
||||
false ->
|
||||
trigger_task(5000)
|
||||
end,
|
||||
{noreply, State#state{status = ?STATUS_BOOTING, tasks = Devices}};
|
||||
|
||||
%% 暂停
|
||||
handle_cast(pause, State = #state{status = ?STATUS_PAUSED}) ->
|
||||
{noreply, State};
|
||||
handle_cast(pause, State = #state{status = ?STATUS_BOOTING}) ->
|
||||
{noreply, State#state{status = ?STATUS_PAUSED, tasks = []}};
|
||||
|
||||
%% 添加设备
|
||||
handle_cast({add_device, DeviceUUID}, State = #state{devices = Devices}) ->
|
||||
NDevices = case lists:member(DeviceUUID, Devices) of
|
||||
true ->
|
||||
Devices;
|
||||
false ->
|
||||
[DeviceUUID|Devices]
|
||||
end,
|
||||
{noreply, State#state{devices = NDevices}};
|
||||
|
||||
%% 删除设备
|
||||
handle_cast({delete_device, DeviceUUID}, State = #state{devices = Devices}) ->
|
||||
{noreply, State#state{devices = lists:delete(DeviceUUID, Devices)}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({timeout, _, task_ticker}, State = #state{host_id = HostId, host_pid = HostPid, status = Status, tasks = [DeviceUUID|Tail]}) ->
|
||||
case Status of
|
||||
?STATUS_BOOTING ->
|
||||
lager:debug("[iot_host_device_poller] host_id: ~p, start new task for device_uuid: ~p", [HostId, DeviceUUID]),
|
||||
catch task(HostPid, DeviceUUID, 5),
|
||||
trigger_task(0),
|
||||
{noreply, State#state{tasks = Tail}};
|
||||
?STATUS_PAUSED ->
|
||||
lager:debug("[iot_host_device_poller] host_id: ~p, host status is paused", [HostId]),
|
||||
{noreply, State#state{tasks = []}}
|
||||
end;
|
||||
|
||||
handle_info({timeout, _, task_ticker}, State = #state{devices = Devices, status = Status, tasks = []}) ->
|
||||
case Status of
|
||||
?STATUS_BOOTING ->
|
||||
case length(Devices) > 0 of
|
||||
true ->
|
||||
trigger_task(0);
|
||||
false ->
|
||||
trigger_task(5000)
|
||||
end,
|
||||
{noreply, State#state{tasks = Devices}};
|
||||
?STATUS_PAUSED ->
|
||||
{noreply, State#state{tasks = []}}
|
||||
end;
|
||||
|
||||
%% 部分请求可能在task执行超时后才返回
|
||||
handle_info({ws_response, _Ref, Response}, State = #state{host_id = HostId}) ->
|
||||
lager:debug("[iot_host_device_poller] host_id: ~p, get ws_response: ~p, after timeout", [HostId, Response]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(Info, State = #state{host_id = HostId}) ->
|
||||
lager:debug("[iot_host_device_poller] host_id: ~p, get unknown info: ~p, after timeout", [HostId, Info]),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
-spec trigger_task(Time :: integer()) -> no_return().
|
||||
trigger_task(Time) when is_integer(Time) ->
|
||||
erlang:start_timer(Time, self(), task_ticker).
|
||||
|
||||
-spec task(HostPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> no_return().
|
||||
task(HostPid, DeviceUUID, Timeout) when is_pid(HostPid), is_binary(DeviceUUID), is_integer(Timeout) ->
|
||||
Command = #{
|
||||
<<"device_uuid">> => DeviceUUID,
|
||||
<<"timeout">> => Timeout,
|
||||
<<"command">> => <<"query_status">>
|
||||
},
|
||||
BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])),
|
||||
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
ok;
|
||||
DevicePid when is_pid(DevicePid) ->
|
||||
case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of
|
||||
true ->
|
||||
case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of
|
||||
{ok, Reply} when is_binary(Reply) ->
|
||||
case catch jiffy:decode(Reply, [return_maps]) of
|
||||
%% 返回的是一个数组
|
||||
#{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 ->
|
||||
EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList),
|
||||
case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of
|
||||
true ->
|
||||
DevicePid ! {poll_task_reply, {ok, 1}};
|
||||
false ->
|
||||
case lists:all(fun(S) -> S =:= 0 end, EdgeStatusList) of
|
||||
true ->
|
||||
DevicePid, {poll_task_reply, {ok, 0}};
|
||||
false ->
|
||||
DevicePid, {poll_task_reply, {ok, lists:min(EdgeStatusList)}}
|
||||
end
|
||||
end;
|
||||
_ ->
|
||||
DevicePid, {poll_task_reply, {error, <<"invalid reply json">>}}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
DevicePid, {poll_task_reply, {error, Reason}}
|
||||
end;
|
||||
false ->
|
||||
ok
|
||||
end
|
||||
end.
|
||||
Loading…
x
Reference in New Issue
Block a user