use std::{net::SocketAddr, sync::atomic::Ordering, time::Duration}; use crate::{ config::REGISTER_INTERVAL, pb::{ encode_to_tcp_message, encode_to_udp_message, SdlData, SdlEmpty, SdlPeerInfo, SdlQueryInfo, SdlRegister, SdlRegisterAck, SdlStunProbeReply, }, tcp::{get_tcp_conn, PacketType}, utils::{send_to_sock, Socket}, }; use etherparse::IpHeaders; use prost::Message; use sdlan_sn_rs::{ config::AF_INET, peer::{is_sdlan_sock_equal, SdlanSock, V6Info}, utils::{ aes_decrypt, get_current_timestamp, get_sdlan_sock_from_socketaddr, ip_to_string, is_multi_broadcast, Result, SDLanError, }, }; use std::sync::Arc; use tracing::{debug, error, info}; use super::{EdgePeer, Node}; pub async fn read_and_parse_packet( eee: &Node, sock: &Socket, timeout: Option, // cancel: CancellationToken, ) -> Result<()> { let mut buf = vec![0; 3000]; let res; if timeout.is_some() { tokio::select! { _ = tokio::time::sleep(timeout.unwrap()) => { return Err(SDLanError::NormalError("timeouted")); } r = sock.recv_from(&mut buf) => { res=r; } } } else { res = sock.recv_from(&mut buf).await; } debug!("read_and_parse packet, got packet"); // let res = sock.recv_from(&mut buf).await; match res { Ok((0, _)) => { // received 0 error!("received zero bytes"); // return Ok(()) } Err(e) => { // error occured error!("receive error occured: {:?}", e); } Ok((size, from)) => { // size > 0 buf.truncate(size); match handle_packet(eee, from, &buf).await { Ok(_) => {} Err(e) => { error!("failed to handle_packet: {:?}", e); } } } } Ok(()) } pub async fn handle_packet(eee: &Node, addr: SocketAddr, buf: &[u8]) -> Result<()> { if buf.len() < 1 { return Err(SDLanError::NormalError("buf length error")); } let Ok(pkt_type) = PacketType::try_from(buf[0]) else { return Err(SDLanError::NormalError("invalid packet type")); }; match pkt_type { PacketType::Data => { let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap(); debug!("[PPP]Rx data from {}", from_sock.to_string()); if !eee.is_authorized() { error!("dropping PACKET received before authorized"); return Ok(()); } let Ok(data) = SdlData::decode(&buf[1..]) else { error!("failed to decode to SDLData"); return Err(SDLanError::NormalError("failed to decode to SDLData")); }; // let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap(); if data.is_p2p { debug!("[P2P] Rx data from {}", from_sock.to_string()); } else { debug!( "[PsP] Rx data from {} via {}", ip_to_string(&data.src_ip), from_sock.to_string() ); } if data.is_p2p { check_peer_registration_needed(eee, !data.is_p2p, data.src_ip, &None, &from_sock) .await; } handle_tun_packet(eee, !data.is_p2p, data).await; } PacketType::StunProbeReply => { println!("got stunprobeReply"); let Ok(reply) = SdlStunProbeReply::decode(&buf[1..]) else { error!("failed to decode SdlStunReply"); return Ok(()); }; eee.send_nat_probe_reply(reply.cookie, reply).await; } PacketType::StunReply => { // stun reply, like pong } PacketType::Register => { if !eee.is_authorized() { error!("dropping REGISTER received before authorized"); return Ok(()); } let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap(); let _ = handle_packet_register(eee, &buf[1..], false, &from_sock).await; } PacketType::RegisterACK => { if !eee.is_authorized() { error!("dropping REGISTER received before authorized"); return Ok(()); } let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap(); let _ = handle_packet_register_ack(eee, &buf[1..], &from_sock).await; } other => { error!("udp not processing {:?}", other); } } /* let pkt_type = buf[0].into(); debug!("got packet {} bytes", buf.len()); let (cmn, slice) = packet::decode_common(&buf)?; println!("got packet: {:?}", cmn.pc); if !eee.is_authorized() { error!("unauthorized, returning"); return Ok(()); } let from_sn = (cmn.flags & config::SDLAN_FLAGS_FROM_SN) != 0; let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap(); if from_sn { if !eee.sn_is_known(&from_sock) { error!("drop incoming data from unknown supernode"); return Ok(()); } } let res = match cmn.pc { PacketType::PKTPacket => { // handle packet handle_packet_packet(eee, cmn, slice, from_sn, &from_sock).await } PacketType::PKTRegister => { // handle register from other peer handle_packet_register(eee, slice, from_sn, &from_sock).await } PacketType::PKTRegisterACK => { // handle register ack from other peer handle_packet_register_ack(eee, slice, &from_sock).await } PacketType::PKTRegisterSuperAcknowledge => { // handle register super acknowledge handle_packet_register_super_acknowledge(eee) } PacketType::PKTRegisterSuperACK => { // handle register super ack handle_packet_register_super_ack(eee, cmn, slice) } PacketType::PKTRegisterSuperNAK => { // handle register super nak handle_packet_register_super_nak(eee, cmn, slice) } PacketType::PKTPeerInfo => { // handle peer info from sn handle_packet_peer_info(eee, slice).await } PacketType::PKTCommand => { // handle command Ok(()) } other => { error!("unknown packet {:?}", other); Ok(()) } }; if let Err(e) = res { error!("handle packet error occured: {}", e.as_str()); } */ Ok(()) } pub async fn handle_packet_peer_info( eee: &Node, // cmn: Common<'_>, body: &[u8], //sender_sock: &SdlanSock, ) -> Result<()> { let Ok(pi) = SdlPeerInfo::decode(body) else { error!("failed to decode PEER_INFO"); return Ok(()); }; debug!("got peer info: {:?}", pi); if pi.v4_info.is_none() { error!("PEER's v4_info is none"); return Ok(()); } let v4 = pi.v4_info.unwrap(); let Ok(v4_u32) = v4.v4.try_into() else { error!("failed to convert v4"); return Ok(()); }; // let src_ip = u32::from_be_bytes(v4_u32); if pi.dst_ip == 0 { // pong from sn } else { match eee.pending_peers.get_peer(&pi.dst_ip) { Some(edgeinfo) => { let sock = SdlanSock { family: AF_INET, port: v4.port as u16, v4: v4_u32, v6: [0; 16], }; *(edgeinfo.sock.write().unwrap()) = sock.deepcopy(); info!( "Rx PEERINFO for {}: is at {}", ip_to_string(&pi.dst_ip), sock.to_string() ); send_register(eee, &sock, &None).await; } None => { debug!("Rx PEERINFO unknown peer: {}", ip_to_string(&pi.dst_ip)); } } } Ok(()) } /* fn handle_packet_register_super_nak(eee: &Node, _cmn: Common<'_>, slice: &[u8]) -> Result<()> { let nak: RegisterSuperNAK = serde_json::from_slice(slice)?; if nak.src_ip == eee.device_config.get_ip() { eee.set_authorized(false, Vec::new()); error!("unauthorized"); } else { eee.known_peers.delete_peer_with_ip(&nak.src_ip); eee.pending_peers.delete_peer_with_ip(&nak.src_ip); } Ok(()) } */ /* fn handle_packet_register_super_ack(eee: &Node, _cmn: Common<'_>, slice: &[u8]) -> Result<()> { debug!("handling REGISTER_SUPER_ACK"); let ack: RegisterSuperACK = serde_json::from_slice(slice)?; if ack.dev_addr.net_addr != 0 && ack.dev_addr.net_bit_len != 0 { // i'm authorized or moved; // eee.device eee.device_config .set_ip(ack.dev_addr.net_addr, ack.dev_addr.net_bit_len); debug!( "ip addr assigned: {}/{}", ip_to_string(&ack.dev_addr.net_addr), ack.dev_addr.net_bit_len ) } let Ok(private_key) = load_private_key_file(".client/id_rsa") else { error!("failed to load private key"); return Err(SDLanError::NormalError("failed to load private key")); }; let encrypt_key = rsa_decrypt(&private_key, &ack.encrypted_key)?; let header_key = rsa_decrypt(&private_key, &ack.header_key)?; eee.config .super_attempts .store(SUPER_ATTEMPTS_DEFAULT, Ordering::Relaxed); eee.stats .last_sup .store(get_current_timestamp(), Ordering::Relaxed); debug!("changed to Authorized"); eee.set_authorized(true, encrypt_key); eee.device.reload_config(&eee.device_config); eee.known_peers.clear(); eee.pending_peers.clear(); Ok(()) } */ /* fn handle_packet_register_super_acknowledge( eee: &Node, // cmn: Common<'_>, // slice: &[u8], ) -> Result<()> { debug!("handling REGISTER_SUPER_ACKNOWLEDGE"); // TODO: should check the common and the slice content. eee.config .super_attempts .store(SUPER_ATTEMPTS_DEFAULT, Ordering::Relaxed); eee.stats .last_sup .store(get_current_timestamp(), Ordering::Relaxed); Ok(()) } */ async fn handle_packet_register_ack( eee: &Node, // cmn: Common<'_>, body: &[u8], sender_sock: &SdlanSock, ) -> Result<()> { let Ok(ack) = SdlRegisterAck::decode(body) else { println!("failed to decode REGISTER_ACK"); return Ok(()); }; let origin_sender = sender_sock; debug!( "Rx REGISTER ACK from {} [{}] to {} via {}", ip_to_string(&ack.src_ip), origin_sender.to_string(), ip_to_string(&ack.dst_ip), sender_sock.to_string(), ); peer_set_p2p_confirmed(eee, ack.src_ip, sender_sock); Ok(()) } async fn handle_packet_register( eee: &Node, // cmn: Common<'_>, body: &[u8], from_sn: bool, sender_sock: &SdlanSock, ) -> Result<()> { if !eee.is_authorized() { error!("drop register due to not authed"); return Ok(()); } let Ok(reg) = SdlRegister::decode(body) else { error!("failed to decode REGISTER"); return Ok(()); }; let origin_sender = sender_sock; let via_multicast = is_multi_broadcast(reg.dst_ip); if via_multicast && reg.src_ip == eee.device_config.get_ip() { debug!("skipping register from self"); return Ok(()); } if !from_sn { info!("[P2P] Rx REGISTER from {}", sender_sock.to_string()); eee.pending_peers.delete_peer_with_ip(®.src_ip); send_register_ack(eee, origin_sender, ®).await; } else { info!( "[PsP] Rx REGISTER from {} [{}] to {} via {}", ip_to_string(®.src_ip), ip_to_string(®.dst_ip), sender_sock.to_string(), origin_sender.to_string(), ); } check_peer_registration_needed(eee, from_sn, reg.src_ip, &None, origin_sender).await; Ok(()) } /* async fn handle_packet_packet( eee: &Node, cmn: Common<'_>, body: &[u8], from_sn: bool, sender_sock: &SdlanSock, ) -> Result<()> { if eee.stats.last_sup.load(Ordering::Relaxed) == 0 { error!("dropping PACKET received before first registration with sn"); return Ok(()); } let has_sock = cmn.flags & config::SDLAN_FLAGS_SOCKET != 0; let has_v6 = cmn.flags & config::SDLAN_FLAGS_HAS_V6 != 0; let pkt = packet::Packet::unmarshal(body, has_sock, has_v6)?; // let mut orig_sender: &SdlanSock = sender_sock; // here, the origin sender ref should be checked // if let Some(ref sk) = pkt.sock { // orig_sender = sk; // } // println!("orig_sender: {:?}", orig_sender); let mut origin_sender = sender_sock; if let Some(ref k) = pkt.sock { origin_sender = k; } println!("orig_sender: {:?}", origin_sender); if !from_sn { // data from other peer debug!("[P2P] Rx data from {}", sender_sock.to_string()); eee.pending_peers.peers.remove(&pkt.src_ip); } else { // from sn, sock should not be None debug!( "[PsP] Rx data from {} (via {})", origin_sender.to_string(), sender_sock.to_string() ); } check_peer_registration_needed(eee, from_sn, pkt.src_ip, &pkt.v6_info, origin_sender).await; // handle_tun_packet(eee, from_sn, pkt).await; Ok(()) } */ pub async fn check_peer_registration_needed( eee: &Node, from_sn: bool, src_ip: u32, v6_info: &Option, peer_sock: &SdlanSock, ) { let mut p = eee.known_peers.get_peer(&src_ip); if let None = p { p = eee.known_peers.get_peer_by_sock(peer_sock); if let Some(ref k) = p { eee.known_peers.insert_peer(k.clone()); } } match p { None => { let _ = register_with_new_peer(eee, from_sn, src_ip, v6_info, peer_sock).await; // unimplemented!(); } Some(k) => { let now = get_current_timestamp(); if !from_sn { k.last_p2p.store(now, Ordering::Relaxed); } let last_seen = k.last_seen.load(Ordering::Relaxed); // more than 3 seconds if now - last_seen > 3 { check_known_peer_sock_change(eee, from_sn, src_ip, v6_info, peer_sock).await; } } } } async fn check_known_peer_sock_change( eee: &Node, from_sn: bool, ip: u32, v6_info: &Option, // dev_addr: &IpSubnet, peersock: &SdlanSock, ) { if is_multi_broadcast(ip) { return; } match eee.known_peers.get_peer(&ip) { Some(p) => { if !is_sdlan_sock_equal(&p.sock.read().unwrap(), peersock) { if !from_sn { info!( "peer changed: {}: {} -> {}", ip_to_string(&ip), &p.sock.read().unwrap().to_string(), peersock.to_string() ); eee.known_peers.delete_peer_with_ip(&ip); register_with_new_peer(eee, from_sn, ip, v6_info, peersock).await; } } else { // from sn, sn could see a different sock with us, just ignore it } } None => return, } } async fn register_with_new_peer( eee: &Node, from_sn: bool, ip: u32, v6_info: &Option, // dev_addr: &IpSubnet, peersock: &SdlanSock, ) { let now = get_current_timestamp(); let mut scan = eee.pending_peers.get_peer(&ip); if let None = scan { // such ip not found in pending let temp = Arc::new(EdgePeer::new( ip, eee.device_config.get_net_bit(), peersock, v6_info, now, )); debug!( "===> new pending {} => {}", ip_to_string(&ip), peersock.to_string(), ); eee.pending_peers.insert_peer(temp.clone()); scan = Some(temp); debug!("Pending size: {}", eee.pending_peers.peers.len()); if from_sn { // should send register to peer if eee.config.register_ttl == 1 { /* We are DMZ host or port is directly accessible. Just let peer to send back the ack */ } else if eee.config.register_ttl > 1 { let mut alter = 16; if let Ok(ttl) = eee.udp_sock_v4.ttl() { let mut temp = peersock.deepcopy(); send_register(eee, &temp, v6_info).await; let _ = eee.udp_sock_v4.set_ttl(eee.config.register_ttl as u32); while alter > 0 { temp.port += 1; send_register(eee, &temp, &None).await; alter -= 1; } let _ = eee.udp_sock_v4.set_ttl(ttl); } } else { // Normal STUN send_register(eee, peersock, v6_info).await; } // 发送给sn send_register( eee, &eee.config.super_nodes [eee.config.super_node_index.load(Ordering::Relaxed) as usize], &None, ) .await; } else { // P2P register, send directly send_register(eee, peersock, v6_info).await; } println!("register with local peers"); register_with_local_peers(eee).await; } else { if let Some(ref s) = scan { *(s.sock.write().unwrap()) = peersock.deepcopy(); } } if let Some(s) = scan { s.last_seen .store(get_current_timestamp(), Ordering::Relaxed); } } async fn register_with_local_peers(eee: &Node) { println!("register_with_local_peers"); if eee.config.allow_p2p { println!("send register to multicast sock"); send_register(eee, &eee.multicast_sock, &None).await; } } async fn send_register(eee: &Node, sock: &SdlanSock, _v6_info: &Option) { if !eee.config.allow_p2p { debug!("skipping register as p2p is disabled"); return; } let network_id = eee.network_id.load(Ordering::Relaxed); if network_id == 0 { error!("not authed"); return; } let register = SdlRegister { network_id: network_id, src_ip: eee.device_config.get_ip(), dst_ip: u32::from_be_bytes(sock.v4), }; let msg = encode_to_udp_message(Some(register), PacketType::Register as u8).unwrap(); println!("send register to {}", sock.to_string()); let _ = send_to_sock(eee, &msg, sock).await; /* let key = eee.get_header_key(); if key.len() > 0 { if let Ok(cnt) = encode_packet_encrypted(&cmn, &r, key.as_slice()) { let _ = send_to_sock_v4_and_v6(eee, &cnt, sock, v6_info).await; } } else { error!("not authed"); } */ } async fn handle_tun_packet( eee: &Node, from_sn: bool, pkt: SdlData, //orig_sender: &SdlanSock ) { let now = get_current_timestamp(); let payload = pkt.data; let key = eee.get_encrypt_key(); if key.len() == 0 { // check the encrypt key error!("packet encrypt key not provided"); return; } let origin = aes_decrypt(key.as_slice(), &payload); if let Err(_e) = origin { error!("failed to decrypt original data"); return; } let data = origin.unwrap(); let msg_size = data.len() as u64; if from_sn { if is_multi_broadcast(pkt.dst_ip) { eee.stats .rx_broadcast .fetch_add(msg_size, Ordering::Relaxed); } eee.stats.rx_sup.fetch_add(msg_size, Ordering::Relaxed); eee.stats.last_sup.store(now, Ordering::Relaxed); } else { eee.stats.rx_p2p.fetch_add(msg_size, Ordering::Relaxed); eee.stats.last_p2p.store(now, Ordering::Relaxed); } debug!("got packet from sock, will send to tun"); match IpHeaders::from_slice(&data) { Ok((iphdr, _)) => { if let Some(ipv4hdr) = iphdr.ipv4() { let dstip = u32::from_be_bytes(ipv4hdr.0.destination); if !is_multi_broadcast(dstip) && dstip != eee.device_config.get_ip() { // should not routed to me error!("should not routed to me"); return; } // packet should be sent to dev debug!("writing {} bytes to tun", data.len()); if let Err(e) = eee.device.send(&data) { error!("failed to write to tun: {}", e.to_string()); } } } Err(e) => { error!("failed to parse ip packet: {}", e.to_string()); } } } async fn send_register_ack(eee: &Node, orig_sender: &SdlanSock, reg: &SdlRegister) { if !eee.config.allow_p2p { debug!("Skipping REGISTER ACK as P2P is disallowed"); return; } let network_id = eee.network_id.load(Ordering::Relaxed); if network_id == 0 { error!("not authed"); return; } let ack = SdlRegisterAck { network_id, src_ip: eee.device_config.get_ip(), dst_ip: reg.src_ip, }; let Ok(ack) = encode_to_udp_message(Some(ack), PacketType::RegisterACK as u8) else { error!("failed to encode to udp message"); return; }; let _ = send_to_sock(eee, &ack, orig_sender).await; } fn peer_set_p2p_confirmed(eee: &Node, src_ip: u32, sender_sock: &SdlanSock) { let mut scan = eee.pending_peers.get_peer(&src_ip); if let None = scan { scan = eee.pending_peers.get_peer_by_sock(sender_sock); } if let None = scan { error!( "failed to find sender in pending peer: {}", sender_sock.to_string() ); return; } let mut scan = scan.unwrap(); eee.pending_peers.delete_peer_with_ip(&src_ip); match eee.known_peers.get_peer(&src_ip) { Some(scantmp) => { eee.known_peers.delete_peer_with_ip(&src_ip); scan = scantmp; scan.dev_addr.net_addr.store(src_ip, Ordering::Relaxed); scan.dev_addr .net_bit_len .store(eee.device_config.get_net_bit(), Ordering::Relaxed); } None => { *(scan.sock.write().unwrap()) = sender_sock.deepcopy(); } } let now = get_current_timestamp(); scan.last_p2p.store(now, Ordering::Relaxed); scan.last_seen.store(now, Ordering::Relaxed); let ip_string = ip_to_string(&src_ip); let sock_string = sender_sock.to_string(); info!( "P2P connection established: {} [{}]", &ip_string, &sock_string, ); debug!("==> new peer: {} -> {}", &ip_string, &sock_string,); eee.known_peers.insert_peer(scan); } pub async fn check_query_peer_info(eee: &Node, dst_ip: u32) { let scan: Arc; let now = get_current_timestamp(); match eee.pending_peers.get_peer(&dst_ip) { None => { let sock = SdlanSock { family: AF_INET, port: 0, v4: [0; 4], v6: [0; 16], }; let peer = Arc::new(EdgePeer::new( dst_ip, eee.device_config.get_net_bit(), &sock, &None, now, )); debug!("insert peer {} to pending", ip_to_string(&dst_ip)); eee.pending_peers.insert_peer(peer.clone()); scan = peer; } Some(s) => { scan = s; } } debug!( "now={}, last_sent_query={}, REGISTER_INTERVAL={}, scan={:?}", now, scan.last_sent_query.load(Ordering::Relaxed), REGISTER_INTERVAL, scan, ); if now - scan.last_sent_query.load(Ordering::Relaxed) > (REGISTER_INTERVAL as u64) { /* send_register( eee, &eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize], &None, ) .await; */ debug!("sending query for {}", ip_to_string(&dst_ip)); if let Ok(()) = send_query_peer(eee, dst_ip).await { scan.last_sent_query.store(now, Ordering::Relaxed); } } } async fn send_query_peer(eee: &Node, dst_ip: u32) -> Result<()> { let network_id = eee.network_id.load(Ordering::Relaxed); if network_id == 0 { error!("not authed"); return Err(SDLanError::NormalError("not connected")); } let query = SdlQueryInfo { dst_ip }; let Ok(content) = encode_to_tcp_message( Some(query), eee.get_next_packet_id(), PacketType::QueryInfo as u8, ) else { error!("failed to encode query"); return Err(SDLanError::NormalError("encode query error")); }; let tcp_conn = get_tcp_conn(); tcp_conn.send(&content).await } pub async fn ping_to_sn() { let Ok(msg) = encode_to_tcp_message::(None, 0, PacketType::Ping as u8) else { error!("failed to encode ping"); return; }; debug!("ping to sn"); let tcp_conn = get_tcp_conn(); if let Err(e) = tcp_conn.send(&msg).await { error!("failed to ping to sn: {:?}", e); } } /* pub async fn update_supernode_reg(eee: &Node) { let now = get_current_timestamp(); let authed = eee.is_authorized(); let last_reg = eee.last_register_req.load(Ordering::Relaxed); if !authed { if now > (last_reg + (REGISTER_INTERVAL as u64) / 10) { debug!("update supernode reg, fast retry"); } else { return; } } else if now < (last_reg + REGISTER_INTERVAL as u64) { return; } if eee.config.super_attempts.load(Ordering::Relaxed) == 0 { eee.config .super_attempts .store(SUPER_ATTEMPTS_DEFAULT, Ordering::Relaxed); error!("sup attempts = 0"); // next time, the supernode will use the new one let node_index = eee.config.super_node_index.fetch_add(1, Ordering::Relaxed); if node_index >= (eee.config.super_nodes.len() - 1) as u8 { eee.config.super_node_index.store(0, Ordering::Relaxed); } } else { eee.config.super_attempts.fetch_sub(1, Ordering::Relaxed); } if let Err(e) = eee.send_register_super().await { error!("failed to send register_super: {}", e.as_str()); } eee.last_register_req.store(now, Ordering::Relaxed); register_with_local_peers(eee).await; } */