diff --git a/Cargo.lock b/Cargo.lock index e77bbcd..188963b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -849,6 +849,18 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "local-ip-address" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136ef34e18462b17bf39a7826f8f3bbc223341f8e83822beb8b77db9a3d49696" +dependencies = [ + "libc", + "neli", + "thiserror", + "windows-sys 0.48.0", +] + [[package]] name = "lock_api" version = "0.4.12" @@ -913,6 +925,31 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "neli" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1100229e06604150b3becd61a4965d5c70f3be1759544ea7274166f4be41ef43" +dependencies = [ + "byteorder", + "libc", + "log", + "neli-proc-macros", +] + +[[package]] +name = "neli-proc-macros" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c168194d373b1e134786274020dae7fc5513d565ea2ebb9bc9ff17ffb69106d4" +dependencies = [ + "either", + "proc-macro2", + "quote", + "serde", + "syn 1.0.109", +] + [[package]] name = "nom" version = "7.1.3" @@ -1406,6 +1443,7 @@ dependencies = [ "dns-lookup", "etherparse", "futures-util", + "local-ip-address", "num_enum", "once_cell", "prost", diff --git a/Cargo.toml b/Cargo.toml index 19979a2..af853cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ dashmap = "6.0.1" dns-lookup = "2.0.4" etherparse = "0.15.0" futures-util = "0.3.30" +local-ip-address = "0.6.1" num_enum = "0.7.2" once_cell = "1.19.0" prost = "0.12.6" diff --git a/message.proto b/message.proto index 7d2b9bb..86e2401 100644 --- a/message.proto +++ b/message.proto @@ -56,7 +56,7 @@ message SDLQueryInfo { message SDLPeerInfo { uint32 dst_ip = 1; - SDLV4Info v4_info = 2; + SDLV4Info v4_info = 2; optional SDLV6Info v6_info = 3; } @@ -78,6 +78,7 @@ message SDLSendRegisterEvent { uint32 dst_ip = 1; uint32 nat_ip = 2; uint32 nat_port = 3; + optional SDLV6Info v6_info = 4; } message SDLNetworkShutdownEvent { @@ -115,6 +116,7 @@ message SDLStunRequest { uint32 network_id = 3; uint32 ip = 4; uint32 nat_type = 5; + optional SDLV6Info v6_info = 6; } message SDLStunReply { diff --git a/src/bin/build_pb/main.rs b/src/bin/build_pb/main.rs index af26abd..67a37a4 100644 --- a/src/bin/build_pb/main.rs +++ b/src/bin/build_pb/main.rs @@ -3,6 +3,6 @@ fn main() { .out_dir("src/pb") // .out_dir("../tcp_mock/pb") .protoc_arg("--experimental_allow_proto3_optional") - .compile_protos(&["message.proto"], &["../sdlan/"]) + .compile_protos(&["message.proto"], &["."]) .unwrap(); } diff --git a/src/network/async_main.rs b/src/network/async_main.rs index 535c5ec..0bd6d0a 100644 --- a/src/network/async_main.rs +++ b/src/network/async_main.rs @@ -1,19 +1,21 @@ use std::net::IpAddr; use std::sync::atomic::Ordering; use std::time::Duration; +use std::sync::Arc; use crate::config::TCP_PING_TIME; +use crate::network::ipv6::run_ipv6; use crate::network::{get_edge, ping_to_sn, read_and_parse_packet, RegisterSuperFeedback}; use crate::pb::{ encode_to_tcp_message, encode_to_udp_message, SdlData, SdlDevAddr, SdlRegisterSuper, - SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, + SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, Sdlv6Info, }; use crate::tcp::{init_tcp_conn, EventType, NakMsgCode, PacketType, SdlanTcp}; use crate::utils::{send_to_sock, CommandLine}; use crate::ConnectionState; use etherparse::IpHeaders; -use sdlan_sn_rs::config::{AF_INET, SDLAN_DEFAULT_TTL}; -use sdlan_sn_rs::peer::SdlanSock; +use sdlan_sn_rs::config::{AF_INET, AF_INET6, SDLAN_DEFAULT_TTL}; +use sdlan_sn_rs::peer::{SdlanSock, V6Info}; use sdlan_sn_rs::utils::Result; use sdlan_sn_rs::utils::{ aes_encrypt, get_current_timestamp, ip_to_string, is_multi_broadcast, rsa_decrypt, @@ -189,11 +191,21 @@ async fn handle_tcp_event(edge: &Node, eventtype: EventType, eventprotobuf: &[u8 return; }; let v4 = reg.nat_ip.to_be_bytes(); + let mut v6_sock = None; + if let Some(v6_info) = reg.v6_info { + if let Ok(v6_bytes) = v6_info.v6.try_into() { + v6_sock = Some(V6Info { + port: v6_info.port as u16, + v6: v6_bytes, + }); + } + } check_peer_registration_needed( edge, true, reg.dst_ip, - &None, + // &v6_sock, + &v6_sock, &SdlanSock { family: AF_INET, port: reg.nat_port as u16, @@ -249,6 +261,8 @@ pub async fn async_main( // let token = args.token.clone(); let cancel_tcp = cancel.clone(); + let (ipv6_network_restarter, rx) = channel(10); + tokio::spawn(run_ipv6(edge, rx)); init_tcp_conn( cancel_tcp, &args.tcp, @@ -306,6 +320,7 @@ pub async fn async_main( // tcp_pong, start_stop_chan, connecting_chan, + Some(ipv6_network_restarter), ); // tcp_conn.send("hello".as_bytes()).await; @@ -402,13 +417,26 @@ async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) { } async fn send_stun_request(eee: &Node) { + let sdl_v6_info = match *eee.ipv6.read().unwrap() { + Some(ref l) => { + Some(Sdlv6Info { + port: l.port as u32, + v6: Vec::from(l.v6), + }) + } + None => { + None + } + }; let req = SdlStunRequest { cookie: 0, client_id: eee.config.node_uuid.clone(), network_id: eee.network_id.load(Ordering::Relaxed), ip: eee.device_config.get_ip(), nat_type: eee.get_nat_type() as u32, + v6_info: sdl_v6_info, }; + debug!("stun request: {:?}", req); let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap(); if let Err(e) = send_to_sock( eee, @@ -421,7 +449,39 @@ async fn send_stun_request(eee: &Node) { } } -async fn loop_socket_v4( +pub async fn loop_socket_v6( + eee: &Node, + socket: Arc, + cancel: CancellationToken, +) { + debug!("loop sock v6"); + loop { + tokio::select! { + _ = cancel.cancelled() => { + break; + } + _ = read_and_parse_packet(eee, &socket, Some(Duration::from_secs(10)), false) => { } + _ = tokio::time::sleep(Duration::from_secs(10)) => { + /* + let req = SdlStunRequest { + cookie: 0, + client_id: eee.config.node_uuid.clone(), + network_id: eee.network_id.load(Ordering::Relaxed), + ip: eee.device_config.get_ip(), + nat_type: 0, + }; + let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap(); + if let Err(e) = send_to_sock(eee, &msg, &eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize]).await { + error!("failed to send to sock: {:?}", e); + }*/ + } + } + } + debug!("loop_socket_v4 exited"); + +} + +pub async fn loop_socket_v4( eee: &Node, socket: &Socket, cancel: CancellationToken, diff --git a/src/network/ipv6.rs b/src/network/ipv6.rs new file mode 100644 index 0000000..900b104 --- /dev/null +++ b/src/network/ipv6.rs @@ -0,0 +1,80 @@ +use std::{net::{IpAddr, Ipv6Addr}, time::Duration}; +use std::sync::Mutex; + +use sdlan_sn_rs::{config::AF_INET6, peer::SdlanSock}; +use tokio::{net::UdpSocket, sync::mpsc::Receiver}; +use tokio_util::sync::CancellationToken; +use tracing::error; +use std::sync::Arc; + +use crate::{network::{loop_socket_v4, loop_socket_v6}, utils::Socket}; + +use super::Node; + +pub async fn run_ipv6(edge: &'static Node, mut v6_may_change: Receiver) { + v6_may_change.recv().await; + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + let ipv6 = get_current_ipv6(); + if ipv6.is_none() { + v6_may_change.recv().await; + continue; + } + let ipv6 = ipv6.unwrap(); + + /* + let Ok(udpsocket) = UdpSocket::bind(format!("{}:0", ipv6)).await else { + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + }; + let port = udpsocket.local_addr().unwrap().port(); + let addr = udpsocket.local_addr().unwrap().ip(); + println!("ipv6: {}:{}", addr, port); + */ + + let socket = Arc::new(Socket::build_v6(ipv6, 0).await.unwrap()); + let port = socket.get_local_port(); + let socket_clone = socket.clone(); + let cancel = CancellationToken::new(); + + + *edge.ipv6.write().unwrap() = Some(SdlanSock { + family: AF_INET6, + port, + v4: [0;4], + v6: ipv6.octets(), + }); + *edge.udp_sock_v6.write().unwrap() = Some(socket); + tokio::select! { + _ = loop_socket_v6(edge, socket_clone, cancel.clone()) => { + + } + _ = v6_may_change.recv() => { + cancel.cancel() + } + } + } +} + +pub fn get_current_ipv6() -> Option { + let Ok(ips) = local_ip_address::list_afinet_netifas() else { + error!("failed to get ip address"); + return None; + }; + + for (_, ip) in ips { + match ip { + IpAddr::V4(_ipv4) => { + continue; + } + IpAddr::V6(ipv6) => { + if (ipv6.octets()[0] & 0x70 == 0x20) { + println!("got global ip: {}", ipv6); + return Some(ipv6) + } + } + } + } + + None +} diff --git a/src/network/mod.rs b/src/network/mod.rs index 16db429..8162463 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -4,6 +4,8 @@ pub use node::*; mod async_main; pub use async_main::*; +mod ipv6; + mod packet; pub use packet::*; diff --git a/src/network/node.rs b/src/network/node.rs index e6bfd73..9391c87 100644 --- a/src/network/node.rs +++ b/src/network/node.rs @@ -136,7 +136,8 @@ pub struct Node { pub udp_sock_multicast: Socket, pub udp_sock_v4: Socket, pub outer_ip_v4: AtomicU32, - pub udp_sock_v6: RwLock>>, + pub udp_sock_v6: RwLock>>, + pub ipv6: RwLock>, pub multicast_sock: SdlanSock, @@ -268,7 +269,9 @@ impl Node { udp_sock_multicast: multicast_sock, udp_sock_v4: sock, outer_ip_v4: AtomicU32::new(0), - udp_sock_v6: RwLock::new(Arc::new(None)), + udp_sock_v6: RwLock::new(None), + + ipv6: RwLock::new(None), multicast_sock: SdlanSock { family: AF_INET, @@ -328,7 +331,7 @@ impl Node { */ pub fn _remove_v6(&self) { - *(self.udp_sock_v6.write().unwrap()) = Arc::new(None); + *(self.udp_sock_v6.write().unwrap()) = None; } /* diff --git a/src/network/packet.rs b/src/network/packet.rs index b7a99e0..fe859ff 100644 --- a/src/network/packet.rs +++ b/src/network/packet.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, sync::atomic::Ordering, time::Duration}; +use std::{net::SocketAddr, sync::{atomic::Ordering, RwLock}, time::Duration}; use crate::{ config::REGISTER_INTERVAL, @@ -12,7 +12,7 @@ use crate::{ use etherparse::IpHeaders; use prost::Message; use sdlan_sn_rs::{ - config::AF_INET, + config::{AF_INET, AF_INET6}, peer::{is_sdlan_sock_equal, SdlanSock, V6Info}, utils::{ aes_decrypt, get_current_timestamp, get_sdlan_sock_from_socketaddr, ip_to_string, @@ -239,6 +239,15 @@ pub async fn handle_packet_peer_info( error!("failed to convert v4"); return Ok(()); }; + + let mut v6: [u8; 16] = [0;16]; + let mut v6_port = 0; + if let Some(v6_info) = pi.v6_info { + if let Ok(v6_bytes) = v6_info.v6.as_slice().try_into() { + v6 = v6_bytes; + v6_port = v6_info.port as u16; + } + } // let src_ip = u32::from_be_bytes(v4_u32); if pi.dst_ip == 0 { @@ -246,7 +255,7 @@ pub async fn handle_packet_peer_info( } else { match eee.pending_peers.get_peer(&pi.dst_ip) { Some(edgeinfo) => { - let sock = SdlanSock { + let mut sock = SdlanSock { family: AF_INET, port: v4.port as u16, v4: v4_u32, @@ -258,7 +267,15 @@ pub async fn handle_packet_peer_info( ip_to_string(&pi.dst_ip), sock.to_string() ); - send_register(eee, &sock, &None).await; + let mut v6_info = None; + if v6_port != 0 { + v6_info = Some(V6Info { + port: v6_port, + v6, + }) + } + send_register(eee, &sock, &v6_info).await; + // register_with_local_peers(eee).await; } None => { @@ -405,6 +422,7 @@ async fn handle_packet_register( origin_sender.to_string(), ); } + // check_peer_registration_needed(eee, from_sn, reg.src_ip, &None, origin_sender).await; check_peer_registration_needed(eee, from_sn, reg.src_ip, &None, origin_sender).await; Ok(()) @@ -463,7 +481,7 @@ pub async fn check_peer_registration_needed( eee: &Node, from_sn: bool, src_ip: u32, - v6_info: &Option, + _v6_info: &Option, peer_sock: &SdlanSock, ) { let mut p = eee.known_peers.get_peer(&src_ip); @@ -475,18 +493,26 @@ pub async fn check_peer_registration_needed( } match p { None => { - let _ = register_with_new_peer(eee, from_sn, src_ip, v6_info, peer_sock).await; + let _ = register_with_new_peer(eee, from_sn, src_ip, _v6_info, peer_sock).await; // unimplemented!(); } Some(k) => { + let mut ipv4_to_ipv6 = false; let now = get_current_timestamp(); if !from_sn { k.last_p2p.store(now, Ordering::Relaxed); } + if peer_sock.family == AF_INET6 && k.sock.read().unwrap().family == AF_INET { + println!("changing to ipv6"); + *k.sock.write().unwrap() = peer_sock.deepcopy(); + ipv4_to_ipv6 = true; + } else { + println!("already is ipv6"); + } let last_seen = k.last_seen.load(Ordering::Relaxed); // more than 3 seconds if now - last_seen > 1 { - check_known_peer_sock_change(eee, from_sn, src_ip, v6_info, peer_sock, now).await; + check_known_peer_sock_change(eee, from_sn, src_ip, peer_sock, now, ipv4_to_ipv6).await; } } } @@ -496,17 +522,18 @@ async fn check_known_peer_sock_change( eee: &Node, from_sn: bool, ip: u32, - v6_info: &Option, + // v6_info: &Option, // dev_addr: &IpSubnet, peersock: &SdlanSock, when: u64, + ipv4_to_ipv6: bool, ) { if is_multi_broadcast(ip) { return; } match eee.known_peers.get_peer(&ip) { Some(p) => { - if !is_sdlan_sock_equal(&p.sock.read().unwrap(), peersock) { + if !ipv4_to_ipv6 && !is_sdlan_sock_equal(&p.sock.read().unwrap(), peersock) { if !from_sn { info!( "peer changed: {}: {} -> {}", @@ -515,7 +542,7 @@ async fn check_known_peer_sock_change( peersock.to_string() ); eee.known_peers.delete_peer_with_ip(&ip); - register_with_new_peer(eee, from_sn, ip, v6_info, peersock).await; + register_with_new_peer(eee, from_sn, ip, &None, peersock).await; } else { // from sn, sn could see a different sock with us, just ignore it } @@ -544,7 +571,7 @@ async fn register_with_new_peer( ip, eee.device_config.get_net_bit(), peersock, - v6_info, + &None, now, )); debug!( @@ -569,7 +596,7 @@ async fn register_with_new_peer( let _ = eee.udp_sock_v4.set_ttl(eee.config.register_ttl as u32); while alter > 0 { temp.port += 1; - send_register(eee, &temp, &None).await; + send_register(eee, &temp, v6_info).await; alter -= 1; } let _ = eee.udp_sock_v4.set_ttl(ttl); @@ -583,7 +610,7 @@ async fn register_with_new_peer( eee, &eee.config.super_nodes [eee.config.super_node_index.load(Ordering::Relaxed) as usize], - &None, + v6_info, ) .await; } else { @@ -609,7 +636,11 @@ async fn register_with_local_peers(eee: &Node) { } } -async fn send_register(eee: &Node, sock: &SdlanSock, _v6_info: &Option) { +async fn send_register( + eee: &Node, + sock: &SdlanSock, + _v6_info: &Option +) { if !eee.config.allow_p2p { debug!("skipping register as p2p is disabled"); return; @@ -629,6 +660,14 @@ async fn send_register(eee: &Node, sock: &SdlanSock, _v6_info: &Option) let msg = encode_to_udp_message(Some(register), PacketType::Register as u8).unwrap(); let _ = send_to_sock(eee, &msg, sock).await; + if let Some(ref v6_info) = _v6_info { + let _ = send_to_sock(eee, &msg, &SdlanSock { + family: AF_INET6, + port: v6_info.port, + v4: [0;4], + v6: v6_info.v6, + }).await; + } /* let key = eee.get_header_key(); if key.len() > 0 { @@ -741,6 +780,8 @@ fn peer_set_p2p_confirmed(eee: &Node, src_ip: u32, sender_sock: &SdlanSock) { Some(scantmp) => { eee.known_peers.delete_peer_with_ip(&src_ip); scan = scantmp; + // set the remote peer sock + *scan.sock.write().unwrap() = sender_sock.deepcopy(); scan.dev_addr.net_addr.store(src_ip, Ordering::Relaxed); scan.dev_addr .net_bit_len diff --git a/src/pb/message.rs b/src/pb/message.rs index 3b51b6d..500e8e7 100644 --- a/src/pb/message.rs +++ b/src/pb/message.rs @@ -115,6 +115,8 @@ pub struct SdlSendRegisterEvent { pub nat_ip: u32, #[prost(uint32, tag = "3")] pub nat_port: u32, + #[prost(message, optional, tag = "4")] + pub v6_info: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -167,6 +169,8 @@ pub struct SdlStunRequest { pub ip: u32, #[prost(uint32, tag = "5")] pub nat_type: u32, + #[prost(message, optional, tag = "6")] + pub v6_info: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/src/tcp/tcp_conn.rs b/src/tcp/tcp_conn.rs index c9b2492..ddbe7b0 100644 --- a/src/tcp/tcp_conn.rs +++ b/src/tcp/tcp_conn.rs @@ -38,6 +38,7 @@ pub struct ReadWriteActor { from_tcp: Sender, cancel: CancellationToken, connecting_chan: Option>, + ipv6_network_restarter: Option>, } impl ReadWriteActor { @@ -48,6 +49,7 @@ impl ReadWriteActor { connected: Arc, pong_time: Arc, connecting_chan: Option>, + ipv6_network_restarter: Option>, ) -> Self { Self { // to_tcp, @@ -57,6 +59,7 @@ impl ReadWriteActor { remote: remote.to_owned(), from_tcp, connecting_chan, + ipv6_network_restarter } } @@ -138,6 +141,9 @@ impl ReadWriteActor { if let Some(ref connecting_chan) = self.connecting_chan { let _ = connecting_chan.send(ConnectionState::Connected).await; } + if let Some(ref ipv6_restarter) = self.ipv6_network_restarter { + let _ = ipv6_restarter.send(true).await; + } // stream.write("hello".as_bytes()).await; let (reader, mut write) = stream.into_split(); @@ -257,6 +263,7 @@ impl ReadWriterHandle { start_stop_chan: Receiver, // cancel: CancellationToken, connecting_chan: Option>, + ipv6_network_restarter: Option>, ) -> Self where T: for<'b> Fn(&'b mut TcpStream, Option) -> BoxFuture<'b, ()> + Send + 'static, @@ -269,7 +276,7 @@ impl ReadWriterHandle { let (from_tcp, mut data_from_tcp) = channel(20); let connected: Arc = Arc::new(AtomicBool::new(false)); - let actor = ReadWriteActor::new(cancel, addr, from_tcp, connected.clone(), pong_time, connecting_chan); + let actor = ReadWriteActor::new(cancel, addr, from_tcp, connected.clone(), pong_time, connecting_chan, ipv6_network_restarter); tokio::spawn(async move { actor .run(true, to_tcp, on_connected, on_disconnected, start_stop_chan) @@ -303,7 +310,8 @@ pub fn init_tcp_conn<'a, T, T3, T2, F, F2>( pong_time: Arc, // cancel: CancellationToken, start_stop_chan: Receiver, - connecting_chan: Option> + connecting_chan: Option>, + ipv6_network_restarter: Option>, ) where T: for<'b> Fn(&'b mut TcpStream, Option) -> BoxFuture<'b, ()> + Send + 'static, T3: Fn() -> F2 + Send + 'static, @@ -320,6 +328,7 @@ pub fn init_tcp_conn<'a, T, T3, T2, F, F2>( pong_time, start_stop_chan, connecting_chan, + ipv6_network_restarter, ); GLOBAL_TCP_HANDLE diff --git a/src/utils/socks.rs b/src/utils/socks.rs index 5c7397c..f3caeaa 100644 --- a/src/utils/socks.rs +++ b/src/utils/socks.rs @@ -11,6 +11,13 @@ use tokio::net::UdpSocket; use crate::network::Node; +pub struct SocketV6 { + ipv6: Option, + port: u16, + has_v6: bool, + +} + pub struct Socket { udp: UdpSocket, } @@ -21,6 +28,13 @@ impl Socket { Ok(m) } + pub fn get_local_port(&self) -> u16 { + match self.udp.local_addr() { + Ok(addr) => addr.port(), + Err(_e) => 0, + } + } + pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> { let m = self.udp.recv_from(buf).await?; Ok(m) @@ -42,6 +56,13 @@ impl Socket { } } + pub async fn build_v6(v6: Ipv6Addr, port: u16) -> Result { + let udp = UdpSocket::bind(format!("[{}]:{}", v6, port)).await?; + Ok(Self { + udp + }) + } + pub async fn build(port: u16, bind_any: bool, join_multicast: bool, tos: u32) -> Result { let addr = match bind_any { true => "0.0.0.0", @@ -121,6 +142,7 @@ pub async fn send_to_sock( } Some(sk) => { let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::from(sock.v6)), sock.port); + debug!("send with ipv6"); sk.send_to(content, addr).await?; return Ok(()); }