%%%------------------------------------------------------------------- %%% @author licheng5 %%% @copyright (C) 2021, %%% @doc %%% %%% @end %%% Created : 11. 1月 2021 上午12:17 %%%------------------------------------------------------------------- -module(tcp_channel). -author("licheng5"). -include("iot.hrl"). -include("message_pb.hrl"). %% API -export([publish/3, stop/2, send/2]). -export([start_link/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, terminate/2]). -record(state, { transport, socket, uuid :: undefined | binary(), %% 用户进程id host_pid = undefined, %% 发送消息对应的id packet_id = 1 :: integer(), %% 请求响应的对应关系 inflight = #{} }). %% 向通道中写入消息 -spec publish(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference(). publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) -> Ref = make_ref(), Pid ! {publish, ReceiverPid, Ref, Msg}, Ref. %% 向通道中写入消息 -spec send(Pid :: pid(), Msg :: binary()) -> no_return(). send(Pid, Msg) when is_pid(Pid), is_binary(Msg) -> Pid ! {send, Msg}. %% 关闭方法 -spec stop(Pid :: pid(), Reason :: any()) -> no_return(). stop(undefined, _Reason) -> ok; stop(Pid, Reason) when is_pid(Pid) -> Pid ! {stop, Reason}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 逻辑处理方法 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% start_link(Transport, Sock) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock]])}. init([Transport, Sock]) -> lager:debug("[sdlan_channel] get a new connection: ~p", [Sock]), case Transport:wait(Sock) of {ok, NewSock} -> Transport:setopts(Sock, [{active, true}]), % erlang:start_timer(?PING_TICKER, self(), ping_ticker), gen_server:enter_loop(?MODULE, [], #state{transport = Transport, socket = NewSock}); {error, Reason} -> {stop, Reason} end. handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. %% auth验证 handle_info({tcp, Socket, <>}, State = #state{transport = Transport, socket = Socket}) -> #auth_request{ uuid = UUID, username = Username, token = Token, salt = Salt, timestamp = Timestamp } = message_pb:decode_msg(AuthRequestBin, auth_request), lager:debug("[ws_channel] auth uuid: ~p, request message: ~p", [UUID]), case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of true -> case host_bo:get_host_by_uuid(UUID) of undefined -> lager:warning("[ws_channel] uuid: ~p, user: ~p, host not found", [UUID, Username]), {stop, State}; {ok, _} -> %% 尝试启动主机的服务进程 {ok, HostPid} = iot_host_sup:ensured_host_started(UUID), case iot_host:attach_channel(HostPid, self()) of ok -> %% 建立到host的monitor erlang:monitor(process, HostPid), AuthReplyBin = message_pb:encode_msg(#auth_reply{ code = 1, message = <<"ok">>, repository_url = <<"https://www.baidu.com">> }), Transport:send(Socket, <>), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {error, Reason} -> lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p", [UUID, Reason]), {stop, State} end end; false -> lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]), {stop, State} end; handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> {ok, Reply} = iot_host:create_session(HostPid, PubKey), {reply, {binary, <>}, State}; handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {data, Data}), {ok, State}; handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {ping, CipherMetric}), {ok, State}; handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {inform, CipherInfo}), {ok, State}; handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {feedback_step, CipherInfo}), {ok, State}; handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {feedback_result, CipherInfo}), {ok, State}; handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {event, CipherEvent}), {ok, State}; handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> iot_host:handle(HostPid, {ai_event, CipherEvent}), {ok, State}; %% 主机端的消息响应 handle_info({binary, <>}, State = #state{uuid = UUID}) -> lager:debug("[ws_channel] uuid: ~p, get send response message: ~p", [UUID, Body]), {ok, State}; handle_info({binary, <>}, State = #state{uuid = UUID, inflight = Inflight}) when PacketId > 0 -> lager:debug("[ws_channel] uuid: ~p, get publish response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), case maps:take(PacketId, Inflight) of error -> lager:warning("[ws_channel] get unknown publish response message: ~p, packet_id: ~p", [Body, PacketId]), {ok, State}; {{ReceiverPid, Ref}, NInflight} -> case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of true when Body == <<>> -> ReceiverPid ! {ws_response, Ref}; true -> ReceiverPid ! {ws_response, Ref, Body}; false -> lager:warning("[ws_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) end, {ok, State#state{inflight = NInflight}} end; handle_info({tcp, Socket, Data}, State = #state{transport = Transport, socket = Socket}) -> lager:debug("[ws_channel] socket: ~p, get a message: ~p", [Socket, Data]), Transport:send(Socket, <<"hello world reply">>), {noreply, State}; handle_info(Info, State) -> lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), {stop, State}; handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) -> lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]), {stop, normal, State}; handle_info({tcp_closed, Sock}, State = #state{socket = Sock}) -> lager:notice("[sdlan_channel] tcp_closed"), {stop, normal, State}; %% 关闭当前通道 handle_info({stop, Reason}, State) -> {stop, Reason, State}; handle_info(Info, State) -> lager:warning("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]), {noreply, State}. terminate(Reason, #state{}) -> lager:warning("[sdlan_channel] stop with reason: ~p", [Reason]), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. %% 处理关闭信号 websocket_info({stop, Reason}, State) -> lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), {stop, State}; %% 发送消息 websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = PacketId, inflight = Inflight}) when is_binary(Msg) -> NInflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight), {reply, {binary, <>}, State#state{packet_id = PacketId + 1, inflight = NInflight}}; %% 发送消息, 不需要等待回复 websocket_info({send, Msg}, State) when is_binary(Msg) -> {reply, {binary, <>}, State}; %% 用户进程关闭,则关闭通道 websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]), {stop, State}; %% 处理其他未知消息 websocket_info(Info, State = #state{uuid = UUID}) -> lager:debug("[ws_channel] channel get unknown info: ~p, uuid: ~p", [Info, UUID]), {ok, State}.