From e1b097134c4f7fa4bdfcce23c02ce9388f90f8a1 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Tue, 19 Sep 2023 15:03:45 +0800 Subject: [PATCH] fix host --- apps/iot/src/http_handler/host_handler.erl | 13 ++------ apps/iot/src/iot_host.erl | 39 ++++++++++------------ apps/iot/src/websocket/ws_channel.erl | 14 +++++++- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/apps/iot/src/http_handler/host_handler.erl b/apps/iot/src/http_handler/host_handler.erl index 75cdb0b..9307d25 100644 --- a/apps/iot/src/http_handler/host_handler.erl +++ b/apps/iot/src/http_handler/host_handler.erl @@ -112,20 +112,15 @@ handle_request("POST", "/host/publish_command", _, end; %% 处理主机的授权的激活 -handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := true, <<"timeout">> := Timeout}) when is_binary(UUID) -> +handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := true}) when is_binary(UUID) -> case iot_host_sup:ensured_host_started(UUID) of {error, Reason} -> lager:debug("[host_handler] activate host_id: ~p, failed with reason: ~p", [UUID, Reason]), {ok, 200, iot_util:json_error(400, <<"host not found">>)}; {ok, Pid} when is_pid(Pid) -> lager:debug("[host_handler] activate host_id: ~p, start", [UUID]), - {ok, Aes} = iot_host:get_aes(Pid), - BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]), ok = iot_host:activate(Pid, true), - CmdResult = iot_host:publish_message(Pid, 8, BinReply, Timeout * 1000), - lager:debug("[iot_host_handler] host_id uuid: ~p, activate result is: ~p", [UUID, CmdResult]), - {ok, 200, iot_util:json_data(<<"success">>)} end; @@ -136,12 +131,8 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := lager:debug("[host_handler] activate host_id: ~p, failed with reason: ~p", [UUID, Reason]), {ok, 200, iot_util:json_error(400, <<"host not found">>)}; {ok, Pid} when is_pid(Pid) -> - ok = iot_host:activate(Pid, false), - lager:debug("[host_handler] activate host_id: ~p, start", [UUID]), - BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]), - CmdResult = iot_host:publish_message(Pid, 8, BinReply, Timeout * 1000), - lager:debug("[iot_host_handler] host_id uuid: ~p, inactivate result is: ~p", [UUID, CmdResult]), + ok = iot_host:activate(Pid, false), {ok, 200, iot_util:json_data(<<"success">>)} end; diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index f432c1e..699fa97 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -229,33 +229,30 @@ handle_call({publish_message, _, _, _}, _From, State = #state{uuid = UUID}) -> lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]), {reply, {error, <<"主机状态错误,发送命令失败"/utf8>>}, State}; +%% 激活主机 +handle_call({activate, true}, _From, State = #state{aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> + BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]), + ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>), + {reply, ok, State}; +handle_call({activate, true}, _From, State = #state{uuid = UUID, channel_pid = undefined}) -> + lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID]), + {reply, ok, State}; + %% 关闭授权 -handle_call({activate, Auth}, _From, State = #state{uuid = UUID, host_id = HostId, channel_pid = ChannelPid, has_session = false}) -> - case Auth of +handle_call({activate, false}, _From, State = #state{uuid = UUID, host_id = HostId, channel_pid = ChannelPid}) -> + case is_pid(ChannelPid) of true -> - ok; + BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]), + ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>), + ws_channel:stop(ChannelPid, closed); false -> - is_pid(ChannelPid) andalso exit(ChannelPid, kill), - - {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), - report_event(UUID, ?HOST_OFFLINE), - - change_devices_status(HostId, ?DEVICE_UNKNOWN) + lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID]) end, - {reply, ok, State#state{channel_pid = undefined}}; -handle_call({activate, Auth}, _From, State = #state{host_id = HostId, uuid = UUID, channel_pid = ChannelPid, has_session = true}) -> - case Auth of - true -> - ok; - false -> - exit(ChannelPid, kill), + {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), + report_event(UUID, ?HOST_OFFLINE), + change_devices_status(HostId, ?DEVICE_UNKNOWN), - {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), - report_event(UUID, ?HOST_OFFLINE), - - change_devices_status(HostId, ?DEVICE_UNKNOWN) - end, {reply, ok, State#state{channel_pid = undefined, has_session = false}}; %% 绑定channel diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index 3f2286f..93f3604 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -13,7 +13,7 @@ %% API -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). --export([publish/3, stop/2]). +-export([publish/3, stop/2, send/2]). -record(state, { uuid :: undefined | binary(), @@ -33,6 +33,11 @@ publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) -> Pid ! {publish, ReceiverPid, Ref, Msg}, Ref. +%% 向通道中写入消息 +-spec send(Pid :: pid(), Msg :: binary()) -> no_return(). +send(Pid, Msg) when is_pid(Pid), is_binary(Msg) -> + Pid ! {send, Msg}. + %% 关闭方法 -spec stop(Pid :: pid(), Reason :: any()) -> no_return(). stop(undefined, _Reason) -> @@ -110,6 +115,9 @@ websocket_handle({binary, <>}, State = #state{uuid = UUID}) -> + lager:debug("[ws_channel] uuid: ~p, get send response message: ~p", [UUID, Body]), + {ok, State}; websocket_handle({binary, <>}, State = #state{uuid = UUID, inflight = Inflight}) when PacketId > 0 -> lager:debug("[ws_channel] uuid: ~p, get publish response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), case maps:take(PacketId, Inflight) of @@ -142,6 +150,10 @@ websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = Pack NInflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight), {reply, {binary, <>}, State#state{packet_id = PacketId + 1, inflight = NInflight}}; +%% 发送消息, 不需要等待回复 +websocket_info({send, Msg}, State) when is_binary(Msg) -> + {reply, {binary, <>}, State}; + %% 用户进程关闭,则关闭通道 websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]),