%%%------------------------------------------------------------------- %%% @author licheng5 %%% @copyright (C) 2021, %%% @doc %%% %%% @end %%% Created : 11. 1月 2021 上午12:17 %%%------------------------------------------------------------------- -module(tcp_channel). -author("licheng5"). -include("message.hrl"). -behaviour(ranch_protocol). %% API -export([pub/3, jsonrpc_call/3, command/3]). -export([start_link/3, stop/2]). %% gen_server callbacks -export([init/3, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -record(state, { transport, socket, uuid :: undefined | binary(), %% 用户进程id host_pid = undefined, %% 发送消息对应的id packet_id = 1 :: integer(), %% 请求响应的对应关系 inflight = #{} }). %% 向通道中写入消息 -spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> no_return(). pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) -> gen_server:cast(Pid, {pub, Topic, Content}). %% 向通道中写入消息 -spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> no_return(). command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is_binary(Command) -> gen_server:cast(Pid, {command, CommandType, Command}). %% 向通道中写入消息 -spec jsonrpc_call(Pid :: pid(), ReceiverPid :: pid(), CallBin :: binary()) -> Ref :: reference(). jsonrpc_call(Pid, ReceiverPid, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_binary(CallBin) -> Ref = make_ref(), gen_server:cast(Pid, {jsonrpc_call, ReceiverPid, Ref, CallBin}), Ref. %% 关闭方法 -spec stop(Pid :: pid(), Reason :: any()) -> no_return(). stop(undefined, _Reason) -> ok; stop(Pid, Reason) when is_pid(Pid) -> gen_server:stop(Pid, Reason, 5000). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 逻辑处理方法 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% start_link(Ref, Transport, Opts) -> {ok, proc_lib:spawn_link(?MODULE, init, [Ref, Transport, Opts])}. init(Ref, Transport, _Opts = []) -> {ok, Socket} = ranch:handshake(Ref), lager:debug("[sdlan_channel] get a new connection: ~p", [Socket]), Transport:setopts(Socket, [binary, {active, true}, {packet, 4}]), % erlang:start_timer(?PING_TICKER, self(), ping_ticker), gen_server:enter_loop(?MODULE, [], #state{transport = Transport, socket = Socket}). handle_call(_Request, _From, State) -> {reply, ok, State}. %% 发送消息, 基于pub/sub机制 handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) -> EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, content = Content}), Transport:send(Socket, <>), {noreply, State}; %% 发送Command消息 handle_cast({command, CommandType, Command}, State = #state{transport = Transport, socket = Socket}) -> EncCommand = message_codec:encode(?MESSAGE_COMMAND, #command{command_type = CommandType, command = Command}), Transport:send(Socket, <>), {noreply, State}; %% 推送消息 handle_cast({jsonrpc_call, ReceiverPid, Ref, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) -> Transport:send(Socket, <>), {noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}. %% auth验证 handle_info({tcp, Socket, <>}, State = #state{transport = Transport, socket = Socket}) -> {ok, #auth_request{uuid = UUID, username = Username, token = Token, salt = Salt, timestamp = Timestamp}} = message_codec:decode(RequestBin), lager:debug("[ws_channel] auth uuid: ~p", [UUID]), case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of true -> case iot_api:get_host_by_uuid(UUID) of undefined -> lager:warning("[ws_channel] uuid: ~p, user: ~p, host not found", [UUID, Username]), {stop, State}; {ok, _} -> %% 尝试启动主机的服务进程 {ok, HostPid} = iot_host_sup:ensured_host_started(UUID), case iot_host:attach_channel(HostPid, self()) of ok -> %% 建立到host的monitor erlang:monitor(process, HostPid), AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 0, payload = <<"ok">>}), Transport:send(Socket, <>), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {denied, Reason} when is_binary(Reason) -> erlang:monitor(process, HostPid), AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 1, payload = Reason}), Transport:send(Socket, <>), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {error, Reason} when is_binary(Reason) -> AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 2, payload = Reason}), Transport:send(Socket, <>), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), {stop, State} end end; false -> lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]), {stop, State} end; handle_info({tcp, Socket, <>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) -> {ok, CastMessage} = message_codec:decode(CastBin), case CastMessage of #data{} = Data -> iot_host:handle(HostPid, {data, Data}); #event{} = Event -> iot_host:handle(HostPid, {event, Event}); #task_event_stream{task_id = TaskId, type = <<"close">>, stream = Reason} -> iot_event_stream_observer:stream_close(TaskId, Reason); #task_event_stream{task_id = TaskId, type = Type, stream = Stream} -> lager:debug("[tcp_channel] get task_id: ~p, type: ~ts, stream: ~ts", [TaskId, Type, Stream]), iot_event_stream_observer:stream_data(TaskId, Type, Stream) end, {noreply, State}; %handle_info({tcp, Socket, <>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) -> % Ping = message_pb:decode_msg(PingData, ping), % iot_host:handle(HostPid, {ping, Ping}), % {noreply, State}; %% 主机端的消息响应 handle_info({tcp, Socket, <>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 -> {ok, RpcReply} = message_codec:decode(ResponseBin), case maps:take(PacketId, Inflight) of error -> {noreply, State}; {{ReceiverPid, Ref}, NInflight} -> case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of true -> ReceiverPid ! {jsonrpc_reply, Ref, RpcReply}; false -> lager:warning("[ws_channel] get async_call_reply message: ~p, packet_id: ~p, but receiver_pid is deaded", [RpcReply, PacketId]) end, {noreply, State#state{inflight = NInflight}} end; handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) -> lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]), {stop, normal, State}; handle_info({tcp_closed, Sock}, State = #state{socket = Sock}) -> lager:notice("[sdlan_channel] tcp_closed"), {stop, normal, State}; %% 关闭当前通道 handle_info({stop, Reason}, State) -> {stop, Reason, State}; %% 主机进程挂掉的时候,需要关闭掉链接 handle_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]), {stop, State}; handle_info(Info, State) -> lager:warning("[sdlan_channel] get a unknown message: ~p, channel will closed, state: ~p", [Info, State]), {noreply, State}. terminate(Reason, #state{}) -> lager:warning("[sdlan_channel] stop with reason: ~p", [Reason]), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}.