This commit is contained in:
asxalex 2024-07-04 16:50:28 +08:00
commit de029dd6e7
25 changed files with 5735 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
*.so

2416
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

19
Cargo.toml Normal file
View File

@ -0,0 +1,19 @@
[package]
name = "sdlan-rs"
version = "0.1.0"
edition = "2021"
[dependencies]
dashmap = "6.0.1"
etherparse = "0.15.0"
futures-util = "0.3.30"
num_enum = "0.7.2"
once_cell = "1.19.0"
prost = "0.12.6"
prost-build = "0.12.6"
rsa = "0.9.6"
sdlan-sn-rs = { git = "ssh://git@git.asxalex.pw/sdlan-v2/sdlan-rs.git" }
structopt = "0.3.26"
tokio = { version = "1.38.0", futures = ["full"] }
tokio-util = "0.7.11"
tracing = "0.1.40"

6
Makefile Normal file
View File

@ -0,0 +1,6 @@
pb:
cargo run --bin build_pb
mv src/pb/_.rs src/pb/message.rs
libtun-so:
cd src/network && gcc -fPIC -shared -o libtuntap.so tuntap.c && cd -

156
message.proto Normal file
View File

@ -0,0 +1,156 @@
syntax = "proto3";
//
message SDLV4Info {
uint32 port = 1;
bytes v4 = 2;
uint32 nat_type = 3;
}
message SDLV6Info {
uint32 port = 1;
bytes v6 = 2;
}
//
message SDLDevAddr {
uint32 network_id = 1;
uint32 net_addr = 2;
uint32 net_bit_len = 3;
}
// tcp通讯消息
message SDLEmpty {
}
message SDLRegisterSuper {
uint32 version = 1;
string installed_channel = 2;
string client_id = 3;
SDLDevAddr dev_addr = 4;
string pub_key = 5;
string token = 6;
}
message SDLRegisterSuperAck {
SDLDevAddr dev_addr = 1;
bytes aes_key = 2;
bytes known_ips = 3;
uint32 upgrade_type = 4;
optional string upgrade_prompt = 5;
optional string upgrade_address = 6;
}
message SDLRegisterSuperNak {
uint32 error_code = 1;
string error_message = 2;
}
//
message SDLQueryInfo {
uint32 dst_ip = 1;
}
message SDLPeerInfo {
uint32 dst_ip = 1;
SDLV4Info v4_info = 2;
optional SDLV6Info v6_info = 3;
}
//
message SDLKnownIpEvent {
uint32 ip = 1;
}
message SDLDropIpEvent {
uint32 ip = 1;
}
message SDLNatChangedEvent {
uint32 ip = 1;
}
message SDLSendRegisterEvent {
uint32 dst_ip = 1;
uint32 nat_ip = 2;
uint32 nat_port = 3;
}
message SDLNetworkShutdownEvent {
string message = 1;
}
//
message SDLChangeNetworkCommand {
SDLDevAddr dev_addr = 1;
bytes aes_key = 2;
bytes known_ips = 3;
}
message SDLCommandAck {
// status = true, status = false message是失败原因描述
bool status = 1;
optional string message = 2;
}
message SDLFlows {
//
uint32 forward_num = 1;
// p2p直接流量
uint32 p2p_num = 2;
//
uint32 inbound_num = 3;
}
// UDP通讯消息
message SDLStunRequest {
uint32 cookie = 1;
string client_id = 2;
uint32 network_id = 3;
uint32 ip = 4;
uint32 nat_type = 5;
}
message SDLStunReply {
uint32 cookie = 1;
}
message SDLData {
uint32 network_id = 1;
uint32 src_ip = 2;
uint32 dst_ip = 3;
bool is_p2p = 4;
uint32 ttl = 5;
bytes data = 6;
}
message SDLRegister {
uint32 network_id = 1;
uint32 src_ip = 2;
uint32 dst_ip = 3;
}
message SDLRegisterAck {
uint32 network_id = 1;
uint32 src_ip = 2;
uint32 dst_ip = 3;
}
//
message SDLStunProbe {
uint32 cookie = 1;
uint32 attr = 2;
}
message SDLStunProbeReply {
uint32 cookie = 1;
uint32 port = 2;
uint32 ip = 3;
}

8
src/bin/build_pb/main.rs Normal file
View File

@ -0,0 +1,8 @@
fn main() {
prost_build::Config::new()
.out_dir("src/pb")
// .out_dir("../tcp_mock/pb")
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["message.proto"], &["../sdlan/"])
.unwrap();
}

9
src/config/mod.rs Normal file
View File

@ -0,0 +1,9 @@
pub const REGISTER_INTERVAL: u8 = 20;
pub const REGISTER_SUPER_INTERVAL: u16 = 20;
pub const MULITCAST_V4: [u8; 4] = [224, 0, 0, 69];
pub const MULTICAST_PORT: u16 = 1969;
// pub const SUPER_ATTEMPTS_DEFAULT: u8 = 3;
pub const TCP_PING_TIME: u64 = 7;

5
src/lib.rs Normal file
View File

@ -0,0 +1,5 @@
mod config;
mod network;
mod pb;
mod tcp;
mod utils;

495
src/network/async_main.rs Normal file
View File

