diff --git a/src/bin/punchnet/local_udp_info.rs b/src/bin/punchnet/local_udp_info.rs new file mode 100644 index 0000000..298c6a0 --- /dev/null +++ b/src/bin/punchnet/local_udp_info.rs @@ -0,0 +1,167 @@ +use std::{net::SocketAddr, sync::atomic::Ordering, time::Duration}; + +use num_enum::TryFromPrimitive; +use punchnet::get_edge; +use sdlan_sn_rs::utils::Mac; +use serde::{Deserialize, Serialize}; +use tokio::{net::UdpSocket, time::sleep}; + + +#[derive(TryFromPrimitive)] +#[repr(u8)] +pub enum InfoFuncCode { + Info = 0x00, + + InfoFeedback = 0x80, +} + +#[derive(Serialize, Deserialize)] +pub struct InfoFeedback { + pub mac: Mac, + pub ip: u32, + pub tx_p2p: u64, + pub rx_p2p: u64, + + pub tx_sup: u64, + pub rx_sup: u64, +} + +pub async fn query_for_info() { + let Ok(udp) = UdpSocket::bind("127.0.0.1:0").await else { + eprintln!("failed to create"); + return; + }; + + let remote = format!("127.0.0.1:{}", LOCAL_INFO_UDP_PORT).parse::().unwrap(); + + let buf = vec![InfoFuncCode::Info as u8, 0, 0]; + + if let Err(e) = udp.send_to(buf.as_slice(), remote).await { + eprintln!("failed to send query info"); + return; + } + + let mut buf = vec![0;1024]; + tokio::select! { + data = udp.recv_from(&mut buf) => { + if let Ok((size, from)) = data { + if size < 3 { + eprintln!("no byte received"); + return; + } + buf.truncate(size); + + let Ok(typecode) = InfoFuncCode::try_from_primitive(buf[0]) else { + eprintln!("invalid type: {}", buf[0]); + return; + }; + + let size = u16::from_be_bytes([buf[1], buf[2]]); + if buf.len() as u16 != size + 3 { + eprintln!("info length error: buf.len={}, size={}", buf.len(), size); + return; + } + + match typecode { + InfoFuncCode::InfoFeedback => { + let Ok(data) = serde_json::from_slice::(&buf[3..]) else { + eprintln!("failed to marshal to json"); + return; + }; + println!("punchnet info:"); + let ip = data.ip; + println!(" ip: {}.{}.{}.{}", + ((ip>>24) & 0xff) as u8, + ((ip>>16) & 0xff) as u8, + ((ip>>8) & 0xff) as u8, + ((ip) & 0xff) as u8, + ); + let mac = data.mac; + println!(" mac: {:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}", + mac[0], mac[1], mac[2], mac[3], mac[4], mac[5] + ); + + println!(" p2p tx: {} bytes", data.tx_p2p); + println!(" p2p rx: {} bytes", data.rx_p2p); + + println!(" super tx: {} bytes", data.tx_sup); + println!(" super rx: {} bytes", data.rx_sup); + } + _other => { + } + } + + } else { + eprintln!("failed to recv from punchnet"); + } + return; + } + _ = sleep(Duration::from_secs(3)) => { + eprintln!("query timed out, is punchnet running?"); + return; + } + } +} + +const LOCAL_INFO_UDP_PORT: u16 = 1789; + +pub async fn handle_query_for_info_info() { + let Ok(udp) = UdpSocket::bind(format!("127.0.0.1:{}", LOCAL_INFO_UDP_PORT)).await else { + eprintln!("failed to create"); + return; + }; + + let mut buf = vec![0u8; 1024]; + loop { + if let Ok((size, from)) = udp.recv_from(&mut buf).await { + let current_data = &buf[..size]; + handle_query_info(&udp, &buf[..size], from).await; + } + } +} + +async fn handle_query_info(udp: &UdpSocket, buf: &[u8], from: SocketAddr) { + if buf.len() < 1 { + return; + } + let tp = buf[0]; + + let Ok(typecode) = InfoFuncCode::try_from_primitive(tp) else { + eprintln!("invalid type: {}", tp); + return; + }; + + match typecode { + InfoFuncCode::Info => { + send_info_back(udp, from).await; + } + _other => { + } + } +} + +async fn send_info_back(udp: &UdpSocket, from: SocketAddr) { + let edge = get_edge(); + let ip = edge.device_config.get_ip(); + let mac = edge.device_config.get_mac(); + let feedback = InfoFeedback { + ip, + mac, + tx_p2p: edge.stats.tx_p2p.load(Ordering::Relaxed), + rx_p2p: edge.stats.rx_p2p.load(Ordering::Relaxed), + + tx_sup: edge.stats.tx_sup.load(Ordering::Relaxed), + rx_sup: edge.stats.rx_sup.load(Ordering::Relaxed), + }; + let value = serde_json::to_string(&feedback).unwrap(); + + let mut content = Vec::with_capacity(value.len() + 3); + content.push(InfoFuncCode::InfoFeedback as u8); + + let size = value.len() as u16; + let size_buf = size.to_be_bytes(); + + content.extend_from_slice(&size_buf); + content.extend_from_slice(value.as_bytes()); + udp.send_to(content.as_slice(), from).await; +} diff --git a/src/bin/punchnet/main.rs b/src/bin/punchnet/main.rs index eaa85be..b20aff8 100755 --- a/src/bin/punchnet/main.rs +++ b/src/bin/punchnet/main.rs @@ -1,13 +1,19 @@ mod api; +mod local_udp_info; +use std::fs; use std::fs::File; use std::fs::OpenOptions; +use std::io::stdout; use std::process; use std::env; use std::time::Duration; use clap::Parser; use daemonize::Daemonize; +use futures_util::io; +use libc::SIGTERM; +use libc::kill; use punchnet::CachedLoginInfo; use punchnet::CommandLineInput2; use punchnet::Commands; @@ -29,7 +35,8 @@ use sdlan_sn_rs::utils::Mac; use sdlan_sn_rs::utils::Result; use sdlan_sn_rs::utils::create_or_load_uuid; use tokio::io::AsyncWriteExt; -use tokio::io::stdout; +use tokio::net::UdpSocket; +use tokio::runtime::Runtime; use tokio::time::sleep; use tracing::error; @@ -44,6 +51,8 @@ use crate::api::TEST_PREFIX; use crate::api::connect; use crate::api::login_with_token; use crate::api::login_with_user_pass; +use crate::local_udp_info::handle_query_for_info_info; +use crate::local_udp_info::query_for_info; const APP_USER_ENV_NAME: &str = "PUNCH_USER"; @@ -107,6 +116,8 @@ async fn daemonize_me( client_id: String, mac: Mac, ) { + let _guard = log::init_log(&format!("{}/.output", get_base_dir())); + println!("identity_id = {}", connect_info.identity_id); let self_host_name = connect_info.hostname; @@ -177,6 +188,10 @@ async fn daemonize_me( let _ = rx.recv(); + tokio::spawn(async { + handle_query_for_info_info().await; + }); + let edge = get_edge(); // let res = edge.start_without_feedback(cmd.token).await; /* @@ -235,6 +250,7 @@ async fn daemonize_me( match tokio::signal::ctrl_c().await { Ok(()) => { let _ = restore_dns(); + delete_pid_file(); } Err(err) => { eprintln!("failed to listen for shutdown signal: {}", err); @@ -258,17 +274,13 @@ async fn daemonize_me( } -#[tokio::main] -async fn main() { +fn main() { set_base_dir("/usr/local/punchnet"); - let _guard = log::init_log(&format!("{}/.output", get_base_dir())); + // let _guard = log::init_log(&format!("{}/.output", get_base_dir())); let client_id = create_or_load_uuid(&format!("{}/.id", get_base_dir()), None).unwrap(); - let test_token = "49722584273728716817720074439183"; - let mac = create_or_load_mac(); - let system = "linux"; let version = "1.0.0"; @@ -276,103 +288,145 @@ async fn main() { let cmd = CommandLineInput2::parse(); // println!("port is {}", cmd.port); - let connect_info: ConnectData; - let remembered: CachedLoginInfo; - - let should_daemonize: bool; - - match cmd.cmd { + match &cmd.cmd { Commands::Login(user) => { - // TODO: do login with user - let _ = parse_login_result( - login_with_user_pass(TEST_PREFIX, &client_id, &user.username, &user.password, mac, system, version).await - ); + let rt = Runtime::new().unwrap(); + rt.block_on( async move { + let _ = parse_login_result( + login_with_user_pass(TEST_PREFIX, &client_id, &user.username, &user.password, mac, system, version).await + ); + }); process::exit(0); + // TODO: do login with user } Commands::TokenLogin(tk) => { - let _ = parse_login_result( - login_with_token(TEST_PREFIX, &client_id, &tk.token, mac, system, version).await - ); - process::exit(0); - } - Commands::AutoRun(tk) => { - let mut remembered_token = get_access_token(); - if remembered_token.is_none() { - let data = parse_login_result( + let rt = Runtime::new().unwrap(); + rt.block_on(async move { + let _ = parse_login_result( login_with_token(TEST_PREFIX, &client_id, &tk.token, mac, system, version).await ); - remembered_token = Some(CachedLoginInfo{ - access_token: data.access_token, - username: data.username, - user_type: data.user_type, - audit: data.audit, - network_id: data.network_id, - network_name: data.network_name, - }); - } - - remembered = remembered_token.unwrap(); - - connect_info = parse_connect_result( - connect(TEST_PREFIX, &client_id, &remembered.access_token).await - ); - should_daemonize = false; - } - Commands::Start => { - let remembered_token = get_access_token(); - if remembered_token.is_none() { - eprintln!("not logged in, should login with user/pass or token first"); - process::exit(-2); - } - - remembered = remembered_token.unwrap(); - - connect_info = parse_connect_result( - connect(TEST_PREFIX, &client_id, &remembered.access_token).await - ); - should_daemonize = false; + }); + process::exit(0); } Commands::Stop => { - process::exit(-4); + match fs::read_to_string("/tmp/punchnet.pid") { + Ok(content) => { + let pid: i32 = match content.trim().parse() { + Ok(value) => value, + Err(_e) => { + eprintln!("failed to parse value: {}", content); + process::exit(-4); + } + }; + let result = unsafe { + kill(pid, SIGTERM) + }; + if result != 0 { + let err = io::Error::last_os_error(); + eprintln!("failed to kill: {}", err); + process::exit(-5); + } + } + Err(e) => { + eprintln!("failed to read pid: {}", e); + process::exit(-6); + } + } + process::exit(0); + } + Commands::Info => { + + let rt = Runtime::new().unwrap(); + rt.block_on(async move { + query_for_info().await; + }); + process::exit(0); + } + _other => { + // just fall through to next code } } + let out = OpenOptions::new() + .create(true) + .truncate(true) + .append(true) + .write(true) + .open("/tmp/punchnet.out").unwrap(); + let err = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open("/tmp/punchnet.err").unwrap(); - if should_daemonize { - let stdout = OpenOptions::new() - .create(true) - .append(true) - .write(true) - .open("/tmp/punchnet.out").unwrap(); - let stderr = OpenOptions::new() - .create(true) - .append(true) - .write(true) - .open("/tmp/punchnet.err").unwrap(); + let daemonize = Daemonize::new() + .pid_file("/tmp/punchnet.pid") + .chown_pid_file(true) + .working_directory(get_base_dir()) + .stdout(out) + .stderr(err) + .privileged_action(|| { + }); - let daemonize = Daemonize::new() - .pid_file("/tmp/punchnet.pid") - .chown_pid_file(true) - .working_directory(get_base_dir()) - .stdout(stdout) - .stderr(stderr) - .privileged_action(|| { + match daemonize.start() { + Ok(_) => { + let rt = Runtime::new().unwrap(); + match &cmd.cmd { + Commands::Start => { + rt.block_on(async move { + let remembered_token = get_access_token(); + if remembered_token.is_none() { + eprintln!("not logged in, should login with user/pass or token first"); + process::exit(-2); + } - }); - match daemonize.start() { - Ok(_) => { - loop { - println!("guard is {:?}", _guard); - sleep(Duration::from_secs(3)).await; + let remembered = remembered_token.unwrap(); + + let connect_info = parse_connect_result( + connect(TEST_PREFIX, &client_id, &remembered.access_token).await + ); + daemonize_me(connect_info, remembered, client_id, mac).await; + }) } - daemonize_me(connect_info, remembered, client_id, mac).await; + Commands::AutoRun(tk) => { + rt.block_on(async move { + let mut remembered_token = get_access_token(); + if remembered_token.is_none() { + let data = parse_login_result( + login_with_token(TEST_PREFIX, &client_id, &tk.token, mac, system, version).await + ); + remembered_token = Some(CachedLoginInfo{ + access_token: data.access_token, + username: data.username, + user_type: data.user_type, + audit: data.audit, + network_id: data.network_id, + network_name: data.network_name, + }); + } + let remembered = remembered_token.unwrap(); + + let connect_info = parse_connect_result( + connect(TEST_PREFIX, &client_id, &remembered.access_token).await + ); + daemonize_me(connect_info, remembered, client_id, mac).await; + }) + + } + other => { + eprintln!("should not comes here"); + process::exit(-1); + } } - Err(e) => { - eprintln!("failed to daemonize: {}", e); - } + + } + Err(e) => { + eprintln!("failed to daemonize"); } - } else { - daemonize_me(connect_info, remembered, client_id, mac).await; } } + +pub fn delete_pid_file() { + fs::remove_file("/tmp/punchnet.pid"); +} diff --git a/src/network/packet.rs b/src/network/packet.rs index a0cb614..23598e8 100755 --- a/src/network/packet.rs +++ b/src/network/packet.rs @@ -903,6 +903,11 @@ async fn handle_tun_packet( return; }; + if _from_sn { + eee.stats.rx_sup.fetch_add(data.len() as u64, Ordering::Relaxed); + } else { + eee.stats.rx_p2p.fetch_add(data.len() as u64, Ordering::Relaxed); + } if let Some(ip) = headers.net { match ip { diff --git a/src/tcp/quic.rs b/src/tcp/quic.rs index dfb50ec..194658f 100644 --- a/src/tcp/quic.rs +++ b/src/tcp/quic.rs @@ -7,7 +7,7 @@ use sdlan_sn_rs::{config::AF_INET, peer::{SdlanSock, V6Info}, utils::{Result, SD use tokio::{sync::mpsc::{Receiver, Sender, channel}}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::{ConnectionInfo, ConnectionState, config::{NULL_MAC, TCP_PING_TIME}, get_edge, network::{ARP_REPLY, ArpHdr, EthHdr, Node, RegisterSuperFeedback, StartStopInfo, check_peer_registration_needed, handle_packet_peer_info}, pb::{SdlArpResponse, SdlPolicyResponse, SdlRegisterSuper, SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, encode_to_tcp_message}, tcp::{EventType, NakMsgCode, NatType, PacketType, RuleInfo, SdlanTcp, read_a_packet, send_stun_request, set_identity_cache}}; @@ -185,7 +185,7 @@ async fn handle_tcp_message(msg: SdlanTcp) { }; if resp.target_mac.len() != 6 { // invalid target_mac - error!("invalid target_mac"); + error!("invalid target_mac: {:?}, ip={}", resp.target_mac, ip_to_string(&resp.target_ip)); return; } @@ -591,7 +591,7 @@ impl ReadWriteActor { loop { match read_a_packet(&mut recv).await { Ok(packet) => { - debug!("got packet: {:?}", packet); + warn!("got packet: {:?}", packet); if let Err(_e) = self.from_tcp.send(packet).await { error!("failed to receive a packet: {:?}", _e); } diff --git a/src/utils/command.rs b/src/utils/command.rs index abd05c5..4b08484 100755 --- a/src/utils/command.rs +++ b/src/utils/command.rs @@ -24,6 +24,8 @@ pub enum Commands { /// connect to the remote Start, + Info, + /// exits the Stop, }