diff --git a/src/network/node.rs b/src/network/node.rs index 867e4ec..f968039 100755 --- a/src/network/node.rs +++ b/src/network/node.rs @@ -236,10 +236,10 @@ pub struct Node { nat_type: Mutex, nat_cookie: AtomicU32, - // cookie_match: DashMap>, + + //cookie_match: DashMap>, cookie_match: Queryer, - // packet_id_match: Queryer, // packet_id_match: DashMap>, } @@ -247,18 +247,7 @@ unsafe impl Sync for Node {} impl Node { pub fn send_register_super_feedback(&self, pktid: u32, feed: RegisterSuperFeedback) { - // self.packet_id_match.write_feedback(pktid, feed); self.cookie_match.write_feedback(pktid, feed); - /* - match self.packet_id_match.remove(&pktid) { - Some(sender) => { - let _ = sender.1.send(feed); - } - None => { - return; - } - } - */ } pub fn get_nat_type(&self) -> NatType { @@ -336,9 +325,8 @@ impl Node { // *self.network_code.lock().unwrap() = network_code; let id = self.get_next_packet_id(); - // let result = self.packet_id_match.do_action_and_wait_for( - let result = self.cookie_match.do_action_and_wait_for( - id, + let res = self.cookie_match.do_action_and_wait_for( + id, || async { let _ = self .start_stop_sender @@ -348,40 +336,15 @@ impl Node { }) .await; debug!("start with feedback"); - }, + }, timeout ).await?; - if let Ok(data) = result.downcast() { - return Ok(*data); + if let Ok(res) = res.downcast() { + Ok(*res) + } else { + Err(SDLanError::ConvertError("failed to convert feedback to RSFeedback".to_owned())) } - Err(SDLanError::NormalError("timed out")) - /* - let (tx, rx) = oneshot::channel(); - self.packet_id_match.insert(id, tx); - let _ = self - .start_stop_sender - .send(StartStopInfo { - is_start: true, - pkt_id: Some(id), - }) - .await; - debug!("start with feedback"); - - tokio::select! { - rx_info = rx => { - if let Ok(result) = rx_info { - self.packet_id_match.remove(&id); - Ok(result) - } else { - Err(SDLanError::NormalError("rx closed")) - } - } - _ = tokio::time::sleep(timeout) => { - Err(SDLanError::NormalError("timed out")) - } - } - */ } pub async fn stop(&self) { @@ -485,7 +448,7 @@ impl Node { stats: NodeStats::new(), _last_register_req: AtomicU64::new(0), - // packet_id_match: Queryer::new(), + // packet_id_match: DashMap::new(), nat_cookie: AtomicU32::new(1), cookie_match: Queryer::new(), server_ip, @@ -570,13 +533,6 @@ impl Node { pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) { self.cookie_match.write_feedback(cookie, buf); - /* - if let Some((_key, chan)) = self.cookie_match.remove(&cookie) { - let _ = chan.send(buf); - return; - } - */ - // error!("failed to get such cookie stun probe"); } pub async fn probe_nat_type(&self) -> NatType { @@ -654,30 +610,19 @@ impl Node { to_server: &SocketAddr, ) -> Result { let cookie = self.nat_cookie.fetch_add(1, Ordering::Relaxed); - // println!("==> sending probe request: {:?}", probe); + let probe = SdlStunProbe { + attr: msgattr as u32, + cookie, + step: 0, + }; - let res = self.cookie_match.do_action_and_wait_for( - cookie, - || async { - let probe = SdlStunProbe { - attr: msgattr as u32, - cookie, - step: 0, - }; - let msg = encode_to_udp_message(Some(probe), PacketType::StunProbe as u8).unwrap(); - if let Err(_e) = self.udp_sock_v4.send_to(&msg, to_server).await { - error!("failed to send StunProbe"); - } - }, - Duration::from_secs(3), - ).await?; - if let Ok(data) = res.downcast() { - return Ok(*data); + let result = self.cookie_match.send_message_to_udp_and_wait_for(&self.udp_sock_v4, cookie, probe, PacketType::StunProbe as u8, to_server, Duration::from_secs(3)).await?; + + if let Ok(res) = result.downcast() { + return Ok(*res); } - - - Err(SDLanError::NormalError("reply recv error")) - // step 1 received + return Err(SDLanError::ConvertError("failed to convert to StunprobeReply".to_owned())) + // println!("==> sending probe request: {:?}", probe); } } @@ -893,9 +838,6 @@ impl Queryer { return Err(SDLanError::NormalError("send error")); } - let quic_conn = get_quic_write_conn(); - quic_conn.send(content).await?; - tokio::select! { data = rx => { if let Ok(data) = data {