diff --git a/src/lib.rs b/src/lib.rs index 2a0bd25..207e305 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,6 +78,7 @@ pub async fn run_sdlan( sock, hostname, server_ip, + install_channel.to_owned(), ) .await { @@ -87,10 +88,10 @@ pub async fn run_sdlan( debug!("edge inited"); let cancel = CancellationToken::new(); - let install_chan = install_channel.to_owned(); + // let install_chan = install_channel.to_owned(); tokio::spawn(async move { if let Err(e) = - async_main(install_chan, args, start_stop_chan, cancel, connecting_chan).await + async_main(args, start_stop_chan, cancel, connecting_chan).await { error!("failed to run async main: {}", e.as_str()); } diff --git a/src/network/async_main.rs b/src/network/async_main.rs index a7e3444..4718097 100755 --- a/src/network/async_main.rs +++ b/src/network/async_main.rs @@ -1,289 +1,34 @@ -use std::net::IpAddr; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use crate::config::{NULL_MAC, TCP_PING_TIME}; +use crate::config::{TCP_PING_TIME}; use crate::network::ipv6::run_ipv6; use crate::network::{ - get_edge, ping_to_sn, read_and_parse_packet, RegisterSuperFeedback, TunTapPacketHandler, + get_edge, ping_to_sn, read_and_parse_packet, TunTapPacketHandler, }; -use crate::pb::{ - encode_to_tcp_message, encode_to_udp_message, SdlDevAddr, SdlRegisterSuper, - SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, Sdlv6Info, -}; -use crate::tcp::{init_tcp_conn, EventType, NakMsgCode, NatType, PacketType, SdlanTcp}; +use crate::tcp::{init_tcp_conn, send_stun_request}; use crate::utils::{send_to_sock, CommandLine}; -use crate::{ConnectionInfo, ConnectionState}; -use sdlan_sn_rs::config::AF_INET; -use sdlan_sn_rs::peer::{SdlanSock, V6Info}; -use sdlan_sn_rs::utils::{get_current_timestamp, ip_to_string, is_multi_broadcast, rsa_decrypt}; +use crate::{ConnectionInfo}; +use sdlan_sn_rs::peer::{SdlanSock}; +use sdlan_sn_rs::utils::{get_current_timestamp, is_multi_broadcast}; use sdlan_sn_rs::utils::{Mac, Result}; -use tokio::io::AsyncWriteExt; -use tokio::net::UdpSocket; +use tokio::net::{UdpSocket}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio_util::sync::CancellationToken; -use super::{check_peer_registration_needed, packet, Node, StartStopInfo}; +use super::{Node, StartStopInfo}; use crate::utils::Socket; -use prost::Message; use tracing::{debug, error}; -async fn handle_tcp_message(msg: SdlanTcp) { - let edge = get_edge(); - - // let now = get_current_timestamp(); - // edge.tcp_pong.store(now, Ordering::Relaxed); - - debug!("got tcp message: {:?}", msg.packet_type); - match msg.packet_type { - PacketType::RegisterSuperACK => { - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 0, - message: "".to_owned(), - should_exit: false, - }, - ); - let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else { - error!("failed to decode REGISTER_SUPER_ACK"); - return; - }; - debug!("got register super ack: {:?}", ack); - let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else { - error!("failed to rsa decrypt aes key"); - return; - }; - let Some(dev) = ack.dev_addr else { - error!("no dev_addr is specified"); - return; - }; - - let ip = ip_to_string(&dev.net_addr); - // debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,); - println!("assigned ip: {}", ip); - let hostname = edge.hostname.read().unwrap().clone(); - println!("network is: {}.{}", hostname, dev.network_domain); - edge.device_config - .ip - .net_addr - .store(dev.net_addr, Ordering::Relaxed); - if let Some(ref chan) = edge.connection_chan { - let _ = chan.send(ConnectionInfo::IPInfo(ip)).await; - } - /* - let mac = match dev.mac.try_into() { - Err(_) => NULL_MAC, - Ok(m) => m, - }; - */ - // *edge.device_config.mac.write().unwrap() = mac; - edge.device_config - .ip - .net_bit_len - .store(dev.net_bit_len as u8, Ordering::Relaxed); - edge.device.reload_config(&edge.device_config, &dev.network_domain); - edge.network_id.store(dev.network_id, Ordering::Relaxed); - - edge.set_authorized(true, aes); - send_stun_request(edge).await; - tokio::spawn(async { - let nattype = edge.probe_nat_type().await; - debug!("nat type is {:?}", nattype); - // println!("nat type is: {:?}", nattype); - }); - } - PacketType::RegisterSuperNAK => { - let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else { - error!("failed to decode REGISTER_SUPER_NAK"); - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 1, - message: "failed to decode REGISTER SUPER NAK".to_owned(), - should_exit: false, - }, - ); - return; - }; - - let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else { - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 2, - message: "error_code not recognized".to_owned(), - should_exit: false, - }, - ); - return; - }; - match error_code { - NakMsgCode::InvalidToken => { - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 3, - message: "invalid token".to_owned(), - should_exit: true, - }, - ); - edge.stop().await; - } - NakMsgCode::NodeDisabled => { - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 4, - message: "Node is disabled".to_owned(), - should_exit: true, - }, - ); - edge.stop().await; - } - _other => { - edge.send_register_super_feedback( - msg._packet_id, - RegisterSuperFeedback { - result: 0, - message: "".to_owned(), - should_exit: false, - }, - ); - } - } - /* - edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { - result: 1, - message: "failed to decode REGISTER SUPER NAK".to_owned(), - }); - */ - edge.set_authorized(false, Vec::new()); - // std::process::exit(0); - } - PacketType::Command => { - if msg.current_packet.len() < 1 { - error!("malformed COMMAND received"); - return; - } - handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await; - } - PacketType::Event => { - if msg.current_packet.len() < 1 { - error!("malformed EVENT received"); - return; - } - let Ok(event) = msg.current_packet[0].try_into() else { - error!("failed to parse event type"); - return; - }; - handle_tcp_event(edge, event, &msg.current_packet[1..]).await; - } - PacketType::PeerInfo => { - let _ = packet::handle_packet_peer_info(edge, &msg.current_packet[..]).await; - } - PacketType::Pong => { - debug!("tcp pong received"); - let now = get_current_timestamp(); - edge.tcp_pong.store(now, Ordering::Relaxed); - } - other => { - debug!("tcp not handling {:?}", other); - } - } -} - -async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {} - -async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) { - match eventtype { - EventType::SendRegister => { - let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else { - error!("failed to decode SendRegister Event"); - return; - }; - let v4 = reg.nat_ip.to_be_bytes(); - let mut v6_sock = None; - if let Some(v6_info) = reg.v6_info { - if let Ok(v6_bytes) = v6_info.v6.try_into() { - v6_sock = Some(V6Info { - port: v6_info.port as u16, - v6: v6_bytes, - }); - } - } - - let dst_mac = match reg.dst_mac.try_into() { - Ok(m) => m, - Err(_e) => NULL_MAC, - }; - - let remote_nat_byte = reg.nat_type as u8; - let remote_nat = match remote_nat_byte.try_into() { - Ok(t) => t, - Err(_) => NatType::NoNat, - }; - - check_peer_registration_needed( - edge, - false, - dst_mac, - // &v6_sock, - remote_nat, - &v6_sock, - &SdlanSock { - family: AF_INET, - port: reg.nat_port as u16, - v4, - v6: [0; 16], - }, - ) - .await; - } - other => { - debug!("unhandled event {:?}", other); - } - } -} - pub async fn async_main( - install_channel: String, args: CommandLine, start_stop_chan: Receiver, cancel: CancellationToken, connecting_chan: Option>, ) -> Result<()> { // let _ = PidRecorder::new(".pid"); - - // // gen public key - // gen_rsa_keys(".client"); - // let mut pubkey = String::new(); - // File::open(".client/id_rsa.pub")?.read_to_string(&mut pubkey)?; - // let privatekey = load_private_key_file(".client/id_rsa")?; - - // // init sock - // if args.token.len() == 0 { - // println!("failed to load token"); - // return Ok(()); - // } - // let sock_v4 = Socket::build(0, true, true, args.tos).await?; - - // // allow multicast - - // // TODO: set the sn's tcp socket - // // let tcpsock = TCPSocket::build("121.4.79.234:1234").await?; - // let tcp_pong = Arc::new(AtomicU64::new(0)); - // let edge = Node::new( - // pubkey, - // node_conf, - // sock_v4, - // &args.token, - // privatekey, - // tcp_pong.clone(), - // ); - let edge = get_edge(); // let token = args.token.clone(); @@ -293,61 +38,7 @@ pub async fn async_main( init_tcp_conn( cancel_tcp, &args.tcp, - move |stream, pkt_id| { - let installed_channel = install_channel.to_owned(); - Box::pin(async move { - let token = edge._token.lock().unwrap().clone(); - let code = edge.network_code.lock().unwrap().clone(); - // let edge = get_edge(); - // let edge = get_edge(); - // let token = args.token.clone(); - if let Ok(ipaddr) = stream.local_addr() { - match ipaddr.ip() { - IpAddr::V4(v4) => { - let ip = v4.into(); - // println!("outer ip is {} => {}", v4, ip); - edge.outer_ip_v4.store(ip, Ordering::Relaxed); - } - _other => {} - } - } - let register_super = SdlRegisterSuper { - version: 1, - installed_channel, - client_id: edge.config.node_uuid.clone(), - dev_addr: Some(SdlDevAddr { - mac: Vec::from(edge.device_config.get_mac()), - net_addr: 0, - network_id: 0, - net_bit_len: 0, - network_domain: "".to_owned(), - }), - pub_key: edge.rsa_pubkey.clone(), - token, - network_code: code, - hostname: edge.hostname.read().unwrap().clone(), - }; - // debug!("send register super: {:?}", register_super); - let packet_id = match pkt_id { - Some(id) => id, - None => edge.get_next_packet_id(), - }; - // let packet_id = edge.get_next_packet_id(); - let data = encode_to_tcp_message( - Some(register_super), - packet_id, - PacketType::RegisterSuper as u8, - ) - .unwrap(); - if let Err(e) = stream.write(&data).await { - error!("failed to write to tcp: {}", e.to_string()); - } - }) - }, - || async { - edge.set_authorized(false, vec![]); - }, - |msg| handle_tcp_message(msg), + // |msg| handle_tcp_message(msg), edge.tcp_pong.clone(), // tcp_pong, start_stop_chan, @@ -448,36 +139,6 @@ async fn run_edge_loop(eee: &'static Node, cancel: CancellationToken) { } } -async fn send_stun_request(eee: &Node) { - let sdl_v6_info = match *eee.ipv6.read().unwrap() { - Some(ref l) => Some(Sdlv6Info { - port: l.port as u32, - v6: Vec::from(l.v6), - }), - None => None, - }; - let req = SdlStunRequest { - cookie: 0, - client_id: eee.config.node_uuid.clone(), - network_id: eee.network_id.load(Ordering::Relaxed), - ip: eee.device_config.get_ip(), - mac: Vec::from(eee.device_config.get_mac()), - nat_type: eee.get_nat_type() as u32, - v6_info: sdl_v6_info, - }; - debug!("stun request: {:?}", req); - let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap(); - if let Err(e) = send_to_sock( - eee, - &msg, - &eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize], - ) - .await - { - error!("failed to send to sock: {:?}", e); - } -} - pub async fn loop_socket_v6(eee: &'static Node, socket: Arc, cancel: CancellationToken) { debug!("loop sock v6"); loop { diff --git a/src/network/node.rs b/src/network/node.rs index 20f3340..4cbf5dc 100755 --- a/src/network/node.rs +++ b/src/network/node.rs @@ -42,6 +42,7 @@ pub async fn init_edge( udpsock_for_dns: Arc, hostname: String, server_ip: String, + install_channel: String ) -> Result<()> { // gen public key let rsa_path = format!("{}/.client", get_base_dir()); @@ -88,6 +89,7 @@ pub async fn init_edge( hostname, udpsock_for_dns, server_ip, + install_channel, ); do_init_edge(edge)?; @@ -175,6 +177,8 @@ pub struct Node { // last register super time, in unix pub _last_register_req: AtomicU64, + pub install_channel: String, + nat_type: Mutex, nat_cookie: AtomicU32, @@ -287,6 +291,7 @@ impl Node { hostname: String, udpsock_for_dns: Arc, server_ip: String, + install_channel: String, ) -> Self { let mode = if cfg!(not(feature = "tun")) { Mode::Tap @@ -347,6 +352,7 @@ impl Node { nat_cookie: AtomicU32::new(1), cookie_match: DashMap::new(), server_ip, + install_channel, } } diff --git a/src/tcp/tcp_conn.rs b/src/tcp/tcp_conn.rs index ce3ad18..0dcc194 100755 --- a/src/tcp/tcp_conn.rs +++ b/src/tcp/tcp_conn.rs @@ -1,6 +1,11 @@ use once_cell::sync::OnceCell; -use sdlan_sn_rs::utils::{get_current_timestamp, Result, SDLanError}; +use prost::Message; +use sdlan_sn_rs::config::AF_INET; +use sdlan_sn_rs::peer::{SdlanSock, V6Info}; +use sdlan_sn_rs::utils::{Result, SDLanError, get_current_timestamp, ip_to_string, rsa_decrypt}; use std::future::Future; +use std::net::IpAddr; +use std::process::Output; use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::{ @@ -19,15 +24,324 @@ use tokio::{ }; use tracing::error; -use crate::config::TCP_PING_TIME; -use crate::network::StartStopInfo; -use crate::tcp::read_a_packet; -use crate::{ConnectionInfo, ConnectionState}; +use crate::config::{NULL_MAC, TCP_PING_TIME}; +use crate::network::{Node, RegisterSuperFeedback, StartStopInfo, check_peer_registration_needed, handle_packet_peer_info}; +use crate::pb::{SdlDevAddr, SdlRegisterSuper, SdlRegisterSuperAck, SdlRegisterSuperNak, SdlSendRegisterEvent, SdlStunRequest, Sdlv6Info, encode_to_tcp_message, encode_to_udp_message}; +use crate::tcp::{EventType, NakMsgCode, NatType, PacketType, read_a_packet}; +use crate::utils::send_to_sock; +use crate::{ConnectionInfo, ConnectionState, get_edge}; use super::tcp_codec::SdlanTcp; static GLOBAL_TCP_HANDLE: OnceCell = OnceCell::new(); + +async fn handle_tcp_message(msg: SdlanTcp) { + let edge = get_edge(); + + // let now = get_current_timestamp(); + // edge.tcp_pong.store(now, Ordering::Relaxed); + + debug!("got tcp message: {:?}", msg.packet_type); + match msg.packet_type { + PacketType::RegisterSuperACK => { + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 0, + message: "".to_owned(), + should_exit: false, + }, + ); + let Ok(ack) = SdlRegisterSuperAck::decode(&msg.current_packet[..]) else { + error!("failed to decode REGISTER_SUPER_ACK"); + return; + }; + debug!("got register super ack: {:?}", ack); + let Ok(aes) = rsa_decrypt(&edge.rsa_private, &ack.aes_key) else { + error!("failed to rsa decrypt aes key"); + return; + }; + let Some(dev) = ack.dev_addr else { + error!("no dev_addr is specified"); + return; + }; + + let ip = ip_to_string(&dev.net_addr); + // debug!("aes key is {:?}, ip is {}/{}", aes, ip, dev.net_bit_len,); + println!("assigned ip: {}", ip); + let hostname = edge.hostname.read().unwrap().clone(); + println!("network is: {}.{}", hostname, dev.network_domain); + edge.device_config + .ip + .net_addr + .store(dev.net_addr, Ordering::Relaxed); + if let Some(ref chan) = edge.connection_chan { + let _ = chan.send(ConnectionInfo::IPInfo(ip)).await; + } + /* + let mac = match dev.mac.try_into() { + Err(_) => NULL_MAC, + Ok(m) => m, + }; + */ + // *edge.device_config.mac.write().unwrap() = mac; + edge.device_config + .ip + .net_bit_len + .store(dev.net_bit_len as u8, Ordering::Relaxed); + edge.device.reload_config(&edge.device_config, &dev.network_domain); + edge.network_id.store(dev.network_id, Ordering::Relaxed); + + edge.set_authorized(true, aes); + send_stun_request(edge).await; + tokio::spawn(async { + let nattype = edge.probe_nat_type().await; + debug!("nat type is {:?}", nattype); + // println!("nat type is: {:?}", nattype); + }); + } + PacketType::RegisterSuperNAK => { + let Ok(_nak) = SdlRegisterSuperNak::decode(&msg.current_packet[..]) else { + error!("failed to decode REGISTER_SUPER_NAK"); + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 1, + message: "failed to decode REGISTER SUPER NAK".to_owned(), + should_exit: false, + }, + ); + return; + }; + + let Ok(error_code) = NakMsgCode::try_from(_nak.error_code as u8) else { + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 2, + message: "error_code not recognized".to_owned(), + should_exit: false, + }, + ); + return; + }; + match error_code { + NakMsgCode::InvalidToken => { + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 3, + message: "invalid token".to_owned(), + should_exit: true, + }, + ); + edge.stop().await; + } + NakMsgCode::NodeDisabled => { + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 4, + message: "Node is disabled".to_owned(), + should_exit: true, + }, + ); + edge.stop().await; + } + _other => { + edge.send_register_super_feedback( + msg._packet_id, + RegisterSuperFeedback { + result: 0, + message: "".to_owned(), + should_exit: false, + }, + ); + } + } + /* + edge.send_register_super_feedback(msg._packet_id, RegisterSuperFeedback { + result: 1, + message: "failed to decode REGISTER SUPER NAK".to_owned(), + }); + */ + edge.set_authorized(false, Vec::new()); + // std::process::exit(0); + } + PacketType::Command => { + if msg.current_packet.len() < 1 { + error!("malformed COMMAND received"); + return; + } + handle_tcp_command(edge, msg.current_packet[0], &msg.current_packet[1..]).await; + } + PacketType::Event => { + if msg.current_packet.len() < 1 { + error!("malformed EVENT received"); + return; + } + let Ok(event) = msg.current_packet[0].try_into() else { + error!("failed to parse event type"); + return; + }; + handle_tcp_event(edge, event, &msg.current_packet[1..]).await; + } + PacketType::PeerInfo => { + let _ = handle_packet_peer_info(edge, &msg.current_packet[..]).await; + } + PacketType::Pong => { + debug!("tcp pong received"); + let now = get_current_timestamp(); + edge.tcp_pong.store(now, Ordering::Relaxed); + } + other => { + debug!("tcp not handling {:?}", other); + } + } +} + +async fn handle_tcp_command(_edge: &Node, _cmdtype: u8, _cmdprotobuf: &[u8]) {} + +async fn handle_tcp_event(edge: &'static Node, eventtype: EventType, eventprotobuf: &[u8]) { + match eventtype { + EventType::SendRegister => { + let Ok(reg) = SdlSendRegisterEvent::decode(eventprotobuf) else { + error!("failed to decode SendRegister Event"); + return; + }; + let v4 = reg.nat_ip.to_be_bytes(); + let mut v6_sock = None; + if let Some(v6_info) = reg.v6_info { + if let Ok(v6_bytes) = v6_info.v6.try_into() { + v6_sock = Some(V6Info { + port: v6_info.port as u16, + v6: v6_bytes, + }); + } + } + + let dst_mac = match reg.dst_mac.try_into() { + Ok(m) => m, + Err(_e) => NULL_MAC, + }; + + let remote_nat_byte = reg.nat_type as u8; + let remote_nat = match remote_nat_byte.try_into() { + Ok(t) => t, + Err(_) => NatType::NoNat, + }; + + check_peer_registration_needed( + edge, + false, + dst_mac, + // &v6_sock, + remote_nat, + &v6_sock, + &SdlanSock { + family: AF_INET, + port: reg.nat_port as u16, + v4, + v6: [0; 16], + }, + ) + .await; + } + other => { + debug!("unhandled event {:?}", other); + } + } +} + + +pub async fn send_stun_request(eee: &Node) { + let sdl_v6_info = match *eee.ipv6.read().unwrap() { + Some(ref l) => Some(Sdlv6Info { + port: l.port as u32, + v6: Vec::from(l.v6), + }), + None => None, + }; + let req = SdlStunRequest { + cookie: 0, + client_id: eee.config.node_uuid.clone(), + network_id: eee.network_id.load(Ordering::Relaxed), + ip: eee.device_config.get_ip(), + mac: Vec::from(eee.device_config.get_mac()), + nat_type: eee.get_nat_type() as u32, + v6_info: sdl_v6_info, + }; + debug!("stun request: {:?}", req); + let msg = encode_to_udp_message(Some(req), PacketType::StunRequest as u8).unwrap(); + if let Err(e) = send_to_sock( + eee, + &msg, + &eee.config.super_nodes[eee.config.super_node_index.load(Ordering::Relaxed) as usize], + ) + .await + { + error!("failed to send to sock: {:?}", e); + } +} + +async fn on_disconnected_callback() { + let edge = get_edge(); + edge.set_authorized(false, vec![]); +} + +async fn on_connected_callback<'a>(stream: &'a mut tokio::net::TcpStream, pkt_id: Option) { + let edge = get_edge(); + // let installed_channel = install_channel.to_owned(); + + let token = edge._token.lock().unwrap().clone(); + let code = edge.network_code.lock().unwrap().clone(); + // let edge = get_edge(); + // let edge = get_edge(); + // let token = args.token.clone(); + if let Ok(ipaddr) = stream.local_addr() { + match ipaddr.ip() { + IpAddr::V4(v4) => { + let ip = v4.into(); + // println!("outer ip is {} => {}", v4, ip); + edge.outer_ip_v4.store(ip, Ordering::Relaxed); + } + _other => {} + } + } + let register_super = SdlRegisterSuper { + version: 1, + installed_channel: edge.install_channel.clone(), + // installed_channel, + client_id: edge.config.node_uuid.clone(), + dev_addr: Some(SdlDevAddr { + mac: Vec::from(edge.device_config.get_mac()), + net_addr: 0, + network_id: 0, + net_bit_len: 0, + network_domain: "".to_owned(), + }), + pub_key: edge.rsa_pubkey.clone(), + token, + network_code: code, + hostname: edge.hostname.read().unwrap().clone(), + }; + // debug!("send register super: {:?}", register_super); + let packet_id = match pkt_id { + Some(id) => id, + None => edge.get_next_packet_id(), + }; + // let packet_id = edge.get_next_packet_id(); + let data = encode_to_tcp_message( + Some(register_super), + packet_id, + PacketType::RegisterSuper as u8, + ) + .unwrap(); + if let Err(e) = stream.write(&data).await { + error!("failed to write to tcp: {}", e.to_string()); + } +} pub struct ReadWriteActor { // actor接收的发送给tcp的接收端,由handle存放发送端 // to_tcp: Receiver>, @@ -63,18 +377,19 @@ impl ReadWriteActor { } } - pub async fn run<'a, T, T2, F>( + pub async fn run<'a>( &self, keep_reconnect: bool, mut to_tcp: Receiver>, - on_connected: T, - on_disconnected: T2, + // on_connected: OnConnectedCallback<'_>, + // on_disconnected: T2, mut start_stop_chan: Receiver, // cancel: CancellationToken, ) where - T: for<'b> Fn(&'b mut TcpStream, Option) -> BoxFuture<'b, ()>, - T2: Fn() -> F, - F: Future, + // T: Fn(&mut TcpStream, Option) -> F2, + // T2: Fn() -> F, + // F: Future, + // F2: Future, { // let (tx, rx) = channel(20); let mut started = false; @@ -140,7 +455,7 @@ impl ReadWriteActor { }; self.connected.store(true, Ordering::Relaxed); debug!("connected"); - on_connected(&mut stream, start_pkt_id.take()).await; + on_connected_callback(&mut stream, start_pkt_id.take()).await; if let Some(ref connecting_chan) = self.connecting_chan { let state = ConnectionInfo::ConnState(ConnectionState::Connected); @@ -226,7 +541,7 @@ impl ReadWriteActor { _ = check_pong => {}, _ = check_stop => {}, } - on_disconnected().await; + on_disconnected_callback().await; debug!("connect retrying"); tokio::time::sleep(Duration::from_secs(1)).await; debug!("disconnected"); @@ -258,12 +573,12 @@ impl ReadWriterHandle { Ok(()) } - fn new<'a, T, T3, T2, F, F2>( + fn new<>( cancel: CancellationToken, addr: &str, - on_connected: T, - on_disconnected: T3, - on_message: T2, + // on_connected: OnConnectedCallback<'a>, + // on_disconnected: T3, + // on_message: T2, pong_time: Arc, start_stop_chan: Receiver, // cancel: CancellationToken, @@ -271,11 +586,10 @@ impl ReadWriterHandle { ipv6_network_restarter: Option>, ) -> Self where - T: for<'b> Fn(&'b mut TcpStream, Option) -> BoxFuture<'b, ()> + Send + 'static, - T3: Fn() -> F2 + Send + 'static, - T2: Fn(SdlanTcp) -> F + Send + 'static, - F: Future + Send, - F2: Future + Send, + // T3: Fn() -> F2 + Send + 'static, + // T2: Fn(SdlanTcp) -> F + Send + 'static, + // F: Future + Send, + // F2: Future + Send, { let (send_to_tcp, to_tcp) = channel(20); let (from_tcp, mut data_from_tcp) = channel(20); @@ -292,13 +606,19 @@ impl ReadWriterHandle { ); tokio::spawn(async move { actor - .run(true, to_tcp, on_connected, on_disconnected, start_stop_chan) + .run( + true, + to_tcp, + // on_connected, + // on_disconnected, + start_stop_chan + ) .await }); tokio::spawn(async move { loop { if let Some(msg) = data_from_tcp.recv().await { - on_message(msg).await; + handle_tcp_message(msg).await; } else { error!("data from tcp exited"); // eprintln!("data from tcp exited"); @@ -315,30 +635,28 @@ impl ReadWriterHandle { } } -pub fn init_tcp_conn<'a, T, T3, T2, F, F2>( +pub fn init_tcp_conn( cancel: CancellationToken, addr: &str, - on_connected: T, - on_disconnected: T3, - on_message: T2, + // on_connected: OnConnectedCallback<'a>, + // on_disconnected: T3, + // on_message: T2, pong_time: Arc, // cancel: CancellationToken, start_stop_chan: Receiver, connecting_chan: Option>, ipv6_network_restarter: Option>, -) where - T: for<'b> Fn(&'b mut TcpStream, Option) -> BoxFuture<'b, ()> + Send + 'static, - T3: Fn() -> F2 + Send + 'static, - T2: Fn(SdlanTcp) -> F + Send + 'static, - F: Future + Send, - F2: Future + Send, +) + // T2: Fn(SdlanTcp) -> F + Send + 'static, + // F: Future + Send, { + let tcp_handle = ReadWriterHandle::new( cancel, addr, - on_connected, - on_disconnected, - on_message, + // on_connected, + // on_disconnected, + // on_message, pong_time, start_stop_chan, connecting_chan,