@ -0,0 +1,495 @@
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::config::TCP_PING_TIME;
use crate::network::{get_edge, ping_to_sn, read_and_parse_packet};
use crate::pb::{
encode_to_tcp_message, encode_to_udp_message, SdlData, SdlDevAddr, SdlRegisterSuper,
SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest,
};
use crate::tcp::{init_tcp_conn, EventType, PacketType, SdlanTcp};
use crate::utils::{send_to_sock, CommandLine};
use etherparse::IpHeaders;
use sdlan_sn_rs::config::{AF_INET, SDLAN_DEFAULT_TTL};
use sdlan_sn_rs::peer::SdlanSock;
use sdlan_sn_rs::utils::{
self, aes_encrypt, create_or_load_uuid, get_current_timestamp, ip_to_string,
is_multi_broadcast, rsa_decrypt,
};
use sdlan_sn_rs::utils::{Result, SDLanError};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_util::sync::CancellationToken;
use super::{check_peer_registration_needed, init_edge, packet, Node, NodeConfig};
use crate::utils::Socket;
use prost::Message;
use tracing::{debug, error, info};
async fn handle_tcp_message(msg: SdlanTcp) {
let edge = get_edge();
// let now = get_current_timestamp();
// edge.tcp_pong.store(now, Ordering::Relaxed);
debug!("got tcp message: {:?}", msg.packet_type);
match msg.packet_type {
PacketType::RegisterSuperACK => {
let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else {
error!("failed to decode REGISTER_SUPER_ACK");
return;
};
debug!("got register super ack: {:?}", ack);
let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else {
error!("failed to rsa decrypt aes key");
return;
};
let Some(dev) = ack.dev_addr else {
error!("no dev_addr is specified");
return;
};
let ip = ip_to_string(&dev.net_addr);
debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,);
edge.device_config
.ip
.net_addr
.store(dev.net_addr, Ordering::Relaxed);
edge.device_config
.ip
.net_bit_len
.store(dev.net_bit_len as u8, Ordering::Relaxed);
edge.device.reload_config(&edge.device_config);
edge.network_id.store(dev.network_id, Ordering::Relaxed);
edge.set_authorized(true, aes);
}
PacketType::RegisterSuperNAK => {
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {
error!("failed to decode REGISTER_SUPER_NAK");
return;
};
edge.set_authorized(false, Vec::new());
// std::process::exit(0);
}
PacketType::Command => {
if msg.current_packet.len() < 1 {
error!("malformed COMMAND received");
return;
}
handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await;
}
PacketType::Event => {
if msg.current_packet.len() < 1 {
error!("malformed EVENT received");
return;
}
let Ok(event) = msg.current_packet[0].try_into() else {
error!("failed to parse event type");
return;
};
handle_tcp_event(edge, event, &msg.current_packet[1..]).await;
}
PacketType::PeerInfo => {
let _ = packet::handle_packet_peer_info(edge, &msg.current_packet[..]).await;
}
PacketType::Pong => {
debug!("tcp pong received");
let now = get_current_timestamp();
edge.tcp_pong.store(now, Ordering::Relaxed);
}
other => {
debug!("tcp not handling {:?}", other);
}
}
}
async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {}
async fn handle_tcp_event(edge: &Node, eventtype: EventType, eventprotobuf: &[u8]) {
match eventtype {
EventType::SendRegister => {
let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else {
error!("failed to decode SendRegister Event");
return;
};
let v4 = reg.nat_ip.to_be_bytes();
check_peer_registration_needed(
edge,
true,
reg.dst_ip,
&None,
&SdlanSock {
family: AF_INET,
port: reg.nat_port as u16,
v4,
v6: [0; 16],
},
)
.await;
}
other => {
debug!("unhandled event {:?}", other);
}
}
}
pub async fn async_main(
install_channel: String,
args: CommandLine,
start_stop_chan: Receiver<bool>,
cancel: CancellationToken,
) -> Result<()> {
// let _ = PidRecorder::new(".pid");
// // gen public key
// gen_rsa_keys(".client");
// let mut pubkey = String::new();
// File::open(".client/id_rsa.pub")?.read_to_string(&mut pubkey)?;
// let privatekey = load_private_key_file(".client/id_rsa")?;
// // init sock
// if args.token.len() == 0 {
// println!("failed to load token");
// return Ok(());
// }
// let sock_v4 = Socket::build(0, true, true, args.tos).await?;
// // allow multicast
// // TODO: set the sn's tcp socket
// // let tcpsock = TCPSocket::build("121.4.79.234:1234").await?;
// let tcp_pong = Arc::new(AtomicU64::new(0));
// let edge = Node::new(
// pubkey,
// node_conf,
// sock_v4,
// &args.token,
// privatekey,
// tcp_pong.clone(),
// );
let edge = get_edge();
// let token = args.token.clone();
init_tcp_conn(
&args.tcp,
move |stream| {
let token = args.token.clone();
let installed_channel = install_channel.to_owned();
Box::pin(async {
// let edge = get_edge();
// let edge = get_edge();
// let token = args.token.clone();
let register_super = SdlRegisterSuper {
version: 1,
installed_channel,
client_id: edge.config.node_uuid.clone(),
dev_addr: Some(SdlDevAddr {
net_addr: 0,
network_id: 0,
net_bit_len: 0,
}),
pub_key: edge.rsa_pubkey.clone(),
token,
};
let packet_id = edge.get_next_packet_id();
let data = encode_to_tcp_message(
Some(register_super),
packet_id,
PacketType::RegisterSuper as u8,
)
.unwrap();
if let Err(e) = stream.write(&data).await {
error!("failed to write to tcp: {}", e.to_string());
}
})
},
|msg| handle_tcp_message(msg),
edge.tcp_pong.clone(),
// tcp_pong,
start_stop_chan,
);
// tcp_conn.send("hello".as_bytes()).await;
// tokio::spawn(handle_tcp_message(tcp_conn.data_from_tcp));
// tcp_conn.send("".as_bytes());
debug!("waiting for authorization...");
loop {
// let _ = edge.send_register_super().await;
// let _ = read_and_parse_packet(edge, &edge.udp_sock_v4, Some(Duration::from_secs(3))).await;
println!("checking for authorized");
if edge.is_authorized() {
break;
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(3)) => {
continue;
}
_ = cancel.cancelled() => {
return Ok(());
}
}
}
{
let cancel = cancel.clone();
tokio::spawn(async move {
run_edge_loop(edge, cancel).await;
});
}
{
let cancel = cancel.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel.cancelled() => {
if let Err(e) = edge.send_unregister_super().await {
error!("failed to send unregister super: {}", e.as_str());
}
break;
}
_ = tokio::time::sleep(Duration::from_secs(TCP_PING_TIME)) => {
ping_to_sn().await;
}
}
}
debug!("loop update_supernode_reg exited");
});
}
cancel.cancelled().await;
/*
match tokio::signal::ctrl_c().await {
Ok(()) => {
debug!("shutdown received");
cancel.cancel();
debug!("exiting async_main");
tokio::time::sleep(Duration::from_millis(500)).await;
debug!("exiting async_main2");
}
Err(err) => {
eprintln!("failed to listen for shutdown signal: {}", err);
}
}
*/
// std::process::exit(0);
Ok(())
}
async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) {
ping_to_sn().await;
{
let cancel = cancel.clone();
tokio::spawn(async move {
loop_socket_v4(eee, &eee.udp_sock_v4, cancel).await;
});
}
{
tokio::spawn(async move {
loop_tap(eee, cancel).await;
});
}
}
async fn loop_socket_v4(eee: &Node, socket: &Socket, cancel: CancellationToken) {
debug!("loop sock v4");
loop {
tokio::select! {
_ = cancel.cancelled() => {
break;
}
_ = read_and_parse_packet(eee, socket,Some(Duration::from_secs(10))) => { }
_ = tokio::time::sleep(Duration::from_secs(10)) => {
let req = SdlStunRequest {
cookie: 0,
client_id: eee.config.node_uuid.clone(),
network_id: eee.network_id.load(Ordering::Relaxed),
ip: eee.device_config.get_ip(),
nat_type: 0,
};
let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap();
if let Err(e) = send_to_sock(eee, &msg, &eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize]).await {
error!("failed to send to sock: {:?}", e);
}
}
}
}
debug!("loop_socket_v4 exited");
}
async fn loop_tap(eee: &'static Node, cancel: CancellationToken) {
debug!("loop tap");
let (tx, mut rx) = channel(10);
tokio::spawn(async {
get_tun_flow(eee, tx).await;
});
loop {
tokio::select! {
_ = cancel.cancelled() => {
drop(rx);
break;
}
buf = rx.recv() => {
if buf.is_none() {
break;
}
read_and_parse_tun_packet(eee, buf.unwrap()).await;
}
}
}
debug!("loop_tap exited");
}
async fn get_tun_flow(eee: &'static Node, tx: Sender<Vec<u8>>) {
loop {
let buf = tokio::task::spawn_blocking(|| {
let mut buf = vec![0; 1800];
let Ok(size) = eee.device.recv(&mut buf) else {
return vec![];
};
buf.truncate(size);
buf
})
.await
.unwrap();
if buf.len() == 0 {
return;
}
if let Err(e) = tx.send(buf).await {
error!("failed to send buf: {}", e);
return;
}
}
}
async fn read_and_parse_tun_packet(eee: &'static Node, buf: Vec<u8>) {
if !eee.is_authorized() {
debug!("drop packet before authorized");
return;
}
/*
if eee.stats.last_sup.load(Ordering::Relaxed) == 0 {
debug!("drop packet before first registration");
return;
}
*/
// buf.truncate(size);
edge_send_packet_to_net(eee, &buf).await;
}
async fn edge_send_packet_to_net(eee: &Node, data: &[u8]) {
debug!("edge send packet to net({} bytes): {:?}", data.len(), data);
match IpHeaders::from_slice(&data) {
Ok((iphdr, _payload)) => {
let Some(ipv4hdr) = iphdr.ipv4() else {
debug!("ipv6 packet ignored");
return;
};
let dstip = u32::from_be_bytes(ipv4hdr.0.destination);
debug!("packet dst ip: {:?}", ipv4hdr.0.destination);
let src = u32::from_be_bytes(ipv4hdr.0.source);
debug!("packet src ip: {:?}", ipv4hdr.0.source);
// packet should be sent to dev
debug!("got {} bytes from tun", data.len());
if (!eee.config.allow_routing) && (src != eee.device_config.get_ip()) {
info!("dropping routed packet");
return;
}
if !eee.is_authorized() {
debug!("drop tun packet due to not authed");
return;
}
let encrypt_key = eee.get_encrypt_key();
if encrypt_key.len() == 0 {
error!("drop tun packet due to encrypt key len is 0");
return;
}
let Ok(encrypted_flow) = aes_encrypt(encrypt_key.as_slice(), data) else {
error!("failed to encrypt flow");
return;
};
let message = SdlData {
// TODO: network id should be stored in
network_id: eee.network_id.load(Ordering::Relaxed),
src_ip: eee.device_config.get_ip(),
dst_ip: dstip,
is_p2p: true,
ttl: SDLAN_DEFAULT_TTL as u32,
data: encrypted_flow,
};
debug!("sending SdlData: {:?}", message);
let Ok(flow) = encode_to_udp_message(Some(message), PacketType::Data as u8) else {
error!("failed to encode to udp message");
return;
};
send_packet_to_net(eee, dstip, &flow).await;
}
Err(e) => {
error!("failed to parse ip packet: {}", e.to_string());
}
}
}
async fn send_packet_to_net(eee: &Node, dst_ip: u32, pkt: &[u8]) {
let (dest_sock, is_p2p) = find_peer_destination(eee, dst_ip).await;
if is_p2p {
eee.stats.tx_p2p.fetch_add(1, Ordering::Relaxed);
} else {
eee.stats.tx_sup.fetch_add(1, Ordering::Relaxed);
if is_multi_broadcast(dst_ip) {
eee.stats.tx_broadcast.fetch_add(1, Ordering::Relaxed);
}
}
debug!("send packet PACKET to {}", dest_sock.to_string());
if let Err(e) = send_to_sock(eee, pkt, &dest_sock).await {
error!("failed to send packet to net: {}", e.as_str());
}
}
async fn find_peer_destination(eee: &Node, dst_ip: u32) -> (SdlanSock, bool) {
if is_multi_broadcast(dst_ip) {
return (
eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize]
.deepcopy(),
false,
);
}
let mut is_p2p = false;
let result: SdlanSock;
if let Some(dst) = eee.known_peers.get_peer(&dst_ip) {
let now = get_current_timestamp();
if now - dst.last_seen.load(Ordering::Relaxed) >= ((dst.timeout / 2) as u64) {
// too much time elapsed since we saw the peer, need to register again
eee.known_peers.delete_peer_with_ip(&dst_ip);
result = eee.config.super_nodes
[eee.config.super_node_index.load(Ordering::Relaxed) as usize]
.deepcopy();
} else {
// dst.last_seen.store(now, Ordering::Relaxed);
is_p2p = true;
result = dst.sock.read().unwrap().deepcopy();
}
} else {
result = eee.config.super_nodes
[eee.config.super_node_index.load(Ordering::Relaxed) as usize]
.deepcopy();
}
if !is_p2p {
debug!("check_query_peer_info");
super::packet::check_query_peer_info(eee, dst_ip).await;
}
return (result, is_p2p);
}

56
src/network/device.rs Normal file
View File

@ -0,0 +1,56 @@
use sdlan_sn_rs::peer::IpSubnet;
pub struct DeviceConfig {
pub ip: IpSubnet,
}
impl DeviceConfig {
pub fn new() -> Self {
DeviceConfig {
ip: IpSubnet::new(0, 0),
}
}
/*
pub fn set_ip(&self, net_addr: u32, net_bit_len: u8) {
if net_bit_len <= 8 || net_bit_len > 32 {
error!("configured net bit length error: {}", net_bit_len);
return;
}
self.ip.net_addr.store(net_addr, Ordering::Relaxed);
self.ip.net_bit_len.store(net_bit_len, Ordering::Relaxed);
}
*/
pub fn get_ip(&self) -> u32 {
self.ip.net_addr()
}
pub fn get_net_bit(&self) -> u8 {
self.ip.net_bit_len()
}
}
/// The mode in which open the virtual network adapter.
#[allow(unused)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum Mode {
/// TUN mode
///
/// The packets returned are on the IP layer (layer 3), prefixed with 4-byte header (2 bytes
/// are flags, 2 bytes are the protocol inside, eg one of
/// <https://en.wikipedia.org/wiki/EtherType#Examples>.
Tun = 1,
/// TAP mode
///
/// The packets are on the transport layer (layer 2), and start with ethernet frame header.
Tap = 2,
}
/*
pub trait TunDevice: ReadWriter {
fn name(&self) -> &str;
fn mode(&self) -> &Mode;
fn get_ip(&self) -> u32;
fn get_net_bit(&self) -> u8;
}
*/

21
src/network/mod.rs Normal file
View File

@ -0,0 +1,21 @@
mod node;
pub use node::*;
mod async_main;
pub use async_main::*;
mod packet;
pub use packet::*;
#[cfg_attr(target_os = "linux", path = "tun_linux.rs")]
#[cfg_attr(target_os = "windows", path = "tun_win.rs")]
mod tun;
mod device;
/*
pub trait ReadWriter {
fn send(&self, content: &[u8]) -> std::io::Result<usize>;
fn recv(&self, buf: &mut [u8]) -> std::io::Result<usize>;
}
*/

480
src/network/node.rs Normal file
View File

