sdlan bin to lib

This commit is contained in:
asxalex 2024-07-04 19:40:31 +08:00
parent de029dd6e7
commit 8a450fba3f
5 changed files with 170 additions and 8 deletions

13
Cargo.lock generated
View File

@ -385,6 +385,18 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "dns-lookup"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc"
dependencies = [
"cfg-if",
"libc",
"socket2",
"windows-sys 0.48.0",
]
[[package]] [[package]]
name = "dotenvy" name = "dotenvy"
version = "0.15.7" version = "0.15.7"
@ -1361,6 +1373,7 @@ name = "sdlan-rs"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"dashmap 6.0.1", "dashmap 6.0.1",
"dns-lookup",
"etherparse", "etherparse",
"futures-util", "futures-util",
"num_enum", "num_enum",

View File

@ -5,6 +5,7 @@ edition = "2021"
[dependencies] [dependencies]
dashmap = "6.0.1" dashmap = "6.0.1"
dns-lookup = "2.0.4"
etherparse = "0.15.0" etherparse = "0.15.0"
futures-util = "0.3.30" futures-util = "0.3.30"
num_enum = "0.7.2" num_enum = "0.7.2"

View File

@ -3,3 +3,144 @@ mod network;
mod pb; mod pb;
mod tcp; mod tcp;
mod utils; mod utils;
use std::time::Duration;
use std::net::SocketAddr;
use network::{async_main, init_edge, NodeConfig};
use tokio::sync::mpsc::Receiver;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
pub use utils::CommandLine;
use sdlan_sn_rs::{
log,
peer::SdlanSock,
utils::{create_or_load_uuid, get_sdlan_sock_from_socketaddr, Result, SDLanError},
};
pub async fn run_sdlan(args: CommandLine, start_stop_chan: Receiver<String>) -> Result<()> {
let _guard = log::init_log();
let edge_uuid = create_or_load_uuid("")?;
let node_conf = parse_config(edge_uuid, &args).await?;
if let Err(e) = init_edge(&args.token, node_conf, args.tos).await {
panic!("failed to init edge: {:?}", e);
}
let cancel = CancellationToken::new();
tokio::spawn(async move {
if let Err(e) = async_main("linux".to_owned(), args, start_stop_chan, cancel).await {
error!("failed to run async main: {}", e.as_str());
}
});
Ok(())
}
async fn parse_config(uuid: String, args: &CommandLine) -> Result<NodeConfig> {
if args.sn.len() == 0 {
return Err(SDLanError::NormalError("no sn is specified"));
}
let mut node_conf = NodeConfig::new();
let sns: Vec<&str> = args.sn.split(",").collect();
let mut correct_sns: Vec<_>;
let mut sockaddr: Vec<_>;
loop {
debug!("parsing sns");
(correct_sns, sockaddr) = parse_sns(&sns);
if sockaddr.len() < 1 {
if correct_sns.len() > 0 {
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
// (correct_sns, sockaddr) = parse_sns(&correct_sns);
} else {
// correct_sns == 0, just error
panic!("no correct sns specified");
}
}
node_conf.super_nodes = sockaddr;
break;
}
node_conf.name = args.name.to_owned();
node_conf.mtu = args.mtu;
node_conf.node_uuid = uuid;
if args.register_ttl > 1 {
node_conf.register_ttl = args.register_ttl;
}
Ok(node_conf)
}
fn parse_sns<'a>(sns: &'a Vec<&'a str>) -> (Vec<&'a str>, Vec<SdlanSock>) {
let mut correct_sns = vec![];
let mut result = vec![];
for sn in sns {
let addr: Vec<_> = sn.split(":").collect();
if addr.len() != 2 {
error!("sn format error: [host:port] => {}", sn);
continue;
}
let Ok(port) = addr[1].parse::<u16>() else {
continue;
};
// store correct format sns
correct_sns.push(*sn);
// node_conf.super_nodes.push(sock);
if let Ok(ipaddr) = dns_lookup::lookup_host(addr[0]) {
for ip in ipaddr {
let sockaddr = SocketAddr::new(ip, port);
let Ok(sock) = get_sdlan_sock_from_socketaddr(sockaddr) else {
error!("failed to parse sn: {}", sn);
continue;
};
result.push(sock);
}
} else {
error!("failed to parse host: {}", sn);
continue;
}
}
(correct_sns, result)
}
#[cfg(test)]
mod test {
use sdlan_sn_rs::config::AF_INET;
use super::*;
#[test]
#[ignore]
fn test_parse_dns() {
let sns = vec!["git.asxalex.pw:80", "121.4.79.234:81"];
let (correct, res) = parse_sns(&sns);
assert_eq!(correct, sns);
assert_eq!(
res,
vec![
SdlanSock {
family: AF_INET,
port: 80,
v4: [10, 167, 69, 157],
v6: [0; 16],
},
SdlanSock {
family: AF_INET,
port: 81,
v4: [121, 4, 79, 234],
v6: [0; 16],
}
]
)
}
}

