fix channel

This commit is contained in:
anlicheng 2026-02-13 18:41:43 +08:00
parent c820f62a53
commit f69ff60703
5 changed files with 117 additions and 62 deletions

View File

@ -20,18 +20,10 @@
%% %%
%% token不存在
-define(NAK_INVALID_TOKEN, 1).
%%
-define(NAK_NODE_DISABLE, 2).
%% IP地址可以用
-define(NAK_NO_IP, 3).
%% %%
-define(NAK_NETWORK_FAULT, 4). -define(NAK_NETWORK_FAULT, 4).
%% %%
-define(NAK_INTERNAL_FAULT, 5). -define(NAK_INTERNAL_FAULT, 5).
%% hostname被占用
-define(NAK_HOSTNAME_USED, 6).
%% API %% API
-export([start_link/1]). -export([start_link/1]).
@ -42,32 +34,32 @@
-record(state, { -record(state, {
conn :: quicer:connection_handle(), conn :: quicer:connection_handle(),
stream_handle :: undefined | quicer:stream_handle(), stream :: undefined | quicer:stream_handle(),
%% framing的解析 %% framing的解析
buf = <<>>, buf = <<>>,
client_id :: undefined | binary(), client_id :: undefined | binary(),
network_id = 0 :: integer(),
%% id %% id
network_pid :: undefined | pid(), network_pid :: undefined | pid(),
%% mac地址 %% mac地址
mac :: undefined | binary(), mac :: undefined | binary(),
ip = 0 :: integer(), ip = 0 :: integer(),
ping_counter = 0, ping_counter = 0
%%
is_registered = false
}). }).
%%%=================================================================== %%%===================================================================
%%% API %%% API
%%%=================================================================== %%%===================================================================
send_event(Pid, EventType, Event) -> -spec send_event(Pid :: pid(), EventType :: integer(), Event :: binary()) -> no_return().
send_event(Pid, EventType, Event) when is_pid(Pid), is_integer(EventType), is_binary(Event) ->
gen_statem:cast(Pid, {send_event, EventType, Event}). gen_statem:cast(Pid, {send_event, EventType, Event}).
stop(Pid, Reason) -> -spec stop(Pid :: pid(), Reason :: term()) -> ok.
ok. stop(Pid, Reason) when is_pid(Pid) ->
gen_statem:stop(Pid, Reason, 2000).
%% @doc Creates a gen_statem process which calls Module:init/1 to %% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this %% initialize. To ensure a synchronized start-up procedure, this
@ -98,21 +90,16 @@ callback_mode() ->
%% process message, this function is called. %% process message, this function is called.
handle_event(internal, do_init, initializing, State=#state{conn = Conn}) -> handle_event(internal, do_init, initializing, State=#state{conn = Conn}) ->
logger:debug("[sdlan_quic_channel] conn: ~p, do_init", [Conn]),
case quicer:accept_stream(Conn, #{active => true}) of case quicer:accept_stream(Conn, #{active => true}) of
{ok, Stream} -> {ok, Stream} ->
logger:debug("[sdlan_quic_channel] get stream: ~p", [Stream]), logger:debug("[sdlan_quic_channel] get stream: ~p", [Stream]),
{next_state, initialized, State#state{stream_handle = Stream}}; {next_state, initialized, State#state{stream = Stream}};
{error, Reason} -> {error, Reason} ->
logger:error("accept stream failed: ~p", [Reason]), logger:error("[sdlan_quic_channel] accept stream failed: ~p", [Reason]),
{stop, Reason, State} {stop, Reason, State}
end; end;
handle_event(info, {frame, Frame}, _StateName, State=#state{}) -> handle_event(info, {frame, <<PacketId:32, ?PACKET_REGISTER_SUPER, Body/binary>>}, initialized, State=#state{stream = Stream}) ->
logger:debug("[sdlan_quic_channel] get frame: ~p", [Frame]),
{keep_state, State};
handle_event(info, {frame, <<PacketId:32, ?PACKET_REGISTER_SUPER, Body/binary>>}, registered, State=#state{stream_handle = Stream}) ->
#sdl_register_super{ #sdl_register_super{
client_id = ClientId, network_id = NetworkId, mac = Mac, ip = Ip, mask_len = MaskLen, client_id = ClientId, network_id = NetworkId, mac = Mac, ip = Ip, mask_len = MaskLen,
hostname = HostName, pub_key = PubKey, access_token = AccessToken} = sdlan_pb:decode_msg(Body, sdl_register_super), hostname = HostName, pub_key = PubKey, access_token = AccessToken} = sdlan_pb:decode_msg(Body, sdl_register_super),
@ -142,23 +129,18 @@ handle_event(info, {frame, <<PacketId:32, ?PACKET_REGISTER_SUPER, Body/binary>>}
case sdlan_network:get_pid(NetworkId) of case sdlan_network:get_pid(NetworkId) of
NetworkPid when is_pid(NetworkPid) -> NetworkPid when is_pid(NetworkPid) ->
{ok, AesKey, SessionToken} = sdlan_network:attach(NetworkPid, self(), ClientId, Mac, Ip, HostName), {ok, AesKey, SessionToken} = sdlan_network:attach(NetworkPid, self(), ClientId, Mac, Ip, HostName),
RsaPubKey = sdlan_cipher:rsa_pem_decode(PubKey), RsaPubKey = sdlan_cipher:rsa_pem_decode(PubKey),
EncodedAesKey = rsa_encode(AesKey, RsaPubKey),
RegisterSuperAck = sdlan_pb:encode_msg(#sdl_register_super_ack { RegisterSuperAck = sdlan_pb:encode_msg(#sdl_register_super_ack {
aes_key = EncodedAesKey, aes_key = rsa_encode(AesKey, RsaPubKey),
session_token = SessionToken session_token = SessionToken
}), }),
%% %%
Reply = <<?PACKET_REGISTER_SUPER_ACK, RegisterSuperAck/binary>>, {ok, _} = quicer:send(Stream, <<PacketId:32, ?PACKET_REGISTER_SUPER_ACK, RegisterSuperAck/binary>>),
{ok, _} = quicer:send(Stream, <<PacketId:32, Reply/binary>>),
%% 线 %% 线
Result = sdlan_api:node_online(ClientId, NetworkId, sdlan_ipaddr:int_to_ipv4(Ip)), Result = sdlan_api:node_online(ClientId, NetworkId, sdlan_ipaddr:int_to_ipv4(Ip)),
logger:debug("[sdlan_register_worker] client_id: ~p, set none online result is: ~p", [ClientId, Result]), logger:debug("[sdlan_register_worker] client_id: ~p, set none online result is: ~p", [ClientId, Result]),
{next_state, registered, State#state{network_pid = NetworkPid, client_id = ClientId, mac = Mac, ip = Ip}}; {next_state, registered, State#state{network_id = NetworkId, network_pid = NetworkPid, client_id = ClientId, mac = Mac, ip = Ip}};
undefined -> undefined ->
logger:warning("[sdlan_register_worker] client_id: ~p, register get error: network not found", [ClientId]), logger:warning("[sdlan_register_worker] client_id: ~p, register get error: network not found", [ClientId]),
{ok, _} = quicer:send(Stream, register_nak_reply(PacketId, ?NAK_INTERNAL_FAULT, <<"Internal Error">>)), {ok, _} = quicer:send(Stream, register_nak_reply(PacketId, ?NAK_INTERNAL_FAULT, <<"Internal Error">>)),
@ -174,14 +156,16 @@ handle_event(info, {frame, <<PacketId:32, ?PACKET_REGISTER_SUPER, Body/binary>>}
{stop, normal, State} {stop, normal, State}
end; end;
handle_event(info, {frame, <<PacketId:32, ?PACKET_QUERY_INFO, Body/binary>>}, registered, State=#state{stream_handle = Stream, network_pid = NetworkPid, mac = SrcMac, is_registered = true}) when is_pid(NetworkPid) -> handle_event(info, {frame, <<PacketId:32, ?PACKET_QUERY_INFO, Body/binary>>}, registered, #state{stream = Stream, network_pid = NetworkPid, mac = SrcMac}) when is_pid(NetworkPid) ->
#sdl_query_info{dst_mac = DstMac} = sdlan_pb:decode_msg(Body, sdl_query_info), #sdl_query_info{dst_mac = DstMac} = sdlan_pb:decode_msg(Body, sdl_query_info),
case sdlan_network:peer_info(NetworkPid, SrcMac, DstMac) of case sdlan_network:peer_info(NetworkPid, SrcMac, DstMac) of
error -> error ->
logger:debug("[sdlan_channel] query_info src_mac is: ~p, dst_mac: ~p, nat_peer not found", logger:debug("[sdlan_channel] query_info src_mac is: ~p, dst_mac: ~p, nat_peer not found",
[sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]), [sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]),
{ok, _} = quicer:send(Stream, <<PacketId:32, ?PACKET_EMPTY>>),
{keep_state, State}; EmptyResponse = sdlan_pb:encode_msg(#sdl_empty{}),
{ok, _} = quicer:send(Stream, <<PacketId:32, ?PACKET_PEER_INFO, EmptyResponse/binary>>),
keep_state_and_data;
{ok, {NatPeer = {{Ip0, Ip1, Ip2, Ip3}, NatPort}, NatType}, V6Info} -> {ok, {NatPeer = {{Ip0, Ip1, Ip2, Ip3}, NatPort}, NatType}, V6Info} ->
logger:debug("[sdlan_channel] query_info src_mac is: ~p, dst_mac: ~p, nat_peer: ~p", logger:debug("[sdlan_channel] query_info src_mac is: ~p, dst_mac: ~p, nat_peer: ~p",
[sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac), NatPeer]), [sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac), NatPeer]),
@ -196,11 +180,10 @@ handle_event(info, {frame, <<PacketId:32, ?PACKET_QUERY_INFO, Body/binary>>}, re
v6_info = V6Info v6_info = V6Info
}), }),
{ok, _} = quicer:send(Stream, <<PacketId:32, ?PACKET_PEER_INFO, PeerInfo/binary>>), {ok, _} = quicer:send(Stream, <<PacketId:32, ?PACKET_PEER_INFO, PeerInfo/binary>>),
{keep_state, State} keep_state_and_data
end; end;
handle_event(info, {frame, <<0:32, ?PACKET_PING>>}, registered, State = #state{stream_handle = Stream, ping_counter = PingCounter}) -> handle_event(info, {frame, <<0:32, ?PACKET_PING>>}, registered, State = #state{stream = Stream, ping_counter = PingCounter}) ->
%logger:debug("[sdlan_channel] client_id: ~p, get ping", [ClientId]),
{ok, _} = quicer:send(Stream, <<0:32, ?PACKET_PONG>>), {ok, _} = quicer:send(Stream, <<0:32, ?PACKET_PONG>>),
{keep_state, State#state{ping_counter = PingCounter + 1}}; {keep_state, State#state{ping_counter = PingCounter + 1}};
@ -216,19 +199,22 @@ handle_event(info, {timeout, _, ping_ticker}, _, State = #state{client_id = Clie
end; end;
%% %%
handle_event(info, {send_event, EventType, Event}, _, State = #state{stream_handle = Stream, client_id = ClientId, is_registered = true}) -> handle_event(info, {send_event, EventType, Event}, registered, #state{stream = Stream, client_id = ClientId}) ->
logger:debug("[sdlan_channel] client_id: ~p, will send eventType: ~p, event: ~p", [ClientId, EventType, Event]), logger:debug("[sdlan_channel] client_id: ~p, will send eventType: ~p, event: ~p", [ClientId, EventType, Event]),
{ok, _} = quicer:send(Stream, <<0:32, ?PACKET_EVENT, EventType, Event/binary>>), {ok, _} = quicer:send(Stream, <<0:32, ?PACKET_EVENT, EventType, Event/binary>>),
{noreply, State}; keep_state_and_data;
%% %%
handle_event(info, {frame, <<0:32, ?PACKET_UNREGISTER>>}, registered, State = #state{client_id = ClientId, network_pid = NetworkPid, is_registered = true}) when is_pid(NetworkPid) -> handle_event(info, {frame, <<0:32, ?PACKET_UNREGISTER>>}, registered, State=#state{client_id = ClientId, mac = Mac, network_pid = NetworkPid}) when is_pid(NetworkPid) ->
logger:warning("[sdlan_channel] unregister client_id: ~p", [ClientId]), logger:warning("[sdlan_channel] unregister client_id: ~p", [ClientId]),
% sdlan_network:unregister(NetworkPid, ClientId), sdlan_network:unregister(NetworkPid, ClientId, Mac),
{stop, normal, State}; {stop, normal, State};
handle_event(info, {quic, send_shutdown_complete, Stream, _Props}, _StateName, State = #state{stream = Stream}) ->
{stop, connection_shutdown, State};
%% quicer相关的信息, frame消息 %% quicer相关的信息, frame消息
handle_event(info, {quic, Data, Stream, _Props}, _StateName, State = #state{stream_handle = Stream, buf = Buf}) -> handle_event(info, {quic, Data, Stream, _Props}, _StateName, State = #state{stream = Stream, buf = Buf}) ->
logger:debug("[sdlan_quic_channel] get message: ~p", [Data]), logger:debug("[sdlan_quic_channel] get message: ~p", [Data]),
case decode_frames(<<Buf/binary, Data/binary>>) of case decode_frames(<<Buf/binary, Data/binary>>) of
{error, Reason} -> {error, Reason} ->
@ -238,30 +224,19 @@ handle_event(info, {quic, Data, Stream, _Props}, _StateName, State = #state{stre
{keep_state, State#state{buf = NBuf}, Actions} {keep_state, State#state{buf = NBuf}, Actions}
end; end;
handle_event(info, {quic_closed, Stream, _Props}, _StateName, State = #state{conn = Conn, stream_handle = Stream}) -> handle_event(info, {quic_closed, Stream, _Props}, _StateName, State = #state{stream = Stream}) ->
quicer:close_connection(Conn),
{stop, connection_closed, State}; {stop, connection_closed, State};
handle_event(info, {'EXIT', _, _}, _StateName, State = #state{conn = Conn}) -> handle_event(info, {'EXIT', _, _}, _StateName, State) ->
quicer:close_connection(Conn), {stop, connection_closed, State}.
{stop, connection_closed, State};
handle_event(_EventType, _EventContent, _StateName, State = #state{}) ->
NextStateName = the_next_state_name,
{next_state, NextStateName, State}.
%% @private %% @private
%% @doc This function is called by a gen_statem when it is about to %% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any %% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with %% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored. %% Reason. The return value is ignored.
terminate(Reason, _StateName, _State = #state{conn = Conn, stream_handle = Stream}) -> terminate(Reason, _StateName, _State = #state{conn = Conn, stream = Stream}) ->
case Stream /= undefined of Stream /= undefined andalso quicer:close_stream(Stream),
true ->
quicer:close_stream(Stream);
false ->
ok
end,
quicer:close_connection(Conn), quicer:close_connection(Conn),
logger:warning("[sdlan_quic_conn] terminate closed with reason: ~p", [Reason]), logger:warning("[sdlan_quic_conn] terminate closed with reason: ~p", [Reason]),
ok. ok.

View File

@ -0,0 +1,65 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2026, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 13. 2 2026 18:00
%%%-------------------------------------------------------------------
-module(sdlan_quic_channel_sup).
-author("anlicheng").
-behaviour(supervisor).
%% API
-export([start_link/0]).
-export([start_channel/1]).
%% Supervisor callbacks
-export([init/1]).
-define(SERVER, ?MODULE).
%%%===================================================================
%%% API functions
%%%===================================================================
%% @doc Starts the supervisor
-spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
%% @private
%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart frequency and child
%% specifications.
-spec(init(Args :: term()) ->
{ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(),
MaxR :: non_neg_integer(), MaxT :: non_neg_integer()},
[ChildSpec :: supervisor:child_spec()]}}
| ignore | {error, Reason :: term()}).
init([]) ->
SupFlags = #{strategy => simple_one_for_one, intensity => 0, period => 1},
AChild = #{
id => sdlan_quic_channel,
start => {'sdlan_quic_channel', start_link, []},
restart => temporary,
shutdown => 2000,
type => worker,
modules => ['sdlan_quic_channel']
},
{ok, {SupFlags, [AChild]}}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec start_channel(NConn :: quicer:connection_handle()) -> supervisor:startchild_ret().
start_channel(NConn) ->
supervisor:start_child(?MODULE, [NConn]).

View File

@ -45,8 +45,13 @@ loop_accept(L) ->
case quicer:handshake(Conn) of case quicer:handshake(Conn) of
{ok, NConn} -> {ok, NConn} ->
logger:debug("[sdlan_quic_server] conn: ~p, handshake success", [NConn]), logger:debug("[sdlan_quic_server] conn: ~p, handshake success", [NConn]),
{ok, ChannelPid} = sdlan_quic_channel:start_link(NConn), case sdlan_quic_channel_sup:start_channel(NConn) of
quicer:controlling_process(NConn, ChannelPid), {ok, ChannelPid} ->
quicer:controlling_process(NConn, ChannelPid);
Error ->
quicer:close_connection(NConn),
logger:notice("[sdlan_quic_server] start channel get error: ~p", [Error])
end,
loop_accept(L); loop_accept(L);
{error, _} -> {error, _} ->
quicer:close_connection(Conn), quicer:close_connection(Conn),

View File

@ -62,6 +62,16 @@ init([]) ->
type => supervisor, type => supervisor,
modules => ['sdlan_stun_sup'] modules => ['sdlan_stun_sup']
}, },
#{
id => sdlan_quic_channel_sup,
start => {sdlan_quic_channel_sup, start_link, []},
restart => permanent,
shutdown => 2000,
type => supervisor,
modules => ['sdlan_quic_channel_sup']
},
#{ #{
id => sdlan_quic_server, id => sdlan_quic_server,
start => {sdlan_quic_server, start_link, []}, start => {sdlan_quic_server, start_link, []},

View File

@ -3,7 +3,7 @@
{http_server, [ {http_server, [
{port, 18082}, {port, 18082},
{acceptors, 500}, {acceptors, 1},
{max_connections, 10240}, {max_connections, 10240},
{backlog, 10240} {backlog, 10240}
]}, ]},