From 408aedc0abe920203d7d8d5a1295af3440414e02 Mon Sep 17 00:00:00 2001 From: asxalex Date: Sat, 6 Jul 2024 20:58:22 +0800 Subject: [PATCH] edge set unauthorized on tcp disconnected --- src/bin/sdlan/main.rs | 40 +++++++++++++++++++++++++++------------ src/network/async_main.rs | 3 +++ src/tcp/tcp_conn.rs | 33 +++++++++++++++++++++++++------- 3 files changed, 57 insertions(+), 19 deletions(-) diff --git a/src/bin/sdlan/main.rs b/src/bin/sdlan/main.rs index e309e1a..cf92cdf 100644 --- a/src/bin/sdlan/main.rs +++ b/src/bin/sdlan/main.rs @@ -1,28 +1,44 @@ use sdlan_rs::get_edge; +use sdlan_rs::run_sdlan; +use sdlan_rs::CommandLine; use sdlan_sn_rs::log; use std::time::Duration; use tokio::sync::mpsc::Sender; -use sdlan_rs::run_sdlan; -use sdlan_rs::CommandLine; #[tokio::main] async fn main() { let _guard = log::init_log(); - tokio::spawn(run_sdlan(CommandLine { - sn: "39.98.184.67:1265".to_owned(), - tcp: "39.98.184.67:18083".to_owned(), - _allow_routing: true, - register_ttl: 1, - mtu: 1290, - name: "tau".to_owned(), - tos: 0, - token: "".to_owned(), + tokio::spawn(run_sdlan(CommandLine { + sn: "39.98.184.67:1265".to_owned(), + tcp: "39.98.184.67:18083".to_owned(), + _allow_routing: true, + register_ttl: 1, + mtu: 1290, + name: "tau".to_owned(), + tos: 0, + token: "".to_owned(), })); tokio::time::sleep(Duration::from_secs(3)).await; let edge = get_edge(); edge.start("0".to_owned()).await; + /* tokio::time::sleep(Duration::from_secs(20)).await; edge.stop().await; -} \ No newline at end of file + */ + + let mut stream = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined1()).unwrap(); + + let mut started = true; + loop { + let sig = stream.recv().await; + if started { + edge.stop().await; + } else { + edge.start("0".to_owned()).await; + } + started = !started; + } +} diff --git a/src/network/async_main.rs b/src/network/async_main.rs index 090b94e..86a6d07 100644 --- a/src/network/async_main.rs +++ b/src/network/async_main.rs @@ -207,6 +207,9 @@ pub async fn async_main( } }) }, + || async { + edge.set_authorized(false, vec![]); + }, |msg| handle_tcp_message(msg), edge.tcp_pong.clone(), // tcp_pong, diff --git a/src/tcp/tcp_conn.rs b/src/tcp/tcp_conn.rs index a644aeb..f9dc29b 100644 --- a/src/tcp/tcp_conn.rs +++ b/src/tcp/tcp_conn.rs @@ -52,15 +52,18 @@ impl ReadWriteActor { } } - pub async fn run<'a, T>( + pub async fn run<'a, T, T2, F>( &self, keep_reconnect: bool, mut to_tcp: Receiver>, on_connected: T, + on_disconnected: T2, mut start_stop_chan: Receiver, // cancel: CancellationToken, ) where T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()>, + T2: Fn() -> F, + F: Future, { // let (tx, rx) = channel(20); let mut started = false; @@ -155,7 +158,7 @@ impl ReadWriteActor { started = false; } } - + on_disconnected().await; println!("connect retrying"); // future::select(read_from_tcp, write_to_tcp).await; } @@ -185,9 +188,10 @@ impl ReadWriterHandle { Ok(()) } - fn new<'a, T, T2, F>( + fn new<'a, T, T3, T2, F, F2>( addr: &str, on_connected: T, + on_disconnected: T3, on_message: T2, pong_time: Arc, start_stop_chan: Receiver, @@ -195,15 +199,21 @@ impl ReadWriterHandle { ) -> Self where T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static, + T3: Fn() -> F2 + Send + 'static, T2: Fn(SdlanTcp) -> F + Send + 'static, F: Future + Send, + F2: Future + Send, { let (send_to_tcp, to_tcp) = channel(20); let (from_tcp, mut data_from_tcp) = channel(20); let connected = Arc::new(AtomicBool::new(false)); let actor = ReadWriteActor::new(addr, from_tcp, connected.clone(), pong_time); - tokio::spawn(async move { actor.run(true, to_tcp, on_connected, start_stop_chan).await }); + tokio::spawn(async move { + actor + .run(true, to_tcp, on_connected, on_disconnected, start_stop_chan) + .await + }); tokio::spawn(async move { loop { if let Some(msg) = data_from_tcp.recv().await { @@ -223,20 +233,29 @@ impl ReadWriterHandle { } } -pub fn init_tcp_conn<'a, T, T2, F>( +pub fn init_tcp_conn<'a, T, T3, T2, F, F2>( addr: &str, on_connected: T, + on_disconnected: T3, on_message: T2, pong_time: Arc, // cancel: CancellationToken, start_stop_chan: Receiver, ) where T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static, + T3: Fn() -> F2 + Send + 'static, T2: Fn(SdlanTcp) -> F + Send + 'static, F: Future + Send, + F2: Future + Send, { - let tcp_handle = - ReadWriterHandle::new(addr, on_connected, on_message, pong_time, start_stop_chan); + let tcp_handle = ReadWriterHandle::new( + addr, + on_connected, + on_disconnected, + on_message, + pong_time, + start_stop_chan, + ); GLOBAL_TCP_HANDLE .set(tcp_handle)