602 lines
26 KiB
Erlang
602 lines
26 KiB
Erlang
%%%-------------------------------------------------------------------
|
||
%%% @author anlicheng
|
||
%%% @copyright (C) 2024, <COMPANY>
|
||
%%% @doc
|
||
%%%
|
||
%%% @end
|
||
%%% Created : 27. 3月 2024 15:13
|
||
%%%-------------------------------------------------------------------
|
||
-module(sdlan_network).
|
||
-author("anlicheng").
|
||
-include("sdlan.hrl").
|
||
-include("sdlan_pb.hrl").
|
||
|
||
-behaviour(gen_server).
|
||
|
||
-define(FLOW_REPORT_INTERVAL, 60 * 1000).
|
||
|
||
%% broadcast, "FF-FF-FF-FF-FF-FF"
|
||
-define(BROADCAST_MAC, <<16#FF,16#FF,16#FF,16#FF,16#FF,16#FF>>).
|
||
|
||
%% API
|
||
-export([start_link/2]).
|
||
-export([get_name/1, get_pid/1, lookup_pid/1, peer_info/3, unregister/3, debug_info/1, get_network_id/1, attach/6, arp_request/2]).
|
||
-export([forward/5, update_hole/7, disable_client/2, get_channel/2]).
|
||
-export([command/4, wait_command_ack/2]).
|
||
-export([test_event/1]).
|
||
|
||
%% gen_server callbacks
|
||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||
|
||
-record(hole, {
|
||
peer :: {Ip :: inet:ip4_address(), Port :: integer()},
|
||
nat_type :: integer()
|
||
}).
|
||
|
||
%% ip的使用信息, 记录Node的运行时状态信息
|
||
-record(endpoint, {
|
||
channel_pid :: undefined | pid(),
|
||
channel_ref :: undefined | reference(),
|
||
|
||
client_id :: binary(),
|
||
mac :: binary(),
|
||
ip :: integer(),
|
||
hostname :: binary(),
|
||
hole :: undefined | #hole{},
|
||
%% 记录ip和ip_v6的映射关系, #{ip_addr :: integer() => {}}
|
||
v6_info :: undefined | #'SDLV6Info'{},
|
||
session_token :: binary(),
|
||
last_seen :: integer() %% monotonic_time(second),
|
||
}).
|
||
|
||
-record(state, {
|
||
network_id :: integer(),
|
||
name :: binary(),
|
||
domain :: binary(),
|
||
ipaddr :: binary(),
|
||
mask_len :: integer(),
|
||
owner_id :: integer(),
|
||
|
||
%% 加密算法, 默认为chacha20
|
||
algorithm :: binary(),
|
||
%% 同一个网络下公用的密钥, 采用AES-256加密算法;随机生成
|
||
key :: binary(),
|
||
|
||
%% 设置网络带宽
|
||
throttle_key :: atom(),
|
||
%% 转发流量统计
|
||
forward_bytes = 0,
|
||
|
||
%% 记录已经使用了的ip, #{mac :: integer() => Host :: #endpoint{}}
|
||
endpoints = #{}
|
||
}).
|
||
|
||
%%%===================================================================
|
||
%%% API
|
||
%%%===================================================================
|
||
|
||
%% -- MARK: 测试函数
|
||
test_event(Pid) ->
|
||
gen_server:cast(Pid, test_event).
|
||
|
||
-spec get_pid(Id :: integer()) -> undefined | pid().
|
||
get_pid(Id) when is_integer(Id) ->
|
||
whereis(get_name(Id)).
|
||
|
||
-spec lookup_pid(Id :: integer()) -> {ok, Pid :: pid()} | error.
|
||
lookup_pid(Id) when is_integer(Id) ->
|
||
case whereis(get_name(Id)) of
|
||
undefined ->
|
||
error;
|
||
Pid ->
|
||
{ok, Pid}
|
||
end.
|
||
|
||
-spec get_name(Id :: integer()) -> atom().
|
||
get_name(Id) when is_integer(Id) ->
|
||
list_to_atom("sdlan_network:" ++ integer_to_list(Id)).
|
||
|
||
-spec get_network_id(Pid :: pid()) -> {ok, NetworkId :: integer()}.
|
||
get_network_id(Pid) when is_pid(Pid) ->
|
||
gen_server:call(Pid, get_network_id).
|
||
|
||
-spec attach(Pid :: pid(), ChannelPid :: pid(), ClientId :: binary(), Mac :: binary(), Ip :: integer(), Hostname :: binary()) ->
|
||
{ok, Algorithm :: binary(), Key :: binary(), RegionId :: integer(), SessionToken :: binary()}.
|
||
attach(Pid, ChannelPid, ClientId, Mac, Ip, Hostname) when is_pid(Pid), is_pid(ChannelPid), is_binary(ClientId), is_binary(Mac), is_integer(Ip), is_binary(Hostname) ->
|
||
gen_server:call(Pid, {attach, ChannelPid, ClientId, Mac, Ip, Hostname}).
|
||
|
||
-spec unregister(Pid :: pid(), ClientId :: binary(), Mac :: binary()) -> no_return().
|
||
unregister(Pid, ClientId, Mac) when is_pid(Pid), is_binary(ClientId), is_binary(Mac) ->
|
||
gen_server:cast(Pid, {unregister, ClientId, Mac}).
|
||
|
||
-spec peer_info(Pid :: pid(), SrcMac :: binary(), DstMac :: binary()) ->
|
||
error | {ok, {NatPeer :: {Ip :: inet:ip4_address(), Port :: integer()}, NatType :: integer()}, V6Info :: undefined | #'SDLV6Info'{}}.
|
||
peer_info(Pid, SrcMac, DstMac) when is_pid(Pid), is_binary(SrcMac), is_binary(DstMac) ->
|
||
gen_server:call(Pid, {peer_info, SrcMac, DstMac}).
|
||
|
||
-spec arp_request(Pid :: pid(), TargetIp :: integer()) -> error | {ok, Mac :: binary()}.
|
||
arp_request(Pid, TargetIp) when is_pid(Pid), is_integer(TargetIp) ->
|
||
gen_server:call(Pid, {arp_request, TargetIp}).
|
||
|
||
-spec command(Pid :: pid(), ReceiverPid :: pid(), ClientId :: binary(), {Tag :: atom(), SubCommand :: any()}) ->
|
||
{error, Reason :: binary()} | {ok, Ref :: reference()}.
|
||
command(Pid, ReceiverPid, ClientId, SubCommand) when is_pid(Pid), is_pid(ReceiverPid), is_binary(ClientId) ->
|
||
gen_server:call(Pid, {command, ReceiverPid, ClientId, SubCommand}).
|
||
|
||
-spec wait_command_ack(Ref :: reference(), Timeout :: integer()) -> {error, timeout} | {ok, CommandAck :: #'SDLCommandAck'{}}.
|
||
wait_command_ack(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
|
||
receive
|
||
{quic_command_ack, Ref, CommandAck} ->
|
||
{ok, CommandAck}
|
||
after Timeout ->
|
||
{error, timeout}
|
||
end.
|
||
|
||
-spec forward(pid(), Sock :: any(), SrcMac :: binary(), DstMac :: binary(), Packet :: binary()) -> no_return().
|
||
forward(Pid, Sock, SrcMac, DstMac, Packet) when is_pid(Pid), is_binary(SrcMac), is_binary(DstMac), is_binary(Packet) ->
|
||
gen_server:cast(Pid, {forward, Sock, SrcMac, DstMac, Packet}).
|
||
|
||
%% 更新ip地址对应的nat关系
|
||
-spec update_hole(Pid :: pid(), SessionToken :: binary(), ClientId :: binary(), Mac :: binary(), Peer :: tuple(), NatType :: integer(), V6Info :: undefined | #'SDLV6Info'{}) -> no_return().
|
||
update_hole(Pid, SessionToken, ClientId, Mac, Peer, NatType, V6Info) when is_pid(Pid), is_binary(ClientId), is_binary(Mac), is_integer(NatType) ->
|
||
gen_server:cast(Pid, {update_hole, SessionToken, ClientId, Mac, Peer, NatType, V6Info}).
|
||
|
||
-spec disable_client(Pid :: pid(), ClientId :: binary()) -> ok | error.
|
||
disable_client(Pid, ClientId) when is_pid(Pid), is_binary(ClientId) ->
|
||
gen_server:call(Pid, {disable_client, ClientId}).
|
||
|
||
-spec get_channel(Pid :: pid(), ClientId :: binary()) -> error | {ok, ChannelPid :: pid()}.
|
||
get_channel(Pid, ClientId) when is_pid(Pid), is_binary(ClientId) ->
|
||
gen_server:call(Pid, {get_channel, ClientId}).
|
||
|
||
-spec debug_info(Pid :: pid()) -> map().
|
||
debug_info(Pid) when is_pid(Pid) ->
|
||
gen_server:call(Pid, debug_info).
|
||
|
||
%% @doc Spawns the server and registers the local name (unique)
|
||
-spec(start_link(Name :: atom(), Id :: integer()) ->
|
||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||
start_link(Name, Id) when is_atom(Name), is_integer(Id) ->
|
||
gen_server:start_link({local, Name}, ?MODULE, [Id], []).
|
||
|
||
%%%===================================================================
|
||
%%% gen_server callbacks
|
||
%%%===================================================================
|
||
|
||
%% @private
|
||
%% @doc Initializes the server
|
||
-spec(init(Args :: term()) ->
|
||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||
{stop, Reason :: term()} | ignore).
|
||
init([Id]) when is_integer(Id) ->
|
||
erlang:process_flag(trap_exit, true),
|
||
case sdlan_api:get_network(Id) of
|
||
{ok, #{<<"ipaddr">> := Null}} when Null == <<"null">>; Null == <<"NULL">> ->
|
||
ignore;
|
||
{ok, NetworkInfo = #{<<"id">> := Id, <<"name">> := Name, <<"domain">> := Domain, <<"algorithm">> := Algorithm0, <<"ipaddr">> := IpAddr0, <<"owner_id">> := OwnerId}} ->
|
||
logger:debug("[sdlan_network] load network info: ~p", [NetworkInfo]),
|
||
{IpAddr, MaskLen} = parse_ipaddr(IpAddr0),
|
||
%% 限流key
|
||
ThrottleKey = list_to_atom("network_throttle:" ++ integer_to_list(Id)),
|
||
%% 绑定到资源协调器
|
||
sdlan_network_coordinator:attach(self(), ThrottleKey),
|
||
|
||
%% 每分钟汇报一次转发的流量
|
||
erlang:start_timer(?FLOW_REPORT_INTERVAL, self(), flow_report_ticker),
|
||
|
||
sdlan_domain_regedit:insert(Domain),
|
||
|
||
%% 处理加密算法
|
||
Algorithm = normalization_algorithm(Algorithm0),
|
||
Key = gen_key(Algorithm),
|
||
|
||
{ok, #state{network_id = Id, name = Name, domain = Domain, ipaddr = IpAddr, algorithm = Algorithm,
|
||
owner_id = OwnerId, mask_len = MaskLen, key = Key, throttle_key = ThrottleKey}};
|
||
{error, Reason} ->
|
||
logger:warning("[sdlan_network] load network: ~p, get error: ~p", [Id, Reason]),
|
||
ignore
|
||
end.
|
||
|
||
%% @private
|
||
%% @doc Handling call messages
|
||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||
State :: #state{}) ->
|
||
{reply, Reply :: term(), NewState :: #state{}} |
|
||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||
{noreply, NewState :: #state{}} |
|
||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||
{stop, Reason :: term(), NewState :: #state{}}).
|
||
%% 给客户端分配ip地址
|
||
handle_call({attach, ChannelPid, ClientId, Mac, Ip, Hostname}, _From,
|
||
State = #state{network_id = NetworkId, domain = Domain, endpoints = Endpoints, algorithm = Algorithm, key = Key}) ->
|
||
%% 分配ip地址的时候,以mac地址为唯一基准
|
||
logger:debug("[sdlan_network] alloc_ip, network_id: ~p, client_id: ~p, mac: ~p, ip_addr: ~p",
|
||
[NetworkId, ClientId, sdlan_util:format_mac(Mac), sdlan_util:int_to_ipv4(Ip)]),
|
||
%% 添加域名->ip的映射关系
|
||
sdlan_hostname_regedit:insert(Hostname, Domain, Ip),
|
||
|
||
%% mac对应的Endpoint存在,并且对应的ip变了,需要通知端上清理arp
|
||
%% 重复attach需要清理之前的绑定信息
|
||
maybe
|
||
{ok, #endpoint{ip = OldIp, channel_pid = OldChannelPid, channel_ref = OldChannelRef}} ?= maps:find(Mac, Endpoints),
|
||
true ?= OldIp =/= Ip,
|
||
|
||
Event = sdlan_pb:encode_msg(#'SDLEvent'{
|
||
event = {nat_changed, #'SDLEvent.NatChanged' {
|
||
mac = Mac,
|
||
ip = Ip
|
||
}}
|
||
}),
|
||
logger:debug("Event: nat_changed, for attach"),
|
||
|
||
broadcast(fun(#endpoint{channel_pid = ChannelPid0}) ->
|
||
sdlan_quic_channel:send_event(ChannelPid0, Event)
|
||
end, [Mac], Endpoints),
|
||
|
||
%% 清理就的绑定关系
|
||
is_reference(OldChannelRef) andalso demonitor(OldChannelRef),
|
||
case OldChannelPid /= undefined andalso is_process_alive(OldChannelPid) of
|
||
true ->
|
||
sdlan_quic_channel:stop(OldChannelPid, rebind);
|
||
false ->
|
||
ok
|
||
end
|
||
end,
|
||
|
||
ChannelRef = monitor(process, ChannelPid),
|
||
SessionToken = gen_session_token(),
|
||
Endpoint = #endpoint{channel_pid = ChannelPid, channel_ref = ChannelRef,
|
||
client_id = ClientId, mac = Mac, ip = Ip, hostname = Hostname, session_token = SessionToken, last_seen = erlang:monotonic_time(second)},
|
||
|
||
%% 生成对应的分区id
|
||
RegionId = gen_region_id(Ip),
|
||
|
||
{reply, {ok, Algorithm, Key, RegionId, SessionToken}, State#state{endpoints = maps:put(Mac, Endpoint, Endpoints)}};
|
||
|
||
%% client设置为禁止状态,不允许重连
|
||
handle_call({disable_client, ClientId}, _From, State = #state{endpoints = Endpoints}) ->
|
||
case search_endpoint(fun(_, #endpoint{client_id = ClientId0}) -> ClientId =:= ClientId0 end, Endpoints) of
|
||
{ok, Mac, _} ->
|
||
{reply, ok, State#state{endpoints = maps:remove(Mac, Endpoints)}};
|
||
error ->
|
||
{reply, ok, State}
|
||
end;
|
||
|
||
handle_call({get_channel, ClientId}, _From, State = #state{endpoints = Endpoints}) ->
|
||
case search_endpoint(fun(_, #endpoint{client_id = ClientId0}) -> ClientId =:= ClientId0 end, Endpoints) of
|
||
{ok, _, #endpoint{channel_pid = ChannelPid}} ->
|
||
{reply, {ok, ChannelPid}, State};
|
||
error ->
|
||
{reply, error, State}
|
||
end;
|
||
|
||
handle_call(get_network_id, _From, State = #state{network_id = NetworkId}) ->
|
||
{reply, {ok, NetworkId}, State};
|
||
|
||
%% 网络存在的nat_peer信息
|
||
handle_call({peer_info, SrcMac, DstMac}, _From, State = #state{endpoints = Endpoints}) ->
|
||
case maps:find(DstMac, Endpoints) of
|
||
{ok, #endpoint{channel_pid = DstChannelPid, hole = #hole{peer = DstNatPeer, nat_type = DstNatType}, v6_info = DstV6Info}} ->
|
||
%% 让目标服务器发送sendRegister事件(2024-06-25 新增,提高打洞的成功率)
|
||
maybe
|
||
{ok, #endpoint{hole = #hole{peer = {SrcNatIp, SrcNatPort}, nat_type = SrcNatType}, v6_info = SrcV6Info}} ?= maps:find(SrcMac, Endpoints),
|
||
|
||
RegisterEvent = sdlan_pb:encode_msg(#'SDLEvent' {
|
||
event = {send_register, #'SDLEvent.SendRegister'{
|
||
dst_mac = SrcMac,
|
||
nat_ip = sdlan_util:ipv4_to_int(SrcNatIp),
|
||
nat_type = SrcNatType,
|
||
nat_port = SrcNatPort,
|
||
v6_info = SrcV6Info
|
||
}}
|
||
}),
|
||
logger:debug("Event: send_register, for peer_info"),
|
||
|
||
sdlan_quic_channel:send_event(DstChannelPid, RegisterEvent)
|
||
end,
|
||
{reply, {ok, {DstNatPeer, DstNatType}, DstV6Info}, State};
|
||
_ ->
|
||
{reply, error, State}
|
||
end;
|
||
|
||
%% arp查询
|
||
handle_call({arp_request, TargetIp}, _From, State = #state{endpoints = Endpoints}) ->
|
||
case search_endpoint(fun(_, #endpoint{ip = Ip0}) -> Ip0 =:= TargetIp end, Endpoints) of
|
||
error ->
|
||
{reply, error, State};
|
||
{ok, Mac, _} ->
|
||
{reply, {ok, Mac}, State}
|
||
end;
|
||
|
||
%% 发送命令
|
||
handle_call({command, ReceiverPid, ClientId, SubCommand}, _From, State = #state{endpoints = Endpoints}) ->
|
||
case search_endpoint(fun(_, #endpoint{client_id = ClientId0}) -> ClientId =:= ClientId0 end, Endpoints) of
|
||
{ok, _Mac, #endpoint{channel_pid = ChannelPid}} ->
|
||
Ref = make_ref(),
|
||
sdlan_quic_channel:command(ChannelPid, Ref, ReceiverPid, SubCommand),
|
||
{reply, {ok, Ref}, State};
|
||
error ->
|
||
{reply, {error, <<"目标Node不在线"/utf8>>}}
|
||
end;
|
||
|
||
handle_call(debug_info, _From, State = #state{network_id = NetworkId, ipaddr = IpAddr, mask_len = MaskLen, owner_id = OwnerId, endpoints = Endpoints}) ->
|
||
Reply = #{
|
||
<<"network_id">> => NetworkId,
|
||
<<"ipaddr">> => IpAddr,
|
||
<<"mask_len">> => MaskLen,
|
||
<<"owner_id">> => OwnerId,
|
||
<<"used_ips">> => lists:map(fun format_endpoint/1, maps:to_list(Endpoints))
|
||
},
|
||
{reply, Reply, State}.
|
||
|
||
%% @private
|
||
%% @doc Handling cast messages
|
||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||
{noreply, NewState :: #state{}} |
|
||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||
{stop, Reason :: term(), NewState :: #state{}}).
|
||
%% 网络数据转发, mac地址单播
|
||
handle_cast({forward, Sock, SrcMac, DstMac, Packet}, State = #state{network_id = NetworkId, endpoints = Endpoints, throttle_key = ThrottleKey, forward_bytes = ForwardBytes})
|
||
when is_map_key(SrcMac, Endpoints), is_map_key(DstMac, Endpoints) ->
|
||
|
||
PacketBytes = byte_size(Packet),
|
||
case maps:find(DstMac, Endpoints) of
|
||
{ok, #endpoint{hole = #hole{peer = Peer = {NatIp, NatPort}}}} ->
|
||
case limiting_check(ThrottleKey) of
|
||
pass ->
|
||
%% client和stun之间必须有心跳机制保持nat映射可用,并且通过服务转发的udp包肯定可以到达对端的nat
|
||
logger:debug("[sdlan_network] forward data networkd_id: ~p, src_mac: ~p, dst_mac: ~p, hole: ~p",
|
||
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac), Peer]),
|
||
|
||
gen_udp:send(Sock, NatIp, NatPort, Packet),
|
||
{noreply, State#state{forward_bytes = ForwardBytes + PacketBytes}};
|
||
denied ->
|
||
logger:notice("[sdlan_network] networkd_id: ~p, src_mac: ~p, dst_mac: ~p, rate limited, discard",
|
||
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]),
|
||
{noreply, State}
|
||
end;
|
||
{ok, _} ->
|
||
logger:debug("[sdlan_network] networkd_id: ~p, src_mac: ~p, dst_mac: ~p, hole not found",
|
||
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]),
|
||
{noreply, State};
|
||
error ->
|
||
logger:debug("[sdlan_network] networkd_id: ~p, src_mac: ~p, dst_mac: ~p not found",
|
||
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]),
|
||
{noreply, State}
|
||
end;
|
||
|
||
%% 网络数据转发, ip广播或组播, 不限流
|
||
handle_cast({forward, Sock, SrcMac, DstMac, Packet}, State = #state{network_id = NetworkId, endpoints = Endpoints, forward_bytes = ForwardBytes})
|
||
when is_map_key(SrcMac, Endpoints) ->
|
||
%% 广播地址和组播地址,需要转发到整个网络
|
||
case sdlan_util:is_broadcast_mac(DstMac) orelse sdlan_util:is_multicast_mac(DstMac) of
|
||
true ->
|
||
PacketBytes = byte_size(Packet),
|
||
%% 消息广播
|
||
broadcast(fun(#endpoint{hole = #hole{peer = {NatIp, NatPort}}}) ->
|
||
gen_udp:send(Sock, NatIp, NatPort, Packet)
|
||
end, [SrcMac], Endpoints),
|
||
%% client和stun之间必须有心跳机制保持nat映射可用,并且通过服务转发的udp包肯定可以到达对端的nat
|
||
logger:debug("[sdlan_network] broadcast data networkd_id: ~p, src_mac: ~p, dst_mac: ~p",
|
||
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]),
|
||
|
||
{noreply, State#state{forward_bytes = ForwardBytes + PacketBytes}};
|
||
false ->
|
||
logger:debug("[sdlan_network] networkd_id: ~p, src_mac: ~p, dst_mac: ~p, forward discard 1",
|
||
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]),
|
||
{noreply, State}
|
||
end;
|
||
|
||
handle_cast({forward, _Sock, SrcMac, DstMac, _Packet}, State = #state{network_id = NetworkId}) ->
|
||
logger:debug("[sdlan_network] networkd_id: ~p, src_mac: ~p, dst_mac: ~p, forward discard 2",
|
||
[NetworkId, sdlan_util:format_mac(SrcMac), sdlan_util:format_mac(DstMac)]),
|
||
{noreply, State};
|
||
|
||
%% 删除ip的占用并关闭channel
|
||
handle_cast({unregister, _ClientId, Mac}, State = #state{network_id = NetworkId, endpoints = Endpoints}) ->
|
||
logger:debug("[sdlan_network] networkd_id: ~p, unregister Mac: ~p", [NetworkId, sdlan_util:format_mac(Mac)]),
|
||
{noreply, State#state{endpoints = maps:remove(Mac, Endpoints)}};
|
||
|
||
%% 需要判断,client是属于当前网络的
|
||
handle_cast({update_hole, SessionToken, ClientId, Mac, Peer, NatType, V6Info}, State = #state{endpoints = Endpoints}) ->
|
||
case maps:find(Mac, Endpoints) of
|
||
%% ClientId =:= ClientId0, SessionToken =:= SessionToken0
|
||
{ok, Endpoint0 = #endpoint{ip = Ip, client_id = ClientId, hole = OldHole, session_token = SessionToken}} ->
|
||
NHole = #hole{peer = Peer, nat_type = NatType},
|
||
maybe
|
||
true ?= not same_hole(OldHole, NHole),
|
||
|
||
NatChangedEvent = sdlan_pb:encode_msg(#'SDLEvent' {
|
||
event = {nat_changed, #'SDLEvent.NatChanged'{
|
||
mac = Mac,
|
||
ip = Ip
|
||
}}
|
||
}),
|
||
|
||
logger:debug("[sdlan_network] Event: nat_changed, update_hole, client_id: ~p(~p), hole changed", [ClientId, Ip]),
|
||
broadcast(fun(#endpoint{channel_pid = ChannelPid}) ->
|
||
sdlan_quic_channel:send_event(ChannelPid, NatChangedEvent)
|
||
end, [Mac], Endpoints)
|
||
end,
|
||
NEndpoint = Endpoint0#endpoint{hole = NHole, v6_info = V6Info, last_seen = erlang:monotonic_time(second)},
|
||
logger:debug("[sdlan_network] mac: ~p, ip: ~p, endpoint is: ~p", [Mac, Ip, NEndpoint]),
|
||
|
||
{noreply, State#state{endpoints = maps:put(Mac, NEndpoint, Endpoints)}};
|
||
_ ->
|
||
{noreply, State}
|
||
end.
|
||
|
||
%% @private
|
||
%% @doc Handling all non call/cast messages
|
||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||
{noreply, NewState :: #state{}} |
|
||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||
{stop, Reason :: term(), NewState :: #state{}}).
|
||
handle_info({timeout, _, flow_report_ticker}, State = #state{network_id = NetworkId, forward_bytes = ForwardBytes}) ->
|
||
erlang:start_timer(?FLOW_REPORT_INTERVAL, self(), flow_report_ticker),
|
||
catch sdlan_api:network_forward_report(NetworkId, ForwardBytes),
|
||
{noreply, State#state{forward_bytes = 0}};
|
||
|
||
%% Channel进程退出, hole里面的数据也需要清理
|
||
handle_info({'DOWN', _MRef, process, ChannelPid, Reason}, State = #state{network_id = NetworkId, endpoints = Endpoints}) ->
|
||
logger:notice("[sdlan_network] network_id: ~p, channel_pid: ~p, close with reason: ~p", [NetworkId, ChannelPid, Reason]),
|
||
NEndpoints = maps:filter(fun(_, #endpoint{channel_pid = ChannelPid0}) -> ChannelPid =/= ChannelPid0 end, Endpoints),
|
||
{noreply, State#state{endpoints = NEndpoints}}.
|
||
|
||
%% @private
|
||
%% @doc This function is called by a gen_server when it is about to
|
||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||
%% with Reason. The return value is ignored.
|
||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||
State :: #state{}) -> none()).
|
||
terminate(Reason, #state{network_id = NetworkId, endpoints = Endpoints}) ->
|
||
broadcast(fun(#endpoint{channel_pid = ChannelPid}) ->
|
||
case is_process_alive(ChannelPid) of
|
||
true ->
|
||
NetworkShutdownEvent = sdlan_pb:encode_msg(#'SDLEvent'{
|
||
event = {shutdown, #'SDLEvent.NetworkShutdown'{
|
||
message = <<"Network shutdown">>
|
||
}}
|
||
}),
|
||
|
||
logger:debug("[sdlan_network] Event: shutdown"),
|
||
|
||
sdlan_quic_channel:send_event(ChannelPid, NetworkShutdownEvent),
|
||
sdlan_quic_channel:stop(ChannelPid, normal);
|
||
false ->
|
||
ok
|
||
end
|
||
end, [], Endpoints),
|
||
logger:debug("[sdlan_network] network: ~p, will terminate with reason: ~p", [NetworkId, Reason]),
|
||
ok.
|
||
|
||
%% @private
|
||
%% @doc Convert process state when code is changed
|
||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||
Extra :: term()) ->
|
||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||
{ok, State}.
|
||
|
||
%%%===================================================================
|
||
%%% Internal functions
|
||
%%%===================================================================
|
||
|
||
%% 解析IpAddr: <<"192.168.172/24">>
|
||
-spec parse_ipaddr(IpAddr0 :: binary()) -> {IpAddr :: binary(), MaskLen :: integer()}.
|
||
parse_ipaddr(IpAddr0) when is_binary(IpAddr0) ->
|
||
case binary:split(IpAddr0, <<"/">>) of
|
||
[IpAddr, MaskLen] ->
|
||
MaskLen1 = binary_to_integer(MaskLen),
|
||
{IpAddr, MaskLen1};
|
||
_ ->
|
||
{IpAddr0, 24}
|
||
end.
|
||
|
||
-spec limiting_check(ThrottleKey :: any()) -> pass | denied.
|
||
limiting_check(ThrottleKey) ->
|
||
case throttle:check(sdlan_network, ThrottleKey) of
|
||
{ok, _RestCount, _LeftToReset} ->
|
||
pass;
|
||
{limit_exceeded, 0, _LeftToReset} ->
|
||
%% 尝试获取其他网络是否有让渡的资源
|
||
case sdlan_network_coordinator:checkout() of
|
||
ok ->
|
||
pass;
|
||
error ->
|
||
denied
|
||
end
|
||
end.
|
||
|
||
-spec broadcast(Fun :: fun((#endpoint{}) -> no_return()), ExcludeMacs :: [binary()], Endpoints :: map()) -> no_return().
|
||
broadcast(Fun, ExcludeMacs, Endpoints) when is_function(Fun, 1), is_map(Endpoints), is_list(ExcludeMacs) ->
|
||
maps:foreach(fun(Mac, Endpoint) ->
|
||
case lists:member(Mac, ExcludeMacs) of
|
||
true ->
|
||
ok;
|
||
false ->
|
||
Fun(Endpoint)
|
||
end
|
||
end, Endpoints).
|
||
|
||
-spec format_endpoint({Mac :: binary(), Host :: #endpoint{}}) -> map().
|
||
format_endpoint({Mac, #endpoint{client_id = ClientId, ip = Ip, hole = #hole{peer = {NatIp, NatPort}, nat_type = NatType}, v6_info = V6Info}}) ->
|
||
HoleMap = #{
|
||
nat_ip => NatIp,
|
||
nat_port => NatPort,
|
||
nat_type => NatType
|
||
},
|
||
|
||
V6InfoMap = case V6Info of
|
||
undefined ->
|
||
#{};
|
||
#'SDLV6Info'{v6 = V6, port = V6Port} ->
|
||
#{v6 => V6, port => V6Port}
|
||
end,
|
||
#{
|
||
client_id => ClientId,
|
||
mac => sdlan_util:format_mac(Mac),
|
||
ip => sdlan_util:int_to_ipv4(Ip),
|
||
hole_map => HoleMap,
|
||
v6_info => V6InfoMap
|
||
}.
|
||
|
||
-spec search_endpoint(F :: fun((term(), term()) -> boolean()), Endpoints :: map()) -> error | {ok, Key :: any(), Val :: any()}.
|
||
search_endpoint(F, Endpoints) when is_function(F, 2), is_map(Endpoints) ->
|
||
search_endpoint0(F, maps:iterator(Endpoints)).
|
||
search_endpoint0(F, Iter) when is_function(F, 2) ->
|
||
case maps:next(Iter) of
|
||
{Key, Value, NextIter} ->
|
||
case F(Key, Value) of
|
||
true ->
|
||
{ok, Key, Value};
|
||
false ->
|
||
search_endpoint0(F, NextIter)
|
||
end;
|
||
'none' ->
|
||
error
|
||
end.
|
||
|
||
-spec same_hole(Hole :: #hole{}, Hole :: #hole{}) -> boolean().
|
||
same_hole(#hole{peer = OldPeer, nat_type = OldNatType}, #hole{peer = Peer, nat_type = NatType}) when OldPeer =:= Peer, OldNatType =:= NatType ->
|
||
true;
|
||
same_hole(_, _) ->
|
||
false.
|
||
|
||
-spec gen_session_token() -> binary().
|
||
gen_session_token() ->
|
||
Bytes = crypto:strong_rand_bytes(32),
|
||
base64:encode(Bytes).
|
||
|
||
-spec normalization_algorithm(any()) -> binary().
|
||
normalization_algorithm(<<"aes">>) ->
|
||
<<"aes">>;
|
||
normalization_algorithm(<<"chacha20">>) ->
|
||
<<"chacha20">>;
|
||
normalization_algorithm(_) ->
|
||
<<"chacha20">>.
|
||
|
||
-spec gen_key(Algorithm :: binary()) -> Key :: binary().
|
||
gen_key(<<"aes">>) ->
|
||
sdlan_util:rand_byte(32);
|
||
gen_key(<<"chacha20">>) ->
|
||
sdlan_util:rand_byte(32).
|
||
|
||
-spec gen_region_id(IpInt :: integer()) -> integer().
|
||
gen_region_id(IpInt) ->
|
||
%% 把整数 IP 转成字符串
|
||
IpStr = integer_to_list(IpInt),
|
||
%% 拼接盐
|
||
FullStr = "salt_fG7xQp2BzH9L" ++ IpStr,
|
||
time33(FullStr, 5381).
|
||
|
||
%% 核心Time33算法
|
||
-spec time33(string(), integer()) -> integer().
|
||
time33([], Hash) ->
|
||
Hash band 16#FFFFFFFF; % 32位
|
||
time33([C|Rest], Hash) ->
|
||
%% hash = hash * 33 + char
|
||
NewHash = ((Hash bsl 5) + Hash) + C,
|
||
time33(Rest, NewHash). |