diff --git a/Cargo.lock b/Cargo.lock index b330340..d512f00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -385,6 +385,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "dns-lookup" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" +dependencies = [ + "cfg-if", + "libc", + "socket2", + "windows-sys 0.48.0", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -1361,6 +1373,7 @@ name = "sdlan-rs" version = "0.1.0" dependencies = [ "dashmap 6.0.1", + "dns-lookup", "etherparse", "futures-util", "num_enum", diff --git a/Cargo.toml b/Cargo.toml index ef2d909..7b533a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] dashmap = "6.0.1" +dns-lookup = "2.0.4" etherparse = "0.15.0" futures-util = "0.3.30" num_enum = "0.7.2" diff --git a/src/lib.rs b/src/lib.rs index 7e1948c..21ee249 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,3 +3,144 @@ mod network; mod pb; mod tcp; mod utils; + +use std::time::Duration; + +use std::net::SocketAddr; + +use network::{async_main, init_edge, NodeConfig}; +use tokio::sync::mpsc::Receiver; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error}; +pub use utils::CommandLine; + +use sdlan_sn_rs::{ + log, + peer::SdlanSock, + utils::{create_or_load_uuid, get_sdlan_sock_from_socketaddr, Result, SDLanError}, +}; + +pub async fn run_sdlan(args: CommandLine, start_stop_chan: Receiver) -> Result<()> { + let _guard = log::init_log(); + + let edge_uuid = create_or_load_uuid("")?; + let node_conf = parse_config(edge_uuid, &args).await?; + + if let Err(e) = init_edge(&args.token, node_conf, args.tos).await { + panic!("failed to init edge: {:?}", e); + } + + let cancel = CancellationToken::new(); + tokio::spawn(async move { + if let Err(e) = async_main("linux".to_owned(), args, start_stop_chan, cancel).await { + error!("failed to run async main: {}", e.as_str()); + } + }); + + Ok(()) +} + +async fn parse_config(uuid: String, args: &CommandLine) -> Result { + if args.sn.len() == 0 { + return Err(SDLanError::NormalError("no sn is specified")); + } + let mut node_conf = NodeConfig::new(); + + let sns: Vec<&str> = args.sn.split(",").collect(); + let mut correct_sns: Vec<_>; + let mut sockaddr: Vec<_>; + + loop { + debug!("parsing sns"); + (correct_sns, sockaddr) = parse_sns(&sns); + if sockaddr.len() < 1 { + if correct_sns.len() > 0 { + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + // (correct_sns, sockaddr) = parse_sns(&correct_sns); + } else { + // correct_sns == 0, just error + panic!("no correct sns specified"); + } + } + + node_conf.super_nodes = sockaddr; + break; + } + + node_conf.name = args.name.to_owned(); + node_conf.mtu = args.mtu; + node_conf.node_uuid = uuid; + + if args.register_ttl > 1 { + node_conf.register_ttl = args.register_ttl; + } + + Ok(node_conf) +} + +fn parse_sns<'a>(sns: &'a Vec<&'a str>) -> (Vec<&'a str>, Vec) { + let mut correct_sns = vec![]; + let mut result = vec![]; + for sn in sns { + let addr: Vec<_> = sn.split(":").collect(); + if addr.len() != 2 { + error!("sn format error: [host:port] => {}", sn); + continue; + } + + let Ok(port) = addr[1].parse::() else { + continue; + }; + // store correct format sns + correct_sns.push(*sn); + + // node_conf.super_nodes.push(sock); + if let Ok(ipaddr) = dns_lookup::lookup_host(addr[0]) { + for ip in ipaddr { + let sockaddr = SocketAddr::new(ip, port); + let Ok(sock) = get_sdlan_sock_from_socketaddr(sockaddr) else { + error!("failed to parse sn: {}", sn); + continue; + }; + result.push(sock); + } + } else { + error!("failed to parse host: {}", sn); + continue; + } + } + (correct_sns, result) +} + +#[cfg(test)] +mod test { + use sdlan_sn_rs::config::AF_INET; + + use super::*; + + #[test] + #[ignore] + fn test_parse_dns() { + let sns = vec!["git.asxalex.pw:80", "121.4.79.234:81"]; + let (correct, res) = parse_sns(&sns); + assert_eq!(correct, sns); + assert_eq!( + res, + vec![ + SdlanSock { + family: AF_INET, + port: 80, + v4: [10, 167, 69, 157], + v6: [0; 16], + }, + SdlanSock { + family: AF_INET, + port: 81, + v4: [121, 4, 79, 234], + v6: [0; 16], + } + ] + ) + } +} diff --git a/src/network/async_main.rs b/src/network/async_main.rs index e33a19a..99c500d 100644 --- a/src/network/async_main.rs +++ b/src/network/async_main.rs @@ -139,7 +139,7 @@ async fn handle_tcp_event(edge: &Node, eventtype: EventType, eventprotobuf: &[u8 pub async fn async_main( install_channel: String, args: CommandLine, - start_stop_chan: Receiver, + start_stop_chan: Receiver, cancel: CancellationToken, ) -> Result<()> { // let _ = PidRecorder::new(".pid"); diff --git a/src/tcp/tcp_conn.rs b/src/tcp/tcp_conn.rs index d627a1d..12790d5 100644 --- a/src/tcp/tcp_conn.rs +++ b/src/tcp/tcp_conn.rs @@ -57,7 +57,7 @@ impl ReadWriteActor { keep_reconnect: bool, mut to_tcp: Receiver>, on_connected: T, - mut start_stop_chan: Receiver, + mut start_stop_chan: Receiver, // cancel: CancellationToken, ) where T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()>, @@ -69,12 +69,12 @@ impl ReadWriteActor { if !started { println!("waiting for start"); while let Some(m) = start_stop_chan.recv().await { - if m { - println!("true received"); + if m.len() != 0 { + println!("new token received"); started = true; break; } else { - println!("false received"); + println!("empty token received"); } } } @@ -146,12 +146,19 @@ impl ReadWriteActor { }; pin_mut!(read_from_tcp, write_to_tcp); + let received_empty_token_for_stop = async { + while let Some(tk) = start_stop_chan.recv().await { + if tk.len() == 0 { + return; + } + } + }; tokio::select! { _ = read_from_tcp => {}, _ = write_to_tcp => {}, _ = check_pong => {}, - Some(false) = start_stop_chan.recv() => { + _ = received_empty_token_for_stop => { started = false; } } @@ -190,7 +197,7 @@ impl ReadWriterHandle { on_connected: T, on_message: T2, pong_time: Arc, - start_stop_chan: Receiver, + start_stop_chan: Receiver, // cancel: CancellationToken, ) -> Self where @@ -229,7 +236,7 @@ pub fn init_tcp_conn<'a, T, T2, F>( on_message: T2, pong_time: Arc, // cancel: CancellationToken, - start_stop_chan: Receiver, + start_stop_chan: Receiver, ) where T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static, T2: Fn(SdlanTcp) -> F + Send + 'static,