From 23c797920baa990e68cffa475e222211bfd7eb5e Mon Sep 17 00:00:00 2001 From: asxalex Date: Sat, 20 Jul 2024 12:23:10 +0800 Subject: [PATCH] added should_exit --- src/bin/sdlan/main.rs | 29 ++++++++++--- src/network/async_main.rs | 86 +++++++++++++++++++++++++-------------- src/network/node.rs | 53 ++++++++++++++++-------- src/tcp/tcp_conn.rs | 30 +++++++++++++- 4 files changed, 143 insertions(+), 55 deletions(-) diff --git a/src/bin/sdlan/main.rs b/src/bin/sdlan/main.rs index f0044f4..3c237a6 100644 --- a/src/bin/sdlan/main.rs +++ b/src/bin/sdlan/main.rs @@ -4,8 +4,9 @@ use sdlan_rs::CommandLine; use sdlan_rs::CommandLineInput; use sdlan_sn_rs::log; -use structopt::StructOpt; +use std::process::exit; use std::time::Duration; +use structopt::StructOpt; #[tokio::main] async fn main() { @@ -27,21 +28,39 @@ async fn main() { tos: 0, token: "".to_owned(), allow_p2p: true, - }, tx, + }, + tx, &sdlan_rs::get_install_channel(), - ).await; + ) + .await; let _ = rx.recv(); let edge = get_edge(); - let _ = edge.start_without_feedback(cmd.token).await; + // let res = edge.start_without_feedback(cmd.token).await; + let Ok(res) = edge + .start_with_feedback(cmd.token, Duration::from_secs(3)) + .await + else { + println!("failed to start1"); + exit(0); + }; + + if res.result != 0 { + println!("failed to start: {}", res.message); + if res.should_exit { + exit(0); + } + // edge.stop().await; + // exit(0); + } /* tokio::time::sleep(Duration::from_secs(20)).await; edge.stop().await; */ - /* + /* let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined1()).unwrap(); diff --git a/src/network/async_main.rs b/src/network/async_main.rs index 454ca80..ae91db9 100644 --- a/src/network/async_main.rs +++ b/src/network/async_main.rs @@ -1,6 +1,6 @@ +use std::net::IpAddr; use std::sync::atomic::Ordering; use std::time::Duration; -use std::net::IpAddr; use crate::config::TCP_PING_TIME; use crate::network::{get_edge, ping_to_sn, read_and_parse_packet, RegisterSuperFeedback}; @@ -12,18 +12,16 @@ use crate::tcp::{init_tcp_conn, EventType, NakMsgCode, PacketType, SdlanTcp}; use crate::utils::{send_to_sock, CommandLine}; use etherparse::IpHeaders; use sdlan_sn_rs::config::{AF_INET, SDLAN_DEFAULT_TTL}; -use sdlan_sn_rs::packet::Register; use sdlan_sn_rs::peer::SdlanSock; -use sdlan_sn_rs::utils::{ - aes_encrypt, get_current_timestamp, - ip_to_string, is_multi_broadcast, rsa_decrypt, -}; use sdlan_sn_rs::utils::Result; +use sdlan_sn_rs::utils::{ + aes_encrypt, get_current_timestamp, ip_to_string, is_multi_broadcast, rsa_decrypt, +}; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio_util::sync::CancellationToken; -use super::{check_peer_registration_needed, packet, Node, StartStopInfo }; +use super::{check_peer_registration_needed, packet, Node, StartStopInfo}; use crate::utils::Socket; use prost::Message; @@ -38,10 +36,14 @@ async fn handle_tcp_message(msg: SdlanTcp) { debug!("got tcp message: {:?}", msg.packet_type); match msg.packet_type { PacketType::RegisterSuperACK => { - edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { - result: 0, - message: "".to_owned(), - }); + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 0, + message: "".to_owned(), + should_exit: false, + }, + ); let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else { error!("failed to decode REGISTER_SUPER_ACK"); return; @@ -79,40 +81,60 @@ async fn handle_tcp_message(msg: SdlanTcp) { PacketType::RegisterSuperNAK => { let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else { error!("failed to decode REGISTER_SUPER_NAK"); - edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { - result: 1, - message: "failed to decode REGISTER SUPER NAK".to_owned(), - }); + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 1, + message: "failed to decode REGISTER SUPER NAK".to_owned(), + should_exit: false, + }, + ); return; }; let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else { - edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { - result: 2, - message: "error_code not recognized".to_owned(), - }); + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 2, + message: "error_code not recognized".to_owned(), + should_exit: false, + }, + ); return; }; match error_code { NakMsgCode::InvalidToken => { - edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { - result: 3, - message: "invalid token".to_owned(), - }); + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 3, + message: "invalid token".to_owned(), + should_exit: true, + }, + ); edge.stop().await; } NakMsgCode::NodeDisabled => { - edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { - result: 4, - message: "Node is disabled".to_owned(), - }); + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 4, + message: "Node is disabled".to_owned(), + should_exit: true, + }, + ); edge.stop().await; } _other => { - edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { - result: 0, - message: "".to_owned(), - }); + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 0, + message: "".to_owned(), + should_exit: false, + }, + ); } } /* @@ -224,7 +246,9 @@ pub async fn async_main( let edge = get_edge(); // let token = args.token.clone(); + let cancel_tcp = cancel.clone(); init_tcp_conn( + cancel_tcp, &args.tcp, move |stream, pkt_id| { let installed_channel = install_channel.to_owned(); diff --git a/src/network/node.rs b/src/network/node.rs index 30e1bcc..e6bfd73 100644 --- a/src/network/node.rs +++ b/src/network/node.rs @@ -93,6 +93,7 @@ pub struct RegisterSuperFeedback { // 0 for success, other for error pub result: u8, pub message: String, + pub should_exit: bool, } pub struct StartStopInfo { @@ -174,41 +175,57 @@ impl Node { pub async fn start_without_feedback(&self, token: String) -> Result<()> { *self._token.lock().unwrap() = token; - let _ = self.start_stop_sender.send(StartStopInfo{ - is_start: true, - pkt_id: None, - }).await; + let _ = self + .start_stop_sender + .send(StartStopInfo { + is_start: true, + pkt_id: None, + }) + .await; Ok(()) } - pub async fn start_with_feedback(&self, token: String, timeout: Duration) -> Result { + pub async fn start_with_feedback( + &self, + token: String, + timeout: Duration, + ) -> Result { *self._token.lock().unwrap() = token; - let (tx, mut rx) = oneshot::channel(); + let (tx, rx) = oneshot::channel(); let id = self.get_next_packet_id(); self.packet_id_match.insert(id, tx); - let _ = self.start_stop_sender.send(StartStopInfo{ - is_start: true, - pkt_id: Some(id), - }).await; + let _ = self + .start_stop_sender + .send(StartStopInfo { + is_start: true, + pkt_id: Some(id), + }) + .await; tokio::select! { - Ok(result) = rx => { - self.packet_id_match.remove(&id); - Ok(result) + rx_info = rx => { + if let Ok(result) = rx_info { + self.packet_id_match.remove(&id); + Ok(result) + } else { + Err(SDLanError::NormalError("rx closed")) + } } _ = tokio::time::sleep(timeout) => { Err(SDLanError::NormalError("timed out")) } } - } pub async fn stop(&self) { *self._token.lock().unwrap() = "".to_owned(); - let _ = self.start_stop_sender.send(StartStopInfo{ - is_start: false, - pkt_id: None, - }).await; + let _ = self + .start_stop_sender + .send(StartStopInfo { + is_start: false, + pkt_id: None, + }) + .await; } pub fn new( diff --git a/src/tcp/tcp_conn.rs b/src/tcp/tcp_conn.rs index ccf8db5..21e4051 100644 --- a/src/tcp/tcp_conn.rs +++ b/src/tcp/tcp_conn.rs @@ -8,6 +8,7 @@ use std::{ time::Duration, }; use tokio::io::BufReader; +use tokio_util::sync::CancellationToken; use tracing::debug; use futures_util::{future::BoxFuture, pin_mut}; @@ -34,10 +35,12 @@ pub struct ReadWriteActor { pong_time: Arc, // actor收到数据之后,发送给上层的发送端口,接收端由handle保存 from_tcp: Sender, + cancel: CancellationToken, } impl ReadWriteActor { pub fn new( + cancel: CancellationToken, remote: &str, from_tcp: Sender, connected: Arc, @@ -45,6 +48,7 @@ impl ReadWriteActor { ) -> Self { Self { // to_tcp, + cancel, pong_time, connected, remote: remote.to_owned(), @@ -72,7 +76,22 @@ impl ReadWriteActor { self.connected.store(false, Ordering::Relaxed); if !started { // println!("waiting for start"); + loop { + let start_or_stop = start_stop_chan.recv().await; + if let Some(m) = start_or_stop { + if m.is_start { + started = true; + start_pkt_id = m.pkt_id; + break; + } + } else { + // None, just return + return; + } + } + /* while let Some(m) = start_stop_chan.recv().await { + println!("4"); if m.is_start { // println!("start received"); started = true; @@ -82,6 +101,9 @@ impl ReadWriteActor { // println!("stop received"); } } + */ + debug!("start stop chan recv none"); + continue; } debug!("try connecting..."); let Ok(mut stream) = TcpStream::connect(&self.remote).await else { @@ -157,11 +179,13 @@ impl ReadWriteActor { match start_stop_chan.recv().await { Some(v) => { if !v.is_start { + started = false; return; } } other => { // send chan is closed; + started = false; return; } } @@ -179,6 +203,7 @@ impl ReadWriteActor { on_disconnected().await; debug!("connect retrying"); tokio::time::sleep(Duration::from_secs(1)).await; + debug!("disconnected"); // future::select(read_from_tcp, write_to_tcp).await; } } @@ -208,6 +233,7 @@ impl ReadWriterHandle { } fn new<'a, T, T3, T2, F, F2>( + cancel: CancellationToken, addr: &str, on_connected: T, on_disconnected: T3, @@ -227,7 +253,7 @@ impl ReadWriterHandle { let (from_tcp, mut data_from_tcp) = channel(20); let connected = Arc::new(AtomicBool::new(false)); - let actor = ReadWriteActor::new(addr, from_tcp, connected.clone(), pong_time); + let actor = ReadWriteActor::new(cancel, addr, from_tcp, connected.clone(), pong_time); tokio::spawn(async move { actor .run(true, to_tcp, on_connected, on_disconnected, start_stop_chan) @@ -253,6 +279,7 @@ impl ReadWriterHandle { } pub fn init_tcp_conn<'a, T, T3, T2, F, F2>( + cancel: CancellationToken, addr: &str, on_connected: T, on_disconnected: T3, @@ -268,6 +295,7 @@ pub fn init_tcp_conn<'a, T, T3, T2, F, F2>( F2: Future + Send, { let tcp_handle = ReadWriterHandle::new( + cancel, addr, on_connected, on_disconnected,