@ -0,0 +1,480 @@
use dashmap::DashMap;
use rsa::RsaPrivateKey;
use sdlan_sn_rs::config::{AF_INET, AF_INET6};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering};
use std::sync::{Arc, RwLock};
use tokio::io::AsyncReadExt;
use crate::pb::{encode_to_tcp_message, SdlEmpty};
use crate::tcp::{get_tcp_conn, PacketType};
use crate::utils::{PidRecorder, Socket};
use sdlan_sn_rs::peer::{is_sdlan_sock_equal, IpSubnet, V6Info};
use once_cell::sync::OnceCell;
use super::device::{DeviceConfig, Mode};
use super::tun::{new_iface, Iface};
use tokio::fs::File;
use sdlan_sn_rs::utils::{create_or_load_uuid, gen_rsa_keys, load_private_key_file};
use sdlan_sn_rs::utils::{Result, SDLanError};
static EDGE: OnceCell<Node> = OnceCell::new();
pub async fn init_edge(token: &str, node_conf: NodeConfig, tos: u32) -> Result<()> {
if token.len() == 0 {
println!("failed to load token");
return Err(SDLanError::NormalError("no token is specified"));
}
let _ = PidRecorder::new(".pid");
// gen public key
gen_rsa_keys(".client");
let mut pubkey = String::new();
File::open(".client/id_rsa.pub")
.await?
.read_to_string(&mut pubkey)
.await?;
let privatekey = load_private_key_file(".client/id_rsa")?;
// init sock
// let edge_uuid = create_or_load_uuid("")?;
//let node_conf = parse_config(edge_uuid, &args).await?;
let sock_v4 = Socket::build(0, true, true, tos).await?;
// allow multicast
// TODO: set the sn's tcp socket
// let tcpsock = TCPSocket::build("121.4.79.234:1234").await?;
let tcp_pong = Arc::new(AtomicU64::new(0));
let edge = Node::new(
pubkey,
node_conf,
sock_v4,
token,
privatekey,
tcp_pong.clone(),
);
do_init_edge(edge)?;
Ok(())
}
fn do_init_edge(edge: Node) -> Result<()> {
if let Err(_) = EDGE.set(edge) {
return Err(SDLanError::NormalError("initialize sn error"));
}
Ok(())
}
pub fn get_edge() -> &'static Node {
let edge = EDGE.get();
if edge.is_none() {
panic!("should init_edge first");
}
edge.unwrap()
}
pub struct Node {
packet_id: AtomicU32,
pub network_id: AtomicU32,
pub tcp_pong: Arc<AtomicU64>,
// user token info
pub _token: String,
pub device_config: DeviceConfig,
pub device: Iface,
// authorize related
pub authorized: AtomicBool,
// pub header_key: RwLock<Arc<Vec<u8>>>,
pub encrypt_key: RwLock<Arc<Vec<u8>>>,
pub rsa_pubkey: String,
pub rsa_private: RsaPrivateKey,
pub config: NodeConfig,
// pub super_node: Vec<Peer>,
// pub super_attempts: AtomicU8,
// store pending, and known peers
pub pending_peers: PeerMap,
pub known_peers: PeerMap,
// pub tcp_sock_v4: TCPSocket,
pub udp_sock_v4: Socket,
pub udp_sock_v6: RwLock<Arc<Option<Socket>>>,
pub multicast_sock: SdlanSock,
pub _local_v6: RwLock<Option<[u8; 16]>>,
pub stats: NodeStats,
// last register super time, in unix
pub _last_register_req: AtomicU64,
}
unsafe impl Sync for Node {}
impl Node {
pub fn new(
pubkey: String,
config: NodeConfig,
sock: Socket,
// tcpsock: TCPSocket,
token: &str,
private: RsaPrivateKey,
tcp_pong: Arc<AtomicU64>,
) -> Self {
Self {
packet_id: AtomicU32::new(1),
network_id: AtomicU32::new(0),
_token: token.to_owned(),
tcp_pong,
device_config: DeviceConfig::new(),
device: new_iface("dev", Mode::Tun),
authorized: AtomicBool::new(false),
encrypt_key: RwLock::new(Arc::new(Vec::new())),
// rsa_pubkey:
rsa_pubkey: pubkey,
rsa_private: private,
config,
// super_node: Vec::new(),
// super_attempts: AtomicU8::new(0),
pending_peers: PeerMap::new(),
known_peers: PeerMap::new(),
// tcp_sock_v4: tcpsock,
udp_sock_v4: sock,
udp_sock_v6: RwLock::new(Arc::new(None)),
multicast_sock: SdlanSock {
family: AF_INET,
port: config::MULTICAST_PORT,
v4: config::MULITCAST_V4,
v6: [0; 16],
},
_local_v6: RwLock::new(None),
stats: NodeStats::new(),
_last_register_req: AtomicU64::new(0),
}
}
pub fn get_next_packet_id(&self) -> u32 {
self.packet_id.fetch_add(1, Ordering::Relaxed)
}
pub fn is_authorized(&self) -> bool {
// self.header_key
self.authorized.load(Ordering::Relaxed)
}
pub fn set_authorized(&self, authorized: bool, encrypt_key: Vec<u8>) {
self.authorized.store(authorized, Ordering::Relaxed);
*(self.encrypt_key.write().unwrap()) = Arc::new(encrypt_key);
}
/*
pub fn get_header_key(&self) -> Arc<Vec<u8>> {
// self.header_key.read().unwrap().clone()
}
*/
pub fn get_encrypt_key(&self) -> Arc<Vec<u8>> {
self.encrypt_key.read().unwrap().clone()
}
/*
pub fn sn_is_known(&self, sock: &SdlanSock) -> bool {
for sn in self.config.super_nodes.iter() {
if sn.family != sock.family || sn.port != sock.port {
continue;
}
if sn.family == AF_INET && sn.v4 == sock.v4 {
return true;
}
if sn.family == AF_INET6 && sn.v6 == sock.v6 {
return true;
}
}
return false;
}
*/
pub fn _remove_v6(&self) {
*(self.udp_sock_v6.write().unwrap()) = Arc::new(None);
}
/*
pub async fn send_to_v4<A: ToSocketAddrs>(&self, info: &[u8], target: A) -> Result<usize> {
match self.udp_sock_v4.send_to(info, target).await {
Ok(n) => Ok(n),
Err(e) => {
println!("failed to send");
Err(SDLanError::NormalError("failed to send"))
}
}
}
*/
/*
pub async fn send_to_v6<A: ToSocketAddrs>(&self, info: &[u8], target: A) -> Result<usize> {
let m = self.udp_sock_v6.read().unwrap().clone();
if let Some(ref l) = m.as_ref() {
match l.send_to(info, target).await {
Err(e) => {
return Err(SDLanError::NormalError("send error"));
}
Ok(n) => return Ok(n),
}
}
Err(SDLanError::NormalError("no udp6 conn is bined"))
}
*/
pub async fn send_unregister_super(&self) -> Result<()> {
let content =
encode_to_tcp_message::<SdlEmpty>(None, 0, PacketType::UnRegisterSuper as u8).unwrap();
let conn = get_tcp_conn();
let _ = conn.send(&content).await;
Ok(())
}
/*
pub async fn send_register_super(&self) -> Result<()> {
let packet_id = self.packet_id.fetch_add(1, Ordering::Relaxed);
let cmn = Common {
packet_id,
version: SDLAN_VERSION,
id: &self.config.node_uuid,
token: self.token,
ttl: SDLAN_DEFAULT_TTL,
pc: PacketType::PKTRegisterSuper,
flags: 0,
};
let rs = RegisterSuper {
pass: "encrypt!",
cookie: 0,
sock: None,
v6_info: None,
dev_addr: IpSubnetNonAtomic::new(
self.device_config.get_ip(),
self.device_config.get_net_bit(),
),
pub_key: self.rsa_pubkey.clone(),
};
let content = packet::encode_packet(&cmn, &rs)?;
// self.udp_sock_v4.send_to(&content, self.config.super_nodes)
send_to_sock(
&self,
&content,
&self.config.super_nodes[self.config.super_node_index.load(Ordering::Relaxed) as usize],
)
.await?;
// write_to_addr(&sock, "127.0.0.1:7655", &content).await?;
// println!("sent!");
Ok(())
}
*/
}
pub struct PeerMap {
pub peers: DashMap<u32, Arc<EdgePeer>>,
}
#[allow(unused)]
impl PeerMap {
pub fn new() -> PeerMap {
Self {
peers: DashMap::new(),
}
}
pub fn get_peer(&self, ip: &u32) -> Option<Arc<EdgePeer>> {
if let Some(v) = self.peers.get(ip) {
Some(v.clone())
} else {
None
}
}
pub fn clear(&self) {
self.peers.clear();
}
pub fn get_peer_by_sock(&self, sock: &SdlanSock) -> Option<Arc<EdgePeer>> {
for s in self.peers.iter() {
let m = s.sock.read().unwrap();
if is_sdlan_sock_equal(&m, sock) {
return Some(s.value().clone());
}
}
None
}
pub fn delete_peer_with_ip(&self, ip: &u32) {
self.peers.remove(ip);
}
pub fn insert_peer(&self, p: Arc<EdgePeer>) {
let net_addr = p.dev_addr.net_addr();
if net_addr != 0 {
self.peers.insert(net_addr, p);
}
}
}
pub struct NodeStats {
pub tx_p2p: AtomicU64,
pub rx_p2p: AtomicU64,
pub tx_sup: AtomicU64,
pub rx_sup: AtomicU64,
pub tx_broadcast: AtomicU64,
pub rx_broadcast: AtomicU64,
pub last_sup: AtomicU64,
pub last_p2p: AtomicU64,
}
impl NodeStats {
pub fn new() -> Self {
Self {
tx_p2p: AtomicU64::new(0),
rx_p2p: AtomicU64::new(0),
tx_sup: AtomicU64::new(0),
rx_sup: AtomicU64::new(0),
tx_broadcast: AtomicU64::new(0),
rx_broadcast: AtomicU64::new(0),
last_p2p: AtomicU64::new(0),
last_sup: AtomicU64::new(0),
}
}
}
use sdlan_sn_rs::peer::SdlanSock;
use crate::config::{self, REGISTER_INTERVAL};
pub struct NodeConfig {
// node name
pub name: String,
// 允许路由
pub allow_routing: bool,
// 丢弃多播,广播消息
pub _drop_multicast: bool,
// 允许p2p打洞
pub allow_p2p: bool,
// mtu of the tun
pub mtu: u32,
// udp消息的服务类型
pub _tos: u8,
// 打洞时候,时间间隔
pub _register_super_interval: u16,
// 打洞时候register消息的ttl
pub register_ttl: u8,
// 本地打开的udp端口
pub _local_port: u16,
pub node_uuid: String,
// pub super_attempts: AtomicU8,
pub super_nodes: Vec<SdlanSock>,
pub super_node_index: AtomicU8,
}
impl NodeConfig {
pub fn new() -> Self {
Self {
name: String::new(),
allow_routing: true,
allow_p2p: true,
_drop_multicast: false,
_tos: 0,
_register_super_interval: config::REGISTER_SUPER_INTERVAL,
register_ttl: 1,
// any port,
_local_port: 0,
node_uuid: String::new(),
super_nodes: Vec::new(),
// super_attempts: AtomicU8::new(config::SUPER_ATTEMPTS_DEFAULT),
super_node_index: AtomicU8::new(0),
mtu: 1290,
}
}
}
#[derive(Debug)]
pub struct EdgePeer {
pub dev_addr: IpSubnet,
// 对端对外开放的ip和端口信息
pub sock: RwLock<SdlanSock>,
// peer's ipv6 info
pub _v6_info: RwLock<Option<SdlanSock>>,
pub timeout: u8,
// 最近一次遇见
pub last_seen: AtomicU64,
// 最近一次p2p消息
pub last_p2p: AtomicU64,
// 最近一次合法时间
pub _last_valid_timestamp: AtomicU64,
// 最近一次发送query
pub last_sent_query: AtomicU64,
}
impl EdgePeer {
pub fn new(
net_addr: u32,
net_bit_len: u8,
sock: &SdlanSock,
v6info: &Option<V6Info>,
now: u64,
) -> Self {
let mut v6_info = None;
if let Some(v6info) = v6info {
v6_info = Some(SdlanSock {
family: AF_INET6,
port: v6info.port,
v4: [0; 4],
v6: v6info.v6,
})
}
Self {
dev_addr: IpSubnet::new(net_addr, net_bit_len),
sock: RwLock::new(sock.deepcopy()),
_v6_info: RwLock::new(v6_info),
timeout: REGISTER_INTERVAL,
last_p2p: AtomicU64::new(0),
last_seen: AtomicU64::new(0),
_last_valid_timestamp: AtomicU64::new(now),
last_sent_query: AtomicU64::new(0),
}
}
}

