diff --git a/src/bin/sdlan/main.rs b/src/bin/sdlan/main.rs index 05f6bfe..4b89827 100644 --- a/src/bin/sdlan/main.rs +++ b/src/bin/sdlan/main.rs @@ -15,6 +15,8 @@ async fn main() { CommandLine { sn: "39.98.184.67:1265".to_owned(), tcp: "39.98.184.67:18083".to_owned(), + nat_server1: "39.98.184.67:1265".to_owned(), + nat_server2: "47.98.178.3:1265".to_owned(), _allow_routing: true, register_ttl: 1, mtu: 1290, diff --git a/src/lib.rs b/src/lib.rs index f731ccc..653eb0d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,19 +4,18 @@ mod pb; mod tcp; mod utils; -use std::time::Duration; +use std::{sync::atomic::AtomicU8, time::Duration}; use std::net::SocketAddr; pub use network::get_edge; use network::{async_main, init_edge, NodeConfig}; -use tokio::sync::mpsc::{channel, Receiver}; +use tokio::sync::mpsc::channel; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; pub use utils::CommandLine; use sdlan_sn_rs::{ - log, peer::SdlanSock, utils::{create_or_load_uuid, get_sdlan_sock_from_socketaddr, Result, SDLanError}, }; @@ -51,8 +50,14 @@ async fn parse_config(uuid: String, args: &CommandLine) -> Result { if args.sn.len() == 0 { return Err(SDLanError::NormalError("no sn is specified")); } - let mut node_conf = NodeConfig::new(); - node_conf.allow_p2p = args.allow_p2p; + // node_conf.allow_p2p = args.allow_p2p; + + let Ok(nat1) = args.nat_server1.parse::() else { + return Err(SDLanError::NormalError("failed to parse nat server1")); + }; + let Ok(nat2) = args.nat_server2.parse::() else { + return Err(SDLanError::NormalError("failed to parse nat server2")); + }; let sns: Vec<&str> = args.sn.split(",").collect(); let mut correct_sns: Vec<_>; @@ -72,18 +77,32 @@ async fn parse_config(uuid: String, args: &CommandLine) -> Result { } } - node_conf.super_nodes = sockaddr; + // node_conf.super_nodes = sockaddr; break; } - node_conf.name = args.name.to_owned(); - node_conf.mtu = args.mtu; - node_conf.node_uuid = uuid; - + let mut register_ttl = 1; if args.register_ttl > 1 { - node_conf.register_ttl = args.register_ttl; + register_ttl = args.register_ttl; } + let node_conf = NodeConfig { + name: args.name.to_owned(), + allow_routing: true, + _drop_multicast: false, + allow_p2p: args.allow_p2p, + mtu: args.mtu, + _tos: 0, + _register_super_interval: config::REGISTER_SUPER_INTERVAL, + register_ttl, + _local_port: 0, + node_uuid: uuid, + super_nodes: sockaddr, + super_node_index: AtomicU8::new(0), + nat_server1: nat1, + nat_server2: nat2, + }; + Ok(node_conf) } diff --git a/src/network/async_main.rs b/src/network/async_main.rs index f228d88..5a91b0a 100644 --- a/src/network/async_main.rs +++ b/src/network/async_main.rs @@ -66,6 +66,10 @@ async fn handle_tcp_message(msg: SdlanTcp) { edge.set_authorized(true, aes); send_stun_request(edge).await; + tokio::spawn(async { + let nattype = edge.probe_nat_type().await; + println!("nat type is: {:?}", nattype); + }); } PacketType::RegisterSuperNAK => { let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else { diff --git a/src/network/node.rs b/src/network/node.rs index b68b4fb..948eab4 100644 --- a/src/network/node.rs +++ b/src/network/node.rs @@ -1,14 +1,20 @@ use dashmap::DashMap; use rsa::RsaPrivateKey; use sdlan_sn_rs::config::{AF_INET, AF_INET6}; +use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering}; use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; use tokio::io::AsyncReadExt; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::oneshot; +use tracing::{debug, error}; -use crate::pb::{encode_to_tcp_message, SdlEmpty}; -use crate::tcp::{get_tcp_conn, PacketType}; -use crate::utils::{PidRecorder, Socket}; +use crate::pb::{ + encode_to_tcp_message, encode_to_udp_message, SdlEmpty, SdlStunProbe, SdlStunProbeReply, +}; +use crate::tcp::{get_tcp_conn, NatType, PacketType, StunProbeAttr}; +use crate::utils::{get_socketaddr_from_sock, send_to_sock, PidRecorder, Socket}; use sdlan_sn_rs::peer::{is_sdlan_sock_equal, IpSubnet, V6Info}; @@ -18,7 +24,7 @@ use super::device::{DeviceConfig, Mode}; use super::tun::{new_iface, Iface}; use tokio::fs::File; -use sdlan_sn_rs::utils::{create_or_load_uuid, gen_rsa_keys, load_private_key_file}; +use sdlan_sn_rs::utils::{create_or_load_uuid, gen_rsa_keys, ip_to_string, load_private_key_file}; use sdlan_sn_rs::utils::{Result, SDLanError}; static EDGE: OnceCell = OnceCell::new(); @@ -124,6 +130,11 @@ pub struct Node { // last register super time, in unix pub _last_register_req: AtomicU64, + + nat_type: Mutex, + + nat_cookie: AtomicU32, + cookie_match: DashMap>, } unsafe impl Sync for Node {} @@ -158,6 +169,8 @@ impl Node { tcp_pong, + nat_type: Mutex::new(NatType::Blocked), + device_config: DeviceConfig::new(), device: new_iface("dev", Mode::Tun), @@ -184,6 +197,9 @@ impl Node { _local_v6: RwLock::new(None), stats: NodeStats::new(), _last_register_req: AtomicU64::new(0), + + nat_cookie: AtomicU32::new(1), + cookie_match: DashMap::new(), } } @@ -260,6 +276,61 @@ impl Node { } */ + pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) { + if let Some((_key, chan)) = self.cookie_match.remove(&cookie) { + chan.send(buf); + } + error!("failed to get such cookie stun probe"); + } + + pub async fn probe_nat_type(&self) -> NatType { + /* + if !self.is_authorized() { + return None; + } + */ + + let Ok(reply1) = self + .send_and_wait_for_probe_reply(StunProbeAttr::None, &self.config.nat_server1) + .await + else { + *self.nat_type.lock().unwrap() = NatType::Blocked; + return NatType::Blocked; + }; + + let Ok(reply2) = self + .send_and_wait_for_probe_reply(StunProbeAttr::None, &self.config.nat_server2) + .await + else { + *self.nat_type.lock().unwrap() = NatType::Blocked; + return NatType::Blocked; + }; + + if reply1.ip != reply2.ip || reply1.port != reply2.port { + *self.nat_type.lock().unwrap() = NatType::Symmetric; + return NatType::Symmetric; + } + + if let Ok(_reply3) = self + .send_and_wait_for_probe_reply(StunProbeAttr::Peer, &self.config.nat_server1) + .await + { + *self.nat_type.lock().unwrap() = NatType::FullCone; + return NatType::FullCone; + } + + if let Ok(_reply4) = self + .send_and_wait_for_probe_reply(StunProbeAttr::Port, &self.config.nat_server1) + .await + { + *self.nat_type.lock().unwrap() = NatType::ConeRestricted; + return NatType::ConeRestricted; + } else { + *self.nat_type.lock().unwrap() = NatType::PortRestricted; + return NatType::PortRestricted; + } + } + pub async fn send_unregister_super(&self) -> Result<()> { let content = encode_to_tcp_message::(None, 0, PacketType::UnRegisterSuper as u8).unwrap(); @@ -270,43 +341,43 @@ impl Node { Ok(()) } - /* - pub async fn send_register_super(&self) -> Result<()> { - let packet_id = self.packet_id.fetch_add(1, Ordering::Relaxed); - let cmn = Common { - packet_id, - version: SDLAN_VERSION, - id: &self.config.node_uuid, - token: self.token, - ttl: SDLAN_DEFAULT_TTL, - pc: PacketType::PKTRegisterSuper, - flags: 0, + async fn send_and_wait_for_probe_reply( + &self, + msgattr: StunProbeAttr, + to_server: &SocketAddr, + ) -> Result { + let cookie = self.nat_cookie.fetch_add(1, Ordering::Relaxed); + let probe = SdlStunProbe { + attr: msgattr as u32, + cookie, }; - let rs = RegisterSuper { - pass: "encrypt!", - cookie: 0, - sock: None, - v6_info: None, - dev_addr: IpSubnetNonAtomic::new( - self.device_config.get_ip(), - self.device_config.get_net_bit(), - ), - pub_key: self.rsa_pubkey.clone(), - }; - let content = packet::encode_packet(&cmn, &rs)?; - // self.udp_sock_v4.send_to(&content, self.config.super_nodes) - send_to_sock( - &self, - &content, - &self.config.super_nodes[self.config.super_node_index.load(Ordering::Relaxed) as usize], - ) - .await?; - // write_to_addr(&sock, "127.0.0.1:7655", &content).await?; - // println!("sent!"); - Ok(()) + let (tx, rx) = oneshot::channel(); + self.cookie_match.insert(cookie, tx); + // let cookie = msg.cookie; + let msg = encode_to_udp_message(Some(probe), PacketType::StunProbe as u8).unwrap(); + if let Err(e) = self.udp_sock_v4.send_to(&msg, to_server).await { + self.cookie_match.remove(&cookie); + return Err(SDLanError::NormalError("send error")); + } + + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(3)) => { + self.cookie_match.remove(&cookie); + return Err(SDLanError::NormalError("timed out")); + } + reply = rx => { + self.cookie_match.remove(&cookie); + if let Ok(reply) = reply { + // reply received, + return Ok(reply); + // println!("got nat ip: {}:{}", ip_to_string(&reply.ip), reply.port); + } + return Err(SDLanError::NormalError("reply recv error")); + // step 1 received + } + } } - */ } pub struct PeerMap { @@ -418,30 +489,9 @@ pub struct NodeConfig { // pub super_attempts: AtomicU8, pub super_nodes: Vec, pub super_node_index: AtomicU8, -} -impl NodeConfig { - pub fn new() -> Self { - Self { - name: String::new(), - allow_routing: true, - allow_p2p: true, - _drop_multicast: false, - _tos: 0, - _register_super_interval: config::REGISTER_SUPER_INTERVAL, - register_ttl: 1, - // any port, - _local_port: 0, - - node_uuid: String::new(), - - super_nodes: Vec::new(), - // super_attempts: AtomicU8::new(config::SUPER_ATTEMPTS_DEFAULT), - super_node_index: AtomicU8::new(0), - - mtu: 1290, - } - } + pub nat_server1: SocketAddr, + pub nat_server2: SocketAddr, } #[derive(Debug)] diff --git a/src/network/packet.rs b/src/network/packet.rs index ae10baf..4f46f6b 100644 --- a/src/network/packet.rs +++ b/src/network/packet.rs @@ -4,7 +4,7 @@ use crate::{ config::REGISTER_INTERVAL, pb::{ encode_to_tcp_message, encode_to_udp_message, SdlData, SdlEmpty, SdlPeerInfo, SdlQueryInfo, - SdlRegister, SdlRegisterAck, + SdlRegister, SdlRegisterAck, SdlStunProbeReply, }, tcp::{get_tcp_conn, PacketType}, utils::{send_to_sock, Socket}, @@ -80,25 +80,6 @@ pub async fn handle_packet(eee: &Node, addr: SocketAddr, buf: &[u8]) -> Result<( return Err(SDLanError::NormalError("invalid packet type")); }; match pkt_type { - PacketType::StunReply => { - // stun reply, like pong - } - PacketType::Register => { - if !eee.is_authorized() { - error!("dropping REGISTER received before authorized"); - return Ok(()); - } - let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap(); - let _ = handle_packet_register(eee, &buf[1..], false, &from_sock).await; - } - PacketType::RegisterACK => { - if !eee.is_authorized() { - error!("dropping REGISTER received before authorized"); - return Ok(()); - } - let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap(); - let _ = handle_packet_register_ack(eee, &buf[1..], &from_sock).await; - } PacketType::Data => { let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap(); debug!("[PPP]Rx data from {}", from_sock.to_string()); @@ -127,6 +108,33 @@ pub async fn handle_packet(eee: &Node, addr: SocketAddr, buf: &[u8]) -> Result<( } handle_tun_packet(eee, !data.is_p2p, data).await; } + PacketType::StunProbeReply => { + println!("got stunprobeReply"); + let Ok(reply) = SdlStunProbeReply::decode(&buf[1..]) else { + error!("failed to decode SdlStunReply"); + return Ok(()); + }; + eee.send_nat_probe_reply(reply.cookie, reply).await; + } + PacketType::StunReply => { + // stun reply, like pong + } + PacketType::Register => { + if !eee.is_authorized() { + error!("dropping REGISTER received before authorized"); + return Ok(()); + } + let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap(); + let _ = handle_packet_register(eee, &buf[1..], false, &from_sock).await; + } + PacketType::RegisterACK => { + if !eee.is_authorized() { + error!("dropping REGISTER received before authorized"); + return Ok(()); + } + let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap(); + let _ = handle_packet_register_ack(eee, &buf[1..], &from_sock).await; + } other => { error!("udp not processing {:?}", other); } diff --git a/src/tcp/tcp_codec.rs b/src/tcp/tcp_codec.rs index 097a739..c3530ef 100644 --- a/src/tcp/tcp_codec.rs +++ b/src/tcp/tcp_codec.rs @@ -27,6 +27,25 @@ pub enum EventType { NetworkShutdown = 0xFF, } +#[derive(Debug, Copy, Clone, TryFromPrimitive)] +#[repr(u8)] +pub enum StunProbeAttr { + None = 0, + Port = 1, + Peer = 2, +} + +#[derive(Debug, Copy, Clone, TryFromPrimitive)] +#[repr(u8)] +pub enum NatType { + Blocked = 0, + NoNat = 1, + FullCone = 2, + PortRestricted = 3, + ConeRestricted = 4, + Symmetric = 5, +} + #[derive(Debug, Copy, Clone, TryFromPrimitive)] #[repr(u8)] pub enum PacketType { diff --git a/src/utils/command.rs b/src/utils/command.rs index 02ed1c8..d130b72 100644 --- a/src/utils/command.rs +++ b/src/utils/command.rs @@ -8,6 +8,14 @@ pub struct CommandLine { #[structopt(short = "t", long = "tcp", default_value = "127.0.0.1:7656")] pub tcp: String, + /// in the format of "localhost:1234" + #[structopt(long = "nat1")] + pub nat_server1: String, + + /// in the format of "localhost:1234" + #[structopt(long = "nat2")] + pub nat_server2: String, + #[structopt(short = "r")] pub _allow_routing: bool, @@ -46,6 +54,8 @@ impl Clone for CommandLine { tos: self.tos, token: self.token.clone(), allow_p2p: self.allow_p2p, + nat_server1: self.nat_server1.clone(), + nat_server2: self.nat_server2.clone(), } } } diff --git a/src/utils/socks.rs b/src/utils/socks.rs index 3f7f67d..702a4e0 100644 --- a/src/utils/socks.rs +++ b/src/utils/socks.rs @@ -2,7 +2,7 @@ use sdlan_sn_rs::{ config::{AF_INET, AF_INET6}, utils::{Result, SDLanError}, }; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}; use tokio::net::ToSocketAddrs; use tracing::error; @@ -87,6 +87,11 @@ pub async fn send_to_sock_v4_and_v6( } */ +pub fn get_socketaddr_from_sock(s: &SdlanSock) -> SocketAddr { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(s.v4)), s.port); + addr +} + pub async fn send_to_sock( // sk: &Socket, eee: &Node,