From 25f15830fcdca7f683dade8de41bc694d20b9c1e Mon Sep 17 00:00:00 2001 From: asxalex Date: Sat, 20 Jul 2024 17:24:47 +0800 Subject: [PATCH] added connecting_chan --- src/bin/sdlan/main.rs | 1 + src/lib.rs | 6 ++++-- src/network/async_main.rs | 2 ++ src/tcp/tcp_conn.rs | 17 +++++++++++++++-- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/bin/sdlan/main.rs b/src/bin/sdlan/main.rs index 3c237a6..8930a09 100644 --- a/src/bin/sdlan/main.rs +++ b/src/bin/sdlan/main.rs @@ -31,6 +31,7 @@ async fn main() { }, tx, &sdlan_rs::get_install_channel(), + None, ) .await; diff --git a/src/lib.rs b/src/lib.rs index 7bffba8..d22971b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ use std::net::{SocketAddr, ToSocketAddrs}; pub use network::get_edge; use network::{async_main, init_edge, NodeConfig}; -use tokio::sync::mpsc::channel; +use tokio::sync::mpsc::{channel, Sender}; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; pub use utils::{CommandLine, CommandLineInput}; @@ -25,6 +25,8 @@ pub async fn run_sdlan( args: CommandLine, sender: std::sync::mpsc::Sender, install_channel: &str, + + connecting_chan: Option> // start_stop_sender: Sender, // start_stop_receiver: Receiver, ) -> Result<()> { @@ -41,7 +43,7 @@ pub async fn run_sdlan( let cancel = CancellationToken::new(); let install_chan = install_channel.to_owned(); tokio::spawn(async move { - if let Err(e) = async_main(install_chan, args, start_stop_chan, cancel).await { + if let Err(e) = async_main(install_chan, args, start_stop_chan, cancel, connecting_chan).await { error!("failed to run async main: {}", e.as_str()); } }); diff --git a/src/network/async_main.rs b/src/network/async_main.rs index ae91db9..f3f5143 100644 --- a/src/network/async_main.rs +++ b/src/network/async_main.rs @@ -213,6 +213,7 @@ pub async fn async_main( args: CommandLine, start_stop_chan: Receiver, cancel: CancellationToken, + connecting_chan: Option>, ) -> Result<()> { // let _ = PidRecorder::new(".pid"); @@ -303,6 +304,7 @@ pub async fn async_main( edge.tcp_pong.clone(), // tcp_pong, start_stop_chan, + connecting_chan, ); // tcp_conn.send("hello".as_bytes()).await; diff --git a/src/tcp/tcp_conn.rs b/src/tcp/tcp_conn.rs index 21e4051..e21503f 100644 --- a/src/tcp/tcp_conn.rs +++ b/src/tcp/tcp_conn.rs @@ -36,6 +36,7 @@ pub struct ReadWriteActor { // actor收到数据之后,发送给上层的发送端口,接收端由handle保存 from_tcp: Sender, cancel: CancellationToken, + connecting_chan: Option>, } impl ReadWriteActor { @@ -45,6 +46,7 @@ impl ReadWriteActor { from_tcp: Sender, connected: Arc, pong_time: Arc, + connecting_chan: Option>, ) -> Self { Self { // to_tcp, @@ -53,6 +55,7 @@ impl ReadWriteActor { connected, remote: remote.to_owned(), from_tcp, + connecting_chan, } } @@ -106,6 +109,9 @@ impl ReadWriteActor { continue; } debug!("try connecting..."); + if let Some(ref connecting_chan) = self.connecting_chan { + let _ = connecting_chan.send(true).await; + } let Ok(mut stream) = TcpStream::connect(&self.remote).await else { self.connected.store(false, Ordering::Relaxed); if keep_reconnect { @@ -123,6 +129,10 @@ impl ReadWriteActor { }; self.connected.store(true, Ordering::Relaxed); on_connected(&mut stream, start_pkt_id.take()).await; + + if let Some(ref connecting_chan) = self.connecting_chan { + let _ = connecting_chan.send(false).await; + } // stream.write("hello".as_bytes()).await; let (reader, mut write) = stream.into_split(); @@ -241,6 +251,7 @@ impl ReadWriterHandle { pong_time: Arc, start_stop_chan: Receiver, // cancel: CancellationToken, + connecting_chan: Option>, ) -> Self where T: for<'b> Fn(&'b mut TcpStream, Option) -> BoxFuture<'b, ()> + Send + 'static, @@ -252,8 +263,8 @@ impl ReadWriterHandle { let (send_to_tcp, to_tcp) = channel(20); let (from_tcp, mut data_from_tcp) = channel(20); - let connected = Arc::new(AtomicBool::new(false)); - let actor = ReadWriteActor::new(cancel, addr, from_tcp, connected.clone(), pong_time); + let connected: Arc = Arc::new(AtomicBool::new(false)); + let actor = ReadWriteActor::new(cancel, addr, from_tcp, connected.clone(), pong_time, connecting_chan); tokio::spawn(async move { actor .run(true, to_tcp, on_connected, on_disconnected, start_stop_chan) @@ -287,6 +298,7 @@ pub fn init_tcp_conn<'a, T, T3, T2, F, F2>( pong_time: Arc, // cancel: CancellationToken, start_stop_chan: Receiver, + connecting_chan: Option> ) where T: for<'b> Fn(&'b mut TcpStream, Option) -> BoxFuture<'b, ()> + Send + 'static, T3: Fn() -> F2 + Send + 'static, @@ -302,6 +314,7 @@ pub fn init_tcp_conn<'a, T, T3, T2, F, F2>( on_message, pong_time, start_stop_chan, + connecting_chan, ); GLOBAL_TCP_HANDLE