added query for info

This commit is contained in:
alex 2026-03-12 11:40:56 +08:00
parent de8bf7cd31
commit 814ab9181a
5 changed files with 318 additions and 90 deletions

View File

@ -0,0 +1,167 @@
use std::{net::SocketAddr, sync::atomic::Ordering, time::Duration};
use num_enum::TryFromPrimitive;
use punchnet::get_edge;
use sdlan_sn_rs::utils::Mac;
use serde::{Deserialize, Serialize};
use tokio::{net::UdpSocket, time::sleep};
#[derive(TryFromPrimitive)]
#[repr(u8)]
pub enum InfoFuncCode {
Info = 0x00,
InfoFeedback = 0x80,
}
#[derive(Serialize, Deserialize)]
pub struct InfoFeedback {
pub mac: Mac,
pub ip: u32,
pub tx_p2p: u64,
pub rx_p2p: u64,
pub tx_sup: u64,
pub rx_sup: u64,
}
pub async fn query_for_info() {
let Ok(udp) = UdpSocket::bind("127.0.0.1:0").await else {
eprintln!("failed to create");
return;
};
let remote = format!("127.0.0.1:{}", LOCAL_INFO_UDP_PORT).parse::<SocketAddr>().unwrap();
let buf = vec![InfoFuncCode::Info as u8, 0, 0];
if let Err(e) = udp.send_to(buf.as_slice(), remote).await {
eprintln!("failed to send query info");
return;
}
let mut buf = vec![0;1024];
tokio::select! {
data = udp.recv_from(&mut buf) => {
if let Ok((size, from)) = data {
if size < 3 {
eprintln!("no byte received");
return;
}
buf.truncate(size);
let Ok(typecode) = InfoFuncCode::try_from_primitive(buf[0]) else {
eprintln!("invalid type: {}", buf[0]);
return;
};
let size = u16::from_be_bytes([buf[1], buf[2]]);
if buf.len() as u16 != size + 3 {
eprintln!("info length error: buf.len={}, size={}", buf.len(), size);
return;
}
match typecode {
InfoFuncCode::InfoFeedback => {
let Ok(data) = serde_json::from_slice::<InfoFeedback>(&buf[3..]) else {
eprintln!("failed to marshal to json");
return;
};
println!("punchnet info:");
let ip = data.ip;
println!(" ip: {}.{}.{}.{}",
((ip>>24) & 0xff) as u8,
((ip>>16) & 0xff) as u8,
((ip>>8) & 0xff) as u8,
((ip) & 0xff) as u8,
);
let mac = data.mac;
println!(" mac: {:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]
);
println!(" p2p tx: {} bytes", data.tx_p2p);
println!(" p2p rx: {} bytes", data.rx_p2p);
println!(" super tx: {} bytes", data.tx_sup);
println!(" super rx: {} bytes", data.rx_sup);
}
_other => {
}
}
} else {
eprintln!("failed to recv from punchnet");
}
return;
}
_ = sleep(Duration::from_secs(3)) => {
eprintln!("query timed out, is punchnet running?");
return;
}
}
}
const LOCAL_INFO_UDP_PORT: u16 = 1789;
pub async fn handle_query_for_info_info() {
let Ok(udp) = UdpSocket::bind(format!("127.0.0.1:{}", LOCAL_INFO_UDP_PORT)).await else {
eprintln!("failed to create");
return;
};
let mut buf = vec![0u8; 1024];
loop {
if let Ok((size, from)) = udp.recv_from(&mut buf).await {
let current_data = &buf[..size];
handle_query_info(&udp, &buf[..size], from).await;
}
}
}
async fn handle_query_info(udp: &UdpSocket, buf: &[u8], from: SocketAddr) {
if buf.len() < 1 {
return;
}
let tp = buf[0];
let Ok(typecode) = InfoFuncCode::try_from_primitive(tp) else {
eprintln!("invalid type: {}", tp);
return;
};
match typecode {
InfoFuncCode::Info => {
send_info_back(udp, from).await;
}
_other => {
}
}
}
async fn send_info_back(udp: &UdpSocket, from: SocketAddr) {
let edge = get_edge();
let ip = edge.device_config.get_ip();
let mac = edge.device_config.get_mac();
let feedback = InfoFeedback {
ip,
mac,
tx_p2p: edge.stats.tx_p2p.load(Ordering::Relaxed),
rx_p2p: edge.stats.rx_p2p.load(Ordering::Relaxed),
tx_sup: edge.stats.tx_sup.load(Ordering::Relaxed),
rx_sup: edge.stats.rx_sup.load(Ordering::Relaxed),
};
let value = serde_json::to_string(&feedback).unwrap();
let mut content = Vec::with_capacity(value.len() + 3);
content.push(InfoFuncCode::InfoFeedback as u8);
let size = value.len() as u16;
let size_buf = size.to_be_bytes();
content.extend_from_slice(&size_buf);
content.extend_from_slice(value.as_bytes());
udp.send_to(content.as_slice(), from).await;
}

