message downcast ok

This commit is contained in:
alex 2026-03-26 20:40:42 +08:00
parent e2d5b465f7
commit 2cff146f8c
3 changed files with 79 additions and 42 deletions

View File

@ -13,14 +13,13 @@ use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration; use std::time::Duration;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot; use tracing::{debug, error, warn};
use tracing::{debug, error};
use crate::network::RouteTable2; use crate::network::{ARP_REPLY, ArpHdr, EthHdr, RouteTable2};
use crate::quic::quic_init; use crate::quic::quic_init;
use crate::{ConnectionInfo, MyEncryptor, RuleCache, get_base_dir}; use crate::{ConnectionInfo, MyEncryptor, RuleCache, get_base_dir};
use crate::pb::{ use crate::pb::{
encode_to_tcp_message, encode_to_udp_message, SdlEmpty, SdlStunProbe, SdlStunProbeReply, SdlArpRequest, SdlArpResponse, SdlEmpty, SdlStunProbe, SdlStunProbeReply, encode_to_tcp_message, encode_to_udp_message
}; };
use crate::tcp::{NatType, PacketType, StunProbeAttr, get_quic_write_conn}; use crate::tcp::{NatType, PacketType, StunProbeAttr, get_quic_write_conn};
use crate::utils::{Socket}; use crate::utils::{Socket};
@ -33,7 +32,7 @@ use super::device::{DeviceConfig, Mode};
use super::tun::{new_iface, Iface}; use super::tun::{new_iface, Iface};
use tokio::fs::File; use tokio::fs::File;
use sdlan_sn_rs::utils::{Mac, gen_rsa_keys, load_private_key_file, save_to_file}; use sdlan_sn_rs::utils::{Mac, gen_rsa_keys, ip_to_string, load_private_key_file, save_to_file};
use sdlan_sn_rs::utils::{Result, SDLanError}; use sdlan_sn_rs::utils::{Result, SDLanError};
static EDGE: OnceCell<Node> = OnceCell::new(); static EDGE: OnceCell<Node> = OnceCell::new();
@ -238,7 +237,7 @@ pub struct Node {
nat_cookie: AtomicU32, nat_cookie: AtomicU32,
//cookie_match: DashMap<u32, oneshot::Sender<SdlStunProbeReply>>, //cookie_match: DashMap<u32, oneshot::Sender<SdlStunProbeReply>>,
cookie_match: Queryer, pub cookie_match: Queryer,
// packet_id_match: DashMap<u32, oneshot::Sender<RegisterSuperFeedback>>, // packet_id_match: DashMap<u32, oneshot::Sender<RegisterSuperFeedback>>,
} }
@ -247,7 +246,7 @@ unsafe impl Sync for Node {}
impl Node { impl Node {
pub fn send_register_super_feedback(&self, pktid: u32, feed: RegisterSuperFeedback) { pub fn send_register_super_feedback(&self, pktid: u32, feed: RegisterSuperFeedback) {
self.cookie_match.write_feedback(pktid, feed); self.cookie_match.write_feedback(pktid, Box::new(feed));
} }
pub fn get_nat_type(&self) -> NatType { pub fn get_nat_type(&self) -> NatType {
@ -531,8 +530,73 @@ impl Node {
} }
*/ */
pub async fn send_and_wait_for_arp_reply(&self, gw_ip: u32, real_ip: u32) -> Result<()>{
let id = self.get_next_packet_id();
let res = self.cookie_match.do_action_and_wait_for(
id,
|| async {
let arp_request = SdlArpRequest {
pkt_id: id,
target_ip: gw_ip,
};
warn!("arp request: {:?}", arp_request);
let msg = encode_to_tcp_message(Some(arp_request), PacketType::ArpRequest as u8).unwrap();
let conn = get_quic_write_conn();
let _ = conn.send(msg).await;
},
Duration::from_secs(3)).await?;
match res.downcast() {
Ok(res) => {
let res: SdlArpResponse = *res;
debug!("got arp response: {:?}", res);
if res.target_mac.len() != 6 {
// invalid target_mac
error!("invalid target_mac: {:?}, ip={}", res.target_mac, ip_to_string(&res.target_ip));
return Ok(());
}
// TODO: construct the arp reply, and write to tun;
let src_mac = res.target_mac.try_into().unwrap();
let dst_mac = self.device_config.get_mac();
let dst_ip = self.device_config.get_ip();
let hdr = ArpHdr{
ethhdr: EthHdr {
dest: dst_mac,
src: src_mac,
eth_type: 0x0806,
},
hwtype: 0x0001,
protocol: 0x0800,
hwlen: 6,
protolen: 4,
opcode: ARP_REPLY,
shwaddr: src_mac,
sipaddr: [((real_ip >> 16) as u16) & 0xffff, (real_ip as u16) & 0xffff],
dhwaddr: dst_mac,
dipaddr: [((dst_ip >> 16) & 0x0000ffff) as u16, (dst_ip & 0x0000ffff) as u16]
};
let data = hdr.marshal_to_bytes();
if let Err(_e) = self.device.send(&data) {
error!("failed to write arp response to device");
}
Ok(())
}
Err(e ) => {
error!("failed to convert to ArpResponse: {:?}", e);
Err(SDLanError::ConvertError("failed to convert to ArpResponse".to_owned()))
}
}
}
pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) { pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) {
self.cookie_match.write_feedback(cookie, buf); self.cookie_match.write_feedback(cookie, Box::new(buf));
} }
pub async fn probe_nat_type(&self) -> NatType { pub async fn probe_nat_type(&self) -> NatType {
@ -819,9 +883,9 @@ impl Queryer {
} }
} }
pub fn write_feedback<T: Any + Sync + Send + 'static>(&self, id: u32, data: T) { pub fn write_feedback(&self, id: u32, data: BoxedProstMessage) {
if let Some((_, tx)) = self.mailbox.remove(&id) { if let Some((_, tx)) = self.mailbox.remove(&id) {
if let Err(e) = tx.send(Box::new(data)) { if let Err(e) = tx.send(data) {
error!("failed to write feedback"); error!("failed to write feedback");
} }
} }

