diff --git a/src/lib.rs b/src/lib.rs index 12d0108..a50b7a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,12 +26,10 @@ pub async fn run_sdlan( // start_stop_sender: Sender, // start_stop_receiver: Receiver, ) -> Result<()> { - debug!("run_sdlan"); let (start_stop_sender, start_stop_chan) = channel(20); let edge_uuid = create_or_load_uuid("")?; let node_conf = parse_config(edge_uuid, &args).await?; - debug!("initing edge"); if let Err(e) = init_edge(&args.token, node_conf, args.tos, start_stop_sender).await { panic!("failed to init edge: {:?}", e); } diff --git a/src/network/async_main.rs b/src/network/async_main.rs index 99c500d..090b94e 100644 --- a/src/network/async_main.rs +++ b/src/network/async_main.rs @@ -139,7 +139,7 @@ async fn handle_tcp_event(edge: &Node, eventtype: EventType, eventprotobuf: &[u8 pub async fn async_main( install_channel: String, args: CommandLine, - start_stop_chan: Receiver, + start_stop_chan: Receiver, cancel: CancellationToken, ) -> Result<()> { // let _ = PidRecorder::new(".pid"); @@ -177,9 +177,9 @@ pub async fn async_main( init_tcp_conn( &args.tcp, move |stream| { - let token = args.token.clone(); let installed_channel = install_channel.to_owned(); Box::pin(async { + let token = edge._token.lock().unwrap().clone(); // let edge = get_edge(); // let edge = get_edge(); // let token = args.token.clone(); diff --git a/src/network/node.rs b/src/network/node.rs index 951ad59..b68b4fb 100644 --- a/src/network/node.rs +++ b/src/network/node.rs @@ -2,7 +2,7 @@ use dashmap::DashMap; use rsa::RsaPrivateKey; use sdlan_sn_rs::config::{AF_INET, AF_INET6}; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use tokio::io::AsyncReadExt; use tokio::sync::mpsc::Sender; @@ -27,13 +27,8 @@ pub async fn init_edge( token: &str, node_conf: NodeConfig, tos: u32, - start_stop: Sender, + start_stop: Sender, ) -> Result<()> { - if token.len() == 0 { - println!("failed to load token"); - return Err(SDLanError::NormalError("no token is specified")); - } - let _ = PidRecorder::new(".pid"); // gen public key @@ -93,10 +88,10 @@ pub struct Node { pub tcp_pong: Arc, - start_stop_sender: Sender, + start_stop_sender: Sender, // user token info - pub _token: String, + pub _token: Mutex, pub device_config: DeviceConfig, pub device: Iface, @@ -135,11 +130,13 @@ unsafe impl Sync for Node {} impl Node { pub async fn start(&self, token: String) { - self.start_stop_sender.send(token).await; + *self._token.lock().unwrap() = token; + self.start_stop_sender.send(true).await; } pub async fn stop(&self) { - self.start_stop_sender.send("".to_owned()).await; + *self._token.lock().unwrap() = "".to_owned(); + self.start_stop_sender.send(false).await; } pub fn new( @@ -150,12 +147,12 @@ impl Node { token: &str, private: RsaPrivateKey, tcp_pong: Arc, - start_stop: Sender, + start_stop: Sender, ) -> Self { Self { packet_id: AtomicU32::new(1), network_id: AtomicU32::new(0), - _token: token.to_owned(), + _token: Mutex::new(token.to_owned()), start_stop_sender: start_stop, diff --git a/src/tcp/tcp_conn.rs b/src/tcp/tcp_conn.rs index 12790d5..a644aeb 100644 --- a/src/tcp/tcp_conn.rs +++ b/src/tcp/tcp_conn.rs @@ -57,7 +57,7 @@ impl ReadWriteActor { keep_reconnect: bool, mut to_tcp: Receiver>, on_connected: T, - mut start_stop_chan: Receiver, + mut start_stop_chan: Receiver, // cancel: CancellationToken, ) where T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()>, @@ -69,12 +69,12 @@ impl ReadWriteActor { if !started { println!("waiting for start"); while let Some(m) = start_stop_chan.recv().await { - if m.len() != 0 { - println!("new token received"); + if m { + println!("start received"); started = true; break; } else { - println!("empty token received"); + println!("stop received"); } } } @@ -146,19 +146,12 @@ impl ReadWriteActor { }; pin_mut!(read_from_tcp, write_to_tcp); - let received_empty_token_for_stop = async { - while let Some(tk) = start_stop_chan.recv().await { - if tk.len() == 0 { - return; - } - } - }; tokio::select! { _ = read_from_tcp => {}, _ = write_to_tcp => {}, _ = check_pong => {}, - _ = received_empty_token_for_stop => { + Some(false) = start_stop_chan.recv() => { started = false; } } @@ -197,7 +190,7 @@ impl ReadWriterHandle { on_connected: T, on_message: T2, pong_time: Arc, - start_stop_chan: Receiver, + start_stop_chan: Receiver, // cancel: CancellationToken, ) -> Self where @@ -236,7 +229,7 @@ pub fn init_tcp_conn<'a, T, T2, F>( on_message: T2, pong_time: Arc, // cancel: CancellationToken, - start_stop_chan: Receiver, + start_stop_chan: Receiver, ) where T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static, T2: Fn(SdlanTcp) -> F + Send + 'static,