2025-05-12 10:37:16 +08:00

677 lines
18 KiB
Rust
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use dashmap::DashMap;
use rsa::RsaPrivateKey;
use sdlan_sn_rs::config::{AF_INET, AF_INET6};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tracing::{debug, error};
use crate::pb::{
encode_to_tcp_message, encode_to_udp_message, SdlEmpty, SdlStunProbe, SdlStunProbeReply,
};
use crate::tcp::{get_tcp_conn, NatType, PacketType, StunProbeAttr};
use crate::utils::{PidRecorder, Socket};
use sdlan_sn_rs::peer::{IpSubnet, V6Info};
use once_cell::sync::OnceCell;
use super::device::{DeviceConfig, Mode};
use super::tun::{new_iface, Iface};
use tokio::fs::File;
use sdlan_sn_rs::utils::{gen_rsa_keys, load_private_key_file, Mac};
use sdlan_sn_rs::utils::{Result, SDLanError};
// Process-wide edge node singleton. It is written once by `do_init_edge`
// (called from `init_edge`) and read through `get_edge`.
static EDGE: OnceCell<Node> = OnceCell::new();
/// Builds the edge [`Node`] and installs it as the process-wide singleton.
///
/// In order, this function:
/// 1. creates a `PidRecorder` for `.pid`,
/// 2. calls `gen_rsa_keys(".client")`, then reads the public key from
///    `.client/id_rsa.pub` and loads the private key from `.client/id_rsa`,
/// 3. builds the IPv4 UDP socket on `node_conf._local_port` with the given `tos`,
/// 4. builds the multicast socket on `MULTICAST_PORT` unless
///    `node_conf._drop_multicast` is set,
/// 5. constructs the node and stores it via `do_init_edge`.
///
/// # Errors
///
/// Returns an error when the public key file cannot be opened or read, when the
/// private key cannot be loaded, when either socket cannot be built, or when the
/// singleton has already been initialised. A failure to build the IPv4 socket is
/// reported as an error instead of aborting the process with a panic, matching
/// the handling of the multicast socket.
pub async fn init_edge(
    token: &str,
    node_conf: NodeConfig,
    tos: u32,
    start_stop: Sender<StartStopInfo>,
    mtu: u32,
) -> Result<()> {
    // NOTE(review): `let _ =` drops the recorder right away. If `PidRecorder`
    // cleans up the pid file in `Drop`, the file will not outlive this line —
    // confirm against `PidRecorder`'s implementation.
    let _ = PidRecorder::new(".pid");

    // Generate the key pair, then load both halves from disk.
    gen_rsa_keys(".client");
    let mut pubkey = String::new();
    File::open(".client/id_rsa.pub")
        .await?
        .read_to_string(&mut pubkey)
        .await?;
    let privatekey = load_private_key_file(".client/id_rsa")?;

    // Main IPv4 UDP socket; propagate the failure to the caller.
    let sock_v4 = Socket::build(node_conf._local_port, true, false, tos).await?;

    // The multicast socket is only needed when multicast traffic is not dropped.
    let sock_multicast = if node_conf._drop_multicast {
        None
    } else {
        Some(Socket::build(MULTICAST_PORT, true, true, 0).await?)
    };

    // TODO: set the sn's tcp socket
    let edge = Node::new(
        pubkey,
        node_conf,
        sock_v4,
        sock_multicast,
        token,
        privatekey,
        Arc::new(AtomicU64::new(0)),
        start_stop,
        mtu,
    );
    do_init_edge(edge)
}
fn do_init_edge(edge: Node) -> Result<()> {
if let Err(_) = EDGE.set(edge) {
return Err(SDLanError::NormalError("initialize sn error"));
}
Ok(())
}
/// Returns the global edge node.
///
/// # Panics
///
/// Panics with `"should init_edge first"` when the node has not been
/// initialised yet.
pub fn get_edge() -> &'static Node {
    match EDGE.get() {
        Some(node) => node,
        None => panic!("should init_edge first"),
    }
}
// Feedback for a start request. It is delivered through
// `Node::send_register_super_feedback` to the caller waiting in
// `Node::start_with_feedback`.
pub struct RegisterSuperFeedback {
// 0 for success, any other value for an error
pub result: u8,
// NOTE(review): presumably a human-readable description that accompanies
// `result` — confirm with the code that builds this value.
pub message: String,
// NOTE(review): presumably tells the receiver that it should terminate —
// confirm with the consumer.
pub should_exit: bool,
}
// Request sent over the node's start/stop channel by `Node::start_*` and
// `Node::stop`.
pub struct StartStopInfo {
// `true` for a start request, `false` for a stop request
pub is_start: bool,
// `Some(id)` when the sender waits for feedback registered under that packet
// id (see `Node::start_with_feedback`); `None` otherwise
pub pkt_id: Option<u32>,
}
// State of the local edge node. A single instance is stored in the global
// `EDGE` cell and shared by reference.
pub struct Node {
// source of packet ids, see `get_next_packet_id`; starts at 1
packet_id: AtomicU32,
// initialised to 0 in `Node::new`
pub network_id: AtomicU32,
// shared counter handed in by `init_edge`
// NOTE(review): presumably a timestamp of the last TCP pong — confirm with the TCP code.
pub tcp_pong: Arc<AtomicU64>,
// channel used by `start_*` / `stop` to submit `StartStopInfo` requests
start_stop_sender: Sender<StartStopInfo>,
// user token info; replaced by `start_*` and reset to the empty string by `stop`
pub _token: Mutex<String>,
// built with `DeviceConfig::new(mtu)`
pub device_config: DeviceConfig,
// interface created by `new_iface("dev", mode)`
pub device: Iface,
// authorize related; `authorized` and `encrypt_key` are written together by `set_authorized`
pub authorized: AtomicBool,
// pub header_key: RwLock<Arc<Vec<u8>>>,
pub encrypt_key: RwLock<Arc<Vec<u8>>>,
// contents of the public key file read in `init_edge`
pub rsa_pubkey: String,
pub rsa_private: RsaPrivateKey,
pub config: NodeConfig,
// pub super_node: Vec<Peer>,
// pub super_attempts: AtomicU8,
// store pending, and known peers
pub pending_peers: PeerMap,
pub known_peers: PeerMap,
// pub tcp_sock_v4: TCPSocket,
// `None` when `init_edge` skipped the multicast socket (`_drop_multicast` set)
pub udp_sock_multicast: Option<Socket>,
// main IPv4 UDP socket; STUN probes are sent through it
pub udp_sock_v4: Socket,
// compared with the address reported by the first STUN reply in `probe_nat_type`
pub outer_ip_v4: AtomicU32,
// optional IPv6 socket; reset to `None` by `_remove_v6`
pub udp_sock_v6: RwLock<Option<Arc<Socket>>>,
pub ipv6: RwLock<Option<SdlanSock>>,
// multicast address: `AF_INET`, `config::MULTICAST_PORT`, `config::MULITCAST_V4`
pub multicast_sock: SdlanSock,
pub _local_v6: RwLock<Option<[u8; 16]>>,
pub stats: NodeStats,
// last register super time, in unix
pub _last_register_req: AtomicU64,
// result of the most recent `probe_nat_type`; starts as `NatType::Blocked`
nat_type: Mutex<NatType>,
// source of STUN probe cookies; starts at 1
nat_cookie: AtomicU32,
// pending STUN probes: cookie -> channel on which the reply is delivered
cookie_match: DashMap<u32, oneshot::Sender<SdlStunProbeReply>>,
// pending start requests: packet id -> channel on which the feedback is delivered
packet_id_match: DashMap<u32, oneshot::Sender<RegisterSuperFeedback>>,
}
// SAFETY: NOTE(review): no justification is recorded for this impl. It asserts
// that `&Node` may be shared between threads; presumably it is needed because
// one of the project types stored in `Node` (e.g. `Iface` or `Socket`) is not
// automatically `Sync`. Confirm which field that is and that every access to
// it is thread-safe.
unsafe impl Sync for Node {}
impl Node {
/// Delivers `feed` to the caller waiting on packet id `pktid`.
///
/// The waiter is removed from `packet_id_match` before the value is sent.
/// An unknown `pktid`, or a waiter whose receiving side is already gone, is
/// silently ignored.
pub fn send_register_super_feedback(&self, pktid: u32, feed: RegisterSuperFeedback) {
    if let Some((_id, waiter)) = self.packet_id_match.remove(&pktid) {
        // The receiver may have given up already; that is not an error here.
        let _ = waiter.send(feed);
    }
}
/// Returns the NAT type stored by the last `probe_nat_type` run
/// (`NatType::Blocked` until a probe has completed).
///
/// # Panics
///
/// Panics if the `nat_type` mutex is poisoned.
pub fn get_nat_type(&self) -> NatType {
    let current = self.nat_type.lock().unwrap();
    *current
}
/// Stores `token` and submits a start request without waiting for feedback.
///
/// The request is sent best-effort: a closed start/stop channel is ignored
/// and the function still returns `Ok(())`.
///
/// # Panics
///
/// Panics if the `_token` mutex is poisoned.
pub async fn start_without_feedback(&self, token: String) -> Result<()> {
    // Single statement so the guard is released before the `.await` below.
    *self._token.lock().unwrap() = token;

    let request = StartStopInfo {
        is_start: true,
        pkt_id: None,
    };
    let _ = self.start_stop_sender.send(request).await;
    Ok(())
}
pub async fn start_with_feedback(
&self,
token: String,
timeout: Duration,
) -> Result<RegisterSuperFeedback> {
*self._token.lock().unwrap() = token;
let (tx, rx) = oneshot::channel();
let id = self.get_next_packet_id();
self.packet_id_match.insert(id, tx);
let _ = self
.start_stop_sender
.send(StartStopInfo {
is_start: true,
pkt_id: Some(id),
})
.await;
debug!("start with feedback");
tokio::select! {
rx_info = rx => {
if let Ok(result) = rx_info {
self.packet_id_match.remove(&id);
Ok(result)
} else {
Err(SDLanError::NormalError("rx closed"))
}
}
_ = tokio::time::sleep(timeout) => {
Err(SDLanError::NormalError("timed out"))
}
}
}
/// Clears the stored token and submits a stop request.
///
/// The request is sent best-effort; a closed start/stop channel is ignored.
///
/// # Panics
///
/// Panics if the `_token` mutex is poisoned.
pub async fn stop(&self) {
    // Single statement so the guard is released before the `.await` below.
    *self._token.lock().unwrap() = String::new();

    let request = StartStopInfo {
        is_start: false,
        pkt_id: None,
    };
    let _ = self.start_stop_sender.send(request).await;
}
pub fn new(
pubkey: String,
config: NodeConfig,
sock: Socket,
multicast_sock: Option<Socket>,
// tcpsock: TCPSocket,
token: &str,
private: RsaPrivateKey,
tcp_pong: Arc<AtomicU64>,
start_stop: Sender<StartStopInfo>,
mtu: u32,
) -> Self {
let mode = if cfg!(not(feature = "tun")) {
Mode::Tap
} else {
Mode::Tun
};
Self {
packet_id: AtomicU32::new(1),
network_id: AtomicU32::new(0),
_token: Mutex::new(token.to_owned()),
start_stop_sender: start_stop,
tcp_pong,
nat_type: Mutex::new(NatType::Blocked),
device_config: DeviceConfig::new(mtu),
device: new_iface("dev", mode),
authorized: AtomicBool::new(false),
encrypt_key: RwLock::new(Arc::new(Vec::new())),
// rsa_pubkey:
rsa_pubkey: pubkey,
rsa_private: private,
config,
// super_node: Vec::new(),
// super_attempts: AtomicU8::new(0),
pending_peers: PeerMap::new(),
known_peers: PeerMap::new(),
// tcp_sock_v4: tcpsock,
udp_sock_multicast: multicast_sock,
udp_sock_v4: sock,
outer_ip_v4: AtomicU32::new(0),
udp_sock_v6: RwLock::new(None),
ipv6: RwLock::new(None),
multicast_sock: SdlanSock {
family: AF_INET,
port: config::MULTICAST_PORT,
v4: config::MULITCAST_V4,
v6: [0; 16],
},
_local_v6: RwLock::new(None),
stats: NodeStats::new(),
_last_register_req: AtomicU64::new(0),
packet_id_match: DashMap::new(),
nat_cookie: AtomicU32::new(1),
cookie_match: DashMap::new(),
}
}
/// Returns the current packet id and advances the counter by one.
///
/// The counter wraps around on overflow.
pub fn get_next_packet_id(&self) -> u32 {
    let counter = &self.packet_id;
    counter.fetch_add(1, Ordering::Relaxed)
}
/// Reports the flag last written by `set_authorized` (`false` initially).
pub fn is_authorized(&self) -> bool {
    let flag = &self.authorized;
    flag.load(Ordering::Relaxed)
}
/// Updates the authorisation flag and replaces the encryption key.
///
/// NOTE(review): the flag is published before the key is swapped, so a
/// concurrent reader may see `is_authorized() == true` together with the
/// previous key — confirm whether callers rely on the two being consistent.
///
/// # Panics
///
/// Panics if the `encrypt_key` lock is poisoned.
pub fn set_authorized(&self, authorized: bool, encrypt_key: Vec<u8>) {
    self.authorized.store(authorized, Ordering::Relaxed);

    let mut key_slot = self.encrypt_key.write().unwrap();
    *key_slot = Arc::new(encrypt_key);
}
/*
pub fn get_header_key(&self) -> Arc<Vec<u8>> {
// self.header_key.read().unwrap().clone()
}
*/
/// Returns a shared handle to the current encryption key.
///
/// Only the `Arc` is cloned; the key bytes are not copied.
///
/// # Panics
///
/// Panics if the `encrypt_key` lock is poisoned.
pub fn get_encrypt_key(&self) -> Arc<Vec<u8>> {
    let key = self.encrypt_key.read().unwrap();
    Arc::clone(&*key)
}
/*
pub fn sn_is_known(&self, sock: &SdlanSock) -> bool {
for sn in self.config.super_nodes.iter() {
if sn.family != sock.family || sn.port != sock.port {
continue;
}
if sn.family == AF_INET && sn.v4 == sock.v4 {
return true;
}
if sn.family == AF_INET6 && sn.v6 == sock.v6 {
return true;
}
}
return false;
}
*/
/// Drops the node's reference to the IPv6 UDP socket, if one is set.
///
/// # Panics
///
/// Panics if the `udp_sock_v6` lock is poisoned.
pub fn _remove_v6(&self) {
    let mut v6_slot = self.udp_sock_v6.write().unwrap();
    *v6_slot = None;
}
/*
pub async fn send_to_v4<A: ToSocketAddrs>(&self, info: &[u8], target: A) -> Result<usize> {
match self.udp_sock_v4.send_to(info, target).await {
Ok(n) => Ok(n),
Err(e) => {
println!("failed to send");
Err(SDLanError::NormalError("failed to send"))
}
}
}
*/
/*
pub async fn send_to_v6<A: ToSocketAddrs>(&self, info: &[u8], target: A) -> Result<usize> {
let m = self.udp_sock_v6.read().unwrap().clone();
if let Some(ref l) = m.as_ref() {
match l.send_to(info, target).await {
Err(e) => {
return Err(SDLanError::NormalError("send error"));
}
Ok(n) => return Ok(n),
}
}
Err(SDLanError::NormalError("no udp6 conn is bined"))
}
*/
pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) {
if let Some((_key, chan)) = self.cookie_match.remove(&cookie) {
let _ = chan.send(buf);
return;
}
error!("failed to get such cookie stun probe");
}
/// Classifies the NAT in front of this node with a sequence of STUN probes,
/// stores the result in `nat_type` and returns it.
///
/// Decision sequence (each probe counts as failed on send error, timeout or
/// a dropped reply channel):
///
/// 1. Plain probe to `nat_server1` fails → `Blocked`.
/// 2. The reported address equals `outer_ip_v4`: a `Peer` probe to
///    `nat_server1` succeeds → `NoNat`, fails → `Symmetric`.
/// 3. A `Peer` probe to `nat_server1` succeeds → `FullCone`.
/// 4. Plain probe to `nat_server2` fails → `Blocked`.
/// 5. `nat_server2` reports a different ip or port than step 1 → `Symmetric`.
/// 6. A `Port` probe to `nat_server1` succeeds → `ConeRestricted`,
///    fails → `PortRestricted`.
///
/// # Panics
///
/// Panics if the `nat_type` mutex is poisoned.
pub async fn probe_nat_type(&self) -> NatType {
    let primary = &self.config.nat_server1;
    let secondary = &self.config.nat_server2;

    let detected = 'classify: {
        let Ok(first) = self
            .send_and_wait_for_probe_reply(StunProbeAttr::None, primary)
            .await
        else {
            break 'classify NatType::Blocked;
        };

        if first.ip == self.outer_ip_v4.load(Ordering::Relaxed) {
            // Our own address is what the server sees.
            let peer_reply = self
                .send_and_wait_for_probe_reply(StunProbeAttr::Peer, primary)
                .await;
            break 'classify match peer_reply {
                Ok(_) => NatType::NoNat,
                Err(_) => NatType::Symmetric,
            };
        }

        let peer_reply = self
            .send_and_wait_for_probe_reply(StunProbeAttr::Peer, primary)
            .await;
        if peer_reply.is_ok() {
            break 'classify NatType::FullCone;
        }

        let Ok(second) = self
            .send_and_wait_for_probe_reply(StunProbeAttr::None, secondary)
            .await
        else {
            break 'classify NatType::Blocked;
        };
        // A different mapping towards the second server.
        if second.ip != first.ip || second.port != first.port {
            break 'classify NatType::Symmetric;
        }

        let port_reply = self
            .send_and_wait_for_probe_reply(StunProbeAttr::Port, primary)
            .await;
        match port_reply {
            Ok(_) => NatType::ConeRestricted,
            Err(_) => NatType::PortRestricted,
        }
    };

    *self.nat_type.lock().unwrap() = detected;
    detected
}
/// Sends an `UnRegisterSuper` message without payload over the TCP connection.
///
/// The send is best-effort: its result is ignored and the function always
/// returns `Ok(())`.
///
/// # Panics
///
/// Panics if the message cannot be encoded.
pub async fn send_unregister_super(&self) -> Result<()> {
    let packet_type = PacketType::UnRegisterSuper as u8;
    let payload = encode_to_tcp_message::<SdlEmpty>(None, 0, packet_type).unwrap();

    let conn = get_tcp_conn();
    let _ = conn.send(&payload).await;
    Ok(())
}
/// Sends one STUN probe with attribute `msgattr` to `to_server` over the IPv4
/// UDP socket and waits up to three seconds for the matching reply.
///
/// A fresh cookie is taken from `nat_cookie` and registered in `cookie_match`;
/// the reply is delivered through `send_nat_probe_reply`. The cookie entry is
/// removed again on every exit path.
///
/// # Errors
///
/// * `NormalError("send error")` — the datagram could not be sent.
/// * `NormalError("timed out")` — no reply within three seconds.
/// * `NormalError("reply recv error")` — the reply sender was dropped.
///
/// # Panics
///
/// Panics if the probe cannot be encoded.
async fn send_and_wait_for_probe_reply(
    &self,
    msgattr: StunProbeAttr,
    to_server: &SocketAddr,
) -> Result<SdlStunProbeReply> {
    let cookie = self.nat_cookie.fetch_add(1, Ordering::Relaxed);
    let probe = SdlStunProbe {
        attr: msgattr as u32,
        cookie,
    };

    // Register the waiter before sending so a fast reply cannot be missed.
    let (reply_tx, reply_rx) = oneshot::channel();
    self.cookie_match.insert(cookie, reply_tx);

    let datagram = encode_to_udp_message(Some(probe), PacketType::StunProbe as u8).unwrap();
    if self.udp_sock_v4.send_to(&datagram, to_server).await.is_err() {
        self.cookie_match.remove(&cookie);
        return Err(SDLanError::NormalError("send error"));
    }

    let outcome = tokio::select! {
        _ = tokio::time::sleep(Duration::from_secs(3)) => {
            Err(SDLanError::NormalError("timed out"))
        }
        received = reply_rx => {
            received.map_err(|_| SDLanError::NormalError("reply recv error"))
        }
    };
    self.cookie_match.remove(&cookie);
    outcome
}
}
// Concurrent table of peers keyed by MAC address. `Node` keeps one instance
// for pending peers and one for known peers.
pub struct PeerMap {
// MAC address -> peer record
pub peers: DashMap<Mac, EdgePeer>,
}
#[allow(unused)]
impl PeerMap {
pub fn new() -> PeerMap {
Self {
peers: DashMap::new(),
}
}
/*
pub fn get_peer(&self, mac: &Mac) -> Option<Arc<EdgePeer>> {
if let Some(v) = self.peers.get(mac) {
Some(v.clone())
} else {
None
}
}
*/
pub fn clear(&self) {
self.peers.clear();
}
pub fn get_peer_by_sock(&self, sock: &SdlanSock) -> Option<Arc<EdgePeer>> {
/*
for s in self.peers.iter() {
let m = s.sock.read().unwrap();
if is_sdlan_sock_equal(&m, sock) {
return Some(s.value().clone());
}
}
*/
None
}
pub fn delete_peer_with_mac(&self, mac: &Mac) -> Option<(Mac, EdgePeer)> {
self.peers.remove(mac)
}
pub fn insert_peer(&self, mac: Mac, p: EdgePeer) {
self.peers.insert(mac, p);
}
}
// Atomic counters kept per node; all of them start at 0 (see `NodeStats::new`).
// NOTE(review): judging by the names, `tx_*` / `rx_*` count sent / received
// traffic per path (p2p, super node, broadcast) and `last_*` hold the time of
// the latest activity on that path — confirm units with the code that updates them.
pub struct NodeStats {
pub tx_p2p: AtomicU64,
pub rx_p2p: AtomicU64,
pub tx_sup: AtomicU64,
pub rx_sup: AtomicU64,
pub tx_broadcast: AtomicU64,
pub rx_broadcast: AtomicU64,
pub last_sup: AtomicU64,
pub last_p2p: AtomicU64,
}
impl NodeStats {
pub fn new() -> Self {
Self {
tx_p2p: AtomicU64::new(0),
rx_p2p: AtomicU64::new(0),
tx_sup: AtomicU64::new(0),
rx_sup: AtomicU64::new(0),
tx_broadcast: AtomicU64::new(0),
rx_broadcast: AtomicU64::new(0),
last_p2p: AtomicU64::new(0),
last_sup: AtomicU64::new(0),
}
}
}
use sdlan_sn_rs::peer::SdlanSock;
use crate::config::{self, MULTICAST_PORT, REGISTER_SUPER_INTERVAL};
// Configuration of the edge node; stored in `Node::config`.
pub struct NodeConfig {
// node name
pub name: String,
// allow routing
pub allow_routing: bool,
// drop multicast and broadcast messages; when set, `init_edge` does not
// build the multicast socket
pub _drop_multicast: bool,
// allow p2p hole punching
pub allow_p2p: bool,
// mtu of the tun
pub mtu: u32,
// type of service for udp messages
pub _tos: u8,
// time interval used while hole punching
pub _register_super_interval: u16,
// ttl of the register message used while hole punching
pub register_ttl: u8,
// udp port opened locally
pub _local_port: u16,
pub node_uuid: String,
// pub super_attempts: AtomicU8,
pub super_nodes: Vec<SdlanSock>,
pub super_node_index: AtomicU8,
// first STUN server used by `Node::probe_nat_type`
pub nat_server1: SocketAddr,
// second STUN server used by `Node::probe_nat_type`
pub nat_server2: SocketAddr,
}
// Record kept for a remote edge; stored in a `PeerMap` keyed by MAC address.
#[derive(Debug)]
pub struct EdgePeer {
// pub mac: Mac,
// built from the `net_addr` / `net_bit_len` arguments of `EdgePeer::new`
pub dev_addr: IpSubnet,
// starts as `NatType::Invalid` in `EdgePeer::new`
pub nat_type: NatType,
// ip and port the peer exposes to the outside
pub sock: SdlanSock,
// peer's ipv6 info
pub _v6_info: Option<SdlanSock>,
// initialised from `REGISTER_SUPER_INTERVAL`
pub timeout: u8,
// last time the peer was seen
pub last_seen: AtomicU64,
// last p2p message
pub last_p2p: AtomicU64,
// last valid timestamp
pub _last_valid_timestamp: AtomicU64,
// once the elapsed time exceeds the re-registration time,
// pub has_sent_last_p2p: AtomicBool,
// last time a query was sent
pub last_sent_query: AtomicU64,
}
impl EdgePeer {
pub fn new(
// mac: Mac,
net_addr: u32,
net_bit_len: u8,
sock: &SdlanSock,
v6info: &Option<V6Info>,
now: u64,
) -> Self {
let mut v6_info = None;
if let Some(v6info) = v6info {
v6_info = Some(SdlanSock {
family: AF_INET6,
port: v6info.port,
v4: [0; 4],
v6: v6info.v6,
})
}
Self {
// mac,
dev_addr: IpSubnet::new(net_addr, net_bit_len),
sock: sock.deepcopy(),
_v6_info: v6_info,
timeout: REGISTER_SUPER_INTERVAL as u8,
last_p2p: AtomicU64::new(0),
last_seen: AtomicU64::new(0),
_last_valid_timestamp: AtomicU64::new(now),
last_sent_query: AtomicU64::new(0),
nat_type: NatType::Invalid,
}
}
pub fn get_nat_type(&self) -> NatType {
self.nat_type
}
}