merge cookie_match and packet_id_match to one

This commit is contained in:
alex 2026-03-26 19:58:47 +08:00
parent be401afc7b
commit caf62fe079

View File

@ -236,10 +236,10 @@ pub struct Node {
nat_type: Mutex<NatType>, nat_type: Mutex<NatType>,
nat_cookie: AtomicU32, nat_cookie: AtomicU32,
// cookie_match: DashMap<u32, oneshot::Sender<SdlStunProbeReply>>,
//cookie_match: DashMap<u32, oneshot::Sender<SdlStunProbeReply>>,
cookie_match: Queryer, cookie_match: Queryer,
// packet_id_match: Queryer,
// packet_id_match: DashMap<u32, oneshot::Sender<RegisterSuperFeedback>>, // packet_id_match: DashMap<u32, oneshot::Sender<RegisterSuperFeedback>>,
} }
@ -247,18 +247,7 @@ unsafe impl Sync for Node {}
impl Node { impl Node {
pub fn send_register_super_feedback(&self, pktid: u32, feed: RegisterSuperFeedback) { pub fn send_register_super_feedback(&self, pktid: u32, feed: RegisterSuperFeedback) {
// self.packet_id_match.write_feedback(pktid, feed);
self.cookie_match.write_feedback(pktid, feed); self.cookie_match.write_feedback(pktid, feed);
/*
match self.packet_id_match.remove(&pktid) {
Some(sender) => {
let _ = sender.1.send(feed);
}
None => {
return;
}
}
*/
} }
pub fn get_nat_type(&self) -> NatType { pub fn get_nat_type(&self) -> NatType {
@ -336,9 +325,8 @@ impl Node {
// *self.network_code.lock().unwrap() = network_code; // *self.network_code.lock().unwrap() = network_code;
let id = self.get_next_packet_id(); let id = self.get_next_packet_id();
// let result = self.packet_id_match.do_action_and_wait_for( let res = self.cookie_match.do_action_and_wait_for(
let result = self.cookie_match.do_action_and_wait_for( id,
id,
|| async { || async {
let _ = self let _ = self
.start_stop_sender .start_stop_sender
@ -348,40 +336,15 @@ impl Node {
}) })
.await; .await;
debug!("start with feedback"); debug!("start with feedback");
}, },
timeout timeout
).await?; ).await?;
if let Ok(data) = result.downcast() { if let Ok(res) = res.downcast() {
return Ok(*data); Ok(*res)
} else {
Err(SDLanError::ConvertError("failed to convert feedback to RSFeedback".to_owned()))
} }
Err(SDLanError::NormalError("timed out"))
/*
let (tx, rx) = oneshot::channel();
self.packet_id_match.insert(id, tx);
let _ = self
.start_stop_sender
.send(StartStopInfo {
is_start: true,
pkt_id: Some(id),
})
.await;
debug!("start with feedback");
tokio::select! {
rx_info = rx => {
if let Ok(result) = rx_info {
self.packet_id_match.remove(&id);
Ok(result)
} else {
Err(SDLanError::NormalError("rx closed"))
}
}
_ = tokio::time::sleep(timeout) => {
Err(SDLanError::NormalError("timed out"))
}
}
*/
} }
pub async fn stop(&self) { pub async fn stop(&self) {
@ -485,7 +448,7 @@ impl Node {
stats: NodeStats::new(), stats: NodeStats::new(),
_last_register_req: AtomicU64::new(0), _last_register_req: AtomicU64::new(0),
// packet_id_match: Queryer::new(), // packet_id_match: DashMap::new(),
nat_cookie: AtomicU32::new(1), nat_cookie: AtomicU32::new(1),
cookie_match: Queryer::new(), cookie_match: Queryer::new(),
server_ip, server_ip,
@ -570,13 +533,6 @@ impl Node {
pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) { pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) {
self.cookie_match.write_feedback(cookie, buf); self.cookie_match.write_feedback(cookie, buf);
/*
if let Some((_key, chan)) = self.cookie_match.remove(&cookie) {
let _ = chan.send(buf);
return;
}
*/
// error!("failed to get such cookie stun probe");
} }
pub async fn probe_nat_type(&self) -> NatType { pub async fn probe_nat_type(&self) -> NatType {
@ -654,30 +610,19 @@ impl Node {
to_server: &SocketAddr, to_server: &SocketAddr,
) -> Result<SdlStunProbeReply> { ) -> Result<SdlStunProbeReply> {
let cookie = self.nat_cookie.fetch_add(1, Ordering::Relaxed); let cookie = self.nat_cookie.fetch_add(1, Ordering::Relaxed);
// println!("==> sending probe request: {:?}", probe); let probe = SdlStunProbe {
attr: msgattr as u32,
cookie,
step: 0,
};
let res = self.cookie_match.do_action_and_wait_for( let result = self.cookie_match.send_message_to_udp_and_wait_for(&self.udp_sock_v4, cookie, probe, PacketType::StunProbe as u8, to_server, Duration::from_secs(3)).await?;
cookie,
|| async { if let Ok(res) = result.downcast() {
let probe = SdlStunProbe { return Ok(*res);
attr: msgattr as u32,
cookie,
step: 0,
};
let msg = encode_to_udp_message(Some(probe), PacketType::StunProbe as u8).unwrap();
if let Err(_e) = self.udp_sock_v4.send_to(&msg, to_server).await {
error!("failed to send StunProbe");
}
},
Duration::from_secs(3),
).await?;
if let Ok(data) = res.downcast() {
return Ok(*data);
} }
return Err(SDLanError::ConvertError("failed to convert to StunprobeReply".to_owned()))
// println!("==> sending probe request: {:?}", probe);
Err(SDLanError::NormalError("reply recv error"))
// step 1 received
} }
} }
@ -893,9 +838,6 @@ impl Queryer {
return Err(SDLanError::NormalError("send error")); return Err(SDLanError::NormalError("send error"));
} }
let quic_conn = get_quic_write_conn();
quic_conn.send(content).await?;
tokio::select! { tokio::select! {
data = rx => { data = rx => {
if let Ok(data) = data { if let Ok(data) = data {