fix on_connected_callback, on_disconnected_callback, and on_message callback

This commit is contained in:
alex 2026-01-16 15:53:00 +08:00
parent 6ef685420c
commit 214e73a0b9
4 changed files with 373 additions and 387 deletions

View File

@ -78,6 +78,7 @@ pub async fn run_sdlan(
sock,
hostname,
server_ip,
install_channel.to_owned(),
)
.await
{
@ -87,10 +88,10 @@ pub async fn run_sdlan(
debug!("edge inited");
let cancel = CancellationToken::new();
let install_chan = install_channel.to_owned();
// let install_chan = install_channel.to_owned();
tokio::spawn(async move {
if let Err(e) =
async_main(install_chan, args, start_stop_chan, cancel, connecting_chan).await
async_main(args, start_stop_chan, cancel, connecting_chan).await
{
error!("failed to run async main: {}", e.as_str());
}

View File

@ -1,289 +1,34 @@
use std::net::IpAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use crate::config::{NULL_MAC, TCP_PING_TIME};
use crate::config::{TCP_PING_TIME};
use crate::network::ipv6::run_ipv6;
use crate::network::{
get_edge, ping_to_sn, read_and_parse_packet, RegisterSuperFeedback, TunTapPacketHandler,
get_edge, ping_to_sn, read_and_parse_packet, TunTapPacketHandler,
};
use crate::pb::{
encode_to_tcp_message, encode_to_udp_message, SdlDevAddr, SdlRegisterSuper,
SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, Sdlv6Info,
};
use crate::tcp::{init_tcp_conn, EventType, NakMsgCode, NatType, PacketType, SdlanTcp};
use crate::tcp::{init_tcp_conn, send_stun_request};
use crate::utils::{send_to_sock, CommandLine};
use crate::{ConnectionInfo, ConnectionState};
use sdlan_sn_rs::config::AF_INET;
use sdlan_sn_rs::peer::{SdlanSock, V6Info};
use sdlan_sn_rs::utils::{get_current_timestamp, ip_to_string, is_multi_broadcast, rsa_decrypt};
use crate::{ConnectionInfo};
use sdlan_sn_rs::peer::{SdlanSock};
use sdlan_sn_rs::utils::{get_current_timestamp, is_multi_broadcast};
use sdlan_sn_rs::utils::{Mac, Result};
use tokio::io::AsyncWriteExt;
use tokio::net::UdpSocket;
use tokio::net::{UdpSocket};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_util::sync::CancellationToken;
use super::{check_peer_registration_needed, packet, Node, StartStopInfo};
use super::{Node, StartStopInfo};
use crate::utils::Socket;
use prost::Message;
use tracing::{debug, error};
async fn handle_tcp_message(msg: SdlanTcp) {
let edge = get_edge();
// let now = get_current_timestamp();
// edge.tcp_pong.store(now, Ordering::Relaxed);
debug!("got tcp message: {:?}", msg.packet_type);
match msg.packet_type {
PacketType::RegisterSuperACK => {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 0,
message: "".to_owned(),
should_exit: false,
},
);
let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else {
error!("failed to decode REGISTER_SUPER_ACK");
return;
};
debug!("got register super ack: {:?}", ack);
let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else {
error!("failed to rsa decrypt aes key");
return;
};
let Some(dev) = ack.dev_addr else {
error!("no dev_addr is specified");
return;
};
let ip = ip_to_string(&dev.net_addr);
// debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,);
println!("assigned ip: {}", ip);
let hostname = edge.hostname.read().unwrap().clone();
println!("network is: {}.{}", hostname, dev.network_domain);
edge.device_config
.ip
.net_addr
.store(dev.net_addr, Ordering::Relaxed);
if let Some(ref chan) = edge.connection_chan {
let _ = chan.send(ConnectionInfo::IPInfo(ip)).await;
}
/*
let mac = match dev.mac.try_into() {
Err(_) => NULL_MAC,
Ok(m) => m,
};
*/
// *edge.device_config.mac.write().unwrap() = mac;
edge.device_config
.ip
.net_bit_len
.store(dev.net_bit_len as u8, Ordering::Relaxed);
edge.device.reload_config(&edge.device_config, &dev.network_domain);
edge.network_id.store(dev.network_id, Ordering::Relaxed);
edge.set_authorized(true, aes);
send_stun_request(edge).await;
tokio::spawn(async {
let nattype = edge.probe_nat_type().await;
debug!("nat type is {:?}", nattype);
// println!("nat type is: {:?}", nattype);
});
}
PacketType::RegisterSuperNAK => {
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {
error!("failed to decode REGISTER_SUPER_NAK");
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 1,
message: "failed to decode REGISTER SUPER NAK".to_owned(),
should_exit: false,
},
);
return;
};
let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 2,
message: "error_code not recognized".to_owned(),
should_exit: false,
},
);
return;
};
match error_code {
NakMsgCode::InvalidToken => {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 3,
message: "invalid token".to_owned(),
should_exit: true,
},
);
edge.stop().await;
}
NakMsgCode::NodeDisabled => {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 4,
message: "Node is disabled".to_owned(),
should_exit: true,
},
);
edge.stop().await;
}
_other => {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 0,
message: "".to_owned(),
should_exit: false,
},
);
}
}
/*
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
result: 1,
message: "failed to decode REGISTER SUPER NAK".to_owned(),
});
*/
edge.set_authorized(false, Vec::new());
// std::process::exit(0);
}
PacketType::Command => {
if msg.current_packet.len() < 1 {
error!("malformed COMMAND received");
return;
}
handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await;
}
PacketType::Event => {
if msg.current_packet.len() < 1 {
error!("malformed EVENT received");
return;
}
let Ok(event) = msg.current_packet[0].try_into() else {
error!("failed to parse event type");
return;
};
handle_tcp_event(edge, event, &msg.current_packet[1..]).await;
}
PacketType::PeerInfo => {
let _ = packet::handle_packet_peer_info(edge, &msg.current_packet[..]).await;
}
PacketType::Pong => {
debug!("tcp pong received");
let now = get_current_timestamp();
edge.tcp_pong.store(now, Ordering::Relaxed);
}
other => {
debug!("tcp not handling {:?}", other);
}
}
}
async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {}
async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) {
match eventtype {
EventType::SendRegister => {
let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else {
error!("failed to decode SendRegister Event");
return;
};
let v4 = reg.nat_ip.to_be_bytes();
let mut v6_sock = None;
if let Some(v6_info) = reg.v6_info {
if let Ok(v6_bytes) = v6_info.v6.try_into() {
v6_sock = Some(V6Info {
port: v6_info.port as u16,
v6: v6_bytes,
});
}
}
let dst_mac = match reg.dst_mac.try_into() {
Ok(m) => m,
Err(_e) => NULL_MAC,
};
let remote_nat_byte = reg.nat_type as u8;
let remote_nat = match remote_nat_byte.try_into() {
Ok(t) => t,
Err(_) => NatType::NoNat,
};
check_peer_registration_needed(
edge,
false,
dst_mac,
// &v6_sock,
remote_nat,
&v6_sock,
&SdlanSock {
family: AF_INET,
port: reg.nat_port as u16,
v4,
v6: [0; 16],
},
)
.await;
}
other => {
debug!("unhandled event {:?}", other);
}
}
}
pub async fn async_main(
install_channel: String,
args: CommandLine,
start_stop_chan: Receiver<StartStopInfo>,
cancel: CancellationToken,
connecting_chan: Option<Sender<ConnectionInfo>>,
) -> Result<()> {
// let _ = PidRecorder::new(".pid");
// // gen public key
// gen_rsa_keys(".client");
// let mut pubkey = String::new();
// File::open(".client/id_rsa.pub")?.read_to_string(&mut pubkey)?;
// let privatekey = load_private_key_file(".client/id_rsa")?;
// // init sock
// if args.token.len() == 0 {
// println!("failed to load token");
// return Ok(());
// }
// let sock_v4 = Socket::build(0, true, true, args.tos).await?;
// // allow multicast
// // TODO: set the sn's tcp socket
// // let tcpsock = TCPSocket::build("121.4.79.234:1234").await?;
// let tcp_pong = Arc::new(AtomicU64::new(0));
// let edge = Node::new(
// pubkey,
// node_conf,
// sock_v4,
// &args.token,
// privatekey,
// tcp_pong.clone(),
// );
let edge = get_edge();
// let token = args.token.clone();
@ -293,61 +38,7 @@ pub async fn async_main(
init_tcp_conn(
cancel_tcp,
&args.tcp,
move |stream, pkt_id| {
let installed_channel = install_channel.to_owned();
Box::pin(async move {
let token = edge._token.lock().unwrap().clone();
let code = edge.network_code.lock().unwrap().clone();
// let edge = get_edge();
// let edge = get_edge();
// let token = args.token.clone();
if let Ok(ipaddr) = stream.local_addr() {
match ipaddr.ip() {
IpAddr::V4(v4) => {
let ip = v4.into();
// println!("outer ip is {} => {}", v4, ip);
edge.outer_ip_v4.store(ip, Ordering::Relaxed);
}
_other => {}
}
}
let register_super = SdlRegisterSuper {
version: 1,
installed_channel,
client_id: edge.config.node_uuid.clone(),
dev_addr: Some(SdlDevAddr {
mac: Vec::from(edge.device_config.get_mac()),
net_addr: 0,
network_id: 0,
net_bit_len: 0,
network_domain: "".to_owned(),
}),
pub_key: edge.rsa_pubkey.clone(),
token,
network_code: code,
hostname: edge.hostname.read().unwrap().clone(),
};
// debug!("send register super: {:?}", register_super);
let packet_id = match pkt_id {
Some(id) => id,
None => edge.get_next_packet_id(),
};
// let packet_id = edge.get_next_packet_id();
let data = encode_to_tcp_message(
Some(register_super),
packet_id,
PacketType::RegisterSuper as u8,
)
.unwrap();
if let Err(e) = stream.write(&data).await {
error!("failed to write to tcp: {}", e.to_string());
}
})
},
|| async {
edge.set_authorized(false, vec![]);
},
|msg| handle_tcp_message(msg),
// |msg| handle_tcp_message(msg),
edge.tcp_pong.clone(),
// tcp_pong,
start_stop_chan,
@ -448,36 +139,6 @@ async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) {
}
}
async fn send_stun_request(eee: &Node) {
let sdl_v6_info = match *eee.ipv6.read().unwrap() {
Some(ref l) => Some(Sdlv6Info {
port: l.port as u32,
v6: Vec::from(l.v6),
}),
None => None,
};
let req = SdlStunRequest {
cookie: 0,
client_id: eee.config.node_uuid.clone(),
network_id: eee.network_id.load(Ordering::Relaxed),
ip: eee.device_config.get_ip(),
mac: Vec::from(eee.device_config.get_mac()),
nat_type: eee.get_nat_type() as u32,
v6_info: sdl_v6_info,
};
debug!("stun request: {:?}", req);
let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap();
if let Err(e) = send_to_sock(
eee,
&msg,
&eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize],
)
.await
{
error!("failed to send to sock: {:?}", e);
}
}
pub async fn loop_socket_v6(eee: &'static Node, socket: Arc<Socket>, cancel: CancellationToken) {
debug!("loop sock v6");
loop {

View File

@ -42,6 +42,7 @@ pub async fn init_edge(
udpsock_for_dns: Arc<UdpSocket>,
hostname: String,
server_ip: String,
install_channel: String
) -> Result<()> {
// gen public key
let rsa_path = format!("{}/.client", get_base_dir());
@ -88,6 +89,7 @@ pub async fn init_edge(
hostname,
udpsock_for_dns,
server_ip,
install_channel,
);
do_init_edge(edge)?;
@ -175,6 +177,8 @@ pub struct Node {
// last register super time, in unix
pub _last_register_req: AtomicU64,
pub install_channel: String,
nat_type: Mutex<NatType>,
nat_cookie: AtomicU32,
@ -287,6 +291,7 @@ impl Node {
hostname: String,
udpsock_for_dns: Arc<UdpSocket>,
server_ip: String,
install_channel: String,
) -> Self {
let mode = if cfg!(not(feature = "tun")) {
Mode::Tap
@ -347,6 +352,7 @@ impl Node {
nat_cookie: AtomicU32::new(1),
cookie_match: DashMap::new(),
server_ip,
install_channel,
}
}

View File

@ -1,6 +1,11 @@
use once_cell::sync::OnceCell;
use sdlan_sn_rs::utils::{get_current_timestamp, Result, SDLanError};
use prost::Message;
use sdlan_sn_rs::config::AF_INET;
use sdlan_sn_rs::peer::{SdlanSock, V6Info};
use sdlan_sn_rs::utils::{Result, SDLanError, get_current_timestamp, ip_to_string, rsa_decrypt};
use std::future::Future;
use std::net::IpAddr;
use std::process::Output;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::{
@ -19,15 +24,324 @@ use tokio::{
};
use tracing::error;
use crate::config::TCP_PING_TIME;
use crate::network::StartStopInfo;
use crate::tcp::read_a_packet;
use crate::{ConnectionInfo, ConnectionState};
use crate::config::{NULL_MAC, TCP_PING_TIME};
use crate::network::{Node, RegisterSuperFeedback, StartStopInfo, check_peer_registration_needed, handle_packet_peer_info};
use crate::pb::{SdlDevAddr, SdlRegisterSuper, SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, Sdlv6Info, encode_to_tcp_message, encode_to_udp_message};
use crate::tcp::{EventType, NakMsgCode, NatType, PacketType, read_a_packet};
use crate::utils::send_to_sock;
use crate::{ConnectionInfo, ConnectionState, get_edge};
use super::tcp_codec::SdlanTcp;
static GLOBAL_TCP_HANDLE: OnceCell<ReadWriterHandle> = OnceCell::new();
async fn handle_tcp_message(msg: SdlanTcp) {
let edge = get_edge();
// let now = get_current_timestamp();
// edge.tcp_pong.store(now, Ordering::Relaxed);
debug!("got tcp message: {:?}", msg.packet_type);
match msg.packet_type {
PacketType::RegisterSuperACK => {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 0,
message: "".to_owned(),
should_exit: false,
},
);
let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else {
error!("failed to decode REGISTER_SUPER_ACK");
return;
};
debug!("got register super ack: {:?}", ack);
let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else {
error!("failed to rsa decrypt aes key");
return;
};
let Some(dev) = ack.dev_addr else {
error!("no dev_addr is specified");
return;
};
let ip = ip_to_string(&dev.net_addr);
// debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,);
println!("assigned ip: {}", ip);
let hostname = edge.hostname.read().unwrap().clone();
println!("network is: {}.{}", hostname, dev.network_domain);
edge.device_config
.ip
.net_addr
.store(dev.net_addr, Ordering::Relaxed);
if let Some(ref chan) = edge.connection_chan {
let _ = chan.send(ConnectionInfo::IPInfo(ip)).await;
}
/*
let mac = match dev.mac.try_into() {
Err(_) => NULL_MAC,
Ok(m) => m,
};
*/
// *edge.device_config.mac.write().unwrap() = mac;
edge.device_config
.ip
.net_bit_len
.store(dev.net_bit_len as u8, Ordering::Relaxed);
edge.device.reload_config(&edge.device_config, &dev.network_domain);
edge.network_id.store(dev.network_id, Ordering::Relaxed);
edge.set_authorized(true, aes);
send_stun_request(edge).await;
tokio::spawn(async {
let nattype = edge.probe_nat_type().await;
debug!("nat type is {:?}", nattype);
// println!("nat type is: {:?}", nattype);
});
}
PacketType::RegisterSuperNAK => {
let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else {
error!("failed to decode REGISTER_SUPER_NAK");
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 1,
message: "failed to decode REGISTER SUPER NAK".to_owned(),
should_exit: false,
},
);
return;
};
let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 2,
message: "error_code not recognized".to_owned(),
should_exit: false,
},
);
return;
};
match error_code {
NakMsgCode::InvalidToken => {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 3,
message: "invalid token".to_owned(),
should_exit: true,
},
);
edge.stop().await;
}
NakMsgCode::NodeDisabled => {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 4,
message: "Node is disabled".to_owned(),
should_exit: true,
},
);
edge.stop().await;
}
_other => {
edge.send_register_super_feedback(
msg._packet_id,
RegisterSuperFeedback {
result: 0,
message: "".to_owned(),
should_exit: false,
},
);
}
}
/*
edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback {
result: 1,
message: "failed to decode REGISTER SUPER NAK".to_owned(),
});
*/
edge.set_authorized(false, Vec::new());
// std::process::exit(0);
}
PacketType::Command => {
if msg.current_packet.len() < 1 {
error!("malformed COMMAND received");
return;
}
handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await;
}
PacketType::Event => {
if msg.current_packet.len() < 1 {
error!("malformed EVENT received");
return;
}
let Ok(event) = msg.current_packet[0].try_into() else {
error!("failed to parse event type");
return;
};
handle_tcp_event(edge, event, &msg.current_packet[1..]).await;
}
PacketType::PeerInfo => {
let _ = handle_packet_peer_info(edge, &msg.current_packet[..]).await;
}
PacketType::Pong => {
debug!("tcp pong received");
let now = get_current_timestamp();
edge.tcp_pong.store(now, Ordering::Relaxed);
}
other => {
debug!("tcp not handling {:?}", other);
}
}
}
async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {}
async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) {
match eventtype {
EventType::SendRegister => {
let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else {
error!("failed to decode SendRegister Event");
return;
};
let v4 = reg.nat_ip.to_be_bytes();
let mut v6_sock = None;
if let Some(v6_info) = reg.v6_info {
if let Ok(v6_bytes) = v6_info.v6.try_into() {
v6_sock = Some(V6Info {
port: v6_info.port as u16,
v6: v6_bytes,
});
}
}
let dst_mac = match reg.dst_mac.try_into() {
Ok(m) => m,
Err(_e) => NULL_MAC,
};
let remote_nat_byte = reg.nat_type as u8;
let remote_nat = match remote_nat_byte.try_into() {
Ok(t) => t,
Err(_) => NatType::NoNat,
};
check_peer_registration_needed(
edge,
false,
dst_mac,
// &v6_sock,
remote_nat,
&v6_sock,
&SdlanSock {
family: AF_INET,
port: reg.nat_port as u16,
v4,
v6: [0; 16],
},
)
.await;
}
other => {
debug!("unhandled event {:?}", other);
}
}
}
pub async fn send_stun_request(eee: &Node) {
let sdl_v6_info = match *eee.ipv6.read().unwrap() {
Some(ref l) => Some(Sdlv6Info {
port: l.port as u32,
v6: Vec::from(l.v6),
}),
None => None,
};
let req = SdlStunRequest {
cookie: 0,
client_id: eee.config.node_uuid.clone(),
network_id: eee.network_id.load(Ordering::Relaxed),
ip: eee.device_config.get_ip(),
mac: Vec::from(eee.device_config.get_mac()),
nat_type: eee.get_nat_type() as u32,
v6_info: sdl_v6_info,
};
debug!("stun request: {:?}", req);
let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap();
if let Err(e) = send_to_sock(
eee,
&msg,
&eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize],
)
.await
{
error!("failed to send to sock: {:?}", e);
}
}
async fn on_disconnected_callback() {
let edge = get_edge();
edge.set_authorized(false, vec![]);
}
async fn on_connected_callback<'a>(stream: &'a mut tokio::net::TcpStream, pkt_id: Option<u32>) {
let edge = get_edge();
// let installed_channel = install_channel.to_owned();
let token = edge._token.lock().unwrap().clone();
let code = edge.network_code.lock().unwrap().clone();
// let edge = get_edge();
// let edge = get_edge();
// let token = args.token.clone();
if let Ok(ipaddr) = stream.local_addr() {
match ipaddr.ip() {
IpAddr::V4(v4) => {
let ip = v4.into();
// println!("outer ip is {} => {}", v4, ip);
edge.outer_ip_v4.store(ip, Ordering::Relaxed);
}
_other => {}
}
}
let register_super = SdlRegisterSuper {
version: 1,
installed_channel: edge.install_channel.clone(),
// installed_channel,
client_id: edge.config.node_uuid.clone(),
dev_addr: Some(SdlDevAddr {
mac: Vec::from(edge.device_config.get_mac()),
net_addr: 0,
network_id: 0,
net_bit_len: 0,
network_domain: "".to_owned(),
}),
pub_key: edge.rsa_pubkey.clone(),
token,
network_code: code,
hostname: edge.hostname.read().unwrap().clone(),
};
// debug!("send register super: {:?}", register_super);
let packet_id = match pkt_id {
Some(id) => id,
None => edge.get_next_packet_id(),
};
// let packet_id = edge.get_next_packet_id();
let data = encode_to_tcp_message(
Some(register_super),
packet_id,
PacketType::RegisterSuper as u8,
)
.unwrap();
if let Err(e) = stream.write(&data).await {
error!("failed to write to tcp: {}", e.to_string());
}
}
pub struct ReadWriteActor {
// actor接收的发送给tcp的接收端由handle存放发送端
// to_tcp: Receiver<Vec<u8>>,
@ -63,18 +377,19 @@ impl ReadWriteActor {
}
}
pub async fn run<'a, T, T2, F>(
pub async fn run<'a>(
&self,
keep_reconnect: bool,
mut to_tcp: Receiver<Vec<u8>>,
on_connected: T,
on_disconnected: T2,
// on_connected: OnConnectedCallback<'_>,
// on_disconnected: T2,
mut start_stop_chan: Receiver<StartStopInfo>,
// cancel: CancellationToken,
) where
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()>,
T2: Fn() -> F,
F: Future<Output = ()>,
// T: Fn(&mut TcpStream, Option<u32>) -> F2,
// T2: Fn() -> F,
// F: Future<Output = ()>,
// F2: Future<Output = ()>,
{
// let (tx, rx) = channel(20);
let mut started = false;
@ -140,7 +455,7 @@ impl ReadWriteActor {
};
self.connected.store(true, Ordering::Relaxed);
debug!("connected");
on_connected(&mut stream, start_pkt_id.take()).await;
on_connected_callback(&mut stream, start_pkt_id.take()).await;
if let Some(ref connecting_chan) = self.connecting_chan {
let state = ConnectionInfo::ConnState(ConnectionState::Connected);
@ -226,7 +541,7 @@ impl ReadWriteActor {
_ = check_pong => {},
_ = check_stop => {},
}
on_disconnected().await;
on_disconnected_callback().await;
debug!("connect retrying");
tokio::time::sleep(Duration::from_secs(1)).await;
debug!("disconnected");
@ -258,12 +573,12 @@ impl ReadWriterHandle {
Ok(())
}
fn new<'a, T, T3, T2, F, F2>(
fn new<>(
cancel: CancellationToken,
addr: &str,
on_connected: T,
on_disconnected: T3,
on_message: T2,
// on_connected: OnConnectedCallback<'a>,
// on_disconnected: T3,
// on_message: T2,
pong_time: Arc<AtomicU64>,
start_stop_chan: Receiver<StartStopInfo>,
// cancel: CancellationToken,
@ -271,11 +586,10 @@ impl ReadWriterHandle {
ipv6_network_restarter: Option<Sender<bool>>,
) -> Self
where
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()> + Send + 'static,
T3: Fn() -> F2 + Send + 'static,
T2: Fn(SdlanTcp) -> F + Send + 'static,
F: Future<Output = ()> + Send,
F2: Future<Output = ()> + Send,
// T3: Fn() -> F2 + Send + 'static,
// T2: Fn(SdlanTcp) -> F + Send + 'static,
// F: Future<Output = ()> + Send,
// F2: Future<Output = ()> + Send,
{
let (send_to_tcp, to_tcp) = channel(20);
let (from_tcp, mut data_from_tcp) = channel(20);
@ -292,13 +606,19 @@ impl ReadWriterHandle {
);
tokio::spawn(async move {
actor
.run(true, to_tcp, on_connected, on_disconnected, start_stop_chan)
.run(
true,
to_tcp,
// on_connected,
// on_disconnected,
start_stop_chan
)
.await
});
tokio::spawn(async move {
loop {
if let Some(msg) = data_from_tcp.recv().await {
on_message(msg).await;
handle_tcp_message(msg).await;
} else {
error!("data from tcp exited");
// eprintln!("data from tcp exited");
@ -315,30 +635,28 @@ impl ReadWriterHandle {
}
}
pub fn init_tcp_conn<'a, T, T3, T2, F, F2>(
pub fn init_tcp_conn(
cancel: CancellationToken,
addr: &str,
on_connected: T,
on_disconnected: T3,
on_message: T2,
// on_connected: OnConnectedCallback<'a>,
// on_disconnected: T3,
// on_message: T2,
pong_time: Arc<AtomicU64>,
// cancel: CancellationToken,
start_stop_chan: Receiver<StartStopInfo>,
connecting_chan: Option<Sender<ConnectionInfo>>,
ipv6_network_restarter: Option<Sender<bool>>,
) where
T: for<'b> Fn(&'b mut TcpStream, Option<u32>) -> BoxFuture<'b, ()> + Send + 'static,
T3: Fn() -> F2 + Send + 'static,
T2: Fn(SdlanTcp) -> F + Send + 'static,
F: Future<Output = ()> + Send,
F2: Future<Output = ()> + Send,
)
// T2: Fn(SdlanTcp) -> F + Send + 'static,
// F: Future<Output = ()> + Send,
{
let tcp_handle = ReadWriterHandle::new(
cancel,
addr,
on_connected,
on_disconnected,
on_message,
// on_connected,
// on_disconnected,
// on_message,
pong_time,
start_stop_chan,
connecting_chan,