941 lines
26 KiB
Rust
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use arc_swap::ArcSwap;
use dashmap::DashMap;
use prost::Message;
use quinn::Endpoint;
use rsa::RsaPrivateKey;
use sdlan_sn_rs::config::{AF_INET, AF_INET6};
use tokio::net::UdpSocket;
use std::any::Any;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::Sender;
use tracing::{debug, error, warn};
use crate::network::{ArpTable, RouteTable2};
use crate::quic::quic_init;
use crate::{CommandLine, ConnectionInfo, MyEncryptor, RuleCache, get_base_dir};
use crate::pb::{
SdlArpRequest, SdlEmpty, SdlStunProbe, SdlStunProbeReply, encode_to_tcp_message, encode_to_udp_message
};
use crate::tcp::{NatType, PacketType, StunProbeAttr, get_quic_write_conn};
use crate::utils::{Socket};
use sdlan_sn_rs::peer::{IpSubnet, V6Info};
use once_cell::sync::OnceCell;
use super::device::{DeviceConfig, Mode};
use super::tun::{new_iface, Iface};
use tokio::fs::File;
use sdlan_sn_rs::utils::{Mac, gen_rsa_keys, load_private_key_file, save_to_file};
use sdlan_sn_rs::utils::{Result, SDLanError};
static EDGE: OnceCell<Node> = OnceCell::new();
pub async fn init_edge(
// token: &str,
// network_code: &str,
args: &CommandLine,
mac: Mac,
node_conf: NodeConfig,
// tos: u32,
start_stop: Sender<StartStopInfo>,
// mtu: u32,
connecting_chan: Option<Sender<ConnectionInfo>>,
udpsock_for_dns: Arc<UdpSocket>,
hostname: String,
server_ip: String,
install_channel: String
) -> Result<()> {
// gen public key
let rsa_path = format!("{}/.client", get_base_dir());
gen_rsa_keys(&rsa_path);
let mut pubkey = String::new();
// File::open(".client/id_rsa.pub")
File::open(&format!("{}/id_rsa.pub", rsa_path))
.await?
.read_to_string(&mut pubkey)
.await?;
let privatekey = load_private_key_file(&format!("{}/id_rsa", rsa_path))?;
// init sock
// let edge_uuid = create_or_load_uuid("")?;
//let node_conf = parse_config(edge_uuid, &args).await?;
let Ok(sock_v4) = Socket::build(node_conf._local_port, true, false, args.tos).await else {
panic!("failed to build sock for sock v4");
};
let mut sock_multicast = None;
if !node_conf._drop_multicast {
sock_multicast = Some(Socket::build(MULTICAST_PORT, true, true, 0).await?);
}
// let sock_multicast = Socket::build(MULTICAST_PORT, true, true, 0).await?;
// allow multicast
// TODO: set the sn's tcp socket
// let tcpsock = TCPSocket::build("121.4.79.234:1234").await?;
let tcp_pong = Arc::new(AtomicU64::new(0));
let edge = Node::new(
mac,
pubkey,
node_conf,
sock_v4,
sock_multicast,
// token,
// network_code,
privatekey,
tcp_pong.clone(),
start_stop,
args.mtu,
connecting_chan,
hostname,
udpsock_for_dns,
server_ip,
install_channel,
);
edge.route_table.parse_and_add_route(&args.route_file, &args.route_str);
do_init_edge(edge)?;
Ok(())
}
fn do_init_edge(edge: Node) -> Result<()> {
if let Err(_) = EDGE.set(edge) {
return Err(SDLanError::NormalError("initialize sn error"));
}
Ok(())
}
pub fn get_edge() -> &'static Node {
let edge = EDGE.get();
if edge.is_none() {
panic!("should init_edge first");
}
edge.unwrap()
}
pub struct RegisterSuperFeedback {
// 0 for success, other for error
pub result: u8,
pub message: String,
pub should_exit: bool,
}
pub struct StartStopInfo {
pub is_start: bool,
pub pkt_id: Option<u32>,
}
pub struct StringToken<T>(RwLock<T>);
impl <T: Clone> StringToken<T> {
pub fn new(value: T) -> Self {
Self(RwLock::new(value))
}
pub fn get(&self) -> T {
self.0.read().unwrap().clone()
}
pub fn set(&self, token: T) {
*self.0.write().unwrap() = token;
}
}
pub struct IdentityID(AtomicU32);
impl IdentityID {
pub fn new(id: u32) -> Self {
Self(AtomicU32::new(id))
}
pub fn load(&self) -> u32 {
self.0.load(Ordering::Relaxed)
}
pub fn store(&self, id: u32) {
self.0.store(id, Ordering::Relaxed);
}
}
pub struct Node {
packet_id: AtomicU32,
pub encryptor: ArcSwap<MyEncryptor>,
pub network_id: AtomicU32,
pub network_domain: RwLock<String>,
pub identity_id: IdentityID,
// rule is identity-to-identity ok
pub rule_cache: RuleCache,
// route_table stores the routes
pub route_table: RouteTable2,
pub access_token: StringToken<String>,
pub session_token: StringToken<Vec<u8>>,
pub hostname: RwLock<String>,
pub quic_endpoint: Endpoint,
pub udp_sock_for_dns: Arc<UdpSocket>,
pub server_ip: String,
pub tcp_pong: Arc<AtomicU64>,
start_stop_sender: Sender<StartStopInfo>,
pub connection_chan: Option<Sender<ConnectionInfo>>,
// user token info
// pub _token: Mutex<String>,
// pub network_code: Mutex<String>,
pub device_config: DeviceConfig,
pub device: Iface,
// authorize related
pub authorized: AtomicBool,
// pub header_key: RwLock<Arc<Vec<u8>>>,
// pub encrypt_key: RwLock<Arc<Vec<u8>>>,
pub rsa_pubkey: String,
pub rsa_private: RsaPrivateKey,
pub config: NodeConfig,
// pub super_node: Vec<Peer>,
// pub super_attempts: AtomicU8,
// store pending, and known peers
pub pending_peers: PeerMap,
#[cfg(feature = "tun")]
pub arp_table: ArpTable,
pub known_peers: PeerMap,
// pub tcp_sock_v4: TCPSocket,
pub udp_sock_multicast: Option<Socket>,
pub udp_sock_v4: Socket,
pub outer_ip_v4: AtomicU32,
pub udp_sock_v6: RwLock<Option<Arc<Socket>>>,
pub ipv6: RwLock<Option<SdlanSock>>,
pub multicast_sock: SdlanSock,
pub _local_v6: RwLock<Option<[u8; 16]>>,
pub stats: NodeStats,
// last register super time, in unix
pub _last_register_req: AtomicU64,
pub install_channel: String,
nat_type: Mutex<NatType>,
nat_cookie: AtomicU32,
//cookie_match: DashMap<u32, oneshot::Sender<SdlStunProbeReply>>,
pub cookie_match: Queryer,
// packet_id_match: DashMap<u32, oneshot::Sender<RegisterSuperFeedback>>,
}
unsafe impl Sync for Node {}
impl Node {
pub fn send_register_super_feedback(&self, pktid: u32, feed: RegisterSuperFeedback) {
self.cookie_match.write_feedback(pktid, Box::new(feed));
}
pub fn get_nat_type(&self) -> NatType {
self.nat_type.lock().unwrap().clone()
}
pub async fn start_without_feedback(
&self,
access_token: String,
network_id: u32,
network_domain: &String,
ip_net: u32,
ip_net_bit_len: u8,
identity_id: u32,
hostname: Option<String>
) -> Result<()> {
if let Some(host) = hostname {
let idfile = format!("{}/.host", get_base_dir());
let _ = save_to_file(&idfile, &host);
*self.hostname.write().unwrap() = host;
}
self.access_token.set(access_token);
if let Err(_e) = self.device_config.set_ip(ip_net, ip_net_bit_len) {
error!("failed to set ip");
}
// self.device_config.ip.net_addr.store(ip_net, Ordering::Relaxed);
// self.device_config.ip.net_bit_len.store(ip_net_bit_len, Ordering::Relaxed);
// *self.device_config.mac.write().unwrap() = create_or_load_mac();
self.network_id.store(network_id, Ordering::Relaxed);
self.network_domain.write().unwrap().clone_from(network_domain);
// self.network_domain = network_domain;
self.identity_id.store(identity_id);
// *self._token.lock().unwrap() = token;
// *self.network_code.lock().unwrap() = network_code;
let _ = self
.start_stop_sender
.send(StartStopInfo {
is_start: true,
pkt_id: None,
})
.await;
Ok(())
}
pub async fn start_with_feedback(
&self,
access_token: String,
network_id: u32,
network_domain: &String,
ip_net: u32,
ip_net_bit_len: u8,
identity_id: u32,
// token: String,
// network_code: String,
hostname: Option<String>,
timeout: Duration,
) -> Result<RegisterSuperFeedback> {
if let Some(host) = hostname {
let idfile = format!("{}/.host", get_base_dir());
let _ = save_to_file(&idfile, &host);
*self.hostname.write().unwrap() = host;
}
self.access_token.set(access_token);
if let Err(_e) = self.device_config.set_ip(ip_net, ip_net_bit_len) {
error!("failed to set ip");
}
// *self.device_config.mac.write().unwrap() = create_or_load_mac();
self.network_id.store(network_id, Ordering::Relaxed);
self.network_domain.write().unwrap().clone_from(network_domain);
self.identity_id.store(identity_id);
// *self._token.lock().unwrap() = token;
// *self.network_code.lock().unwrap() = network_code;
let id = self.get_next_packet_id();
let res = self.cookie_match.do_action_and_wait_for(
0,
|| async {
let _ = self
.start_stop_sender
.send(StartStopInfo {
is_start: true,
pkt_id: Some(id),
})
.await;
debug!("start with feedback");
},
timeout
).await?;
if let Ok(res) = res.downcast() {
Ok(*res)
} else {
Err(SDLanError::ConvertError("failed to convert feedback to RSFeedback".to_owned()))
}
}
pub async fn stop(&self) {
// *self._token.lock().unwrap() = "".to_owned();
self.session_token.set(vec![]);
let _ = self
.start_stop_sender
.send(StartStopInfo {
is_start: false,
pkt_id: None,
})
.await;
}
pub fn new(
mac: Mac,
pubkey: String,
config: NodeConfig,
sock: Socket,
multicast_sock: Option<Socket>,
// tcpsock: TCPSocket,
// token: &str,
// network_code: &str,
private: RsaPrivateKey,
tcp_pong: Arc<AtomicU64>,
start_stop: Sender<StartStopInfo>,
mtu: u32,
connecting_chan: Option<Sender<ConnectionInfo>>,
hostname: String,
udpsock_for_dns: Arc<UdpSocket>,
server_ip: String,
install_channel: String,
) -> Self {
let mode = if cfg!(not(feature = "tun")) {
Mode::Tap
} else {
Mode::Tun
};
Self {
#[cfg(feature = "tun")]
arp_table: ArpTable::new(),
packet_id: AtomicU32::new(1),
encryptor: ArcSwap::from(Arc::new(MyEncryptor::new())),
// encryptor: RwLock::new(MyEncryptor::new()),
network_id: AtomicU32::new(0),
hostname: RwLock::new(hostname),
rule_cache: RuleCache::new(),
route_table: RouteTable2::new(),
network_domain: RwLock::new(String::new()),
udp_sock_for_dns: udpsock_for_dns,
quic_endpoint: quic_init(),
identity_id: IdentityID::new(0),
access_token: StringToken::new(String::new()),
session_token: StringToken::new(Vec::new()),
// _token: Mutex::new(token.to_owned()),
// network_code: Mutex::new(network_code.to_owned()),
start_stop_sender: start_stop,
connection_chan: connecting_chan,
tcp_pong,
nat_type: Mutex::new(NatType::Blocked),
device_config: DeviceConfig::new(mac, mtu),
device: new_iface("dev", mode),
authorized: AtomicBool::new(false),
// encrypt_key: RwLock::new(Arc::new(Vec::new())),
// rsa_pubkey:
rsa_pubkey: pubkey,
rsa_private: private,
config,
// super_node: Vec::new(),
// super_attempts: AtomicU8::new(0),
pending_peers: PeerMap::new(),
known_peers: PeerMap::new(),
// tcp_sock_v4: tcpsock,
udp_sock_multicast: multicast_sock,
udp_sock_v4: sock,
outer_ip_v4: AtomicU32::new(0),
udp_sock_v6: RwLock::new(None),
ipv6: RwLock::new(None),
multicast_sock: SdlanSock {
family: AF_INET,
port: config::MULTICAST_PORT,
v4: config::MULITCAST_V4,
v6: [0; 16],
},
_local_v6: RwLock::new(None),
stats: NodeStats::new(),
_last_register_req: AtomicU64::new(0),
// packet_id_match: DashMap::new(),
nat_cookie: AtomicU32::new(1),
cookie_match: Queryer::new(),
server_ip,
install_channel,
}
}
pub fn get_next_packet_id(&self) -> u32 {
self.packet_id.fetch_add(1, Ordering::Relaxed)
}
pub fn is_authorized(&self) -> bool {
// self.header_key
self.authorized.load(Ordering::Relaxed)
}
pub fn set_authorized(&self, authorized: bool) {
self.authorized.store(authorized, Ordering::Relaxed);
// *(self.encrypt_key.write().unwrap()) = Arc::new(encrypt_key);
}
/*
pub fn get_header_key(&self) -> Arc<Vec<u8>> {
// self.header_key.read().unwrap().clone()
}
*/
/*
pub fn get_encrypt_key(&self) -> Arc<Vec<u8>> {
self.encrypt_key.read().unwrap().clone()
}
*/
/*
pub fn sn_is_known(&self, sock: &SdlanSock) -> bool {
for sn in self.config.super_nodes.iter() {
if sn.family != sock.family || sn.port != sock.port {
continue;
}
if sn.family == AF_INET && sn.v4 == sock.v4 {
return true;
}
if sn.family == AF_INET6 && sn.v6 == sock.v6 {
return true;
}
}
return false;
}
*/
pub fn _remove_v6(&self) {
*(self.udp_sock_v6.write().unwrap()) = None;
}
/*
pub async fn send_to_v4<A: ToSocketAddrs>(&self, info: &[u8], target: A) -> Result<usize> {
match self.udp_sock_v4.send_to(info, target).await {
Ok(n) => Ok(n),
Err(e) => {
println!("failed to send");
Err(SDLanError::NormalError("failed to send"))
}
}
}
*/
/*
pub async fn send_to_v6<A: ToSocketAddrs>(&self, info: &[u8], target: A) -> Result<usize> {
let m = self.udp_sock_v6.read().unwrap().clone();
if let Some(ref l) = m.as_ref() {
match l.send_to(info, target).await {
Err(e) => {
return Err(SDLanError::NormalError("send error"));
}
Ok(n) => return Ok(n),
}
}
Err(SDLanError::NormalError("no udp6 conn is bined"))
}
*/
pub async fn send_arp_request(&self, gw_ip: u32, real_ip: u32) -> Result<()>{
let arp_request = SdlArpRequest {
// pkt_id: id,
target_ip: gw_ip,
origin_ip: real_ip,
context: Vec::new(),
};
let msg = encode_to_tcp_message(Some(arp_request), PacketType::ArpRequest as u8).unwrap();
let conn = get_quic_write_conn();
let _ = conn.send(msg).await;
Ok(())
}
pub async fn send_nat_probe_reply(&self, cookie: u32, buf: SdlStunProbeReply) {
self.cookie_match.write_feedback(cookie, Box::new(buf));
}
pub async fn probe_nat_type(&self) -> NatType {
/*
if !self.is_authorized() {
return None;
}
*/
let Ok(reply1) = self
.send_and_wait_for_probe_reply(StunProbeAttr::None, &self.config.nat_server1)
.await
else {
*self.nat_type.lock().unwrap() = NatType::Blocked;
return NatType::Blocked;
};
if reply1.ip == self.outer_ip_v4.load(Ordering::Relaxed) {
let Ok(_reply2) = self
.send_and_wait_for_probe_reply(StunProbeAttr::Peer, &self.config.nat_server1)
.await
else {
*self.nat_type.lock().unwrap() = NatType::Symmetric;
return NatType::Symmetric;
};
*self.nat_type.lock().unwrap() = NatType::NoNat;
return NatType::NoNat;
}
if let Ok(_reply2_2) = self
.send_and_wait_for_probe_reply(StunProbeAttr::Peer, &self.config.nat_server1)
.await
{
*self.nat_type.lock().unwrap() = NatType::FullCone;
return NatType::FullCone;
}
let Ok(reply3) = self
.send_and_wait_for_probe_reply(StunProbeAttr::None, &self.config.nat_server2)
.await
else {
*self.nat_type.lock().unwrap() = NatType::Blocked;
return NatType::Blocked;
};
if reply3.ip != reply1.ip || reply3.port != reply1.port {
*self.nat_type.lock().unwrap() = NatType::Symmetric;
return NatType::Symmetric;
}
if let Ok(_reply4) = self
.send_and_wait_for_probe_reply(StunProbeAttr::Port, &self.config.nat_server1)
.await
{
*self.nat_type.lock().unwrap() = NatType::ConeRestricted;
return NatType::ConeRestricted;
} else {
*self.nat_type.lock().unwrap() = NatType::PortRestricted;
return NatType::PortRestricted;
}
}
pub async fn send_unregister_super(&self) -> Result<()> {
let content =
encode_to_tcp_message::<SdlEmpty>(None, PacketType::UnRegisterSuper as u8).unwrap();
let conn = get_quic_write_conn();
let _ = conn.send(content).await;
Ok(())
}
async fn send_and_wait_for_probe_reply(
&self,
msgattr: StunProbeAttr,
to_server: &SocketAddr,
) -> Result<SdlStunProbeReply> {
let cookie = self.nat_cookie.fetch_add(1, Ordering::Relaxed);
let probe = SdlStunProbe {
attr: msgattr as u32,
cookie,
step: 0,
};
let result = self.cookie_match.send_message_to_udp_and_wait_for(&self.udp_sock_v4, cookie, probe, PacketType::StunProbe as u8, to_server, Duration::from_secs(3)).await?;
if let Ok(res) = result.downcast() {
return Ok(*res);
}
return Err(SDLanError::ConvertError("failed to convert to StunprobeReply".to_owned()))
// println!("==> sending probe request: {:?}", probe);
}
}
pub struct PeerMap {
pub peers: DashMap<Mac, EdgePeer>,
}
#[allow(unused)]
impl PeerMap {
pub fn new() -> PeerMap {
Self {
peers: DashMap::new(),
}
}
/*
pub fn get_peer(&self, mac: &Mac) -> Option<Arc<EdgePeer>> {
if let Some(v) = self.peers.get(mac) {
Some(v.clone())
} else {
None
}
}
*/
pub fn clear(&self) {
self.peers.clear();
}
pub fn get_peer_by_sock(&self, sock: &SdlanSock) -> Option<Arc<EdgePeer>> {
/*
for s in self.peers.iter() {
let m = s.sock.read().unwrap();
if is_sdlan_sock_equal(&m, sock) {
return Some(s.value().clone());
}
}
*/
None
}
pub fn delete_peer_with_mac(&self, mac: &Mac) -> Option<(Mac, EdgePeer)> {
self.peers.remove(mac)
}
pub fn insert_peer(&self, mac: Mac, p: EdgePeer) {
self.peers.insert(mac, p);
}
}
pub struct NodeStats {
pub tx_p2p: AtomicU64,
pub rx_p2p: AtomicU64,
pub tx_sup: AtomicU64,
pub rx_sup: AtomicU64,
pub tx_broadcast: AtomicU64,
pub rx_broadcast: AtomicU64,
pub last_sup: AtomicU64,
pub last_p2p: AtomicU64,
}
impl NodeStats {
pub fn new() -> Self {
Self {
tx_p2p: AtomicU64::new(0),
rx_p2p: AtomicU64::new(0),
tx_sup: AtomicU64::new(0),
rx_sup: AtomicU64::new(0),
tx_broadcast: AtomicU64::new(0),
rx_broadcast: AtomicU64::new(0),
last_p2p: AtomicU64::new(0),
last_sup: AtomicU64::new(0),
}
}
}
use sdlan_sn_rs::peer::SdlanSock;
use crate::config::{self, MULTICAST_PORT, REGISTER_SUPER_INTERVAL};
pub struct NodeConfig {
// node name
pub name: String,
// 允许路由
pub allow_routing: bool,
// 丢弃多播,广播消息
pub _drop_multicast: bool,
// 允许p2p打洞
pub allow_p2p: bool,
// mtu of the tun
pub mtu: u32,
// udp消息的服务类型
pub _tos: u8,
// 打洞时候,时间间隔
pub _register_super_interval: u16,
// 打洞时候register消息的ttl
pub register_ttl: u8,
// 本地打开的udp端口
pub _local_port: u16,
pub node_uuid: String,
// pub super_attempts: AtomicU8,
pub super_nodes: Vec<SdlanSock>,
pub super_node_index: AtomicU8,
pub nat_server1: SocketAddr,
pub nat_server2: SocketAddr,
}
#[derive(Debug)]
pub struct EdgePeer {
// pub mac: Mac,
pub dev_addr: IpSubnet,
pub nat_type: NatType,
// 对端对外开放的ip和端口信息
pub sock: SdlanSock,
// peer's ipv6 info
pub _v6_info: Option<SdlanSock>,
pub timeout: u8,
// 最近一次遇见
pub last_seen: AtomicU64,
// 最近一次p2p消息
pub last_p2p: AtomicU64,
// 最近一次合法时间
pub _last_valid_timestamp: AtomicU64,
// 当时间超过重新注册的时间之后,
// pub has_sent_last_p2p: AtomicBool,
// 最近一次发送query
pub last_sent_query: AtomicU64,
}
impl EdgePeer {
pub fn new(
// mac: Mac,
net_addr: u32,
net_bit_len: u8,
sock: &SdlanSock,
v6info: &Option<V6Info>,
now: u64,
) -> Self {
let mut v6_info = None;
if let Some(v6info) = v6info {
v6_info = Some(SdlanSock {
family: AF_INET6,
port: v6info.port,
v4: [0; 4],
v6: v6info.v6,
})
}
Self {
// mac,
dev_addr: IpSubnet::new(net_addr, net_bit_len),
sock: sock.deepcopy(),
_v6_info: v6_info,
timeout: REGISTER_SUPER_INTERVAL as u8,
last_p2p: AtomicU64::new(0),
last_seen: AtomicU64::new(0),
_last_valid_timestamp: AtomicU64::new(now),
last_sent_query: AtomicU64::new(0),
nat_type: NatType::Invalid,
}
}
pub fn get_nat_type(&self) -> NatType {
self.nat_type
}
}
type BoxedProstMessage = Box<dyn Any + Send + Sync + 'static>;
pub struct Queryer {
pub mailbox: DashMap<u32, tokio::sync::oneshot::Sender<BoxedProstMessage>>,
}
impl Queryer {
pub fn new() -> Self {
Self {
mailbox: DashMap::new(),
}
}
pub fn write_feedback(&self, id: u32, data: BoxedProstMessage) {
if let Some((_, tx)) = self.mailbox.remove(&id) {
if let Err(_e) = tx.send(data) {
error!("failed to write feedback");
}
}
}
pub async fn send_message_to_udp_and_wait_for<T: Message>(&self, sock: &Socket, id: u32, message: T, packet_type: u8, to_server: &SocketAddr, timeout: Duration) -> Result<BoxedProstMessage> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.mailbox.insert(id, tx);
let content = encode_to_udp_message(Some(message), packet_type)?;
if let Err(_e) = sock.send_to(&content, to_server).await {
self.mailbox.remove(&id);
return Err(SDLanError::NormalError("send error"));
}
tokio::select! {
data = rx => {
if let Ok(data) = data {
self.mailbox.remove(&id);
Ok(data)
} else {
self.mailbox.remove(&id);
Err(SDLanError::IOError("rx receive failed".to_string()))
}
}
_ = tokio::time::sleep(timeout) => {
self.mailbox.remove(&id);
Err(SDLanError::IOError("timed out".to_string()))
}
}
}
pub async fn do_action_and_wait_for<T, F>(&self, id: u32, action: T, timeout: Duration) -> Result<BoxedProstMessage>
where
F: Future<Output = ()>,
T: Fn() -> F,
{
let (tx, rx) = tokio::sync::oneshot::channel();
self.mailbox.insert(id, tx);
action().await;
tokio::select! {
data = rx => {
if let Ok(data) = data {
self.mailbox.remove(&id);
Ok(data)
} else {
self.mailbox.remove(&id);
Err(SDLanError::IOError("rx receive failed".to_string()))
}
}
_ = tokio::time::sleep(timeout) => {
self.mailbox.remove(&id);
Err(SDLanError::IOError("timed out".to_string()))
}
}
}
pub async fn send_message_to_quic_and_wait_for<T: Message>(&self, id: u32, message: T, packet_type: u8, timeout: Duration) -> Result<BoxedProstMessage> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.mailbox.insert(id, tx);
let content = encode_to_tcp_message(Some(message), packet_type)?;
let quic_conn = get_quic_write_conn();
quic_conn.send(content).await?;
tokio::select! {
data = rx => {
if let Ok(data) = data {
self.mailbox.remove(&id);
Ok(data)
} else {
self.mailbox.remove(&id);
Err(SDLanError::IOError("rx receive failed".to_string()))
}
}
_ = tokio::time::sleep(timeout) => {
self.mailbox.remove(&id);
Err(SDLanError::IOError("timed out".to_string()))
}
}
}
}