use std::net::IpAddr; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use crate::config::{NULL_MAC, TCP_PING_TIME}; use crate::network::ipv6::run_ipv6; use crate::network::{ get_edge, ping_to_sn, read_and_parse_packet, RegisterSuperFeedback, TunTapPacketHandler, }; use crate::pb::{ encode_to_tcp_message, encode_to_udp_message, SdlDevAddr, SdlRegisterSuper, SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, Sdlv6Info, }; use crate::tcp::{init_tcp_conn, EventType, NakMsgCode, NatType, PacketType, SdlanTcp}; use crate::utils::{send_to_sock, CommandLine}; use crate::ConnectionState; use sdlan_sn_rs::config::AF_INET; use sdlan_sn_rs::peer::{SdlanSock, V6Info}; use sdlan_sn_rs::utils::{get_current_timestamp, ip_to_string, is_multi_broadcast, rsa_decrypt}; use sdlan_sn_rs::utils::{Mac, Result}; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio_util::sync::CancellationToken; use super::{check_peer_registration_needed, packet, Node, StartStopInfo}; use crate::utils::Socket; use prost::Message; use tracing::{debug, error}; async fn handle_tcp_message(msg: SdlanTcp) { let edge = get_edge(); // let now = get_current_timestamp(); // edge.tcp_pong.store(now, Ordering::Relaxed); debug!("got tcp message: {:?}", msg.packet_type); match msg.packet_type { PacketType::RegisterSuperACK => { edge.send_register_super_feedback( msg._packet_id, RegisterSuperFeedback { result: 0, message: "".to_owned(), should_exit: false, }, ); let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else { error!("failed to decode REGISTER_SUPER_ACK"); return; }; debug!("got register super ack: {:?}", ack); let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else { error!("failed to rsa decrypt aes key"); return; }; let Some(dev) = ack.dev_addr else { error!("no dev_addr is specified"); return; }; let ip = ip_to_string(&dev.net_addr); // debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,); println!("assigned ip: {}", ip); edge.device_config .ip .net_addr .store(dev.net_addr, Ordering::Relaxed); /* let mac = match dev.mac.try_into() { Err(_) => NULL_MAC, Ok(m) => m, }; */ // *edge.device_config.mac.write().unwrap() = mac; edge.device_config .ip .net_bit_len .store(dev.net_bit_len as u8, Ordering::Relaxed); edge.device.reload_config(&edge.device_config); edge.network_id.store(dev.network_id, Ordering::Relaxed); edge.set_authorized(true, aes); send_stun_request(edge).await; tokio::spawn(async { let nattype = edge.probe_nat_type().await; debug!("nat type is {:?}", nattype); // println!("nat type is: {:?}", nattype); }); } PacketType::RegisterSuperNAK => { let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else { error!("failed to decode REGISTER_SUPER_NAK"); edge.send_register_super_feedback( msg._packet_id, RegisterSuperFeedback { result: 1, message: "failed to decode REGISTER SUPER NAK".to_owned(), should_exit: false, }, ); return; }; let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else { edge.send_register_super_feedback( msg._packet_id, RegisterSuperFeedback { result: 2, message: "error_code not recognized".to_owned(), should_exit: false, }, ); return; }; match error_code { NakMsgCode::InvalidToken => { edge.send_register_super_feedback( msg._packet_id, RegisterSuperFeedback { result: 3, message: "invalid token".to_owned(), should_exit: true, }, ); edge.stop().await; } NakMsgCode::NodeDisabled => { edge.send_register_super_feedback( msg._packet_id, RegisterSuperFeedback { result: 4, message: "Node is disabled".to_owned(), should_exit: true, }, ); edge.stop().await; } _other => { edge.send_register_super_feedback( msg._packet_id, RegisterSuperFeedback { result: 0, message: "".to_owned(), should_exit: false, }, ); } } /* edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { result: 1, message: "failed to decode REGISTER SUPER NAK".to_owned(), }); */ edge.set_authorized(false, Vec::new()); // std::process::exit(0); } PacketType::Command => { if msg.current_packet.len() < 1 { error!("malformed COMMAND received"); return; } handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await; } PacketType::Event => { if msg.current_packet.len() < 1 { error!("malformed EVENT received"); return; } let Ok(event) = msg.current_packet[0].try_into() else { error!("failed to parse event type"); return; }; handle_tcp_event(edge, event, &msg.current_packet[1..]).await; } PacketType::PeerInfo => { let _ = packet::handle_packet_peer_info(edge, &msg.current_packet[..]).await; } PacketType::Pong => { debug!("tcp pong received"); let now = get_current_timestamp(); edge.tcp_pong.store(now, Ordering::Relaxed); } other => { debug!("tcp not handling {:?}", other); } } } async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {} async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) { match eventtype { EventType::SendRegister => { let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else { error!("failed to decode SendRegister Event"); return; }; let v4 = reg.nat_ip.to_be_bytes(); let mut v6_sock = None; if let Some(v6_info) = reg.v6_info { if let Ok(v6_bytes) = v6_info.v6.try_into() { v6_sock = Some(V6Info { port: v6_info.port as u16, v6: v6_bytes, }); } } let dst_mac = match reg.dst_mac.try_into() { Ok(m) => m, Err(_e) => NULL_MAC, }; let remote_nat_byte = reg.nat_type as u8; let remote_nat = match remote_nat_byte.try_into() { Ok(t) => t, Err(_) => NatType::NoNat, }; check_peer_registration_needed( edge, false, dst_mac, // &v6_sock, remote_nat, &v6_sock, &SdlanSock { family: AF_INET, port: reg.nat_port as u16, v4, v6: [0; 16], }, ) .await; } other => { debug!("unhandled event {:?}", other); } } } pub async fn async_main( install_channel: String, args: CommandLine, start_stop_chan: Receiver, cancel: CancellationToken, connecting_chan: Option>, ) -> Result<()> { // let _ = PidRecorder::new(".pid"); // // gen public key // gen_rsa_keys(".client"); // let mut pubkey = String::new(); // File::open(".client/id_rsa.pub")?.read_to_string(&mut pubkey)?; // let privatekey = load_private_key_file(".client/id_rsa")?; // // init sock // if args.token.len() == 0 { // println!("failed to load token"); // return Ok(()); // } // let sock_v4 = Socket::build(0, true, true, args.tos).await?; // // allow multicast // // TODO: set the sn's tcp socket // // let tcpsock = TCPSocket::build("121.4.79.234:1234").await?; // let tcp_pong = Arc::new(AtomicU64::new(0)); // let edge = Node::new( // pubkey, // node_conf, // sock_v4, // &args.token, // privatekey, // tcp_pong.clone(), // ); let edge = get_edge(); // let token = args.token.clone(); let cancel_tcp = cancel.clone(); let (ipv6_network_restarter, rx) = channel(10); tokio::spawn(run_ipv6(edge, rx)); init_tcp_conn( cancel_tcp, &args.tcp, move |stream, pkt_id| { let installed_channel = install_channel.to_owned(); Box::pin(async move { let token = edge._token.lock().unwrap().clone(); let code = edge.network_code.lock().unwrap().clone(); // let edge = get_edge(); // let edge = get_edge(); // let token = args.token.clone(); if let Ok(ipaddr) = stream.local_addr() { match ipaddr.ip() { IpAddr::V4(v4) => { let ip = v4.into(); // println!("outer ip is {} => {}", v4, ip); edge.outer_ip_v4.store(ip, Ordering::Relaxed); } _other => {} } } let register_super = SdlRegisterSuper { version: 1, installed_channel, client_id: edge.config.node_uuid.clone(), dev_addr: Some(SdlDevAddr { mac: Vec::from(edge.device_config.get_mac()), net_addr: 0, network_id: 0, net_bit_len: 0, }), pub_key: edge.rsa_pubkey.clone(), token, network_code: code, }; // debug!("send register super: {:?}", register_super); let packet_id = match pkt_id { Some(id) => id, None => edge.get_next_packet_id(), }; // let packet_id = edge.get_next_packet_id(); let data = encode_to_tcp_message( Some(register_super), packet_id, PacketType::RegisterSuper as u8, ) .unwrap(); if let Err(e) = stream.write(&data).await { error!("failed to write to tcp: {}", e.to_string()); } }) }, || async { edge.set_authorized(false, vec![]); }, |msg| handle_tcp_message(msg), edge.tcp_pong.clone(), // tcp_pong, start_stop_chan, connecting_chan, Some(ipv6_network_restarter), ); // tcp_conn.send("hello".as_bytes()).await; // tokio::spawn(handle_tcp_message(tcp_conn.data_from_tcp)); // tcp_conn.send("".as_bytes()); debug!("waiting for authorization..."); /* loop { // let _ = edge.send_register_super().await; // let _ = read_and_parse_packet(edge, &edge.udp_sock_v4, Some(Duration::from_secs(3))).await; println!("checking for authorized"); if edge.is_authorized() { break; } tokio::select! { _ = tokio::time::sleep(Duration::from_secs(3)) => { continue; } _ = cancel.cancelled() => { return Ok(()); } } } */ { let cancel = cancel.clone(); tokio::spawn(async move { run_edge_loop(edge, cancel).await; }); } { let cancel = cancel.clone(); tokio::spawn(async move { loop { tokio::select! { _ = cancel.cancelled() => { if let Err(e) = edge.send_unregister_super().await { error!("failed to send unregister super: {}", e.as_str()); } break; } _ = tokio::time::sleep(Duration::from_secs(TCP_PING_TIME)) => { ping_to_sn().await; } } } debug!("loop update_supernode_reg exited"); }); } cancel.cancelled().await; /* match tokio::signal::ctrl_c().await { Ok(()) => { debug!("shutdown received"); cancel.cancel(); debug!("exiting async_main"); tokio::time::sleep(Duration::from_millis(500)).await; debug!("exiting async_main2"); } Err(err) => { eprintln!("failed to listen for shutdown signal: {}", err); } } */ // std::process::exit(0); Ok(()) } async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) { ping_to_sn().await; { let cancel2 = cancel.clone(); let cancel = cancel.clone(); tokio::spawn(async move { loop_socket_v4(eee, &eee.udp_sock_v4, cancel, false).await; }); if let Some(ref multicast) = eee.udp_sock_multicast { loop_socket_v4(eee, &multicast, cancel2, true).await; } } { tokio::spawn(async move { loop_tap(eee, cancel).await; }); } } async fn send_stun_request(eee: &Node) { let sdl_v6_info = match *eee.ipv6.read().unwrap() { Some(ref l) => Some(Sdlv6Info { port: l.port as u32, v6: Vec::from(l.v6), }), None => None, }; let req = SdlStunRequest { cookie: 0, client_id: eee.config.node_uuid.clone(), network_id: eee.network_id.load(Ordering::Relaxed), ip: eee.device_config.get_ip(), mac: Vec::from(eee.device_config.get_mac()), nat_type: eee.get_nat_type() as u32, v6_info: sdl_v6_info, }; debug!("stun request: {:?}", req); let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap(); if let Err(e) = send_to_sock( eee, &msg, &eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize], ) .await { error!("failed to send to sock: {:?}", e); } } pub async fn loop_socket_v6(eee: &'static Node, socket: Arc, cancel: CancellationToken) { debug!("loop sock v6"); loop { tokio::select! { _ = cancel.cancelled() => { break; } _ = read_and_parse_packet(eee, &socket, Some(Duration::from_secs(10)), false) => { } _ = tokio::time::sleep(Duration::from_secs(10)) => { /* let req = SdlStunRequest { cookie: 0, client_id: eee.config.node_uuid.clone(), network_id: eee.network_id.load(Ordering::Relaxed), ip: eee.device_config.get_ip(), nat_type: 0, }; let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap(); if let Err(e) = send_to_sock(eee, &msg, &eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize]).await { error!("failed to send to sock: {:?}", e); }*/ } } } debug!("loop_socket_v4 exited"); } pub async fn loop_socket_v4( eee: &'static Node, socket: &Socket, cancel: CancellationToken, is_multicast_sock: bool, ) { debug!("loop sock v4"); let cancel_clone = cancel.clone(); tokio::spawn(async move { loop { tokio::select! { _ = cancel_clone.cancelled() => { break; } _ = tokio::time::sleep(Duration::from_secs(10)) => { if !is_multicast_sock { send_stun_request(eee).await; } } } } }); loop { tokio::select! { _ = cancel.cancelled() => { break; } _ = read_and_parse_packet(eee, socket, Some(Duration::from_secs(10)), is_multicast_sock) => { } } } debug!("loop_socket_v4 exited"); } async fn loop_tap(eee: &'static Node, cancel: CancellationToken) { debug!("loop tap"); let (tx, mut rx) = channel(10); tokio::spawn(async { get_tun_flow(eee, tx).await; }); loop { tokio::select! { _ = cancel.cancelled() => { drop(rx); break; } buf = rx.recv() => { if buf.is_none() { break; } read_and_parse_tun_packet(eee, buf.unwrap()).await; } } } debug!("loop_tap exited"); } async fn get_tun_flow(eee: &'static Node, tx: Sender>) { loop { let buf = tokio::task::spawn_blocking(|| { let mut buf = vec![0; 1500]; let Ok(size) = eee.device.recv(&mut buf) else { return vec![]; }; buf.truncate(size); buf }) .await .unwrap(); if buf.len() == 0 { return; } if let Err(e) = tx.send(buf).await { error!("failed to send buf: {}", e); return; } } } async fn read_and_parse_tun_packet(eee: &'static Node, buf: Vec) { /* if !eee.is_authorized() { debug!("drop packet before authorized"); return; } */ /* if eee.stats.last_sup.load(Ordering::Relaxed) == 0 { debug!("drop packet before first registration"); return; } */ // buf.truncate(size); edge_send_packet_to_net(eee, buf).await; } async fn edge_send_packet_to_net(eee: &Node, data: Vec) { // debug!("edge send packet to net({} bytes): {:?}", data.len(), data); let encrypt_key = eee.get_encrypt_key(); if encrypt_key.len() == 0 { error!("drop tun packet due to encrypt key len is 0"); return; } if let Err(e) = eee .device .handle_packet_from_device(data, encrypt_key.as_slice()) .await { error!("failed to handle packet from device: {}", e.to_string()); } } pub async fn send_packet_to_net(eee: &'static Node, dst_mac: Mac, pkt: &[u8], size: u64) { let (dest_sock, is_p2p) = find_peer_destination(eee, dst_mac).await; if is_p2p { eee.stats.tx_p2p.fetch_add(size, Ordering::Relaxed); } else { eee.stats.tx_sup.fetch_add(size, Ordering::Relaxed); if is_multi_broadcast(&dst_mac) { eee.stats.tx_broadcast.fetch_add(size, Ordering::Relaxed); } } debug!("send packet PACKET to {}", dest_sock.to_string()); if let Err(e) = send_to_sock(eee, pkt, &dest_sock).await { error!("failed to send packet to net: {}", e.as_str()); } } async fn find_peer_destination(eee: &'static Node, dst_mac: Mac) -> (SdlanSock, bool) { if is_multi_broadcast(&dst_mac) { return ( eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize] .deepcopy(), false, ); } let mut is_p2p = false; let result: SdlanSock; let mut need_delete_from_known_peers = false; if let Some(dst) = eee.known_peers.peers.get_mut(&dst_mac) { let now = get_current_timestamp(); if now - dst.last_p2p.load(Ordering::Relaxed) >= ((dst.timeout / 2) as u64) { // too much time elapsed since we saw the peer, need to register again error!("last p2p is too old, deleting from known_peers"); need_delete_from_known_peers = true; // eee.known_peers.delete_peer_with_mac(&dst_mac); debug!("deleted from known"); result = eee.config.super_nodes [eee.config.super_node_index.load(Ordering::Relaxed) as usize] .deepcopy(); } else { // dst.last_seen.store(now, Ordering::Relaxed); is_p2p = true; result = dst.sock.deepcopy(); } } else { result = eee.config.super_nodes [eee.config.super_node_index.load(Ordering::Relaxed) as usize] .deepcopy(); } if need_delete_from_known_peers { eee.known_peers.delete_peer_with_mac(&dst_mac); } // println!("find peer_destination: {}", is_p2p); if !is_p2p { debug!("check_query_peer_info"); super::packet::check_query_peer_info(eee, dst_mac).await; } return (result, is_p2p); }