Compare commits
2 Commits
6ef685420c
...
c5b04eb843
| Author | SHA1 | Date | |
|---|---|---|---|
| c5b04eb843 | |||
| 214e73a0b9 |
2
.vscode/settings.json
vendored
2
.vscode/settings.json
vendored
@ -1,4 +1,4 @@
|
||||
{
|
||||
"rust-analyzer.cargo.target": "x86_64-pc-windows-gnu"
|
||||
// "rust-analyzer.cargo.target": "x86_64-pc-windows-gnu"
|
||||
// "rust-analyzer.cargo.features": ["tun"]
|
||||
}
|
||||
@ -78,6 +78,7 @@ pub async fn run_sdlan(
|
||||
sock,
|
||||
hostname,
|
||||
server_ip,
|
||||
install_channel.to_owned(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@ -87,10 +88,10 @@ pub async fn run_sdlan(
|
||||
debug!("edge inited");
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let install_chan = install_channel.to_owned();
|
||||
// let install_chan = install_channel.to_owned();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) =
|
||||
async_main(install_chan, args, start_stop_chan, cancel, connecting_chan).await
|
||||
async_main(args, start_stop_chan, cancel, connecting_chan).await
|
||||
{
|
||||
error!("failed to run async main: {}", e.as_str());
|
||||
}
|
||||
|
||||
@ -1,289 +1,34 @@
|
||||
use std::net::IpAddr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::config::{NULL_MAC, TCP_PING_TIME};
|
||||
use crate::config::{TCP_PING_TIME};
|
||||
use crate::network::ipv6::run_ipv6;
|
||||
use crate::network::{
|
||||
get_edge, ping_to_sn, read_and_parse_packet, RegisterSuperFeedback, TunTapPacketHandler,
|
||||
get_edge, ping_to_sn, read_and_parse_packet, TunTapPacketHandler,
|
||||
};
|
||||
use crate::pb::{
|
||||
encode_to_tcp_message, encode_to_udp_message, SdlDevAddr, SdlRegisterSuper,
|
||||
SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, Sdlv6Info,
|
||||
};
|
||||
use crate::tcp::{init_tcp_conn, EventType, NakMsgCode, NatType, PacketType, SdlanTcp};
|
||||
use crate::tcp::{init_tcp_conn, send_stun_request};
|
||||
use crate::utils::{send_to_sock, CommandLine};
|
||||
use crate::{ConnectionInfo, ConnectionState};
|
||||
use sdlan_sn_rs::config::AF_INET;
|
||||
use sdlan_sn_rs::peer::{SdlanSock, V6Info};
|
||||
use sdlan_sn_rs::utils::{get_current_timestamp, ip_to_string, is_multi_broadcast, rsa_decrypt};
|
||||
use crate::{ConnectionInfo};
|
||||
use sdlan_sn_rs::peer::{SdlanSock};
|
||||
use sdlan_sn_rs::utils::{get_current_timestamp, is_multi_broadcast};
|
||||
use sdlan_sn_rs::utils::{Mac, Result};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::net::{UdpSocket};
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use super::{check_peer_registration_needed, packet, Node, StartStopInfo};
|
||||
use super::{Node, StartStopInfo};
|
||||
use crate::utils::Socket;
|
||||
|
||||
use prost::Message;
|
||||
use tracing::{debug, error};
|
||||
|
||||
async fn handle_tcp_message(msg: SdlanTcp) {
|
||||
let edge = get_edge();
|
||||
|
||||
// let now = get_current_timestamp();
|
||||
// edge.tcp_pong.store(now, Ordering::Relaxed);
|
||||
|
||||
debug!("got tcp message: {:?}", msg.packet_type);
|
||||
match msg.packet_type {
|
||||
PacketType::RegisterSuperACK => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 0,
|
||||
message: "".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else {
|
||||
error!("failed to decode REGISTER_SUPER_ACK");
|
||||
return;
|
||||
};
|
||||
debug!("got register super ack: {:?}", ack);
|
||||
let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else {
|
||||
error!("failed to rsa decrypt aes key");
|
||||
return;
|
||||
};
|
||||
let Some(dev) = ack.dev_addr else {
|
||||
error!("no dev_addr is specified");
|
||||
return;
|
||||
};
|
||||
|
||||
let ip = ip_to_string(&dev.net_addr);
|
||||
// debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,);
|
||||
println!("assigned ip: {}", ip);
|
||||
let hostname = edge.hostname.read().unwrap().clone();
|
||||
println!("network is: {}.{}", hostname, dev.network_domain);
|
||||
edge.device_config
|
||||
.ip
|
||||
.net_addr
|
||||
.store(dev.net_addr, Ordering::Relaxed);
|
||||
if let Some(ref chan) = edge.connection_chan {
|
||||
let _ = chan.send(ConnectionInfo::IPInfo(ip)).await;
|
||||
}
|
||||
/*
|
||||
let mac = match dev.mac.try_into() {
|
||||
Err(_) => NULL_MAC,
|
||||
Ok(m) => m,
|
||||
};
|
||||
*/
|
||||
// *edge.device_config.mac.write().unwrap() = mac;
|
||||
edge.device_config
|
||||
.ip
|
||||
.net_bit_len
|
||||
.store(dev.net_bit_len as u8, Ordering::Relaxed);
|
||||
edge.device.reload_config(&edge.device_config, &dev.network_domain);
|
||||
edge.network_id.store(dev.network_id, Ordering::Relaxed);
|
||||
|
||||
edge.set_authorized(true, aes);
|
||||
send_stun_request(edge).await;
|
||||
tokio::spawn(async {
|
||||
let nattype = edge.probe_nat_type().await;
|
||||
debug!("nat type is {:?}", nattype);
|
||||
// println!("nat type is: {:?}", nattype);
|
||||
});
|
||||
}
|
||||
PacketType::RegisterSuperNAK => {
|
||||
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {
|
||||
error!("failed to decode REGISTER_SUPER_NAK");
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 1,
|
||||
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 2,
|
||||
message: "error_code not recognized".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
return;
|
||||
};
|
||||
match error_code {
|
||||
NakMsgCode::InvalidToken => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 3,
|
||||
message: "invalid token".to_owned(),
|
||||
should_exit: true,
|
||||
},
|
||||
);
|
||||
edge.stop().await;
|
||||
}
|
||||
NakMsgCode::NodeDisabled => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 4,
|
||||
message: "Node is disabled".to_owned(),
|
||||
should_exit: true,
|
||||
},
|
||||
);
|
||||
edge.stop().await;
|
||||
}
|
||||
_other => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 0,
|
||||
message: "".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
/*
|
||||
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
|
||||
result: 1,
|
||||
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
||||
});
|
||||
*/
|
||||
edge.set_authorized(false, Vec::new());
|
||||
// std::process::exit(0);
|
||||
}
|
||||
PacketType::Command => {
|
||||
if msg.current_packet.len() < 1 {
|
||||
error!("malformed COMMAND received");
|
||||
return;
|
||||
}
|
||||
handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await;
|
||||
}
|
||||
PacketType::Event => {
|
||||
if msg.current_packet.len() < 1 {
|
||||
error!("malformed EVENT received");
|
||||
return;
|
||||
}
|
||||
let Ok(event) = msg.current_packet[0].try_into() else {
|
||||
error!("failed to parse event type");
|
||||
return;
|
||||
};
|
||||
handle_tcp_event(edge, event, &msg.current_packet[1..]).await;
|
||||
}
|
||||
PacketType::PeerInfo => {
|
||||
let _ = packet::handle_packet_peer_info(edge, &msg.current_packet[..]).await;
|
||||
}
|
||||
PacketType::Pong => {
|
||||
debug!("tcp pong received");
|
||||
let now = get_current_timestamp();
|
||||
edge.tcp_pong.store(now, Ordering::Relaxed);
|
||||
}
|
||||
other => {
|
||||
debug!("tcp not handling {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {}
|
||||
|
||||
async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) {
|
||||
match eventtype {
|
||||
EventType::SendRegister => {
|
||||
let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else {
|
||||
error!("failed to decode SendRegister Event");
|
||||
return;
|
||||
};
|
||||
let v4 = reg.nat_ip.to_be_bytes();
|
||||
let mut v6_sock = None;
|
||||
if let Some(v6_info) = reg.v6_info {
|
||||
if let Ok(v6_bytes) = v6_info.v6.try_into() {
|
||||
v6_sock = Some(V6Info {
|
||||
port: v6_info.port as u16,
|
||||
v6: v6_bytes,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let dst_mac = match reg.dst_mac.try_into() {
|
||||
Ok(m) => m,
|
||||
Err(_e) => NULL_MAC,
|
||||
};
|
||||
|
||||
let remote_nat_byte = reg.nat_type as u8;
|
||||
let remote_nat = match remote_nat_byte.try_into() {
|
||||
Ok(t) => t,
|
||||
Err(_) => NatType::NoNat,
|
||||
};
|
||||
|
||||
check_peer_registration_needed(
|
||||
edge,
|
||||
false,
|
||||
dst_mac,
|
||||
// &v6_sock,
|
||||
remote_nat,
|
||||
&v6_sock,
|
||||
&SdlanSock {
|
||||
family: AF_INET,
|
||||
port: reg.nat_port as u16,
|
||||
v4,
|
||||
v6: [0; 16],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
other => {
|
||||
debug!("unhandled event {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn async_main(
|
||||
install_channel: String,
|
||||
args: CommandLine,
|
||||
start_stop_chan: Receiver<StartStopInfo>,
|
||||
cancel: CancellationToken,
|
||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||
) -> Result<()> {
|
||||
// let _ = PidRecorder::new(".pid");
|
||||
|
||||
// // gen public key
|
||||
// gen_rsa_keys(".client");
|
||||
// let mut pubkey = String::new();
|
||||
// File::open(".client/id_rsa.pub")?.read_to_string(&mut pubkey)?;
|
||||
// let privatekey = load_private_key_file(".client/id_rsa")?;
|
||||
|
||||
// // init sock
|
||||
// if args.token.len() == 0 {
|
||||
// println!("failed to load token");
|
||||
// return Ok(());
|
||||
// }
|
||||
// let sock_v4 = Socket::build(0, true, true, args.tos).await?;
|
||||
|
||||
// // allow multicast
|
||||
|
||||
// // TODO: set the sn's tcp socket
|
||||
// // let tcpsock = TCPSocket::build("121.4.79.234:1234").await?;
|
||||
// let tcp_pong = Arc::new(AtomicU64::new(0));
|
||||
// let edge = Node::new(
|
||||
// pubkey,
|
||||
// node_conf,
|
||||
// sock_v4,
|
||||
// &args.token,
|
||||
// privatekey,
|
||||
// tcp_pong.clone(),
|
||||
// );
|
||||
|
||||
let edge = get_edge();
|
||||
|
||||
// let token = args.token.clone();
|
||||
@ -293,61 +38,7 @@ pub async fn async_main(
|
||||
init_tcp_conn(
|
||||
cancel_tcp,
|
||||
&args.tcp,
|
||||
move |stream, pkt_id| {
|
||||
let installed_channel = install_channel.to_owned();
|
||||
Box::pin(async move {
|
||||
let token = edge._token.lock().unwrap().clone();
|
||||
let code = edge.network_code.lock().unwrap().clone();
|
||||
// let edge = get_edge();
|
||||
// let edge = get_edge();
|
||||
// let token = args.token.clone();
|
||||
if let Ok(ipaddr) = stream.local_addr() {
|
||||
match ipaddr.ip() {
|
||||
IpAddr::V4(v4) => {
|
||||
let ip = v4.into();
|
||||
// println!("outer ip is {} => {}", v4, ip);
|
||||
edge.outer_ip_v4.store(ip, Ordering::Relaxed);
|
||||
}
|
||||
_other => {}
|
||||
}
|
||||
}
|
||||
let register_super = SdlRegisterSuper {
|
||||
version: 1,
|
||||
installed_channel,
|
||||
client_id: edge.config.node_uuid.clone(),
|
||||
dev_addr: Some(SdlDevAddr {
|
||||
mac: Vec::from(edge.device_config.get_mac()),
|
||||
net_addr: 0,
|
||||
network_id: 0,
|
||||
net_bit_len: 0,
|
||||
network_domain: "".to_owned(),
|
||||
}),
|
||||
pub_key: edge.rsa_pubkey.clone(),
|
||||
token,
|
||||
network_code: code,
|
||||
hostname: edge.hostname.read().unwrap().clone(),
|
||||
};
|
||||
// debug!("send register super: {:?}", register_super);
|
||||
let packet_id = match pkt_id {
|
||||
Some(id) => id,
|
||||
None => edge.get_next_packet_id(),
|
||||
};
|
||||
// let packet_id = edge.get_next_packet_id();
|
||||
let data = encode_to_tcp_message(
|
||||
Some(register_super),
|
||||
packet_id,
|
||||
PacketType::RegisterSuper as u8,
|
||||
)
|
||||
.unwrap();
|
||||
if let Err(e) = stream.write(&data).await {
|
||||
error!("failed to write to tcp: {}", e.to_string());
|
||||
}
|
||||
})
|
||||
},
|
||||
|| async {
|
||||
edge.set_authorized(false, vec![]);
|
||||
},
|
||||
|msg| handle_tcp_message(msg),
|
||||
// |msg| handle_tcp_message(msg),
|
||||
edge.tcp_pong.clone(),
|
||||
// tcp_pong,
|
||||
start_stop_chan,
|
||||
@ -448,36 +139,6 @@ async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) {
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_stun_request(eee: &Node) {
|
||||
let sdl_v6_info = match *eee.ipv6.read().unwrap() {
|
||||
Some(ref l) => Some(Sdlv6Info {
|
||||
port: l.port as u32,
|
||||
v6: Vec::from(l.v6),
|
||||
}),
|
||||
None => None,
|
||||
};
|
||||
let req = SdlStunRequest {
|
||||
cookie: 0,
|
||||
client_id: eee.config.node_uuid.clone(),
|
||||
network_id: eee.network_id.load(Ordering::Relaxed),
|
||||
ip: eee.device_config.get_ip(),
|
||||
mac: Vec::from(eee.device_config.get_mac()),
|
||||
nat_type: eee.get_nat_type() as u32,
|
||||
v6_info: sdl_v6_info,
|
||||
};
|
||||
debug!("stun request: {:?}", req);
|
||||
let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap();
|
||||
if let Err(e) = send_to_sock(
|
||||
eee,
|
||||
&msg,
|
||||
&eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize],
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("failed to send to sock: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn loop_socket_v6(eee: &'static Node, socket: Arc<Socket>, cancel: CancellationToken) {
|
||||
debug!("loop sock v6");
|
||||
loop {
|
||||
|
||||
@ -42,6 +42,7 @@ pub async fn init_edge(
|
||||
udpsock_for_dns: Arc<UdpSocket>,
|
||||
hostname: String,
|
||||
server_ip: String,
|
||||
install_channel: String
|
||||
) -> Result<()> {
|
||||
// gen public key
|
||||
let rsa_path = format!("{}/.client", get_base_dir());
|
||||
@ -88,6 +89,7 @@ pub async fn init_edge(
|
||||
hostname,
|
||||
udpsock_for_dns,
|
||||
server_ip,
|
||||
install_channel,
|
||||
);
|
||||
do_init_edge(edge)?;
|
||||
|
||||
@ -175,6 +177,8 @@ pub struct Node {
|
||||
// last register super time, in unix
|
||||
pub _last_register_req: AtomicU64,
|
||||
|
||||
pub install_channel: String,
|
||||
|
||||
nat_type: Mutex<NatType>,
|
||||
|
||||
nat_cookie: AtomicU32,
|
||||
@ -287,6 +291,7 @@ impl Node {
|
||||
hostname: String,
|
||||
udpsock_for_dns: Arc<UdpSocket>,
|
||||
server_ip: String,
|
||||
install_channel: String,
|
||||
) -> Self {
|
||||
let mode = if cfg!(not(feature = "tun")) {
|
||||
Mode::Tap
|
||||
@ -347,6 +352,7 @@ impl Node {
|
||||
nat_cookie: AtomicU32::new(1),
|
||||
cookie_match: DashMap::new(),
|
||||
server_ip,
|
||||
install_channel,
|
||||
}
|
||||
}
|
||||
|
||||
@ -495,7 +501,7 @@ impl Node {
|
||||
encode_to_tcp_message::<SdlEmpty>(None, 0, PacketType::UnRegisterSuper as u8).unwrap();
|
||||
|
||||
let conn = get_tcp_conn();
|
||||
let _ = conn.send(&content).await;
|
||||
let _ = conn.send(content).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -1143,7 +1143,7 @@ async fn send_query_peer(eee: &Node, dst_mac: Mac) -> Result<()> {
|
||||
return Err(SDLanError::NormalError("encode query error"));
|
||||
};
|
||||
let tcp_conn = get_tcp_conn();
|
||||
tcp_conn.send(&content).await
|
||||
tcp_conn.send(content).await
|
||||
}
|
||||
|
||||
pub async fn ping_to_sn() {
|
||||
@ -1153,7 +1153,7 @@ pub async fn ping_to_sn() {
|
||||
};
|
||||
debug!("ping to sn");
|
||||
let tcp_conn = get_tcp_conn();
|
||||
if let Err(e) = tcp_conn.send(&msg).await {
|
||||
if let Err(e) = tcp_conn.send(msg).await {
|
||||
error!("failed to ping to sn: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,6 +1,10 @@
|
||||
use myactor::{ActorError, SupervisedActor};
|
||||
use once_cell::sync::OnceCell;
|
||||
use sdlan_sn_rs::utils::{get_current_timestamp, Result, SDLanError};
|
||||
use std::future::Future;
|
||||
use prost::Message;
|
||||
use sdlan_sn_rs::config::AF_INET;
|
||||
use sdlan_sn_rs::peer::{SdlanSock, V6Info};
|
||||
use sdlan_sn_rs::utils::{Result, SDLanError, get_current_timestamp, ip_to_string, rsa_decrypt};
|
||||
use std::net::IpAddr;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
@ -19,15 +23,325 @@ use tokio::{
|
||||
};
|
||||
use tracing::error;
|
||||
|
||||
use crate::config::TCP_PING_TIME;
|
||||
use crate::network::StartStopInfo;
|
||||
use crate::tcp::read_a_packet;
|
||||
use crate::{ConnectionInfo, ConnectionState};
|
||||
use crate::config::{NULL_MAC, TCP_PING_TIME};
|
||||
use crate::network::{Node, RegisterSuperFeedback, StartStopInfo, check_peer_registration_needed, handle_packet_peer_info};
|
||||
use crate::pb::{SdlDevAddr, SdlRegisterSuper, SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, Sdlv6Info, encode_to_tcp_message, encode_to_udp_message};
|
||||
use crate::tcp::{EventType, NakMsgCode, NatType, PacketType, read_a_packet};
|
||||
use crate::utils::send_to_sock;
|
||||
use crate::{ConnectionInfo, ConnectionState, get_edge};
|
||||
|
||||
use super::tcp_codec::SdlanTcp;
|
||||
|
||||
static GLOBAL_TCP_HANDLE: OnceCell<ReadWriterHandle> = OnceCell::new();
|
||||
|
||||
|
||||
async fn handle_tcp_message(msg: SdlanTcp) {
|
||||
let edge = get_edge();
|
||||
|
||||
// let now = get_current_timestamp();
|
||||
// edge.tcp_pong.store(now, Ordering::Relaxed);
|
||||
|
||||
debug!("got tcp message: {:?}", msg.packet_type);
|
||||
match msg.packet_type {
|
||||
PacketType::RegisterSuperACK => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 0,
|
||||
message: "".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else {
|
||||
error!("failed to decode REGISTER_SUPER_ACK");
|
||||
return;
|
||||
};
|
||||
debug!("got register super ack: {:?}", ack);
|
||||
let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else {
|
||||
error!("failed to rsa decrypt aes key");
|
||||
return;
|
||||
};
|
||||
let Some(dev) = ack.dev_addr else {
|
||||
error!("no dev_addr is specified");
|
||||
return;
|
||||
};
|
||||
|
||||
let ip = ip_to_string(&dev.net_addr);
|
||||
// debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,);
|
||||
println!("assigned ip: {}", ip);
|
||||
let hostname = edge.hostname.read().unwrap().clone();
|
||||
println!("network is: {}.{}", hostname, dev.network_domain);
|
||||
edge.device_config
|
||||
.ip
|
||||
.net_addr
|
||||
.store(dev.net_addr, Ordering::Relaxed);
|
||||
if let Some(ref chan) = edge.connection_chan {
|
||||
let _ = chan.send(ConnectionInfo::IPInfo(ip)).await;
|
||||
}
|
||||
/*
|
||||
let mac = match dev.mac.try_into() {
|
||||
Err(_) => NULL_MAC,
|
||||
Ok(m) => m,
|
||||
};
|
||||
*/
|
||||
// *edge.device_config.mac.write().unwrap() = mac;
|
||||
edge.device_config
|
||||
.ip
|
||||
.net_bit_len
|
||||
.store(dev.net_bit_len as u8, Ordering::Relaxed);
|
||||
edge.device.reload_config(&edge.device_config, &dev.network_domain);
|
||||
edge.network_id.store(dev.network_id, Ordering::Relaxed);
|
||||
|
||||
edge.set_authorized(true, aes);
|
||||
send_stun_request(edge).await;
|
||||
tokio::spawn(async {
|
||||
let nattype = edge.probe_nat_type().await;
|
||||
debug!("nat type is {:?}", nattype);
|
||||
// println!("nat type is: {:?}", nattype);
|
||||
});
|
||||
}
|
||||
PacketType::RegisterSuperNAK => {
|
||||
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {
|
||||
error!("failed to decode REGISTER_SUPER_NAK");
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 1,
|
||||
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 2,
|
||||
message: "error_code not recognized".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
return;
|
||||
};
|
||||
match error_code {
|
||||
NakMsgCode::InvalidToken => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 3,
|
||||
message: "invalid token".to_owned(),
|
||||
should_exit: true,
|
||||
},
|
||||
);
|
||||
edge.stop().await;
|
||||
}
|
||||
NakMsgCode::NodeDisabled => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 4,
|
||||
message: "Node is disabled".to_owned(),
|
||||
should_exit: true,
|
||||
},
|
||||
);
|
||||
edge.stop().await;
|
||||
}
|
||||
_other => {
|
||||
edge.send_register_super_feedback(
|
||||
msg._packet_id,
|
||||
RegisterSuperFeedback {
|
||||
result: 0,
|
||||
message: "".to_owned(),
|
||||
should_exit: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
/*
|
||||
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
|
||||
result: 1,
|
||||
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
||||
});
|
||||
*/
|
||||
edge.set_authorized(false, Vec::new());
|
||||
// std::process::exit(0);
|
||||
}
|
||||
PacketType::Command => {
|
||||
if msg.current_packet.len() < 1 {
|
||||
error!("malformed COMMAND received");
|
||||
return;
|
||||
}
|
||||
handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await;
|
||||
}
|
||||
PacketType::Event => {
|
||||
if msg.current_packet.len() < 1 {
|
||||
error!("malformed EVENT received");
|
||||
return;
|
||||
}
|
||||
let Ok(event) = msg.current_packet[0].try_into() else {
|
||||
error!("failed to parse event type");
|
||||
return;
|
||||
};
|
||||
handle_tcp_event(edge, event, &msg.current_packet[1..]).await;
|
||||
}
|
||||
PacketType::PeerInfo => {
|
||||
let _ = handle_packet_peer_info(edge, &msg.current_packet[..]).await;
|
||||
}
|
||||
PacketType::Pong => {
|
||||
debug!("tcp pong received");
|
||||
let now = get_current_timestamp();
|
||||
edge.tcp_pong.store(now, Ordering::Relaxed);
|
||||
}
|
||||
other => {
|
||||
debug!("tcp not handling {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {}
|
||||
|
||||
async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) {
|
||||
match eventtype {
|
||||
EventType::SendRegister => {
|
||||
let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else {
|
||||
error!("failed to decode SendRegister Event");
|
||||
return;
|
||||
};
|
||||
let v4 = reg.nat_ip.to_be_bytes();
|
||||
let mut v6_sock = None;
|
||||
if let Some(v6_info) = reg.v6_info {
|
||||
if let Ok(v6_bytes) = v6_info.v6.try_into() {
|
||||
v6_sock = Some(V6Info {
|
||||
port: v6_info.port as u16,
|
||||
v6: v6_bytes,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let dst_mac = match reg.dst_mac.try_into() {
|
||||
Ok(m) => m,
|
||||
Err(_e) => NULL_MAC,
|
||||
};
|
||||
|
||||
let remote_nat_byte = reg.nat_type as u8;
|
||||
let remote_nat = match remote_nat_byte.try_into() {
|
||||
Ok(t) => t,
|
||||
Err(_) => NatType::NoNat,
|
||||
};
|
||||
|
||||
check_peer_registration_needed(
|
||||
edge,
|
||||
false,
|
||||
dst_mac,
|
||||
// &v6_sock,
|
||||
remote_nat,
|
||||
&v6_sock,
|
||||
&SdlanSock {
|
||||
family: AF_INET,
|
||||
port: reg.nat_port as u16,
|
||||
v4,
|
||||
v6: [0; 16],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
other => {
|
||||
debug!("unhandled event {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub async fn send_stun_request(eee: &Node) {
|
||||
let sdl_v6_info = match *eee.ipv6.read().unwrap() {
|
||||
Some(ref l) => Some(Sdlv6Info {
|
||||
port: l.port as u32,
|
||||
v6: Vec::from(l.v6),
|
||||
}),
|
||||
None => None,
|
||||
};
|
||||
let req = SdlStunRequest {
|
||||
cookie: 0,
|
||||
client_id: eee.config.node_uuid.clone(),
|
||||
network_id: eee.network_id.load(Ordering::Relaxed),
|
||||
ip: eee.device_config.get_ip(),
|
||||
mac: Vec::from(eee.device_config.get_mac()),
|
||||
nat_type: eee.get_nat_type() as u32,
|
||||
v6_info: sdl_v6_info,
|
||||
};
|
||||
debug!("stun request: {:?}", req);
|
||||
let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap();
|
||||
if let Err(e) = send_to_sock(
|
||||
eee,
|
||||
&msg,
|
||||
&eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize],
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("failed to send to sock: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_disconnected_callback() {
|
||||
let edge = get_edge();
|
||||
edge.set_authorized(false, vec![]);
|
||||
}
|
||||
|
||||
async fn on_connected_callback<'a>(stream: &'a mut tokio::net::TcpStream, pkt_id: Option<u32>) {
|
||||
let edge = get_edge();
|
||||
// let installed_channel = install_channel.to_owned();
|
||||
|
||||
let token = edge._token.lock().unwrap().clone();
|
||||
let code = edge.network_code.lock().unwrap().clone();
|
||||
// let edge = get_edge();
|
||||
// let edge = get_edge();
|
||||
// let token = args.token.clone();
|
||||
if let Ok(ipaddr) = stream.local_addr() {
|
||||
match ipaddr.ip() {
|
||||
IpAddr::V4(v4) => {
|
||||
let ip = v4.into();
|
||||
// println!("outer ip is {} => {}", v4, ip);
|
||||
edge.outer_ip_v4.store(ip, Ordering::Relaxed);
|
||||
}
|
||||
_other => {}
|
||||
}
|
||||
}
|
||||
let register_super = SdlRegisterSuper {
|
||||
version: 1,
|
||||
installed_channel: edge.install_channel.clone(),
|
||||
// installed_channel,
|
||||
client_id: edge.config.node_uuid.clone(),
|
||||
dev_addr: Some(SdlDevAddr {
|
||||
mac: Vec::from(edge.device_config.get_mac()),
|
||||
net_addr: 0,
|
||||
network_id: 0,
|
||||
net_bit_len: 0,
|
||||
network_domain: "".to_owned(),
|
||||
}),
|
||||
pub_key: edge.rsa_pubkey.clone(),
|
||||
token,
|
||||
network_code: code,
|
||||
hostname: edge.hostname.read().unwrap().clone(),
|
||||
};
|
||||
// debug!("send register super: {:?}", register_super);
|
||||
let packet_id = match pkt_id {
|
||||
Some(id) => id,
|
||||
None => edge.get_next_packet_id(),
|
||||
};
|
||||
// let packet_id = edge.get_next_packet_id();
|
||||
let data = encode_to_tcp_message(
|
||||
Some(register_super),
|
||||
packet_id,
|
||||
PacketType::RegisterSuper as u8,
|
||||
)
|
||||
.unwrap();
|
||||
if let Err(e) = stream.write(&data).await {
|
||||
error!("failed to write to tcp: {}", e.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReadWriteActor {
|
||||
// actor接收的发送给tcp的接收端,由handle存放发送端
|
||||
// to_tcp: Receiver<Vec<u8>>,
|
||||
@ -36,7 +350,7 @@ pub struct ReadWriteActor {
|
||||
pong_time: Arc<AtomicU64>,
|
||||
// actor收到数据之后,发送给上层的发送端口,接收端由handle保存
|
||||
from_tcp: Sender<SdlanTcp>,
|
||||
cancel: CancellationToken,
|
||||
_cancel: CancellationToken,
|
||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||
ipv6_network_restarter: Option<Sender<bool>>,
|
||||
}
|
||||
@ -53,7 +367,7 @@ impl ReadWriteActor {
|
||||
) -> Self {
|
||||
Self {
|
||||
// to_tcp,
|
||||
cancel,
|
||||
_cancel: cancel,
|
||||
pong_time,
|
||||
connected,
|
||||
remote: remote.to_owned(),
|
||||
@ -63,20 +377,12 @@ impl ReadWriteActor {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run<'a, T, T2, F>(
|
||||
pub async fn run<'a>(
|
||||
&self,
|
||||
keep_reconnect: bool,
|
||||
mut to_tcp: Receiver<Vec<u8>>,
|
||||
on_connected: T,
|
||||
on_disconnected: T2,
|
||||
mut start_stop_chan: Receiver<StartStopInfo>,
|
||||
// cancel: CancellationToken,
|
||||
) where
|
||||
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()>,
|
||||
T2: Fn() -> F,
|
||||
F: Future<Output = ()>,
|
||||
{
|
||||
// let (tx, rx) = channel(20);
|
||||
) {
|
||||
let mut started = false;
|
||||
let mut start_pkt_id = None;
|
||||
loop {
|
||||
@ -126,13 +432,6 @@ impl ReadWriteActor {
|
||||
let Ok(mut stream) = TcpStream::connect(&self.remote).await else {
|
||||
self.connected.store(false, Ordering::Relaxed);
|
||||
if keep_reconnect {
|
||||
/*
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(3)) => {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
*/
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
continue;
|
||||
}
|
||||
@ -140,7 +439,7 @@ impl ReadWriteActor {
|
||||
};
|
||||
self.connected.store(true, Ordering::Relaxed);
|
||||
debug!("connected");
|
||||
on_connected(&mut stream, start_pkt_id.take()).await;
|
||||
on_connected_callback(&mut stream, start_pkt_id.take()).await;
|
||||
|
||||
if let Some(ref connecting_chan) = self.connecting_chan {
|
||||
let state = ConnectionInfo::ConnState(ConnectionState::Connected);
|
||||
@ -226,7 +525,7 @@ impl ReadWriteActor {
|
||||
_ = check_pong => {},
|
||||
_ = check_stop => {},
|
||||
}
|
||||
on_disconnected().await;
|
||||
on_disconnected_callback().await;
|
||||
debug!("connect retrying");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
debug!("disconnected");
|
||||
@ -243,10 +542,10 @@ pub struct ReadWriterHandle {
|
||||
}
|
||||
|
||||
impl ReadWriterHandle {
|
||||
pub async fn send(&self, data: &[u8]) -> Result<()> {
|
||||
pub async fn send(&self, data: Vec<u8>) -> Result<()> {
|
||||
if self.connected.load(Ordering::Relaxed) {
|
||||
// connected, send to it
|
||||
if let Err(e) = self.send_to_tcp.send(Vec::from(data)).await {
|
||||
if let Err(e) = self.send_to_tcp.send(data).await {
|
||||
error!("failed to send to send_to_tcp: {}", e.to_string());
|
||||
return Err(SDLanError::NormalError("failed to send"));
|
||||
};
|
||||
@ -258,12 +557,12 @@ impl ReadWriterHandle {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn new<'a, T, T3, T2, F, F2>(
|
||||
fn new<>(
|
||||
cancel: CancellationToken,
|
||||
addr: &str,
|
||||
on_connected: T,
|
||||
on_disconnected: T3,
|
||||
on_message: T2,
|
||||
// on_connected: OnConnectedCallback<'a>,
|
||||
// on_disconnected: T3,
|
||||
// on_message: T2,
|
||||
pong_time: Arc<AtomicU64>,
|
||||
start_stop_chan: Receiver<StartStopInfo>,
|
||||
// cancel: CancellationToken,
|
||||
@ -271,11 +570,10 @@ impl ReadWriterHandle {
|
||||
ipv6_network_restarter: Option<Sender<bool>>,
|
||||
) -> Self
|
||||
where
|
||||
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()> + Send + 'static,
|
||||
T3: Fn() -> F2 + Send + 'static,
|
||||
T2: Fn(SdlanTcp) -> F + Send + 'static,
|
||||
F: Future<Output = ()> + Send,
|
||||
F2: Future<Output = ()> + Send,
|
||||
// T3: Fn() -> F2 + Send + 'static,
|
||||
// T2: Fn(SdlanTcp) -> F + Send + 'static,
|
||||
// F: Future<Output = ()> + Send,
|
||||
// F2: Future<Output = ()> + Send,
|
||||
{
|
||||
let (send_to_tcp, to_tcp) = channel(20);
|
||||
let (from_tcp, mut data_from_tcp) = channel(20);
|
||||
@ -292,13 +590,19 @@ impl ReadWriterHandle {
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
actor
|
||||
.run(true, to_tcp, on_connected, on_disconnected, start_stop_chan)
|
||||
.run(
|
||||
true,
|
||||
to_tcp,
|
||||
// on_connected,
|
||||
// on_disconnected,
|
||||
start_stop_chan
|
||||
)
|
||||
.await
|
||||
});
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if let Some(msg) = data_from_tcp.recv().await {
|
||||
on_message(msg).await;
|
||||
handle_tcp_message(msg).await;
|
||||
} else {
|
||||
error!("data from tcp exited");
|
||||
// eprintln!("data from tcp exited");
|
||||
@ -315,30 +619,28 @@ impl ReadWriterHandle {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init_tcp_conn<'a, T, T3, T2, F, F2>(
|
||||
pub fn init_tcp_conn(
|
||||
cancel: CancellationToken,
|
||||
addr: &str,
|
||||
on_connected: T,
|
||||
on_disconnected: T3,
|
||||
on_message: T2,
|
||||
// on_connected: OnConnectedCallback<'a>,
|
||||
// on_disconnected: T3,
|
||||
// on_message: T2,
|
||||
pong_time: Arc<AtomicU64>,
|
||||
// cancel: CancellationToken,
|
||||
start_stop_chan: Receiver<StartStopInfo>,
|
||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||
ipv6_network_restarter: Option<Sender<bool>>,
|
||||
) where
|
||||
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()> + Send + 'static,
|
||||
T3: Fn() -> F2 + Send + 'static,
|
||||
T2: Fn(SdlanTcp) -> F + Send + 'static,
|
||||
F: Future<Output = ()> + Send,
|
||||
F2: Future<Output = ()> + Send,
|
||||
)
|
||||
// T2: Fn(SdlanTcp) -> F + Send + 'static,
|
||||
// F: Future<Output = ()> + Send,
|
||||
{
|
||||
|
||||
let tcp_handle = ReadWriterHandle::new(
|
||||
cancel,
|
||||
addr,
|
||||
on_connected,
|
||||
on_disconnected,
|
||||
on_message,
|
||||
// on_connected,
|
||||
// on_disconnected,
|
||||
// on_message,
|
||||
pong_time,
|
||||
start_stop_chan,
|
||||
connecting_chan,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user