added should_exit

This commit is contained in:
asxalex 2024-07-20 12:23:10 +08:00
parent b7f91f25f3
commit 23c797920b
4 changed files with 143 additions and 55 deletions

View File

@ -4,8 +4,9 @@ use sdlan_rs::CommandLine;
use sdlan_rs::CommandLineInput;
use sdlan_sn_rs::log;
use structopt::StructOpt;
use std::process::exit;
use std::time::Duration;
use structopt::StructOpt;
#[tokio::main]
async fn main() {
@ -27,14 +28,32 @@ async fn main() {
tos: 0,
token: "".to_owned(),
allow_p2p: true,
}, tx,
},
tx,
&sdlan_rs::get_install_channel(),
).await;
)
.await;
let _ = rx.recv();
let edge = get_edge();
let _ = edge.start_without_feedback(cmd.token).await;
// let res = edge.start_without_feedback(cmd.token).await;
let Ok(res) = edge
.start_with_feedback(cmd.token, Duration::from_secs(3))
.await
else {
println!("failed to start1");
exit(0);
};
if res.result != 0 {
println!("failed to start: {}", res.message);
if res.should_exit {
exit(0);
}
// edge.stop().await;
// exit(0);
}
/*
tokio::time::sleep(Duration::from_secs(20)).await;

View File

@ -1,6 +1,6 @@
use std::net::IpAddr;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::net::IpAddr;
use crate::config::TCP_PING_TIME;
use crate::network::{get_edge, ping_to_sn, read_and_parse_packet, RegisterSuperFeedback};
@ -12,18 +12,16 @@ use crate::tcp::{init_tcp_conn, EventType, NakMsgCode, PacketType, SdlanTcp};
use crate::utils::{send_to_sock, CommandLine};
use etherparse::IpHeaders;
use sdlan_sn_rs::config::{AF_INET, SDLAN_DEFAULT_TTL};
use sdlan_sn_rs::packet::Register;
use sdlan_sn_rs::peer::SdlanSock;
use sdlan_sn_rs::utils::{
aes_encrypt, get_current_timestamp,
ip_to_string, is_multi_broadcast, rsa_decrypt,
};
use sdlan_sn_rs::utils::Result;
use sdlan_sn_rs::utils::{
aes_encrypt, get_current_timestamp, ip_to_string, is_multi_broadcast, rsa_decrypt,
};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_util::sync::CancellationToken;
use super::{check_peer_registration_needed, packet, Node, StartStopInfo };
use super::{check_peer_registration_needed, packet, Node, StartStopInfo};
use crate::utils::Socket;
use prost::Message;
@ -38,10 +36,14 @@ async fn handle_tcp_message(msg: SdlanTcp) {
debug!("got tcp message: {:?}", msg.packet_type);
match msg.packet_type {
PacketType::RegisterSuperACK => {
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 0,
message: "".to_owned(),
});
should_exit: false,
},
);
let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else {
error!("failed to decode REGISTER_SUPER_ACK");
return;
@ -79,40 +81,60 @@ async fn handle_tcp_message(msg: SdlanTcp) {
PacketType::RegisterSuperNAK => {
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {
error!("failed to decode REGISTER_SUPER_NAK");
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 1,
message: "failed to decode REGISTER SUPER NAK".to_owned(),
});
should_exit: false,
},
);
return;
};
let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else {
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 2,
message: "error_code not recognized".to_owned(),
});
should_exit: false,
},
);
return;
};
match error_code {
NakMsgCode::InvalidToken => {
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 3,
message: "invalid token".to_owned(),
});
should_exit: true,
},
);
edge.stop().await;
}
NakMsgCode::NodeDisabled => {
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 4,
message: "Node is disabled".to_owned(),
});
should_exit: true,
},
);
edge.stop().await;
}
_other => {
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 0,
message: "".to_owned(),
});
should_exit: false,
},
);
}
}
/*
@ -224,7 +246,9 @@ pub async fn async_main(
let edge = get_edge();
// let token = args.token.clone();
let cancel_tcp = cancel.clone();
init_tcp_conn(
cancel_tcp,
&args.tcp,
move |stream, pkt_id| {
let installed_channel = install_channel.to_owned();

View File

@ -93,6 +93,7 @@ pub struct RegisterSuperFeedback {
// 0 for success, other for error
pub result: u8,
pub message: String,
pub should_exit: bool,
}
pub struct StartStopInfo {
@ -174,41 +175,57 @@ impl Node {
pub async fn start_without_feedback(&self, token: String) -> Result<()> {
*self._token.lock().unwrap() = token;
let _ = self.start_stop_sender.send(StartStopInfo{
let _ = self
.start_stop_sender
.send(StartStopInfo {
is_start: true,
pkt_id: None,
}).await;
})
.await;
Ok(())
}
pub async fn start_with_feedback(&self, token: String, timeout: Duration) -> Result<RegisterSuperFeedback> {
pub async fn start_with_feedback(
&self,
token: String,
timeout: Duration,
) -> Result<RegisterSuperFeedback> {
*self._token.lock().unwrap() = token;
let (tx, mut rx) = oneshot::channel();
let (tx, rx) = oneshot::channel();
let id = self.get_next_packet_id();
self.packet_id_match.insert(id, tx);
let _ = self.start_stop_sender.send(StartStopInfo{
let _ = self
.start_stop_sender
.send(StartStopInfo {
is_start: true,
pkt_id: Some(id),
}).await;
})
.await;
tokio::select! {
Ok(result) = rx => {
rx_info = rx => {
if let Ok(result) = rx_info {
self.packet_id_match.remove(&id);
Ok(result)
} else {
Err(SDLanError::NormalError("rx closed"))
}
}
_ = tokio::time::sleep(timeout) => {
Err(SDLanError::NormalError("timed out"))
}
}
}
pub async fn stop(&self) {
*self._token.lock().unwrap() = "".to_owned();
let _ = self.start_stop_sender.send(StartStopInfo{
let _ = self
.start_stop_sender
.send(StartStopInfo {
is_start: false,
pkt_id: None,
}).await;
})
.await;
}
pub fn new(

View File

@ -8,6 +8,7 @@ use std::{
time::Duration,
};
use tokio::io::BufReader;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use futures_util::{future::BoxFuture, pin_mut};
@ -34,10 +35,12 @@ pub struct ReadWriteActor {
pong_time: Arc<AtomicU64>,
// actor收到数据之后发送给上层的发送端口,接收端由handle保存
from_tcp: Sender<SdlanTcp>,
cancel: CancellationToken,
}
impl ReadWriteActor {
pub fn new(
cancel: CancellationToken,
remote: &str,
from_tcp: Sender<SdlanTcp>,
connected: Arc<AtomicBool>,
@ -45,6 +48,7 @@ impl ReadWriteActor {
) -> Self {
Self {
// to_tcp,
cancel,
pong_time,
connected,
remote: remote.to_owned(),
@ -72,7 +76,22 @@ impl ReadWriteActor {
self.connected.store(false, Ordering::Relaxed);
if !started {
// println!("waiting for start");
loop {
let start_or_stop = start_stop_chan.recv().await;
if let Some(m) = start_or_stop {
if m.is_start {
started = true;
start_pkt_id = m.pkt_id;
break;
}
} else {
// None, just return
return;
}
}
/*
while let Some(m) = start_stop_chan.recv().await {
println!("4");
if m.is_start {
// println!("start received");
started = true;
@ -82,6 +101,9 @@ impl ReadWriteActor {
// println!("stop received");
}
}
*/
debug!("start stop chan recv none");
continue;
}
debug!("try connecting...");
let Ok(mut stream) = TcpStream::connect(&self.remote).await else {
@ -157,11 +179,13 @@ impl ReadWriteActor {
match start_stop_chan.recv().await {
Some(v) => {
if !v.is_start {
started = false;
return;
}
}
other => {
// send chan is closed;
started = false;
return;
}
}
@ -179,6 +203,7 @@ impl ReadWriteActor {
on_disconnected().await;
debug!("connect retrying");
tokio::time::sleep(Duration::from_secs(1)).await;
debug!("disconnected");
// future::select(read_from_tcp, write_to_tcp).await;
}
}
@ -208,6 +233,7 @@ impl ReadWriterHandle {
}
fn new<'a, T, T3, T2, F, F2>(
cancel: CancellationToken,
addr: &str,
on_connected: T,
on_disconnected: T3,
@ -227,7 +253,7 @@ impl ReadWriterHandle {
let (from_tcp, mut data_from_tcp) = channel(20);
let connected = Arc::new(AtomicBool::new(false));
let actor = ReadWriteActor::new(addr, from_tcp, connected.clone(), pong_time);
let actor = ReadWriteActor::new(cancel, addr, from_tcp, connected.clone(), pong_time);
tokio::spawn(async move {
actor
.run(true, to_tcp, on_connected, on_disconnected, start_stop_chan)
@ -253,6 +279,7 @@ impl ReadWriterHandle {
}
pub fn init_tcp_conn<'a, T, T3, T2, F, F2>(
cancel: CancellationToken,
addr: &str,
on_connected: T,
on_disconnected: T3,
@ -268,6 +295,7 @@ pub fn init_tcp_conn<'a, T, T3, T2, F, F2>(
F2: Future<Output = ()> + Send,
{
let tcp_handle = ReadWriterHandle::new(
cancel,
addr,
on_connected,
on_disconnected,