changed tcp to quic, can remove tcp_conn.rs safely
This commit is contained in:
parent
9d52223f84
commit
8645b65534
30
Cargo.lock
generated
30
Cargo.lock
generated
@ -280,7 +280,7 @@ dependencies = [
|
||||
"anstyle",
|
||||
"ar",
|
||||
"cargo_toml",
|
||||
"clap 4.5.54",
|
||||
"clap 4.5.60",
|
||||
"elf",
|
||||
"env_logger",
|
||||
"glob",
|
||||
@ -387,18 +387,19 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "4.5.54"
|
||||
version = "4.5.60"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c6e6ff9dcd79cff5cd969a17a545d79e84ab086e444102a591e288a8aa3ce394"
|
||||
checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a"
|
||||
dependencies = [
|
||||
"clap_builder",
|
||||
"clap_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_builder"
|
||||
version = "4.5.54"
|
||||
version = "4.5.60"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa42cf4d2b7a41bc8f663a7cab4031ebafa1bf3875705bfaf8466dc60ab52c00"
|
||||
checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
@ -407,10 +408,22 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "0.7.7"
|
||||
name = "clap_derive"
|
||||
version = "4.5.55"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c3e64b0cc0439b12df2fa678eae89a1c56a529fd067a9115f7827f1fffd22b32"
|
||||
checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5"
|
||||
dependencies = [
|
||||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.114",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831"
|
||||
|
||||
[[package]]
|
||||
name = "cmake"
|
||||
@ -1991,6 +2004,7 @@ version = "1.0.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"cargo-deb",
|
||||
"clap 4.5.60",
|
||||
"crc",
|
||||
"crc32fast",
|
||||
"dashmap 6.1.0",
|
||||
|
||||
@ -32,6 +32,7 @@ bytes = "1.11.1"
|
||||
quinn = "0.11.9"
|
||||
rustls = "0.23.37"
|
||||
rustls-pemfile = "2.2.0"
|
||||
clap = { version = "4.5.60", features = ["derive"] }
|
||||
# rolling-file = { path = "../rolling-file" }
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
|
||||
@ -40,7 +40,7 @@ pub struct LoginData {
|
||||
pub audit: u32,
|
||||
pub network_id: u32,
|
||||
pub network_name: String,
|
||||
pub network_domain: String,
|
||||
// pub network_domain: String,
|
||||
pub exit_node: Vec<ExitNode>,
|
||||
}
|
||||
|
||||
@ -140,6 +140,9 @@ pub struct ConnectResponse {
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ConnectData {
|
||||
pub ip: String,
|
||||
pub mask_len: u8,
|
||||
pub hostname: String,
|
||||
pub identity_id: u32,
|
||||
pub resource_list: Vec<ResourceList>,
|
||||
pub node_list: Vec<NodeList>,
|
||||
// pub acl: Vec<ACL>,
|
||||
@ -147,8 +150,10 @@ pub struct ConnectData {
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ResourceList {
|
||||
pub id: i32,
|
||||
pub name: String,
|
||||
pub url: String,
|
||||
pub connection_status: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
@ -189,4 +194,44 @@ pub async fn disconnect(
|
||||
access_token,
|
||||
};
|
||||
post_with_data(&url, data).await
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct GetResourceRequest<'a> {
|
||||
client_id: &'a str,
|
||||
access_token: &'a str,
|
||||
id: i32,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct GetResourceResponse {
|
||||
pub code: i32,
|
||||
pub message: String,
|
||||
pub data: Option<ResourceData>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ResourceData {
|
||||
pub id: i32,
|
||||
pub name: String,
|
||||
pub ip: String,
|
||||
pub system: String,
|
||||
pub connection_status: String,
|
||||
pub resource_list: Vec<ResourceList>,
|
||||
}
|
||||
|
||||
pub async fn get_node_resource(
|
||||
url_prefix: &str,
|
||||
client_id: &str,
|
||||
access_token: &str,
|
||||
id: i32,
|
||||
) -> Result<GetResourceResponse> {
|
||||
let data = GetResourceRequest {
|
||||
client_id,
|
||||
access_token,
|
||||
id,
|
||||
};
|
||||
|
||||
let url = format!("{}/get_node_resource", url_prefix);
|
||||
post_with_data(&url, data).await
|
||||
}
|
||||
@ -1,5 +1,6 @@
|
||||
mod api;
|
||||
|
||||
use punchnet::create_or_load_mac;
|
||||
use punchnet::get_base_dir;
|
||||
use punchnet::get_edge;
|
||||
use punchnet::mod_hostname;
|
||||
@ -10,11 +11,15 @@ use punchnet::CommandLine;
|
||||
use punchnet::CommandLineInput;
|
||||
use sdlan_sn_rs::log;
|
||||
|
||||
use sdlan_sn_rs::utils::create_or_load_uuid;
|
||||
use tracing::error;
|
||||
|
||||
use std::net::ToSocketAddrs;
|
||||
use structopt::StructOpt;
|
||||
|
||||
use crate::api::TEST_PREFIX;
|
||||
use crate::api::login_with_token;
|
||||
|
||||
|
||||
|
||||
#[tokio::main]
|
||||
@ -22,9 +27,14 @@ async fn main() {
|
||||
set_base_dir("/usr/local/punchnet");
|
||||
let _guard = log::init_log(&format!("{}/.output", get_base_dir()));
|
||||
|
||||
let client_id = create_or_load_uuid(&format!("{}/.id", get_base_dir()), None).unwrap();
|
||||
let token = "49722584273728716817720074439183";
|
||||
|
||||
let mac = create_or_load_mac();
|
||||
let system = "linux";
|
||||
let version = "1.0.0";
|
||||
|
||||
let cmd = CommandLineInput::from_args();
|
||||
|
||||
|
||||
// println!("port is {}", cmd.port);
|
||||
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
|
||||
@ -19,7 +19,7 @@ use tokio::net::UdpSocket;
|
||||
use tokio::sync::mpsc::{channel, Sender};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error};
|
||||
pub use utils::{CommandLine, CommandLineInput, mod_hostname};
|
||||
pub use utils::{CommandLine, CommandLineInput, mod_hostname, create_or_load_mac};
|
||||
|
||||
pub use config::{get_base_dir, set_base_dir};
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -7,7 +7,7 @@ use crate::network::ipv6::run_ipv6;
|
||||
use crate::network::{
|
||||
get_edge, ping_to_sn, read_and_parse_packet, TunTapPacketHandler,
|
||||
};
|
||||
use crate::tcp::{init_tcp_conn, send_stun_request};
|
||||
use crate::tcp::{init_quic_conn, send_stun_request};
|
||||
use crate::utils::{send_to_sock, CommandLine};
|
||||
use crate::{ConnectionInfo};
|
||||
use sdlan_sn_rs::peer::{SdlanSock};
|
||||
@ -37,41 +37,7 @@ pub async fn async_main(
|
||||
let (ipv6_network_restarter, rx) = channel(10);
|
||||
tokio::spawn(run_ipv6(edge, rx));
|
||||
|
||||
// TODO: change the quic logic
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let conn = edge.quic_endpoint.connect("192.168.0.1".parse().unwrap(), "www.punchnet.com").unwrap().await.unwrap();
|
||||
println!("连接成功!");
|
||||
|
||||
let (mut send, mut recv) = conn.open_bi().await.unwrap();
|
||||
|
||||
loop {
|
||||
send.write_all(b"Hello QUIC!").await.unwrap();
|
||||
let mut buf = vec![0u8; 1024];
|
||||
if let Ok(size) = recv.read(&mut buf).await {
|
||||
if let Some(size) = size {
|
||||
println!("got data from server: {}", String::from_utf8_lossy(&buf[..size]))
|
||||
} else {
|
||||
println!("no data size found");
|
||||
}
|
||||
} else {
|
||||
println!("read failed");
|
||||
break;
|
||||
}
|
||||
|
||||
recv.read(&mut buf).await.unwrap();
|
||||
sleep(Duration::from_secs(11)).await;
|
||||
}
|
||||
|
||||
println!("hello");
|
||||
// conn.close(0u32.into(), b"donw");
|
||||
|
||||
edge.quic_endpoint.wait_idle().await;
|
||||
}
|
||||
});
|
||||
////////////////// to here
|
||||
|
||||
init_tcp_conn(
|
||||
init_quic_conn(
|
||||
cancel_tcp,
|
||||
&args.tcp,
|
||||
// |msg| handle_tcp_message(msg),
|
||||
@ -82,11 +48,6 @@ pub async fn async_main(
|
||||
Some(ipv6_network_restarter),
|
||||
);
|
||||
|
||||
// tcp_conn.send("hello".as_bytes()).await;
|
||||
// tokio::spawn(handle_tcp_message(tcp_conn.data_from_tcp));
|
||||
|
||||
// tcp_conn.send("".as_bytes());
|
||||
|
||||
debug!("waiting for authorization...");
|
||||
|
||||
/*
|
||||
@ -155,6 +116,17 @@ pub async fn async_main(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_quic_loop(
|
||||
edge: &Node,
|
||||
quic_addr: &str,
|
||||
pong_time: Arc<AtomicU64>,
|
||||
start_stop_chan: Receiver<StartStopInfo>,
|
||||
connecting_chan: Sender<ConnectionInfo>,
|
||||
ipv6_ntework_restarter: Option<Sender<bool>>
|
||||
) {
|
||||
|
||||
}
|
||||
|
||||
async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) {
|
||||
ping_to_sn().await;
|
||||
{
|
||||
|
||||
@ -17,7 +17,7 @@ use crate::{ConnectionInfo, get_base_dir};
|
||||
use crate::pb::{
|
||||
encode_to_tcp_message, encode_to_udp_message, SdlEmpty, SdlStunProbe, SdlStunProbeReply,
|
||||
};
|
||||
use crate::tcp::{get_tcp_conn, NatType, PacketType, StunProbeAttr};
|
||||
use crate::tcp::{NatType, PacketType, StunProbeAttr, get_quic_write_conn};
|
||||
use crate::utils::{Socket, create_or_load_mac};
|
||||
|
||||
use sdlan_sn_rs::peer::{IpSubnet, V6Info};
|
||||
@ -588,7 +588,7 @@ impl Node {
|
||||
let content =
|
||||
encode_to_tcp_message::<SdlEmpty>(None, 0, PacketType::UnRegisterSuper as u8).unwrap();
|
||||
|
||||
let conn = get_tcp_conn();
|
||||
let conn = get_quic_write_conn();
|
||||
let _ = conn.send(content).await;
|
||||
|
||||
Ok(())
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
use std::{net::SocketAddr, sync::atomic::Ordering, time::Duration};
|
||||
|
||||
use crate::tcp::NatType;
|
||||
use crate::tcp::{NatType, get_quic_write_conn};
|
||||
use crate::{network::TunTapPacketHandler, utils::mac_to_string};
|
||||
|
||||
use crate::{
|
||||
@ -9,7 +9,7 @@ use crate::{
|
||||
encode_to_tcp_message, encode_to_udp_message, SdlData, SdlEmpty, SdlPeerInfo, SdlQueryInfo,
|
||||
SdlRegister, SdlRegisterAck, SdlStunProbeReply,
|
||||
},
|
||||
tcp::{get_tcp_conn, PacketType},
|
||||
tcp::{PacketType},
|
||||
utils::{send_to_sock, Socket},
|
||||
};
|
||||
use etherparse::Ethernet2Header;
|
||||
@ -1143,7 +1143,7 @@ async fn send_query_peer(eee: &Node, dst_mac: Mac) -> Result<()> {
|
||||
error!("failed to encode query");
|
||||
return Err(SDLanError::NormalError("encode query error"));
|
||||
};
|
||||
let tcp_conn = get_tcp_conn();
|
||||
let tcp_conn = get_quic_write_conn();
|
||||
tcp_conn.send(content).await
|
||||
}
|
||||
|
||||
@ -1153,7 +1153,7 @@ pub async fn ping_to_sn() {
|
||||
return;
|
||||
};
|
||||
debug!("ping to sn");
|
||||
let tcp_conn = get_tcp_conn();
|
||||
let tcp_conn = get_quic_write_conn();
|
||||
if let Err(e) = tcp_conn.send(msg).await {
|
||||
error!("failed to ping to sn: {:?}", e);
|
||||
}
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
mod tcp_codec;
|
||||
mod tcp_conn;
|
||||
// mod tcp_conn;
|
||||
mod quic;
|
||||
|
||||
pub use tcp_codec::*;
|
||||
pub use tcp_conn::*;
|
||||
pub use quic::*;
|
||||
// pub use tcp_conn::*;
|
||||
|
||||
637
src/tcp/quic.rs
Normal file
637
src/tcp/quic.rs
Normal file
@ -0,0 +1,637 @@
|
||||
use std::{net::IpAddr, sync::{Arc, OnceLock, atomic::{AtomicBool, AtomicU64, Ordering}}, time::Duration};
|
||||
|
||||
use futures_util::pin_mut;
|
||||
use prost::Message;
|
||||
use quinn::SendStream;
|
||||
use sdlan_sn_rs::{config::AF_INET, peer::{SdlanSock, V6Info}, utils::{Result, SDLanError, get_current_timestamp, ip_to_string, rsa_decrypt}};
|
||||
use tokio::{io::BufReader, net::TcpStream, sync::mpsc::{Receiver, Sender, channel}};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error};
|
||||
|
||||
use crate::{ConnectionInfo, ConnectionState, config::{NULL_MAC, TCP_PING_TIME}, get_edge, network::{Node, RegisterSuperFeedback, StartStopInfo, check_peer_registration_needed, handle_packet_peer_info}, pb::{SdlRegisterSuper, SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, encode_to_tcp_message}, tcp::{EventType, NakMsgCode, NatType, PacketType, SdlanTcp, read_a_packet, send_stun_request}};
|
||||
|
||||
static GLOBAL_QUIC_HANDLE: OnceLock<ReadWriterHandle> = OnceLock::new();
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ReadWriterHandle {
|
||||
connected: Arc<AtomicBool>,
|
||||
send_to_tcp: Sender<Vec<u8>>,
|
||||
// pub data_from_tcp: Receiver<SdlanTcp>,
|
||||
}
|
||||
|
||||
impl ReadWriterHandle {
|
||||
pub async fn send(&self, data: Vec<u8>) -> Result<()> {
|
||||
if self.connected.load(Ordering::Relaxed) {
|
||||
// connected, send to it
|
||||
if let Err(e) = self.send_to_tcp.send(data).await {
|
||||
error!("failed to send to send_to_tcp: {}", e.to_string());
|
||||
return Err(SDLanError::NormalError("failed to send"));
|
||||
};
|
||||
debug!("tcp info sent");
|
||||
} else {
|
||||
error!("tcp not connected, so not sending data");
|
||||
return Err(SDLanError::NormalError("not connected, so not sending"));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn new<>(
|
||||
cancel: CancellationToken,
|
||||
addr: &str,
|
||||
// on_connected: OnConnectedCallback<'a>,
|
||||
// on_disconnected: T3,
|
||||
// on_message: T2,
|
||||
pong_time: Arc<AtomicU64>,
|
||||
start_stop_chan: Receiver<StartStopInfo>,
|
||||
// cancel: CancellationToken,
|
||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||
ipv6_network_restarter: Option<Sender<bool>>,
|
||||
) -> Self {
|
||||
let (send_to_tcp, to_tcp) = channel(20);
|
||||
let (from_tcp, mut data_from_tcp) = channel(20);
|
||||
|
||||
let connected: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
|
||||
|
||||
tokio::spawn(async move {
|
||||
|
||||
});
|
||||
|
||||
let actor = ReadWriteActor::new(
|
||||
cancel,
|
||||
addr,
|
||||
from_tcp,
|
||||
connected.clone(),
|
||||
pong_time,
|
||||
connecting_chan,
|
||||
ipv6_network_restarter,
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
actor
|
||||
.run(
|
||||
true,
|
||||
to_tcp,
|
||||
// on_connected,
|
||||
// on_disconnected,
|
||||
start_stop_chan
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if let Some(msg) = data_from_tcp.recv().await {
|
||||
handle_tcp_message(msg).await;
|
||||
} else {
|
||||
error!("data from tcp exited");
|
||||
// eprintln!("data from tcp exited");
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
ReadWriterHandle {
|
||||
connected,
|
||||
send_to_tcp,
|
||||
// data_from_tcp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_quic_write_conn() -> &'static ReadWriterHandle {
|
||||
match GLOBAL_QUIC_HANDLE.get() {
|
||||
Some(v) => v,
|
||||
None => panic!("should call init_tcp_conn first"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async fn handle_tcp_message(msg: SdlanTcp) {
|
||||
let edge = get_edge();
|
||||
|
||||
// let now = get_current_timestamp();
|
||||
// edge.tcp_pong.store(now, Ordering::Relaxed);
|
||||
|
||||
debug!("got tcp message: {:?}", msg.packet_type);
|
||||
match msg.packet_type {
|
||||
PacketType::RegisterSuperACK => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 0,
|
||||
message: "".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
|
||||
let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else {
|
||||
error!("failed to decode REGISTER_SUPER_ACK");
|
||||
return;
|
||||
};
|
||||
debug!("got register super ack: {:?}", ack);
|
||||
edge.session_token.set(ack.session_token);
|
||||
let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else {
|
||||
error!("failed to rsa decrypt aes key");
|
||||
return;
|
||||
};
|
||||
/*
|
||||
let Some(dev) = ack.dev_addr else {
|
||||
error!("no dev_addr is specified");
|
||||
return;
|
||||
};
|
||||
*/
|
||||
|
||||
let ip = ip_to_string(&edge.device_config.get_ip());
|
||||
// debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,);
|
||||
println!("assigned ip: {}", ip);
|
||||
let hostname = edge.hostname.read().unwrap().clone();
|
||||
// println!("network is: {}.{}", hostname, dev.network_domain);
|
||||
/*
|
||||
edge.device_config
|
||||
.ip
|
||||
.net_addr
|
||||
.store(dev.net_addr, Ordering::Relaxed);
|
||||
*/
|
||||
if let Some(ref chan) = edge.connection_chan {
|
||||
let _ = chan.send(ConnectionInfo::IPInfo(ip)).await;
|
||||
}
|
||||
/*
|
||||
let mac = match dev.mac.try_into() {
|
||||
Err(_) => NULL_MAC,
|
||||
Ok(m) => m,
|
||||
};
|
||||
*/
|
||||
// *edge.device_config.mac.write().unwrap() = mac;
|
||||
/*
|
||||
edge.device_config
|
||||
.ip
|
||||
.net_bit_len
|
||||
.store(dev.net_bit_len as u8, Ordering::Relaxed);
|
||||
edge.network_id.store(dev.network_id, Ordering::Relaxed);
|
||||
*/
|
||||
// edge.device.reload_config(&edge.device_config, &dev.network_domain);
|
||||
edge.device.reload_config(&edge.device_config, &edge.network_domain.read().unwrap().clone());
|
||||
|
||||
edge.set_authorized(true, aes);
|
||||
send_stun_request(edge).await;
|
||||
tokio::spawn(async {
|
||||
let nattype = edge.probe_nat_type().await;
|
||||
debug!("nat type is {:?}", nattype);
|
||||
// println!("nat type is: {:?}", nattype);
|
||||
});
|
||||
}
|
||||
PacketType::RegisterSuperNAK => {
|
||||
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {
|
||||
error!("failed to decode REGISTER_SUPER_NAK");
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 1,
|
||||
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 2,
|
||||
message: "error_code not recognized".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
return;
|
||||
};
|
||||
match error_code {
|
||||
NakMsgCode::InvalidToken => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 3,
|
||||
message: "invalid token".to_owned(),
|
||||
should_exit: true,
|
||||
},
|
||||
);
|
||||
edge.stop().await;
|
||||
}
|
||||
NakMsgCode::NodeDisabled => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 4,
|
||||
message: "Node is disabled".to_owned(),
|
||||
should_exit: true,
|
||||
},
|
||||
);
|
||||
edge.stop().await;
|
||||
}
|
||||
_other => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 0,
|
||||
message: "".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
/*
|
||||
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
|
||||
result: 1,
|
||||
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
||||
});
|
||||
*/
|
||||
edge.set_authorized(false, Vec::new());
|
||||
// std::process::exit(0);
|
||||
}
|
||||
PacketType::Command => {
|
||||
if msg.current_packet.len() < 1 {
|
||||
error!("malformed COMMAND received");
|
||||
return;
|
||||
}
|
||||
handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await;
|
||||
}
|
||||
PacketType::Event => {
|
||||
if msg.current_packet.len() < 1 {
|
||||
error!("malformed EVENT received");
|
||||
return;
|
||||
}
|
||||
let Ok(event) = msg.current_packet[0].try_into() else {
|
||||
error!("failed to parse event type");
|
||||
return;
|
||||
};
|
||||
handle_tcp_event(edge, event, &msg.current_packet[1..]).await;
|
||||
}
|
||||
PacketType::PeerInfo => {
|
||||
let _ = handle_packet_peer_info(edge, &msg.current_packet[..]).await;
|
||||
}
|
||||
PacketType::Pong => {
|
||||
debug!("tcp pong received");
|
||||
let now = get_current_timestamp();
|
||||
edge.tcp_pong.store(now, Ordering::Relaxed);
|
||||
}
|
||||
other => {
|
||||
debug!("tcp not handling {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {}
|
||||
|
||||
async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) {
|
||||
match eventtype {
|
||||
EventType::SendRegister => {
|
||||
let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else {
|
||||
error!("failed to decode SendRegister Event");
|
||||
return;
|
||||
};
|
||||
let v4 = reg.nat_ip.to_be_bytes();
|
||||
let mut v6_sock = None;
|
||||
if let Some(v6_info) = reg.v6_info {
|
||||
if let Ok(v6_bytes) = v6_info.v6.try_into() {
|
||||
v6_sock = Some(V6Info {
|
||||
port: v6_info.port as u16,
|
||||
v6: v6_bytes,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let dst_mac = match reg.dst_mac.try_into() {
|
||||
Ok(m) => m,
|
||||
Err(_e) => NULL_MAC,
|
||||
};
|
||||
|
||||
let remote_nat_byte = reg.nat_type as u8;
|
||||
let remote_nat = match remote_nat_byte.try_into() {
|
||||
Ok(t) => t,
|
||||
Err(_) => NatType::NoNat,
|
||||
};
|
||||
|
||||
check_peer_registration_needed(
|
||||
edge,
|
||||
false,
|
||||
dst_mac,
|
||||
// &v6_sock,
|
||||
remote_nat,
|
||||
&v6_sock,
|
||||
&SdlanSock {
|
||||
family: AF_INET,
|
||||
port: reg.nat_port as u16,
|
||||
v4,
|
||||
v6: [0; 16],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
other => {
|
||||
debug!("unhandled event {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub fn init_quic_conn(
|
||||
cancel: CancellationToken,
|
||||
addr: &str,
|
||||
// on_connected: OnConnectedCallback<'a>,
|
||||
// on_disconnected: T3,
|
||||
// on_message: T2,
|
||||
pong_time: Arc<AtomicU64>,
|
||||
// cancel: CancellationToken,
|
||||
start_stop_chan: Receiver<StartStopInfo>,
|
||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||
ipv6_network_restarter: Option<Sender<bool>>,
|
||||
)
|
||||
// T2: Fn(SdlanTcp) -> F + Send + 'static,
|
||||
// F: Future<Output = ()> + Send,
|
||||
{
|
||||
let tcp_handle = ReadWriterHandle::new(
|
||||
cancel,
|
||||
addr,
|
||||
// on_connected,
|
||||
// on_disconnected,
|
||||
// on_message,
|
||||
pong_time,
|
||||
start_stop_chan,
|
||||
connecting_chan,
|
||||
ipv6_network_restarter,
|
||||
);
|
||||
|
||||
GLOBAL_QUIC_HANDLE
|
||||
.set(tcp_handle)
|
||||
.expect("failed to set global tcp handle");
|
||||
}
|
||||
|
||||
pub struct ReadWriteActor {
|
||||
// actor接收的发送给tcp的接收端,由handle存放发送端
|
||||
// to_tcp: Receiver<Vec<u8>>,
|
||||
remote: String,
|
||||
connected: Arc<AtomicBool>,
|
||||
pong_time: Arc<AtomicU64>,
|
||||
// actor收到数据之后,发送给上层的发送端口,接收端由handle保存
|
||||
from_tcp: Sender<SdlanTcp>,
|
||||
_cancel: CancellationToken,
|
||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||
ipv6_network_restarter: Option<Sender<bool>>,
|
||||
}
|
||||
|
||||
impl ReadWriteActor {
|
||||
pub fn new(
|
||||
cancel: CancellationToken,
|
||||
remote: &str,
|
||||
from_tcp: Sender<SdlanTcp>,
|
||||
connected: Arc<AtomicBool>,
|
||||
pong_time: Arc<AtomicU64>,
|
||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||
ipv6_network_restarter: Option<Sender<bool>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
// to_tcp,
|
||||
_cancel: cancel,
|
||||
pong_time,
|
||||
connected,
|
||||
remote: remote.to_owned(),
|
||||
from_tcp,
|
||||
connecting_chan,
|
||||
ipv6_network_restarter,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run<'a>(
|
||||
&self,
|
||||
keep_reconnect: bool,
|
||||
mut to_tcp: Receiver<Vec<u8>>,
|
||||
mut start_stop_chan: Receiver<StartStopInfo>,
|
||||
) {
|
||||
let edge = get_edge();
|
||||
|
||||
let mut started = false;
|
||||
let mut start_pkt_id = None;
|
||||
loop {
|
||||
if let Some(ref connecting_chan) = self.connecting_chan {
|
||||
let state = ConnectionInfo::ConnState(ConnectionState::NotConnected);
|
||||
let _ = connecting_chan.send(state).await;
|
||||
}
|
||||
self.connected.store(false, Ordering::Relaxed);
|
||||
if !started {
|
||||
// println!("waiting for start");
|
||||
loop {
|
||||
let start_or_stop = start_stop_chan.recv().await;
|
||||
if let Some(m) = start_or_stop {
|
||||
if m.is_start {
|
||||
started = true;
|
||||
start_pkt_id = m.pkt_id;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// None, just return
|
||||
return;
|
||||
}
|
||||
}
|
||||
/*
|
||||
while let Some(m) = start_stop_chan.recv().await {
|
||||
println!("4");
|
||||
if m.is_start {
|
||||
// println!("start received");
|
||||
started = true;
|
||||
start_pkt_id = m.pkt_id;
|
||||
break;
|
||||
} else {
|
||||
// println!("stop received");
|
||||
}
|
||||
}
|
||||
*/
|
||||
debug!("start stop chan received: {}", started);
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(ref connecting_chan) = self.connecting_chan {
|
||||
let state = ConnectionInfo::ConnState(ConnectionState::Connecting);
|
||||
let _ = connecting_chan.send(state).await;
|
||||
}
|
||||
debug!("try connecting...");
|
||||
|
||||
let Ok(conn) = edge.quic_endpoint.connect(self.remote.parse().unwrap(), "") else {
|
||||
self.connected.store(false, Ordering::Relaxed);
|
||||
if keep_reconnect {
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
continue;
|
||||
}
|
||||
return;
|
||||
};
|
||||
let Ok(conn) = conn.await else {
|
||||
self.connected.store(false, Ordering::Relaxed);
|
||||
if keep_reconnect {
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
continue;
|
||||
}
|
||||
return;
|
||||
};
|
||||
|
||||
let local_ip = conn.local_ip();
|
||||
|
||||
let Ok((mut send, mut recv)) = conn.open_bi().await else {
|
||||
self.connected.store(false, Ordering::Relaxed);
|
||||
if keep_reconnect {
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
continue;
|
||||
}
|
||||
return;
|
||||
};
|
||||
|
||||
self.connected.store(true, Ordering::Relaxed);
|
||||
debug!("connected");
|
||||
on_connected_callback(local_ip, &mut send, start_pkt_id.take()).await;
|
||||
|
||||
if let Some(ref connecting_chan) = self.connecting_chan {
|
||||
let state = ConnectionInfo::ConnState(ConnectionState::Connected);
|
||||
let _ = connecting_chan.send(state).await;
|
||||
}
|
||||
if let Some(ref ipv6_restarter) = self.ipv6_network_restarter {
|
||||
let _ = ipv6_restarter.send(true).await;
|
||||
}
|
||||
// stream.write("hello".as_bytes()).await;
|
||||
// let (reader, mut write) = stream.into_split();
|
||||
|
||||
let read_from_tcp = async move {
|
||||
let mut buffed_reader = BufReader::new(recv);
|
||||
loop {
|
||||
match read_a_packet(&mut buffed_reader).await {
|
||||
Ok(packet) => {
|
||||
debug!("got packet: {:?}", packet);
|
||||
if let Err(_e) = self.from_tcp.send(packet).await {
|
||||
error!("failed to receive a packet: {:?}", _e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("failed to read a packet: {}, reconnecting...", e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let write_to_tcp = async {
|
||||
while let Some(data) = to_tcp.recv().await {
|
||||
match send.write(&data).await {
|
||||
Ok(size) => {
|
||||
debug!("{} bytes sent to tcp", size);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("failed to write to tcp: {}", e.to_string());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("to_tcp recv None");
|
||||
};
|
||||
|
||||
let check_pong = async {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
|
||||
let connected = self.connected.load(Ordering::Relaxed);
|
||||
let now = get_current_timestamp();
|
||||
if connected && now - self.pong_time.load(Ordering::Relaxed) > TCP_PING_TIME * 2
|
||||
{
|
||||
// pong time expire, need to re-connect
|
||||
error!("pong check expired");
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let check_stop = async {
|
||||
loop {
|
||||
match start_stop_chan.recv().await {
|
||||
Some(v) => {
|
||||
if !v.is_start {
|
||||
started = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
_other => {
|
||||
// send chan is closed;
|
||||
started = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pin_mut!(read_from_tcp, write_to_tcp);
|
||||
|
||||
tokio::select! {
|
||||
_ = read_from_tcp => {},
|
||||
_ = write_to_tcp => {},
|
||||
_ = check_pong => {},
|
||||
_ = check_stop => {},
|
||||
}
|
||||
on_disconnected_callback().await;
|
||||
debug!("connect retrying");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
debug!("disconnected");
|
||||
// future::select(read_from_tcp, write_to_tcp).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_disconnected_callback() {
|
||||
let edge = get_edge();
|
||||
edge.set_authorized(false, vec![]);
|
||||
}
|
||||
|
||||
async fn on_connected_callback(local_ip: Option<IpAddr>, stream: &mut SendStream, pkt_id: Option<u32>) {
|
||||
let edge = get_edge();
|
||||
// let installed_channel = install_channel.to_owned();
|
||||
|
||||
// let token = edge._token.lock().unwrap().clone();
|
||||
// let code = edge.network_code.lock().unwrap().clone();
|
||||
|
||||
// let edge = get_edge();
|
||||
// let edge = get_edge();
|
||||
// let token = args.token.clone();
|
||||
if let Some(ipaddr) = local_ip {
|
||||
match ipaddr {
|
||||
IpAddr::V4(v4) => {
|
||||
let ip = v4.into();
|
||||
// println!("outer ip is {} => {}", v4, ip);
|
||||
edge.outer_ip_v4.store(ip, Ordering::Relaxed);
|
||||
}
|
||||
_other => {}
|
||||
}
|
||||
}
|
||||
let register_super = SdlRegisterSuper {
|
||||
mac: Vec::from(edge.device_config.get_mac()),
|
||||
pkt_id: edge.get_next_packet_id(),
|
||||
network_id: edge.network_id.load(Ordering::Relaxed),
|
||||
ip: edge.device_config.get_ip(),
|
||||
mask_len: edge.device_config.get_net_bit() as u32,
|
||||
access_token: edge.access_token.get(),
|
||||
|
||||
// installed_channel,
|
||||
client_id: edge.config.node_uuid.clone(),
|
||||
pub_key: edge.rsa_pubkey.clone(),
|
||||
hostname: edge.hostname.read().unwrap().clone(),
|
||||
};
|
||||
// debug!("send register super: {:?}", register_super);
|
||||
let packet_id = match pkt_id {
|
||||
Some(id) => id,
|
||||
None => edge.get_next_packet_id(),
|
||||
};
|
||||
// let packet_id = edge.get_next_packet_id();
|
||||
let data = encode_to_tcp_message(
|
||||
Some(register_super),
|
||||
packet_id,
|
||||
PacketType::RegisterSuper as u8,
|
||||
)
|
||||
.unwrap();
|
||||
if let Err(e) = stream.write(&data).await {
|
||||
error!("failed to write to tcp: {}", e.to_string());
|
||||
}
|
||||
}
|
||||
@ -1,10 +1,15 @@
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use quinn::RecvStream;
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, BufReader},
|
||||
net::tcp::OwnedReadHalf,
|
||||
};
|
||||
|
||||
use num_enum::TryFromPrimitive;
|
||||
use tracing::debug;
|
||||
use tracing::{debug, error};
|
||||
|
||||
use crate::{network::Node, pb::{SdlStunRequest, Sdlv6Info, encode_to_udp_message}, utils::send_to_sock};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SdlanTcp {
|
||||
@ -92,8 +97,39 @@ pub enum PacketType {
|
||||
Data = 0xff,
|
||||
}
|
||||
|
||||
pub async fn send_stun_request(eee: &Node) {
|
||||
let sdl_v6_info = match *eee.ipv6.read().unwrap() {
|
||||
Some(ref l) => Some(Sdlv6Info {
|
||||
port: l.port as u32,
|
||||
v6: Vec::from(l.v6),
|
||||
}),
|
||||
None => None,
|
||||
};
|
||||
let req = SdlStunRequest {
|
||||
session_token: Vec::from(eee.session_token.get()),
|
||||
cookie: 0,
|
||||
client_id: eee.config.node_uuid.clone(),
|
||||
network_id: eee.network_id.load(Ordering::Relaxed),
|
||||
ip: eee.device_config.get_ip(),
|
||||
mac: Vec::from(eee.device_config.get_mac()),
|
||||
nat_type: eee.get_nat_type() as u32,
|
||||
v6_info: sdl_v6_info,
|
||||
};
|
||||
debug!("stun request: {:?}", req);
|
||||
let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap();
|
||||
if let Err(e) = send_to_sock(
|
||||
eee,
|
||||
&msg,
|
||||
&eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize],
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("failed to send to sock: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn read_a_packet(
|
||||
reader: &mut BufReader<OwnedReadHalf>,
|
||||
reader: &mut BufReader<RecvStream>,
|
||||
) -> Result<SdlanTcp, std::io::Error> {
|
||||
debug!("read a packet");
|
||||
let size = reader.read_u16().await?;
|
||||
|
||||
@ -35,264 +35,8 @@ use super::tcp_codec::SdlanTcp;
|
||||
static GLOBAL_TCP_HANDLE: OnceCell<ReadWriterHandle> = OnceCell::new();
|
||||
|
||||
|
||||
async fn handle_tcp_message(msg: SdlanTcp) {
|
||||
let edge = get_edge();
|
||||
|
||||
// let now = get_current_timestamp();
|
||||
// edge.tcp_pong.store(now, Ordering::Relaxed);
|
||||
|
||||
debug!("got tcp message: {:?}", msg.packet_type);
|
||||
match msg.packet_type {
|
||||
PacketType::RegisterSuperACK => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 0,
|
||||
message: "".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
|
||||
let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else {
|
||||
error!("failed to decode REGISTER_SUPER_ACK");
|
||||
return;
|
||||
};
|
||||
debug!("got register super ack: {:?}", ack);
|
||||
edge.session_token.set(ack.session_token);
|
||||
let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else {
|
||||
error!("failed to rsa decrypt aes key");
|
||||
return;
|
||||
};
|
||||
/*
|
||||
let Some(dev) = ack.dev_addr else {
|
||||
error!("no dev_addr is specified");
|
||||
return;
|
||||
};
|
||||
*/
|
||||
|
||||
let ip = ip_to_string(&edge.device_config.get_ip());
|
||||
// debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,);
|
||||
println!("assigned ip: {}", ip);
|
||||
let hostname = edge.hostname.read().unwrap().clone();
|
||||
// println!("network is: {}.{}", hostname, dev.network_domain);
|
||||
/*
|
||||
edge.device_config
|
||||
.ip
|
||||
.net_addr
|
||||
.store(dev.net_addr, Ordering::Relaxed);
|
||||
*/
|
||||
if let Some(ref chan) = edge.connection_chan {
|
||||
let _ = chan.send(ConnectionInfo::IPInfo(ip)).await;
|
||||
}
|
||||
/*
|
||||
let mac = match dev.mac.try_into() {
|
||||
Err(_) => NULL_MAC,
|
||||
Ok(m) => m,
|
||||
};
|
||||
*/
|
||||
// *edge.device_config.mac.write().unwrap() = mac;
|
||||
/*
|
||||
edge.device_config
|
||||
.ip
|
||||
.net_bit_len
|
||||
.store(dev.net_bit_len as u8, Ordering::Relaxed);
|
||||
edge.network_id.store(dev.network_id, Ordering::Relaxed);
|
||||
*/
|
||||
// edge.device.reload_config(&edge.device_config, &dev.network_domain);
|
||||
edge.device.reload_config(&edge.device_config, &edge.network_domain.read().unwrap().clone());
|
||||
|
||||
edge.set_authorized(true, aes);
|
||||
send_stun_request(edge).await;
|
||||
tokio::spawn(async {
|
||||
let nattype = edge.probe_nat_type().await;
|
||||
debug!("nat type is {:?}", nattype);
|
||||
// println!("nat type is: {:?}", nattype);
|
||||
});
|
||||
}
|
||||
PacketType::RegisterSuperNAK => {
|
||||
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {
|
||||
error!("failed to decode REGISTER_SUPER_NAK");
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 1,
|
||||
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 2,
|
||||
message: "error_code not recognized".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
return;
|
||||
};
|
||||
match error_code {
|
||||
NakMsgCode::InvalidToken => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 3,
|
||||
message: "invalid token".to_owned(),
|
||||
should_exit: true,
|
||||
},
|
||||
);
|
||||
edge.stop().await;
|
||||
}
|
||||
NakMsgCode::NodeDisabled => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 4,
|
||||
message: "Node is disabled".to_owned(),
|
||||
should_exit: true,
|
||||
},
|
||||
);
|
||||
edge.stop().await;
|
||||
}
|
||||
_other => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 0,
|
||||
message: "".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
/*
|
||||
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
|
||||
result: 1,
|
||||
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
||||
});
|
||||
*/
|
||||
edge.set_authorized(false, Vec::new());
|
||||
// std::process::exit(0);
|
||||
}
|
||||
PacketType::Command => {
|
||||
if msg.current_packet.len() < 1 {
|
||||
error!("malformed COMMAND received");
|
||||
return;
|
||||
}
|
||||
handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await;
|
||||
}
|
||||
PacketType::Event => {
|
||||
if msg.current_packet.len() < 1 {
|
||||
error!("malformed EVENT received");
|
||||
return;
|
||||
}
|
||||
let Ok(event) = msg.current_packet[0].try_into() else {
|
||||
error!("failed to parse event type");
|
||||
return;
|
||||
};
|
||||
handle_tcp_event(edge, event, &msg.current_packet[1..]).await;
|
||||
}
|
||||
PacketType::PeerInfo => {
|
||||
let _ = handle_packet_peer_info(edge, &msg.current_packet[..]).await;
|
||||
}
|
||||
PacketType::Pong => {
|
||||
debug!("tcp pong received");
|
||||
let now = get_current_timestamp();
|
||||
edge.tcp_pong.store(now, Ordering::Relaxed);
|
||||
}
|
||||
other => {
|
||||
debug!("tcp not handling {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {}
|
||||
|
||||
async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) {
|
||||
match eventtype {
|
||||
EventType::SendRegister => {
|
||||
let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else {
|
||||
error!("failed to decode SendRegister Event");
|
||||
return;
|
||||
};
|
||||
let v4 = reg.nat_ip.to_be_bytes();
|
||||
let mut v6_sock = None;
|
||||
if let Some(v6_info) = reg.v6_info {
|
||||
if let Ok(v6_bytes) = v6_info.v6.try_into() {
|
||||
v6_sock = Some(V6Info {
|
||||
port: v6_info.port as u16,
|
||||
v6: v6_bytes,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let dst_mac = match reg.dst_mac.try_into() {
|
||||
Ok(m) => m,
|
||||
Err(_e) => NULL_MAC,
|
||||
};
|
||||
|
||||
let remote_nat_byte = reg.nat_type as u8;
|
||||
let remote_nat = match remote_nat_byte.try_into() {
|
||||
Ok(t) => t,
|
||||
Err(_) => NatType::NoNat,
|
||||
};
|
||||
|
||||
check_peer_registration_needed(
|
||||
edge,
|
||||
false,
|
||||
dst_mac,
|
||||
// &v6_sock,
|
||||
remote_nat,
|
||||
&v6_sock,
|
||||
&SdlanSock {
|
||||
family: AF_INET,
|
||||
port: reg.nat_port as u16,
|
||||
v4,
|
||||
v6: [0; 16],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
other => {
|
||||
debug!("unhandled event {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub async fn send_stun_request(eee: &Node) {
|
||||
let sdl_v6_info = match *eee.ipv6.read().unwrap() {
|
||||
Some(ref l) => Some(Sdlv6Info {
|
||||
port: l.port as u32,
|
||||
v6: Vec::from(l.v6),
|
||||
}),
|
||||
None => None,
|
||||
};
|
||||
let req = SdlStunRequest {
|
||||
session_token: Vec::from(eee.session_token.get()),
|
||||
cookie: 0,
|
||||
client_id: eee.config.node_uuid.clone(),
|
||||
network_id: eee.network_id.load(Ordering::Relaxed),
|
||||
ip: eee.device_config.get_ip(),
|
||||
mac: Vec::from(eee.device_config.get_mac()),
|
||||
nat_type: eee.get_nat_type() as u32,
|
||||
v6_info: sdl_v6_info,
|
||||
};
|
||||
debug!("stun request: {:?}", req);
|
||||
let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap();
|
||||
if let Err(e) = send_to_sock(
|
||||
eee,
|
||||
&msg,
|
||||
&eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize],
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("failed to send to sock: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_disconnected_callback() {
|
||||
let edge = get_edge();
|
||||
@ -349,197 +93,6 @@ async fn on_connected_callback<'a>(stream: &'a mut tokio::net::TcpStream, pkt_id
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReadWriteActor {
|
||||
// actor接收的发送给tcp的接收端,由handle存放发送端
|
||||
// to_tcp: Receiver<Vec<u8>>,
|
||||
remote: String,
|
||||
connected: Arc<AtomicBool>,
|
||||
pong_time: Arc<AtomicU64>,
|
||||
// actor收到数据之后,发送给上层的发送端口,接收端由handle保存
|
||||
from_tcp: Sender<SdlanTcp>,
|
||||
_cancel: CancellationToken,
|
||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||
ipv6_network_restarter: Option<Sender<bool>>,
|
||||
}
|
||||
|
||||
impl ReadWriteActor {
|
||||
pub fn new(
|
||||
cancel: CancellationToken,
|
||||
remote: &str,
|
||||
from_tcp: Sender<SdlanTcp>,
|
||||
connected: Arc<AtomicBool>,
|
||||
pong_time: Arc<AtomicU64>,
|
||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||
ipv6_network_restarter: Option<Sender<bool>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
// to_tcp,
|
||||
_cancel: cancel,
|
||||
pong_time,
|
||||
connected,
|
||||
remote: remote.to_owned(),
|
||||
from_tcp,
|
||||
connecting_chan,
|
||||
ipv6_network_restarter,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run<'a>(
|
||||
&self,
|
||||
keep_reconnect: bool,
|
||||
mut to_tcp: Receiver<Vec<u8>>,
|
||||
mut start_stop_chan: Receiver<StartStopInfo>,
|
||||
) {
|
||||
let mut started = false;
|
||||
let mut start_pkt_id = None;
|
||||
loop {
|
||||
if let Some(ref connecting_chan) = self.connecting_chan {
|
||||
let state = ConnectionInfo::ConnState(ConnectionState::NotConnected);
|
||||
let _ = connecting_chan.send(state).await;
|
||||
}
|
||||
self.connected.store(false, Ordering::Relaxed);
|
||||
if !started {
|
||||
// println!("waiting for start");
|
||||
loop {
|
||||
let start_or_stop = start_stop_chan.recv().await;
|
||||
if let Some(m) = start_or_stop {
|
||||
if m.is_start {
|
||||
started = true;
|
||||
start_pkt_id = m.pkt_id;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// None, just return
|
||||
return;
|
||||
}
|
||||
}
|
||||
/*
|
||||
while let Some(m) = start_stop_chan.recv().await {
|
||||
println!("4");
|
||||
if m.is_start {
|
||||
// println!("start received");
|
||||
started = true;
|
||||
start_pkt_id = m.pkt_id;
|
||||
break;
|
||||
} else {
|
||||
// println!("stop received");
|
||||
}
|
||||
}
|
||||
*/
|
||||
debug!("start stop chan received: {}", started);
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(ref connecting_chan) = self.connecting_chan {
|
||||
let state = ConnectionInfo::ConnState(ConnectionState::Connecting);
|
||||
let _ = connecting_chan.send(state).await;
|
||||
}
|
||||
debug!("try connecting...");
|
||||
|
||||
let Ok(mut stream) = TcpStream::connect(&self.remote).await else {
|
||||
self.connected.store(false, Ordering::Relaxed);
|
||||
if keep_reconnect {
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
continue;
|
||||
}
|
||||
return;
|
||||
};
|
||||
self.connected.store(true, Ordering::Relaxed);
|
||||
debug!("connected");
|
||||
on_connected_callback(&mut stream, start_pkt_id.take()).await;
|
||||
|
||||
if let Some(ref connecting_chan) = self.connecting_chan {
|
||||
let state = ConnectionInfo::ConnState(ConnectionState::Connected);
|
||||
let _ = connecting_chan.send(state).await;
|
||||
}
|
||||
if let Some(ref ipv6_restarter) = self.ipv6_network_restarter {
|
||||
let _ = ipv6_restarter.send(true).await;
|
||||
}
|
||||
// stream.write("hello".as_bytes()).await;
|
||||
let (reader, mut write) = stream.into_split();
|
||||
|
||||
let read_from_tcp = async move {
|
||||
let mut buffed_reader = BufReader::new(reader);
|
||||
loop {
|
||||
match read_a_packet(&mut buffed_reader).await {
|
||||
Ok(packet) => {
|
||||
debug!("got packet: {:?}", packet);
|
||||
if let Err(_e) = self.from_tcp.send(packet).await {
|
||||
error!("failed to receive a packet: {:?}", _e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("failed to read a packet: {}, reconnecting...", e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let write_to_tcp = async {
|
||||
while let Some(data) = to_tcp.recv().await {
|
||||
match write.write(&data).await {
|
||||
Ok(size) => {
|
||||
debug!("{} bytes sent to tcp", size);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("failed to write to tcp: {}", e.to_string());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("to_tcp recv None");
|
||||
};
|
||||
|
||||
let check_pong = async {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
|
||||
let connected = self.connected.load(Ordering::Relaxed);
|
||||
let now = get_current_timestamp();
|
||||
if connected && now - self.pong_time.load(Ordering::Relaxed) > TCP_PING_TIME * 2
|
||||
{
|
||||
// pong time expire, need to re-connect
|
||||
error!("pong check expired");
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let check_stop = async {
|
||||
loop {
|
||||
match start_stop_chan.recv().await {
|
||||
Some(v) => {
|
||||
if !v.is_start {
|
||||
started = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
_other => {
|
||||
// send chan is closed;
|
||||
started = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pin_mut!(read_from_tcp, write_to_tcp);
|
||||
|
||||
tokio::select! {
|
||||
_ = read_from_tcp => {},
|
||||
_ = write_to_tcp => {},
|
||||
_ = check_pong => {},
|
||||
_ = check_stop => {},
|
||||
}
|
||||
on_disconnected_callback().await;
|
||||
debug!("connect retrying");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
debug!("disconnected");
|
||||
// future::select(read_from_tcp, write_to_tcp).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ReadWriterHandle {
|
||||
@ -626,38 +179,6 @@ impl ReadWriterHandle {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init_tcp_conn(
|
||||
cancel: CancellationToken,
|
||||
addr: &str,
|
||||
// on_connected: OnConnectedCallback<'a>,
|
||||
// on_disconnected: T3,
|
||||
// on_message: T2,
|
||||
pong_time: Arc<AtomicU64>,
|
||||
// cancel: CancellationToken,
|
||||
start_stop_chan: Receiver<StartStopInfo>,
|
||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||
ipv6_network_restarter: Option<Sender<bool>>,
|
||||
)
|
||||
// T2: Fn(SdlanTcp) -> F + Send + 'static,
|
||||
// F: Future<Output = ()> + Send,
|
||||
{
|
||||
|
||||
let tcp_handle = ReadWriterHandle::new(
|
||||
cancel,
|
||||
addr,
|
||||
// on_connected,
|
||||
// on_disconnected,
|
||||
// on_message,
|
||||
pong_time,
|
||||
start_stop_chan,
|
||||
connecting_chan,
|
||||
ipv6_network_restarter,
|
||||
);
|
||||
|
||||
GLOBAL_TCP_HANDLE
|
||||
.set(tcp_handle)
|
||||
.expect("failed to set global tcp handle");
|
||||
}
|
||||
|
||||
pub fn get_tcp_conn() -> &'static ReadWriterHandle {
|
||||
match GLOBAL_TCP_HANDLE.get() {
|
||||
|
||||
@ -1,16 +1,52 @@
|
||||
use structopt::StructOpt;
|
||||
use clap::{Parser, Subcommand, Args};
|
||||
|
||||
#[derive(Parser)]
|
||||
pub struct CommandLineInput2 {
|
||||
#[command(subcommand)]
|
||||
cmd: Commands,
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
pub enum Commands {
|
||||
Login(UserLogin),
|
||||
TokenLogin(TokenLogin),
|
||||
|
||||
/// after login, we can use start to
|
||||
/// connect to the remote
|
||||
Start,
|
||||
|
||||
/// exits the
|
||||
Stop,
|
||||
}
|
||||
|
||||
#[derive(Args)]
|
||||
pub struct UserLogin {
|
||||
#[arg(short, long)]
|
||||
username: String,
|
||||
|
||||
// #[arg(long, env="APP_SECRET", hide_env_values = true, hide=true)]
|
||||
// password: String,
|
||||
}
|
||||
|
||||
#[derive(Args)]
|
||||
pub struct TokenLogin {}
|
||||
|
||||
|
||||
#[derive(StructOpt, Debug)]
|
||||
pub struct CommandLineInput {
|
||||
#[structopt(long = "token", default_value = "", help="specify a token")]
|
||||
#[structopt(short="u", long = "user", default_value = "", help="specify a token")]
|
||||
pub user: String,
|
||||
|
||||
#[structopt(short="P", long = "pass", default_value = "", help="specify a token")]
|
||||
pub pass: String,
|
||||
|
||||
#[structopt(short="t", long = "token", default_value = "", help="specify a token")]
|
||||
pub token: String,
|
||||
|
||||
#[structopt(short = "p", long = "port", default_value = "0", help="which port to use")]
|
||||
pub port: u16,
|
||||
|
||||
#[structopt(long = "code", default_value = "", help="specify a network code")]
|
||||
pub network_code: String,
|
||||
|
||||
#[structopt(short= "h", long = "hostname", default_value="", help="specify the hostname")]
|
||||
pub hostname: String,
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user