sdlan-lib-rs/src/network/async_main.rs
2026-02-25 16:06:44 +08:00

415 lines
12 KiB
Rust
Executable File

use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use crate::config::{TCP_PING_TIME};
use crate::network::ipv6::run_ipv6;
use crate::network::{
get_edge, ping_to_sn, read_and_parse_packet, TunTapPacketHandler,
};
use crate::tcp::{init_tcp_conn, send_stun_request};
use crate::utils::{send_to_sock, CommandLine};
use crate::{ConnectionInfo};
use sdlan_sn_rs::peer::{SdlanSock};
use sdlan_sn_rs::utils::{get_current_timestamp, is_multi_broadcast};
use sdlan_sn_rs::utils::{Mac, Result};
use tokio::net::{UdpSocket};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use super::{Node, StartStopInfo};
use crate::utils::Socket;
use tracing::{debug, error};
pub async fn async_main(
args: CommandLine,
start_stop_chan: Receiver<StartStopInfo>,
cancel: CancellationToken,
connecting_chan: Option<Sender<ConnectionInfo>>,
) -> Result<()> {
// let _ = PidRecorder::new(".pid");
let edge = get_edge();
// let token = args.token.clone();
let cancel_tcp = cancel.clone();
let (ipv6_network_restarter, rx) = channel(10);
tokio::spawn(run_ipv6(edge, rx));
// TODO: change the quic logic
/*
tokio::spawn(async move {
loop {
let conn = edge.quic_endpoint.connect("192.168.0.1".parse().unwrap(), "www.punchnet.com").unwrap().await.unwrap();
println!("连接成功!");
let (mut send, mut recv) = conn.open_bi().await.unwrap();
loop {
send.write_all(b"Hello QUIC!").await.unwrap();
let mut buf = vec![0u8; 1024];
if let Ok(size) = recv.read(&mut buf).await {
if let Some(size) = size {
println!("got data from server: {}", String::from_utf8_lossy(&buf[..size]))
} else {
println!("no data size found");
}
} else {
println!("read failed");
break;
}
recv.read(&mut buf).await.unwrap();
sleep(Duration::from_secs(11)).await;
}
println!("hello");
// conn.close(0u32.into(), b"donw");
edge.quic_endpoint.wait_idle().await;
}
});
*/
////////////////// to here
init_tcp_conn(
cancel_tcp,
&args.tcp,
// |msg| handle_tcp_message(msg),
edge.tcp_pong.clone(),
// tcp_pong,
start_stop_chan,
connecting_chan,
Some(ipv6_network_restarter),
);
// tcp_conn.send("hello".as_bytes()).await;
// tokio::spawn(handle_tcp_message(tcp_conn.data_from_tcp));
// tcp_conn.send("".as_bytes());
debug!("waiting for authorization...");
/*
loop {
// let _ = edge.send_register_super().await;
// let _ = read_and_parse_packet(edge, &edge.udp_sock_v4, Some(Duration::from_secs(3))).await;
println!("checking for authorized");
if edge.is_authorized() {
break;
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(3)) => {
continue;
}
_ = cancel.cancelled() => {
return Ok(());
}
}
}
*/
{
let cancel = cancel.clone();
tokio::spawn(async move {
run_edge_loop(edge, cancel).await;
});
}
{
let cancel = cancel.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel.cancelled() => {
if let Err(e) = edge.send_unregister_super().await {
error!("failed to send unregister super: {}", e.as_str());
}
break;
}
_ = tokio::time::sleep(Duration::from_secs(TCP_PING_TIME)) => {
ping_to_sn().await;
}
}
}
debug!("loop update_supernode_reg exited");
});
}
cancel.cancelled().await;
/*
match tokio::signal::ctrl_c().await {
Ok(()) => {
debug!("shutdown received");
cancel.cancel();
debug!("exiting async_main");
tokio::time::sleep(Duration::from_millis(500)).await;
debug!("exiting async_main2");
}
Err(err) => {
eprintln!("failed to listen for shutdown signal: {}", err);
}
}
*/
// std::process::exit(0);
Ok(())
}
/// Starts the packet-processing loops of the edge node.
///
/// Sends one initial `ping_to_sn`, then spawns three detached tasks:
/// the unicast IPv4 socket loop, the multicast socket loop (only when
/// `eee.udp_sock_multicast` is set) and the tap loop. The function returns as
/// soon as the tasks are spawned; each task stops on `cancel`.
async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) {
    ping_to_sn().await;

    let unicast_cancel = cancel.clone();
    tokio::spawn(async move {
        loop_socket_v4(eee, &eee.udp_sock_v4, unicast_cancel, false).await;
    });

    // The multicast loop only ends on cancellation, so it must run in its own
    // task: awaiting it inline would keep `loop_tap` below from starting until
    // shutdown.
    if let Some(ref multicast) = eee.udp_sock_multicast {
        let multicast_cancel = cancel.clone();
        tokio::spawn(async move {
            loop_socket_v4(eee, &multicast, multicast_cancel, true).await;
        });
    }

    tokio::spawn(async move {
        loop_tap(eee, cancel).await;
    });
}
/// Receives and dispatches packets from the IPv6 socket until `cancel` fires.
///
/// Each iteration races three futures: cancellation (ends the loop), one
/// `read_and_parse_packet` call with a 10-second timeout argument, and a
/// 10-second sleep. The sleep branch currently does nothing except restart the
/// iteration; a periodic STUN request over this socket was sketched there and
/// is disabled.
pub async fn loop_socket_v6(eee: &'static Node, socket: Arc<Socket>, cancel: CancellationToken) {
    debug!("loop sock v6");
    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                break;
            }
            _ = read_and_parse_packet(eee, &socket, Some(Duration::from_secs(10)), false) => {}
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                // Intentionally empty: periodic STUN over IPv6 is disabled.
            }
        }
    }
    // Log the right function name (this used to report `loop_socket_v4`).
    debug!("loop_socket_v6 exited");
}
/// Receives and dispatches packets from an IPv4 socket until `cancel` fires.
///
/// * `socket` – the socket to read from.
/// * `is_multicast_sock` – forwarded to `read_and_parse_packet`; when `false`
///   a helper task is also spawned that calls `send_stun_request` every
///   10 seconds until `cancel` fires.
pub async fn loop_socket_v4(
    eee: &'static Node,
    socket: &Socket,
    cancel: CancellationToken,
    is_multicast_sock: bool,
) {
    debug!("loop sock v4");

    // Only the non-multicast socket sends STUN requests, so do not spawn an
    // idle timer task for the multicast one.
    if !is_multicast_sock {
        let stun_cancel = cancel.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = stun_cancel.cancelled() => {
                        break;
                    }
                    _ = tokio::time::sleep(Duration::from_secs(10)) => {
                        send_stun_request(eee).await;
                    }
                }
            }
        });
    }

    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                break;
            }
            _ = read_and_parse_packet(eee, socket, Some(Duration::from_secs(10)), is_multicast_sock) => {}
        }
    }
    debug!("loop_socket_v4 exited");
}
/// Waits for one datagram on `sock` and returns its payload.
///
/// Reads into a 1024-byte buffer and returns the received bytes. Returns
/// `None` when the receive fails or when the datagram is empty.
///
/// NOTE(review): an empty datagram is reported the same way as an error, and
/// the visible caller stops its loop on `None` — confirm this is intended.
async fn receive_dns_reply(sock: &Arc<UdpSocket>) -> Option<Vec<u8>> {
    let mut datagram = vec![0; 1024];
    match sock.recv_from(&mut datagram).await {
        Ok((0, _)) | Err(_) => None,
        Ok((received, _peer)) => {
            datagram.truncate(received);
            Some(datagram)
        }
    }
}
/// Bridges the local device and the DNS helper socket with the network.
///
/// Spawns `get_tun_flow` to feed frames read from the device into a channel,
/// then loops over three events until one of them ends the loop:
/// * `cancel` fires – stop;
/// * a DNS reply arrives on `eee.udp_sock_for_dns` – it is prefixed with the
///   device MAC, `dns_mac` and the bytes `0x08 0x00`, then handed to
///   `handle_packet_from_net`; a `None` reply stops the loop;
/// * a frame arrives from the device channel – it is passed to
///   `read_and_parse_tun_packet`; a closed channel stops the loop.
///
/// The channel receiver is dropped when the function returns.
async fn loop_tap(eee: &'static Node, cancel: CancellationToken) {
    debug!("loop tap");
    let (tx, mut rx) = channel(10);
    // Spawn the reader future directly; it owns `eee` (a copyable reference)
    // and `tx`, so nothing is borrowed from this function's locals.
    tokio::spawn(get_tun_flow(eee, tx));

    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                break;
            }
            reply = receive_dns_reply(&eee.udp_sock_for_dns) => {
                let Some(reply) = reply else {
                    break;
                };
                let dstmac = eee.device_config.get_mac();
                let srcmac = eee.device_config.dns_mac;
                // 14 extra bytes: two MAC addresses plus the two type bytes.
                let mut packet = Vec::with_capacity(14 + reply.len());
                packet.extend_from_slice(&dstmac);
                packet.extend_from_slice(&srcmac);
                packet.push(0x08);
                packet.push(0x00);
                packet.extend_from_slice(&reply);
                // TODO: check the packet contents before injecting it.
                if let Err(_e) = eee.device.handle_packet_from_net(&packet, &Vec::new()).await {
                    error!("failed to write dns packet to device");
                }
            }
            buf = rx.recv() => {
                let Some(buf) = buf else {
                    break;
                };
                read_and_parse_tun_packet(eee, buf).await;
            }
        }
    }
    debug!("loop_tap exited");
}
async fn get_tun_flow(eee: &'static Node, tx: Sender<Vec<u8>>) {
loop {
let buf = tokio::task::spawn_blocking(|| {
let mut buf = vec![0; 1500];
let Ok(size) = eee.device.recv(&mut buf) else {
return vec![];
};
buf.truncate(size);
buf
})
.await
.unwrap();
if buf.len() == 0 {
return;
}
if let Err(e) = tx.send(buf).await {
error!("failed to send buf: {}", e);
return;
}
}
}
/// Handles one frame read from the local device by forwarding it to the
/// network via `edge_send_packet_to_net`.
///
/// Checks that dropped frames before authorization or before the first
/// supernode registration were sketched here and are currently disabled, so
/// every frame is forwarded.
async fn read_and_parse_tun_packet(eee: &'static Node, buf: Vec<u8>) {
    let forward = edge_send_packet_to_net(eee, buf);
    forward.await;
}
async fn edge_send_packet_to_net(eee: &Node, data: Vec<u8>) {
// debug!("edge send packet to net({} bytes): {:?}", data.len(), data);
let encrypt_key = eee.get_encrypt_key();
if encrypt_key.len() == 0 {
error!("drop tun packet due to encrypt key len is 0");
return;
}
if let Err(e) = eee
.device
.handle_packet_from_device(data, encrypt_key.as_slice())
.await
{
error!("failed to handle packet from device: {}", e.to_string());
}
}
/// Sends `pkt` toward `dst_mac` and updates the transmit counters.
///
/// The destination socket is chosen by `find_peer_destination`. `size` is
/// added to `tx_p2p` for a peer-to-peer destination; otherwise it is added to
/// `tx_sup`, and additionally to `tx_broadcast` when `dst_mac` is a
/// multicast/broadcast address. A send failure is logged and not returned.
pub async fn send_packet_to_net(eee: &'static Node, dst_mac: Mac, pkt: &[u8], size: u64) {
    let (target, via_p2p) = find_peer_destination(eee, dst_mac).await;

    if !via_p2p {
        eee.stats.tx_sup.fetch_add(size, Ordering::Relaxed);
        if is_multi_broadcast(&dst_mac) {
            eee.stats.tx_broadcast.fetch_add(size, Ordering::Relaxed);
        }
    } else {
        eee.stats.tx_p2p.fetch_add(size, Ordering::Relaxed);
    }

    debug!("send packet PACKET to {}", target.to_string());
    let sent = send_to_sock(eee, pkt, &target).await;
    if let Err(e) = sent {
        error!("failed to send packet to net: {}", e.as_str());
    }
}
/// Chooses the socket a packet for `dst_mac` should be sent to.
///
/// Returns `(socket, is_p2p)`:
/// * multicast/broadcast MACs always go to the currently selected supernode;
/// * a known peer whose last P2P timestamp is younger than half its `timeout`
///   is addressed directly (`is_p2p == true`);
/// * a known peer that is older than that is removed from `known_peers` and the
///   packet goes to the supernode;
/// * an unknown peer goes to the supernode.
///
/// For the two non-broadcast supernode cases `check_query_peer_info` is called
/// before returning.
///
/// Indexing `super_nodes` panics if the stored index is out of range.
async fn find_peer_destination(eee: &'static Node, dst_mac: Mac) -> (SdlanSock, bool) {
    /// Copy of the supernode socket selected by `super_node_index`.
    fn current_supernode(eee: &Node) -> SdlanSock {
        eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize]
            .deepcopy()
    }

    if is_multi_broadcast(&dst_mac) {
        return (current_supernode(eee), false);
    }

    // Look the peer up in its own statement so the map entry is released
    // before the peer is deleted or anything is awaited.
    // NOTE(review): the entry is only read here; a shared lookup may be enough
    // if the map type offers one — confirm.
    let mut stale = false;
    let p2p_sock = match eee.known_peers.peers.get_mut(&dst_mac) {
        Some(dst) => {
            let now = get_current_timestamp();
            // `last_p2p` can be refreshed concurrently to a value later than
            // `now`; saturate so that case counts as "just seen" instead of
            // underflowing.
            let idle = now.saturating_sub(dst.last_p2p.load(Ordering::Relaxed));
            if idle >= ((dst.timeout / 2) as u64) {
                // too much time elapsed since we saw the peer, need to register again
                error!("last p2p is too old, deleting from known_peers");
                stale = true;
                None
            } else {
                Some(dst.sock.deepcopy())
            }
        }
        None => None,
    };

    if stale {
        eee.known_peers.delete_peer_with_mac(&dst_mac);
        debug!("deleted from known");
    }

    match p2p_sock {
        Some(sock) => (sock, true),
        None => {
            // Pick the supernode before awaiting, as the index may change.
            let sock = current_supernode(eee);
            debug!("check_query_peer_info");
            super::packet::check_query_peer_info(eee, dst_mac).await;
            (sock, false)
        }
    }
}