From df8cc6f108ab3f6788a545fcfd49c9ece0ceb49c Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 22 Apr 2025 11:10:26 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E6=8F=A1=E6=89=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/include/iot.hrl | 1 - apps/iot/src/iot_host.erl | 36 ++--- apps/iot/src/websocket/tcp_channel.erl | 16 ++- apps/iot/src/websocket/ws_channel.erl | 174 ------------------------- 4 files changed, 23 insertions(+), 204 deletions(-) delete mode 100644 apps/iot/src/websocket/ws_channel.erl diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 39cd286..3274851 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -26,7 +26,6 @@ %% 主机端上报数据类型标识 %% 建立到websocket的register关系 -define(METHOD_AUTH, 16#00). --define(METHOD_CREATE_SESSION, 16#01). -define(METHOD_DATA, 16#02). -define(METHOD_PING, 16#03). diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 718a023..1bdfabd 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -23,7 +23,7 @@ -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). -export([get_metric/1, publish_message/4, get_aes/1, get_status/1]). -export([publish_directive/6, send_directive/5]). --export([create_session/2, attach_channel/2]). +-export([attach_channel/2]). -export([reload_device/2, delete_device/2, activate_device/3]). -export([heartbeat/1]). @@ -88,10 +88,6 @@ get_metric(Pid) when is_pid(Pid) -> attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> gen_statem:call(Pid, {attach_channel, ChannelPid}). --spec create_session(Pid :: pid(), PubKey :: binary()) -> {ok, Reply :: binary()}. -create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) -> - gen_statem:call(Pid, {create_session, PubKey}). - %% 这里占用的的调用进程的时间 -spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) -> ok | {ok, Response :: binary()} | {error, Reason :: any()}. @@ -326,30 +322,24 @@ handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, cha {next_state, ?STATE_DENIED, State#state{has_session = false}, [{reply, From, ok}]}; %% 绑定channel -handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid = UUID, channel_pid = undefined}) -> - lager:debug("[iot_host] attach_channel host_id uuid: ~p, channel: ~p", [UUID, ChannelPid]), +handle_event({call, From}, {attach_channel, ChannelPid}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = undefined}) -> erlang:monitor(process, ChannelPid), - {keep_state, State#state{channel_pid = ChannelPid}, [{reply, From, ok}]}; + %% 更新主机为在线状态 + {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), + report_event(UUID, ?HOST_ONLINE), + lager:debug("[iot_host] host_id(attach_channel) uuid: ~p, will change status, affected_row: ~p", [UUID, AffectedRow]), + {keep_state, State#state{channel_pid = ChannelPid, has_session = true}, [{reply, From, ok}]}; +%% 主机没有激活 +handle_event({call, From}, {attach_channel, ChannelPid}, ?STATE_DENIED, State = #state{uuid = UUID, channel_pid = undefined}) -> + lager:notice("[iot_host] attach_channel host_id uuid: ~p, channel: ~p, host inactivated", [UUID, ChannelPid]), + erlang:monitor(process, ChannelPid), + {keep_state, State, [{reply, From, {error, <<"host inactivated">>}}]}; +%% 已经绑定了channel handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, channel_pid = OldChannelPid}) -> lager:notice("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]), {keep_state, State, [{reply, From, {error, <<"channel existed">>}}]}; -%% 授权通过后,才能将主机的状态设置为在线状态 -handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = Aes}) -> - Reply = #{<<"a">> => true, <<"aes">> => Aes}, - EncReply = iot_cipher_rsa:encode(Reply, PubKey), - {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), - report_event(UUID, ?HOST_ONLINE), - lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]), - {keep_state, State#state{has_session = true}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; - -handle_event({call, From}, {create_session, PubKey}, ?STATE_DENIED, State = #state{uuid = UUID}) -> - lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]), - Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, - EncReply = iot_cipher_rsa:encode(Reply, PubKey), - {keep_state, State#state{has_session = false}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; - %% 重新加载设备信息 handle_event({call, From}, {reload_device, DeviceUUID}, _, State) -> case iot_device_sup:ensured_device_started(DeviceUUID) of diff --git a/apps/iot/src/websocket/tcp_channel.erl b/apps/iot/src/websocket/tcp_channel.erl index 9ae6666..7eb770e 100644 --- a/apps/iot/src/websocket/tcp_channel.erl +++ b/apps/iot/src/websocket/tcp_channel.erl @@ -100,8 +100,16 @@ handle_info({tcp, Socket, <>), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; - {error, Reason} -> - lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p", [UUID, Reason]), + {error, Reason} when is_binary(Reason) -> + AuthReplyBin = message_pb:encode_msg(#auth_reply{ + code = -1, + message = Reason, + repository_url = <<"">> + }), + Transport:send(Socket, <>), + + lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), + {stop, State} end end; @@ -110,10 +118,6 @@ handle_info({tcp, Socket, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> - {ok, Reply} = iot_host:create_session(HostPid, PubKey), - {reply, {binary, <>}, State}; - handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {data, Data}), {ok, State}; diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl deleted file mode 100644 index 943ec6c..0000000 --- a/apps/iot/src/websocket/ws_channel.erl +++ /dev/null @@ -1,174 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author licheng5 -%%% @copyright (C) 2021, -%%% @doc -%%% -%%% @end -%%% Created : 11. 1月 2021 上午12:17 -%%%------------------------------------------------------------------- --module(ws_channel). --author("licheng5"). --include("iot.hrl"). - -%% API --export([init/2]). --export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). --export([publish/3, stop/2, send/2]). - --record(state, { - uuid :: undefined | binary(), - %% 用户进程id - host_pid = undefined, - %% 发送消息对应的id - packet_id = 1 :: integer(), - - %% 请求响应的对应关系 - inflight = #{} -}). - -%% 向通道中写入消息 --spec publish(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference(). -publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) -> - Ref = make_ref(), - Pid ! {publish, ReceiverPid, Ref, Msg}, - Ref. - -%% 向通道中写入消息 --spec send(Pid :: pid(), Msg :: binary()) -> no_return(). -send(Pid, Msg) when is_pid(Pid), is_binary(Msg) -> - Pid ! {send, Msg}. - -%% 关闭方法 --spec stop(Pid :: pid(), Reason :: any()) -> no_return(). -stop(undefined, _Reason) -> - ok; -stop(Pid, Reason) when is_pid(Pid) -> - Pid ! {stop, Reason}. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% 逻辑处理方法 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -init(Req, Opts) -> - {cowboy_websocket, Req, Opts}. - -websocket_init(_State) -> - lager:debug("[ws_channel] get a new connection"), - %% 初始状态为true - {ok, #state{packet_id = 1}}. - -websocket_handle({binary, <>}, State) -> - #{<<"uuid">> := UUID, <<"timestamp">> := Timestamp, <<"salt">> := Salt, <<"username">> := Username, <<"token">> := Token} = jiffy:decode(Data, [return_maps]), - lager:debug("[ws_channel] auth uuid: ~p, request message: ~p", [UUID, Data]), - case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of - true -> - case host_bo:get_host_by_uuid(UUID) of - undefined -> - lager:warning("[ws_channel] uuid: ~p, user: ~p, host not found", [UUID, Username]), - {stop, State}; - {ok, _} -> - %% 尝试启动主机的服务进程 - {ok, HostPid} = iot_host_sup:ensured_host_started(UUID), - case iot_host:attach_channel(HostPid, self()) of - ok -> - %% 建立到host的monitor - erlang:monitor(process, HostPid), - Reply = jiffy:encode(#{<<"code">> => 1, <<"message">> => <<"ok">>}, [force_utf8]), - - {reply, {binary, <>}, State#state{uuid = UUID, host_pid = HostPid}}; - {error, Reason} -> - lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p", [UUID, Reason]), - {stop, State} - end - end; - false -> - lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]), - {stop, State} - end; - -websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> - {ok, Reply} = iot_host:create_session(HostPid, PubKey), - {reply, {binary, <>}, State}; - -websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> - iot_host:handle(HostPid, {data, Data}), - {ok, State}; - -websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> - iot_host:handle(HostPid, {ping, CipherMetric}), - {ok, State}; - -websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> - iot_host:handle(HostPid, {inform, CipherInfo}), - {ok, State}; - -websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> - iot_host:handle(HostPid, {feedback_step, CipherInfo}), - {ok, State}; - -websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> - iot_host:handle(HostPid, {feedback_result, CipherInfo}), - {ok, State}; - -websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> - iot_host:handle(HostPid, {event, CipherEvent}), - {ok, State}; - -websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> - iot_host:handle(HostPid, {ai_event, CipherEvent}), - {ok, State}; - -%% 主机端的消息响应 -websocket_handle({binary, <>}, State = #state{uuid = UUID}) -> - lager:debug("[ws_channel] uuid: ~p, get send response message: ~p", [UUID, Body]), - {ok, State}; -websocket_handle({binary, <>}, State = #state{uuid = UUID, inflight = Inflight}) when PacketId > 0 -> - lager:debug("[ws_channel] uuid: ~p, get publish response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), - case maps:take(PacketId, Inflight) of - error -> - lager:warning("[ws_channel] get unknown publish response message: ~p, packet_id: ~p", [Body, PacketId]), - {ok, State}; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true when Body == <<>> -> - ReceiverPid ! {ws_response, Ref}; - true -> - ReceiverPid ! {ws_response, Ref, Body}; - false -> - lager:warning("[ws_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) - end, - {ok, State#state{inflight = NInflight}} - end; - -websocket_handle(Info, State) -> - lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), - {stop, State}. - -%% 处理关闭信号 -websocket_info({stop, Reason}, State) -> - lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), - {stop, State}; - -%% 发送消息 -websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = PacketId, inflight = Inflight}) when is_binary(Msg) -> - NInflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight), - {reply, {binary, <>}, State#state{packet_id = PacketId + 1, inflight = NInflight}}; - -%% 发送消息, 不需要等待回复 -websocket_info({send, Msg}, State) when is_binary(Msg) -> - {reply, {binary, <>}, State}; - -%% 用户进程关闭,则关闭通道 -websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> - lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]), - {stop, State}; - -%% 处理其他未知消息 -websocket_info(Info, State = #state{uuid = UUID}) -> - lager:debug("[ws_channel] channel get unknown info: ~p, uuid: ~p", [Info, UUID]), - {ok, State}. - -%% 进程关闭事件 -terminate(Reason, _Req, State) -> - lager:debug("[ws_channel] channel close with reason: ~p, state is: ~p", [Reason, State]), - ok. \ No newline at end of file