diff --git a/Cargo.lock b/Cargo.lock index af72354..c01c03d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2134,6 +2134,7 @@ dependencies = [ name = "punchnet" version = "1.0.3" dependencies = [ + "ahash", "bytes", "cargo-deb", "chacha20poly1305", diff --git a/Cargo.toml b/Cargo.toml index b3880d0..7cbe37e 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ chacha20poly1305 = "0.10.1" hmac = "0.12.1" md-5 = "0.10.6" hex = "0.4.3" +ahash = "0.8.12" # rolling-file = { path = "../rolling-file" } [target.'cfg(unix)'.dependencies] diff --git a/src/network/node.rs b/src/network/node.rs index 1ee1578..19c3929 100755 --- a/src/network/node.rs +++ b/src/network/node.rs @@ -13,7 +13,7 @@ use tokio::sync::oneshot; use tracing::{debug, error}; use crate::quic::quic_init; -use crate::{ConnectionInfo, MyEncryptor, get_base_dir}; +use crate::{ConnectionInfo, MyEncryptor, RuleCache, get_base_dir}; use crate::pb::{ encode_to_tcp_message, encode_to_udp_message, SdlEmpty, SdlStunProbe, SdlStunProbeReply, }; @@ -169,6 +169,8 @@ pub struct Node { pub identity_id: IdentityID, + pub rule_cache: RuleCache, + pub access_token: StringToken, pub session_token: StringToken>, @@ -391,6 +393,8 @@ impl Node { network_id: AtomicU32::new(0), hostname: RwLock::new(hostname), + rule_cache: RuleCache::new(), + network_domain: RwLock::new(String::new()), udp_sock_for_dns: udpsock_for_dns, diff --git a/src/utils/acl_session.rs b/src/utils/acl_session.rs new file mode 100644 index 0000000..98daa11 --- /dev/null +++ b/src/utils/acl_session.rs @@ -0,0 +1,159 @@ +use std::{net::IpAddr, sync::{Arc, atomic::{AtomicU64, Ordering}}, time::{Duration, Instant, SystemTime, UNIX_EPOCH}}; + +use ahash::RandomState; +use dashmap::{DashMap, DashSet}; +use tracing::debug; + +const RuleValidTimeInSecs: u64 = 60; + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct FiveTuple { + pub src_ip: IpAddr, + pub dst_ip: IpAddr, + pub src_port: u16, + pub dst_port: u16, + pub proto: u8, +} + +impl FiveTuple { + pub fn reverse(&self) -> Self { + Self { + src_ip: self.dst_ip, + dst_ip: self.src_ip, + dst_port: self.src_port, + src_port: self.dst_port, + proto: self.proto, + } + } +} + +pub struct SessionInfo { + pub last_active: AtomicU64, +} + +pub struct SessionTable { + table: Arc>, + timeout_secs: u64, +} + +fn now_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +impl SessionTable { + pub fn new(timeout_secs: u64) -> Self { + Self { + table: Arc::new(DashMap::with_hasher(RandomState::new())), + timeout_secs, + } + } + + pub fn add_session_info(&self, key: FiveTuple) { + if let Some(info) = self.table.get(&key) { + info.last_active.store(now_secs(), Ordering::Relaxed); + return; + } + self.table.insert(key, SessionInfo { last_active: AtomicU64::new(now_secs()) }); + } + + pub fn process_packet(&self, key: &FiveTuple) -> bool { + if let Some(session) = self.table.get_mut(&key) { + session.last_active.store(now_secs(), Ordering::Relaxed); + return true; + } + + false + } + + pub fn retain(&self) { + let now = now_secs(); + self.table.retain(|_, info|{ + let last = info.last_active.load(Ordering::Relaxed); + now-last < self.timeout_secs + }); + } +} + +// ip's u32 representation +type IdentityID = u32; +// port representation +type Port = u16; +// proto, like icmp = 1, udp=16 +type Proto = u8; + +type RuleInfo = (Port, Proto); + + + +#[derive(Debug)] +pub struct RuleFromServer { + pub proto: Proto, + pub port: Port, +} + +pub struct RuleCache { + pub rule_info: DashMap), RandomState>, + pub rule_valid_secs: u64, + pub session_table: Arc, +} + +type ShouldRenew = bool; + +impl RuleCache { + pub fn new() -> Self { + let session_table= Arc::new(SessionTable::new(300)); + let table_cleaner = Arc::clone(&session_table); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(20)).await; + table_cleaner.retain(); + } + }); + Self { + rule_info: DashMap::with_hasher(RandomState::new()), + rule_valid_secs: RuleValidTimeInSecs, + session_table, + } + } + + pub fn set_identity_cache(&self, identity: IdentityID, infos: Vec) { + debug!("setting identity cache for identity={}, infos: {:?}", identity, infos); + + let now = now_secs(); + + let now_sets = DashSet::with_hasher(RandomState::new()); + for info in &infos { + // let mut protomap = HashSet::new(); + now_sets.insert((info.port, info.proto)); + } + self.rule_info.insert(identity, (AtomicU64::new(now) , now_sets)); + } + + fn handle_packet_in(&self, info: FiveTuple) { + self.session_table.add_session_info(info); + } + + pub fn is_identity_ok(&self, identity: IdentityID, info: FiveTuple) -> (bool, ShouldRenew) { + if self.session_table.process_packet(&info) { + return (true, false); + } + + let now = now_secs(); + if let Some(sets_info) = self.rule_info.get(&identity) { + let last_set_time = sets_info.0.load(Ordering::Relaxed); + if sets_info.1.contains(&(info.dst_port, info.proto)) { + if (now - last_set_time) > self.rule_valid_secs { + return (true, true); + } + return (true, false); + } + } + + self.rule_info.insert(identity, (AtomicU64::new(now), DashSet::with_hasher(RandomState::new()))); + + (false, true) + } +} \ No newline at end of file diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 8aa2372..dea7fb0 100755 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,10 +1,12 @@ mod command; +mod acl_session; mod encrypter; use std::{fs::OpenOptions, io::Write, net::Ipv4Addr, path::Path}; pub use encrypter::*; pub use command::*; +pub use acl_session::*; mod socks; use rand::Rng;