853
src/network/packet.rs Normal file
View File

@ -0,0 +1,853 @@
use std::{net::SocketAddr, sync::atomic::Ordering, time::Duration};
use crate::{
config::REGISTER_INTERVAL,
pb::{
encode_to_tcp_message, encode_to_udp_message, SdlData, SdlEmpty, SdlPeerInfo, SdlQueryInfo,
SdlRegister, SdlRegisterAck,
},
tcp::{get_tcp_conn, PacketType},
utils::{send_to_sock, Socket},
};
use etherparse::IpHeaders;
use prost::Message;
use sdlan_sn_rs::{
config::AF_INET,
peer::{is_sdlan_sock_equal, SdlanSock, V6Info},
utils::{
aes_decrypt, get_current_timestamp, get_sdlan_sock_from_socketaddr, ip_to_string,
is_multi_broadcast, Result, SDLanError,
},
};
use std::sync::Arc;
use tracing::{debug, error, info};
use super::{EdgePeer, Node};
pub async fn read_and_parse_packet(
eee: &Node,
sock: &Socket,
timeout: Option<Duration>,
// cancel: CancellationToken,
) -> Result<()> {
let mut buf = vec![0; 3000];
let res;
if timeout.is_some() {
tokio::select! {
_ = tokio::time::sleep(timeout.unwrap()) => {
return Err(SDLanError::NormalError("timeouted"));
}
r = sock.recv_from(&mut buf) => {
res=r;
}
}
} else {
res = sock.recv_from(&mut buf).await;
}
debug!("read_and_parse packet, got packet");
// let res = sock.recv_from(&mut buf).await;
match res {
Ok((0, _)) => {
// received 0
error!("received zero bytes");
// return Ok(())
}
Err(e) => {
// error occured
error!("receive error occured: {:?}", e);
}
Ok((size, from)) => {
// size > 0
buf.truncate(size);
match handle_packet(eee, from, &buf).await {
Ok(_) => {}
Err(e) => {
error!("failed to handle_packet: {:?}", e);
}
}
}
}
Ok(())
}
pub async fn handle_packet(eee: &Node, addr: SocketAddr, buf: &[u8]) -> Result<()> {
if buf.len() < 1 {
return Err(SDLanError::NormalError("buf length error"));
}
let Ok(pkt_type) = PacketType::try_from(buf[0]) else {
return Err(SDLanError::NormalError("invalid packet type"));
};
match pkt_type {
PacketType::StunReply => {
// stun reply, like pong
}
PacketType::Register => {
if !eee.is_authorized() {
error!("dropping REGISTER received before authorized");
return Ok(());
}
let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap();
let _ = handle_packet_register(eee, &buf[1..], false, &from_sock).await;
}
PacketType::RegisterACK => {
if !eee.is_authorized() {
error!("dropping REGISTER received before authorized");
return Ok(());
}
let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap();
let _ = handle_packet_register_ack(eee, &buf[1..], &from_sock).await;
}
PacketType::Data => {
if !eee.is_authorized() {
error!("dropping PACKET received before authorized");
return Ok(());
}
let Ok(data) = SdlData::decode(&buf[1..]) else {
error!("failed to decode to SDLData");
return Err(SDLanError::NormalError("failed to decode to SDLData"));
};
let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap();
if data.is_p2p {
debug!("[P2P] Rx data from {}", from_sock.to_string());
} else {
debug!(
"[PsP] Rx data from {} via {}",
ip_to_string(&data.src_ip),
from_sock.to_string()
);
}
if data.is_p2p {
check_peer_registration_needed(eee, !data.is_p2p, data.src_ip, &None, &from_sock)
.await;
}
handle_tun_packet(eee, !data.is_p2p, data).await;
}
other => {
error!("udp not processing {:?}", other);
}
}
/*
let pkt_type = buf[0].into();
debug!("got packet {} bytes", buf.len());
let (cmn, slice) = packet::decode_common(&buf)?;
println!("got packet: {:?}", cmn.pc);
if !eee.is_authorized() {
error!("unauthorized, returning");
return Ok(());
}
let from_sn = (cmn.flags & config::SDLAN_FLAGS_FROM_SN) != 0;
let from_sock = get_sdlan_sock_from_socketaddr(addr).unwrap();
if from_sn {
if !eee.sn_is_known(&from_sock) {
error!("drop incoming data from unknown supernode");
return Ok(());
}
}
let res = match cmn.pc {
PacketType::PKTPacket => {
// handle packet
handle_packet_packet(eee, cmn, slice, from_sn, &from_sock).await
}
PacketType::PKTRegister => {
// handle register from other peer
handle_packet_register(eee, slice, from_sn, &from_sock).await
}
PacketType::PKTRegisterACK => {
// handle register ack from other peer
handle_packet_register_ack(eee, slice, &from_sock).await
}
PacketType::PKTRegisterSuperAcknowledge => {
// handle register super acknowledge
handle_packet_register_super_acknowledge(eee)
}
PacketType::PKTRegisterSuperACK => {
// handle register super ack
handle_packet_register_super_ack(eee, cmn, slice)
}
PacketType::PKTRegisterSuperNAK => {
// handle register super nak
handle_packet_register_super_nak(eee, cmn, slice)
}
PacketType::PKTPeerInfo => {
// handle peer info from sn
handle_packet_peer_info(eee, slice).await
}
PacketType::PKTCommand => {
// handle command
Ok(())
}
other => {
error!("unknown packet {:?}", other);
Ok(())
}
};
if let Err(e) = res {
error!("handle packet error occured: {}", e.as_str());
}
*/
Ok(())
}
pub async fn handle_packet_peer_info(
eee: &Node,
// cmn: Common<'_>,
body: &[u8],
//sender_sock: &SdlanSock,
) -> Result<()> {
let Ok(pi) = SdlPeerInfo::decode(body) else {
error!("failed to decode PEER_INFO");
return Ok(());
};
debug!("got peer info: {:?}", pi);
if pi.v4_info.is_none() {
error!("PEER's v4_info is none");
return Ok(());
}
let v4 = pi.v4_info.unwrap();
let Ok(v4_u32) = v4.v4.try_into() else {
error!("failed to convert v4");
return Ok(());
};
// let src_ip = u32::from_be_bytes(v4_u32);
if pi.dst_ip == 0 {
// pong from sn
} else {
match eee.pending_peers.get_peer(&pi.dst_ip) {
Some(edgeinfo) => {
let sock = SdlanSock {
family: AF_INET,
port: v4.port as u16,
v4: v4_u32,
v6: [0; 16],
};
*(edgeinfo.sock.write().unwrap()) = sock.deepcopy();
info!(
"Rx PEERINFO for {}: is at {}",
ip_to_string(&pi.dst_ip),
sock.to_string()
);
send_register(eee, &sock, &None).await;
}
None => {
debug!("Rx PEERINFO unknown peer: {}", ip_to_string(&pi.dst_ip));
}
}
}
Ok(())
}
/*
fn handle_packet_register_super_nak(eee: &Node, _cmn: Common<'_>, slice: &[u8]) -> Result<()> {
let nak: RegisterSuperNAK = serde_json::from_slice(slice)?;
if nak.src_ip == eee.device_config.get_ip() {
eee.set_authorized(false, Vec::new());
error!("unauthorized");
} else {
eee.known_peers.delete_peer_with_ip(&nak.src_ip);
eee.pending_peers.delete_peer_with_ip(&nak.src_ip);
}
Ok(())
}
*/
/*
fn handle_packet_register_super_ack(eee: &Node, _cmn: Common<'_>, slice: &[u8]) -> Result<()> {
debug!("handling REGISTER_SUPER_ACK");
let ack: RegisterSuperACK = serde_json::from_slice(slice)?;
if ack.dev_addr.net_addr != 0 && ack.dev_addr.net_bit_len != 0 {
// i'm authorized or moved;
// eee.device
eee.device_config
.set_ip(ack.dev_addr.net_addr, ack.dev_addr.net_bit_len);
debug!(
"ip addr assigned: {}/{}",
ip_to_string(&ack.dev_addr.net_addr),
ack.dev_addr.net_bit_len
)
}
let Ok(private_key) = load_private_key_file(".client/id_rsa") else {
error!("failed to load private key");
return Err(SDLanError::NormalError("failed to load private key"));
};
let encrypt_key = rsa_decrypt(&private_key, &ack.encrypted_key)?;
let header_key = rsa_decrypt(&private_key, &ack.header_key)?;
eee.config
.super_attempts
.store(SUPER_ATTEMPTS_DEFAULT, Ordering::Relaxed);
eee.stats
.last_sup
.store(get_current_timestamp(), Ordering::Relaxed);
debug!("changed to Authorized");
eee.set_authorized(true, encrypt_key);
eee.device.reload_config(&eee.device_config);
eee.known_peers.clear();
eee.pending_peers.clear();
Ok(())
}
*/
/*
fn handle_packet_register_super_acknowledge(
eee: &Node,
// cmn: Common<'_>,
// slice: &[u8],
) -> Result<()> {
debug!("handling REGISTER_SUPER_ACKNOWLEDGE");
// TODO: should check the common and the slice content.
eee.config
.super_attempts
.store(SUPER_ATTEMPTS_DEFAULT, Ordering::Relaxed);
eee.stats
.last_sup
.store(get_current_timestamp(), Ordering::Relaxed);
Ok(())
}
*/
async fn handle_packet_register_ack(
eee: &Node,
// cmn: Common<'_>,
body: &[u8],
sender_sock: &SdlanSock,
) -> Result<()> {
let Ok(ack) = SdlRegisterAck::decode(body) else {
println!("failed to decode REGISTER_ACK");
return Ok(());
};
let origin_sender = sender_sock;
debug!(
"Rx REGISTER ACK from {} [{}] to {} via {}",
ip_to_string(&ack.src_ip),
origin_sender.to_string(),
ip_to_string(&ack.dst_ip),
sender_sock.to_string(),
);
peer_set_p2p_confirmed(eee, ack.src_ip, sender_sock);
Ok(())
}
async fn handle_packet_register(
eee: &Node,
// cmn: Common<'_>,
body: &[u8],
from_sn: bool,
sender_sock: &SdlanSock,
) -> Result<()> {
if !eee.is_authorized() {
error!("drop register due to not authed");
return Ok(());
}
let Ok(reg) = SdlRegister::decode(body) else {
error!("failed to decode REGISTER");
return Ok(());
};
let origin_sender = sender_sock;
let via_multicast = is_multi_broadcast(reg.dst_ip);
if via_multicast && reg.src_ip == eee.device_config.get_ip() {
debug!("skipping register from self");
return Ok(());
}
if !from_sn {
info!("[P2P] Rx REGISTER from {}", sender_sock.to_string());
eee.pending_peers.delete_peer_with_ip(&reg.src_ip);
send_register_ack(eee, origin_sender, &reg).await;
} else {
info!(
"[PsP] Rx REGISTER from {} [{}] to {} via {}",
ip_to_string(&reg.src_ip),
ip_to_string(&reg.dst_ip),
sender_sock.to_string(),
origin_sender.to_string(),
);
}
check_peer_registration_needed(eee, from_sn, reg.src_ip, &None, origin_sender).await;
Ok(())
}
/*
async fn handle_packet_packet(
eee: &Node,
cmn: Common<'_>,
body: &[u8],
from_sn: bool,
sender_sock: &SdlanSock,
) -> Result<()> {
if eee.stats.last_sup.load(Ordering::Relaxed) == 0 {
error!("dropping PACKET received before first registration with sn");
return Ok(());
}
let has_sock = cmn.flags & config::SDLAN_FLAGS_SOCKET != 0;
let has_v6 = cmn.flags & config::SDLAN_FLAGS_HAS_V6 != 0;
let pkt = packet::Packet::unmarshal(body, has_sock, has_v6)?;
// let mut orig_sender: &SdlanSock = sender_sock;
// here, the origin sender ref should be checked
// if let Some(ref sk) = pkt.sock {
// orig_sender = sk;
// }
// println!("orig_sender: {:?}", orig_sender);
let mut origin_sender = sender_sock;
if let Some(ref k) = pkt.sock {
origin_sender = k;
}
println!("orig_sender: {:?}", origin_sender);
if !from_sn {
// data from other peer
debug!("[P2P] Rx data from {}", sender_sock.to_string());
eee.pending_peers.peers.remove(&pkt.src_ip);
} else {
// from sn, sock should not be None
debug!(
"[PsP] Rx data from {} (via {})",
origin_sender.to_string(),
sender_sock.to_string()
);
}
check_peer_registration_needed(eee, from_sn, pkt.src_ip, &pkt.v6_info, origin_sender).await;
// handle_tun_packet(eee, from_sn, pkt).await;
Ok(())
}
*/
pub async fn check_peer_registration_needed(
eee: &Node,
from_sn: bool,
src_ip: u32,
v6_info: &Option<V6Info>,
peer_sock: &SdlanSock,
) {
let mut p = eee.known_peers.get_peer(&src_ip);
if let None = p {
p = eee.known_peers.get_peer_by_sock(peer_sock);
if let Some(ref k) = p {
eee.known_peers.insert_peer(k.clone());
}
}
match p {
None => {
let _ = register_with_new_peer(eee, from_sn, src_ip, v6_info, peer_sock).await;
// unimplemented!();
}
Some(k) => {
let now = get_current_timestamp();
if !from_sn {
k.last_p2p.store(now, Ordering::Relaxed);
}
let last_seen = k.last_seen.load(Ordering::Relaxed);
// more than 3 seconds
if now - last_seen > 3 {
check_known_peer_sock_change(eee, from_sn, src_ip, v6_info, peer_sock).await;
}
}
}
}
async fn check_known_peer_sock_change(
eee: &Node,
from_sn: bool,
ip: u32,
v6_info: &Option<V6Info>,
// dev_addr: &IpSubnet,
peersock: &SdlanSock,
) {
if is_multi_broadcast(ip) {
return;
}
match eee.known_peers.get_peer(&ip) {
Some(p) => {
if !is_sdlan_sock_equal(&p.sock.read().unwrap(), peersock) {
if !from_sn {
info!(
"peer changed: {}: {} -> {}",
ip_to_string(&ip),
&p.sock.read().unwrap().to_string(),
peersock.to_string()
);
eee.known_peers.delete_peer_with_ip(&ip);
register_with_new_peer(eee, from_sn, ip, v6_info, peersock).await;
}
} else {
// from sn, sn could see a different sock with us, just ignore it
}
}
None => return,
}
}
async fn register_with_new_peer(
eee: &Node,
from_sn: bool,
ip: u32,
v6_info: &Option<V6Info>,
// dev_addr: &IpSubnet,
peersock: &SdlanSock,
) {
let now = get_current_timestamp();
let mut scan = eee.pending_peers.get_peer(&ip);
if let None = scan {
// such ip not found in pending
let temp = Arc::new(EdgePeer::new(
ip,
eee.device_config.get_net_bit(),
peersock,
v6_info,
now,
));
debug!(
"===> new pending {} => {}",
ip_to_string(&ip),
peersock.to_string(),
);
eee.pending_peers.insert_peer(temp.clone());
scan = Some(temp);
debug!("Pending size: {}", eee.pending_peers.peers.len());
if from_sn {
// should send register to peer
if eee.config.register_ttl == 1 {
/* We are DMZ host or port is directly accessible. Just let peer to send back the ack */
} else if eee.config.register_ttl > 1 {
let mut alter = 16;
if let Ok(ttl) = eee.udp_sock_v4.ttl() {
let mut temp = peersock.deepcopy();
send_register(eee, &temp, v6_info).await;
let _ = eee.udp_sock_v4.set_ttl(eee.config.register_ttl as u32);
while alter > 0 {
temp.port += 1;
send_register(eee, &temp, &None).await;
alter -= 1;
}
let _ = eee.udp_sock_v4.set_ttl(ttl);
}
} else {
// Normal STUN
send_register(eee, peersock, v6_info).await;
}
// 发送给sn
send_register(
eee,
&eee.config.super_nodes
[eee.config.super_node_index.load(Ordering::Relaxed) as usize],
&None,
)
.await;
} else {
// P2P register, send directly
send_register(eee, peersock, v6_info).await;
}
register_with_local_peers(eee).await;
} else {
if let Some(ref s) = scan {
*(s.sock.write().unwrap()) = peersock.deepcopy();
}
}
if let Some(s) = scan {
s.last_seen
.store(get_current_timestamp(), Ordering::Relaxed);
}
}
async fn register_with_local_peers(eee: &Node) {
if eee.config.allow_p2p {
send_register(eee, &eee.multicast_sock, &None).await;
}
}
async fn send_register(eee: &Node, sock: &SdlanSock, _v6_info: &Option<V6Info>) {
if !eee.config.allow_p2p {
debug!("skipping register as p2p is disabled");
return;
}
let network_id = eee.network_id.load(Ordering::Relaxed);
if network_id == 0 {
error!("not authed");
return;
}
let register = SdlRegister {
network_id: network_id,
src_ip: eee.device_config.get_ip(),
dst_ip: u32::from_be_bytes(sock.v4),
};
let msg = encode_to_udp_message(Some(register), PacketType::Register as u8).unwrap();
let _ = send_to_sock(eee, &msg, sock).await;
/*
let key = eee.get_header_key();
if key.len() > 0 {
if let Ok(cnt) = encode_packet_encrypted(&cmn, &r, key.as_slice()) {
let _ = send_to_sock_v4_and_v6(eee, &cnt, sock, v6_info).await;
}
} else {
error!("not authed");
}
*/
}
async fn handle_tun_packet(
eee: &Node,
from_sn: bool,
pkt: SdlData, //orig_sender: &SdlanSock
) {
let now = get_current_timestamp();
if from_sn {
if is_multi_broadcast(pkt.dst_ip) {
eee.stats.rx_broadcast.fetch_add(1, Ordering::Relaxed);
}
eee.stats.rx_sup.fetch_add(1, Ordering::Relaxed);
eee.stats.last_sup.store(now, Ordering::Relaxed);
} else {
eee.stats.rx_p2p.fetch_add(1, Ordering::Relaxed);
eee.stats.last_p2p.store(now, Ordering::Relaxed);
}
let payload = pkt.data;
let key = eee.get_encrypt_key();
if key.len() == 0 {
// check the encrypt key
error!("packet encrypt key not provided");
return;
}
let origin = aes_decrypt(key.as_slice(), &payload);
if let Err(_e) = origin {
error!("failed to decrypt original data");
return;
}
let data = origin.unwrap();
debug!("got packet from sock, will send to tun");
match IpHeaders::from_slice(&data) {
Ok((iphdr, _)) => {
if let Some(ipv4hdr) = iphdr.ipv4() {
let dstip = u32::from_be_bytes(ipv4hdr.0.destination);
if !is_multi_broadcast(dstip) && dstip != eee.device_config.get_ip() {
// should not routed to me
error!("should not routed to me");
return;
}
// packet should be sent to dev
debug!("writing {} bytes to tun", data.len());
if let Err(e) = eee.device.send(&data) {
error!("failed to write to tun: {}", e.to_string());
}
}
}
Err(e) => {
error!("failed to parse ip packet: {}", e.to_string());
}
}
}
async fn send_register_ack(eee: &Node, orig_sender: &SdlanSock, reg: &SdlRegister) {
if !eee.config.allow_p2p {
debug!("Skipping REGISTER ACK as P2P is disallowed");
return;
}
let network_id = eee.network_id.load(Ordering::Relaxed);
if network_id == 0 {
error!("not authed");
return;
}
let ack = SdlRegisterAck {
network_id,
src_ip: eee.device_config.get_ip(),
dst_ip: reg.src_ip,
};
let Ok(ack) = encode_to_udp_message(Some(ack), PacketType::RegisterACK as u8) else {
error!("failed to encode to udp message");
return;
};
let _ = send_to_sock(eee, &ack, orig_sender).await;
}
fn peer_set_p2p_confirmed(eee: &Node, src_ip: u32, sender_sock: &SdlanSock) {
let mut scan = eee.pending_peers.get_peer(&src_ip);
if let None = scan {
scan = eee.pending_peers.get_peer_by_sock(sender_sock);
}
if let None = scan {
error!(
"failed to find sender in pending peer: {}",
sender_sock.to_string()
);
return;
}
let mut scan = scan.unwrap();
eee.pending_peers.delete_peer_with_ip(&src_ip);
match eee.known_peers.get_peer(&src_ip) {
Some(scantmp) => {
eee.known_peers.delete_peer_with_ip(&src_ip);
scan = scantmp;
scan.dev_addr.net_addr.store(src_ip, Ordering::Relaxed);
scan.dev_addr
.net_bit_len
.store(eee.device_config.get_net_bit(), Ordering::Relaxed);
}
None => {
*(scan.sock.write().unwrap()) = sender_sock.deepcopy();
}
}
let now = get_current_timestamp();
scan.last_p2p.store(now, Ordering::Relaxed);
scan.last_seen.store(now, Ordering::Relaxed);
let ip_string = ip_to_string(&src_ip);
let sock_string = sender_sock.to_string();
info!(
"P2P connection established: {} [{}]",
&ip_string, &sock_string,
);
debug!("==> new peer: {} -> {}", &ip_string, &sock_string,);
eee.known_peers.insert_peer(scan);
}
pub async fn check_query_peer_info(eee: &Node, dst_ip: u32) {
let scan: Arc<EdgePeer>;
let now = get_current_timestamp();
match eee.pending_peers.get_peer(&dst_ip) {
None => {
let sock = SdlanSock {
family: AF_INET,
port: 0,
v4: [0; 4],
v6: [0; 16],
};
let peer = Arc::new(EdgePeer::new(
dst_ip,
eee.device_config.get_net_bit(),
&sock,
&None,
now,
));
debug!("insert peer {} to pending", ip_to_string(&dst_ip));
eee.pending_peers.insert_peer(peer.clone());
scan = peer;
}
Some(s) => {
scan = s;
}
}
debug!(
"now={}, last_sent_query={}, REGISTER_INTERVAL={}, scan={:?}",
now,
scan.last_sent_query.load(Ordering::Relaxed),
REGISTER_INTERVAL,
scan,
);
if now - scan.last_sent_query.load(Ordering::Relaxed) > (REGISTER_INTERVAL as u64) {
/*
send_register(
eee,
&eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize],
&None,
)
.await;
*/
debug!("sending query for {}", ip_to_string(&dst_ip));
if let Ok(()) = send_query_peer(eee, dst_ip).await {
scan.last_sent_query.store(now, Ordering::Relaxed);
}
}
}
async fn send_query_peer(eee: &Node, dst_ip: u32) -> Result<()> {
let network_id = eee.network_id.load(Ordering::Relaxed);
if network_id == 0 {
error!("not authed");
return Err(SDLanError::NormalError("not connected"));
}
let query = SdlQueryInfo { dst_ip };
let Ok(content) = encode_to_tcp_message(
Some(query),
eee.get_next_packet_id(),
PacketType::QueryInfo as u8,
) else {
error!("failed to encode query");
return Err(SDLanError::NormalError("encode query error"));
};
let tcp_conn = get_tcp_conn();
tcp_conn.send(&content).await
}
pub async fn ping_to_sn() {
let Ok(msg) = encode_to_tcp_message::<SdlEmpty>(None, 0, PacketType::Ping as u8) else {
error!("failed to encode ping");
return;
};
debug!("ping to sn");
let tcp_conn = get_tcp_conn();
if let Err(e) = tcp_conn.send(&msg).await {
error!("failed to ping to sn: {:?}", e);
}
}
/*
pub async fn update_supernode_reg(eee: &Node) {
let now = get_current_timestamp();
let authed = eee.is_authorized();
let last_reg = eee.last_register_req.load(Ordering::Relaxed);
if !authed {
if now > (last_reg + (REGISTER_INTERVAL as u64) / 10) {
debug!("update supernode reg, fast retry");
} else {
return;
}
} else if now < (last_reg + REGISTER_INTERVAL as u64) {
return;
}
if eee.config.super_attempts.load(Ordering::Relaxed) == 0 {
eee.config
.super_attempts
.store(SUPER_ATTEMPTS_DEFAULT, Ordering::Relaxed);
error!("sup attempts = 0");
// next time, the supernode will use the new one
let node_index = eee.config.super_node_index.fetch_add(1, Ordering::Relaxed);
if node_index >= (eee.config.super_nodes.len() - 1) as u8 {
eee.config.super_node_index.store(0, Ordering::Relaxed);
}
} else {
eee.config.super_attempts.fetch_sub(1, Ordering::Relaxed);
}
if let Err(e) = eee.send_register_super().await {
error!("failed to send register_super: {}", e.as_str());
}
eee.last_register_req.store(now, Ordering::Relaxed);
register_with_local_peers(eee).await;
}
*/

