edge set unauthorized on tcp disconnected

This commit is contained in:
asxalex 2024-07-06 20:58:22 +08:00
parent 762ff30c18
commit 408aedc0ab
3 changed files with 57 additions and 19 deletions

View File

@ -1,9 +1,9 @@
use sdlan_rs::get_edge; use sdlan_rs::get_edge;
use sdlan_rs::run_sdlan;
use sdlan_rs::CommandLine;
use sdlan_sn_rs::log; use sdlan_sn_rs::log;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use sdlan_rs::run_sdlan;
use sdlan_rs::CommandLine;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -23,6 +23,22 @@ async fn main() {
let edge = get_edge(); let edge = get_edge();
edge.start("0".to_owned()).await; edge.start("0".to_owned()).await;
/*
tokio::time::sleep(Duration::from_secs(20)).await; tokio::time::sleep(Duration::from_secs(20)).await;
edge.stop().await; edge.stop().await;
*/
let mut stream =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined1()).unwrap();
let mut started = true;
loop {
let sig = stream.recv().await;
if started {
edge.stop().await;
} else {
edge.start("0".to_owned()).await;
}
started = !started;
}
} }

View File

@ -207,6 +207,9 @@ pub async fn async_main(
} }
}) })
}, },
|| async {
edge.set_authorized(false, vec![]);
},
|msg| handle_tcp_message(msg), |msg| handle_tcp_message(msg),
edge.tcp_pong.clone(), edge.tcp_pong.clone(),
// tcp_pong, // tcp_pong,

View File

@ -52,15 +52,18 @@ impl ReadWriteActor {
} }
} }
pub async fn run<'a, T>( pub async fn run<'a, T, T2, F>(
&self, &self,
keep_reconnect: bool, keep_reconnect: bool,
mut to_tcp: Receiver<Vec<u8>>, mut to_tcp: Receiver<Vec<u8>>,
on_connected: T, on_connected: T,
on_disconnected: T2,
mut start_stop_chan: Receiver<bool>, mut start_stop_chan: Receiver<bool>,
// cancel: CancellationToken, // cancel: CancellationToken,
) where ) where
T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()>, T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()>,
T2: Fn() -> F,
F: Future<Output = ()>,
{ {
// let (tx, rx) = channel(20); // let (tx, rx) = channel(20);
let mut started = false; let mut started = false;
@ -155,7 +158,7 @@ impl ReadWriteActor {
started = false; started = false;
} }
} }
on_disconnected().await;
println!("connect retrying"); println!("connect retrying");
// future::select(read_from_tcp, write_to_tcp).await; // future::select(read_from_tcp, write_to_tcp).await;
} }
@ -185,9 +188,10 @@ impl ReadWriterHandle {
Ok(()) Ok(())
} }
fn new<'a, T, T2, F>( fn new<'a, T, T3, T2, F, F2>(
addr: &str, addr: &str,
on_connected: T, on_connected: T,
on_disconnected: T3,
on_message: T2, on_message: T2,
pong_time: Arc<AtomicU64>, pong_time: Arc<AtomicU64>,
start_stop_chan: Receiver<bool>, start_stop_chan: Receiver<bool>,
@ -195,15 +199,21 @@ impl ReadWriterHandle {
) -> Self ) -> Self
where where
T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static, T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static,
T3: Fn() -> F2 + Send + 'static,
T2: Fn(SdlanTcp) -> F + Send + 'static, T2: Fn(SdlanTcp) -> F + Send + 'static,
F: Future<Output = ()> + Send, F: Future<Output = ()> + Send,
F2: Future<Output = ()> + Send,
{ {
let (send_to_tcp, to_tcp) = channel(20); let (send_to_tcp, to_tcp) = channel(20);
let (from_tcp, mut data_from_tcp) = channel(20); let (from_tcp, mut data_from_tcp) = channel(20);
let connected = Arc::new(AtomicBool::new(false)); let connected = Arc::new(AtomicBool::new(false));
let actor = ReadWriteActor::new(addr, from_tcp, connected.clone(), pong_time); let actor = ReadWriteActor::new(addr, from_tcp, connected.clone(), pong_time);
tokio::spawn(async move { actor.run(true, to_tcp, on_connected, start_stop_chan).await }); tokio::spawn(async move {
actor
.run(true, to_tcp, on_connected, on_disconnected, start_stop_chan)
.await
});
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
if let Some(msg) = data_from_tcp.recv().await { if let Some(msg) = data_from_tcp.recv().await {
@ -223,20 +233,29 @@ impl ReadWriterHandle {
} }
} }
pub fn init_tcp_conn<'a, T, T2, F>( pub fn init_tcp_conn<'a, T, T3, T2, F, F2>(
addr: &str, addr: &str,
on_connected: T, on_connected: T,
on_disconnected: T3,
on_message: T2, on_message: T2,
pong_time: Arc<AtomicU64>, pong_time: Arc<AtomicU64>,
// cancel: CancellationToken, // cancel: CancellationToken,
start_stop_chan: Receiver<bool>, start_stop_chan: Receiver<bool>,
) where ) where
T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static, T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static,
T3: Fn() -> F2 + Send + 'static,
T2: Fn(SdlanTcp) -> F + Send + 'static, T2: Fn(SdlanTcp) -> F + Send + 'static,
F: Future<Output = ()> + Send, F: Future<Output = ()> + Send,
F2: Future<Output = ()> + Send,
{ {
let tcp_handle = let tcp_handle = ReadWriterHandle::new(
ReadWriterHandle::new(addr, on_connected, on_message, pong_time, start_stop_chan); addr,
on_connected,
on_disconnected,
on_message,
pong_time,
start_stop_chan,
);
GLOBAL_TCP_HANDLE GLOBAL_TCP_HANDLE
.set(tcp_handle) .set(tcp_handle)