fix quic_channel

This commit is contained in:
anlicheng 2026-02-13 16:37:21 +08:00
parent 21e26c3ab2
commit c6cb4eb49e
6 changed files with 1602 additions and 1582 deletions

View File

@ -25,17 +25,6 @@
}). }).
-endif. -endif.
-ifndef('SDL_DEV_ADDR_PB_H').
-define('SDL_DEV_ADDR_PB_H', true).
-record(sdl_dev_addr,
{network_id = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
mac = <<>> :: iodata() | undefined, % = 2, optional
net_addr = 0 :: non_neg_integer() | undefined, % = 3, optional, 32 bits
net_bit_len = 0 :: non_neg_integer() | undefined, % = 4, optional, 32 bits
network_domain = <<>> :: unicode:chardata() | undefined % = 5, optional
}).
-endif.
-ifndef('SDL_EMPTY_PB_H'). -ifndef('SDL_EMPTY_PB_H').
-define('SDL_EMPTY_PB_H', true). -define('SDL_EMPTY_PB_H', true).
-record(sdl_empty, -record(sdl_empty,
@ -47,24 +36,22 @@
-define('SDL_REGISTER_SUPER_PB_H', true). -define('SDL_REGISTER_SUPER_PB_H', true).
-record(sdl_register_super, -record(sdl_register_super,
{version = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits {version = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
installed_channel = <<>> :: unicode:chardata() | undefined, % = 2, optional client_id = <<>> :: unicode:chardata() | undefined, % = 2, optional
client_id = <<>> :: unicode:chardata() | undefined, % = 3, optional network_id = 0 :: non_neg_integer() | undefined, % = 3, optional, 32 bits
dev_addr = undefined :: sdlan_pb:sdl_dev_addr() | undefined, % = 4, optional mac = <<>> :: iodata() | undefined, % = 4, optional
pub_key = <<>> :: unicode:chardata() | undefined, % = 5, optional ip = 0 :: non_neg_integer() | undefined, % = 5, optional, 32 bits
token = <<>> :: unicode:chardata() | undefined, % = 6, optional mask_len = 0 :: non_neg_integer() | undefined, % = 6, optional, 32 bits
network_code = <<>> :: unicode:chardata() | undefined, % = 7, optional hostname = <<>> :: unicode:chardata() | undefined, % = 7, optional
hostname = <<>> :: unicode:chardata() | undefined % = 8, optional pub_key = <<>> :: unicode:chardata() | undefined, % = 8, optional
access_token = <<>> :: unicode:chardata() | undefined % = 9, optional
}). }).
-endif. -endif.
-ifndef('SDL_REGISTER_SUPER_ACK_PB_H'). -ifndef('SDL_REGISTER_SUPER_ACK_PB_H').
-define('SDL_REGISTER_SUPER_ACK_PB_H', true). -define('SDL_REGISTER_SUPER_ACK_PB_H', true).
-record(sdl_register_super_ack, -record(sdl_register_super_ack,
{dev_addr = undefined :: sdlan_pb:sdl_dev_addr() | undefined, % = 1, optional {aes_key = <<>> :: iodata() | undefined, % = 1, optional
aes_key = <<>> :: iodata() | undefined, % = 2, optional session_token = <<>> :: iodata() | undefined % = 2, optional
upgrade_type = 0 :: non_neg_integer() | undefined, % = 3, optional, 32 bits
upgrade_prompt :: unicode:chardata() | undefined, % = 4, optional
upgrade_address :: unicode:chardata() | undefined % = 5, optional
}). }).
-endif. -endif.
@ -92,6 +79,40 @@
}). }).
-endif. -endif.
-ifndef('SDL_ARP_REQUEST_PB_H').
-define('SDL_ARP_REQUEST_PB_H', true).
-record(sdl_arp_request,
{target_ip = 0 :: non_neg_integer() | undefined % = 1, optional, 32 bits
}).
-endif.
-ifndef('SDL_ARP_RESPONSE_PB_H').
-define('SDL_ARP_RESPONSE_PB_H', true).
-record(sdl_arp_response,
{target_ip = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
target_mac = <<>> :: iodata() | undefined % = 2, optional
}).
-endif.
-ifndef('SDL_POLICY_REQUEST_PB_H').
-define('SDL_POLICY_REQUEST_PB_H', true).
-record(sdl_policy_request,
{src_identity_id = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
dst_identity_id = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
version = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits
}).
-endif.
-ifndef('SDL_POLICY_RESPONSE_PB_H').
-define('SDL_POLICY_RESPONSE_PB_H', true).
-record(sdl_policy_response,
{src_identity_id = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
dst_identity_id = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
version = 0 :: non_neg_integer() | undefined, % = 3, optional, 32 bits
rules = <<>> :: iodata() | undefined % = 4, optional
}).
-endif.
-ifndef('SDL_NAT_CHANGED_EVENT_PB_H'). -ifndef('SDL_NAT_CHANGED_EVENT_PB_H').
-define('SDL_NAT_CHANGED_EVENT_PB_H', true). -define('SDL_NAT_CHANGED_EVENT_PB_H', true).
-record(sdl_nat_changed_event, -record(sdl_nat_changed_event,
@ -118,31 +139,6 @@
}). }).
-endif. -endif.
-ifndef('SDL_CHANGE_NETWORK_COMMAND_PB_H').
-define('SDL_CHANGE_NETWORK_COMMAND_PB_H', true).
-record(sdl_change_network_command,
{dev_addr = undefined :: sdlan_pb:sdl_dev_addr() | undefined, % = 1, optional
aes_key = <<>> :: iodata() | undefined % = 2, optional
}).
-endif.
-ifndef('SDL_COMMAND_ACK_PB_H').
-define('SDL_COMMAND_ACK_PB_H', true).
-record(sdl_command_ack,
{status = false :: boolean() | 0 | 1 | undefined, % = 1, optional
message :: unicode:chardata() | undefined % = 2, optional
}).
-endif.
-ifndef('SDL_FLOWS_PB_H').
-define('SDL_FLOWS_PB_H', true).
-record(sdl_flows,
{forward_num = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
p2p_num = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
inbound_num = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits
}).
-endif.
-ifndef('SDL_STUN_REQUEST_PB_H'). -ifndef('SDL_STUN_REQUEST_PB_H').
-define('SDL_STUN_REQUEST_PB_H', true). -define('SDL_STUN_REQUEST_PB_H', true).
-record(sdl_stun_request, -record(sdl_stun_request,
@ -171,7 +167,28 @@
dst_mac = <<>> :: iodata() | undefined, % = 3, optional dst_mac = <<>> :: iodata() | undefined, % = 3, optional
is_p2p = false :: boolean() | 0 | 1 | undefined, % = 4, optional is_p2p = false :: boolean() | 0 | 1 | undefined, % = 4, optional
ttl = 0 :: non_neg_integer() | undefined, % = 5, optional, 32 bits ttl = 0 :: non_neg_integer() | undefined, % = 5, optional, 32 bits
data = <<>> :: iodata() | undefined % = 6, optional data = <<>> :: iodata() | undefined, % = 6, optional
session_token = <<>> :: iodata() | undefined, % = 7, optional
identity_id = 0 :: non_neg_integer() | undefined % = 8, optional, 32 bits
}).
-endif.
-ifndef('SDL_STUN_PROBE_PB_H').
-define('SDL_STUN_PROBE_PB_H', true).
-record(sdl_stun_probe,
{cookie = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
attr = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
step = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits
}).
-endif.
-ifndef('SDL_STUN_PROBE_REPLY_PB_H').
-define('SDL_STUN_PROBE_REPLY_PB_H', true).
-record(sdl_stun_probe_reply,
{cookie = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
step = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
port = 0 :: non_neg_integer() | undefined, % = 3, optional, 32 bits
ip = 0 :: non_neg_integer() | undefined % = 4, optional, 32 bits
}). }).
-endif. -endif.
@ -193,21 +210,4 @@
}). }).
-endif. -endif.
-ifndef('SDL_STUN_PROBE_PB_H').
-define('SDL_STUN_PROBE_PB_H', true).
-record(sdl_stun_probe,
{cookie = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
attr = 0 :: non_neg_integer() | undefined % = 2, optional, 32 bits
}).
-endif.
-ifndef('SDL_STUN_PROBE_REPLY_PB_H').
-define('SDL_STUN_PROBE_REPLY_PB_H', true).
-record(sdl_stun_probe_reply,
{cookie = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
port = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
ip = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits
}).
-endif.
-endif. -endif.

