Compare commits
No commits in common. "c5b04eb843111dccc26f77b8fda6901c19f309bd" and "6ef685420c741d1ed02db33784292d1edc89d64e" have entirely different histories.
c5b04eb843
...
6ef685420c
2
.vscode/settings.json
vendored
2
.vscode/settings.json
vendored
@ -1,4 +1,4 @@
|
|||||||
{
|
{
|
||||||
// "rust-analyzer.cargo.target": "x86_64-pc-windows-gnu"
|
"rust-analyzer.cargo.target": "x86_64-pc-windows-gnu"
|
||||||
// "rust-analyzer.cargo.features": ["tun"]
|
// "rust-analyzer.cargo.features": ["tun"]
|
||||||
}
|
}
|
||||||
@ -78,7 +78,6 @@ pub async fn run_sdlan(
|
|||||||
sock,
|
sock,
|
||||||
hostname,
|
hostname,
|
||||||
server_ip,
|
server_ip,
|
||||||
install_channel.to_owned(),
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
@ -88,10 +87,10 @@ pub async fn run_sdlan(
|
|||||||
debug!("edge inited");
|
debug!("edge inited");
|
||||||
|
|
||||||
let cancel = CancellationToken::new();
|
let cancel = CancellationToken::new();
|
||||||
// let install_chan = install_channel.to_owned();
|
let install_chan = install_channel.to_owned();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) =
|
if let Err(e) =
|
||||||
async_main(args, start_stop_chan, cancel, connecting_chan).await
|
async_main(install_chan, args, start_stop_chan, cancel, connecting_chan).await
|
||||||
{
|
{
|
||||||
error!("failed to run async main: {}", e.as_str());
|
error!("failed to run async main: {}", e.as_str());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,34 +1,289 @@
|
|||||||
|
use std::net::IpAddr;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::config::{TCP_PING_TIME};
|
use crate::config::{NULL_MAC, TCP_PING_TIME};
|
||||||
use crate::network::ipv6::run_ipv6;
|
use crate::network::ipv6::run_ipv6;
|
||||||
use crate::network::{
|
use crate::network::{
|
||||||
get_edge, ping_to_sn, read_and_parse_packet, TunTapPacketHandler,
|
get_edge, ping_to_sn, read_and_parse_packet, RegisterSuperFeedback, TunTapPacketHandler,
|
||||||
};
|
};
|
||||||
use crate::tcp::{init_tcp_conn, send_stun_request};
|
use crate::pb::{
|
||||||
|
encode_to_tcp_message, encode_to_udp_message, SdlDevAddr, SdlRegisterSuper,
|
||||||
|
SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, Sdlv6Info,
|
||||||
|
};
|
||||||
|
use crate::tcp::{init_tcp_conn, EventType, NakMsgCode, NatType, PacketType, SdlanTcp};
|
||||||
use crate::utils::{send_to_sock, CommandLine};
|
use crate::utils::{send_to_sock, CommandLine};
|
||||||
use crate::{ConnectionInfo};
|
use crate::{ConnectionInfo, ConnectionState};
|
||||||
use sdlan_sn_rs::peer::{SdlanSock};
|
use sdlan_sn_rs::config::AF_INET;
|
||||||
use sdlan_sn_rs::utils::{get_current_timestamp, is_multi_broadcast};
|
use sdlan_sn_rs::peer::{SdlanSock, V6Info};
|
||||||
|
use sdlan_sn_rs::utils::{get_current_timestamp, ip_to_string, is_multi_broadcast, rsa_decrypt};
|
||||||
use sdlan_sn_rs::utils::{Mac, Result};
|
use sdlan_sn_rs::utils::{Mac, Result};
|
||||||
use tokio::net::{UdpSocket};
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::net::UdpSocket;
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
use super::{Node, StartStopInfo};
|
use super::{check_peer_registration_needed, packet, Node, StartStopInfo};
|
||||||
use crate::utils::Socket;
|
use crate::utils::Socket;
|
||||||
|
|
||||||
|
use prost::Message;
|
||||||
use tracing::{debug, error};
|
use tracing::{debug, error};
|
||||||
|
|
||||||
|
async fn handle_tcp_message(msg: SdlanTcp) {
|
||||||
|
let edge = get_edge();
|
||||||
|
|
||||||
|
// let now = get_current_timestamp();
|
||||||
|
// edge.tcp_pong.store(now, Ordering::Relaxed);
|
||||||
|
|
||||||
|
debug!("got tcp message: {:?}", msg.packet_type);
|
||||||
|
match msg.packet_type {
|
||||||
|
PacketType::RegisterSuperACK => {
|
||||||
|
edge.send_register_super_feedback(
|
||||||
|
msg._packet_id,
|
||||||
|
RegisterSuperFeedback {
|
||||||
|
result: 0,
|
||||||
|
message: "".to_owned(),
|
||||||
|
should_exit: false,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else {
|
||||||
|
error!("failed to decode REGISTER_SUPER_ACK");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
debug!("got register super ack: {:?}", ack);
|
||||||
|
let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else {
|
||||||
|
error!("failed to rsa decrypt aes key");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Some(dev) = ack.dev_addr else {
|
||||||
|
error!("no dev_addr is specified");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let ip = ip_to_string(&dev.net_addr);
|
||||||
|
// debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,);
|
||||||
|
println!("assigned ip: {}", ip);
|
||||||
|
let hostname = edge.hostname.read().unwrap().clone();
|
||||||
|
println!("network is: {}.{}", hostname, dev.network_domain);
|
||||||
|
edge.device_config
|
||||||
|
.ip
|
||||||
|
.net_addr
|
||||||
|
.store(dev.net_addr, Ordering::Relaxed);
|
||||||
|
if let Some(ref chan) = edge.connection_chan {
|
||||||
|
let _ = chan.send(ConnectionInfo::IPInfo(ip)).await;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
let mac = match dev.mac.try_into() {
|
||||||
|
Err(_) => NULL_MAC,
|
||||||
|
Ok(m) => m,
|
||||||
|
};
|
||||||
|
*/
|
||||||
|
// *edge.device_config.mac.write().unwrap() = mac;
|
||||||
|
edge.device_config
|
||||||
|
.ip
|
||||||
|
.net_bit_len
|
||||||
|
.store(dev.net_bit_len as u8, Ordering::Relaxed);
|
||||||
|
edge.device.reload_config(&edge.device_config, &dev.network_domain);
|
||||||
|
edge.network_id.store(dev.network_id, Ordering::Relaxed);
|
||||||
|
|
||||||
|
edge.set_authorized(true, aes);
|
||||||
|
send_stun_request(edge).await;
|
||||||
|
tokio::spawn(async {
|
||||||
|
let nattype = edge.probe_nat_type().await;
|
||||||
|
debug!("nat type is {:?}", nattype);
|
||||||
|
// println!("nat type is: {:?}", nattype);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
PacketType::RegisterSuperNAK => {
|
||||||
|
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {
|
||||||
|
error!("failed to decode REGISTER_SUPER_NAK");
|
||||||
|
edge.send_register_super_feedback(
|
||||||
|
msg._packet_id,
|
||||||
|
RegisterSuperFeedback {
|
||||||
|
result: 1,
|
||||||
|
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
||||||
|
should_exit: false,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else {
|
||||||
|
edge.send_register_super_feedback(
|
||||||
|
msg._packet_id,
|
||||||
|
RegisterSuperFeedback {
|
||||||
|
result: 2,
|
||||||
|
message: "error_code not recognized".to_owned(),
|
||||||
|
should_exit: false,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
match error_code {
|
||||||
|
NakMsgCode::InvalidToken => {
|
||||||
|
edge.send_register_super_feedback(
|
||||||
|
msg._packet_id,
|
||||||
|
RegisterSuperFeedback {
|
||||||
|
result: 3,
|
||||||
|
message: "invalid token".to_owned(),
|
||||||
|
should_exit: true,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
edge.stop().await;
|
||||||
|
}
|
||||||
|
NakMsgCode::NodeDisabled => {
|
||||||
|
edge.send_register_super_feedback(
|
||||||
|
msg._packet_id,
|
||||||
|
RegisterSuperFeedback {
|
||||||
|
result: 4,
|
||||||
|
message: "Node is disabled".to_owned(),
|
||||||
|
should_exit: true,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
edge.stop().await;
|
||||||
|
}
|
||||||
|
_other => {
|
||||||
|
edge.send_register_super_feedback(
|
||||||
|
msg._packet_id,
|
||||||
|
RegisterSuperFeedback {
|
||||||
|
result: 0,
|
||||||
|
message: "".to_owned(),
|
||||||
|
should_exit: false,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
|
||||||
|
result: 1,
|
||||||
|
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
||||||
|
});
|
||||||
|
*/
|
||||||
|
edge.set_authorized(false, Vec::new());
|
||||||
|
// std::process::exit(0);
|
||||||
|
}
|
||||||
|
PacketType::Command => {
|
||||||
|
if msg.current_packet.len() < 1 {
|
||||||
|
error!("malformed COMMAND received");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await;
|
||||||
|
}
|
||||||
|
PacketType::Event => {
|
||||||
|
if msg.current_packet.len() < 1 {
|
||||||
|
error!("malformed EVENT received");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let Ok(event) = msg.current_packet[0].try_into() else {
|
||||||
|
error!("failed to parse event type");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
handle_tcp_event(edge, event, &msg.current_packet[1..]).await;
|
||||||
|
}
|
||||||
|
PacketType::PeerInfo => {
|
||||||
|
let _ = packet::handle_packet_peer_info(edge, &msg.current_packet[..]).await;
|
||||||
|
}
|
||||||
|
PacketType::Pong => {
|
||||||
|
debug!("tcp pong received");
|
||||||
|
let now = get_current_timestamp();
|
||||||
|
edge.tcp_pong.store(now, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
debug!("tcp not handling {:?}", other);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {}
|
||||||
|
|
||||||
|
async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) {
|
||||||
|
match eventtype {
|
||||||
|
EventType::SendRegister => {
|
||||||
|
let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else {
|
||||||
|
error!("failed to decode SendRegister Event");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let v4 = reg.nat_ip.to_be_bytes();
|
||||||
|
let mut v6_sock = None;
|
||||||
|
if let Some(v6_info) = reg.v6_info {
|
||||||
|
if let Ok(v6_bytes) = v6_info.v6.try_into() {
|
||||||
|
v6_sock = Some(V6Info {
|
||||||
|
port: v6_info.port as u16,
|
||||||
|
v6: v6_bytes,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let dst_mac = match reg.dst_mac.try_into() {
|
||||||
|
Ok(m) => m,
|
||||||
|
Err(_e) => NULL_MAC,
|
||||||
|
};
|
||||||
|
|
||||||
|
let remote_nat_byte = reg.nat_type as u8;
|
||||||
|
let remote_nat = match remote_nat_byte.try_into() {
|
||||||
|
Ok(t) => t,
|
||||||
|
Err(_) => NatType::NoNat,
|
||||||
|
};
|
||||||
|
|
||||||
|
check_peer_registration_needed(
|
||||||
|
edge,
|
||||||
|
false,
|
||||||
|
dst_mac,
|
||||||
|
// &v6_sock,
|
||||||
|
remote_nat,
|
||||||
|
&v6_sock,
|
||||||
|
&SdlanSock {
|
||||||
|
family: AF_INET,
|
||||||
|
port: reg.nat_port as u16,
|
||||||
|
v4,
|
||||||
|
v6: [0; 16],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
debug!("unhandled event {:?}", other);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn async_main(
|
pub async fn async_main(
|
||||||
|
install_channel: String,
|
||||||
args: CommandLine,
|
args: CommandLine,
|
||||||
start_stop_chan: Receiver<StartStopInfo>,
|
start_stop_chan: Receiver<StartStopInfo>,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// let _ = PidRecorder::new(".pid");
|
// let _ = PidRecorder::new(".pid");
|
||||||
|
|
||||||
|
// // gen public key
|
||||||
|
// gen_rsa_keys(".client");
|
||||||
|
// let mut pubkey = String::new();
|
||||||
|
// File::open(".client/id_rsa.pub")?.read_to_string(&mut pubkey)?;
|
||||||
|
// let privatekey = load_private_key_file(".client/id_rsa")?;
|
||||||
|
|
||||||
|
// // init sock
|
||||||
|
// if args.token.len() == 0 {
|
||||||
|
// println!("failed to load token");
|
||||||
|
// return Ok(());
|
||||||
|
// }
|
||||||
|
// let sock_v4 = Socket::build(0, true, true, args.tos).await?;
|
||||||
|
|
||||||
|
// // allow multicast
|
||||||
|
|
||||||
|
// // TODO: set the sn's tcp socket
|
||||||
|
// // let tcpsock = TCPSocket::build("121.4.79.234:1234").await?;
|
||||||
|
// let tcp_pong = Arc::new(AtomicU64::new(0));
|
||||||
|
// let edge = Node::new(
|
||||||
|
// pubkey,
|
||||||
|
// node_conf,
|
||||||
|
// sock_v4,
|
||||||
|
// &args.token,
|
||||||
|
// privatekey,
|
||||||
|
// tcp_pong.clone(),
|
||||||
|
// );
|
||||||
|
|
||||||
let edge = get_edge();
|
let edge = get_edge();
|
||||||
|
|
||||||
// let token = args.token.clone();
|
// let token = args.token.clone();
|
||||||
@ -38,7 +293,61 @@ pub async fn async_main(
|
|||||||
init_tcp_conn(
|
init_tcp_conn(
|
||||||
cancel_tcp,
|
cancel_tcp,
|
||||||
&args.tcp,
|
&args.tcp,
|
||||||
// |msg| handle_tcp_message(msg),
|
move |stream, pkt_id| {
|
||||||
|
let installed_channel = install_channel.to_owned();
|
||||||
|
Box::pin(async move {
|
||||||
|
let token = edge._token.lock().unwrap().clone();
|
||||||
|
let code = edge.network_code.lock().unwrap().clone();
|
||||||
|
// let edge = get_edge();
|
||||||
|
// let edge = get_edge();
|
||||||
|
// let token = args.token.clone();
|
||||||
|
if let Ok(ipaddr) = stream.local_addr() {
|
||||||
|
match ipaddr.ip() {
|
||||||
|
IpAddr::V4(v4) => {
|
||||||
|
let ip = v4.into();
|
||||||
|
// println!("outer ip is {} => {}", v4, ip);
|
||||||
|
edge.outer_ip_v4.store(ip, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
_other => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let register_super = SdlRegisterSuper {
|
||||||
|
version: 1,
|
||||||
|
installed_channel,
|
||||||
|
client_id: edge.config.node_uuid.clone(),
|
||||||
|
dev_addr: Some(SdlDevAddr {
|
||||||
|
mac: Vec::from(edge.device_config.get_mac()),
|
||||||
|
net_addr: 0,
|
||||||
|
network_id: 0,
|
||||||
|
net_bit_len: 0,
|
||||||
|
network_domain: "".to_owned(),
|
||||||
|
}),
|
||||||
|
pub_key: edge.rsa_pubkey.clone(),
|
||||||
|
token,
|
||||||
|
network_code: code,
|
||||||
|
hostname: edge.hostname.read().unwrap().clone(),
|
||||||
|
};
|
||||||
|
// debug!("send register super: {:?}", register_super);
|
||||||
|
let packet_id = match pkt_id {
|
||||||
|
Some(id) => id,
|
||||||
|
None => edge.get_next_packet_id(),
|
||||||
|
};
|
||||||
|
// let packet_id = edge.get_next_packet_id();
|
||||||
|
let data = encode_to_tcp_message(
|
||||||
|
Some(register_super),
|
||||||
|
packet_id,
|
||||||
|
PacketType::RegisterSuper as u8,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
if let Err(e) = stream.write(&data).await {
|
||||||
|
error!("failed to write to tcp: {}", e.to_string());
|
||||||
|
}
|
||||||
|
})
|
||||||
|
},
|
||||||
|
|| async {
|
||||||
|
edge.set_authorized(false, vec![]);
|
||||||
|
},
|
||||||
|
|msg| handle_tcp_message(msg),
|
||||||
edge.tcp_pong.clone(),
|
edge.tcp_pong.clone(),
|
||||||
// tcp_pong,
|
// tcp_pong,
|
||||||
start_stop_chan,
|
start_stop_chan,
|
||||||
@ -139,6 +448,36 @@ async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn send_stun_request(eee: &Node) {
|
||||||
|
let sdl_v6_info = match *eee.ipv6.read().unwrap() {
|
||||||
|
Some(ref l) => Some(Sdlv6Info {
|
||||||
|
port: l.port as u32,
|
||||||
|
v6: Vec::from(l.v6),
|
||||||
|
}),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
let req = SdlStunRequest {
|
||||||
|
cookie: 0,
|
||||||
|
client_id: eee.config.node_uuid.clone(),
|
||||||
|
network_id: eee.network_id.load(Ordering::Relaxed),
|
||||||
|
ip: eee.device_config.get_ip(),
|
||||||
|
mac: Vec::from(eee.device_config.get_mac()),
|
||||||
|
nat_type: eee.get_nat_type() as u32,
|
||||||
|
v6_info: sdl_v6_info,
|
||||||
|
};
|
||||||
|
debug!("stun request: {:?}", req);
|
||||||
|
let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap();
|
||||||
|
if let Err(e) = send_to_sock(
|
||||||
|
eee,
|
||||||
|
&msg,
|
||||||
|
&eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize],
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
error!("failed to send to sock: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn loop_socket_v6(eee: &'static Node, socket: Arc<Socket>, cancel: CancellationToken) {
|
pub async fn loop_socket_v6(eee: &'static Node, socket: Arc<Socket>, cancel: CancellationToken) {
|
||||||
debug!("loop sock v6");
|
debug!("loop sock v6");
|
||||||
loop {
|
loop {
|
||||||
|
|||||||
@ -42,7 +42,6 @@ pub async fn init_edge(
|
|||||||
udpsock_for_dns: Arc<UdpSocket>,
|
udpsock_for_dns: Arc<UdpSocket>,
|
||||||
hostname: String,
|
hostname: String,
|
||||||
server_ip: String,
|
server_ip: String,
|
||||||
install_channel: String
|
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// gen public key
|
// gen public key
|
||||||
let rsa_path = format!("{}/.client", get_base_dir());
|
let rsa_path = format!("{}/.client", get_base_dir());
|
||||||
@ -89,7 +88,6 @@ pub async fn init_edge(
|
|||||||
hostname,
|
hostname,
|
||||||
udpsock_for_dns,
|
udpsock_for_dns,
|
||||||
server_ip,
|
server_ip,
|
||||||
install_channel,
|
|
||||||
);
|
);
|
||||||
do_init_edge(edge)?;
|
do_init_edge(edge)?;
|
||||||
|
|
||||||
@ -177,8 +175,6 @@ pub struct Node {
|
|||||||
// last register super time, in unix
|
// last register super time, in unix
|
||||||
pub _last_register_req: AtomicU64,
|
pub _last_register_req: AtomicU64,
|
||||||
|
|
||||||
pub install_channel: String,
|
|
||||||
|
|
||||||
nat_type: Mutex<NatType>,
|
nat_type: Mutex<NatType>,
|
||||||
|
|
||||||
nat_cookie: AtomicU32,
|
nat_cookie: AtomicU32,
|
||||||
@ -291,7 +287,6 @@ impl Node {
|
|||||||
hostname: String,
|
hostname: String,
|
||||||
udpsock_for_dns: Arc<UdpSocket>,
|
udpsock_for_dns: Arc<UdpSocket>,
|
||||||
server_ip: String,
|
server_ip: String,
|
||||||
install_channel: String,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let mode = if cfg!(not(feature = "tun")) {
|
let mode = if cfg!(not(feature = "tun")) {
|
||||||
Mode::Tap
|
Mode::Tap
|
||||||
@ -352,7 +347,6 @@ impl Node {
|
|||||||
nat_cookie: AtomicU32::new(1),
|
nat_cookie: AtomicU32::new(1),
|
||||||
cookie_match: DashMap::new(),
|
cookie_match: DashMap::new(),
|
||||||
server_ip,
|
server_ip,
|
||||||
install_channel,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -501,7 +495,7 @@ impl Node {
|
|||||||
encode_to_tcp_message::<SdlEmpty>(None, 0, PacketType::UnRegisterSuper as u8).unwrap();
|
encode_to_tcp_message::<SdlEmpty>(None, 0, PacketType::UnRegisterSuper as u8).unwrap();
|
||||||
|
|
||||||
let conn = get_tcp_conn();
|
let conn = get_tcp_conn();
|
||||||
let _ = conn.send(content).await;
|
let _ = conn.send(&content).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1143,7 +1143,7 @@ async fn send_query_peer(eee: &Node, dst_mac: Mac) -> Result<()> {
|
|||||||
return Err(SDLanError::NormalError("encode query error"));
|
return Err(SDLanError::NormalError("encode query error"));
|
||||||
};
|
};
|
||||||
let tcp_conn = get_tcp_conn();
|
let tcp_conn = get_tcp_conn();
|
||||||
tcp_conn.send(content).await
|
tcp_conn.send(&content).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn ping_to_sn() {
|
pub async fn ping_to_sn() {
|
||||||
@ -1153,7 +1153,7 @@ pub async fn ping_to_sn() {
|
|||||||
};
|
};
|
||||||
debug!("ping to sn");
|
debug!("ping to sn");
|
||||||
let tcp_conn = get_tcp_conn();
|
let tcp_conn = get_tcp_conn();
|
||||||
if let Err(e) = tcp_conn.send(msg).await {
|
if let Err(e) = tcp_conn.send(&msg).await {
|
||||||
error!("failed to ping to sn: {:?}", e);
|
error!("failed to ping to sn: {:?}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,10 +1,6 @@
|
|||||||
use myactor::{ActorError, SupervisedActor};
|
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
use prost::Message;
|
use sdlan_sn_rs::utils::{get_current_timestamp, Result, SDLanError};
|
||||||
use sdlan_sn_rs::config::AF_INET;
|
use std::future::Future;
|
||||||
use sdlan_sn_rs::peer::{SdlanSock, V6Info};
|
|
||||||
use sdlan_sn_rs::utils::{Result, SDLanError, get_current_timestamp, ip_to_string, rsa_decrypt};
|
|
||||||
use std::net::IpAddr;
|
|
||||||
use std::sync::atomic::AtomicU64;
|
use std::sync::atomic::AtomicU64;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{
|
use std::{
|
||||||
@ -23,325 +19,15 @@ use tokio::{
|
|||||||
};
|
};
|
||||||
use tracing::error;
|
use tracing::error;
|
||||||
|
|
||||||
use crate::config::{NULL_MAC, TCP_PING_TIME};
|
use crate::config::TCP_PING_TIME;
|
||||||
use crate::network::{Node, RegisterSuperFeedback, StartStopInfo, check_peer_registration_needed, handle_packet_peer_info};
|
use crate::network::StartStopInfo;
|
||||||
use crate::pb::{SdlDevAddr, SdlRegisterSuper, SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, Sdlv6Info, encode_to_tcp_message, encode_to_udp_message};
|
use crate::tcp::read_a_packet;
|
||||||
use crate::tcp::{EventType, NakMsgCode, NatType, PacketType, read_a_packet};
|
use crate::{ConnectionInfo, ConnectionState};
|
||||||
use crate::utils::send_to_sock;
|
|
||||||
use crate::{ConnectionInfo, ConnectionState, get_edge};
|
|
||||||
|
|
||||||
use super::tcp_codec::SdlanTcp;
|
use super::tcp_codec::SdlanTcp;
|
||||||
|
|
||||||
static GLOBAL_TCP_HANDLE: OnceCell<ReadWriterHandle> = OnceCell::new();
|
static GLOBAL_TCP_HANDLE: OnceCell<ReadWriterHandle> = OnceCell::new();
|
||||||
|
|
||||||
|
|
||||||
async fn handle_tcp_message(msg: SdlanTcp) {
|
|
||||||
let edge = get_edge();
|
|
||||||
|
|
||||||
// let now = get_current_timestamp();
|
|
||||||
// edge.tcp_pong.store(now, Ordering::Relaxed);
|
|
||||||
|
|
||||||
debug!("got tcp message: {:?}", msg.packet_type);
|
|
||||||
match msg.packet_type {
|
|
||||||
PacketType::RegisterSuperACK => {
|
|
||||||
edge.send_register_super_feedback(
|
|
||||||
msg._packet_id,
|
|
||||||
RegisterSuperFeedback {
|
|
||||||
result: 0,
|
|
||||||
message: "".to_owned(),
|
|
||||||
should_exit: false,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else {
|
|
||||||
error!("failed to decode REGISTER_SUPER_ACK");
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
debug!("got register super ack: {:?}", ack);
|
|
||||||
let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else {
|
|
||||||
error!("failed to rsa decrypt aes key");
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let Some(dev) = ack.dev_addr else {
|
|
||||||
error!("no dev_addr is specified");
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
let ip = ip_to_string(&dev.net_addr);
|
|
||||||
// debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,);
|
|
||||||
println!("assigned ip: {}", ip);
|
|
||||||
let hostname = edge.hostname.read().unwrap().clone();
|
|
||||||
println!("network is: {}.{}", hostname, dev.network_domain);
|
|
||||||
edge.device_config
|
|
||||||
.ip
|
|
||||||
.net_addr
|
|
||||||
.store(dev.net_addr, Ordering::Relaxed);
|
|
||||||
if let Some(ref chan) = edge.connection_chan {
|
|
||||||
let _ = chan.send(ConnectionInfo::IPInfo(ip)).await;
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
let mac = match dev.mac.try_into() {
|
|
||||||
Err(_) => NULL_MAC,
|
|
||||||
Ok(m) => m,
|
|
||||||
};
|
|
||||||
*/
|
|
||||||
// *edge.device_config.mac.write().unwrap() = mac;
|
|
||||||
edge.device_config
|
|
||||||
.ip
|
|
||||||
.net_bit_len
|
|
||||||
.store(dev.net_bit_len as u8, Ordering::Relaxed);
|
|
||||||
edge.device.reload_config(&edge.device_config, &dev.network_domain);
|
|
||||||
edge.network_id.store(dev.network_id, Ordering::Relaxed);
|
|
||||||
|
|
||||||
edge.set_authorized(true, aes);
|
|
||||||
send_stun_request(edge).await;
|
|
||||||
tokio::spawn(async {
|
|
||||||
let nattype = edge.probe_nat_type().await;
|
|
||||||
debug!("nat type is {:?}", nattype);
|
|
||||||
// println!("nat type is: {:?}", nattype);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
PacketType::RegisterSuperNAK => {
|
|
||||||
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {
|
|
||||||
error!("failed to decode REGISTER_SUPER_NAK");
|
|
||||||
edge.send_register_super_feedback(
|
|
||||||
msg._packet_id,
|
|
||||||
RegisterSuperFeedback {
|
|
||||||
result: 1,
|
|
||||||
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
|
||||||
should_exit: false,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else {
|
|
||||||
edge.send_register_super_feedback(
|
|
||||||
msg._packet_id,
|
|
||||||
RegisterSuperFeedback {
|
|
||||||
result: 2,
|
|
||||||
message: "error_code not recognized".to_owned(),
|
|
||||||
should_exit: false,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
match error_code {
|
|
||||||
NakMsgCode::InvalidToken => {
|
|
||||||
edge.send_register_super_feedback(
|
|
||||||
msg._packet_id,
|
|
||||||
RegisterSuperFeedback {
|
|
||||||
result: 3,
|
|
||||||
message: "invalid token".to_owned(),
|
|
||||||
should_exit: true,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
edge.stop().await;
|
|
||||||
}
|
|
||||||
NakMsgCode::NodeDisabled => {
|
|
||||||
edge.send_register_super_feedback(
|
|
||||||
msg._packet_id,
|
|
||||||
RegisterSuperFeedback {
|
|
||||||
result: 4,
|
|
||||||
message: "Node is disabled".to_owned(),
|
|
||||||
should_exit: true,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
edge.stop().await;
|
|
||||||
}
|
|
||||||
_other => {
|
|
||||||
edge.send_register_super_feedback(
|
|
||||||
msg._packet_id,
|
|
||||||
RegisterSuperFeedback {
|
|
||||||
result: 0,
|
|
||||||
message: "".to_owned(),
|
|
||||||
should_exit: false,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
|
|
||||||
result: 1,
|
|
||||||
message: "failed to decode REGISTER SUPER NAK".to_owned(),
|
|
||||||
});
|
|
||||||
*/
|
|
||||||
edge.set_authorized(false, Vec::new());
|
|
||||||
// std::process::exit(0);
|
|
||||||
}
|
|
||||||
PacketType::Command => {
|
|
||||||
if msg.current_packet.len() < 1 {
|
|
||||||
error!("malformed COMMAND received");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await;
|
|
||||||
}
|
|
||||||
PacketType::Event => {
|
|
||||||
if msg.current_packet.len() < 1 {
|
|
||||||
error!("malformed EVENT received");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let Ok(event) = msg.current_packet[0].try_into() else {
|
|
||||||
error!("failed to parse event type");
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
handle_tcp_event(edge, event, &msg.current_packet[1..]).await;
|
|
||||||
}
|
|
||||||
PacketType::PeerInfo => {
|
|
||||||
let _ = handle_packet_peer_info(edge, &msg.current_packet[..]).await;
|
|
||||||
}
|
|
||||||
PacketType::Pong => {
|
|
||||||
debug!("tcp pong received");
|
|
||||||
let now = get_current_timestamp();
|
|
||||||
edge.tcp_pong.store(now, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
other => {
|
|
||||||
debug!("tcp not handling {:?}", other);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {}
|
|
||||||
|
|
||||||
async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) {
|
|
||||||
match eventtype {
|
|
||||||
EventType::SendRegister => {
|
|
||||||
let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else {
|
|
||||||
error!("failed to decode SendRegister Event");
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let v4 = reg.nat_ip.to_be_bytes();
|
|
||||||
let mut v6_sock = None;
|
|
||||||
if let Some(v6_info) = reg.v6_info {
|
|
||||||
if let Ok(v6_bytes) = v6_info.v6.try_into() {
|
|
||||||
v6_sock = Some(V6Info {
|
|
||||||
port: v6_info.port as u16,
|
|
||||||
v6: v6_bytes,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let dst_mac = match reg.dst_mac.try_into() {
|
|
||||||
Ok(m) => m,
|
|
||||||
Err(_e) => NULL_MAC,
|
|
||||||
};
|
|
||||||
|
|
||||||
let remote_nat_byte = reg.nat_type as u8;
|
|
||||||
let remote_nat = match remote_nat_byte.try_into() {
|
|
||||||
Ok(t) => t,
|
|
||||||
Err(_) => NatType::NoNat,
|
|
||||||
};
|
|
||||||
|
|
||||||
check_peer_registration_needed(
|
|
||||||
edge,
|
|
||||||
false,
|
|
||||||
dst_mac,
|
|
||||||
// &v6_sock,
|
|
||||||
remote_nat,
|
|
||||||
&v6_sock,
|
|
||||||
&SdlanSock {
|
|
||||||
family: AF_INET,
|
|
||||||
port: reg.nat_port as u16,
|
|
||||||
v4,
|
|
||||||
v6: [0; 16],
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
other => {
|
|
||||||
debug!("unhandled event {:?}", other);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub async fn send_stun_request(eee: &Node) {
|
|
||||||
let sdl_v6_info = match *eee.ipv6.read().unwrap() {
|
|
||||||
Some(ref l) => Some(Sdlv6Info {
|
|
||||||
port: l.port as u32,
|
|
||||||
v6: Vec::from(l.v6),
|
|
||||||
}),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
let req = SdlStunRequest {
|
|
||||||
cookie: 0,
|
|
||||||
client_id: eee.config.node_uuid.clone(),
|
|
||||||
network_id: eee.network_id.load(Ordering::Relaxed),
|
|
||||||
ip: eee.device_config.get_ip(),
|
|
||||||
mac: Vec::from(eee.device_config.get_mac()),
|
|
||||||
nat_type: eee.get_nat_type() as u32,
|
|
||||||
v6_info: sdl_v6_info,
|
|
||||||
};
|
|
||||||
debug!("stun request: {:?}", req);
|
|
||||||
let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap();
|
|
||||||
if let Err(e) = send_to_sock(
|
|
||||||
eee,
|
|
||||||
&msg,
|
|
||||||
&eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize],
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
error!("failed to send to sock: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn on_disconnected_callback() {
|
|
||||||
let edge = get_edge();
|
|
||||||
edge.set_authorized(false, vec![]);
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn on_connected_callback<'a>(stream: &'a mut tokio::net::TcpStream, pkt_id: Option<u32>) {
|
|
||||||
let edge = get_edge();
|
|
||||||
// let installed_channel = install_channel.to_owned();
|
|
||||||
|
|
||||||
let token = edge._token.lock().unwrap().clone();
|
|
||||||
let code = edge.network_code.lock().unwrap().clone();
|
|
||||||
// let edge = get_edge();
|
|
||||||
// let edge = get_edge();
|
|
||||||
// let token = args.token.clone();
|
|
||||||
if let Ok(ipaddr) = stream.local_addr() {
|
|
||||||
match ipaddr.ip() {
|
|
||||||
IpAddr::V4(v4) => {
|
|
||||||
let ip = v4.into();
|
|
||||||
// println!("outer ip is {} => {}", v4, ip);
|
|
||||||
edge.outer_ip_v4.store(ip, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
_other => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let register_super = SdlRegisterSuper {
|
|
||||||
version: 1,
|
|
||||||
installed_channel: edge.install_channel.clone(),
|
|
||||||
// installed_channel,
|
|
||||||
client_id: edge.config.node_uuid.clone(),
|
|
||||||
dev_addr: Some(SdlDevAddr {
|
|
||||||
mac: Vec::from(edge.device_config.get_mac()),
|
|
||||||
net_addr: 0,
|
|
||||||
network_id: 0,
|
|
||||||
net_bit_len: 0,
|
|
||||||
network_domain: "".to_owned(),
|
|
||||||
}),
|
|
||||||
pub_key: edge.rsa_pubkey.clone(),
|
|
||||||
token,
|
|
||||||
network_code: code,
|
|
||||||
hostname: edge.hostname.read().unwrap().clone(),
|
|
||||||
};
|
|
||||||
// debug!("send register super: {:?}", register_super);
|
|
||||||
let packet_id = match pkt_id {
|
|
||||||
Some(id) => id,
|
|
||||||
None => edge.get_next_packet_id(),
|
|
||||||
};
|
|
||||||
// let packet_id = edge.get_next_packet_id();
|
|
||||||
let data = encode_to_tcp_message(
|
|
||||||
Some(register_super),
|
|
||||||
packet_id,
|
|
||||||
PacketType::RegisterSuper as u8,
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
if let Err(e) = stream.write(&data).await {
|
|
||||||
error!("failed to write to tcp: {}", e.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ReadWriteActor {
|
pub struct ReadWriteActor {
|
||||||
// actor接收的发送给tcp的接收端,由handle存放发送端
|
// actor接收的发送给tcp的接收端,由handle存放发送端
|
||||||
// to_tcp: Receiver<Vec<u8>>,
|
// to_tcp: Receiver<Vec<u8>>,
|
||||||
@ -350,7 +36,7 @@ pub struct ReadWriteActor {
|
|||||||
pong_time: Arc<AtomicU64>,
|
pong_time: Arc<AtomicU64>,
|
||||||
// actor收到数据之后,发送给上层的发送端口,接收端由handle保存
|
// actor收到数据之后,发送给上层的发送端口,接收端由handle保存
|
||||||
from_tcp: Sender<SdlanTcp>,
|
from_tcp: Sender<SdlanTcp>,
|
||||||
_cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||||
ipv6_network_restarter: Option<Sender<bool>>,
|
ipv6_network_restarter: Option<Sender<bool>>,
|
||||||
}
|
}
|
||||||
@ -367,7 +53,7 @@ impl ReadWriteActor {
|
|||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
// to_tcp,
|
// to_tcp,
|
||||||
_cancel: cancel,
|
cancel,
|
||||||
pong_time,
|
pong_time,
|
||||||
connected,
|
connected,
|
||||||
remote: remote.to_owned(),
|
remote: remote.to_owned(),
|
||||||
@ -377,12 +63,20 @@ impl ReadWriteActor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run<'a>(
|
pub async fn run<'a, T, T2, F>(
|
||||||
&self,
|
&self,
|
||||||
keep_reconnect: bool,
|
keep_reconnect: bool,
|
||||||
mut to_tcp: Receiver<Vec<u8>>,
|
mut to_tcp: Receiver<Vec<u8>>,
|
||||||
|
on_connected: T,
|
||||||
|
on_disconnected: T2,
|
||||||
mut start_stop_chan: Receiver<StartStopInfo>,
|
mut start_stop_chan: Receiver<StartStopInfo>,
|
||||||
) {
|
// cancel: CancellationToken,
|
||||||
|
) where
|
||||||
|
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()>,
|
||||||
|
T2: Fn() -> F,
|
||||||
|
F: Future<Output = ()>,
|
||||||
|
{
|
||||||
|
// let (tx, rx) = channel(20);
|
||||||
let mut started = false;
|
let mut started = false;
|
||||||
let mut start_pkt_id = None;
|
let mut start_pkt_id = None;
|
||||||
loop {
|
loop {
|
||||||
@ -432,6 +126,13 @@ impl ReadWriteActor {
|
|||||||
let Ok(mut stream) = TcpStream::connect(&self.remote).await else {
|
let Ok(mut stream) = TcpStream::connect(&self.remote).await else {
|
||||||
self.connected.store(false, Ordering::Relaxed);
|
self.connected.store(false, Ordering::Relaxed);
|
||||||
if keep_reconnect {
|
if keep_reconnect {
|
||||||
|
/*
|
||||||
|
tokio::select! {
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(3)) => {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -439,7 +140,7 @@ impl ReadWriteActor {
|
|||||||
};
|
};
|
||||||
self.connected.store(true, Ordering::Relaxed);
|
self.connected.store(true, Ordering::Relaxed);
|
||||||
debug!("connected");
|
debug!("connected");
|
||||||
on_connected_callback(&mut stream, start_pkt_id.take()).await;
|
on_connected(&mut stream, start_pkt_id.take()).await;
|
||||||
|
|
||||||
if let Some(ref connecting_chan) = self.connecting_chan {
|
if let Some(ref connecting_chan) = self.connecting_chan {
|
||||||
let state = ConnectionInfo::ConnState(ConnectionState::Connected);
|
let state = ConnectionInfo::ConnState(ConnectionState::Connected);
|
||||||
@ -525,7 +226,7 @@ impl ReadWriteActor {
|
|||||||
_ = check_pong => {},
|
_ = check_pong => {},
|
||||||
_ = check_stop => {},
|
_ = check_stop => {},
|
||||||
}
|
}
|
||||||
on_disconnected_callback().await;
|
on_disconnected().await;
|
||||||
debug!("connect retrying");
|
debug!("connect retrying");
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
debug!("disconnected");
|
debug!("disconnected");
|
||||||
@ -542,10 +243,10 @@ pub struct ReadWriterHandle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ReadWriterHandle {
|
impl ReadWriterHandle {
|
||||||
pub async fn send(&self, data: Vec<u8>) -> Result<()> {
|
pub async fn send(&self, data: &[u8]) -> Result<()> {
|
||||||
if self.connected.load(Ordering::Relaxed) {
|
if self.connected.load(Ordering::Relaxed) {
|
||||||
// connected, send to it
|
// connected, send to it
|
||||||
if let Err(e) = self.send_to_tcp.send(data).await {
|
if let Err(e) = self.send_to_tcp.send(Vec::from(data)).await {
|
||||||
error!("failed to send to send_to_tcp: {}", e.to_string());
|
error!("failed to send to send_to_tcp: {}", e.to_string());
|
||||||
return Err(SDLanError::NormalError("failed to send"));
|
return Err(SDLanError::NormalError("failed to send"));
|
||||||
};
|
};
|
||||||
@ -557,12 +258,12 @@ impl ReadWriterHandle {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new<>(
|
fn new<'a, T, T3, T2, F, F2>(
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
addr: &str,
|
addr: &str,
|
||||||
// on_connected: OnConnectedCallback<'a>,
|
on_connected: T,
|
||||||
// on_disconnected: T3,
|
on_disconnected: T3,
|
||||||
// on_message: T2,
|
on_message: T2,
|
||||||
pong_time: Arc<AtomicU64>,
|
pong_time: Arc<AtomicU64>,
|
||||||
start_stop_chan: Receiver<StartStopInfo>,
|
start_stop_chan: Receiver<StartStopInfo>,
|
||||||
// cancel: CancellationToken,
|
// cancel: CancellationToken,
|
||||||
@ -570,10 +271,11 @@ impl ReadWriterHandle {
|
|||||||
ipv6_network_restarter: Option<Sender<bool>>,
|
ipv6_network_restarter: Option<Sender<bool>>,
|
||||||
) -> Self
|
) -> Self
|
||||||
where
|
where
|
||||||
// T3: Fn() -> F2 + Send + 'static,
|
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()> + Send + 'static,
|
||||||
// T2: Fn(SdlanTcp) -> F + Send + 'static,
|
T3: Fn() -> F2 + Send + 'static,
|
||||||
// F: Future<Output = ()> + Send,
|
T2: Fn(SdlanTcp) -> F + Send + 'static,
|
||||||
// F2: Future<Output = ()> + Send,
|
F: Future<Output = ()> + Send,
|
||||||
|
F2: Future<Output = ()> + Send,
|
||||||
{
|
{
|
||||||
let (send_to_tcp, to_tcp) = channel(20);
|
let (send_to_tcp, to_tcp) = channel(20);
|
||||||
let (from_tcp, mut data_from_tcp) = channel(20);
|
let (from_tcp, mut data_from_tcp) = channel(20);
|
||||||
@ -590,19 +292,13 @@ impl ReadWriterHandle {
|
|||||||
);
|
);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
actor
|
actor
|
||||||
.run(
|
.run(true, to_tcp, on_connected, on_disconnected, start_stop_chan)
|
||||||
true,
|
|
||||||
to_tcp,
|
|
||||||
// on_connected,
|
|
||||||
// on_disconnected,
|
|
||||||
start_stop_chan
|
|
||||||
)
|
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
if let Some(msg) = data_from_tcp.recv().await {
|
if let Some(msg) = data_from_tcp.recv().await {
|
||||||
handle_tcp_message(msg).await;
|
on_message(msg).await;
|
||||||
} else {
|
} else {
|
||||||
error!("data from tcp exited");
|
error!("data from tcp exited");
|
||||||
// eprintln!("data from tcp exited");
|
// eprintln!("data from tcp exited");
|
||||||
@ -619,28 +315,30 @@ impl ReadWriterHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn init_tcp_conn(
|
pub fn init_tcp_conn<'a, T, T3, T2, F, F2>(
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
addr: &str,
|
addr: &str,
|
||||||
// on_connected: OnConnectedCallback<'a>,
|
on_connected: T,
|
||||||
// on_disconnected: T3,
|
on_disconnected: T3,
|
||||||
// on_message: T2,
|
on_message: T2,
|
||||||
pong_time: Arc<AtomicU64>,
|
pong_time: Arc<AtomicU64>,
|
||||||
// cancel: CancellationToken,
|
// cancel: CancellationToken,
|
||||||
start_stop_chan: Receiver<StartStopInfo>,
|
start_stop_chan: Receiver<StartStopInfo>,
|
||||||
connecting_chan: Option<Sender<ConnectionInfo>>,
|
connecting_chan: Option<Sender<ConnectionInfo>>,
|
||||||
ipv6_network_restarter: Option<Sender<bool>>,
|
ipv6_network_restarter: Option<Sender<bool>>,
|
||||||
)
|
) where
|
||||||
// T2: Fn(SdlanTcp) -> F + Send + 'static,
|
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()> + Send + 'static,
|
||||||
// F: Future<Output = ()> + Send,
|
T3: Fn() -> F2 + Send + 'static,
|
||||||
|
T2: Fn(SdlanTcp) -> F + Send + 'static,
|
||||||
|
F: Future<Output = ()> + Send,
|
||||||
|
F2: Future<Output = ()> + Send,
|
||||||
{
|
{
|
||||||
|
|
||||||
let tcp_handle = ReadWriterHandle::new(
|
let tcp_handle = ReadWriterHandle::new(
|
||||||
cancel,
|
cancel,
|
||||||
addr,
|
addr,
|
||||||
// on_connected,
|
on_connected,
|
||||||
// on_disconnected,
|
on_disconnected,
|
||||||
// on_message,
|
on_message,
|
||||||
pong_time,
|
pong_time,
|
||||||
start_stop_chan,
|
start_stop_chan,
|
||||||
connecting_chan,
|
connecting_chan,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user