multicast

This commit is contained in:
asxalex 2024-07-15 23:04:43 +08:00
parent b1a469aec7
commit 5b32c51936
2 changed files with 26 additions and 5 deletions

View File

@ -14,7 +14,8 @@ use etherparse::IpHeaders;
use sdlan_sn_rs::config::{AF_INET, SDLAN_DEFAULT_TTL};
use sdlan_sn_rs::peer::SdlanSock;
use sdlan_sn_rs::utils::{
self, aes_encrypt, create_or_load_uuid, get_current_timestamp, get_sdlan_sock_from_socketaddr, ip_to_string, is_multi_broadcast, rsa_decrypt
self, aes_encrypt, create_or_load_uuid, get_current_timestamp, get_sdlan_sock_from_socketaddr,
ip_to_string, is_multi_broadcast, rsa_decrypt,
};
use sdlan_sn_rs::utils::{Result, SDLanError};
use tokio::io::AsyncWriteExt;
@ -75,7 +76,10 @@ async fn handle_tcp_message(msg: SdlanTcp) {
error!("failed to decode REGISTER_SUPER_NAK");
return;
};
error!("RegisterSuperNAK received: {}({})", _nak.error_code, _nak.error_message);
error!(
"RegisterSuperNAK received: {}({})",
_nak.error_code, _nak.error_message
);
edge.set_authorized(false, Vec::new());
// std::process::exit(0);
}
@ -195,9 +199,7 @@ pub async fn async_main(
// println!("outer ip is {} => {}", v4, ip);
edge.outer_ip_v4.store(ip, Ordering::Relaxed);
}
_other => {
}
_other => {}
}
}
let register_super = SdlRegisterSuper {
@ -481,6 +483,7 @@ async fn edge_send_packet_to_net(eee: &Node, data: &[u8]) {
error!("failed to encode to udp message");
return;
};
println!("send packet to net");
send_packet_to_net(eee, dstip, &flow, msg_size).await;
}
Err(e) => {
@ -490,6 +493,7 @@ async fn edge_send_packet_to_net(eee: &Node, data: &[u8]) {
}
async fn send_packet_to_net(eee: &Node, dst_ip: u32, pkt: &[u8], size: u64) {
println!("find peer destination");
let (dest_sock, is_p2p) = find_peer_destination(eee, dst_ip).await;
if is_p2p {
eee.stats.tx_p2p.fetch_add(size, Ordering::Relaxed);
@ -507,12 +511,14 @@ async fn send_packet_to_net(eee: &Node, dst_ip: u32, pkt: &[u8], size: u64) {
async fn find_peer_destination(eee: &Node, dst_ip: u32) -> (SdlanSock, bool) {
if is_multi_broadcast(dst_ip) {
println!("find peer destination: is multi_broadcast");
return (
eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize]
.deepcopy(),
false,
);
}
println!("find peer destination 1");
let mut is_p2p = false;
let result: SdlanSock;
if let Some(dst) = eee.known_peers.get_peer(&dst_ip) {
@ -533,6 +539,7 @@ async fn find_peer_destination(eee: &Node, dst_ip: u32) -> (SdlanSock, bool) {
[eee.config.super_node_index.load(Ordering::Relaxed) as usize]
.deepcopy();
}
println!("find peer_destination: {}", is_p2p);
if !is_p2p {
debug!("check_query_peer_info");
super::packet::check_query_peer_info(eee, dst_ip).await;

View File

@ -249,6 +249,7 @@ pub async fn handle_packet_peer_info(
sock.to_string()
);
send_register(eee, &sock, &None).await;
register_with_local_peers(eee).await;
}
None => {
debug!("Rx PEERINFO unknown peer: {}", ip_to_string(&pi.dst_ip));
@ -456,6 +457,7 @@ pub async fn check_peer_registration_needed(
peer_sock: &SdlanSock,
) {
let mut p = eee.known_peers.get_peer(&src_ip);
println!("check peer registration needed");
if let None = p {
p = eee.known_peers.get_peer_by_sock(peer_sock);
if let Some(ref k) = p {
@ -469,6 +471,7 @@ pub async fn check_peer_registration_needed(
// unimplemented!();
}
Some(k) => {
println!("peer is known");
let now = get_current_timestamp();
if !from_sn {
k.last_p2p.store(now, Ordering::Relaxed);
@ -525,6 +528,7 @@ async fn register_with_new_peer(
let now = get_current_timestamp();
let mut scan = eee.pending_peers.get_peer(&ip);
if let None = scan {
println!("pending peers not found");
// such ip not found in pending
let temp = Arc::new(EdgePeer::new(
ip,
@ -542,7 +546,13 @@ async fn register_with_new_peer(
scan = Some(temp);
debug!("Pending size: {}", eee.pending_peers.peers.len());
println!(
"Pending size: {}, from_sn: {}",
eee.pending_peers.peers.len(),
from_sn
);
if from_sn {
println!("from_sn");
// should send register to peer
if eee.config.register_ttl == 1 {
/* We are DMZ host or port is directly accessible. Just let peer to send back the ack */
@ -573,12 +583,14 @@ async fn register_with_new_peer(
)
.await;
} else {
println!("not from sn");
// P2P register, send directly
send_register(eee, peersock, v6_info).await;
}
println!("register with local peers");
register_with_local_peers(eee).await;
} else {
println!("pending peers is found");
if let Some(ref s) = scan {
*(s.sock.write().unwrap()) = peersock.deepcopy();
}
@ -772,7 +784,9 @@ pub async fn check_query_peer_info(eee: &Node, dst_ip: u32) {
now,
));
debug!("insert peer {} to pending", ip_to_string(&dst_ip));
println!("insert peer {} to pending", ip_to_string(&dst_ip));
eee.pending_peers.insert_peer(peer.clone());
register_with_local_peers(eee).await;
scan = peer;
}
Some(s) => {