From f39f92e97860d94009cdca533cfa7cd53ec19c32 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Tue, 19 Sep 2023 14:14:39 +0800 Subject: [PATCH] fix ws_channel --- apps/iot/src/iot_host.erl | 25 +++++++++++-------------- apps/iot/src/websocket/ws_channel.erl | 15 ++++++++++----- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 46bf96c..f432c1e 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -83,7 +83,7 @@ activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> get_metric(Pid) when is_pid(Pid) -> gen_server:call(Pid, get_metric). --spec attach_channel(pid(), pid()) -> ok. +-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}. attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> gen_server:call(Pid, {attach_channel, ChannelPid}). @@ -259,18 +259,16 @@ handle_call({activate, Auth}, _From, State = #state{host_id = HostId, uuid = UUI {reply, ok, State#state{channel_pid = undefined, has_session = false}}; %% 绑定channel -handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = undefined}) -> +handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = OldChannelPid}) -> lager:debug("[iot_host] attach_channel host_id uuid: ~p, channel: ~p", [UUID, ChannelPid]), - erlang:monitor(process, ChannelPid), - - {reply, ok, State#state{channel_pid = ChannelPid}}; - -handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = OldChannelPid}) when is_pid(OldChannelPid) -> - lager:debug("[iot_host] attach_channel host_id uuid: ~p, old channel: ~p replace with: ~p", [UUID, OldChannelPid, ChannelPid]), - ws_channel:stop(OldChannelPid, closed), - %% 建立到新的channel的monitor - erlang:monitor(process, ChannelPid), - {reply, ok, State#state{channel_pid = ChannelPid}}; + case OldChannelPid =:= undefined of + true -> + erlang:monitor(process, ChannelPid), + {reply, ok, State#state{channel_pid = ChannelPid}}; + false -> + lager:debug("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]), + {reply, {error, <<"channel existed">>}, State#state{channel_pid = ChannelPid}} + end; %% 授权通过后,才能将主机的状态设置为在线状态 handle_call({create_session, PubKey}, _From, State = #state{uuid = UUID, aes = Aes}) -> @@ -429,8 +427,7 @@ handle_cast({handle, {event, Event0}}, State = #state{uuid = UUID, aes = AES, ha {noreply, State}; %% 心跳机制 -handle_cast(heartbeat, State = #state{uuid = UUID, heartbeat_counter = HeartbeatCounter}) -> - lager:debug("[iot_host] uuip: ~p, get heartbeat, counter is: ~p", [UUID, HeartbeatCounter]), +handle_cast(heartbeat, State = #state{heartbeat_counter = HeartbeatCounter}) -> {noreply, State#state{heartbeat_counter = HeartbeatCounter + 1}}. -spec handle_info(Info :: timeout | term(), State :: term()) -> diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index b6be843..3f2286f 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -64,12 +64,17 @@ websocket_handle({binary, < %% 尝试启动主机的服务进程 {ok, HostPid} = iot_host_sup:ensured_host_started(UUID), - ok = iot_host:attach_channel(HostPid, self()), - %% 建立到host的monitor - erlang:monitor(process, HostPid), - Reply = jiffy:encode(#{<<"code">> => 1, <<"message">> => <<"ok">>}, [force_utf8]), + case iot_host:attach_channel(HostPid, self()) of + ok -> + %% 建立到host的monitor + erlang:monitor(process, HostPid), + Reply = jiffy:encode(#{<<"code">> => 1, <<"message">> => <<"ok">>}, [force_utf8]), - {reply, {binary, <>}, State#state{uuid = UUID, host_pid = HostPid}} + {reply, {binary, <>}, State#state{uuid = UUID, host_pid = HostPid}}; + {error, Reason} -> + lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p", [UUID, Reason]), + {stop, State} + end end; false -> lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]),