added nat type detect

This commit is contained in:
asxalex 2024-07-08 18:40:38 +08:00
parent cf08d2d53c
commit b3b9fcd243
8 changed files with 211 additions and 94 deletions

View File

@ -15,6 +15,8 @@ async fn main() {
CommandLine {
sn: "39.98.184.67:1265".to_owned(),
tcp: "39.98.184.67:18083".to_owned(),
nat_server1: "39.98.184.67:1265".to_owned(),
nat_server2: "47.98.178.3:1265".to_owned(),
_allow_routing: true,
register_ttl: 1,
mtu: 1290,

View File

@ -4,19 +4,18 @@ mod pb;
mod tcp;
mod utils;
use std::time::Duration;
use std::{sync::atomic::AtomicU8, time::Duration};
use std::net::SocketAddr;
pub use network::get_edge;
use network::{async_main, init_edge, NodeConfig};
use tokio::sync::mpsc::{channel, Receiver};
use tokio::sync::mpsc::channel;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
pub use utils::CommandLine;
use sdlan_sn_rs::{
log,
peer::SdlanSock,
utils::{create_or_load_uuid, get_sdlan_sock_from_socketaddr, Result, SDLanError},
};
@ -51,8 +50,14 @@ async fn parse_config(uuid: String, args: &CommandLine) -> Result<NodeConfig> {
if args.sn.len() == 0 {
return Err(SDLanError::NormalError("no sn is specified"));
}
let mut node_conf = NodeConfig::new();
node_conf.allow_p2p = args.allow_p2p;
// node_conf.allow_p2p = args.allow_p2p;
let Ok(nat1) = args.nat_server1.parse::<SocketAddr>() else {
return Err(SDLanError::NormalError("failed to parse nat server1"));
};
let Ok(nat2) = args.nat_server2.parse::<SocketAddr>() else {
return Err(SDLanError::NormalError("failed to parse nat server2"));
};
let sns: Vec<&str> = args.sn.split(",").collect();
let mut correct_sns: Vec<_>;
@ -72,18 +77,32 @@ async fn parse_config(uuid: String, args: &CommandLine) -> Result<NodeConfig> {
}
}
node_conf.super_nodes = sockaddr;
// node_conf.super_nodes = sockaddr;
break;
}
node_conf.name = args.name.to_owned();
node_conf.mtu = args.mtu;
node_conf.node_uuid = uuid;
let mut register_ttl = 1;
if args.register_ttl > 1 {
node_conf.register_ttl = args.register_ttl;
register_ttl = args.register_ttl;
}
let node_conf = NodeConfig {
name: args.name.to_owned(),
allow_routing: true,
_drop_multicast: false,
allow_p2p: args.allow_p2p,
mtu: args.mtu,
_tos: 0,
_register_super_interval: config::REGISTER_SUPER_INTERVAL,
register_ttl,
_local_port: 0,
node_uuid: uuid,
super_nodes: sockaddr,
super_node_index: AtomicU8::new(0),
nat_server1: nat1,
nat_server2: nat2,
};
Ok(node_conf)
}

View File

