fix session

This commit is contained in:
anlicheng 2026-03-10 15:01:32 +08:00
parent 4cccd411e0
commit 6bc0f82169
3 changed files with 209 additions and 67 deletions

View File

@ -78,6 +78,9 @@ actor SDLContextActor {
private var updatePolicyTask: Task<Void, Never>? private var updatePolicyTask: Task<Void, Never>?
private let snapshotPublisher: SnapshotPublisher<IdentitySnapshot> private let snapshotPublisher: SnapshotPublisher<IdentitySnapshot>
// Flow : 180
private let flowSessionManager = SDLFlowSessionManager(sessionTimeout: 180)
// //
private var registerTask: Task<Void, Never>? private var registerTask: Task<Void, Never>?
@ -524,7 +527,6 @@ actor SDLContextActor {
} }
let mac = LayerPacket.MacAddress(data: data.dstMac) let mac = LayerPacket.MacAddress(data: data.dstMac)
let networkAddr = config.networkAddress let networkAddr = config.networkAddress
guard (data.dstMac == networkAddr.mac || mac.isBroadcast() || mac.isMulticast()) else { guard (data.dstMac == networkAddr.mac || mac.isBroadcast() || mac.isMulticast()) else {
return return
@ -565,30 +567,14 @@ actor SDLContextActor {
// //
let identitySnapshot = self.snapshotPublisher.current() let identitySnapshot = self.snapshotPublisher.current()
if let ruleMap = identitySnapshot.lookup(data.identityID) { let ruleMap = identitySnapshot.lookup(data.identityID)
let proto = ipPacket.header.proto
switch ipPacket.transportPacket() { if self.authIPPacket(ipPacket: ipPacket, ruleMap: ruleMap) {
case .tcp(let tcpPacket):
let dstPort = tcpPacket.header.dstPort
if ruleMap.isAllow(proto: proto, port: dstPort) {
let packet = NEPacket(data: ipPacket.data, protocolFamily: 2) let packet = NEPacket(data: ipPacket.data, protocolFamily: 2)
self.provider.packetFlow.writePacketObjects([packet]) self.provider.packetFlow.writePacketObjects([packet])
SDLLogger.shared.log("[SDLContext] identity: \(data.identityID), ruleMap: \(ruleMap), dstPort: \(dstPort) allow", level: .debug) SDLLogger.shared.log("[SDLContext] identity: \(data.identityID), allow", level: .debug)
} }
case .udp(let udpPacket): else {
let dstPort = udpPacket.dstPort
if ruleMap.isAllow(proto: proto, port: dstPort) {
let packet = NEPacket(data: ipPacket.data, protocolFamily: 2)
self.provider.packetFlow.writePacketObjects([packet])
SDLLogger.shared.log("[SDLContext] identity: \(data.identityID), ruleMap: \(ruleMap), dstPort: \(dstPort) allow", level: .debug)
}
case .icmp(_):
let packet = NEPacket(data: ipPacket.data, protocolFamily: 2)
self.provider.packetFlow.writePacketObjects([packet])
default:
()
}
} else {
SDLLogger.shared.log("[SDLContext] not found identity: \(data.identityID) ruleMap", level: .debug) SDLLogger.shared.log("[SDLContext] not found identity: \(data.identityID) ruleMap", level: .debug)
// //
await self.identifyStore.policyRequest(srcIdentityId: data.identityID, dstIdentityId: self.config.identityId, using: self.quicClient) await self.identifyStore.policyRequest(srcIdentityId: data.identityID, dstIdentityId: self.config.identityId, using: self.quicClient)
@ -598,6 +584,35 @@ actor SDLContextActor {
} }
} }
private func authIPPacket(ipPacket: IPPacket, ruleMap: IdentityRuleMap?) -> Bool {
//
if let reverseFlowSession = ipPacket.flowSession()?.reverse(),
self.flowSessionManager.hasSession(reverseFlowSession) {
self.flowSessionManager.updateSession(reverseFlowSession)
return true
}
//
let proto = ipPacket.header.proto
// 访
switch ipPacket.transportPacket {
case .tcp(let tcpPacket):
if let ruleMap, ruleMap.isAllow(proto: proto, port: tcpPacket.header.dstPort) {
return true
}
case .udp(let udpPacket):
if let ruleMap, ruleMap.isAllow(proto: proto, port: udpPacket.dstPort) {
return true
}
case .icmp(_):
return true
default:
return false
}
return false
}
// //
// public func flowReportTask() { // public func flowReportTask() {
// Task { // Task {
@ -651,6 +666,12 @@ actor SDLContextActor {
return return
} }
// FlowSession
//
if let flowSession = packet.flowSession() {
self.flowSessionManager.updateSession(flowSession)
}
// arpmac // arpmac
if let dstMac = await self.arpServer.query(ip: dstIp) { if let dstMac = await self.arpServer.query(ip: dstIp) {
await self.routeLayerPacket(dstMac: dstMac, type: .ipv4, data: packet.data) await self.routeLayerPacket(dstMac: dstMac, type: .ipv4, data: packet.data)

View File

@ -44,6 +44,40 @@ struct IPPacket {
let header: IPHeader let header: IPHeader
let data: Data let data: Data
enum TransportPacket {
case tcp(TCPPacket)
case udp(UDPPacket)
case icmp(ICMPPacket)
case unsupported(UInt8)
case malformed
}
var transportPacket: TransportPacket {
guard let proto = TransportProtocol(rawValue: header.proto) else {
return .unsupported(header.proto)
}
switch proto {
case .tcp:
guard let tcp = TCPPacket(payload) else {
return .malformed
}
return .tcp(tcp)
case .udp:
guard let udp = UDPPacket(payload) else {
return .malformed
}
return .udp(udp)
case .icmp:
guard let icmp = ICMPPacket(payload) else {
return .malformed
}
return .icmp(icmp)
}
}
var payload: Data.SubSequence { var payload: Data.SubSequence {
let offset = Int(header.headerLength) let offset = Int(header.headerLength)
@ -161,7 +195,7 @@ struct TCPPacket {
} }
self.header = header self.header = header
self.payload = data.subdata(in: headerLen..<data.count) self.payload = data[headerLen..<data.count]
} }
} }
@ -185,7 +219,7 @@ struct UDPPacket {
self.length = UInt16(bytes: (data[4], data[5])) self.length = UInt16(bytes: (data[4], data[5]))
self.checksum = UInt16(bytes: (data[6], data[7])) self.checksum = UInt16(bytes: (data[6], data[7]))
self.payload = data.subdata(in: 8..<data.count) self.payload = data[8..<data.count]
} }
} }
@ -205,45 +239,6 @@ struct ICMPPacket {
self.type = data[0] self.type = data[0]
self.code = data[1] self.code = data[1]
self.checksum = UInt16(bytes: (data[2], data[3])) self.checksum = UInt16(bytes: (data[2], data[3]))
self.payload = data.subdata(in: 4..<data.count) self.payload = data[4..<data.count]
} }
} }
// MARK: - IPPacket Transport Parsing
extension IPPacket {
enum TransportPacket {
case tcp(TCPPacket)
case udp(UDPPacket)
case icmp(ICMPPacket)
case unsupported(UInt8)
case malformed
}
func transportPacket() -> TransportPacket {
guard let proto = TransportProtocol(rawValue: header.proto) else {
return .unsupported(header.proto)
}
switch proto {
case .tcp:
guard let tcp = TCPPacket(payload) else {
return .malformed
}
return .tcp(tcp)
case .udp:
guard let udp = UDPPacket(payload) else {
return .malformed
}
return .udp(udp)
case .icmp:
guard let icmp = ICMPPacket(payload) else {
return .malformed
}
return .icmp(icmp)
}
}
}