View File

@ -8,13 +8,34 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(sdlan_quic_channel). -module(sdlan_quic_channel).
-author("anlicheng"). -author("anlicheng").
-include("sdlan.hrl").
-include("sdlan_pb.hrl").
-behaviour(gen_statem). -behaviour(gen_statem).
-define(MAX_FRAME_LEN, 16384). -define(MAX_FRAME_LEN, 16384).
%%
-define(PING_TICKER, 15000).
%%
%% token不存在
-define(NAK_INVALID_TOKEN, 1).
%%
-define(NAK_NODE_DISABLE, 2).
%% IP地址可以用
-define(NAK_NO_IP, 3).
%%
-define(NAK_NETWORK_FAULT, 4).
%%
-define(NAK_INTERNAL_FAULT, 5).
%% hostname被占用
-define(NAK_HOSTNAME_USED, 6).
%% API %% API
-export([start_link/1]). -export([start_link/1]).
-export([send_event/3, stop/2]).
%% gen_statem callbacks %% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
@ -23,13 +44,31 @@
conn :: quicer:connection_handle(), conn :: quicer:connection_handle(),
stream_handle :: undefined | quicer:stream_handle(), stream_handle :: undefined | quicer:stream_handle(),
%% framing的解析 %% framing的解析
buf = <<>> buf = <<>>,
client_id :: undefined | binary(),
%% id
network_pid :: undefined | pid(),
%% mac地址
mac :: undefined | binary(),
ip = 0 :: integer(),
ping_counter = 0,
%%
is_registered = false
}). }).
%%%=================================================================== %%%===================================================================
%%% API %%% API
%%%=================================================================== %%%===================================================================
send_event(Pid, EventType, Event) ->
gen_statem:cast(Pid, {send_event, EventType, Event}).
stop(Pid, Reason) ->
ok.
%% @doc Creates a gen_statem process which calls Module:init/1 to %% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this %% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned. %% function does not return until Module:init/1 has returned.
@ -58,7 +97,7 @@ callback_mode() ->
%% gen_statem receives an event from call/2, cast/2, or as a normal %% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called. %% process message, this function is called.
handle_event(internal, do_init, initializing, State = #state{conn = Conn}) -> handle_event(internal, do_init, initializing, State=#state{conn = Conn}) ->
logger:debug("[sdlan_quic_channel] conn: ~p, do_init", [Conn]), logger:debug("[sdlan_quic_channel] conn: ~p, do_init", [Conn]),
case quicer:accept_stream(Conn, []) of case quicer:accept_stream(Conn, []) of
{ok, Stream} -> {ok, Stream} ->
@ -69,11 +108,126 @@ handle_event(internal, do_init, initializing, State = #state{conn = Conn}) ->
{stop, Reason, State} {stop, Reason, State}
end; end;
handle_event(info, {frame, Frame}, _StateName, State = #state{}) -> handle_event(info, {frame, Frame}, _StateName, State=#state{}) ->
logger:debug("[sdlan_quic_channel] get frame: ~p", [Frame]), logger:debug("[sdlan_quic_channel] get frame: ~p", [Frame]),
{keep_state, State}; {keep_state, State};
%% quicer相关的信息 handle_event(info, {frame, <<PacketId:32, ?PACKET_REGISTER_SUPER, Body/binary>>}, registered, State=#state{stream_handle = Stream}) ->
#sdl_register_super{
client_id = ClientId, network_id = NetworkId, mac = Mac, ip = Ip, mask_len = MaskLen,
hostname = HostName, pub_key = PubKey, access_token = AccessToken} = sdlan_pb:decode_msg(Body, sdl_register_super),
true = (Mac =/= <<>> andalso PubKey =/= <<>> andalso ClientId =/= <<>>),
%% Mac地址不能是广播地址
true = not (sdlan_util:is_multicast_mac(Mac) orelse sdlan_util:is_broadcast_mac(Mac)),
MacBinStr = sdlan_util:format_mac(Mac),
IpAddr = sdlan_ipaddr:int_to_ipv4(Ip),
Params = #{
<<"network_id">> => NetworkId,
<<"client_id">> => ClientId,
<<"mac">> => MacBinStr,
<<"ip">> => IpAddr,
<<"mask_len">> => MaskLen,
<<"hostname">> => HostName,
<<"access_token">> => AccessToken
},
%%
logger:debug("[sdlan_register_worker] client_id: ~p, ip: ~p, mac: ~p, host_name: ~p, access_token: ~p, network_id: ~p",
[ClientId, Ip, Mac, HostName, AccessToken, NetworkId]),
case sdlan_api:auth_access_token(Params) of
{ok, #{<<"result">> := <<"ok">>}} ->
%% network的对应关系
case sdlan_network:get_pid(NetworkId) of
NetworkPid when is_pid(NetworkPid) ->
{ok, AesKey, SessionToken} = sdlan_network:attach(NetworkPid, {ClientIp, ClientPort}, ClientId, Mac, Ip, HostName),
RsaPubKey = sdlan_cipher:rsa_pem_decode(PubKey),
EncodedAesKey = rsa_encode(AesKey, RsaPubKey),
RegisterSuperAck = sdlan_pb:encode_msg(#sdl_register_super_ack {
aes_key = EncodedAesKey,
session_token = SessionToken
}),
%%
Reply = <<?PACKET_REGISTER_SUPER_ACK, RegisterSuperAck/binary>>,
{ok, _} = quicer:send(Stream, <<PacketId:32, Reply/binary>>),
%% 线
Result = sdlan_api:node_online(ClientId, NetworkId, sdlan_ipaddr:int_to_ipv4(Ip)),
logger:debug("[sdlan_register_worker] client_id: ~p, set none online result is: ~p", [ClientId, Result]),
{next_state, registered, State#state{network_pid = NetworkPid, client_id = ClientId, mac = Mac, ip = Ip}};
undefined ->
logger:warning("[sdlan_register_worker] client_id: ~p, register get error: network not found", [ClientId]),
{ok, _} = quicer:send(Stream, register_nak_reply(PacketId, ?NAK_INTERNAL_FAULT, <<"Internal Error">>)),
{stop, normal, State}
end;
{ok, #{<<"error">> := #{<<"code">> := Code, <<"message">> := Message}}} ->
logger:warning("[sdlan_register_worker] network_id: ~p, client_id: ~p, register get error: ~ts, error_code: ~p", [NetworkId, ClientId, Message, Code]),
{ok, _} = quicer:send(Stream, register_nak_reply(PacketId, Code, Message)),
{stop, normal, State};
{error, Reason} ->
logger:warning("[sdlan_register_worker] network_id: ~p, client_id: ~p, register get error: ~p", [NetworkId, ClientId, Reason]),
{ok, _} = quicer:send(Stream, register_nak_reply(PacketId, ?NAK_NETWORK_FAULT, <<"Network Error">>)),
{stop, normal, State}
end;
handle_event(info, {frame, <<PacketId:32, ?PACKET_QUERY_INFO, Body/binary>>}, registered, State=#state{stream_handle = Stream, network_pid = NetworkPid, mac = SrcMac, is_registered = true}) when is_pid(NetworkPid) ->
#sdl_query_info{dst_mac = DstMac} = sdlan_pb:decode_msg(Body, sdl_query_info),
case sdlan_network:peer_info(NetworkPid, SrcMac, DstMac) of
error ->
logger:debug("[sdlan_channel] query_info src_mac is: ~p, dst_mac: ~p, nat_peer not found",
[sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]),
{ok, _} = quicer:send(Stream, <<PacketId:32, ?PACKET_EMPTY>>),
{keep_state, State};
{ok, {NatPeer = {{Ip0, Ip1, Ip2, Ip3}, NatPort}, NatType}, V6Info} ->
logger:debug("[sdlan_channel] query_info src_mac is: ~p, dst_mac: ~p, nat_peer: ~p",
[sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac), NatPeer]),
PeerInfo = sdlan_pb:encode_msg(#sdl_peer_info{
dst_mac = DstMac,
v4_info = #sdl_v4_info {
port = NatPort,
v4 = <<Ip0, Ip1, Ip2, Ip3>>,
nat_type = NatType
},
v6_info = V6Info
}),
{ok, _} = quicer:send(Stream, <<PacketId:32, ?PACKET_PEER_INFO, PeerInfo/binary>>),
{keep_state, State}
end;
handle_event(info, {frame, <<0:32, ?PACKET_PING>>}, registered, State = #state{stream_handle = Stream, ping_counter = PingCounter}) ->
%logger:debug("[sdlan_channel] client_id: ~p, get ping", [ClientId]),
{ok, _} = quicer:send(Stream, <<0:32, ?PACKET_PONG>>),
{keep_state, State#state{ping_counter = PingCounter + 1}};
handle_event(info, {timeout, _, ping_ticker}, _, State = #state{client_id = ClientId, ping_counter = PingCounter}) ->
%%
erlang:start_timer(?PING_TICKER, self(), ping_ticker),
case PingCounter > 0 of
true ->
{noreply, State#state{ping_counter = 0}};
false ->
logger:debug("[sdlan_channel] client_id: ~p, ping losted", [ClientId]),
{stop, normal, State#state{ping_counter = 0}}
end;
%%
handle_event(info, {send_event, EventType, Event}, _, State = #state{stream_handle = Stream, client_id = ClientId, is_registered = true}) ->
logger:debug("[sdlan_channel] client_id: ~p, will send eventType: ~p, event: ~p", [ClientId, EventType, Event]),
{ok, _} = quicer:send(Stream, <<0:32, ?PACKET_EVENT, EventType, Event/binary>>),
{noreply, State};
%%
handle_event(info, {frame, <<0:32, ?PACKET_UNREGISTER>>}, registered, State = #state{client_id = ClientId, network_pid = NetworkPid, is_registered = true}) when is_pid(NetworkPid) ->
logger:warning("[sdlan_channel] unregister client_id: ~p", [ClientId]),
% sdlan_network:unregister(NetworkPid, ClientId),
{stop, normal, State};
%% quicer相关的信息, frame消息
handle_event(info, {quic, Data, Stream, _Props}, _StateName, State = #state{stream_handle = Stream, buf = Buf}) -> handle_event(info, {quic, Data, Stream, _Props}, _StateName, State = #state{stream_handle = Stream, buf = Buf}) ->
logger:debug("[sdlan_quic_channel] get message: ~p", [Data]), logger:debug("[sdlan_quic_channel] get message: ~p", [Data]),
case decode_frames(<<Buf/binary, Data/binary>>) of case decode_frames(<<Buf/binary, Data/binary>>) of
@ -133,3 +287,14 @@ decode_frames0(<<Len:16, Frame:Len/binary, Rest/binary>>, Frames) ->
decode_frames0(Rest, [Frame|Frames]); decode_frames0(Rest, [Frame|Frames]);
decode_frames0(Rest, Frames) -> decode_frames0(Rest, Frames) ->
{ok, Rest, lists:reverse(Frames)}. {ok, Rest, lists:reverse(Frames)}.
-spec register_nak_reply(PacketId :: integer(), ErrorCode :: integer(), ErrorMsg :: binary()) -> binary().
register_nak_reply(PacketId, ErrorCode, ErrorMsg) when is_integer(PacketId), is_integer(ErrorCode), is_binary(ErrorMsg) ->
RegisterNakReply = sdlan_pb:encode_msg(#sdl_register_super_nak {
error_code = ErrorCode,
error_message = ErrorMsg
}),
<<PacketId:32, ?PACKET_REGISTER_SUPER_NAK, RegisterNakReply/binary>>.
rsa_encode(PlainText, RsaPubKey) when is_binary(PlainText) ->
iolist_to_binary(sdlan_cipher:rsa_encrypt(PlainText, RsaPubKey)).

View File

@ -13,7 +13,7 @@
%% API %% API
-export([get_all_networks/0, get_network/1]). -export([get_all_networks/0, get_network/1]).
-export([auth_token/3, node_online/3, node_offline/2, flow_report/5, network_forward_report/2, auth_network_code/3]). -export([node_online/3, node_offline/2, flow_report/5, network_forward_report/2, auth_access_token/1]).
-spec get_all_networks() -> {ok, [NetworkId :: integer()]} | {error, Reason :: any()}. -spec get_all_networks() -> {ok, [NetworkId :: integer()]} | {error, Reason :: any()}.
get_all_networks() -> get_all_networks() ->
@ -47,23 +47,9 @@ get_network(Id) when is_integer(Id) ->
Error Error
end. end.
-spec auth_token(ClientId :: binary(), Token :: binary(), Version :: integer()) -> {ok, Resp :: map()} | {error, Reason :: any()}. -spec auth_access_token(Params :: map()) -> {ok, Resp :: map()} | {error, Reason :: any()}.
auth_token(ClientId, Token, Version) when is_binary(ClientId), is_binary(Token), is_integer(Version) -> auth_access_token(Params) when is_map(Params) ->
case catch do_post("auth_token", #{<<"client_id">> => ClientId, <<"token">> => Token, <<"version">> => Version}) of case catch do_post("auth_token", Params) of
{ok, Resp} ->
case catch jiffy:decode(Resp, [return_maps]) of
Result when is_map(Result) ->
{ok, Result};
{error, Reason} ->
{error, Reason}
end;
Error ->
Error
end.
-spec auth_network_code(ClientId :: binary(), NetworkCode :: binary(), Version :: integer()) -> {ok, Resp :: map()} | {error, Reason :: any()}.
auth_network_code(ClientId, NetworkCode, Version) when is_binary(ClientId), is_binary(NetworkCode), is_integer(Version) ->
case catch do_post("check_network", #{<<"client_id">> => ClientId, <<"code">> => NetworkCode, <<"version">> => Version}) of
{ok, Resp} -> {ok, Resp} ->
case catch jiffy:decode(Resp, [return_maps]) of case catch jiffy:decode(Resp, [return_maps]) of
Result when is_map(Result) -> Result when is_map(Result) ->

View File

@ -10,7 +10,7 @@
-author("anlicheng"). -author("anlicheng").
%% API %% API
-export([init/0, lookup/1, insert/2]). -export([init/0, lookup/1, insert/3]).
-define(TABLE, sdlan_hostname_regedit). -define(TABLE, sdlan_hostname_regedit).
@ -26,7 +26,10 @@ lookup(FullHostname) when is_binary(FullHostname) ->
error error
end. end.
-spec insert(FullHostname :: binary(), Ip :: integer()) -> no_return(). -spec insert(any(), Domain :: binary(), Ip :: integer()) -> no_return().
insert(FullHostname, Ip) when is_binary(FullHostname), is_integer(Ip) -> insert(HostName, Domain, Ip) when is_binary(HostName), is_binary(Domain), is_integer(Ip), HostName /= <<>> ->
FullHostname = <<HostName/binary, ".", Domain/binary>>,
<<Ip0, Ip1, Ip2, Ip3>> = <<Ip:32>>, <<Ip0, Ip1, Ip2, Ip3>> = <<Ip:32>>,
true = ets:insert(?TABLE, {FullHostname, {Ip0, Ip1, Ip2, Ip3}}). true = ets:insert(?TABLE, {FullHostname, {Ip0, Ip1, Ip2, Ip3}});
insert(_, _, _) ->
ok.

View File

@ -21,8 +21,8 @@
%% API %% API
-export([start_link/2]). -export([start_link/2]).
-export([get_name/1, get_pid/1, assign_ip_addr/6, peer_info/3, unregister/3, debug_info/1, get_network_id/1, get_used_map/1]). -export([get_name/1, get_pid/1, peer_info/3, unregister/3, debug_info/1, get_network_id/1, attach/6]).
-export([forward/5, update_hole/6, disable_client/2, get_channel/2, dropout_client/2, reload/1]). -export([forward/5, update_hole/6, disable_client/2, get_channel/2]).
-export([test_event/1]). -export([test_event/1]).
%% gen_server callbacks %% gen_server callbacks
@ -33,14 +33,20 @@
nat_type :: integer() nat_type :: integer()
}). }).
%% ip的使用信息, %% ip的使用信息, Node的运行时状态信息
-record(host, { -record(endpoint, {
client_id :: binary(),
channel_pid :: undefined | pid(), channel_pid :: undefined | pid(),
monitor_ref :: undefined | reference(), channel_ref :: undefined | reference(),
client_id :: binary(),
mac :: binary(),
ip :: integer(),
hostname :: binary(),
hole :: undefined | #hole{}, hole :: undefined | #hole{},
%% ip和ip_v6的映射关系, #{ip_addr :: integer() => {}} %% ip和ip_v6的映射关系, #{ip_addr :: integer() => {}}
v6_info :: undefined | #sdl_v6_info{} v6_info :: undefined | #sdl_v6_info{},
session_token :: binary(),
last_seen :: integer() %% monotonic_time(second),
}). }).
-record(state, { -record(state, {
@ -53,19 +59,12 @@
%% %%
throttle_key :: atom(), throttle_key :: atom(),
%% %%
forward_bytes = 0, forward_bytes = 0,
%% , AES-256 %% , AES-256
aes_key :: binary(), aes_key :: binary(),
%% 使ip, #{mac :: integer() => Host :: #endpoint{}}
%% ip分配器 endpoints = #{}
%% ip地址
ips = [] :: [Ip :: integer()],
%% 使ip, #{mac :: integer() => Host :: #host{}}
used_map = #{}
}). }).
%%%=================================================================== %%%===================================================================
@ -84,19 +83,14 @@ get_pid(Id) when is_integer(Id) ->
get_name(Id) when is_integer(Id) -> get_name(Id) when is_integer(Id) ->
list_to_atom("sdlan_network:" ++ integer_to_list(Id)). list_to_atom("sdlan_network:" ++ integer_to_list(Id)).
-spec reload(Pid :: pid()) -> ok | {error, Reason :: any()}.
reload(Pid) when is_pid(Pid) ->
gen_server:call(Pid, reload).
-spec assign_ip_addr(Pid :: pid(), ChannelPid :: pid(), ClientId :: binary(), Mac :: binary(), NetAddr :: integer(), HostName :: binary()) ->
{ok, Domain :: binary(), NetAddr :: integer(), MaskLen :: integer(), AesKey :: binary()} | {error, Reason :: any()}.
assign_ip_addr(Pid, ChannelPid, ClientId, Mac, NetAddr, HostName) when is_pid(Pid), is_pid(ChannelPid), is_binary(ClientId), is_binary(Mac), is_integer(NetAddr) ->
gen_server:call(Pid, {assign_ip_addr, ChannelPid, ClientId, Mac, NetAddr, HostName}).
-spec get_network_id(Pid :: pid()) -> {ok, NetworkId :: integer()}. -spec get_network_id(Pid :: pid()) -> {ok, NetworkId :: integer()}.
get_network_id(Pid) when is_pid(Pid) -> get_network_id(Pid) when is_pid(Pid) ->
gen_server:call(Pid, get_network_id). gen_server:call(Pid, get_network_id).
-spec attach(Pid :: pid(), ChannelPid :: pid(), ClientId :: binary(), Mac :: binary(), Ip :: integer(), Hostname :: binary()) -> any().
attach(Pid, ChannelPid, ClientId, Mac, Ip, Hostname) when is_pid(Pid), is_pid(ChannelPid), is_binary(ClientId), is_binary(Mac), is_integer(Ip), is_binary(Hostname) ->
gen_server:call(Pid, {attach, ChannelPid, ClientId, Mac, Ip, Hostname}).
-spec unregister(Pid :: pid(), ClientId :: binary(), Mac :: binary()) -> no_return(). -spec unregister(Pid :: pid(), ClientId :: binary(), Mac :: binary()) -> no_return().
unregister(Pid, ClientId, Mac) when is_pid(Pid), is_binary(ClientId), is_binary(Mac) -> unregister(Pid, ClientId, Mac) when is_pid(Pid), is_binary(ClientId), is_binary(Mac) ->
gen_server:cast(Pid, {unregister, ClientId, Mac}). gen_server:cast(Pid, {unregister, ClientId, Mac}).
@ -123,21 +117,10 @@ disable_client(Pid, ClientId) when is_pid(Pid), is_binary(ClientId) ->
get_channel(Pid, ClientId) when is_pid(Pid), is_binary(ClientId) -> get_channel(Pid, ClientId) when is_pid(Pid), is_binary(ClientId) ->
gen_server:call(Pid, {get_channel, ClientId}). gen_server:call(Pid, {get_channel, ClientId}).
%% client_idchannel不关闭; channel会被重新绑定到其他的network里面
-spec dropout_client(Pid :: pid(), ClientId :: binary()) -> {ok, ChannelPid :: pid(), HostName :: binary()} | error.
dropout_client(Pid, ClientId) when is_pid(Pid), is_binary(ClientId) ->
gen_server:call(Pid, {dropout_client, ClientId}).
-spec debug_info(Pid :: pid()) -> map(). -spec debug_info(Pid :: pid()) -> map().
debug_info(Pid) when is_pid(Pid) -> debug_info(Pid) when is_pid(Pid) ->
gen_server:call(Pid, debug_info). gen_server:call(Pid, debug_info).
-spec get_used_map(Pid :: pid()) -> map().
get_used_map(Pid) when is_pid(Pid) ->
gen_server:call(Pid, get_used_map);
get_used_map(undefined) ->
#{}.
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Name :: atom(), Id :: integer()) -> -spec(start_link(Name :: atom(), Id :: integer()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}). {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
@ -160,7 +143,6 @@ init([Id]) when is_integer(Id) ->
ignore; ignore;
{ok, #{<<"id">> := Id, <<"name">> := Name, <<"domain">> := Domain, <<"ipaddr">> := IpAddr0, <<"owner_id">> := OwnerId}} -> {ok, #{<<"id">> := Id, <<"name">> := Name, <<"domain">> := Domain, <<"ipaddr">> := IpAddr0, <<"owner_id">> := OwnerId}} ->
{IpAddr, MaskLen} = parse_ipaddr(IpAddr0), {IpAddr, MaskLen} = parse_ipaddr(IpAddr0),
Ips = sdlan_ipaddr:ips(IpAddr, MaskLen),
AesKey = sdlan_util:rand_byte(32), AesKey = sdlan_util:rand_byte(32),
%% key %% key
ThrottleKey = list_to_atom("network_throttle:" ++ integer_to_list(Id)), ThrottleKey = list_to_atom("network_throttle:" ++ integer_to_list(Id)),
@ -169,15 +151,9 @@ init([Id]) when is_integer(Id) ->
%% %%
erlang:start_timer(?FLOW_REPORT_INTERVAL, self(), flow_report_ticker), erlang:start_timer(?FLOW_REPORT_INTERVAL, self(), flow_report_ticker),
%%
create_mnesia_table(Id),
logger:debug("[sdlan_network] network: ~p, ips: ~p", [Id, lists:map(fun sdlan_ipaddr:int_to_ipv4/1, Ips)]),
sdlan_domain_regedit:insert(Domain), sdlan_domain_regedit:insert(Domain),
{ok, #state{network_id = Id, name = Name, domain = Domain, ipaddr = IpAddr, owner_id = OwnerId, mask_len = MaskLen, ips = Ips, aes_key = AesKey, throttle_key = ThrottleKey}}; {ok, #state{network_id = Id, name = Name, domain = Domain, ipaddr = IpAddr, owner_id = OwnerId, mask_len = MaskLen, aes_key = AesKey, throttle_key = ThrottleKey}};
{error, Reason} -> {error, Reason} ->
logger:warning("[sdlan_network] load network: ~p, get error: ~p", [Id, Reason]), logger:warning("[sdlan_network] load network: ~p, get error: ~p", [Id, Reason]),
ignore ignore
@ -193,127 +169,59 @@ init([Id]) when is_integer(Id) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%%
handle_call(reload, _From, State = #state{network_id = Id, ipaddr = OldIpAddr, mask_len = OldMarkLen, used_map = UsedMap}) ->
case sdlan_api:get_network(Id) of
{ok, #{<<"name">> := Name, <<"ipaddr">> := IpAddr0, <<"owner_id">> := OwnerId}} ->
{IpAddr, MaskLen} = parse_ipaddr(IpAddr0),
case OldIpAddr =:= IpAddr andalso OldMarkLen =:= MaskLen of
true ->
{reply, ok, State#state{name = Name, owner_id = OwnerId}};
false ->
logger:debug("[sdlan_networkd] network_id: ~p, reload will close all channels", [Id]),
Ips = sdlan_ipaddr:ips(IpAddr, MaskLen),
%%
maps:foreach(fun(_, #host{channel_pid = ChannelPid, monitor_ref = MRef}) ->
is_reference(MRef) andalso demonitor(MRef),
is_process_alive(ChannelPid) andalso sdlan_channel:stop(ChannelPid, normal)
end, UsedMap),
%%
ok = client_model:delete_clients(Id),
{reply, ok, State#state{name = Name, ipaddr = IpAddr,
owner_id = OwnerId, mask_len = MaskLen, ips = Ips, used_map = maps:new()}}
end;
{error, Reason} ->
logger:warning("[sdlan_network] reload network: ~p, get error: ~p", [Id, Reason]),
{reply, {error, Reason}, State}
end;
%% ip地址 %% ip地址
handle_call({assign_ip_addr, ChannelPid, ClientId, Mac, NetAddr0, HostName}, _From, handle_call({attach, ChannelPid, ClientId, Mac, Ip, Hostname}, _From,
State = #state{network_id = NetworkId, domain = Domain, ips = Ips, used_map = UsedMap, mask_len = MaskLen, aes_key = AesKey}) -> State = #state{network_id = NetworkId, domain = Domain, endpoints = Endpoints, aes_key = AesKey}) ->
%% ip地址的时候mac地址为唯一基准 %% ip地址的时候mac地址为唯一基准
logger:debug("[sdlan_network] alloc_ip, network_id: ~p, ips: ~p, client_id: ~p, mac: ~p, net_addr: ~p", logger:debug("[sdlan_network] alloc_ip, network_id: ~p, client_id: ~p, mac: ~p, ip_addr: ~p",
[NetworkId, Ips, ClientId, sdlan_util:format_mac(Mac), sdlan_ipaddr:int_to_ipv4(NetAddr0)]), [NetworkId, ClientId, sdlan_util:format_mac(Mac), sdlan_ipaddr:int_to_ipv4(Ip)]),
case client_model:alloc_ip(NetworkId, Ips, ClientId, Mac, NetAddr0, HostName) of
{ok, Ip} ->
%% channel
maybe_close_channel(maps:get(Mac, UsedMap, undefined)),
%% ->ip的映射关系 %% ->ip的映射关系
case HostName =/= <<>> of sdlan_hostname_regedit:insert(Hostname, Domain, Ip),
%% mac对应的Endpoint存在ip变了arp
%% attach需要清理之前的绑定信息
maybe
{ok, #endpoint{ip = OldIp, channel_pid = OldChannelPid, channel_ref = OldChannelRef}} ?= maps:find(Mac, Endpoints),
true ?= OldIp =/= Ip,
NatChangedEvent = sdlan_pb:encode_msg(#sdl_nat_changed_event{
mac = Mac,
ip = Ip
}),
broadcast(fun(#endpoint{channel_pid = ChannelPid}) ->
sdlan_quic_channel:send_event(ChannelPid, ?PACKET_EVENT_NAT_CHANGED, NatChangedEvent)
end, [Mac], Endpoints),
%%
is_reference(OldChannelRef) andalso demonitor(OldChannelRef),
case OldChannelPid /= undefined andalso is_process_alive(OldChannelPid) of
true -> true ->
FullHostname = <<HostName/binary, ".", Domain/binary>>, sdlan_quic_channel:stop(OldChannelPid, rebind);
sdlan_hostname_regedit:insert(FullHostname, Ip);
false -> false ->
ok ok
end
end, end,
%% channel之间的关系 ChannelRef = monitor(process, ChannelPid),
MRef = monitor(process, ChannelPid), SessionToken = gen_session_token(),
NUsedMap = maps:put(Mac, #host{client_id = ClientId, channel_pid = ChannelPid, monitor_ref = MRef}, UsedMap), Endpoint = #endpoint{channel_pid = ChannelPid, channel_ref = ChannelRef,
client_id = ClientId, mac = Mac, ip = Ip, hostname = Hostname, session_token = SessionToken, last_seen = erlang:monotonic_time(second)},
{reply, {ok, Domain, Ip, MaskLen, AesKey}, State#state{used_map = NUsedMap}}; {reply, {ok, AesKey, SessionToken}, State#state{endpoints = maps:put(Mac, Endpoint, Endpoints)}};
{error, Reason} ->
{reply, {error, Reason}, State}
end;
handle_call(get_used_map, _From, State = #state{used_map = UsedMap}) ->
UsedInfos = maps:map(fun(_, #host{hole = Hole, v6_info = V6Info}) ->
HoleMap = case Hole of
#hole{peer = {NatIp, NatPort}} ->
#{
<<"nat_ip">> => sdlan_ipaddr:int_to_ipv4(sdlan_ipaddr:ipv4_to_int(NatIp)),
<<"nat_port">> => NatPort
};
_ ->
#{}
end,
V6Map = case V6Info of
#sdl_v6_info{v6 = IpV6, port = Port} ->
#{
<<"v6_ip">> => sdlan_ipaddr:ipv6_bytes_to_binary(IpV6),
<<"v6_port">> => Port
};
_ ->
#{}
end,
#{<<"hole">> => HoleMap, <<"v6_info">> => V6Map}
end, UsedMap),
{reply, {ok, UsedInfos}, State};
%% client设置为禁止状态 %% client设置为禁止状态
handle_call({disable_client, ClientId}, _From, State = #state{network_id = NetworkId, used_map = UsedMap}) -> handle_call({disable_client, ClientId}, _From, State = #state{endpoints = Endpoints}) ->
case lists:search(fun({_, #host{client_id = ClientId0}}) -> ClientId =:= ClientId0 end, maps:to_list(UsedMap)) of case search_endpoint(fun(_, #endpoint{client_id = ClientId0}) -> ClientId =:= ClientId0 end, Endpoints) of
{value, {Mac, #host{channel_pid = ChannelPid, monitor_ref = MRef}}} -> {ok, Mac, _} ->
is_reference(MRef) andalso demonitor(MRef), {reply, ok, State#state{endpoints = maps:remove(Mac, Endpoints)}};
sdlan_channel:stop(ChannelPid, disable), error ->
NUsedMap = maps:remove(Mac, UsedMap), {reply, ok, State}
%%
client_model:disable_client(NetworkId, ClientId),
{reply, ok, State#state{used_map = NUsedMap}};
false ->
{reply, error, State}
end; end;
handle_call({get_channel, ClientId}, _From, State = #state{used_map = UsedMap}) -> handle_call({get_channel, ClientId}, _From, State = #state{endpoints = Endpoints}) ->
case lists:search(fun({_, #host{client_id = ClientId0}}) -> ClientId =:= ClientId0 end, maps:to_list(UsedMap)) of case search_endpoint(fun(_, #endpoint{client_id = ClientId0}) -> ClientId =:= ClientId0 end, Endpoints) of
{value, {_Ip, #host{channel_pid = ChannelPid}}} -> {ok, _, #endpoint{channel_pid = ChannelPid}} ->
{reply, {ok, ChannelPid}, State}; {reply, {ok, ChannelPid}, State};
false -> error ->
{reply, error, State}
end;
%% channel, ; drop的时候需要从当前网络中移除
handle_call({dropout_client, ClientId}, _From, State = #state{network_id = NetworkId, used_map = UsedMap}) ->
case lists:search(fun({_, #host{client_id = ClientId0}}) -> ClientId =:= ClientId0 end, maps:to_list(UsedMap)) of
{value, {Mac, #host{channel_pid = ChannelPid, monitor_ref = MRef}}} ->
is_reference(MRef) andalso demonitor(MRef),
NUsedMap = maps:remove(Mac, UsedMap),
%%
case client_model:delete_client(NetworkId, ClientId) of
{ok, #client{host_name = HostName}} ->
{reply, {ok, ChannelPid, HostName}, State#state{used_map = NUsedMap}};
{error, _} ->
{reply, error, State}
end;
false ->
{reply, error, State} {reply, error, State}
end; end;
@ -321,43 +229,33 @@ handle_call(get_network_id, _From, State = #state{network_id = NetworkId}) ->
{reply, {ok, NetworkId}, State}; {reply, {ok, NetworkId}, State};
%% nat_peer信息 %% nat_peer信息
handle_call({peer_info, SrcMac, DstMac}, _From, State = #state{used_map = UsedMap}) -> handle_call({peer_info, SrcMac, DstMac}, _From, State = #state{endpoints = Endpoints}) ->
case maps:find(DstMac, UsedMap) of case maps:find(DstMac, Endpoints) of
{ok, #host{channel_pid = DstChannelPid, hole = #hole{peer = DstNatPeer, nat_type = DstNatType}, v6_info = DstV6Info}} -> {ok, #endpoint{channel_pid = DstChannelPid, hole = #hole{peer = DstNatPeer, nat_type = DstNatType}, v6_info = DstV6Info}} ->
%% sendRegister事件(2024-06-25 ) %% sendRegister事件(2024-06-25 )
case maps:get(SrcMac, UsedMap, undefined) of maybe
#host{hole = #hole{peer = {SrcNatIp, SrcNatPort}, nat_type = NatType}, v6_info = SrcV6Info} -> {ok, #endpoint{hole = #hole{peer = {SrcNatIp, SrcNatPort}, nat_type = SrcNatType}, v6_info = SrcV6Info}} ?= maps:find(SrcMac, Endpoints),
Event = sdlan_pb:encode_msg(#sdl_send_register_event { RegisterEvent = sdlan_pb:encode_msg(#sdl_send_register_event {
dst_mac = SrcMac, dst_mac = SrcMac,
nat_ip = sdlan_ipaddr:ipv4_to_int(SrcNatIp), nat_ip = sdlan_ipaddr:ipv4_to_int(SrcNatIp),
nat_type = NatType, nat_type = SrcNatType,
nat_port = SrcNatPort, nat_port = SrcNatPort,
v6_info = SrcV6Info v6_info = SrcV6Info
}), }),
sdlan_channel:send_event(DstChannelPid, ?PACKET_EVENT_SEND_REGISTER, Event); sdlan_quic_channel:send_event(DstChannelPid, ?PACKET_EVENT_SEND_REGISTER, RegisterEvent)
_ ->
ok
end, end,
{reply, {ok, {DstNatPeer, DstNatType}, DstV6Info}, State}; {reply, {ok, {DstNatPeer, DstNatType}, DstV6Info}, State};
_ -> _ ->
{reply, error, State} {reply, error, State}
end; end;
handle_call(debug_info, _From, State = #state{network_id = NetworkId, ipaddr = IpAddr, mask_len = MaskLen, owner_id = OwnerId, ips = Ips, used_map = UsedMap}) -> handle_call(debug_info, _From, State = #state{network_id = NetworkId, ipaddr = IpAddr, mask_len = MaskLen, owner_id = OwnerId, endpoints = Endpoints}) ->
Reply = #{ Reply = #{
<<"network_id">> => NetworkId, <<"network_id">> => NetworkId,
<<"ipaddr">> => IpAddr, <<"ipaddr">> => IpAddr,
<<"mask_len">> => MaskLen, <<"mask_len">> => MaskLen,
<<"owner_id">> => OwnerId, <<"owner_id">> => OwnerId,
<<"ips">> => lists:map(fun sdlan_ipaddr:int_to_ipv4/1, Ips), <<"used_ips">> => lists:map(fun format_endpoint/1, maps:to_list(Endpoints))
<<"used_ips">> => lists:map(fun({_, Host = #host{client_id = ClientId}}) ->
case client_model:get_client(NetworkId, ClientId) of
error ->
#{};
{ok, #client{mac = Mac, ip = Ip}} ->
format_host(Host, Ip, Mac)
end
end, maps:to_list(UsedMap))
}, },
{reply, Reply, State}. {reply, Reply, State}.
@ -368,34 +266,24 @@ handle_call(debug_info, _From, State = #state{network_id = NetworkId, ipaddr = I
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%% , mac地址单播 %% , mac地址单播
handle_cast({forward, Sock, SrcMac, DstMac, Packet}, State = #state{network_id = NetworkId, used_map = UsedMap, throttle_key = ThrottleKey, forward_bytes = ForwardBytes}) handle_cast({forward, Sock, SrcMac, DstMac, Packet}, State = #state{network_id = NetworkId, endpoints = Endpoints, throttle_key = ThrottleKey, forward_bytes = ForwardBytes})
when is_map_key(SrcMac, UsedMap), is_map_key(DstMac, UsedMap) -> when is_map_key(SrcMac, Endpoints), is_map_key(DstMac, Endpoints) ->
PacketBytes = byte_size(Packet), PacketBytes = byte_size(Packet),
case maps:find(DstMac, UsedMap) of case maps:find(DstMac, Endpoints) of
{ok, #host{hole = #hole{peer = Peer = {Ip, Port}}}} -> {ok, #endpoint{hole = #hole{peer = Peer = {Ip, Port}}}} ->
case throttle:check(sdlan_network, ThrottleKey) of case limiting_check(ThrottleKey) of
{ok, _RestCount, _LeftToReset} -> pass ->
%% client和stun之间必须有心跳机制保持nat映射可用udp包肯定可以到达对端的nat %% client和stun之间必须有心跳机制保持nat映射可用udp包肯定可以到达对端的nat
logger:debug("[sdlan_network] forward data networkd_id: ~p, src_mac: ~p, dst_mac: ~p, hole: ~p", logger:debug("[sdlan_network] forward data networkd_id: ~p, src_mac: ~p, dst_mac: ~p, hole: ~p",
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac), Peer]), [NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac), Peer]),
gen_udp:send(Sock, Ip, Port, Packet), gen_udp:send(Sock, Ip, Port, Packet),
{noreply, State#state{forward_bytes = ForwardBytes + PacketBytes}}; {noreply, State#state{forward_bytes = ForwardBytes + PacketBytes}};
{limit_exceeded, 0, _LeftToReset} -> denied ->
%%
case sdlan_network_coordinator:checkout() of
ok ->
logger:debug("[sdlan_network] use release forward data networkd_id: ~p, src_mac: ~p, dst_mac: ~p, hole: ~p",
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac), Peer]),
gen_udp:send(Sock, Ip, Port, Packet),
{noreply, State#state{forward_bytes = ForwardBytes + PacketBytes}};
error ->
logger:notice("[sdlan_network] networkd_id: ~p, src_mac: ~p, dst_mac: ~p, rate limited, discard", logger:notice("[sdlan_network] networkd_id: ~p, src_mac: ~p, dst_mac: ~p, rate limited, discard",
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]), [NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]),
{noreply, State} {noreply, State}
end
end; end;
{ok, _} -> {ok, _} ->
logger:debug("[sdlan_network] networkd_id: ~p, src_mac: ~p, dst_mac: ~p, hole not found", logger:debug("[sdlan_network] networkd_id: ~p, src_mac: ~p, dst_mac: ~p, hole not found",
@ -408,23 +296,16 @@ handle_cast({forward, Sock, SrcMac, DstMac, Packet}, State = #state{network_id =
end; end;
%% , ip广播或组播, %% , ip广播或组播,
handle_cast({forward, Sock, SrcMac, DstMac, Packet}, State = #state{network_id = NetworkId, used_map = UsedMap, forward_bytes = ForwardBytes}) handle_cast({forward, Sock, SrcMac, DstMac, Packet}, State = #state{network_id = NetworkId, endpoints = Endpoints, forward_bytes = ForwardBytes})
when is_map_key(SrcMac, UsedMap) -> when is_map_key(SrcMac, Endpoints) ->
%% 广 %% 广
case sdlan_util:is_broadcast_mac(DstMac) orelse sdlan_util:is_multicast_mac(DstMac) of case sdlan_util:is_broadcast_mac(DstMac) orelse sdlan_util:is_multicast_mac(DstMac) of
true -> true ->
PacketBytes = byte_size(Packet), PacketBytes = byte_size(Packet),
%% 广 %% 广
maps:foreach(fun(Mac, #host{hole = Hole}) -> broadcast(fun(#endpoint{hole = #hole{peer = {NatIp, NatPort}}}) ->
case {Mac =/= SrcMac, Hole} of gen_udp:send(Sock, NatIp, NatPort, Packet)
{true, #hole{peer = {NatIp, NatPort}}} -> end, [SrcMac], Endpoints),
logger:debug("[sdlan_network] call me here"),
gen_udp:send(Sock, NatIp, NatPort, Packet);
_ ->
ok
end
end, UsedMap),
%% client和stun之间必须有心跳机制保持nat映射可用udp包肯定可以到达对端的nat %% client和stun之间必须有心跳机制保持nat映射可用udp包肯定可以到达对端的nat
logger:debug("[sdlan_network] broadcast data networkd_id: ~p, src_mac: ~p, dst_mac: ~p", logger:debug("[sdlan_network] broadcast data networkd_id: ~p, src_mac: ~p, dst_mac: ~p",
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]), [NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]),
@ -442,34 +323,30 @@ handle_cast({forward, _Sock, SrcMac, DstMac, _Packet}, State = #state{network_id
{noreply, State}; {noreply, State};
%% ip的占用并关闭channel %% ip的占用并关闭channel
handle_cast({unregister, _ClientId, Mac}, State = #state{network_id = NetworkId, used_map = UsedMap}) -> handle_cast({unregister, _ClientId, Mac}, State = #state{network_id = NetworkId, endpoints = Endpoints}) ->
logger:debug("[sdlan_network] networkd_id: ~p, unregister Mac: ~p", [NetworkId, sdlan_util:format_mac(Mac)]), logger:debug("[sdlan_network] networkd_id: ~p, unregister Mac: ~p", [NetworkId, sdlan_util:format_mac(Mac)]),
case maps:take(Mac, UsedMap) of {noreply, State#state{endpoints = maps:remove(Mac, Endpoints)}};
error ->
{noreply, State};
{#host{channel_pid = ChannelPid, monitor_ref = MRef}, NUsedMap} ->
is_reference(MRef) andalso demonitor(MRef),
sdlan_channel:stop(ChannelPid, normal),
{noreply, State#state{used_map = NUsedMap}}
end;
%% client是属于当前网络的 %% client是属于当前网络的
handle_cast({update_hole, ClientId, Mac, Peer, NatType, V6Info}, State = #state{network_id = NetworkId, used_map = UsedMap}) -> handle_cast({update_hole, SessionToken, ClientId, Mac, Peer, NatType, V6Info}, State = #state{endpoints = Endpoints}) ->
case {maps:find(Mac, UsedMap), client_model:get_client(NetworkId, ClientId)} of case maps:find(Mac, Endpoints) of
{{ok, Host0 = #host{client_id = ClientId0, hole = OldHole}}, {ok, #client{ip = Ip}}} when ClientId =:= ClientId0 -> %% ClientId =:= ClientId0, SessionToken =:= SessionToken0
case OldHole =:= undefined orelse (OldHole#hole.peer =/= Peer orelse OldHole#hole.nat_type =/= NatType) of {ok, Endpoint0 = #endpoint{ip = Ip, client_id = ClientId, hole = OldHole, session_token = SessionToken}} ->
true -> NHole = #hole{peer = Peer, nat_type = NatType},
maybe
true ?= not same_hole(OldHole, NHole),
NatChangedEvent = sdlan_pb:encode_msg(#sdl_nat_changed_event{ NatChangedEvent = sdlan_pb:encode_msg(#sdl_nat_changed_event{
mac = Mac, mac = Mac,
ip = Ip ip = Ip
}), }),
broadcast(?PACKET_EVENT_NAT_CHANGED, NatChangedEvent, Mac, UsedMap); broadcast(fun(#endpoint{channel_pid = ChannelPid}) ->
false -> sdlan_quic_channel:send_event(ChannelPid, ?PACKET_EVENT_NAT_CHANGED, NatChangedEvent)
ok end, [Mac], Endpoints)
end, end,
Host = Host0#host{hole = #hole{peer = Peer, nat_type = NatType}, v6_info = V6Info}, NEndpoint = Endpoint0#endpoint{hole = NHole, v6_info = V6Info, last_seen = erlang:monotonic_time(second)},
logger:debug("[sdlan_network] mac: ~p, ip: ~p, endpoint is: ~p", [Mac, Ip, NEndpoint]),
{noreply, State#state{used_map = maps:put(Mac, Host, UsedMap)}}; {noreply, State#state{endpoints = maps:put(Mac, NEndpoint, Endpoints)}};
_ -> _ ->
{noreply, State} {noreply, State}
end. end.
@ -485,15 +362,11 @@ handle_info({timeout, _, flow_report_ticker}, State = #state{network_id = Networ
catch sdlan_api:network_forward_report(NetworkId, ForwardBytes), catch sdlan_api:network_forward_report(NetworkId, ForwardBytes),
{noreply, State#state{forward_bytes = 0}}; {noreply, State#state{forward_bytes = 0}};
handle_info({'EXIT', _Pid, shutdown}, State = #state{network_id = NetworkId, used_map = UsedMap}) ->
logger:warning("[sdlan_network] network: ~p, get shutdown message", [NetworkId]),
broadcast_shutdown(UsedMap),
{stop, shutdown, State};
%% Channel进程退出, hole里面的数据也需要清理 %% Channel进程退出, hole里面的数据也需要清理
handle_info({'DOWN', _MRef, process, ChannelPid, Reason}, State = #state{network_id = NetworkId, used_map = UsedMap}) -> handle_info({'DOWN', _MRef, process, ChannelPid, Reason}, State = #state{network_id = NetworkId, endpoints = Endpoints}) ->
logger:notice("[sdlan_network] network_id: ~p, channel_pid: ~p, close with reason: ~p", [NetworkId, ChannelPid, Reason]), logger:notice("[sdlan_network] network_id: ~p, channel_pid: ~p, close with reason: ~p", [NetworkId, ChannelPid, Reason]),
NUsedMap = maps:filter(fun(_, #host{channel_pid = ChannelPid0}) -> ChannelPid =/= ChannelPid0 end, UsedMap), NEndpoints = maps:filter(fun(_, #endpoint{channel_pid = ChannelPid0}) -> ChannelPid =/= ChannelPid0 end, Endpoints),
{noreply, State#state{used_map = NUsedMap}}. {noreply, State#state{endpoints = NEndpoints}}.
%% @private %% @private
%% @doc This function is called by a gen_server when it is about to %% @doc This function is called by a gen_server when it is about to
@ -502,9 +375,20 @@ handle_info({'DOWN', _MRef, process, ChannelPid, Reason}, State = #state{network
%% with Reason. The return value is ignored. %% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()). State :: #state{}) -> term()).
terminate(Reason, #state{network_id = NetworkId, used_map = UsedMap}) -> terminate(Reason, #state{network_id = NetworkId, endpoints = Endpoints}) ->
broadcast(fun(#endpoint{channel_pid = ChannelPid}) ->
case is_process_alive(ChannelPid) of
true ->
NetworkShutdownEvent = sdlan_pb:encode_msg(#sdl_network_shutdown_event{
message = <<"Network shutdown">>
}),
sdlan_quic_channel:send_event(ChannelPid, ?PACKET_EVENT_NETWORK_SHUTDOWN, NetworkShutdownEvent),
sdlan_quic_channel:stop(ChannelPid, normal);
false ->
ok
end
end, Endpoints),
logger:debug("[sdlan_network] network: ~p, will terminate with reason: ~p", [NetworkId, Reason]), logger:debug("[sdlan_network] network: ~p, will terminate with reason: ~p", [NetworkId, Reason]),
broadcast_shutdown(UsedMap),
ok. ok.
%% @private %% @private
@ -519,56 +403,6 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
%%
-spec create_mnesia_table(NetworkId :: integer()) -> no_return().
create_mnesia_table(NetworkId) when is_integer(NetworkId) ->
Tab = client_model:get_table_name(NetworkId),
Tables = mnesia:system_info(tables),
case lists:member(Tab, Tables) of
true ->
ok;
false ->
Res = client_model:create_table(Tab),
logger:debug("[sdlan_network] create table result: ~p", [Res])
end.
-spec maybe_close_channel(undefined | #host{}) -> no_return().
maybe_close_channel(#host{channel_pid = ChannelPid0, monitor_ref = MRef0}) ->
case is_pid(ChannelPid0) andalso is_process_alive(ChannelPid0) of
true ->
is_reference(MRef0) andalso demonitor(MRef0),
sdlan_channel:stop(ChannelPid0, channel_rebind);
false ->
ok
end;
maybe_close_channel(_) ->
ok.
-spec broadcast(EventType :: integer(), Event :: binary(), ExcludeMac :: binary(), UsedMap :: map()) -> no_return().
broadcast(EventType, Event, ExcludeMac, UsedMap) when is_map(UsedMap), is_binary(ExcludeMac), is_integer(EventType), is_binary(Event) ->
maps:foreach(fun(Mac, #host{channel_pid = ChannelPid}) ->
case is_process_alive(ChannelPid) andalso ExcludeMac /= Mac of
true ->
sdlan_channel:send_event(ChannelPid, EventType, Event);
false ->
ok
end
end, UsedMap).
broadcast_shutdown(UsedMap) when is_map(UsedMap) ->
maps:foreach(fun(_, #host{channel_pid = ChannelPid}) ->
case is_process_alive(ChannelPid) of
true ->
NetworkShutdownEvent = sdlan_pb:encode_msg(#sdl_network_shutdown_event {
message = <<"Network shutdown">>
}),
sdlan_channel:send_event(ChannelPid, ?PACKET_EVENT_NETWORK_SHUTDOWN, NetworkShutdownEvent),
sdlan_channel:stop(ChannelPid, normal);
false ->
ok
end
end, UsedMap).
%% IpAddr: <<"192.168.172/24">> %% IpAddr: <<"192.168.172/24">>
-spec parse_ipaddr(IpAddr0 :: binary()) -> {IpAddr :: binary(), MaskLen :: integer()}. -spec parse_ipaddr(IpAddr0 :: binary()) -> {IpAddr :: binary(), MaskLen :: integer()}.
parse_ipaddr(IpAddr0) when is_binary(IpAddr0) -> parse_ipaddr(IpAddr0) when is_binary(IpAddr0) ->
@ -580,18 +414,43 @@ parse_ipaddr(IpAddr0) when is_binary(IpAddr0) ->
{IpAddr0, 24} {IpAddr0, 24}
end. end.
-spec format_host(Host :: #host{}, Ip :: integer(), Mac :: binary()) -> map(). -spec limiting_check(ThrottleKey :: any()) -> pass | denied.
format_host(#host{client_id = ClientId, hole = Hole, v6_info = V6Info}, Ip, Mac) when is_integer(Ip), is_binary(Mac) -> limiting_check(ThrottleKey) ->
HoleMap = case Hole of case throttle:check(sdlan_network, ThrottleKey) of
undefined -> {ok, _RestCount, _LeftToReset} ->
#{}; pass;
#hole{peer = {NatIp, NatPort}, nat_type = NatType} -> {limit_exceeded, 0, _LeftToReset} ->
#{ %%
case sdlan_network_coordinator:checkout() of
ok ->
pass;
error ->
denied
end
end.
-spec broadcast(Fun :: fun((#endpoint{}) -> no_return()), Endpoints :: map()) -> no_return().
broadcast(Fun, Endpoints) when is_function(Fun, 1), is_map(Endpoints) ->
broadcast(Fun, [], Endpoints).
-spec broadcast(Fun :: fun((#endpoint{}) -> no_return()), ExcludeMacs :: [binary()], Endpoints :: map()) -> no_return().
broadcast(Fun, ExcludeMacs, Endpoints) when is_function(Fun, 1), is_map(Endpoints), is_list(ExcludeMacs) ->
maps:foreach(fun(Mac, Endpoint) ->
case lists:member(Mac, ExcludeMacs) of
true ->
ok;
false ->
Fun(Endpoint)
end
end, Endpoints).
-spec format_endpoint({Mac :: binary(), Host :: #endpoint{}}) -> map().
format_endpoint({Mac, #endpoint{client_id = ClientId, ip = Ip, hole = #hole{peer = {NatIp, NatPort}, nat_type = NatType}, v6_info = V6Info}}) ->
HoleMap = #{
nat_ip => NatIp, nat_ip => NatIp,
nat_port => NatPort, nat_port => NatPort,
nat_type => NatType nat_type => NatType
} },
end,
V6InfoMap = case V6Info of V6InfoMap = case V6Info of
undefined -> undefined ->
@ -599,7 +458,6 @@ format_host(#host{client_id = ClientId, hole = Hole, v6_info = V6Info}, Ip, Mac)
#sdl_v6_info{v6 = V6, port = V6Port} -> #sdl_v6_info{v6 = V6, port = V6Port} ->
#{v6 => V6, port => V6Port} #{v6 => V6, port => V6Port}
end, end,
#{ #{
client_id => ClientId, client_id => ClientId,
mac => sdlan_util:format_mac(Mac), mac => sdlan_util:format_mac(Mac),
@ -607,3 +465,30 @@ format_host(#host{client_id = ClientId, hole = Hole, v6_info = V6Info}, Ip, Mac)
hole_map => HoleMap, hole_map => HoleMap,
v6_info => V6InfoMap v6_info => V6InfoMap
}. }.
-spec search_endpoint(F :: fun((term(), term()) -> boolean()), Endpoints :: map()) -> error | {ok, Key :: any(), Val :: any()}.
search_endpoint(F, Endpoints) when is_function(F, 2), is_map(Endpoints) ->
search_endpoint0(F, maps:iterator(Endpoints)).
search_endpoint0(F, Iter) when is_function(F, 2) ->
case maps:next(Iter) of
{Key, Value, NextIter} ->
case F(Key, Value) of
true ->
{ok, Key, Value};
false ->
search_endpoint0(F, NextIter)
end;
'none' ->
error
end.
-spec same_hole(Hole :: #hole{}, Hole :: #hole{}) -> boolean().
same_hole(#hole{peer = OldPeer, nat_type = OldNatType}, #hole{peer = Peer, nat_type = NatType}) when OldPeer =:= Peer, OldNatType =:= NatType ->
true;
same_hole(_, _) ->
false.
-spec gen_session_token() -> binary().
gen_session_token() ->
Bytes = crypto:strong_rand_bytes(32),
base64:encode(Bytes).

File diff suppressed because it is too large Load Diff