@ -66,6 +66,10 @@ async fn handle_tcp_message(msg: SdlanTcp) {
edge.set_authorized(true, aes);
send_stun_request(edge).await;
tokio::spawn(async {
let nattype = edge.probe_nat_type().await;
println!("nat type is: {:?}", nattype);
});
}
PacketType::RegisterSuperNAK => {
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {

View File

@ -1,14 +1,20 @@
use dashmap::DashMap;
use rsa::RsaPrivateKey;
use sdlan_sn_rs::config::{AF_INET, AF_INET6};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;
use tracing::{debug, error};
use crate::pb::{encode_to_tcp_message, SdlEmpty};
use crate::tcp::{get_tcp_conn, PacketType};
use crate::utils::{PidRecorder, Socket};
use crate::pb::{
encode_to_tcp_message, encode_to_udp_message, SdlEmpty, SdlStunProbe, SdlStunProbeReply,
};
use crate::tcp::{get_tcp_conn, NatType, PacketType, StunProbeAttr};
use crate::utils::{get_socketaddr_from_sock, send_to_sock, PidRecorder, Socket};
use sdlan_sn_rs::peer::{is_sdlan_sock_equal, IpSubnet, V6Info};
@ -18,7 +24,7 @@ use super::device::{DeviceConfig, Mode};
use super::tun::{new_iface, Iface};
use tokio::fs::File;
use sdlan_sn_rs::utils::{create_or_load_uuid, gen_rsa_keys, load_private_key_file};
use sdlan_sn_rs::utils::{create_or_load_uuid, gen_rsa_keys, ip_to_string, load_private_key_file};
use sdlan_sn_rs::utils::{Result, SDLanError};
static EDGE: OnceCell<Node> = OnceCell::new();
@ -124,6 +130,11 @@ pub struct Node {
// last register super time, in unix
pub _last_register_req: AtomicU64,
nat_type: Mutex<NatType>,
nat_cookie: AtomicU32,
cookie_match: DashMap<u32, oneshot::Sender<SdlStunProbeReply>>,
}
unsafe impl Sync for Node {}
@ -158,6 +169,8 @@ impl Node {
tcp_pong,
nat_type: Mutex::new(NatType::Blocked),
device_config: DeviceConfig::new(),
device: new_iface("dev", Mode::Tun),
@ -184,6 +197,9 @@ impl Node {
_local_v6: RwLock::new(None),
stats: NodeStats::new(),
_last_register_req: AtomicU64::new(0),
nat_cookie: AtomicU32::new(1),
cookie_match: DashMap::new(),
}
}
@ -260,6 +276,61 @@ impl Node {
}
*/
pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) {
if let Some((_key, chan)) = self.cookie_match.remove(&cookie) {
chan.send(buf);
}
error!("failed to get such cookie stun probe");
}
pub async fn probe_nat_type(&self) -> NatType {
/*
if !self.is_authorized() {
return None;
}
*/
let Ok(reply1) = self
.send_and_wait_for_probe_reply(StunProbeAttr::None, &self.config.nat_server1)
.await
else {
*self.nat_type.lock().unwrap() = NatType::Blocked;
return NatType::Blocked;
};
let Ok(reply2) = self
.send_and_wait_for_probe_reply(StunProbeAttr::None, &self.config.nat_server2)
.await
else {
*self.nat_type.lock().unwrap() = NatType::Blocked;
return NatType::Blocked;
};
if reply1.ip != reply2.ip || reply1.port != reply2.port {
*self.nat_type.lock().unwrap() = NatType::Symmetric;
return NatType::Symmetric;
}
if let Ok(_reply3) = self
.send_and_wait_for_probe_reply(StunProbeAttr::Peer, &self.config.nat_server1)
.await
{
*self.nat_type.lock().unwrap() = NatType::FullCone;
return NatType::FullCone;
}
if let Ok(_reply4) = self
.send_and_wait_for_probe_reply(StunProbeAttr::Port, &self.config.nat_server1)
.await
{
*self.nat_type.lock().unwrap() = NatType::ConeRestricted;
return NatType::ConeRestricted;
} else {
*self.nat_type.lock().unwrap() = NatType::PortRestricted;
return NatType::PortRestricted;
}
}
pub async fn send_unregister_super(&self) -> Result<()> {
let content =
encode_to_tcp_message::<SdlEmpty>(None, 0, PacketType::UnRegisterSuper as u8).unwrap();
@ -270,43 +341,43 @@ impl Node {
Ok(())
}
/*
pub async fn send_register_super(&self) -> Result<()> {
let packet_id = self.packet_id.fetch_add(1, Ordering::Relaxed);
let cmn = Common {
packet_id,
version: SDLAN_VERSION,
id: &self.config.node_uuid,
token: self.token,
ttl: SDLAN_DEFAULT_TTL,
pc: PacketType::PKTRegisterSuper,
flags: 0,
};
let rs = RegisterSuper {
pass: "encrypt!",
cookie: 0,
sock: None,
v6_info: None,
dev_addr: IpSubnetNonAtomic::new(
self.device_config.get_ip(),
self.device_config.get_net_bit(),
),
pub_key: self.rsa_pubkey.clone(),
};
let content = packet::encode_packet(&cmn, &rs)?;
// self.udp_sock_v4.send_to(&content, self.config.super_nodes)
send_to_sock(
async fn send_and_wait_for_probe_reply(
&self,
&content,
&self.config.super_nodes[self.config.super_node_index.load(Ordering::Relaxed) as usize],
)
.await?;
// write_to_addr(&sock, "127.0.0.1:7655", &content).await?;
// println!("sent!");
Ok(())
msgattr: StunProbeAttr,
to_server: &SocketAddr,
) -> Result<SdlStunProbeReply> {
let cookie = self.nat_cookie.fetch_add(1, Ordering::Relaxed);
let probe = SdlStunProbe {
attr: msgattr as u32,
cookie,
};
let (tx, rx) = oneshot::channel();
self.cookie_match.insert(cookie, tx);
// let cookie = msg.cookie;
let msg = encode_to_udp_message(Some(probe), PacketType::StunProbe as u8).unwrap();
if let Err(e) = self.udp_sock_v4.send_to(&msg, to_server).await {
self.cookie_match.remove(&cookie);
return Err(SDLanError::NormalError("send error"));
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(3)) => {
self.cookie_match.remove(&cookie);
return Err(SDLanError::NormalError("timed out"));
}
reply = rx => {
self.cookie_match.remove(&cookie);
if let Ok(reply) = reply {
// reply received,
return Ok(reply);
// println!("got nat ip: {}:{}", ip_to_string(&reply.ip), reply.port);
}
return Err(SDLanError::NormalError("reply recv error"));
// step 1 received
}
}
}
*/
}
pub struct PeerMap {
@ -418,30 +489,9 @@ pub struct NodeConfig {
// pub super_attempts: AtomicU8,
pub super_nodes: Vec<SdlanSock>,
pub super_node_index: AtomicU8,
}
impl NodeConfig {
pub fn new() -> Self {
Self {
name: String::new(),
allow_routing: true,
allow_p2p: true,
_drop_multicast: false,
_tos: 0,
_register_super_interval: config::REGISTER_SUPER_INTERVAL,
register_ttl: 1,
// any port,
_local_port: 0,
node_uuid: String::new(),
super_nodes: Vec::new(),
// super_attempts: AtomicU8::new(config::SUPER_ATTEMPTS_DEFAULT),
super_node_index: AtomicU8::new(0),
mtu: 1290,
}
}
pub nat_server1: SocketAddr,
pub nat_server2: SocketAddr,
}
#[derive(Debug)]

View File

@ -4,7 +4,7 @@ use crate::{
config::REGISTER_INTERVAL,
pb::{
encode_to_tcp_message, encode_to_udp_message, SdlData, SdlEmpty, SdlPeerInfo, SdlQueryInfo,
SdlRegister, SdlRegisterAck,
SdlRegister, SdlRegisterAck, SdlStunProbeReply,
},
tcp::{get_tcp_conn, PacketType},
utils::{send_to_sock, Socket},
@ -80,25 +80,6 @@ pub async fn handle_packet(eee: &Node, addr: SocketAddr, buf: &[u8]) -> Result<(
return Err(SDLanError::NormalError("invalid packet type"));
};
match pkt_type {
PacketType::StunReply => {
// stun reply, like pong
}
PacketType::Register => {
if !eee.is_authorized() {
error!("dropping REGISTER received before authorized");
return Ok(());
}
let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap();
let _ = handle_packet_register(eee, &buf[1..], false, &from_sock).await;
}
PacketType::RegisterACK => {
if !eee.is_authorized() {
error!("dropping REGISTER received before authorized");
return Ok(());
}
let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap();
let _ = handle_packet_register_ack(eee, &buf[1..], &from_sock).await;
}
PacketType::Data => {
let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap();
debug!("[PPP]Rx data from {}", from_sock.to_string());
@ -127,6 +108,33 @@ pub async fn handle_packet(eee: &Node, addr: SocketAddr, buf: &[u8]) -> Result<(
}
handle_tun_packet(eee, !data.is_p2p, data).await;
}
PacketType::StunProbeReply => {
println!("got stunprobeReply");
let Ok(reply) = SdlStunProbeReply::decode(&buf[1..]) else {
error!("failed to decode SdlStunReply");
return Ok(());
};
eee.send_nat_probe_reply(reply.cookie, reply).await;
}
PacketType::StunReply => {
// stun reply, like pong
}
PacketType::Register => {
if !eee.is_authorized() {
error!("dropping REGISTER received before authorized");
return Ok(());
}
let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap();
let _ = handle_packet_register(eee, &buf[1..], false, &from_sock).await;
}
PacketType::RegisterACK => {
if !eee.is_authorized() {
error!("dropping REGISTER received before authorized");
return Ok(());
}
let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap();
let _ = handle_packet_register_ack(eee, &buf[1..], &from_sock).await;
}
other => {
error!("udp not processing {:?}", other);
}

View File

@ -27,6 +27,25 @@ pub enum EventType {
NetworkShutdown = 0xFF,
}
#[derive(Debug, Copy, Clone, TryFromPrimitive)]
#[repr(u8)]
pub enum StunProbeAttr {
None = 0,
Port = 1,
Peer = 2,
}
#[derive(Debug, Copy, Clone, TryFromPrimitive)]
#[repr(u8)]
pub enum NatType {
Blocked = 0,
NoNat = 1,
FullCone = 2,
PortRestricted = 3,
ConeRestricted = 4,
Symmetric = 5,
}
#[derive(Debug, Copy, Clone, TryFromPrimitive)]
#[repr(u8)]
pub enum PacketType {

View File

@ -8,6 +8,14 @@ pub struct CommandLine {
#[structopt(short = "t", long = "tcp", default_value = "127.0.0.1:7656")]
pub tcp: String,
/// in the format of "localhost:1234"
#[structopt(long = "nat1")]
pub nat_server1: String,
/// in the format of "localhost:1234"
#[structopt(long = "nat2")]
pub nat_server2: String,
#[structopt(short = "r")]
pub _allow_routing: bool,
@ -46,6 +54,8 @@ impl Clone for CommandLine {
tos: self.tos,
token: self.token.clone(),
allow_p2p: self.allow_p2p,
nat_server1: self.nat_server1.clone(),
nat_server2: self.nat_server2.clone(),
}
}
}

View File

@ -2,7 +2,7 @@ use sdlan_sn_rs::{
config::{AF_INET, AF_INET6},
utils::{Result, SDLanError},
};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4};
use tokio::net::ToSocketAddrs;
use tracing::error;
@ -87,6 +87,11 @@ pub async fn send_to_sock_v4_and_v6(
}
*/
pub fn get_socketaddr_from_sock(s: &SdlanSock) -> SocketAddr {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(s.v4)), s.port);
addr
}
pub async fn send_to_sock(
// sk: &Socket,
eee: &Node,