add tcp channel
This commit is contained in:
parent
aa78ccee0a
commit
4bd1f26a35
@ -41,6 +41,6 @@ start_tcp_server() ->
|
||||
{acceptors, 10},
|
||||
{max_connections, 1024}
|
||||
],
|
||||
{ok, _} = esockd:open('efka/tcp_server', Port, TransOpts, {sdlan_channel, start_link, []}),
|
||||
{ok, _} = esockd:open('efka/tcp_server', Port, TransOpts, {tcp_channel, start_link, []}),
|
||||
|
||||
lager:debug("[efka_app] the tcp server start at: ~p", [Port]).
|
||||
116
apps/efka/src/tcp_channel.erl
Normal file
116
apps/efka/src/tcp_channel.erl
Normal file
@ -0,0 +1,116 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author licheng5
|
||||
%%% @copyright (C) 2020, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 10. 12月 2020 上午11:17
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(tcp_channel).
|
||||
-author("licheng5").
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% 心跳包监测机制
|
||||
-define(PING_TICKER, 15000).
|
||||
|
||||
%% API
|
||||
-export([start_link/2]).
|
||||
-export([publish_command/4, send_event/3, stop/2, move_network/3]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-record(state, {
|
||||
transport,
|
||||
socket,
|
||||
%% 标记是否已经注册
|
||||
is_registered = false,
|
||||
%% 记录ping的次数
|
||||
ping_counter = 0,
|
||||
%% 发送消息对应的id
|
||||
packet_id = 1 :: integer(),
|
||||
%% 请求响应的对应关系
|
||||
inflight = #{}
|
||||
}).
|
||||
|
||||
%% 向通道中写入消息
|
||||
-spec publish_command(Pid :: pid(), ReceiverPid :: pid(), CommandType :: integer(), Msg :: binary()) -> Ref :: reference().
|
||||
publish_command(Pid, ReceiverPid, CommandType, Msg) when is_pid(Pid), is_pid(ReceiverPid), is_integer(CommandType), is_binary(Msg) ->
|
||||
Ref = make_ref(),
|
||||
Pid ! {publish_command, ReceiverPid, Ref, CommandType, Msg},
|
||||
Ref.
|
||||
|
||||
%% 网络迁移是一种特殊的指令信息,需要单独处理
|
||||
-spec move_network(Pid :: pid(), ReceiverPid :: pid(), NetworkPid :: pid()) -> Ref :: reference().
|
||||
move_network(Pid, ReceiverPid, NetworkPid) when is_pid(Pid), is_pid(ReceiverPid), is_pid(NetworkPid) ->
|
||||
Ref = make_ref(),
|
||||
Pid ! {move_network, ReceiverPid, Ref, NetworkPid},
|
||||
Ref.
|
||||
|
||||
%% 向通道中写入消息
|
||||
-spec send_event(Pid :: pid(), EventType :: integer(), Event :: binary()) -> no_return().
|
||||
send_event(Pid, EventType, Event) when is_pid(Pid), is_integer(EventType), is_binary(Event) ->
|
||||
Pid ! {send_event, EventType, Event}.
|
||||
|
||||
%% 关闭方法
|
||||
-spec stop(Pid :: pid(), Reason :: any()) -> no_return().
|
||||
stop(undefined, _Reason) ->
|
||||
ok;
|
||||
stop(Pid, Reason) when is_pid(Pid) ->
|
||||
Pid ! {stop, Reason}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% esockd callback
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
start_link(Transport, Sock) ->
|
||||
{ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock]])}.
|
||||
|
||||
init([Transport, Sock]) ->
|
||||
lager:debug("[sdlan_channel] get a new connection: ~p", [Sock]),
|
||||
case Transport:wait(Sock) of
|
||||
{ok, NewSock} ->
|
||||
Transport:setopts(Sock, [{active, true}]),
|
||||
erlang:start_timer(?PING_TICKER, self(), ping_ticker),
|
||||
gen_server:enter_loop(?MODULE, [], #state{transport = Transport, socket = NewSock});
|
||||
{error, Reason} ->
|
||||
{stop, Reason}
|
||||
end.
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% 网络流量统计
|
||||
handle_info({tcp, _Sock, <<Body/binary>>}, State = #state{is_registered = true}) ->
|
||||
% #sdl_flows{forward_num = ForwardNum, p2p_num = P2PNum, inbound_num = InboundNum} = sdlan_pb:decode_msg(Body, sdl_flows),
|
||||
lager:debug("[sdlan_channel] body: ~p", [Body]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) ->
|
||||
lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]),
|
||||
{stop, normal, State};
|
||||
|
||||
handle_info({tcp_closed, Sock}, State = #state{socket = Sock}) ->
|
||||
lager:notice("[sdlan_channel] tcp_closed", []),
|
||||
{stop, normal, State};
|
||||
|
||||
%% 关闭当前通道
|
||||
handle_info({stop, Reason}, State) ->
|
||||
{stop, Reason, State};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
lager:warning("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(Reason, #state{}) ->
|
||||
lager:warning("[sdlan_channel] stop with reason: ~p", [Reason]),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
%% helper methods
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
Loading…
x
Reference in New Issue
Block a user