%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2021,
%%% @doc
%%% Channel process wrapping one transport socket. Callers push
%%% messages to the remote side through pub/3, command/3 and
%%% async_call/4; inbound socket data is handled in handle_info/2.
%%% @end
%%% Created : 11 Jan 2021 12:17 AM
%%%-------------------------------------------------------------------
-module(tcp_channel).
-author("licheng5").
-behaviour(gen_server).

-include("iot.hrl").
-include("message_pb.hrl").

%% API
-export([pub/3, async_call/4, command/3]).
-export([stop/2]).
-export([start_link/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).

-record(state, {
    transport,
    socket,
    uuid :: undefined | binary(),
    %% pid of the host process this channel is attached to
    host_pid = undefined,
    %% id used for the next outgoing async_call packet
    packet_id = 1 :: integer(),
    %% pending async calls: PacketId => {ReceiverPid, Ref}
    inflight = #{}
}).

%% @doc Asynchronously write a pub message (Topic + Content) to the channel.
%% The request is cast to the channel process, so this always returns `ok'
%% immediately and gives no delivery guarantee.
-spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> ok.
pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) ->
    gen_server:cast(Pid, {pub, Topic, Content}).

%% @doc Asynchronously write a command of the given type to the channel.
%% Fire-and-forget: returns `ok' as soon as the cast has been issued.
-spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> ok.
command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is_binary(Command) ->
    gen_server:cast(Pid, {command, CommandType, Command}).

%% @doc Asynchronously send a call to the remote side.
%% Returns a fresh reference; when the matching reply arrives the channel
%% sends `{async_call_reply, Ref, Reply}' to ReceiverPid.
-spec async_call(Pid :: pid(), ReceiverPid :: pid(), CallType :: integer(), CallBin :: binary()) ->
    Ref :: reference().
async_call(Pid, ReceiverPid, CallType, CallBin)
    when is_pid(Pid), is_pid(ReceiverPid), is_integer(CallType), is_binary(CallBin) ->
    Ref = make_ref(),
    gen_server:cast(Pid, {async_call, ReceiverPid, Ref, CallType, CallBin}),
    Ref.

%% @doc Stop the channel process with Reason, waiting at most 5 seconds.
%% Passing `undefined' instead of a pid is a no-op so callers do not have to
%% check whether a channel was ever attached.
-spec stop(Pid :: pid() | undefined, Reason :: any()) -> ok.
stop(undefined, _Reason) ->
    ok;
stop(Pid, Reason) when is_pid(Pid) ->
    gen_server:stop(Pid, Reason, 5000).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Process lifecycle and message handling
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Spawn a linked channel process for an accepted socket. The process starts
%% in init/1 and then enters the gen_server loop itself.
start_link(Transport, Sock) ->
    {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock]])}.

%% Wait for the transport to hand over the socket, switch it to active mode
%% and enter the gen_server loop with the initial state.
init([Transport, Sock]) ->
    lager:debug("[sdlan_channel] get a new connection: ~p", [Sock]),
    case Transport:wait(Sock) of
        {ok, NewSock} ->
            %% Options must be set on the socket returned by wait/1: that is
            %% the one stored in the state and matched on in handle_info/2.
            Transport:setopts(NewSock, [{active, true}]),
            %% The ping ticker is currently disabled:
            %% erlang:start_timer(?PING_TICKER, self(), ping_ticker),
            gen_server:enter_loop(?MODULE, [], #state{transport = Transport, socket = NewSock});
        {error, Reason} ->
            %% NOTE(review): this process is started via proc_lib, not
            %% gen_server:start_link, so this tuple is simply the return value
            %% of init/1 and the process then exits normally.
            {stop, Reason}
    end.

%% No synchronous requests are supported; every call is answered with ok.
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% NOTE(review): in every clause below the binary frame passed to
%% Transport:send/2 appears as `<>`; the original `<<...>>` segments seem to
%% have been lost from this copy of the file and must be restored from the
%% authoritative source before the module compiles.

%% Send a message using the pub/sub mechanism.
handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) ->
    PubBin = message_pb:encode_msg(#pub{topic = Topic, content = Content}),
    Transport:send(Socket, <>),
    {noreply, State};
%% Send a command message.
handle_cast({command, CommandType, Command}, State = #state{transport = Transport, socket = Socket}) ->
    Transport:send(Socket, <>),
    {noreply, State};
%% Push an async call and remember who is waiting for the reply, keyed by the
%% packet id used for this request; the packet id is then incremented.
handle_cast({async_call, ReceiverPid, Ref, CallType, CallBin},
    State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
    Transport:send(Socket, <>),
    {noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}.
%% NOTE(review): every `{tcp, Socket, <>}` head below originally carried a
%% binary pattern that bound the payload variables used in the clause body
%% (AuthRequestBin, Data0, PingData, InformData, EventData, PacketId,
%% ResponseBin). Those `<<...>>` segments appear to have been lost from this
%% copy of the file and must be restored from the authoritative source.

%% Authentication request: verify the credentials, make sure the host exists,
%% start (or find) its host process and attach this channel to it.
handle_info({tcp, Socket, <>}, State = #state{transport = Transport, socket = Socket}) ->
    #auth_request{
        uuid = UUID,
        username = Username,
        token = Token,
        salt = Salt,
        timestamp = Timestamp
    } = message_pb:decode_msg(AuthRequestBin, auth_request),
    lager:debug("[ws_channel] auth uuid: ~p", [UUID]),
    case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of
        true ->
            case host_bo:get_host_by_uuid(UUID) of
                undefined ->
                    lager:warning("[ws_channel] uuid: ~p, user: ~p, host not found", [UUID, Username]),
                    %% A stop tuple must carry a reason; a bare {stop, State}
                    %% is reported by gen_server as a bad return value.
                    {stop, normal, State};
                {ok, _} ->
                    %% Try to start the service process of this host.
                    {ok, HostPid} = iot_host_sup:ensured_host_started(UUID),
                    case iot_host:attach_channel(HostPid, self()) of
                        ok ->
                            %% Monitor the host so the channel closes when it dies.
                            erlang:monitor(process, HostPid),
                            AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 0, message = <<"ok">>}),
                            Transport:send(Socket, <>),
                            {noreply, State#state{uuid = UUID, host_pid = HostPid}};
                        {denied, Reason} when is_binary(Reason) ->
                            %% The attach was denied, but the channel is kept
                            %% open and still bound to the host.
                            erlang:monitor(process, HostPid),
                            AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 1, message = Reason}),
                            Transport:send(Socket, <>),
                            lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
                            {noreply, State#state{uuid = UUID, host_pid = HostPid}};
                        {error, Reason} when is_binary(Reason) ->
                            AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 2, message = Reason}),
                            Transport:send(Socket, <>),
                            lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
                            {stop, normal, State}
                    end
            end;
        false ->
            lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]),
            {stop, normal, State}
    end;