126
src/network/tun_linux.rs Normal file
View File

@ -0,0 +1,126 @@
use sdlan_sn_rs::utils::{ip_to_string, net_bit_len_to_mask, SDLanError};
use std::ffi::CStr;
use std::ffi::{c_char, c_int};
use std::fs::OpenOptions;
use std::ptr::null_mut;
use sdlan_sn_rs::utils::Result;
use std::io::{Read, Write};
use std::os::fd::AsRawFd;
use std::process::Command;
use tracing::{debug, error};
use super::device::{DeviceConfig, Mode};
#[link(name = "tuntap")]
extern "C" {
fn tuntap_setup(fd: c_int, name: *mut u8, mode: c_int, packet_info: c_int) -> c_int;
}
#[allow(unused)]
pub struct Iface {
fd: std::fs::File,
mode: Mode,
name: String,
}
pub fn new_iface(tunname: &str, mode: Mode) -> Iface {
match Iface::without_packet_info(tunname, mode) {
Err(e) => {
panic!("failed to create tun: {}", e.as_str());
}
Ok(iface) => iface,
}
}
impl Iface {
#[allow(unused)]
pub fn with_packet_info(ifname: &str, mode: Mode) -> Result<Self> {
Iface::open_tun(ifname, mode, true)
}
pub fn without_packet_info(ifname: &str, mode: Mode) -> Result<Self> {
Iface::open_tun(ifname, mode, false)
}
fn open_tun(ifname: &str, mode: Mode, need_packet_info: bool) -> Result<Self> {
let fs = match OpenOptions::new()
.read(true)
.write(true)
.open("/dev/net/tun")
{
Ok(fs) => fs,
Err(e) => panic!("failed to open tun: {}", e),
};
let mut name_ptr: *mut u8 = null_mut();
let mut success = false;
let mut _name = Vec::new();
for i in 0..16 {
_name = Vec::new();
_name.extend_from_slice(ifname.as_bytes());
_name.extend_from_slice(i.to_string().as_bytes());
_name.extend_from_slice(&[0; 33]);
name_ptr = _name.as_mut_ptr();
let result = unsafe {
tuntap_setup(
fs.as_raw_fd(),
name_ptr,
mode as c_int,
if need_packet_info { 1 } else { 0 },
)
};
if result >= 0 {
success = true;
break;
}
}
if success {
let name = unsafe {
CStr::from_ptr(name_ptr as *const c_char)
.to_string_lossy()
.into_owned()
};
Ok(Iface { fd: fs, mode, name })
} else {
Err(SDLanError::NormalError("failed to setup tun"))
}
}
pub fn reload_config(&self, device_config: &DeviceConfig) {
let netbit = device_config.get_net_bit();
let ip = device_config.get_ip();
if netbit == 0 || ip == 0 {
error!("reload config's ip is 0");
return;
}
let ip = ip_to_string(&ip);
let netbit = ip_to_string(&net_bit_len_to_mask(netbit));
let res = Command::new("ifconfig")
.arg(&self.name)
.arg(ip)
.arg("netmask")
.arg(&netbit)
.arg("up")
.output();
match res {
Ok(_) => {
debug!("ifconfig ok");
}
Err(e) => {
error!("failed to run ifconfig: {}", e.to_string());
}
}
}
pub fn recv(&self, buf: &mut [u8]) -> std::io::Result<usize> {
(&self.fd).read(buf)
}
pub fn send(&self, content: &[u8]) -> std::io::Result<usize> {
(&self.fd).write(content)
}
}

