use dashmap::DashMap; use rsa::RsaPrivateKey; use sdlan_sn_rs::config::{AF_INET, AF_INET6}; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use tokio::io::AsyncReadExt; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; use tracing::{debug, error}; use crate::pb::{ encode_to_tcp_message, encode_to_udp_message, SdlEmpty, SdlStunProbe, SdlStunProbeReply, }; use crate::tcp::{get_tcp_conn, NatType, PacketType, StunProbeAttr}; use crate::utils::{PidRecorder, Socket}; use sdlan_sn_rs::peer::{IpSubnet, V6Info}; use once_cell::sync::OnceCell; use super::device::{DeviceConfig, Mode}; use super::tun::{new_iface, Iface}; use tokio::fs::File; use sdlan_sn_rs::utils::{gen_rsa_keys, load_private_key_file, Mac}; use sdlan_sn_rs::utils::{Result, SDLanError}; static EDGE: OnceCell = OnceCell::new(); pub async fn init_edge( token: &str, network_code: &str, node_conf: NodeConfig, tos: u32, start_stop: Sender, mtu: u32, ) -> Result<()> { let _ = PidRecorder::new(".pid"); // gen public key gen_rsa_keys(".client"); let mut pubkey = String::new(); File::open(".client/id_rsa.pub") .await? .read_to_string(&mut pubkey) .await?; let privatekey = load_private_key_file(".client/id_rsa")?; // init sock // let edge_uuid = create_or_load_uuid("")?; //let node_conf = parse_config(edge_uuid, &args).await?; let Ok(sock_v4) = Socket::build(node_conf._local_port, true, false, tos).await else { panic!("failed to build sock for sock v4"); }; let mut sock_multicast = None; if !node_conf._drop_multicast { sock_multicast = Some(Socket::build(MULTICAST_PORT, true, true, 0).await?); } // let sock_multicast = Socket::build(MULTICAST_PORT, true, true, 0).await?; // allow multicast // TODO: set the sn's tcp socket // let tcpsock = TCPSocket::build("121.4.79.234:1234").await?; let tcp_pong = Arc::new(AtomicU64::new(0)); let edge = Node::new( pubkey, node_conf, sock_v4, sock_multicast, token, network_code, privatekey, tcp_pong.clone(), start_stop, mtu, ); do_init_edge(edge)?; Ok(()) } fn do_init_edge(edge: Node) -> Result<()> { if let Err(_) = EDGE.set(edge) { return Err(SDLanError::NormalError("initialize sn error")); } Ok(()) } pub fn get_edge() -> &'static Node { let edge = EDGE.get(); if edge.is_none() { panic!("should init_edge first"); } edge.unwrap() } pub struct RegisterSuperFeedback { // 0 for success, other for error pub result: u8, pub message: String, pub should_exit: bool, } pub struct StartStopInfo { pub is_start: bool, pub pkt_id: Option, } pub struct Node { packet_id: AtomicU32, pub network_id: AtomicU32, pub tcp_pong: Arc, start_stop_sender: Sender, // user token info pub _token: Mutex, pub network_code: Mutex, pub device_config: DeviceConfig, pub device: Iface, // authorize related pub authorized: AtomicBool, // pub header_key: RwLock>>, pub encrypt_key: RwLock>>, pub rsa_pubkey: String, pub rsa_private: RsaPrivateKey, pub config: NodeConfig, // pub super_node: Vec, // pub super_attempts: AtomicU8, // store pending, and known peers pub pending_peers: PeerMap, pub known_peers: PeerMap, // pub tcp_sock_v4: TCPSocket, pub udp_sock_multicast: Option, pub udp_sock_v4: Socket, pub outer_ip_v4: AtomicU32, pub udp_sock_v6: RwLock>>, pub ipv6: RwLock>, pub multicast_sock: SdlanSock, pub _local_v6: RwLock>, pub stats: NodeStats, // last register super time, in unix pub _last_register_req: AtomicU64, nat_type: Mutex, nat_cookie: AtomicU32, cookie_match: DashMap>, packet_id_match: DashMap>, } unsafe impl Sync for Node {} impl Node { pub fn send_register_super_feedback(&self, pktid: u32, feed: RegisterSuperFeedback) { match self.packet_id_match.remove(&pktid) { Some(sender) => { let _ = sender.1.send(feed); } None => { return; } } } pub fn get_nat_type(&self) -> NatType { self.nat_type.lock().unwrap().clone() } pub async fn start_without_feedback(&self, token: String, network_code: String) -> Result<()> { *self._token.lock().unwrap() = token; *self.network_code.lock().unwrap() = network_code; let _ = self .start_stop_sender .send(StartStopInfo { is_start: true, pkt_id: None, }) .await; Ok(()) } pub async fn start_with_feedback( &self, token: String, network_code: String, timeout: Duration, ) -> Result { *self._token.lock().unwrap() = token; *self.network_code.lock().unwrap() = network_code; let (tx, rx) = oneshot::channel(); let id = self.get_next_packet_id(); self.packet_id_match.insert(id, tx); let _ = self .start_stop_sender .send(StartStopInfo { is_start: true, pkt_id: Some(id), }) .await; debug!("start with feedback"); tokio::select! { rx_info = rx => { if let Ok(result) = rx_info { self.packet_id_match.remove(&id); Ok(result) } else { Err(SDLanError::NormalError("rx closed")) } } _ = tokio::time::sleep(timeout) => { Err(SDLanError::NormalError("timed out")) } } } pub async fn stop(&self) { *self._token.lock().unwrap() = "".to_owned(); let _ = self .start_stop_sender .send(StartStopInfo { is_start: false, pkt_id: None, }) .await; } pub fn new( pubkey: String, config: NodeConfig, sock: Socket, multicast_sock: Option, // tcpsock: TCPSocket, token: &str, network_code: &str, private: RsaPrivateKey, tcp_pong: Arc, start_stop: Sender, mtu: u32, ) -> Self { let mode = if cfg!(not(feature = "tun")) { Mode::Tap } else { Mode::Tun }; Self { packet_id: AtomicU32::new(1), network_id: AtomicU32::new(0), _token: Mutex::new(token.to_owned()), network_code: Mutex::new(network_code.to_owned()), start_stop_sender: start_stop, tcp_pong, nat_type: Mutex::new(NatType::Blocked), device_config: DeviceConfig::new(mtu), device: new_iface("dev", mode), authorized: AtomicBool::new(false), encrypt_key: RwLock::new(Arc::new(Vec::new())), // rsa_pubkey: rsa_pubkey: pubkey, rsa_private: private, config, // super_node: Vec::new(), // super_attempts: AtomicU8::new(0), pending_peers: PeerMap::new(), known_peers: PeerMap::new(), // tcp_sock_v4: tcpsock, udp_sock_multicast: multicast_sock, udp_sock_v4: sock, outer_ip_v4: AtomicU32::new(0), udp_sock_v6: RwLock::new(None), ipv6: RwLock::new(None), multicast_sock: SdlanSock { family: AF_INET, port: config::MULTICAST_PORT, v4: config::MULITCAST_V4, v6: [0; 16], }, _local_v6: RwLock::new(None), stats: NodeStats::new(), _last_register_req: AtomicU64::new(0), packet_id_match: DashMap::new(), nat_cookie: AtomicU32::new(1), cookie_match: DashMap::new(), } } pub fn get_next_packet_id(&self) -> u32 { self.packet_id.fetch_add(1, Ordering::Relaxed) } pub fn is_authorized(&self) -> bool { // self.header_key self.authorized.load(Ordering::Relaxed) } pub fn set_authorized(&self, authorized: bool, encrypt_key: Vec) { self.authorized.store(authorized, Ordering::Relaxed); *(self.encrypt_key.write().unwrap()) = Arc::new(encrypt_key); } /* pub fn get_header_key(&self) -> Arc> { // self.header_key.read().unwrap().clone() } */ pub fn get_encrypt_key(&self) -> Arc> { self.encrypt_key.read().unwrap().clone() } /* pub fn sn_is_known(&self, sock: &SdlanSock) -> bool { for sn in self.config.super_nodes.iter() { if sn.family != sock.family || sn.port != sock.port { continue; } if sn.family == AF_INET && sn.v4 == sock.v4 { return true; } if sn.family == AF_INET6 && sn.v6 == sock.v6 { return true; } } return false; } */ pub fn _remove_v6(&self) { *(self.udp_sock_v6.write().unwrap()) = None; } /* pub async fn send_to_v4(&self, info: &[u8], target: A) -> Result { match self.udp_sock_v4.send_to(info, target).await { Ok(n) => Ok(n), Err(e) => { println!("failed to send"); Err(SDLanError::NormalError("failed to send")) } } } */ /* pub async fn send_to_v6(&self, info: &[u8], target: A) -> Result { let m = self.udp_sock_v6.read().unwrap().clone(); if let Some(ref l) = m.as_ref() { match l.send_to(info, target).await { Err(e) => { return Err(SDLanError::NormalError("send error")); } Ok(n) => return Ok(n), } } Err(SDLanError::NormalError("no udp6 conn is bined")) } */ pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) { if let Some((_key, chan)) = self.cookie_match.remove(&cookie) { let _ = chan.send(buf); return; } error!("failed to get such cookie stun probe"); } pub async fn probe_nat_type(&self) -> NatType { /* if !self.is_authorized() { return None; } */ let Ok(reply1) = self .send_and_wait_for_probe_reply(StunProbeAttr::None, &self.config.nat_server1) .await else { *self.nat_type.lock().unwrap() = NatType::Blocked; return NatType::Blocked; }; if reply1.ip == self.outer_ip_v4.load(Ordering::Relaxed) { let Ok(_reply2) = self .send_and_wait_for_probe_reply(StunProbeAttr::Peer, &self.config.nat_server1) .await else { *self.nat_type.lock().unwrap() = NatType::Symmetric; return NatType::Symmetric; }; *self.nat_type.lock().unwrap() = NatType::NoNat; return NatType::NoNat; } if let Ok(_reply2_2) = self .send_and_wait_for_probe_reply(StunProbeAttr::Peer, &self.config.nat_server1) .await { *self.nat_type.lock().unwrap() = NatType::FullCone; return NatType::FullCone; } let Ok(reply3) = self .send_and_wait_for_probe_reply(StunProbeAttr::None, &self.config.nat_server2) .await else { *self.nat_type.lock().unwrap() = NatType::Blocked; return NatType::Blocked; }; if reply3.ip != reply1.ip || reply3.port != reply1.port { *self.nat_type.lock().unwrap() = NatType::Symmetric; return NatType::Symmetric; } if let Ok(_reply4) = self .send_and_wait_for_probe_reply(StunProbeAttr::Port, &self.config.nat_server1) .await { *self.nat_type.lock().unwrap() = NatType::ConeRestricted; return NatType::ConeRestricted; } else { *self.nat_type.lock().unwrap() = NatType::PortRestricted; return NatType::PortRestricted; } } pub async fn send_unregister_super(&self) -> Result<()> { let content = encode_to_tcp_message::(None, 0, PacketType::UnRegisterSuper as u8).unwrap(); let conn = get_tcp_conn(); let _ = conn.send(&content).await; Ok(()) } async fn send_and_wait_for_probe_reply( &self, msgattr: StunProbeAttr, to_server: &SocketAddr, ) -> Result { let cookie = self.nat_cookie.fetch_add(1, Ordering::Relaxed); let probe = SdlStunProbe { attr: msgattr as u32, cookie, }; // println!("==> sending probe request: {:?}", probe); let (tx, rx) = oneshot::channel(); self.cookie_match.insert(cookie, tx); // let cookie = msg.cookie; let msg = encode_to_udp_message(Some(probe), PacketType::StunProbe as u8).unwrap(); if let Err(_e) = self.udp_sock_v4.send_to(&msg, to_server).await { self.cookie_match.remove(&cookie); return Err(SDLanError::NormalError("send error")); } tokio::select! { _ = tokio::time::sleep(Duration::from_secs(3)) => { self.cookie_match.remove(&cookie); return Err(SDLanError::NormalError("timed out")); } reply = rx => { self.cookie_match.remove(&cookie); if let Ok(reply) = reply { // reply received, return Ok(reply); // println!("got nat ip: {}:{}", ip_to_string(&reply.ip), reply.port); } return Err(SDLanError::NormalError("reply recv error")); // step 1 received } } } } pub struct PeerMap { pub peers: DashMap, } #[allow(unused)] impl PeerMap { pub fn new() -> PeerMap { Self { peers: DashMap::new(), } } /* pub fn get_peer(&self, mac: &Mac) -> Option> { if let Some(v) = self.peers.get(mac) { Some(v.clone()) } else { None } } */ pub fn clear(&self) { self.peers.clear(); } pub fn get_peer_by_sock(&self, sock: &SdlanSock) -> Option> { /* for s in self.peers.iter() { let m = s.sock.read().unwrap(); if is_sdlan_sock_equal(&m, sock) { return Some(s.value().clone()); } } */ None } pub fn delete_peer_with_mac(&self, mac: &Mac) -> Option<(Mac, EdgePeer)> { self.peers.remove(mac) } pub fn insert_peer(&self, mac: Mac, p: EdgePeer) { self.peers.insert(mac, p); } } pub struct NodeStats { pub tx_p2p: AtomicU64, pub rx_p2p: AtomicU64, pub tx_sup: AtomicU64, pub rx_sup: AtomicU64, pub tx_broadcast: AtomicU64, pub rx_broadcast: AtomicU64, pub last_sup: AtomicU64, pub last_p2p: AtomicU64, } impl NodeStats { pub fn new() -> Self { Self { tx_p2p: AtomicU64::new(0), rx_p2p: AtomicU64::new(0), tx_sup: AtomicU64::new(0), rx_sup: AtomicU64::new(0), tx_broadcast: AtomicU64::new(0), rx_broadcast: AtomicU64::new(0), last_p2p: AtomicU64::new(0), last_sup: AtomicU64::new(0), } } } use sdlan_sn_rs::peer::SdlanSock; use crate::config::{self, MULTICAST_PORT, REGISTER_SUPER_INTERVAL}; pub struct NodeConfig { // node name pub name: String, // 允许路由 pub allow_routing: bool, // 丢弃多播,广播消息 pub _drop_multicast: bool, // 允许p2p打洞 pub allow_p2p: bool, // mtu of the tun pub mtu: u32, // udp消息的服务类型 pub _tos: u8, // 打洞时候,时间间隔 pub _register_super_interval: u16, // 打洞时候,register消息的ttl pub register_ttl: u8, // 本地打开的udp端口 pub _local_port: u16, pub node_uuid: String, // pub super_attempts: AtomicU8, pub super_nodes: Vec, pub super_node_index: AtomicU8, pub nat_server1: SocketAddr, pub nat_server2: SocketAddr, } #[derive(Debug)] pub struct EdgePeer { // pub mac: Mac, pub dev_addr: IpSubnet, pub nat_type: NatType, // 对端对外开放的ip和端口信息 pub sock: SdlanSock, // peer's ipv6 info pub _v6_info: Option, pub timeout: u8, // 最近一次遇见 pub last_seen: AtomicU64, // 最近一次p2p消息 pub last_p2p: AtomicU64, // 最近一次合法时间 pub _last_valid_timestamp: AtomicU64, // 当时间超过重新注册的时间之后, // pub has_sent_last_p2p: AtomicBool, // 最近一次发送query pub last_sent_query: AtomicU64, } impl EdgePeer { pub fn new( // mac: Mac, net_addr: u32, net_bit_len: u8, sock: &SdlanSock, v6info: &Option, now: u64, ) -> Self { let mut v6_info = None; if let Some(v6info) = v6info { v6_info = Some(SdlanSock { family: AF_INET6, port: v6info.port, v4: [0; 4], v6: v6info.v6, }) } Self { // mac, dev_addr: IpSubnet::new(net_addr, net_bit_len), sock: sock.deepcopy(), _v6_info: v6_info, timeout: REGISTER_SUPER_INTERVAL as u8, last_p2p: AtomicU64::new(0), last_seen: AtomicU64::new(0), _last_valid_timestamp: AtomicU64::new(now), last_sent_query: AtomicU64::new(0), nat_type: NatType::Invalid, } } pub fn get_nat_type(&self) -> NatType { self.nat_type } }