From ef6a81ce44ee42acd90f233ada0c477c3d02b9dd Mon Sep 17 00:00:00 2001 From: anlicheng Date: Thu, 10 Aug 2023 15:01:49 +0800 Subject: [PATCH] fix channel --- apps/iot/src/websocket/ws_channel.erl | 53 ++++++++++----------------- 1 file changed, 20 insertions(+), 33 deletions(-) diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index 20ab89f..776c906 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -54,61 +54,48 @@ websocket_init(_State) -> websocket_handle({binary, <>}, State) -> #{<<"uuid">> := UUID, <<"timestamp">> := Timestamp, <<"salt">> := Salt, <<"username">> := Username, <<"token">> := Token} = jiffy:decode(Data, [return_maps]), - lager:debug("[ws_channel] register uuid: ~p, messag: ~p", [UUID, Data]), + lager:debug("[ws_channel] register uuid: ~p, request message: ~p", [UUID, Data]), case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of true -> %% 查找数据库,如果没有则插入 - case host_bo:ensured_host(UUID) of - ok -> - lager:debug("[ws_channel] register success, host uuid: ~p", [UUID]), - %% 尝试启动主机的服务进程 - {ok, HostPid} = iot_host_sup:ensured_host_started(UUID), - ok = iot_host:attach_channel(HostPid, self()), - %% 建立到host的monitor - erlang:monitor(process, HostPid), + ok = host_bo:ensured_host(UUID), + %% 尝试启动主机的服务进程 + {ok, HostPid} = iot_host_sup:ensured_host_started(UUID), + ok = iot_host:attach_channel(HostPid, self()), + %% 建立到host的monitor + erlang:monitor(process, HostPid), - Reply = jiffy:encode(#{ - <<"code">> => 1, - <<"message">> => <<"ok">> - }), - {reply, {binary, <>}, State#state{uuid = UUID, host_pid = HostPid}}; + Reply = jiffy:encode(#{<<"code">> => 1, <<"message">> => <<"ok">>}, [force_utf8]), - {error, Reason} -> - lager:warning("[ws_channel] register failed, uuid: ~p, reason: ~p", [UUID, Reason]), - {stop, State} - end; + lager:debug("[ws_channel] register success, host uuid: ~p", [UUID]), + + {reply, {binary, <>}, State#state{uuid = UUID, host_pid = HostPid}}; false -> lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]), {stop, State} end; -websocket_handle({binary, <>}, State = #state{host_pid = HostPid, uuid = UUID}) when is_pid(HostPid) -> - lager:debug("[ws_channel] create session, uuid: ~p", [UUID]), +websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> {ok, Reply} = iot_host:create_session(HostPid, PubKey), {reply, {binary, <>}, State}; -websocket_handle({binary, <>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> - lager:debug("[ws_channel] data uuid: ~p, data: ~p", [UUID, Data]), +websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {data, Data}), {ok, State}; -websocket_handle({binary, <>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> - lager:debug("[ws_channel] ping uuid: ~p", [UUID]), +websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {ping, CipherMetric}), {ok, State}; -websocket_handle({binary, <>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> - lager:debug("[ws_channel] inform uuid: ~p", [UUID]), +websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {inform, CipherInfo}), {ok, State}; -websocket_handle({binary, <>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> - lager:debug("[ws_channel] feedback step uuid: ~p", [UUID]), +websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {feedback_step, CipherInfo}), {ok, State}; -websocket_handle({binary, <>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> - lager:debug("[ws_channel] feedback result uuid: ~p", [UUID]), +websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {feedback_result, CipherInfo}), {ok, State}; @@ -126,13 +113,13 @@ websocket_handle({binary, <> true -> ReceiverPid ! {ws_response, Ref, Body}; false -> - lager:warning("[ws_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is dead", [Body, PacketId]) + lager:warning("[ws_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) end, {ok, State#state{inflight = NInflight}} end; websocket_handle(Info, State) -> - lager:error("[ws_channel] get a error messag: ~p, channel will close", [Info]), + lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), {stop, State}. %% 处理关闭信号 @@ -147,7 +134,7 @@ websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = Pack %% 用户进程关闭,则关闭通道 websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> - lager:debug("[ws_channel] uuid: ~p, channel will close because user exited with reason: ~p", [UUID, Reason]), + lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]), {stop, State}; %% 处理其他未知消息