54
src/network/tun_win.rs Normal file
View File

@ -0,0 +1,54 @@
use crate::network::ReadWriter;
use sdlan_sn_rs::utils::Result;
use std::io::{Error, ErrorKind};
use std::sync::Arc;
use wintun;
pub struct WinTun {
adapter: Arc<wintun::Adapter>,
session: Arc<wintun::Session>,
}
impl ReadWriter for WinTun {
fn recv(&self, buf: &mut [u8]) -> std::io::Result<usize> {
let Ok(pkt) = self.session.receive_blocking() else {
return Err(Error::new(ErrorKind::Other, "failed to receive"));
};
let content = pkt.bytes();
let length = content.len();
if content.len() > buf.len() {
return Err(Error::new(ErrorKind::Other, "length not enough"));
}
for i in 0..content.len() {
buf[i] = content[i];
}
Ok(length)
}
fn send(&self, content: &[u8]) -> std::io::Result<usize> {
let mut pkt = self
.session
.allocate_send_packet(content.len() as u16)
.unwrap();
let buf: &mut [u8] = pkt.bytes_mut();
buf.copy_from_slice(content);
self.session.send_packet(pkt);
Ok(content.len())
}
}
fn create_wintun(path: &str) -> WinTun {
let wt = unsafe { wintun::load_from_path(path) }.expect("failed to load wintun");
let adapter = match wintun::Adapter::open(&wt, "Demo") {
Ok(a) => a,
Err(_) => wintun::Adapter::create(&wt, "Demo", "Example", None)
.expect("failed to create wintun adapter"),
};
let session = Arc::new(adapter.start_session(wintun::MAX_RING_CAPACITY).unwrap());
WinTun { adapter, session }
}
pub fn create_tun() -> Result<Box<dyn ReadWriter>> {
Ok(Box::new(create_wintun("/path/to/file")))
}

64
src/network/tuntap.c Normal file
View File

@ -0,0 +1,64 @@
/*
* Since the rust ioctl bindings don't have all the structures and constants,
* it's easier to just write the thing in C and link it in.
*/
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#ifdef __linux__
#include <linux/if.h>
#include <linux/if_tun.h>
#else
#include <net/if.h>
#define IFF_TUN 0x0001
#define IFF_TAP 0x0002
#define IFF_NO_PI 0x1000
#define TUNSETIFF _IOW('T', 202, int)
#endif
#include <sys/ioctl.h>
/**
* fd the fd to turn into TUN or TAP.
* name the name to use. If empty, kernel will assign something by itself.
* Must be buffer with capacity at least 33.
* mode 1 = TUN, 2 = TAP.
* packet_info if packet info should be provided, if the given value is 0 it will not prepend packet info.
*/
int tuntap_setup(int fd, unsigned char *name, int mode, int packet_info)
{
struct ifreq ifr;
memset(&ifr, 0, sizeof ifr);
switch (mode)
{
case 1:
ifr.ifr_flags = IFF_TUN;
break;
case 2:
ifr.ifr_flags = IFF_TAP;
break;
default:
assert(0);
}
// If no packet info needs to be provided add corresponding flag
if (!packet_info)
{
ifr.ifr_flags |= IFF_NO_PI;
}
// Leave one for terminating '\0'. No idea if it is needed, didn't find
// it in the docs, but assuming the worst.
strncpy(ifr.ifr_name, (char *)name, IFNAMSIZ - 1);
int ioresult = ioctl(fd, TUNSETIFF, &ifr);
if (ioresult < 0)
{
return ioresult;
}
strncpy((char *)name, ifr.ifr_name, IFNAMSIZ < 32 ? IFNAMSIZ : 32);
name[32] = '\0';
return 0;
}

230
src/pb/message.rs Normal file
View File