View File

@ -139,7 +139,7 @@ async fn handle_tcp_event(edge: &Node, eventtype: EventType, eventprotobuf: &[u8
pub async fn async_main( pub async fn async_main(
install_channel: String, install_channel: String,
args: CommandLine, args: CommandLine,
start_stop_chan: Receiver<bool>, start_stop_chan: Receiver<String>,
cancel: CancellationToken, cancel: CancellationToken,
) -> Result<()> { ) -> Result<()> {
// let _ = PidRecorder::new(".pid"); // let _ = PidRecorder::new(".pid");

View File

@ -57,7 +57,7 @@ impl ReadWriteActor {
keep_reconnect: bool, keep_reconnect: bool,
mut to_tcp: Receiver<Vec<u8>>, mut to_tcp: Receiver<Vec<u8>>,
on_connected: T, on_connected: T,
mut start_stop_chan: Receiver<bool>, mut start_stop_chan: Receiver<String>,
// cancel: CancellationToken, // cancel: CancellationToken,
) where ) where
T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()>, T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()>,
@ -69,12 +69,12 @@ impl ReadWriteActor {
if !started { if !started {
println!("waiting for start"); println!("waiting for start");
while let Some(m) = start_stop_chan.recv().await { while let Some(m) = start_stop_chan.recv().await {
if m { if m.len() != 0 {
println!("true received"); println!("new token received");
started = true; started = true;
break; break;
} else { } else {
println!("false received"); println!("empty token received");
} }
} }
} }
@ -146,12 +146,19 @@ impl ReadWriteActor {
}; };
pin_mut!(read_from_tcp, write_to_tcp); pin_mut!(read_from_tcp, write_to_tcp);
let received_empty_token_for_stop = async {
while let Some(tk) = start_stop_chan.recv().await {
if tk.len() == 0 {
return;
}
}
};
tokio::select! { tokio::select! {
_ = read_from_tcp => {}, _ = read_from_tcp => {},
_ = write_to_tcp => {}, _ = write_to_tcp => {},
_ = check_pong => {}, _ = check_pong => {},
Some(false) = start_stop_chan.recv() => { _ = received_empty_token_for_stop => {
started = false; started = false;
} }
} }
@ -190,7 +197,7 @@ impl ReadWriterHandle {
on_connected: T, on_connected: T,
on_message: T2, on_message: T2,
pong_time: Arc<AtomicU64>, pong_time: Arc<AtomicU64>,
start_stop_chan: Receiver<bool>, start_stop_chan: Receiver<String>,
// cancel: CancellationToken, // cancel: CancellationToken,
) -> Self ) -> Self
where where
@ -229,7 +236,7 @@ pub fn init_tcp_conn<'a, T, T2, F>(
on_message: T2, on_message: T2,
pong_time: Arc<AtomicU64>, pong_time: Arc<AtomicU64>,
// cancel: CancellationToken, // cancel: CancellationToken,
start_stop_chan: Receiver<bool>, start_stop_chan: Receiver<String>,
) where ) where
T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static, T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static,
T2: Fn(SdlanTcp) -> F + Send + 'static, T2: Fn(SdlanTcp) -> F + Send + 'static,