View File

@ -1,13 +1,19 @@
mod api; mod api;
mod local_udp_info;
use std::fs;
use std::fs::File; use std::fs::File;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::stdout;
use std::process; use std::process;
use std::env; use std::env;
use std::time::Duration; use std::time::Duration;
use clap::Parser; use clap::Parser;
use daemonize::Daemonize; use daemonize::Daemonize;
use futures_util::io;
use libc::SIGTERM;
use libc::kill;
use punchnet::CachedLoginInfo; use punchnet::CachedLoginInfo;
use punchnet::CommandLineInput2; use punchnet::CommandLineInput2;
use punchnet::Commands; use punchnet::Commands;
@ -29,7 +35,8 @@ use sdlan_sn_rs::utils::Mac;
use sdlan_sn_rs::utils::Result; use sdlan_sn_rs::utils::Result;
use sdlan_sn_rs::utils::create_or_load_uuid; use sdlan_sn_rs::utils::create_or_load_uuid;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::io::stdout; use tokio::net::UdpSocket;
use tokio::runtime::Runtime;
use tokio::time::sleep; use tokio::time::sleep;
use tracing::error; use tracing::error;
@ -44,6 +51,8 @@ use crate::api::TEST_PREFIX;
use crate::api::connect; use crate::api::connect;
use crate::api::login_with_token; use crate::api::login_with_token;
use crate::api::login_with_user_pass; use crate::api::login_with_user_pass;
use crate::local_udp_info::handle_query_for_info_info;
use crate::local_udp_info::query_for_info;
const APP_USER_ENV_NAME: &str = "PUNCH_USER"; const APP_USER_ENV_NAME: &str = "PUNCH_USER";
@ -107,6 +116,8 @@ async fn daemonize_me(
client_id: String, client_id: String,
mac: Mac, mac: Mac,
) { ) {
let _guard = log::init_log(&format!("{}/.output", get_base_dir()));
println!("identity_id = {}", connect_info.identity_id); println!("identity_id = {}", connect_info.identity_id);
let self_host_name = connect_info.hostname; let self_host_name = connect_info.hostname;
@ -177,6 +188,10 @@ async fn daemonize_me(
let _ = rx.recv(); let _ = rx.recv();
tokio::spawn(async {
handle_query_for_info_info().await;
});
let edge = get_edge(); let edge = get_edge();
// let res = edge.start_without_feedback(cmd.token).await; // let res = edge.start_without_feedback(cmd.token).await;
/* /*
@ -235,6 +250,7 @@ async fn daemonize_me(
match tokio::signal::ctrl_c().await { match tokio::signal::ctrl_c().await {
Ok(()) => { Ok(()) => {
let _ = restore_dns(); let _ = restore_dns();
delete_pid_file();
} }
Err(err) => { Err(err) => {
eprintln!("failed to listen for shutdown signal: {}", err); eprintln!("failed to listen for shutdown signal: {}", err);
@ -258,17 +274,13 @@ async fn daemonize_me(
} }
#[tokio::main] fn main() {
async fn main() {
set_base_dir("/usr/local/punchnet"); set_base_dir("/usr/local/punchnet");
let _guard = log::init_log(&format!("{}/.output", get_base_dir())); // let _guard = log::init_log(&format!("{}/.output", get_base_dir()));
let client_id = create_or_load_uuid(&format!("{}/.id", get_base_dir()), None).unwrap(); let client_id = create_or_load_uuid(&format!("{}/.id", get_base_dir()), None).unwrap();
let test_token = "49722584273728716817720074439183";
let mac = create_or_load_mac(); let mac = create_or_load_mac();
let system = "linux"; let system = "linux";
let version = "1.0.0"; let version = "1.0.0";
@ -276,103 +288,145 @@ async fn main() {
let cmd = CommandLineInput2::parse(); let cmd = CommandLineInput2::parse();
// println!("port is {}", cmd.port); // println!("port is {}", cmd.port);
let connect_info: ConnectData; match &cmd.cmd {
let remembered: CachedLoginInfo;
let should_daemonize: bool;
match cmd.cmd {
Commands::Login(user) => { Commands::Login(user) => {
// TODO: do login with user let rt = Runtime::new().unwrap();
let _ = parse_login_result( rt.block_on( async move {
login_with_user_pass(TEST_PREFIX, &client_id, &user.username, &user.password, mac, system, version).await let _ = parse_login_result(
); login_with_user_pass(TEST_PREFIX, &client_id, &user.username, &user.password, mac, system, version).await
);
});
process::exit(0); process::exit(0);
// TODO: do login with user
} }
Commands::TokenLogin(tk) => { Commands::TokenLogin(tk) => {
let _ = parse_login_result( let rt = Runtime::new().unwrap();
login_with_token(TEST_PREFIX, &client_id, &tk.token, mac, system, version).await rt.block_on(async move {
); let _ = parse_login_result(
process::exit(0);
}
Commands::AutoRun(tk) => {
let mut remembered_token = get_access_token();
if remembered_token.is_none() {
let data = parse_login_result(
login_with_token(TEST_PREFIX, &client_id, &tk.token, mac, system, version).await login_with_token(TEST_PREFIX, &client_id, &tk.token, mac, system, version).await
); );
remembered_token = Some(CachedLoginInfo{ });
access_token: data.access_token, process::exit(0);
username: data.username,
user_type: data.user_type,
audit: data.audit,
network_id: data.network_id,
network_name: data.network_name,
});
}
remembered = remembered_token.unwrap();
connect_info = parse_connect_result(
connect(TEST_PREFIX, &client_id, &remembered.access_token).await
);
should_daemonize = false;
}
Commands::Start => {
let remembered_token = get_access_token();
if remembered_token.is_none() {
eprintln!("not logged in, should login with user/pass or token first");
process::exit(-2);
}
remembered = remembered_token.unwrap();
connect_info = parse_connect_result(
connect(TEST_PREFIX, &client_id, &remembered.access_token).await
);
should_daemonize = false;
} }
Commands::Stop => { Commands::Stop => {
process::exit(-4); match fs::read_to_string("/tmp/punchnet.pid") {
Ok(content) => {
let pid: i32 = match content.trim().parse() {
Ok(value) => value,
Err(_e) => {
eprintln!("failed to parse value: {}", content);
process::exit(-4);
}
};
let result = unsafe {
kill(pid, SIGTERM)
};
if result != 0 {
let err = io::Error::last_os_error();
eprintln!("failed to kill: {}", err);
process::exit(-5);
}
}
Err(e) => {
eprintln!("failed to read pid: {}", e);
process::exit(-6);
}
}
process::exit(0);
}
Commands::Info => {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
query_for_info().await;
});
process::exit(0);
}
_other => {
// just fall through to next code
} }
} }
let out = OpenOptions::new()
.create(true)
.truncate(true)
.append(true)
.write(true)
.open("/tmp/punchnet.out").unwrap();
let err = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open("/tmp/punchnet.err").unwrap();
if should_daemonize { let daemonize = Daemonize::new()
let stdout = OpenOptions::new() .pid_file("/tmp/punchnet.pid")
.create(true) .chown_pid_file(true)
.append(true) .working_directory(get_base_dir())
.write(true) .stdout(out)
.open("/tmp/punchnet.out").unwrap(); .stderr(err)
let stderr = OpenOptions::new() .privileged_action(|| {
.create(true) });
.append(true)
.write(true)
.open("/tmp/punchnet.err").unwrap();
let daemonize = Daemonize::new() match daemonize.start() {
.pid_file("/tmp/punchnet.pid") Ok(_) => {
.chown_pid_file(true) let rt = Runtime::new().unwrap();
.working_directory(get_base_dir()) match &cmd.cmd {
.stdout(stdout) Commands::Start => {
.stderr(stderr) rt.block_on(async move {
.privileged_action(|| { let remembered_token = get_access_token();
if remembered_token.is_none() {
eprintln!("not logged in, should login with user/pass or token first");
process::exit(-2);
}
}); let remembered = remembered_token.unwrap();
match daemonize.start() {
Ok(_) => { let connect_info = parse_connect_result(
loop { connect(TEST_PREFIX, &client_id, &remembered.access_token).await
println!("guard is {:?}", _guard); );
sleep(Duration::from_secs(3)).await; daemonize_me(connect_info, remembered, client_id, mac).await;
})
} }
daemonize_me(connect_info, remembered, client_id, mac).await; Commands::AutoRun(tk) => {
rt.block_on(async move {
let mut remembered_token = get_access_token();
if remembered_token.is_none() {
let data = parse_login_result(
login_with_token(TEST_PREFIX, &client_id, &tk.token, mac, system, version).await
);
remembered_token = Some(CachedLoginInfo{
access_token: data.access_token,
username: data.username,
user_type: data.user_type,
audit: data.audit,
network_id: data.network_id,
network_name: data.network_name,
});
}
let remembered = remembered_token.unwrap();
let connect_info = parse_connect_result(
connect(TEST_PREFIX, &client_id, &remembered.access_token).await
);
daemonize_me(connect_info, remembered, client_id, mac).await;
})
}
other => {
eprintln!("should not comes here");
process::exit(-1);
}
} }
Err(e) => {
eprintln!("failed to daemonize: {}", e); }
} Err(e) => {
eprintln!("failed to daemonize");
} }
} else {
daemonize_me(connect_info, remembered, client_id, mac).await;
} }
} }
pub fn delete_pid_file() {
fs::remove_file("/tmp/punchnet.pid");
}

View File

@ -903,6 +903,11 @@ async fn handle_tun_packet(
return; return;
}; };
if _from_sn {
eee.stats.rx_sup.fetch_add(data.len() as u64, Ordering::Relaxed);
} else {
eee.stats.rx_p2p.fetch_add(data.len() as u64, Ordering::Relaxed);
}
if let Some(ip) = headers.net { if let Some(ip) = headers.net {
match ip { match ip {

View File

@ -7,7 +7,7 @@ use sdlan_sn_rs::{config::AF_INET, peer::{SdlanSock, V6Info}, utils::{Result, SD
use tokio::{sync::mpsc::{Receiver, Sender, channel}}; use tokio::{sync::mpsc::{Receiver, Sender, channel}};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, error}; use tracing::{debug, error, warn};
use crate::{ConnectionInfo, ConnectionState, config::{NULL_MAC, TCP_PING_TIME}, get_edge, network::{ARP_REPLY, ArpHdr, EthHdr, Node, RegisterSuperFeedback, StartStopInfo, check_peer_registration_needed, handle_packet_peer_info}, pb::{SdlArpResponse, SdlPolicyResponse, SdlRegisterSuper, SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, encode_to_tcp_message}, tcp::{EventType, NakMsgCode, NatType, PacketType, RuleInfo, SdlanTcp, read_a_packet, send_stun_request, set_identity_cache}}; use crate::{ConnectionInfo, ConnectionState, config::{NULL_MAC, TCP_PING_TIME}, get_edge, network::{ARP_REPLY, ArpHdr, EthHdr, Node, RegisterSuperFeedback, StartStopInfo, check_peer_registration_needed, handle_packet_peer_info}, pb::{SdlArpResponse, SdlPolicyResponse, SdlRegisterSuper, SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, encode_to_tcp_message}, tcp::{EventType, NakMsgCode, NatType, PacketType, RuleInfo, SdlanTcp, read_a_packet, send_stun_request, set_identity_cache}};
@ -185,7 +185,7 @@ async fn handle_tcp_message(msg: SdlanTcp) {
}; };
if resp.target_mac.len() != 6 { if resp.target_mac.len() != 6 {
// invalid target_mac // invalid target_mac
error!("invalid target_mac"); error!("invalid target_mac: {:?}, ip={}", resp.target_mac, ip_to_string(&resp.target_ip));
return; return;
} }
@ -591,7 +591,7 @@ impl ReadWriteActor {
loop { loop {
match read_a_packet(&mut recv).await { match read_a_packet(&mut recv).await {
Ok(packet) => { Ok(packet) => {
debug!("got packet: {:?}", packet); warn!("got packet: {:?}", packet);
if let Err(_e) = self.from_tcp.send(packet).await { if let Err(_e) = self.from_tcp.send(packet).await {
error!("failed to receive a packet: {:?}", _e); error!("failed to receive a packet: {:?}", _e);
} }

View File

@ -24,6 +24,8 @@ pub enum Commands {
/// connect to the remote /// connect to the remote
Start, Start,
Info,
/// exits the /// exits the
Stop, Stop,
} }