@ -0,0 +1,230 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Sdlv4Info {
#[prost(uint32, tag = "1")]
pub port: u32,
#[prost(bytes = "vec", tag = "2")]
pub v4: ::prost::alloc::vec::Vec<u8>,
#[prost(uint32, tag = "3")]
pub nat_type: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Sdlv6Info {
#[prost(uint32, tag = "1")]
pub port: u32,
#[prost(bytes = "vec", tag = "2")]
pub v6: ::prost::alloc::vec::Vec<u8>,
}
/// 设备网络地址信息
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlDevAddr {
#[prost(uint32, tag = "1")]
pub network_id: u32,
#[prost(uint32, tag = "2")]
pub net_addr: u32,
#[prost(uint32, tag = "3")]
pub net_bit_len: u32,
}
/// tcp通讯消息
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlEmpty {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlRegisterSuper {
#[prost(uint32, tag = "1")]
pub version: u32,
#[prost(string, tag = "2")]
pub installed_channel: ::prost::alloc::string::String,
#[prost(string, tag = "3")]
pub client_id: ::prost::alloc::string::String,
#[prost(message, optional, tag = "4")]
pub dev_addr: ::core::option::Option<SdlDevAddr>,
#[prost(string, tag = "5")]
pub pub_key: ::prost::alloc::string::String,
#[prost(string, tag = "6")]
pub token: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlRegisterSuperAck {
#[prost(message, optional, tag = "1")]
pub dev_addr: ::core::option::Option<SdlDevAddr>,
#[prost(bytes = "vec", tag = "2")]
pub aes_key: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "vec", tag = "3")]
pub known_ips: ::prost::alloc::vec::Vec<u8>,
#[prost(uint32, tag = "4")]
pub upgrade_type: u32,
#[prost(string, optional, tag = "5")]
pub upgrade_prompt: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "6")]
pub upgrade_address: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlRegisterSuperNak {
#[prost(uint32, tag = "1")]
pub error_code: u32,
#[prost(string, tag = "2")]
pub error_message: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlQueryInfo {
#[prost(uint32, tag = "1")]
pub dst_ip: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlPeerInfo {
#[prost(uint32, tag = "1")]
pub dst_ip: u32,
#[prost(message, optional, tag = "2")]
pub v4_info: ::core::option::Option<Sdlv4Info>,
#[prost(message, optional, tag = "3")]
pub v6_info: ::core::option::Option<Sdlv6Info>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlKnownIpEvent {
#[prost(uint32, tag = "1")]
pub ip: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlDropIpEvent {
#[prost(uint32, tag = "1")]
pub ip: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlNatChangedEvent {
#[prost(uint32, tag = "1")]
pub ip: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlSendRegisterEvent {
#[prost(uint32, tag = "1")]
pub dst_ip: u32,
#[prost(uint32, tag = "2")]
pub nat_ip: u32,
#[prost(uint32, tag = "3")]
pub nat_port: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlNetworkShutdownEvent {
#[prost(string, tag = "1")]
pub message: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlChangeNetworkCommand {
#[prost(message, optional, tag = "1")]
pub dev_addr: ::core::option::Option<SdlDevAddr>,
#[prost(bytes = "vec", tag = "2")]
pub aes_key: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "vec", tag = "3")]
pub known_ips: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlCommandAck {
/// status = true, 表示成功status = false 表示失败message是失败原因描述
#[prost(bool, tag = "1")]
pub status: bool,
#[prost(string, optional, tag = "2")]
pub message: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlFlows {
/// 服务器转发流量
#[prost(uint32, tag = "1")]
pub forward_num: u32,
/// p2p直接流量
#[prost(uint32, tag = "2")]
pub p2p_num: u32,
/// 接收的流量
#[prost(uint32, tag = "3")]
pub inbound_num: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlStunRequest {
#[prost(uint32, tag = "1")]
pub cookie: u32,
#[prost(string, tag = "2")]
pub client_id: ::prost::alloc::string::String,
#[prost(uint32, tag = "3")]
pub network_id: u32,
#[prost(uint32, tag = "4")]
pub ip: u32,
#[prost(uint32, tag = "5")]
pub nat_type: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlStunReply {
#[prost(uint32, tag = "1")]
pub cookie: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlData {
#[prost(uint32, tag = "1")]
pub network_id: u32,
#[prost(uint32, tag = "2")]
pub src_ip: u32,
#[prost(uint32, tag = "3")]
pub dst_ip: u32,
#[prost(bool, tag = "4")]
pub is_p2p: bool,
#[prost(uint32, tag = "5")]
pub ttl: u32,
#[prost(bytes = "vec", tag = "6")]
pub data: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlRegister {
#[prost(uint32, tag = "1")]
pub network_id: u32,
#[prost(uint32, tag = "2")]
pub src_ip: u32,
#[prost(uint32, tag = "3")]
pub dst_ip: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlRegisterAck {
#[prost(uint32, tag = "1")]
pub network_id: u32,
#[prost(uint32, tag = "2")]
pub src_ip: u32,
#[prost(uint32, tag = "3")]
pub dst_ip: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlStunProbe {
#[prost(uint32, tag = "1")]
pub cookie: u32,
#[prost(uint32, tag = "2")]
pub attr: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SdlStunProbeReply {
#[prost(uint32, tag = "1")]
pub cookie: u32,
#[prost(uint32, tag = "2")]
pub port: u32,
#[prost(uint32, tag = "3")]
pub ip: u32,
}

40
src/pb/mod.rs Normal file
View File

@ -0,0 +1,40 @@
mod message;
pub use message::*;
use prost::Message;
use sdlan_sn_rs::utils::Result;
// tcp message has two-byte of size at header
pub fn encode_to_tcp_message<T: Message>(
msg: Option<T>,
packet_id: u32,
packet_type: u8,
) -> Result<Vec<u8>> {
let mut raw_data = Vec::new();
if let Some(msg) = msg {
msg.encode(&mut raw_data)?;
}
let mut result = Vec::with_capacity(raw_data.len() + 7);
let size = u16::to_be_bytes(raw_data.len() as u16 + 5);
result.extend_from_slice(&size);
result.extend_from_slice(&u32::to_be_bytes(packet_id));
result.push(packet_type);
result.extend_from_slice(&raw_data);
Ok(result)
}
// udp message has no two-byte of size at header
pub fn encode_to_udp_message<T: Message>(msg: Option<T>, packet_type: u8) -> Result<Vec<u8>> {
let mut raw_data = Vec::new();
if let Some(msg) = msg {
msg.encode(&mut raw_data)?;
}
let mut result = Vec::with_capacity(raw_data.len() + 1);
result.push(packet_type);
result.extend_from_slice(&raw_data);
Ok(result)
}

5
src/tcp/mod.rs Normal file
View File

@ -0,0 +1,5 @@
mod tcp_codec;
mod tcp_conn;
pub use tcp_codec::*;
pub use tcp_conn::*;

203
src/tcp/tcp_codec.rs Normal file
View File

@ -0,0 +1,203 @@
use tokio::{
io::{AsyncReadExt, BufReader},
net::tcp::OwnedReadHalf,
};
use num_enum::TryFromPrimitive;
use tracing::debug;
#[derive(Debug)]
pub struct SdlanTcp {
pub _packet_id: u32,
pub packet_type: PacketType,
pub current_packet: Vec<u8>,
}
#[derive(Debug, Copy, Clone, TryFromPrimitive)]
#[repr(u8)]
pub enum EventType {
KnownIP = 0x01,
DropIP = 0x02,
NatChanged = 0x03,
SendRegister = 0x04,
NetworkShutdown = 0xFF,
}
#[derive(Debug, Copy, Clone, TryFromPrimitive)]
#[repr(u8)]
pub enum PacketType {
Empty = 0x00,
RegisterSuper = 0x01,
RegisterSuperACK = 0x02,
RegisterSuperNAK = 0x04,
UnRegisterSuper = 0x05,
QueryInfo = 0x06,
PeerInfo = 0x07,
Ping = 0x08,
Pong = 0x09,
Event = 0x10,
Command = 0x11,
CommandACK = 0x12,
FlowTracer = 0x15,
Register = 0x20,
RegisterACK = 0x21,
StunRequest = 0x30,
StunReply = 0x31,
StunProbe = 0x32,
StunProbeReply = 0x33,
Data = 0xff,
}
pub async fn read_a_packet(
reader: &mut BufReader<OwnedReadHalf>,
) -> Result<SdlanTcp, std::io::Error> {
debug!("read a packet");
let size = reader.read_u16().await?;
debug!("1");
let packet_id = reader.read_u32().await?;
debug!("2");
let packet_type = reader.read_u8().await?;
debug!("3");
if size < 5 {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"size less than five",
));
}
let bufsize = (size - 5) as usize;
let mut binary = vec![0; bufsize];
let mut to_read = bufsize;
loop {
if to_read == 0 {
break;
}
let size_got = reader.read(&mut binary[(bufsize - to_read)..]).await?;
if size_got == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"read got zero bytes",
));
}
to_read -= size_got;
}
let Ok(packet_type) = packet_type.try_into() else {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"packet type error",
));
};
let result = SdlanTcp {
_packet_id: packet_id,
packet_type,
current_packet: binary,
};
Ok(result)
}
/*
pub async fn read_a_packet2(reader: &mut OwnedReadHalf) -> Result<SdlanTcp, std::io::Error> {
debug!("read a packet");
let size = reader.read_u16().await?;
debug!("1");
let packet_id = reader.read_u32().await?;
debug!("2");
let packet_type = reader.read_u8().await?;
debug!("3");
if size < 5 {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"size less than five",
));
}
let mut binary = vec![0; (size - 5) as usize];
let mut bytes_read = 0;
loop {
let size_got = reader.read(&mut binary[bytes_read..]).await?;
if size_got == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"read got zero bytes",
));
}
bytes_read += size_got;
if bytes_read == (size - 5) as usize {
break;
}
}
let Ok(packet_type) = packet_type.try_into() else {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"packet type error",
));
};
let result = SdlanTcp {
_packet_id: packet_id,
packet_type,
current_packet: binary,
};
Ok(result)
}
impl SdlanTcpCodec {
pub fn new() -> Self {
Self
}
}
#[derive(Debug)]
pub enum Item {
Data(Vec<u8>),
}
impl Decoder for SdlanTcpCodec {
type Item = SdlanTcp;
type Error = std::io::Error;
fn decode(
&mut self,
src: &mut tokio_util::bytes::BytesMut,
) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
return Ok(None);
}
let size = src.get_u16();
let packet_id = src.get_u32();
let packet_type = src.get_u8();
let mut binary = Vec::with_capacity((size - 5).into());
for i in 0..(size - 5) {
let data = src.get_u8();
binary.push(data);
}
let Ok(packet_type) = packet_type.try_into() else {
return Ok(None);
};
let result = SdlanTcp {
_packet_id: packet_id,
packet_type,
current_packet: binary,
};
Ok(Some(result))
}
}
*/

251
src/tcp/tcp_conn.rs Normal file
View File

@ -0,0 +1,251 @@
use once_cell::sync::OnceCell;
use sdlan_sn_rs::utils::{get_current_timestamp, Result, SDLanError};
use std::future::Future;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use tokio::io::BufReader;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use futures_util::{future::BoxFuture, pin_mut};
use tokio::{
io::AsyncWriteExt,
net::TcpStream,
sync::mpsc::{channel, Receiver, Sender},
};
use tracing::error;
use crate::config::TCP_PING_TIME;
use crate::tcp::read_a_packet;
use super::tcp_codec::SdlanTcp;
static GLOBAL_TCP_HANDLE: OnceCell<ReadWriterHandle> = OnceCell::new();
pub struct ReadWriteActor {
// actor接收的发送给tcp的接收端由handle存放发送端
// to_tcp: Receiver<Vec<u8>>,
remote: String,
connected: Arc<AtomicBool>,
pong_time: Arc<AtomicU64>,
// actor收到数据之后发送给上层的发送端口,接收端由handle保存
from_tcp: Sender<SdlanTcp>,
}
impl ReadWriteActor {
pub fn new(
remote: &str,
from_tcp: Sender<SdlanTcp>,
connected: Arc<AtomicBool>,
pong_time: Arc<AtomicU64>,
) -> Self {
Self {
// to_tcp,
pong_time,
connected,
remote: remote.to_owned(),
from_tcp,
}
}
pub async fn run<'a, T>(
&self,
keep_reconnect: bool,
mut to_tcp: Receiver<Vec<u8>>,
on_connected: T,
mut start_stop_chan: Receiver<bool>,
// cancel: CancellationToken,
) where
T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()>,
{
// let (tx, rx) = channel(20);
let mut started = false;
loop {
self.connected.store(false, Ordering::Relaxed);
if !started {
println!("waiting for start");
while let Some(m) = start_stop_chan.recv().await {
if m {
println!("true received");
started = true;
break;
} else {
println!("false received");
}
}
}
println!("try connecting...");
let Ok(mut stream) = TcpStream::connect(&self.remote).await else {
self.connected.store(false, Ordering::Relaxed);
if keep_reconnect {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(3)) => {
continue;
}
}
// tokio::time::sleep(Duration::from_secs(3)).await;
// continue;
}
return;
};
self.connected.store(true, Ordering::Relaxed);
on_connected(&mut stream).await;
// stream.write("hello".as_bytes()).await;
let (reader, mut write) = stream.into_split();
let read_from_tcp = async move {
let mut buffed_reader = BufReader::new(reader);
loop {
match read_a_packet(&mut buffed_reader).await {
Ok(packet) => {
debug!("got packet: {:?}", packet);
if let Err(_e) = self.from_tcp.send(packet).await {
error!("failed to receive a packet: {:?}", _e);
}
}
Err(e) => {
error!("failed to read a packet: {}, reconnecting...", e);
return;
}
}
}
};
let write_to_tcp = async {
while let Some(data) = to_tcp.recv().await {
match write.write(&data).await {
Ok(size) => {
debug!("{} bytes sent to tcp", size);
}
Err(e) => {
error!("failed to write to tcp: {}", e.to_string());
return;
}
}
}
println!("to_tcp recv None");
};
let check_pong = async {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
let connected = self.connected.load(Ordering::Relaxed);
let now = get_current_timestamp();
if connected && now - self.pong_time.load(Ordering::Relaxed) > TCP_PING_TIME * 2
{
// pong time expire, need to re-connect
error!("pong check expired");
return;
}
}
};
pin_mut!(read_from_tcp, write_to_tcp);
tokio::select! {
_ = read_from_tcp => {},
_ = write_to_tcp => {},
_ = check_pong => {},
Some(false) = start_stop_chan.recv() => {
started = false;
}
}
println!("connect retrying");
// future::select(read_from_tcp, write_to_tcp).await;
}
}
}
#[derive(Debug)]
pub struct ReadWriterHandle {
connected: Arc<AtomicBool>,
send_to_tcp: Sender<Vec<u8>>,
// pub data_from_tcp: Receiver<SdlanTcp>,
}
impl ReadWriterHandle {
pub async fn send(&self, data: &[u8]) -> Result<()> {
if self.connected.load(Ordering::Relaxed) {
// connected, send to it
if let Err(e) = self.send_to_tcp.send(Vec::from(data)).await {
println!("failed to send to send_to_tcp: {}", e.to_string());
return Err(SDLanError::NormalError("failed to send"));
};
debug!("tcp info sent");
} else {
error!("tcp not connected, so not sending data");
return Err(SDLanError::NormalError("not connected, so not sending"));
}
Ok(())
}
fn new<'a, T, T2, F>(
addr: &str,
on_connected: T,
on_message: T2,
pong_time: Arc<AtomicU64>,
start_stop_chan: Receiver<bool>,
// cancel: CancellationToken,
) -> Self
where
T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static,
T2: Fn(SdlanTcp) -> F + Send + 'static,
F: Future<Output = ()> + Send,
{
let (send_to_tcp, to_tcp) = channel(20);
let (from_tcp, mut data_from_tcp) = channel(20);
let connected = Arc::new(AtomicBool::new(false));
let actor = ReadWriteActor::new(addr, from_tcp, connected.clone(), pong_time);
tokio::spawn(async move { actor.run(true, to_tcp, on_connected, start_stop_chan).await });
tokio::spawn(async move {
loop {
if let Some(msg) = data_from_tcp.recv().await {
on_message(msg).await;
} else {
println!("data from tcp exited");
return;
}
}
});
ReadWriterHandle {
connected,
send_to_tcp,
// data_from_tcp,
}
}
}
pub fn init_tcp_conn<'a, T, T2, F>(
addr: &str,
on_connected: T,
on_message: T2,
pong_time: Arc<AtomicU64>,
// cancel: CancellationToken,
start_stop_chan: Receiver<bool>,
) where
T: for<'b> Fn(&'b mut TcpStream) -> BoxFuture<'b, ()> + Send + 'static,
T2: Fn(SdlanTcp) -> F + Send + 'static,
F: Future<Output = ()> + Send,
{
let tcp_handle =
ReadWriterHandle::new(addr, on_connected, on_message, pong_time, start_stop_chan);
GLOBAL_TCP_HANDLE
.set(tcp_handle)
.expect("failed to set global tcp handle");
}
pub fn get_tcp_conn() -> &'static ReadWriterHandle {
match GLOBAL_TCP_HANDLE.get() {
Some(v) => v,
None => panic!("should call init_tcp_conn first"),
}
}

