From 8645b655340a5685e6a2fabbddd606be1e14056a Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 27 Feb 2026 16:30:46 +0800 Subject: [PATCH] changed tcp to quic, can remove tcp_conn.rs safely --- Cargo.lock | 30 +- Cargo.toml | 1 + src/bin/punchnet/api/mod.rs | 47 ++- src/bin/punchnet/main.rs | 14 +- src/lib.rs | 2 +- src/network/async_main.rs | 56 +--- src/network/node.rs | 4 +- src/network/packet.rs | 8 +- src/tcp/mod.rs | 6 +- src/tcp/quic.rs | 637 ++++++++++++++++++++++++++++++++++++ src/tcp/tcp_codec.rs | 40 ++- src/tcp/tcp_conn.rs | 479 --------------------------- src/utils/command.rs | 44 ++- 13 files changed, 821 insertions(+), 547 deletions(-) create mode 100644 src/tcp/quic.rs diff --git a/Cargo.lock b/Cargo.lock index 9fb71bb..0a885e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,7 +280,7 @@ dependencies = [ "anstyle", "ar", "cargo_toml", - "clap 4.5.54", + "clap 4.5.60", "elf", "env_logger", "glob", @@ -387,18 +387,19 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.54" +version = "4.5.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6e6ff9dcd79cff5cd969a17a545d79e84ab086e444102a591e288a8aa3ce394" +checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" dependencies = [ "clap_builder", + "clap_derive", ] [[package]] name = "clap_builder" -version = "4.5.54" +version = "4.5.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa42cf4d2b7a41bc8f663a7cab4031ebafa1bf3875705bfaf8466dc60ab52c00" +checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" dependencies = [ "anstream", "anstyle", @@ -407,10 +408,22 @@ dependencies = [ ] [[package]] -name = "clap_lex" -version = "0.7.7" +name = "clap_derive" +version = "4.5.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3e64b0cc0439b12df2fa678eae89a1c56a529fd067a9115f7827f1fffd22b32" +checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.114", +] + +[[package]] +name = "clap_lex" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" [[package]] name = "cmake" @@ -1991,6 +2004,7 @@ version = "1.0.0" dependencies = [ "bytes", "cargo-deb", + "clap 4.5.60", "crc", "crc32fast", "dashmap 6.1.0", diff --git a/Cargo.toml b/Cargo.toml index 1993892..6c6b3d7 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ bytes = "1.11.1" quinn = "0.11.9" rustls = "0.23.37" rustls-pemfile = "2.2.0" +clap = { version = "4.5.60", features = ["derive"] } # rolling-file = { path = "../rolling-file" } [target.'cfg(unix)'.dependencies] diff --git a/src/bin/punchnet/api/mod.rs b/src/bin/punchnet/api/mod.rs index b7ce30d..9c2a91f 100644 --- a/src/bin/punchnet/api/mod.rs +++ b/src/bin/punchnet/api/mod.rs @@ -40,7 +40,7 @@ pub struct LoginData { pub audit: u32, pub network_id: u32, pub network_name: String, - pub network_domain: String, + // pub network_domain: String, pub exit_node: Vec, } @@ -140,6 +140,9 @@ pub struct ConnectResponse { #[derive(Deserialize, Debug)] pub struct ConnectData { pub ip: String, + pub mask_len: u8, + pub hostname: String, + pub identity_id: u32, pub resource_list: Vec, pub node_list: Vec, // pub acl: Vec, @@ -147,8 +150,10 @@ pub struct ConnectData { #[derive(Deserialize, Debug)] pub struct ResourceList { + pub id: i32, pub name: String, pub url: String, + pub connection_status: String, } #[derive(Deserialize, Debug)] @@ -189,4 +194,44 @@ pub async fn disconnect( access_token, }; post_with_data(&url, data).await +} + +#[derive(Serialize)] +struct GetResourceRequest<'a> { + client_id: &'a str, + access_token: &'a str, + id: i32, +} + +#[derive(Deserialize, Debug)] +pub struct GetResourceResponse { + pub code: i32, + pub message: String, + pub data: Option, +} + +#[derive(Deserialize, Debug)] +pub struct ResourceData { + pub id: i32, + pub name: String, + pub ip: String, + pub system: String, + pub connection_status: String, + pub resource_list: Vec, +} + +pub async fn get_node_resource( + url_prefix: &str, + client_id: &str, + access_token: &str, + id: i32, +) -> Result { + let data = GetResourceRequest { + client_id, + access_token, + id, + }; + + let url = format!("{}/get_node_resource", url_prefix); + post_with_data(&url, data).await } \ No newline at end of file diff --git a/src/bin/punchnet/main.rs b/src/bin/punchnet/main.rs index b75e56b..2505e5f 100755 --- a/src/bin/punchnet/main.rs +++ b/src/bin/punchnet/main.rs @@ -1,5 +1,6 @@ mod api; +use punchnet::create_or_load_mac; use punchnet::get_base_dir; use punchnet::get_edge; use punchnet::mod_hostname; @@ -10,11 +11,15 @@ use punchnet::CommandLine; use punchnet::CommandLineInput; use sdlan_sn_rs::log; +use sdlan_sn_rs::utils::create_or_load_uuid; use tracing::error; use std::net::ToSocketAddrs; use structopt::StructOpt; +use crate::api::TEST_PREFIX; +use crate::api::login_with_token; + #[tokio::main] @@ -22,9 +27,14 @@ async fn main() { set_base_dir("/usr/local/punchnet"); let _guard = log::init_log(&format!("{}/.output", get_base_dir())); + let client_id = create_or_load_uuid(&format!("{}/.id", get_base_dir()), None).unwrap(); + let token = "49722584273728716817720074439183"; + + let mac = create_or_load_mac(); + let system = "linux"; + let version = "1.0.0"; + let cmd = CommandLineInput::from_args(); - - // println!("port is {}", cmd.port); let (tx, rx) = std::sync::mpsc::channel(); diff --git a/src/lib.rs b/src/lib.rs index 62bbc7c..7eb5b1e 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,7 @@ use tokio::net::UdpSocket; use tokio::sync::mpsc::{channel, Sender}; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; -pub use utils::{CommandLine, CommandLineInput, mod_hostname}; +pub use utils::{CommandLine, CommandLineInput, mod_hostname, create_or_load_mac}; pub use config::{get_base_dir, set_base_dir}; diff --git a/src/network/async_main.rs b/src/network/async_main.rs index f001889..5c69a7b 100755 --- a/src/network/async_main.rs +++ b/src/network/async_main.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -7,7 +7,7 @@ use crate::network::ipv6::run_ipv6; use crate::network::{ get_edge, ping_to_sn, read_and_parse_packet, TunTapPacketHandler, }; -use crate::tcp::{init_tcp_conn, send_stun_request}; +use crate::tcp::{init_quic_conn, send_stun_request}; use crate::utils::{send_to_sock, CommandLine}; use crate::{ConnectionInfo}; use sdlan_sn_rs::peer::{SdlanSock}; @@ -37,41 +37,7 @@ pub async fn async_main( let (ipv6_network_restarter, rx) = channel(10); tokio::spawn(run_ipv6(edge, rx)); - // TODO: change the quic logic - tokio::spawn(async move { - loop { - let conn = edge.quic_endpoint.connect("192.168.0.1".parse().unwrap(), "www.punchnet.com").unwrap().await.unwrap(); - println!("连接成功!"); - - let (mut send, mut recv) = conn.open_bi().await.unwrap(); - - loop { - send.write_all(b"Hello QUIC!").await.unwrap(); - let mut buf = vec![0u8; 1024]; - if let Ok(size) = recv.read(&mut buf).await { - if let Some(size) = size { - println!("got data from server: {}", String::from_utf8_lossy(&buf[..size])) - } else { - println!("no data size found"); - } - } else { - println!("read failed"); - break; - } - - recv.read(&mut buf).await.unwrap(); - sleep(Duration::from_secs(11)).await; - } - - println!("hello"); - // conn.close(0u32.into(), b"donw"); - - edge.quic_endpoint.wait_idle().await; - } - }); - ////////////////// to here - - init_tcp_conn( + init_quic_conn( cancel_tcp, &args.tcp, // |msg| handle_tcp_message(msg), @@ -82,11 +48,6 @@ pub async fn async_main( Some(ipv6_network_restarter), ); - // tcp_conn.send("hello".as_bytes()).await; - // tokio::spawn(handle_tcp_message(tcp_conn.data_from_tcp)); - - // tcp_conn.send("".as_bytes()); - debug!("waiting for authorization..."); /* @@ -155,6 +116,17 @@ pub async fn async_main( Ok(()) } +async fn run_quic_loop( + edge: &Node, + quic_addr: &str, + pong_time: Arc, + start_stop_chan: Receiver, + connecting_chan: Sender, + ipv6_ntework_restarter: Option> +) { + +} + async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) { ping_to_sn().await; { diff --git a/src/network/node.rs b/src/network/node.rs index 9d804fc..e37f8c1 100755 --- a/src/network/node.rs +++ b/src/network/node.rs @@ -17,7 +17,7 @@ use crate::{ConnectionInfo, get_base_dir}; use crate::pb::{ encode_to_tcp_message, encode_to_udp_message, SdlEmpty, SdlStunProbe, SdlStunProbeReply, }; -use crate::tcp::{get_tcp_conn, NatType, PacketType, StunProbeAttr}; +use crate::tcp::{NatType, PacketType, StunProbeAttr, get_quic_write_conn}; use crate::utils::{Socket, create_or_load_mac}; use sdlan_sn_rs::peer::{IpSubnet, V6Info}; @@ -588,7 +588,7 @@ impl Node { let content = encode_to_tcp_message::(None, 0, PacketType::UnRegisterSuper as u8).unwrap(); - let conn = get_tcp_conn(); + let conn = get_quic_write_conn(); let _ = conn.send(content).await; Ok(()) diff --git a/src/network/packet.rs b/src/network/packet.rs index 2fb3368..4ba5640 100755 --- a/src/network/packet.rs +++ b/src/network/packet.rs @@ -1,6 +1,6 @@ use std::{net::SocketAddr, sync::atomic::Ordering, time::Duration}; -use crate::tcp::NatType; +use crate::tcp::{NatType, get_quic_write_conn}; use crate::{network::TunTapPacketHandler, utils::mac_to_string}; use crate::{ @@ -9,7 +9,7 @@ use crate::{ encode_to_tcp_message, encode_to_udp_message, SdlData, SdlEmpty, SdlPeerInfo, SdlQueryInfo, SdlRegister, SdlRegisterAck, SdlStunProbeReply, }, - tcp::{get_tcp_conn, PacketType}, + tcp::{PacketType}, utils::{send_to_sock, Socket}, }; use etherparse::Ethernet2Header; @@ -1143,7 +1143,7 @@ async fn send_query_peer(eee: &Node, dst_mac: Mac) -> Result<()> { error!("failed to encode query"); return Err(SDLanError::NormalError("encode query error")); }; - let tcp_conn = get_tcp_conn(); + let tcp_conn = get_quic_write_conn(); tcp_conn.send(content).await } @@ -1153,7 +1153,7 @@ pub async fn ping_to_sn() { return; }; debug!("ping to sn"); - let tcp_conn = get_tcp_conn(); + let tcp_conn = get_quic_write_conn(); if let Err(e) = tcp_conn.send(msg).await { error!("failed to ping to sn: {:?}", e); } diff --git a/src/tcp/mod.rs b/src/tcp/mod.rs index c1ad114..6584523 100755 --- a/src/tcp/mod.rs +++ b/src/tcp/mod.rs @@ -1,5 +1,7 @@ mod tcp_codec; -mod tcp_conn; +// mod tcp_conn; +mod quic; pub use tcp_codec::*; -pub use tcp_conn::*; +pub use quic::*; +// pub use tcp_conn::*; diff --git a/src/tcp/quic.rs b/src/tcp/quic.rs new file mode 100644 index 0000000..c184dcd --- /dev/null +++ b/src/tcp/quic.rs @@ -0,0 +1,637 @@ +use std::{net::IpAddr, sync::{Arc, OnceLock, atomic::{AtomicBool, AtomicU64, Ordering}}, time::Duration}; + +use futures_util::pin_mut; +use prost::Message; +use quinn::SendStream; +use sdlan_sn_rs::{config::AF_INET, peer::{SdlanSock, V6Info}, utils::{Result, SDLanError, get_current_timestamp, ip_to_string, rsa_decrypt}}; +use tokio::{io::BufReader, net::TcpStream, sync::mpsc::{Receiver, Sender, channel}}; + +use tokio_util::sync::CancellationToken; +use tracing::{debug, error}; + +use crate::{ConnectionInfo, ConnectionState, config::{NULL_MAC, TCP_PING_TIME}, get_edge, network::{Node, RegisterSuperFeedback, StartStopInfo, check_peer_registration_needed, handle_packet_peer_info}, pb::{SdlRegisterSuper, SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, encode_to_tcp_message}, tcp::{EventType, NakMsgCode, NatType, PacketType, SdlanTcp, read_a_packet, send_stun_request}}; + +static GLOBAL_QUIC_HANDLE: OnceLock = OnceLock::new(); + +#[derive(Debug)] +pub struct ReadWriterHandle { + connected: Arc, + send_to_tcp: Sender>, + // pub data_from_tcp: Receiver, +} + +impl ReadWriterHandle { + pub async fn send(&self, data: Vec) -> Result<()> { + if self.connected.load(Ordering::Relaxed) { + // connected, send to it + if let Err(e) = self.send_to_tcp.send(data).await { + error!("failed to send to send_to_tcp: {}", e.to_string()); + return Err(SDLanError::NormalError("failed to send")); + }; + debug!("tcp info sent"); + } else { + error!("tcp not connected, so not sending data"); + return Err(SDLanError::NormalError("not connected, so not sending")); + } + Ok(()) + } + + fn new<>( + cancel: CancellationToken, + addr: &str, + // on_connected: OnConnectedCallback<'a>, + // on_disconnected: T3, + // on_message: T2, + pong_time: Arc, + start_stop_chan: Receiver, + // cancel: CancellationToken, + connecting_chan: Option>, + ipv6_network_restarter: Option>, + ) -> Self { + let (send_to_tcp, to_tcp) = channel(20); + let (from_tcp, mut data_from_tcp) = channel(20); + + let connected: Arc = Arc::new(AtomicBool::new(false)); + + tokio::spawn(async move { + + }); + + let actor = ReadWriteActor::new( + cancel, + addr, + from_tcp, + connected.clone(), + pong_time, + connecting_chan, + ipv6_network_restarter, + ); + tokio::spawn(async move { + actor + .run( + true, + to_tcp, + // on_connected, + // on_disconnected, + start_stop_chan + ) + .await + }); + + tokio::spawn(async move { + loop { + if let Some(msg) = data_from_tcp.recv().await { + handle_tcp_message(msg).await; + } else { + error!("data from tcp exited"); + // eprintln!("data from tcp exited"); + return; + } + } + }); + + ReadWriterHandle { + connected, + send_to_tcp, + // data_from_tcp, + } + } +} + +pub fn get_quic_write_conn() -> &'static ReadWriterHandle { + match GLOBAL_QUIC_HANDLE.get() { + Some(v) => v, + None => panic!("should call init_tcp_conn first"), + } +} + + +async fn handle_tcp_message(msg: SdlanTcp) { + let edge = get_edge(); + + // let now = get_current_timestamp(); + // edge.tcp_pong.store(now, Ordering::Relaxed); + + debug!("got tcp message: {:?}", msg.packet_type); + match msg.packet_type { + PacketType::RegisterSuperACK => { + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 0, + message: "".to_owned(), + should_exit: false, + }, + ); + + let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else { + error!("failed to decode REGISTER_SUPER_ACK"); + return; + }; + debug!("got register super ack: {:?}", ack); + edge.session_token.set(ack.session_token); + let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else { + error!("failed to rsa decrypt aes key"); + return; + }; + /* + let Some(dev) = ack.dev_addr else { + error!("no dev_addr is specified"); + return; + }; + */ + + let ip = ip_to_string(&edge.device_config.get_ip()); + // debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,); + println!("assigned ip: {}", ip); + let hostname = edge.hostname.read().unwrap().clone(); + // println!("network is: {}.{}", hostname, dev.network_domain); + /* + edge.device_config + .ip + .net_addr + .store(dev.net_addr, Ordering::Relaxed); + */ + if let Some(ref chan) = edge.connection_chan { + let _ = chan.send(ConnectionInfo::IPInfo(ip)).await; + } + /* + let mac = match dev.mac.try_into() { + Err(_) => NULL_MAC, + Ok(m) => m, + }; + */ + // *edge.device_config.mac.write().unwrap() = mac; + /* + edge.device_config + .ip + .net_bit_len + .store(dev.net_bit_len as u8, Ordering::Relaxed); + edge.network_id.store(dev.network_id, Ordering::Relaxed); + */ + // edge.device.reload_config(&edge.device_config, &dev.network_domain); + edge.device.reload_config(&edge.device_config, &edge.network_domain.read().unwrap().clone()); + + edge.set_authorized(true, aes); + send_stun_request(edge).await; + tokio::spawn(async { + let nattype = edge.probe_nat_type().await; + debug!("nat type is {:?}", nattype); + // println!("nat type is: {:?}", nattype); + }); + } + PacketType::RegisterSuperNAK => { + let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else { + error!("failed to decode REGISTER_SUPER_NAK"); + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 1, + message: "failed to decode REGISTER SUPER NAK".to_owned(), + should_exit: false, + }, + ); + return; + }; + + let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else { + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 2, + message: "error_code not recognized".to_owned(), + should_exit: false, + }, + ); + return; + }; + match error_code { + NakMsgCode::InvalidToken => { + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 3, + message: "invalid token".to_owned(), + should_exit: true, + }, + ); + edge.stop().await; + } + NakMsgCode::NodeDisabled => { + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 4, + message: "Node is disabled".to_owned(), + should_exit: true, + }, + ); + edge.stop().await; + } + _other => { + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 0, + message: "".to_owned(), + should_exit: false, + }, + ); + } + } + /* + edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { + result: 1, + message: "failed to decode REGISTER SUPER NAK".to_owned(), + }); + */ + edge.set_authorized(false, Vec::new()); + // std::process::exit(0); + } + PacketType::Command => { + if msg.current_packet.len() < 1 { + error!("malformed COMMAND received"); + return; + } + handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await; + } + PacketType::Event => { + if msg.current_packet.len() < 1 { + error!("malformed EVENT received"); + return; + } + let Ok(event) = msg.current_packet[0].try_into() else { + error!("failed to parse event type"); + return; + }; + handle_tcp_event(edge, event, &msg.current_packet[1..]).await; + } + PacketType::PeerInfo => { + let _ = handle_packet_peer_info(edge, &msg.current_packet[..]).await; + } + PacketType::Pong => { + debug!("tcp pong received"); + let now = get_current_timestamp(); + edge.tcp_pong.store(now, Ordering::Relaxed); + } + other => { + debug!("tcp not handling {:?}", other); + } + } +} + + +async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {} + +async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) { + match eventtype { + EventType::SendRegister => { + let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else { + error!("failed to decode SendRegister Event"); + return; + }; + let v4 = reg.nat_ip.to_be_bytes(); + let mut v6_sock = None; + if let Some(v6_info) = reg.v6_info { + if let Ok(v6_bytes) = v6_info.v6.try_into() { + v6_sock = Some(V6Info { + port: v6_info.port as u16, + v6: v6_bytes, + }); + } + } + + let dst_mac = match reg.dst_mac.try_into() { + Ok(m) => m, + Err(_e) => NULL_MAC, + }; + + let remote_nat_byte = reg.nat_type as u8; + let remote_nat = match remote_nat_byte.try_into() { + Ok(t) => t, + Err(_) => NatType::NoNat, + }; + + check_peer_registration_needed( + edge, + false, + dst_mac, + // &v6_sock, + remote_nat, + &v6_sock, + &SdlanSock { + family: AF_INET, + port: reg.nat_port as u16, + v4, + v6: [0; 16], + }, + ) + .await; + } + other => { + debug!("unhandled event {:?}", other); + } + } +} + + +pub fn init_quic_conn( + cancel: CancellationToken, + addr: &str, + // on_connected: OnConnectedCallback<'a>, + // on_disconnected: T3, + // on_message: T2, + pong_time: Arc, + // cancel: CancellationToken, + start_stop_chan: Receiver, + connecting_chan: Option>, + ipv6_network_restarter: Option>, +) + // T2: Fn(SdlanTcp) -> F + Send + 'static, + // F: Future + Send, +{ + let tcp_handle = ReadWriterHandle::new( + cancel, + addr, + // on_connected, + // on_disconnected, + // on_message, + pong_time, + start_stop_chan, + connecting_chan, + ipv6_network_restarter, + ); + + GLOBAL_QUIC_HANDLE + .set(tcp_handle) + .expect("failed to set global tcp handle"); +} + +pub struct ReadWriteActor { + // actor接收的发送给tcp的接收端,由handle存放发送端 + // to_tcp: Receiver>, + remote: String, + connected: Arc, + pong_time: Arc, + // actor收到数据之后,发送给上层的发送端口,接收端由handle保存 + from_tcp: Sender, + _cancel: CancellationToken, + connecting_chan: Option>, + ipv6_network_restarter: Option>, +} + +impl ReadWriteActor { + pub fn new( + cancel: CancellationToken, + remote: &str, + from_tcp: Sender, + connected: Arc, + pong_time: Arc, + connecting_chan: Option>, + ipv6_network_restarter: Option>, + ) -> Self { + Self { + // to_tcp, + _cancel: cancel, + pong_time, + connected, + remote: remote.to_owned(), + from_tcp, + connecting_chan, + ipv6_network_restarter, + } + } + + pub async fn run<'a>( + &self, + keep_reconnect: bool, + mut to_tcp: Receiver>, + mut start_stop_chan: Receiver, + ) { + let edge = get_edge(); + + let mut started = false; + let mut start_pkt_id = None; + loop { + if let Some(ref connecting_chan) = self.connecting_chan { + let state = ConnectionInfo::ConnState(ConnectionState::NotConnected); + let _ = connecting_chan.send(state).await; + } + self.connected.store(false, Ordering::Relaxed); + if !started { + // println!("waiting for start"); + loop { + let start_or_stop = start_stop_chan.recv().await; + if let Some(m) = start_or_stop { + if m.is_start { + started = true; + start_pkt_id = m.pkt_id; + break; + } + } else { + // None, just return + return; + } + } + /* + while let Some(m) = start_stop_chan.recv().await { + println!("4"); + if m.is_start { + // println!("start received"); + started = true; + start_pkt_id = m.pkt_id; + break; + } else { + // println!("stop received"); + } + } + */ + debug!("start stop chan received: {}", started); + continue; + } + + if let Some(ref connecting_chan) = self.connecting_chan { + let state = ConnectionInfo::ConnState(ConnectionState::Connecting); + let _ = connecting_chan.send(state).await; + } + debug!("try connecting..."); + + let Ok(conn) = edge.quic_endpoint.connect(self.remote.parse().unwrap(), "") else { + self.connected.store(false, Ordering::Relaxed); + if keep_reconnect { + tokio::time::sleep(Duration::from_secs(3)).await; + continue; + } + return; + }; + let Ok(conn) = conn.await else { + self.connected.store(false, Ordering::Relaxed); + if keep_reconnect { + tokio::time::sleep(Duration::from_secs(3)).await; + continue; + } + return; + }; + + let local_ip = conn.local_ip(); + + let Ok((mut send, mut recv)) = conn.open_bi().await else { + self.connected.store(false, Ordering::Relaxed); + if keep_reconnect { + tokio::time::sleep(Duration::from_secs(3)).await; + continue; + } + return; + }; + + self.connected.store(true, Ordering::Relaxed); + debug!("connected"); + on_connected_callback(local_ip, &mut send, start_pkt_id.take()).await; + + if let Some(ref connecting_chan) = self.connecting_chan { + let state = ConnectionInfo::ConnState(ConnectionState::Connected); + let _ = connecting_chan.send(state).await; + } + if let Some(ref ipv6_restarter) = self.ipv6_network_restarter { + let _ = ipv6_restarter.send(true).await; + } + // stream.write("hello".as_bytes()).await; + // let (reader, mut write) = stream.into_split(); + + let read_from_tcp = async move { + let mut buffed_reader = BufReader::new(recv); + loop { + match read_a_packet(&mut buffed_reader).await { + Ok(packet) => { + debug!("got packet: {:?}", packet); + if let Err(_e) = self.from_tcp.send(packet).await { + error!("failed to receive a packet: {:?}", _e); + } + } + Err(e) => { + error!("failed to read a packet: {}, reconnecting...", e); + return; + } + } + } + }; + + let write_to_tcp = async { + while let Some(data) = to_tcp.recv().await { + match send.write(&data).await { + Ok(size) => { + debug!("{} bytes sent to tcp", size); + } + Err(e) => { + error!("failed to write to tcp: {}", e.to_string()); + return; + } + } + } + error!("to_tcp recv None"); + }; + + let check_pong = async { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + + let connected = self.connected.load(Ordering::Relaxed); + let now = get_current_timestamp(); + if connected && now - self.pong_time.load(Ordering::Relaxed) > TCP_PING_TIME * 2 + { + // pong time expire, need to re-connect + error!("pong check expired"); + return; + } + } + }; + + let check_stop = async { + loop { + match start_stop_chan.recv().await { + Some(v) => { + if !v.is_start { + started = false; + return; + } + } + _other => { + // send chan is closed; + started = false; + return; + } + } + } + }; + + pin_mut!(read_from_tcp, write_to_tcp); + + tokio::select! { + _ = read_from_tcp => {}, + _ = write_to_tcp => {}, + _ = check_pong => {}, + _ = check_stop => {}, + } + on_disconnected_callback().await; + debug!("connect retrying"); + tokio::time::sleep(Duration::from_secs(1)).await; + debug!("disconnected"); + // future::select(read_from_tcp, write_to_tcp).await; + } + } +} + +async fn on_disconnected_callback() { + let edge = get_edge(); + edge.set_authorized(false, vec![]); +} + +async fn on_connected_callback(local_ip: Option, stream: &mut SendStream, pkt_id: Option) { + let edge = get_edge(); + // let installed_channel = install_channel.to_owned(); + + // let token = edge._token.lock().unwrap().clone(); + // let code = edge.network_code.lock().unwrap().clone(); + + // let edge = get_edge(); + // let edge = get_edge(); + // let token = args.token.clone(); + if let Some(ipaddr) = local_ip { + match ipaddr { + IpAddr::V4(v4) => { + let ip = v4.into(); + // println!("outer ip is {} => {}", v4, ip); + edge.outer_ip_v4.store(ip, Ordering::Relaxed); + } + _other => {} + } + } + let register_super = SdlRegisterSuper { + mac: Vec::from(edge.device_config.get_mac()), + pkt_id: edge.get_next_packet_id(), + network_id: edge.network_id.load(Ordering::Relaxed), + ip: edge.device_config.get_ip(), + mask_len: edge.device_config.get_net_bit() as u32, + access_token: edge.access_token.get(), + + // installed_channel, + client_id: edge.config.node_uuid.clone(), + pub_key: edge.rsa_pubkey.clone(), + hostname: edge.hostname.read().unwrap().clone(), + }; + // debug!("send register super: {:?}", register_super); + let packet_id = match pkt_id { + Some(id) => id, + None => edge.get_next_packet_id(), + }; + // let packet_id = edge.get_next_packet_id(); + let data = encode_to_tcp_message( + Some(register_super), + packet_id, + PacketType::RegisterSuper as u8, + ) + .unwrap(); + if let Err(e) = stream.write(&data).await { + error!("failed to write to tcp: {}", e.to_string()); + } +} \ No newline at end of file diff --git a/src/tcp/tcp_codec.rs b/src/tcp/tcp_codec.rs index e6297cc..b187fe1 100755 --- a/src/tcp/tcp_codec.rs +++ b/src/tcp/tcp_codec.rs @@ -1,10 +1,15 @@ +use std::sync::atomic::Ordering; + +use quinn::RecvStream; use tokio::{ io::{AsyncReadExt, BufReader}, net::tcp::OwnedReadHalf, }; use num_enum::TryFromPrimitive; -use tracing::debug; +use tracing::{debug, error}; + +use crate::{network::Node, pb::{SdlStunRequest, Sdlv6Info, encode_to_udp_message}, utils::send_to_sock}; #[derive(Debug)] pub struct SdlanTcp { @@ -92,8 +97,39 @@ pub enum PacketType { Data = 0xff, } +pub async fn send_stun_request(eee: &Node) { + let sdl_v6_info = match *eee.ipv6.read().unwrap() { + Some(ref l) => Some(Sdlv6Info { + port: l.port as u32, + v6: Vec::from(l.v6), + }), + None => None, + }; + let req = SdlStunRequest { + session_token: Vec::from(eee.session_token.get()), + cookie: 0, + client_id: eee.config.node_uuid.clone(), + network_id: eee.network_id.load(Ordering::Relaxed), + ip: eee.device_config.get_ip(), + mac: Vec::from(eee.device_config.get_mac()), + nat_type: eee.get_nat_type() as u32, + v6_info: sdl_v6_info, + }; + debug!("stun request: {:?}", req); + let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap(); + if let Err(e) = send_to_sock( + eee, + &msg, + &eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize], + ) + .await + { + error!("failed to send to sock: {:?}", e); + } +} + pub async fn read_a_packet( - reader: &mut BufReader, + reader: &mut BufReader, ) -> Result { debug!("read a packet"); let size = reader.read_u16().await?; diff --git a/src/tcp/tcp_conn.rs b/src/tcp/tcp_conn.rs index 26962d4..612e9b5 100755 --- a/src/tcp/tcp_conn.rs +++ b/src/tcp/tcp_conn.rs @@ -35,264 +35,8 @@ use super::tcp_codec::SdlanTcp; static GLOBAL_TCP_HANDLE: OnceCell = OnceCell::new(); -async fn handle_tcp_message(msg: SdlanTcp) { - let edge = get_edge(); - - // let now = get_current_timestamp(); - // edge.tcp_pong.store(now, Ordering::Relaxed); - - debug!("got tcp message: {:?}", msg.packet_type); - match msg.packet_type { - PacketType::RegisterSuperACK => { - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 0, - message: "".to_owned(), - should_exit: false, - }, - ); - - let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else { - error!("failed to decode REGISTER_SUPER_ACK"); - return; - }; - debug!("got register super ack: {:?}", ack); - edge.session_token.set(ack.session_token); - let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else { - error!("failed to rsa decrypt aes key"); - return; - }; - /* - let Some(dev) = ack.dev_addr else { - error!("no dev_addr is specified"); - return; - }; - */ - - let ip = ip_to_string(&edge.device_config.get_ip()); - // debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,); - println!("assigned ip: {}", ip); - let hostname = edge.hostname.read().unwrap().clone(); - // println!("network is: {}.{}", hostname, dev.network_domain); - /* - edge.device_config - .ip - .net_addr - .store(dev.net_addr, Ordering::Relaxed); - */ - if let Some(ref chan) = edge.connection_chan { - let _ = chan.send(ConnectionInfo::IPInfo(ip)).await; - } - /* - let mac = match dev.mac.try_into() { - Err(_) => NULL_MAC, - Ok(m) => m, - }; - */ - // *edge.device_config.mac.write().unwrap() = mac; - /* - edge.device_config - .ip - .net_bit_len - .store(dev.net_bit_len as u8, Ordering::Relaxed); - edge.network_id.store(dev.network_id, Ordering::Relaxed); - */ - // edge.device.reload_config(&edge.device_config, &dev.network_domain); - edge.device.reload_config(&edge.device_config, &edge.network_domain.read().unwrap().clone()); - - edge.set_authorized(true, aes); - send_stun_request(edge).await; - tokio::spawn(async { - let nattype = edge.probe_nat_type().await; - debug!("nat type is {:?}", nattype); - // println!("nat type is: {:?}", nattype); - }); - } - PacketType::RegisterSuperNAK => { - let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else { - error!("failed to decode REGISTER_SUPER_NAK"); - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 1, - message: "failed to decode REGISTER SUPER NAK".to_owned(), - should_exit: false, - }, - ); - return; - }; - - let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else { - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 2, - message: "error_code not recognized".to_owned(), - should_exit: false, - }, - ); - return; - }; - match error_code { - NakMsgCode::InvalidToken => { - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 3, - message: "invalid token".to_owned(), - should_exit: true, - }, - ); - edge.stop().await; - } - NakMsgCode::NodeDisabled => { - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 4, - message: "Node is disabled".to_owned(), - should_exit: true, - }, - ); - edge.stop().await; - } - _other => { - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 0, - message: "".to_owned(), - should_exit: false, - }, - ); - } - } - /* - edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { - result: 1, - message: "failed to decode REGISTER SUPER NAK".to_owned(), - }); - */ - edge.set_authorized(false, Vec::new()); - // std::process::exit(0); - } - PacketType::Command => { - if msg.current_packet.len() < 1 { - error!("malformed COMMAND received"); - return; - } - handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await; - } - PacketType::Event => { - if msg.current_packet.len() < 1 { - error!("malformed EVENT received"); - return; - } - let Ok(event) = msg.current_packet[0].try_into() else { - error!("failed to parse event type"); - return; - }; - handle_tcp_event(edge, event, &msg.current_packet[1..]).await; - } - PacketType::PeerInfo => { - let _ = handle_packet_peer_info(edge, &msg.current_packet[..]).await; - } - PacketType::Pong => { - debug!("tcp pong received"); - let now = get_current_timestamp(); - edge.tcp_pong.store(now, Ordering::Relaxed); - } - other => { - debug!("tcp not handling {:?}", other); - } - } -} - -async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {} - -async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) { - match eventtype { - EventType::SendRegister => { - let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else { - error!("failed to decode SendRegister Event"); - return; - }; - let v4 = reg.nat_ip.to_be_bytes(); - let mut v6_sock = None; - if let Some(v6_info) = reg.v6_info { - if let Ok(v6_bytes) = v6_info.v6.try_into() { - v6_sock = Some(V6Info { - port: v6_info.port as u16, - v6: v6_bytes, - }); - } - } - - let dst_mac = match reg.dst_mac.try_into() { - Ok(m) => m, - Err(_e) => NULL_MAC, - }; - - let remote_nat_byte = reg.nat_type as u8; - let remote_nat = match remote_nat_byte.try_into() { - Ok(t) => t, - Err(_) => NatType::NoNat, - }; - - check_peer_registration_needed( - edge, - false, - dst_mac, - // &v6_sock, - remote_nat, - &v6_sock, - &SdlanSock { - family: AF_INET, - port: reg.nat_port as u16, - v4, - v6: [0; 16], - }, - ) - .await; - } - other => { - debug!("unhandled event {:?}", other); - } - } -} -pub async fn send_stun_request(eee: &Node) { - let sdl_v6_info = match *eee.ipv6.read().unwrap() { - Some(ref l) => Some(Sdlv6Info { - port: l.port as u32, - v6: Vec::from(l.v6), - }), - None => None, - }; - let req = SdlStunRequest { - session_token: Vec::from(eee.session_token.get()), - cookie: 0, - client_id: eee.config.node_uuid.clone(), - network_id: eee.network_id.load(Ordering::Relaxed), - ip: eee.device_config.get_ip(), - mac: Vec::from(eee.device_config.get_mac()), - nat_type: eee.get_nat_type() as u32, - v6_info: sdl_v6_info, - }; - debug!("stun request: {:?}", req); - let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap(); - if let Err(e) = send_to_sock( - eee, - &msg, - &eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize], - ) - .await - { - error!("failed to send to sock: {:?}", e); - } -} async fn on_disconnected_callback() { let edge = get_edge(); @@ -349,197 +93,6 @@ async fn on_connected_callback<'a>(stream: &'a mut tokio::net::TcpStream, pkt_id } } -pub struct ReadWriteActor { - // actor接收的发送给tcp的接收端,由handle存放发送端 - // to_tcp: Receiver>, - remote: String, - connected: Arc, - pong_time: Arc, - // actor收到数据之后,发送给上层的发送端口,接收端由handle保存 - from_tcp: Sender, - _cancel: CancellationToken, - connecting_chan: Option>, - ipv6_network_restarter: Option>, -} - -impl ReadWriteActor { - pub fn new( - cancel: CancellationToken, - remote: &str, - from_tcp: Sender, - connected: Arc, - pong_time: Arc, - connecting_chan: Option>, - ipv6_network_restarter: Option>, - ) -> Self { - Self { - // to_tcp, - _cancel: cancel, - pong_time, - connected, - remote: remote.to_owned(), - from_tcp, - connecting_chan, - ipv6_network_restarter, - } - } - - pub async fn run<'a>( - &self, - keep_reconnect: bool, - mut to_tcp: Receiver>, - mut start_stop_chan: Receiver, - ) { - let mut started = false; - let mut start_pkt_id = None; - loop { - if let Some(ref connecting_chan) = self.connecting_chan { - let state = ConnectionInfo::ConnState(ConnectionState::NotConnected); - let _ = connecting_chan.send(state).await; - } - self.connected.store(false, Ordering::Relaxed); - if !started { - // println!("waiting for start"); - loop { - let start_or_stop = start_stop_chan.recv().await; - if let Some(m) = start_or_stop { - if m.is_start { - started = true; - start_pkt_id = m.pkt_id; - break; - } - } else { - // None, just return - return; - } - } - /* - while let Some(m) = start_stop_chan.recv().await { - println!("4"); - if m.is_start { - // println!("start received"); - started = true; - start_pkt_id = m.pkt_id; - break; - } else { - // println!("stop received"); - } - } - */ - debug!("start stop chan received: {}", started); - continue; - } - - if let Some(ref connecting_chan) = self.connecting_chan { - let state = ConnectionInfo::ConnState(ConnectionState::Connecting); - let _ = connecting_chan.send(state).await; - } - debug!("try connecting..."); - - let Ok(mut stream) = TcpStream::connect(&self.remote).await else { - self.connected.store(false, Ordering::Relaxed); - if keep_reconnect { - tokio::time::sleep(Duration::from_secs(3)).await; - continue; - } - return; - }; - self.connected.store(true, Ordering::Relaxed); - debug!("connected"); - on_connected_callback(&mut stream, start_pkt_id.take()).await; - - if let Some(ref connecting_chan) = self.connecting_chan { - let state = ConnectionInfo::ConnState(ConnectionState::Connected); - let _ = connecting_chan.send(state).await; - } - if let Some(ref ipv6_restarter) = self.ipv6_network_restarter { - let _ = ipv6_restarter.send(true).await; - } - // stream.write("hello".as_bytes()).await; - let (reader, mut write) = stream.into_split(); - - let read_from_tcp = async move { - let mut buffed_reader = BufReader::new(reader); - loop { - match read_a_packet(&mut buffed_reader).await { - Ok(packet) => { - debug!("got packet: {:?}", packet); - if let Err(_e) = self.from_tcp.send(packet).await { - error!("failed to receive a packet: {:?}", _e); - } - } - Err(e) => { - error!("failed to read a packet: {}, reconnecting...", e); - return; - } - } - } - }; - - let write_to_tcp = async { - while let Some(data) = to_tcp.recv().await { - match write.write(&data).await { - Ok(size) => { - debug!("{} bytes sent to tcp", size); - } - Err(e) => { - error!("failed to write to tcp: {}", e.to_string()); - return; - } - } - } - error!("to_tcp recv None"); - }; - - let check_pong = async { - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - - let connected = self.connected.load(Ordering::Relaxed); - let now = get_current_timestamp(); - if connected && now - self.pong_time.load(Ordering::Relaxed) > TCP_PING_TIME * 2 - { - // pong time expire, need to re-connect - error!("pong check expired"); - return; - } - } - }; - - let check_stop = async { - loop { - match start_stop_chan.recv().await { - Some(v) => { - if !v.is_start { - started = false; - return; - } - } - _other => { - // send chan is closed; - started = false; - return; - } - } - } - }; - - pin_mut!(read_from_tcp, write_to_tcp); - - tokio::select! { - _ = read_from_tcp => {}, - _ = write_to_tcp => {}, - _ = check_pong => {}, - _ = check_stop => {}, - } - on_disconnected_callback().await; - debug!("connect retrying"); - tokio::time::sleep(Duration::from_secs(1)).await; - debug!("disconnected"); - // future::select(read_from_tcp, write_to_tcp).await; - } - } -} #[derive(Debug)] pub struct ReadWriterHandle { @@ -626,38 +179,6 @@ impl ReadWriterHandle { } } -pub fn init_tcp_conn( - cancel: CancellationToken, - addr: &str, - // on_connected: OnConnectedCallback<'a>, - // on_disconnected: T3, - // on_message: T2, - pong_time: Arc, - // cancel: CancellationToken, - start_stop_chan: Receiver, - connecting_chan: Option>, - ipv6_network_restarter: Option>, -) - // T2: Fn(SdlanTcp) -> F + Send + 'static, - // F: Future + Send, -{ - - let tcp_handle = ReadWriterHandle::new( - cancel, - addr, - // on_connected, - // on_disconnected, - // on_message, - pong_time, - start_stop_chan, - connecting_chan, - ipv6_network_restarter, - ); - - GLOBAL_TCP_HANDLE - .set(tcp_handle) - .expect("failed to set global tcp handle"); -} pub fn get_tcp_conn() -> &'static ReadWriterHandle { match GLOBAL_TCP_HANDLE.get() { diff --git a/src/utils/command.rs b/src/utils/command.rs index 3a8a0e4..d162c46 100755 --- a/src/utils/command.rs +++ b/src/utils/command.rs @@ -1,16 +1,52 @@ use structopt::StructOpt; +use clap::{Parser, Subcommand, Args}; + +#[derive(Parser)] +pub struct CommandLineInput2 { + #[command(subcommand)] + cmd: Commands, +} + +#[derive(Subcommand)] +pub enum Commands { + Login(UserLogin), + TokenLogin(TokenLogin), + + /// after login, we can use start to + /// connect to the remote + Start, + + /// exits the + Stop, +} + +#[derive(Args)] +pub struct UserLogin { + #[arg(short, long)] + username: String, + + // #[arg(long, env="APP_SECRET", hide_env_values = true, hide=true)] + // password: String, +} + +#[derive(Args)] +pub struct TokenLogin {} + #[derive(StructOpt, Debug)] pub struct CommandLineInput { - #[structopt(long = "token", default_value = "", help="specify a token")] + #[structopt(short="u", long = "user", default_value = "", help="specify a token")] + pub user: String, + + #[structopt(short="P", long = "pass", default_value = "", help="specify a token")] + pub pass: String, + + #[structopt(short="t", long = "token", default_value = "", help="specify a token")] pub token: String, #[structopt(short = "p", long = "port", default_value = "0", help="which port to use")] pub port: u16, - #[structopt(long = "code", default_value = "", help="specify a network code")] - pub network_code: String, - #[structopt(short= "h", long = "hostname", default_value="", help="specify the hostname")] pub hostname: String, }