View File

@ -240,6 +240,8 @@ impl TunTapPacketHandler for Iface {
use crate::{network::ArpRequest, pb::{SdlArpRequest, encode_to_tcp_message}, tcp::get_quic_write_conn}; use crate::{network::ArpRequest, pb::{SdlArpRequest, encode_to_tcp_message}, tcp::get_quic_write_conn};
let dest_ip = ((arp.dipaddr[0] as u32) << 16) + arp.dipaddr[1] as u32; let dest_ip = ((arp.dipaddr[0] as u32) << 16) + arp.dipaddr[1] as u32;
edge.send_and_wait_for_arp_reply(dest_ip, dest_ip).await;
/*
let request = SdlArpRequest { let request = SdlArpRequest {
pkt_id: edge.get_next_packet_id(), pkt_id: edge.get_next_packet_id(),
target_ip: dest_ip, target_ip: dest_ip,
@ -249,6 +251,7 @@ impl TunTapPacketHandler for Iface {
let conn = get_quic_write_conn(); let conn = get_quic_write_conn();
debug!("sending arp request"); debug!("sending arp request");
let _ = conn.send(req).await; let _ = conn.send(req).await;
*/
return Ok(()); return Ok(());
} }
_other => { _other => {

View File

@ -199,39 +199,9 @@ async fn handle_tcp_message(msg: SdlanTcp) {
error!("failed to decode ARP RESPONSE"); error!("failed to decode ARP RESPONSE");
return; return;
}; };
debug!("got arp response: {:?}", resp);
if resp.target_mac.len() != 6 {
// invalid target_mac
error!("invalid target_mac: {:?}, ip={}", resp.target_mac, ip_to_string(&resp.target_ip));
return;
}
// TODO: construct the arp reply, and write to tun; warn!("got arp response: {:?}", resp);
let src_mac = resp.target_mac.try_into().unwrap(); edge.cookie_match.write_feedback(resp.pkt_id, Box::new(resp));
let dst_mac = edge.device_config.get_mac();
let dst_ip = edge.device_config.get_ip();
let hdr = ArpHdr{
ethhdr: EthHdr {
dest: dst_mac,
src: src_mac,
eth_type: 0x0806,
},
hwtype: 0x0001,
protocol: 0x0800,
hwlen: 6,
protolen: 4,
opcode: ARP_REPLY,
shwaddr: src_mac,
sipaddr: [((resp.target_ip >> 16) as u16) & 0xffff, (resp.target_ip as u16) & 0xffff],
dhwaddr: dst_mac,
dipaddr: [((dst_ip >> 16) & 0x0000ffff) as u16, (dst_ip & 0x0000ffff) as u16]
};
let data = hdr.marshal_to_bytes();
if let Err(_e) = edge.device.send(&data) {
error!("failed to write arp response to device");
}
} }
PacketType::PolicyReply => { PacketType::PolicyReply => {
let Ok(policy) = SdlPolicyResponse::decode(&msg.current_packet[..]) else { let Ok(policy) = SdlPolicyResponse::decode(&msg.current_packet[..]) else {