This commit is contained in:
anlicheng 2026-02-03 13:35:02 +08:00
parent 2f9920ad6d
commit 478969d99d
2 changed files with 59 additions and 53 deletions

View File

@ -23,7 +23,7 @@ actor SDLContextActor {
nonisolated let aesCipher: AESCipher nonisolated let aesCipher: AESCipher
// aes // aes
var aesKey: Data = Data() private var aesKey: Data?
// rsa, public_key // rsa, public_key
nonisolated let rsaCipher: RSACipher nonisolated let rsaCipher: RSACipher
@ -127,6 +127,9 @@ actor SDLContextActor {
self.logger.log("[SDLContext] udpHole started") self.logger.log("[SDLContext] udpHole started")
self.udpHole = udpHole self.udpHole = udpHole
await udpHole.channelIsActived()
await self.handleUDPHoleReady()
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await channel.closeFuture.get() try await channel.closeFuture.get()
@ -142,19 +145,6 @@ actor SDLContextActor {
} }
} }
// event
group.addTask {
for try await event in udpHole.eventStream {
try Task.checkCancellation()
switch event {
case .ready:
await self.handleUDPHoleReady()
case .closed:
()
}
}
}
// //
group.addTask { group.addTask {
for try await data in udpHole.dataStream { for try await data in udpHole.dataStream {
@ -203,22 +193,24 @@ actor SDLContextActor {
self.natType = natType self.natType = natType
} }
// super
private func handleUDPHoleReady() async { private func handleUDPHoleReady() async {
if let udpHole = self.udpHole { guard let udpHole = self.udpHole else {
self.puncherActor = SDLPuncherActor(udpHole: udpHole, querySocketAddress: config.stunSocketAddress, logger: logger) return
self.proberActor = SDLNATProberActor(udpHole: udpHole, addressArray: self.config.stunProbeSocketAddressArray, logger: self.logger)
} }
await withDiscardingTaskGroup { group in self.puncherActor = SDLPuncherActor(udpHole: udpHole, querySocketAddress: config.stunSocketAddress, logger: logger)
group.addTask { self.proberActor = SDLNATProberActor(udpHole: udpHole, addressArray: self.config.stunProbeSocketAddressArray, logger: self.logger)
// nat // nat
Task.detached {
if let natType = await self.proberActor?.probeNatType() { if let natType = await self.proberActor?.probeNatType() {
await self.setNatType(natType: natType) await self.setNatType(natType: natType)
self.logger.log("[SDLContext] nat_type is: \(natType)") self.logger.log("[SDLContext] nat_type is: \(natType)")
} }
} }
group.addTask { //
var registerSuper = SDLRegisterSuper() var registerSuper = SDLRegisterSuper()
registerSuper.pktID = 0 registerSuper.pktID = 0
registerSuper.clientID = self.config.clientId registerSuper.clientID = self.config.clientId
@ -232,9 +224,7 @@ actor SDLContextActor {
if let registerSuperData = try? registerSuper.serializedData() { if let registerSuperData = try? registerSuper.serializedData() {
self.logger.log("[SDLContext] will send register super") self.logger.log("[SDLContext] will send register super")
await self.udpHole?.send(type: .registerSuper, data: registerSuperData, remoteAddress: self.config.stunSocketAddress) self.udpHole?.send(type: .registerSuper, data: registerSuperData, remoteAddress: self.config.stunSocketAddress)
}
}
} }
} }
@ -348,6 +338,10 @@ actor SDLContextActor {
} }
private func handleData(data: SDLData) throws { private func handleData(data: SDLData) throws {
guard let aesKey = self.aesKey else {
return
}
let mac = LayerPacket.MacAddress(data: data.dstMac) let mac = LayerPacket.MacAddress(data: data.dstMac)
let networkAddr = config.networkAddress let networkAddr = config.networkAddress
@ -355,7 +349,7 @@ actor SDLContextActor {
return return
} }
guard let decyptedData = try? self.aesCipher.decypt(aesKey: self.aesKey, data: Data(data.data)) else { guard let decyptedData = try? self.aesCipher.decypt(aesKey: aesKey, data: Data(data.data)) else {
return return
} }
@ -477,7 +471,7 @@ actor SDLContextActor {
let networkAddr = self.config.networkAddress let networkAddr = self.config.networkAddress
// 2 // 2
let layerPacket = LayerPacket(dstMac: dstMac, srcMac: networkAddr.mac, type: type, data: data) let layerPacket = LayerPacket(dstMac: dstMac, srcMac: networkAddr.mac, type: type, data: data)
guard let encodedPacket = try? self.aesCipher.encrypt(aesKey: self.aesKey, data: layerPacket.marshal()) else { guard let aesKey = self.aesKey, let encodedPacket = try? self.aesCipher.encrypt(aesKey: aesKey, data: layerPacket.marshal()) else {
return return
} }

View File

@ -31,8 +31,9 @@ final class SDLUDPHole: ChannelInboundHandler {
public let dataStream: AsyncStream<SDLData> public let dataStream: AsyncStream<SDLData>
private let dataContinuation: AsyncStream<SDLData>.Continuation private let dataContinuation: AsyncStream<SDLData>.Continuation
public let eventStream: AsyncStream<HoleEvent> // channelready
private let eventContinuation: AsyncStream<HoleEvent>.Continuation private var cont: CheckedContinuation<Void, Never>?
private var isReady: Bool = false
private let logger: SDLLogger private let logger: SDLLogger
@ -46,7 +47,6 @@ final class SDLUDPHole: ChannelInboundHandler {
self.logger = logger self.logger = logger
(self.signalStream, self.signalContinuation) = AsyncStream.makeStream(of: (SocketAddress, SDLHoleSignal).self, bufferingPolicy: .unbounded) (self.signalStream, self.signalContinuation) = AsyncStream.makeStream(of: (SocketAddress, SDLHoleSignal).self, bufferingPolicy: .unbounded)
(self.dataStream, self.dataContinuation) = AsyncStream.makeStream(of: SDLData.self, bufferingPolicy: .unbounded) (self.dataStream, self.dataContinuation) = AsyncStream.makeStream(of: SDLData.self, bufferingPolicy: .unbounded)
(self.eventStream, self.eventContinuation) = AsyncStream.makeStream(of: HoleEvent.self, bufferingPolicy: .unbounded)
} }
func start() throws -> Channel { func start() throws -> Channel {
@ -63,10 +63,25 @@ final class SDLUDPHole: ChannelInboundHandler {
return channel return channel
} }
func channelIsActived() async {
await withCheckedContinuation { c in
if isReady {
c.resume()
} else {
self.cont = c
}
}
}
// --MARK: ChannelInboundHandler delegate // --MARK: ChannelInboundHandler delegate
func channelActive(context: ChannelHandlerContext) { func channelActive(context: ChannelHandlerContext) {
self.eventContinuation.yield(.ready) guard !isReady else {
return
}
self.isReady = true
self.cont?.resume()
self.cont = nil
} }
func channelRead(context: ChannelHandlerContext, data: NIOAny) { func channelRead(context: ChannelHandlerContext, data: NIOAny) {
@ -93,8 +108,6 @@ final class SDLUDPHole: ChannelInboundHandler {
func channelInactive(context: ChannelHandlerContext) { func channelInactive(context: ChannelHandlerContext) {
self.signalContinuation.finish() self.signalContinuation.finish()
self.dataContinuation.finish() self.dataContinuation.finish()
self.eventContinuation.yield(.closed)
self.eventContinuation.finish()
context.close(promise: nil) context.close(promise: nil)
} }
@ -196,7 +209,6 @@ final class SDLUDPHole: ChannelInboundHandler {
deinit { deinit {
try? self.group.syncShutdownGracefully() try? self.group.syncShutdownGracefully()
self.eventContinuation.finish()
self.channel?.close(promise: nil) self.channel?.close(promise: nil)
} }