%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2020, 
%%% @doc
%%% Per-connection channel process implementing the gen_server behaviour.
%%% The public API is message based: each call posts a tagged tuple to the
%%% channel process and returns immediately.
%%% @end
%%% Created : 10 Dec 2020, 11:17 AM
%%%-------------------------------------------------------------------
-module(tcp_channel).
-author("licheng5").

-behaviour(gen_server).

%% Delay passed to the heartbeat (ping) check timer.
-define(PING_TICKER, 15000).

%% API
-export([start_link/2]).
-export([publish_command/4, send_event/3, stop/2, move_network/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {
    transport,
    socket,
    %% Marks whether the peer has already registered
    is_registered = false,
    %% Number of pings recorded
    ping_counter = 0,
    %% Id assigned to outgoing messages
    packet_id = 1 :: integer(),
    %% Mapping between requests and their responses
    inflight = #{}
}).

%% Posts a command to the channel process.
%% A fresh reference is created, included in the message and returned so the
%% caller can correlate a later answer. ReceiverPid is carried in the message
%% alongside the reference. The call does not wait for the channel to act.
-spec publish_command(Pid :: pid(), ReceiverPid :: pid(), CommandType :: integer(), Msg :: binary()) ->
    Ref :: reference().
publish_command(Pid, ReceiverPid, CommandType, Msg)
    when is_pid(Pid), is_pid(ReceiverPid), is_integer(CommandType), is_binary(Msg) ->
    Ref = make_ref(),
    Pid ! {publish_command, ReceiverPid, Ref, CommandType, Msg},
    Ref.

%% Network migration is a special kind of command and is posted with its own
%% message tag. Returns the reference embedded in the message.
-spec move_network(Pid :: pid(), ReceiverPid :: pid(), NetworkPid :: pid()) -> Ref :: reference().
move_network(Pid, ReceiverPid, NetworkPid)
    when is_pid(Pid), is_pid(ReceiverPid), is_pid(NetworkPid) ->
    Ref = make_ref(),
    Pid ! {move_network, ReceiverPid, Ref, NetworkPid},
    Ref.

%% Posts an event to the channel process.
%% Returns the message that was sent (the value of the `!` expression); the
%% previous spec claimed no_return(), which would mean the call never returns.
-spec send_event(Pid :: pid(), EventType :: integer(), Event :: binary()) ->
    {send_event, EventType :: integer(), Event :: binary()}.
send_event(Pid, EventType, Event) when is_pid(Pid), is_integer(EventType), is_binary(Event) ->
    Pid ! {send_event, EventType, Event}.

%% Asks the channel process to stop with Reason.
%% An `undefined` pid is accepted and ignored (returns ok); otherwise the
%% `{stop, Reason}` message is sent and, being the value of `!`, returned.
-spec stop(Pid :: pid() | undefined, Reason :: any()) -> ok | {stop, Reason :: any()}.
stop(undefined, _Reason) ->
    ok;
stop(Pid, Reason) when is_pid(Pid) ->
    Pid ! {stop, Reason}.

%%--------------------------------------------------------------------
%% esockd callback
%%--------------------------------------------------------------------

%% Spawns the linked channel process for a freshly accepted connection.
%% The process starts in init/1 and later enters the gen_server loop itself.
-spec start_link(Transport :: module(), Sock :: term()) -> {ok, pid()}.
start_link(Transport, Sock) ->
    {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock]])}.

%% Entry point of the spawned process (not invoked through gen_server:start).
%% Waits for the transport to hand over the socket, switches it to active
%% mode, arms the ping timer and enters the gen_server loop.
init([Transport, Sock]) ->
    efka_logger:debug("[sdlan_channel] get a new connection: ~p", [Sock]),
    case Transport:wait(Sock) of
        {ok, NewSock} ->
            %% Configure the socket returned by wait/1: it is the one stored in
            %% the state and may differ from Sock if the transport wrapped or
            %% upgraded the connection.
            Transport:setopts(NewSock, [{active, true}]),
            %% NOTE(review): no handle_info/2 clause matches the resulting
            %% {timeout, Ref, ping_ticker} message, so it reaches the catch-all
            %% clause and the timer is never re-armed - confirm intent.
            erlang:start_timer(?PING_TICKER, self(), ping_ticker),
            gen_server:enter_loop(?MODULE, [], #state{transport = Transport, socket = NewSock});
        {error, Reason} ->
            %% The process was started with proc_lib:spawn_link/3, so returning
            %% here simply ends the process.
            {stop, Reason}
    end.

%% No synchronous requests are supported; every call is answered with ok.
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% Casts are ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.

%% Network traffic statistics; only accepted once the peer is registered.
%% NOTE(review): the binary pattern was reconstructed as <<Body/binary>> (the
%% clause body needs Body bound) - confirm there was no leading packet tag.
handle_info({tcp, _Sock, <<Body/binary>>}, State = #state{is_registered = true}) ->
    efka_logger:debug("[sdlan_channel] body: ~p", [Body]),
    {noreply, State};
%% Socket error on the channel's own socket: stop normally.
handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) ->
    efka_logger:notice("[sdlan_channel] tcp_error: ~p", [Reason]),
    {stop, normal, State};
%% Peer closed the channel's own socket: stop normally.
handle_info({tcp_closed, Sock}, State = #state{socket = Sock}) ->
    efka_logger:notice("[sdlan_channel] tcp_closed", []),
    {stop, normal, State};
%% Close the current channel (sent by stop/2).
handle_info({stop, Reason}, State) ->
    {stop, Reason, State};
%% Anything else is logged and dropped.
%% NOTE(review): the log text says the channel will close, but the process
%% keeps running - confirm which of the two is intended.
handle_info(Info, State) ->
    efka_logger:warning("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]),
    {noreply, State}.

%% Logs the stop reason; no other cleanup is performed here.
terminate(Reason, #state{}) ->
    efka_logger:warning("[sdlan_channel] stop with reason: ~p", [Reason]),
    ok.

%% State is carried over unchanged across code upgrades.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%