use std::sync::atomic::{Ordering}; use std::sync::Arc; use std::time::Duration; use crate::config::{TCP_PING_TIME}; use crate::network::ipv6::run_ipv6; use crate::network::{ get_edge, ping_to_sn, read_and_parse_packet, TunTapPacketHandler, }; use crate::tcp::{init_quic_conn, send_stun_request}; use crate::utils::{send_to_sock, CommandLine}; use crate::{ConnectionInfo}; use bytes::BytesMut; use etherparse::{PacketBuilder, SlicedPacket}; use sdlan_sn_rs::peer::{SdlanSock}; use sdlan_sn_rs::utils::{get_current_timestamp, ip_to_string, is_multi_broadcast}; use sdlan_sn_rs::utils::{Mac, Result}; use tokio::net::{UdpSocket}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio_util::sync::CancellationToken; use super::{Node, StartStopInfo}; use crate::utils::Socket; use tracing::{debug, error, warn}; pub async fn async_main( args: CommandLine, start_stop_chan: Receiver, cancel: CancellationToken, connecting_chan: Option>, ) -> Result<()> { // let _ = PidRecorder::new(".pid"); let edge = get_edge(); // let token = args.token.clone(); let cancel_tcp = cancel.clone(); let (ipv6_network_restarter, rx) = channel(10); tokio::spawn(run_ipv6(edge, rx)); init_quic_conn( cancel_tcp, &args.quic, &args.quic_domain, // |msg| handle_tcp_message(msg), edge.tcp_pong.clone(), // tcp_pong, start_stop_chan, connecting_chan, Some(ipv6_network_restarter), ); debug!("waiting for authorization..."); /* loop { // let _ = edge.send_register_super().await; // let _ = read_and_parse_packet(edge, &edge.udp_sock_v4, Some(Duration::from_secs(3))).await; println!("checking for authorized"); if edge.is_authorized() { break; } tokio::select! { _ = tokio::time::sleep(Duration::from_secs(3)) => { continue; } _ = cancel.cancelled() => { return Ok(()); } } } */ { let cancel = cancel.clone(); tokio::spawn(async move { run_edge_loop(edge, cancel).await; }); } { let cancel = cancel.clone(); tokio::spawn(async move { loop { tokio::select! { _ = cancel.cancelled() => { if let Err(e) = edge.send_unregister_super().await { error!("failed to send unregister super: {}", e.as_str()); } break; } _ = tokio::time::sleep(Duration::from_secs(TCP_PING_TIME)) => { ping_to_sn().await; } } } debug!("loop update_supernode_reg exited"); }); } cancel.cancelled().await; /* match tokio::signal::ctrl_c().await { Ok(()) => { debug!("shutdown received"); cancel.cancel(); debug!("exiting async_main"); tokio::time::sleep(Duration::from_millis(500)).await; debug!("exiting async_main2"); } Err(err) => { eprintln!("failed to listen for shutdown signal: {}", err); } } */ // std::process::exit(0); Ok(()) } async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) { ping_to_sn().await; { let cancel2 = cancel.clone(); let cancel = cancel.clone(); tokio::spawn(async move { loop_socket_v4(eee, &eee.udp_sock_v4, cancel, false).await; }); if let Some(ref multicast) = eee.udp_sock_multicast { loop_socket_v4(eee, &multicast, cancel2, true).await; } } { tokio::spawn(async move { loop_tap(eee, cancel).await; }); } } pub async fn loop_socket_v6(eee: &'static Node, socket: Arc, cancel: CancellationToken) { debug!("loop sock v6"); loop { tokio::select! { _ = cancel.cancelled() => { break; } _ = read_and_parse_packet(eee, &socket, Some(Duration::from_secs(10)), false) => { } _ = tokio::time::sleep(Duration::from_secs(10)) => { /* let req = SdlStunRequest { cookie: 0, client_id: eee.config.node_uuid.clone(), network_id: eee.network_id.load(Ordering::Relaxed), ip: eee.device_config.get_ip(), nat_type: 0, }; let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap(); if let Err(e) = send_to_sock(eee, &msg, &eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize]).await { error!("failed to send to sock: {:?}", e); }*/ } } } debug!("loop_socket_v4 exited"); } pub async fn loop_socket_v4( eee: &'static Node, socket: &Socket, cancel: CancellationToken, is_multicast_sock: bool, ) { debug!("loop sock v4"); let cancel_clone = cancel.clone(); tokio::spawn(async move { loop { tokio::select! { _ = cancel_clone.cancelled() => { break; } _ = tokio::time::sleep(Duration::from_secs(10)) => { if !is_multicast_sock { send_stun_request(eee).await; } } } } }); loop { tokio::select! { _ = cancel.cancelled() => { break; } _ = read_and_parse_packet(eee, socket, Some(Duration::from_secs(10)), is_multicast_sock) => { } } } debug!("loop_socket_v4 exited"); } async fn receive_dns_reply(sock: &Arc) -> Option> { let mut reply = vec![0;1024]; if let Ok((size, _from)) = sock.recv_from(&mut reply).await { if size == 0 { // closed return None; } reply.truncate(size); return Some(reply); } None } async fn loop_tap(eee: &'static Node, cancel: CancellationToken) { debug!("loop tap"); let (tx, mut rx) = channel(10); tokio::spawn(async { get_tun_flow(eee, tx).await; }); loop { tokio::select! { _ = cancel.cancelled() => { drop(rx); break; } global_reply_global = receive_dns_reply(&eee.udp_sock_for_global_dns) => { // global reply, only dns payload if let Some(data) = global_reply_global { if let Ok(mut dns) = simple_dns::Packet::parse(&data) { let transaction_id = dns.id(); if let Some((ip, port, origin_transaction_id)) = eee.dns_matcher.get_client_info(transaction_id) { warn!("got dns reply from global 223.5.5.5: {:?}, will send to {}:{}", data, ip_to_string(&ip), port); let dstmac = eee.device_config.get_mac(); let srcmac = eee.device_config.dns_mac; dns.set_id(origin_transaction_id); let builder = PacketBuilder::ethernet2(srcmac, dstmac) .ipv4([100, 100, 100, 100], ip.to_be_bytes(), 64) .udp(53, port); let new_dns_resp = dns.build_bytes_vec().unwrap(); let mut response_packet = Vec::with_capacity(builder.size(new_dns_resp.len())); if let Ok(_) = builder.write(&mut response_packet, &new_dns_resp) { warn!("will send to tun with packet: {:?}", response_packet); if let Err(_e) = eee.device.handle_packet_from_net(&response_packet).await { error!("failed to write dns packet to device"); } } else { error!("failed to write dns body"); } } } } } reply = receive_dns_reply(&eee.udp_sock_for_dns) => { if reply.is_none() { drop(rx); break; } let reply = reply.unwrap(); warn!("got packet from 15353: {:?}", reply); let dstmac = eee.device_config.get_mac(); let srcmac = eee.device_config.dns_mac; let mut packet = Vec::with_capacity(14+reply.len()); packet.extend_from_slice(&dstmac); packet.extend_from_slice(&srcmac); packet.push(0x08); packet.push(0x00); packet.extend_from_slice(&reply); // TODO: check the packet should if let Err(_e) = eee.device.handle_packet_from_net(&packet).await { error!("failed to write dns packet to device"); } } buf = rx.recv() => { if buf.is_none() { break; } read_and_parse_tun_packet(eee, buf.unwrap()).await; } } } error!("loop_tap exited"); } #[cfg(any(feature = "tun", target_os = "windows"))] fn get_data_from_tun_with_layer2_zeroed(eee: &Node) -> BytesMut { let mut temp = BytesMut::zeroed(1514); // let mut temp = BytesMut::with_capacity(1514); let mut data_buf = temp.split_off(14); let Ok(size) = eee.device.recv(&mut data_buf) else { return BytesMut::new(); }; data_buf.truncate(size); temp.unsplit(data_buf); temp } #[cfg(not(feature = "tun"))] fn get_data_from_tap_with_layer2(eee: &Node) -> BytesMut { let mut buf = BytesMut::zeroed(1514); let Ok(size) = eee.device.recv(&mut buf) else { return BytesMut::new(); }; buf.truncate(size); buf } async fn get_tun_flow(eee: &'static Node, tx: Sender) { loop { let buf = tokio::task::spawn_blocking(|| { #[cfg(any(feature = "tun", target_os = "windows"))] let data = get_data_from_tun_with_layer2_zeroed(eee); #[cfg(all(not(feature = "tun"), not(target_os="windows")))] let data = get_data_from_tap_with_layer2(eee); data }) .await .unwrap(); if buf.len() == 0 { return; } if let Err(e) = tx.send(buf).await { error!("failed to send buf: {}", e); return; } } } async fn read_and_parse_tun_packet(eee: &'static Node, buf: BytesMut) { /* if !eee.is_authorized() { debug!("drop packet before authorized"); return; } */ /* if eee.stats.last_sup.load(Ordering::Relaxed) == 0 { debug!("drop packet before first registration"); return; } */ // buf.truncate(size); edge_send_packet_to_net(eee, buf).await; } async fn edge_send_packet_to_net(eee: &Node, data: BytesMut) { // debug!("edge send packet to net({} bytes): {:?}", data.len(), data); /* let encrypt_key = eee.get_encrypt_key(); if encrypt_key.len() == 0 { error!("drop tun packet due to encrypt key len is 0"); return; } */ if let Err(e) = eee .device .handle_packet_from_device(data) .await { error!("failed to handle packet from device: {}", e.to_string()); } } pub async fn send_packet_to_net(eee: &'static Node, dst_mac: Mac, pkt: &[u8], size: u64) { let (dest_sock, is_p2p) = find_peer_destination(eee, dst_mac).await; if is_p2p { eee.stats.tx_p2p.fetch_add(size, Ordering::Relaxed); } else { eee.stats.tx_sup.fetch_add(size, Ordering::Relaxed); if is_multi_broadcast(&dst_mac) { eee.stats.tx_broadcast.fetch_add(size, Ordering::Relaxed); } } debug!("send packet PACKET to {}", dest_sock.to_string()); if let Err(e) = send_to_sock(eee, pkt, &dest_sock).await { error!("failed to send packet to net: {}", e.as_str()); } } async fn find_peer_destination(eee: &'static Node, dst_mac: Mac) -> (SdlanSock, bool) { if is_multi_broadcast(&dst_mac) { return ( eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize] .deepcopy(), false, ); } let mut is_p2p = false; let result: SdlanSock; let mut need_delete_from_known_peers = false; if let Some(dst) = eee.known_peers.peers.get_mut(&dst_mac) { let now = get_current_timestamp(); if now - dst.last_p2p.load(Ordering::Relaxed) >= ((dst.timeout / 2) as u64) { // too much time elapsed since we saw the peer, need to register again error!("last p2p is too old, deleting from known_peers"); need_delete_from_known_peers = true; // eee.known_peers.delete_peer_with_mac(&dst_mac); debug!("deleted from known"); result = eee.config.super_nodes [eee.config.super_node_index.load(Ordering::Relaxed) as usize] .deepcopy(); } else { // dst.last_seen.store(now, Ordering::Relaxed); is_p2p = true; result = dst.sock.deepcopy(); } } else { result = eee.config.super_nodes [eee.config.super_node_index.load(Ordering::Relaxed) as usize] .deepcopy(); } if need_delete_from_known_peers { eee.known_peers.delete_peer_with_mac(&dst_mac); } // println!("find peer_destination: {}", is_p2p); if !is_p2p { debug!("check_query_peer_info"); super::packet::check_query_peer_info(eee, dst_mac).await; } return (result, is_p2p); }