This commit is contained in:
anlicheng 2026-04-14 15:29:59 +08:00
parent 83f71711cf
commit 9be6402a63
3 changed files with 117 additions and 46 deletions

View File

@ -121,6 +121,7 @@ actor SDLContextActor {
self.startMonitor() self.startMonitor()
// arp // arp
await self.puncherActor.start()
await self.arpServer.start() await self.arpServer.start()
await self.startDnsClient() await self.startDnsClient()
await self.startDnsLocalClient() await self.startDnsLocalClient()
@ -369,6 +370,7 @@ actor SDLContextActor {
self.state = .unregistered self.state = .unregistered
await self.supervisor.stop() await self.supervisor.stop()
await self.puncherActor.stop()
self.udpHoleWorkers?.forEach { $0.cancel() } self.udpHoleWorkers?.forEach { $0.cancel() }
self.udpHoleWorkers = nil self.udpHoleWorkers = nil

View File

@ -11,18 +11,8 @@ import NIOCore
actor SDLPuncherActor { actor SDLPuncherActor {
// 10 // 10
nonisolated private let cooldownInterval: TimeInterval = 10 nonisolated private let cooldownInterval: TimeInterval = 10
// peerInfo
struct RequestContext { nonisolated private let peerInfoTimeout: TimeInterval = 3
let expireAt: Date
let request: RegisterRequest
func isExpired() -> Bool {
return expireAt < Date()
}
}
// dstMac
private var pendingRequests: [Data: RequestContext] = [:]
struct RegisterRequest { struct RegisterRequest {
let srcMac: Data let srcMac: Data
@ -30,57 +20,137 @@ actor SDLPuncherActor {
let networkId: UInt32 let networkId: UInt32
} }
private enum RequestPhase {
case waitingPeerInfo(deadline: Date)
case coolingDown
}
private struct RequestEntry {
let request: RegisterRequest
let cooldownUntil: Date
var phase: RequestPhase
func canSubmit(at now: Date) -> Bool {
return cooldownUntil <= now
}
func isWaitingPeerInfo(at now: Date) -> Bool {
guard case .waitingPeerInfo(let deadline) = self.phase else {
return false
}
return deadline > now
}
mutating func markCoolingDown() {
self.phase = .coolingDown
}
}
// dstMac
private var requestEntries: [Data: RequestEntry] = [:]
private var cleanupTask: Task<Void, Never>?
func start() {
guard self.cleanupTask == nil else {
return
}
self.cleanupTask = Task { [weak self] in
while !Task.isCancelled {
try? await Task.sleep(for: .seconds(1))
await self?.cleanupExpiredEntries()
}
}
}
func submitRegisterRequest(quicClient: SDLQUICClient?, request: RegisterRequest) { func submitRegisterRequest(quicClient: SDLQUICClient?, request: RegisterRequest) {
guard let quicClient else { guard let quicClient else {
return return
} }
// let now = Date()
let dstMac = request.dstMac self.cleanupExpiredEntries(now: now)
guard self.isRequestExpired(dstMac: dstMac) else {
if let entry = self.requestEntries[request.dstMac], !entry.canSubmit(at: now) {
return return
} }
self.pendingRequests[dstMac] = .init(expireAt: Date().addingTimeInterval(cooldownInterval), request: request)
//
var queryInfo = SDLQueryInfo() var queryInfo = SDLQueryInfo()
queryInfo.dstMac = request.dstMac queryInfo.dstMac = request.dstMac
if let queryData = try? queryInfo.serializedData() {
quicClient.send(type: .queryInfo, data: queryData) guard let queryData = try? queryInfo.serializedData() else {
SDLLogger.log("[SDLPuncherActor] failed to encode queryInfo", for: .debug)
return
} }
self.requestEntries[request.dstMac] = RequestEntry(
request: request,
cooldownUntil: now.addingTimeInterval(self.cooldownInterval),
phase: .waitingPeerInfo(deadline: now.addingTimeInterval(self.peerInfoTimeout))
)
quicClient.send(type: .queryInfo, data: queryData)
} }
func handlePeerInfo(using udpHole: SDLUDPHole?, peerInfo: SDLPeerInfo) async { func handlePeerInfo(using udpHole: SDLUDPHole?, peerInfo: SDLPeerInfo) async {
// ; let now = Date()
guard let requestContext = pendingRequests.removeValue(forKey: peerInfo.dstMac) else { self.cleanupExpiredEntries(now: now)
guard var entry = self.requestEntries[peerInfo.dstMac] else {
return return
} }
// guard entry.isWaitingPeerInfo(at: now) else {
guard let udpHole, peerInfo.hasV4Info else { return
}
entry.markCoolingDown()
self.requestEntries[peerInfo.dstMac] = entry
guard let udpHole else {
SDLLogger.log("[SDLPuncherActor] udpHole is nil when peerInfo arrived", for: .debug)
return
}
guard peerInfo.hasV4Info else {
SDLLogger.log("[SDLPuncherActor] peerInfo missing v4Info", for: .debug)
return
}
guard let remoteAddress = try? await peerInfo.v4Info.socketAddress() else {
SDLLogger.log("[SDLPuncherActor] failed to resolve peerInfo.v4Info", for: .debug)
return return
} }
if let remoteAddress = try? await peerInfo.v4Info.socketAddress() {
SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", for: .punchnet) SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", for: .punchnet)
// register
var register = SDLRegister()
register.networkID = requestContext.request.networkId
register.srcMac = requestContext.request.srcMac
register.dstMac = requestContext.request.dstMac
if let registerData = try? register.serializedData() { var register = SDLRegister()
register.networkID = entry.request.networkId
register.srcMac = entry.request.srcMac
register.dstMac = entry.request.dstMac
guard let registerData = try? register.serializedData() else {
SDLLogger.log("[SDLPuncherActor] failed to encode register", for: .debug)
return
}
udpHole.send(type: .register, data: registerData, remoteAddress: remoteAddress) udpHole.send(type: .register, data: registerData, remoteAddress: remoteAddress)
} }
func stop() {
self.cleanupTask?.cancel()
self.cleanupTask = nil
self.requestEntries.removeAll()
}
private func cleanupExpiredEntries(now: Date = Date()) {
self.requestEntries = self.requestEntries.filter { _, entry in
!entry.canSubmit(at: now)
} }
} }
// deinit {
func isRequestExpired(dstMac: Data) -> Bool { self.cleanupTask?.cancel()
if let context = pendingRequests[dstMac] {
return context.isExpired()
} }
return true
}
} }

View File

@ -149,15 +149,14 @@ actor DNSLocalClient {
let stream = Self.makeReceiveStream(for: conn) let stream = Self.makeReceiveStream(for: conn)
self.receiveTasks[key] = Task { [weak self] in self.receiveTasks[key] = Task { [weak self] in
guard let self else {
return
}
for await data in stream { for await data in stream {
guard let self else {
break
}
await self.handleResponse(data: data) await self.handleResponse(data: data)
} }
await self.didFinishReceiving(for: conn) await self?.didFinishReceiving(for: conn)
} }
} }