added connecting_chan

This commit is contained in:
asxalex 2024-07-20 17:24:47 +08:00
parent 23c797920b
commit 25f15830fc
4 changed files with 22 additions and 4 deletions

View File

@ -31,6 +31,7 @@ async fn main() {
},
tx,
&sdlan_rs::get_install_channel(),
None,
)
.await;

View File

@ -10,7 +10,7 @@ use std::net::{SocketAddr, ToSocketAddrs};
pub use network::get_edge;
use network::{async_main, init_edge, NodeConfig};
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{channel, Sender};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
pub use utils::{CommandLine, CommandLineInput};
@ -25,6 +25,8 @@ pub async fn run_sdlan(
args: CommandLine,
sender: std::sync::mpsc::Sender<bool>,
install_channel: &str,
connecting_chan: Option<Sender<bool>>
// start_stop_sender: Sender<String>,
// start_stop_receiver: Receiver<String>,
) -> Result<()> {
@ -41,7 +43,7 @@ pub async fn run_sdlan(
let cancel = CancellationToken::new();
let install_chan = install_channel.to_owned();
tokio::spawn(async move {
if let Err(e) = async_main(install_chan, args, start_stop_chan, cancel).await {
if let Err(e) = async_main(install_chan, args, start_stop_chan, cancel, connecting_chan).await {
error!("failed to run async main: {}", e.as_str());
}
});

View File

@ -213,6 +213,7 @@ pub async fn async_main(
args: CommandLine,
start_stop_chan: Receiver<StartStopInfo>,
cancel: CancellationToken,
connecting_chan: Option<Sender<bool>>,
) -> Result<()> {
// let _ = PidRecorder::new(".pid");
@ -303,6 +304,7 @@ pub async fn async_main(
edge.tcp_pong.clone(),
// tcp_pong,
start_stop_chan,
connecting_chan,
);
// tcp_conn.send("hello".as_bytes()).await;

View File

@ -36,6 +36,7 @@ pub struct ReadWriteActor {
// actor收到数据之后发送给上层的发送端口,接收端由handle保存
from_tcp: Sender<SdlanTcp>,
cancel: CancellationToken,
connecting_chan: Option<Sender<bool>>,
}
impl ReadWriteActor {
@ -45,6 +46,7 @@ impl ReadWriteActor {
from_tcp: Sender<SdlanTcp>,
connected: Arc<AtomicBool>,
pong_time: Arc<AtomicU64>,
connecting_chan: Option<Sender<bool>>,
) -> Self {
Self {
// to_tcp,
@ -53,6 +55,7 @@ impl ReadWriteActor {
connected,
remote: remote.to_owned(),
from_tcp,
connecting_chan,
}
}
@ -106,6 +109,9 @@ impl ReadWriteActor {
continue;
}
debug!("try connecting...");
if let Some(ref connecting_chan) = self.connecting_chan {
let _ = connecting_chan.send(true).await;
}
let Ok(mut stream) = TcpStream::connect(&self.remote).await else {
self.connected.store(false, Ordering::Relaxed);
if keep_reconnect {
@ -123,6 +129,10 @@ impl ReadWriteActor {
};
self.connected.store(true, Ordering::Relaxed);
on_connected(&mut stream, start_pkt_id.take()).await;
if let Some(ref connecting_chan) = self.connecting_chan {
let _ = connecting_chan.send(false).await;
}
// stream.write("hello".as_bytes()).await;
let (reader, mut write) = stream.into_split();
@ -241,6 +251,7 @@ impl ReadWriterHandle {
pong_time: Arc<AtomicU64>,
start_stop_chan: Receiver<StartStopInfo>,
// cancel: CancellationToken,
connecting_chan: Option<Sender<bool>>,
) -> Self
where
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()> + Send + 'static,
@ -252,8 +263,8 @@ impl ReadWriterHandle {
let (send_to_tcp, to_tcp) = channel(20);
let (from_tcp, mut data_from_tcp) = channel(20);
let connected = Arc::new(AtomicBool::new(false));
let actor = ReadWriteActor::new(cancel, addr, from_tcp, connected.clone(), pong_time);
let connected: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let actor = ReadWriteActor::new(cancel, addr, from_tcp, connected.clone(), pong_time, connecting_chan);
tokio::spawn(async move {
actor
.run(true, to_tcp, on_connected, on_disconnected, start_stop_chan)
@ -287,6 +298,7 @@ pub fn init_tcp_conn<'a, T, T3, T2, F, F2>(
pong_time: Arc<AtomicU64>,
// cancel: CancellationToken,
start_stop_chan: Receiver<StartStopInfo>,
connecting_chan: Option<Sender<bool>>
) where
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()> + Send + 'static,
T3: Fn() -> F2 + Send + 'static,
@ -302,6 +314,7 @@ pub fn init_tcp_conn<'a, T, T3, T2, F, F2>(
on_message,
pong_time,
start_stop_chan,
connecting_chan,
);
GLOBAL_TCP_HANDLE