644 lines
17 KiB
Rust
644 lines
17 KiB
Rust
use dashmap::DashMap;
|
||
use rsa::RsaPrivateKey;
|
||
use sdlan_sn_rs::config::{AF_INET, AF_INET6};
|
||
use std::collections::HashMap;
|
||
use std::net::SocketAddr;
|
||
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering};
|
||
use std::sync::{Arc, Mutex, RwLock};
|
||
use std::time::Duration;
|
||
use tokio::io::AsyncReadExt;
|
||
use tokio::sync::mpsc::Sender;
|
||
use tokio::sync::oneshot;
|
||
use tracing::{debug, error};
|
||
|
||
use crate::pb::{
|
||
encode_to_tcp_message, encode_to_udp_message, SdlEmpty, SdlStunProbe, SdlStunProbeReply,
|
||
};
|
||
use crate::tcp::{get_tcp_conn, NatType, PacketType, StunProbeAttr};
|
||
use crate::utils::{PidRecorder, Socket};
|
||
|
||
use sdlan_sn_rs::peer::{is_sdlan_sock_equal, IpSubnet, V6Info};
|
||
|
||
use once_cell::sync::OnceCell;
|
||
|
||
use super::device::{DeviceConfig, Mode};
|
||
use super::tun::{new_iface, Iface};
|
||
use tokio::fs::File;
|
||
|
||
use sdlan_sn_rs::utils::{gen_rsa_keys, load_private_key_file, Mac};
|
||
use sdlan_sn_rs::utils::{Result, SDLanError};
|
||
|
||
static EDGE: OnceCell<Node> = OnceCell::new();
|
||
|
||
pub async fn init_edge(
|
||
token: &str,
|
||
node_conf: NodeConfig,
|
||
tos: u32,
|
||
start_stop: Sender<StartStopInfo>,
|
||
) -> Result<()> {
|
||
let _ = PidRecorder::new(".pid");
|
||
|
||
// gen public key
|
||
gen_rsa_keys(".client");
|
||
let mut pubkey = String::new();
|
||
|
||
File::open(".client/id_rsa.pub")
|
||
.await?
|
||
.read_to_string(&mut pubkey)
|
||
.await?;
|
||
let privatekey = load_private_key_file(".client/id_rsa")?;
|
||
|
||
// init sock
|
||
// let edge_uuid = create_or_load_uuid("")?;
|
||
//let node_conf = parse_config(edge_uuid, &args).await?;
|
||
|
||
let sock_v4 = Socket::build(0, true, false, tos).await?;
|
||
|
||
let sock_multicast = Socket::build(MULTICAST_PORT, true, true, 0).await?;
|
||
// allow multicast
|
||
|
||
// TODO: set the sn's tcp socket
|
||
// let tcpsock = TCPSocket::build("121.4.79.234:1234").await?;
|
||
let tcp_pong = Arc::new(AtomicU64::new(0));
|
||
let edge = Node::new(
|
||
pubkey,
|
||
node_conf,
|
||
sock_v4,
|
||
sock_multicast,
|
||
token,
|
||
privatekey,
|
||
tcp_pong.clone(),
|
||
start_stop,
|
||
);
|
||
do_init_edge(edge)?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn do_init_edge(edge: Node) -> Result<()> {
|
||
if let Err(_) = EDGE.set(edge) {
|
||
return Err(SDLanError::NormalError("initialize sn error"));
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
pub fn get_edge() -> &'static Node {
|
||
let edge = EDGE.get();
|
||
if edge.is_none() {
|
||
panic!("should init_edge first");
|
||
}
|
||
edge.unwrap()
|
||
}
|
||
|
||
pub struct RegisterSuperFeedback {
|
||
// 0 for success, other for error
|
||
pub result: u8,
|
||
pub message: String,
|
||
pub should_exit: bool,
|
||
}
|
||
|
||
pub struct StartStopInfo {
|
||
pub is_start: bool,
|
||
pub pkt_id: Option<u32>,
|
||
}
|
||
|
||
pub struct Node {
|
||
packet_id: AtomicU32,
|
||
|
||
pub network_id: AtomicU32,
|
||
|
||
pub tcp_pong: Arc<AtomicU64>,
|
||
|
||
start_stop_sender: Sender<StartStopInfo>,
|
||
|
||
// user token info
|
||
pub _token: Mutex<String>,
|
||
|
||
pub device_config: DeviceConfig,
|
||
pub device: Iface,
|
||
|
||
// authorize related
|
||
pub authorized: AtomicBool,
|
||
// pub header_key: RwLock<Arc<Vec<u8>>>,
|
||
pub encrypt_key: RwLock<Arc<Vec<u8>>>,
|
||
|
||
pub rsa_pubkey: String,
|
||
pub rsa_private: RsaPrivateKey,
|
||
|
||
pub config: NodeConfig,
|
||
// pub super_node: Vec<Peer>,
|
||
// pub super_attempts: AtomicU8,
|
||
|
||
// store pending, and known peers
|
||
pub pending_peers: PeerMap,
|
||
pub known_peers: PeerMap,
|
||
|
||
// pub tcp_sock_v4: TCPSocket,
|
||
pub udp_sock_multicast: Socket,
|
||
pub udp_sock_v4: Socket,
|
||
pub outer_ip_v4: AtomicU32,
|
||
pub udp_sock_v6: RwLock<Option<Arc<Socket>>>,
|
||
pub ipv6: RwLock<Option<SdlanSock>>,
|
||
|
||
pub multicast_sock: SdlanSock,
|
||
|
||
pub _local_v6: RwLock<Option<[u8; 16]>>,
|
||
|
||
pub stats: NodeStats,
|
||
|
||
// last register super time, in unix
|
||
pub _last_register_req: AtomicU64,
|
||
|
||
nat_type: Mutex<NatType>,
|
||
|
||
nat_cookie: AtomicU32,
|
||
cookie_match: DashMap<u32, oneshot::Sender<SdlStunProbeReply>>,
|
||
|
||
packet_id_match: DashMap<u32, oneshot::Sender<RegisterSuperFeedback>>,
|
||
}
|
||
|
||
unsafe impl Sync for Node {}
|
||
|
||
impl Node {
|
||
pub fn send_register_super_feedback(&self, pktid: u32, feed: RegisterSuperFeedback) {
|
||
match self.packet_id_match.remove(&pktid) {
|
||
Some(sender) => {
|
||
let _ = sender.1.send(feed);
|
||
}
|
||
None => {
|
||
return;
|
||
}
|
||
}
|
||
}
|
||
|
||
pub fn get_nat_type(&self) -> NatType {
|
||
self.nat_type.lock().unwrap().clone()
|
||
}
|
||
|
||
pub async fn start_without_feedback(&self, token: String) -> Result<()> {
|
||
*self._token.lock().unwrap() = token;
|
||
let _ = self
|
||
.start_stop_sender
|
||
.send(StartStopInfo {
|
||
is_start: true,
|
||
pkt_id: None,
|
||
})
|
||
.await;
|
||
Ok(())
|
||
}
|
||
|
||
pub async fn start_with_feedback(
|
||
&self,
|
||
token: String,
|
||
timeout: Duration,
|
||
) -> Result<RegisterSuperFeedback> {
|
||
*self._token.lock().unwrap() = token;
|
||
let (tx, rx) = oneshot::channel();
|
||
let id = self.get_next_packet_id();
|
||
self.packet_id_match.insert(id, tx);
|
||
let _ = self
|
||
.start_stop_sender
|
||
.send(StartStopInfo {
|
||
is_start: true,
|
||
pkt_id: Some(id),
|
||
})
|
||
.await;
|
||
debug!("start with feedback");
|
||
|
||
tokio::select! {
|
||
rx_info = rx => {
|
||
if let Ok(result) = rx_info {
|
||
self.packet_id_match.remove(&id);
|
||
Ok(result)
|
||
} else {
|
||
Err(SDLanError::NormalError("rx closed"))
|
||
}
|
||
}
|
||
_ = tokio::time::sleep(timeout) => {
|
||
Err(SDLanError::NormalError("timed out"))
|
||
}
|
||
}
|
||
}
|
||
|
||
pub async fn stop(&self) {
|
||
*self._token.lock().unwrap() = "".to_owned();
|
||
let _ = self
|
||
.start_stop_sender
|
||
.send(StartStopInfo {
|
||
is_start: false,
|
||
pkt_id: None,
|
||
})
|
||
.await;
|
||
}
|
||
|
||
pub fn new(
|
||
pubkey: String,
|
||
config: NodeConfig,
|
||
sock: Socket,
|
||
multicast_sock: Socket,
|
||
// tcpsock: TCPSocket,
|
||
token: &str,
|
||
private: RsaPrivateKey,
|
||
tcp_pong: Arc<AtomicU64>,
|
||
start_stop: Sender<StartStopInfo>,
|
||
) -> Self {
|
||
Self {
|
||
packet_id: AtomicU32::new(1),
|
||
network_id: AtomicU32::new(0),
|
||
_token: Mutex::new(token.to_owned()),
|
||
|
||
start_stop_sender: start_stop,
|
||
|
||
tcp_pong,
|
||
|
||
nat_type: Mutex::new(NatType::Blocked),
|
||
|
||
device_config: DeviceConfig::new(),
|
||
device: new_iface("dev", Mode::Tun),
|
||
|
||
authorized: AtomicBool::new(false),
|
||
encrypt_key: RwLock::new(Arc::new(Vec::new())),
|
||
// rsa_pubkey:
|
||
rsa_pubkey: pubkey,
|
||
rsa_private: private,
|
||
config,
|
||
// super_node: Vec::new(),
|
||
// super_attempts: AtomicU8::new(0),
|
||
pending_peers: PeerMap::new(),
|
||
known_peers: PeerMap::new(),
|
||
|
||
// tcp_sock_v4: tcpsock,
|
||
udp_sock_multicast: multicast_sock,
|
||
udp_sock_v4: sock,
|
||
outer_ip_v4: AtomicU32::new(0),
|
||
udp_sock_v6: RwLock::new(None),
|
||
|
||
ipv6: RwLock::new(None),
|
||
|
||
multicast_sock: SdlanSock {
|
||
family: AF_INET,
|
||
port: config::MULTICAST_PORT,
|
||
v4: config::MULITCAST_V4,
|
||
v6: [0; 16],
|
||
},
|
||
_local_v6: RwLock::new(None),
|
||
stats: NodeStats::new(),
|
||
_last_register_req: AtomicU64::new(0),
|
||
|
||
packet_id_match: DashMap::new(),
|
||
nat_cookie: AtomicU32::new(1),
|
||
cookie_match: DashMap::new(),
|
||
}
|
||
}
|
||
|
||
pub fn get_next_packet_id(&self) -> u32 {
|
||
self.packet_id.fetch_add(1, Ordering::Relaxed)
|
||
}
|
||
|
||
pub fn is_authorized(&self) -> bool {
|
||
// self.header_key
|
||
self.authorized.load(Ordering::Relaxed)
|
||
}
|
||
|
||
pub fn set_authorized(&self, authorized: bool, encrypt_key: Vec<u8>) {
|
||
self.authorized.store(authorized, Ordering::Relaxed);
|
||
*(self.encrypt_key.write().unwrap()) = Arc::new(encrypt_key);
|
||
}
|
||
|
||
/*
|
||
pub fn get_header_key(&self) -> Arc<Vec<u8>> {
|
||
// self.header_key.read().unwrap().clone()
|
||
}
|
||
*/
|
||
|
||
pub fn get_encrypt_key(&self) -> Arc<Vec<u8>> {
|
||
self.encrypt_key.read().unwrap().clone()
|
||
}
|
||
|
||
/*
|
||
pub fn sn_is_known(&self, sock: &SdlanSock) -> bool {
|
||
for sn in self.config.super_nodes.iter() {
|
||
if sn.family != sock.family || sn.port != sock.port {
|
||
continue;
|
||
}
|
||
if sn.family == AF_INET && sn.v4 == sock.v4 {
|
||
return true;
|
||
}
|
||
if sn.family == AF_INET6 && sn.v6 == sock.v6 {
|
||
return true;
|
||
}
|
||
}
|
||
return false;
|
||
}
|
||
*/
|
||
|
||
pub fn _remove_v6(&self) {
|
||
*(self.udp_sock_v6.write().unwrap()) = None;
|
||
}
|
||
|
||
/*
|
||
pub async fn send_to_v4<A: ToSocketAddrs>(&self, info: &[u8], target: A) -> Result<usize> {
|
||
match self.udp_sock_v4.send_to(info, target).await {
|
||
Ok(n) => Ok(n),
|
||
Err(e) => {
|
||
println!("failed to send");
|
||
Err(SDLanError::NormalError("failed to send"))
|
||
}
|
||
}
|
||
}
|
||
*/
|
||
|
||
/*
|
||
pub async fn send_to_v6<A: ToSocketAddrs>(&self, info: &[u8], target: A) -> Result<usize> {
|
||
let m = self.udp_sock_v6.read().unwrap().clone();
|
||
if let Some(ref l) = m.as_ref() {
|
||
match l.send_to(info, target).await {
|
||
Err(e) => {
|
||
return Err(SDLanError::NormalError("send error"));
|
||
}
|
||
Ok(n) => return Ok(n),
|
||
}
|
||
}
|
||
|
||
Err(SDLanError::NormalError("no udp6 conn is bined"))
|
||
}
|
||
*/
|
||
|
||
pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) {
|
||
if let Some((_key, chan)) = self.cookie_match.remove(&cookie) {
|
||
let _ = chan.send(buf);
|
||
}
|
||
error!("failed to get such cookie stun probe");
|
||
}
|
||
|
||
pub async fn probe_nat_type(&self) -> NatType {
|
||
/*
|
||
if !self.is_authorized() {
|
||
return None;
|
||
}
|
||
*/
|
||
|
||
let Ok(reply1) = self
|
||
.send_and_wait_for_probe_reply(StunProbeAttr::None, &self.config.nat_server1)
|
||
.await
|
||
else {
|
||
*self.nat_type.lock().unwrap() = NatType::Blocked;
|
||
return NatType::Blocked;
|
||
};
|
||
|
||
let Ok(reply2) = self
|
||
.send_and_wait_for_probe_reply(StunProbeAttr::None, &self.config.nat_server2)
|
||
.await
|
||
else {
|
||
*self.nat_type.lock().unwrap() = NatType::Blocked;
|
||
return NatType::Blocked;
|
||
};
|
||
|
||
if reply1.ip == self.outer_ip_v4.load(Ordering::Relaxed) {
|
||
*self.nat_type.lock().unwrap() = NatType::NoNat;
|
||
return NatType::NoNat;
|
||
}
|
||
|
||
if reply1.ip != reply2.ip || reply1.port != reply2.port {
|
||
*self.nat_type.lock().unwrap() = NatType::Symmetric;
|
||
return NatType::Symmetric;
|
||
}
|
||
|
||
if let Ok(_reply3) = self
|
||
.send_and_wait_for_probe_reply(StunProbeAttr::Peer, &self.config.nat_server1)
|
||
.await
|
||
{
|
||
*self.nat_type.lock().unwrap() = NatType::FullCone;
|
||
return NatType::FullCone;
|
||
}
|
||
|
||
if let Ok(_reply4) = self
|
||
.send_and_wait_for_probe_reply(StunProbeAttr::Port, &self.config.nat_server1)
|
||
.await
|
||
{
|
||
*self.nat_type.lock().unwrap() = NatType::ConeRestricted;
|
||
return NatType::ConeRestricted;
|
||
} else {
|
||
*self.nat_type.lock().unwrap() = NatType::PortRestricted;
|
||
return NatType::PortRestricted;
|
||
}
|
||
}
|
||
|
||
pub async fn send_unregister_super(&self) -> Result<()> {
|
||
let content =
|
||
encode_to_tcp_message::<SdlEmpty>(None, 0, PacketType::UnRegisterSuper as u8).unwrap();
|
||
|
||
let conn = get_tcp_conn();
|
||
let _ = conn.send(&content).await;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn send_and_wait_for_probe_reply(
|
||
&self,
|
||
msgattr: StunProbeAttr,
|
||
to_server: &SocketAddr,
|
||
) -> Result<SdlStunProbeReply> {
|
||
let cookie = self.nat_cookie.fetch_add(1, Ordering::Relaxed);
|
||
let probe = SdlStunProbe {
|
||
attr: msgattr as u32,
|
||
cookie,
|
||
};
|
||
|
||
let (tx, rx) = oneshot::channel();
|
||
self.cookie_match.insert(cookie, tx);
|
||
// let cookie = msg.cookie;
|
||
let msg = encode_to_udp_message(Some(probe), PacketType::StunProbe as u8).unwrap();
|
||
if let Err(_e) = self.udp_sock_v4.send_to(&msg, to_server).await {
|
||
self.cookie_match.remove(&cookie);
|
||
return Err(SDLanError::NormalError("send error"));
|
||
}
|
||
|
||
tokio::select! {
|
||
_ = tokio::time::sleep(Duration::from_secs(3)) => {
|
||
self.cookie_match.remove(&cookie);
|
||
return Err(SDLanError::NormalError("timed out"));
|
||
}
|
||
reply = rx => {
|
||
self.cookie_match.remove(&cookie);
|
||
if let Ok(reply) = reply {
|
||
// reply received,
|
||
return Ok(reply);
|
||
// println!("got nat ip: {}:{}", ip_to_string(&reply.ip), reply.port);
|
||
}
|
||
return Err(SDLanError::NormalError("reply recv error"));
|
||
// step 1 received
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
pub struct PeerMap {
|
||
pub peers: DashMap<Mac, Arc<EdgePeer>>,
|
||
}
|
||
|
||
#[allow(unused)]
|
||
impl PeerMap {
|
||
pub fn new() -> PeerMap {
|
||
Self {
|
||
peers: DashMap::new(),
|
||
}
|
||
}
|
||
|
||
pub fn get_peer(&self, ip: &Mac) -> Option<Arc<EdgePeer>> {
|
||
if let Some(v) = self.peers.get(ip) {
|
||
Some(v.clone())
|
||
} else {
|
||
None
|
||
}
|
||
}
|
||
|
||
pub fn clear(&self) {
|
||
self.peers.clear();
|
||
}
|
||
|
||
pub fn get_peer_by_sock(&self, sock: &SdlanSock) -> Option<Arc<EdgePeer>> {
|
||
for s in self.peers.iter() {
|
||
let m = s.sock.read().unwrap();
|
||
if is_sdlan_sock_equal(&m, sock) {
|
||
return Some(s.value().clone());
|
||
}
|
||
}
|
||
None
|
||
}
|
||
|
||
pub fn delete_peer_with_mac(&self, mac: &Mac) {
|
||
self.peers.remove(mac);
|
||
}
|
||
|
||
pub fn insert_peer(&self, mac: Mac, p: Arc<EdgePeer>) {
|
||
self.peers.insert(mac, p);
|
||
}
|
||
}
|
||
|
||
pub struct NodeStats {
|
||
pub tx_p2p: AtomicU64,
|
||
pub rx_p2p: AtomicU64,
|
||
pub tx_sup: AtomicU64,
|
||
pub rx_sup: AtomicU64,
|
||
pub tx_broadcast: AtomicU64,
|
||
pub rx_broadcast: AtomicU64,
|
||
|
||
pub last_sup: AtomicU64,
|
||
pub last_p2p: AtomicU64,
|
||
}
|
||
|
||
impl NodeStats {
|
||
pub fn new() -> Self {
|
||
Self {
|
||
tx_p2p: AtomicU64::new(0),
|
||
rx_p2p: AtomicU64::new(0),
|
||
tx_sup: AtomicU64::new(0),
|
||
rx_sup: AtomicU64::new(0),
|
||
tx_broadcast: AtomicU64::new(0),
|
||
rx_broadcast: AtomicU64::new(0),
|
||
|
||
last_p2p: AtomicU64::new(0),
|
||
last_sup: AtomicU64::new(0),
|
||
}
|
||
}
|
||
}
|
||
|
||
use sdlan_sn_rs::peer::SdlanSock;
|
||
|
||
use crate::config::{self, MULTICAST_PORT, REGISTER_INTERVAL};
|
||
pub struct NodeConfig {
|
||
// node name
|
||
pub name: String,
|
||
// 允许路由
|
||
pub allow_routing: bool,
|
||
|
||
// 丢弃多播,广播消息
|
||
pub _drop_multicast: bool,
|
||
|
||
// 允许p2p打洞
|
||
pub allow_p2p: bool,
|
||
|
||
// mtu of the tun
|
||
pub mtu: u32,
|
||
|
||
// udp消息的服务类型
|
||
pub _tos: u8,
|
||
|
||
// 打洞时候,时间间隔
|
||
pub _register_super_interval: u16,
|
||
|
||
// 打洞时候,register消息的ttl
|
||
pub register_ttl: u8,
|
||
|
||
// 本地打开的udp端口
|
||
pub _local_port: u16,
|
||
|
||
pub node_uuid: String,
|
||
|
||
// pub super_attempts: AtomicU8,
|
||
pub super_nodes: Vec<SdlanSock>,
|
||
pub super_node_index: AtomicU8,
|
||
|
||
pub nat_server1: SocketAddr,
|
||
pub nat_server2: SocketAddr,
|
||
}
|
||
|
||
#[derive(Debug)]
|
||
pub struct EdgePeer {
|
||
// pub mac: Mac,
|
||
pub dev_addr: IpSubnet,
|
||
|
||
// 对端对外开放的ip和端口信息
|
||
pub sock: RwLock<SdlanSock>,
|
||
// peer's ipv6 info
|
||
pub _v6_info: RwLock<Option<SdlanSock>>,
|
||
|
||
pub timeout: u8,
|
||
|
||
// 最近一次遇见
|
||
pub last_seen: AtomicU64,
|
||
// 最近一次p2p消息
|
||
pub last_p2p: AtomicU64,
|
||
// 最近一次合法时间
|
||
pub _last_valid_timestamp: AtomicU64,
|
||
|
||
// 当时间超过重新注册的时间之后,
|
||
// pub has_sent_last_p2p: AtomicBool,
|
||
|
||
// 最近一次发送query
|
||
pub last_sent_query: AtomicU64,
|
||
}
|
||
|
||
impl EdgePeer {
|
||
pub fn new(
|
||
// mac: Mac,
|
||
net_addr: u32,
|
||
net_bit_len: u8,
|
||
sock: &SdlanSock,
|
||
v6info: &Option<V6Info>,
|
||
now: u64,
|
||
) -> Self {
|
||
let mut v6_info = None;
|
||
if let Some(v6info) = v6info {
|
||
v6_info = Some(SdlanSock {
|
||
family: AF_INET6,
|
||
port: v6info.port,
|
||
v4: [0; 4],
|
||
v6: v6info.v6,
|
||
})
|
||
}
|
||
Self {
|
||
// mac,
|
||
dev_addr: IpSubnet::new(net_addr, net_bit_len),
|
||
sock: RwLock::new(sock.deepcopy()),
|
||
_v6_info: RwLock::new(v6_info),
|
||
timeout: REGISTER_INTERVAL,
|
||
last_p2p: AtomicU64::new(0),
|
||
last_seen: AtomicU64::new(0),
|
||
_last_valid_timestamp: AtomicU64::new(now),
|
||
last_sent_query: AtomicU64::new(0),
|
||
}
|
||
}
|
||
}
|