47
src/utils/command.rs Normal file
View File

@ -0,0 +1,47 @@
use structopt::StructOpt;
#[derive(StructOpt, Debug)]
pub struct CommandLine {
#[structopt(short = "s", long = "sn", default_value = "127.0.0.1:7655")]
pub sn: String,
#[structopt(short = "t", long = "tcp", default_value = "127.0.0.1:7656")]
pub tcp: String,
#[structopt(short = "r")]
pub _allow_routing: bool,
#[structopt(
help = "ttl of the register udp4 packet",
short = "L",
default_value = "1"
)]
pub register_ttl: u8,
#[structopt(help = "mtu of the tun", short = "m", default_value = "1290")]
pub mtu: u32,
#[structopt(short = "n", long = "name", default_value = "test-name")]
pub name: String,
#[structopt(long = "tos", default_value = "0")]
pub tos: u32,
#[structopt(long = "token", default_value = "0")]
pub token: String,
}
impl Clone for CommandLine {
fn clone(&self) -> Self {
Self {
sn: self.sn.clone(),
tcp: self.tcp.clone(),
_allow_routing: self._allow_routing,
register_ttl: self.register_ttl,
mtu: self.mtu,
name: self.name.clone(),
tos: self.tos,
token: self.token.clone(),
}
}
}

8
src/utils/mod.rs Normal file
View File

@ -0,0 +1,8 @@
mod command;
pub use command::*;
mod socks;
pub use socks::*;
mod pid_recorder;
pub use pid_recorder::PidRecorder;

36
src/utils/pid_recorder.rs Normal file
View File

@ -0,0 +1,36 @@
use std::{
fs::{self, OpenOptions},
io::Write,
};
pub struct PidRecorder(String);
impl PidRecorder {
pub fn new(pidfile: &str) -> Self {
let pid = std::process::id();
match OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(pidfile)
{
Ok(mut fp) => {
fp.write(format!("{}", pid).as_bytes())
.expect("failed to write");
}
Err(e) => {
println!("failed to open pid file: {}", e);
}
}
Self(pidfile.to_owned())
}
}
impl Drop for PidRecorder {
fn drop(&mut self) {
if let Err(e) = fs::remove_file(&self.0) {
println!("failed to remove pid file: {}", e);
}
}
}

145
src/utils/socks.rs Normal file
View File

@ -0,0 +1,145 @@
use sdlan_sn_rs::{
config::{AF_INET, AF_INET6},
utils::{Result, SDLanError},
};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use tokio::net::ToSocketAddrs;
use tracing::error;
use sdlan_sn_rs::peer::SdlanSock;
use tokio::net::UdpSocket;
use crate::network::Node;
pub struct Socket {
udp: UdpSocket,
}
impl Socket {
pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> Result<usize> {
let m = self.udp.send_to(buf, target).await?;
Ok(m)
}
pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
let m = self.udp.recv_from(buf).await?;
Ok(m)
}
pub fn ttl(&self) -> Result<u32> {
if let Ok(v) = self.udp.ttl() {
Ok(v)
} else {
Err(SDLanError::NormalError("no ttl found"))
}
}
pub fn set_ttl(&self, ttl: u32) -> Result<()> {
if let Ok(_) = self.udp.set_ttl(ttl) {
Ok(())
} else {
Err(SDLanError::NormalError("failed to set ttl"))
}
}
pub async fn build(port: u16, bind_any: bool, join_multicast: bool, tos: u32) -> Result<Self> {
let addr = match bind_any {
true => "0.0.0.0",
false => "127.0.0.1",
};
let udp = UdpSocket::bind(format!("{}:{}", addr, port)).await?;
if join_multicast {
if let Err(e) =
udp.join_multicast_v4(Ipv4Addr::new(224, 0, 0, 69), Ipv4Addr::new(0, 0, 0, 0))
{
error!("failed to join multicast: {}", e.to_string());
}
}
if tos != 0 {
if let Err(e) = udp.set_tos(tos) {
error!("failed to set tos: {}", e.to_string());
}
}
Ok(Self { udp })
}
}
/*
pub async fn send_to_sock_v4_and_v6(
// sk: &Socket,
eee: &Node,
content: &[u8],
sock: &SdlanSock,
v6: &Option<V6Info>,
) -> Result<()> {
let _ = send_to_sock(&eee, content, sock).await;
if let Some(v6) = v6 {
// let sk6 = eee.udp_sock_v6.read().unwrap().clone();
let sock = SdlanSock {
family: AF_INET6,
port: v6.port,
v4: [0; 4],
v6: v6.v6,
};
let _ = send_to_sock(eee, content, &sock).await;
}
Ok(())
}
*/
pub async fn send_to_sock(
// sk: &Socket,
eee: &Node,
content: &[u8],
sock: &SdlanSock,
// v6: &Option<V6Info>,
) -> Result<()> {
match sock.family {
AF_INET => {
// sockv4
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(sock.v4)), sock.port);
eee.udp_sock_v4.send_to(content, addr).await?;
return Ok(());
}
AF_INET6 => {
// sock v6
let sk = eee.udp_sock_v6.read().unwrap().clone();
match sk.as_ref() {
None => {
error!("ipv6 not opened, not responding");
return Ok(());
}
Some(sk) => {
let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::from(sock.v6)), sock.port);
sk.send_to(content, addr).await?;
return Ok(());
}
}
}
other => {
error!("unknown family {}, aborting", other);
return Err(SDLanError::NormalError("unknown family"));
}
}
}
/*
pub async fn send_to_sock(sk: &Socket, content: &[u8], sock: &SdlanSock) -> Result<usize> {
match sock.family {
AF_INET => {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(sock.v4)), sock.port);
let n = sk.send_to(content, addr).await?;
return Ok(n);
}
AF_INET6 => {
let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::from(sock.v6)), sock.port);
let n = sk.send_to(content, addr).await?;
return Ok(n);
}
other => {
error!("AF family {} not implemented", other);
return Ok(0);
}
}
}
*/