View File

@ -0,0 +1,126 @@
//
// FiveTuple.swift
// punchnet
// tcp/udp Flow
// Created by on 2026/3/10.
//
import Foundation
// MARK: - key
struct FlowSession: Hashable {
let srcIP: UInt32
let dstIP: UInt32
let srcPort: UInt16
let dstPort: UInt16
let proto: UInt8
func hash(into hasher: inout Hasher) {
// hash
hasher.combine(srcIP)
hasher.combine(dstIP)
hasher.combine(UInt32(srcPort) << 16 | UInt32(dstPort))
hasher.combine(proto)
}
static func ==(lhs: Self, rhs: Self) -> Bool {
return lhs.srcIP == rhs.srcIP &&
lhs.dstIP == rhs.dstIP &&
lhs.srcPort == rhs.srcPort &&
lhs.dstPort == rhs.dstPort &&
lhs.proto == rhs.proto
}
func reverse() -> FlowSession {
return FlowSession(
srcIP: dstIP,
dstIP: srcIP,
srcPort: dstPort,
dstPort: srcPort,
proto: proto
)
}
}
// MARK: -
final class SDLFlowSessionManager {
private var sessions: [FlowSession: TimeInterval] = [:]
private let lock = NSLock()
private let sessionTimeout: TimeInterval
/// - Parameter sessionTimeout:
init(sessionTimeout: TimeInterval = 300) {
self.sessionTimeout = sessionTimeout
}
//
func updateSession(_ key: FlowSession) {
lock.lock()
defer {
lock.unlock()
}
sessions[key] = Date().timeIntervalSince1970 + sessionTimeout
}
//
func hasSession(_ key: FlowSession) -> Bool {
lock.lock()
defer {
lock.unlock()
}
if let expireTs = sessions[key] {
if expireTs >= Date().timeIntervalSince1970 {
return true
}
self.sessions.removeValue(forKey: key)
}
return false
}
//
func removeSession(_ key: FlowSession) {
lock.lock()
defer {
lock.unlock()
}
sessions.removeValue(forKey: key)
}
//
func cleanupExpiredSessions() {
lock.lock()
defer {
lock.unlock()
}
let now = Date().timeIntervalSince1970
self.sessions = self.sessions.filter { $0.value >= now }
}
// /
var count: Int {
lock.lock()
defer {
lock.unlock()
}
return sessions.count
}
}
extension IPPacket {
func flowSession() -> FlowSession? {
switch self.transportPacket {
case .tcp(let tcpPacket):
return FlowSession(srcIP: header.source, dstIP: header.destination, srcPort: tcpPacket.header.srcPort, dstPort: tcpPacket.header.dstPort, proto: header.proto)
case .udp(let udpPacket):
return FlowSession(srcIP: header.source, dstIP: header.destination, srcPort: udpPacket.srcPort, dstPort: udpPacket.dstPort, proto: header.proto)
default:
return nil
}
}
}