%% Data packet: decode and forward to the attached host process.
handle_info({tcp, Socket, <>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
    Data = message_pb:decode_msg(Data0, data),
    iot_host:handle(HostPid, {data, Data}),
    {noreply, State};

%% Ping packet: decode and forward to the attached host process.
handle_info({tcp, Socket, <>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
    Ping = message_pb:decode_msg(PingData, ping),
    iot_host:handle(HostPid, {ping, Ping}),
    {noreply, State};

%% Service inform packet: decode and forward to the attached host process.
handle_info({tcp, Socket, <>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
    ServiceInform = message_pb:decode_msg(InformData, service_inform),
    iot_host:handle(HostPid, {inform, ServiceInform}),
    {noreply, State};

%% Event packet: decode and forward to the attached host process.
handle_info({tcp, Socket, <>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
    Event = message_pb:decode_msg(EventData, event),
    iot_host:handle(HostPid, {event, Event}),
    {noreply, State};

%% Reply from the remote side to an earlier async_call: look up the waiting
%% receiver by packet id, drop the inflight entry and deliver the reply.
handle_info({tcp, Socket, <>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 ->
    AsyncCallReply = message_pb:decode_msg(ResponseBin, async_call_reply),
    lager:debug("[ws_channel] uuid: ~p, get async_call_reply: ~p, packet_id: ~p", [UUID, AsyncCallReply, PacketId]),
    case maps:take(PacketId, Inflight) of
        error ->
            lager:warning("[ws_channel] get unknown async_call_reply message: ~p, packet_id: ~p", [AsyncCallReply, PacketId]),
            {noreply, State};
        {{ReceiverPid, Ref}, NInflight} ->
            case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
                true ->
                    ReceiverPid ! {async_call_reply, Ref, AsyncCallReply};
                false ->
                    lager:warning("[ws_channel] get async_call_reply message: ~p, packet_id: ~p, but receiver_pid is deaded", [AsyncCallReply, PacketId])
            end,
            {noreply, State#state{inflight = NInflight}}
    end;

%% Socket error: stop the channel.
handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) ->
    lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]),
    {stop, normal, State};
%% Socket closed by the peer: stop the channel.
handle_info({tcp_closed, Sock}, State = #state{socket = Sock}) ->
    lager:notice("[sdlan_channel] tcp_closed"),
    {stop, normal, State};

%% Explicit request to close the current channel.
handle_info({stop, Reason}, State) ->
    {stop, Reason, State};

%% The monitored host process went down, so the connection has to be closed.
handle_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) ->
    lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]),
    {stop, normal, State};

%% Anything else is logged and ignored.
%% NOTE(review): the log text says the channel will be closed, but the channel
%% keeps running here; confirm which behaviour is intended.
handle_info(Info, State) ->
    lager:warning("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]),
    {noreply, State}.

%% Log the reason the channel is stopping.
terminate(Reason, #state{}) ->
    lager:warning("[sdlan_channel] stop with reason: ~p", [Reason]),
    ok.

%% No state migration is needed on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.