From 4bd1f26a35cc08c455d84b9f3f9b8a6df1744680 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 19 Apr 2025 14:26:43 +0800 Subject: [PATCH] add tcp channel --- apps/efka/src/efka_app.erl | 2 +- apps/efka/src/tcp_channel.erl | 116 ++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 apps/efka/src/tcp_channel.erl diff --git a/apps/efka/src/efka_app.erl b/apps/efka/src/efka_app.erl index d96df0d..b4845aa 100644 --- a/apps/efka/src/efka_app.erl +++ b/apps/efka/src/efka_app.erl @@ -41,6 +41,6 @@ start_tcp_server() -> {acceptors, 10}, {max_connections, 1024} ], - {ok, _} = esockd:open('efka/tcp_server', Port, TransOpts, {sdlan_channel, start_link, []}), + {ok, _} = esockd:open('efka/tcp_server', Port, TransOpts, {tcp_channel, start_link, []}), lager:debug("[efka_app] the tcp server start at: ~p", [Port]). \ No newline at end of file diff --git a/apps/efka/src/tcp_channel.erl b/apps/efka/src/tcp_channel.erl new file mode 100644 index 0000000..677be7a --- /dev/null +++ b/apps/efka/src/tcp_channel.erl @@ -0,0 +1,116 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2020, +%%% @doc +%%% +%%% @end +%%% Created : 10. 12月 2020 上午11:17 +%%%------------------------------------------------------------------- +-module(tcp_channel). +-author("licheng5"). +-behaviour(gen_server). + +%% 心跳包监测机制 +-define(PING_TICKER, 15000). + +%% API +-export([start_link/2]). +-export([publish_command/4, send_event/3, stop/2, move_network/3]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-record(state, { + transport, + socket, + %% 标记是否已经注册 + is_registered = false, + %% 记录ping的次数 + ping_counter = 0, + %% 发送消息对应的id + packet_id = 1 :: integer(), + %% 请求响应的对应关系 + inflight = #{} +}). + +%% 向通道中写入消息 +-spec publish_command(Pid :: pid(), ReceiverPid :: pid(), CommandType :: integer(), Msg :: binary()) -> Ref :: reference(). +publish_command(Pid, ReceiverPid, CommandType, Msg) when is_pid(Pid), is_pid(ReceiverPid), is_integer(CommandType), is_binary(Msg) -> + Ref = make_ref(), + Pid ! {publish_command, ReceiverPid, Ref, CommandType, Msg}, + Ref. + +%% 网络迁移是一种特殊的指令信息,需要单独处理 +-spec move_network(Pid :: pid(), ReceiverPid :: pid(), NetworkPid :: pid()) -> Ref :: reference(). +move_network(Pid, ReceiverPid, NetworkPid) when is_pid(Pid), is_pid(ReceiverPid), is_pid(NetworkPid) -> + Ref = make_ref(), + Pid ! {move_network, ReceiverPid, Ref, NetworkPid}, + Ref. + +%% 向通道中写入消息 +-spec send_event(Pid :: pid(), EventType :: integer(), Event :: binary()) -> no_return(). +send_event(Pid, EventType, Event) when is_pid(Pid), is_integer(EventType), is_binary(Event) -> + Pid ! {send_event, EventType, Event}. + +%% 关闭方法 +-spec stop(Pid :: pid(), Reason :: any()) -> no_return(). +stop(undefined, _Reason) -> + ok; +stop(Pid, Reason) when is_pid(Pid) -> + Pid ! {stop, Reason}. + +%%-------------------------------------------------------------------- +%% esockd callback +%%-------------------------------------------------------------------- + +start_link(Transport, Sock) -> + {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock]])}. + +init([Transport, Sock]) -> + lager:debug("[sdlan_channel] get a new connection: ~p", [Sock]), + case Transport:wait(Sock) of + {ok, NewSock} -> + Transport:setopts(Sock, [{active, true}]), + erlang:start_timer(?PING_TICKER, self(), ping_ticker), + gen_server:enter_loop(?MODULE, [], #state{transport = Transport, socket = NewSock}); + {error, Reason} -> + {stop, Reason} + end. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +%% 网络流量统计 +handle_info({tcp, _Sock, <>}, State = #state{is_registered = true}) -> + % #sdl_flows{forward_num = ForwardNum, p2p_num = P2PNum, inbound_num = InboundNum} = sdlan_pb:decode_msg(Body, sdl_flows), + lager:debug("[sdlan_channel] body: ~p", [Body]), + {noreply, State}; + +handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) -> + lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]), + {stop, normal, State}; + +handle_info({tcp_closed, Sock}, State = #state{socket = Sock}) -> + lager:notice("[sdlan_channel] tcp_closed", []), + {stop, normal, State}; + +%% 关闭当前通道 +handle_info({stop, Reason}, State) -> + {stop, Reason, State}; + +handle_info(Info, State) -> + lager:warning("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]), + {noreply, State}. + +terminate(Reason, #state{}) -> + lager:warning("[sdlan_channel] stop with reason: ~p", [Reason]), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file