fix udp Hole

This commit is contained in:
anlicheng 2026-04-13 17:57:23 +08:00
parent 7ed993a775
commit 5739b59854

View File

@ -20,16 +20,16 @@ final class SDLUDPHole: ChannelInboundHandler {
public let messageStream: AsyncStream<(SocketAddress, SDLHoleMessage)> public let messageStream: AsyncStream<(SocketAddress, SDLHoleMessage)>
private let messageContinuation: AsyncStream<(SocketAddress, SDLHoleMessage)>.Continuation private let messageContinuation: AsyncStream<(SocketAddress, SDLHoleMessage)>.Continuation
// channelready
private var isReady: Bool = false
private var isStopped: Bool = false private var isStopped: Bool = false
// //
init() throws { init() throws {
(self.messageStream, self.messageContinuation) = AsyncStream.makeStream(of: (SocketAddress, SDLHoleMessage).self, bufferingPolicy: .unbounded) let (stream, continuation) = AsyncStream.makeStream(of: (SocketAddress, SDLHoleMessage).self, bufferingPolicy: .bufferingNewest(2048))
self.messageStream = stream
self.messageContinuation = continuation
} }
func start() throws -> SocketAddress? { func start() throws -> SocketAddress {
let bootstrap = DatagramBootstrap(group: group) let bootstrap = DatagramBootstrap(group: group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelInitializer { channel in .channelInitializer { channel in
@ -38,8 +38,9 @@ final class SDLUDPHole: ChannelInboundHandler {
let channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait() let channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait()
self.channel = channel self.channel = channel
precondition(channel.localAddress != nil, "UDP channel has no localAddress after bind")
return channel.localAddress return channel.localAddress!
} }
func waitClose() async throws { func waitClose() async throws {
@ -54,12 +55,14 @@ final class SDLUDPHole: ChannelInboundHandler {
self.isStopped = true self.isStopped = true
self.messageContinuation.finish() self.messageContinuation.finish()
self.channel?.close(promise: nil) self.channel?.close(promise: nil)
self.channel = nil
try? self.group.syncShutdownGracefully()
} }
// --MARK: ChannelInboundHandler delegate // --MARK: ChannelInboundHandler delegate
func channelActive(context: ChannelHandlerContext) { func channelActive(context: ChannelHandlerContext) {
self.isReady = true
} }
func channelRead(context: ChannelHandlerContext, data: NIOAny) { func channelRead(context: ChannelHandlerContext, data: NIOAny) {
@ -69,26 +72,27 @@ final class SDLUDPHole: ChannelInboundHandler {
let remoteAddress = envelope.remoteAddress let remoteAddress = envelope.remoteAddress
if let rawBytes = buffer.getBytes(at: buffer.readerIndex, length: buffer.readableBytes) { if let rawBytes = buffer.getBytes(at: buffer.readerIndex, length: buffer.readableBytes) {
SDLLogger.log("[SDLUDPHole] get raw bytes: \(rawBytes.count), from: \(remoteAddress)", for: .punchnet) SDLLogger.log("[SDLUDPHole] get raw bytes: \(rawBytes.count), from: \(remoteAddress)", for: .debug)
} }
do { do {
if let message = try decode(buffer: &buffer) { if let message = try decode(buffer: &buffer) {
self.messageContinuation.yield((remoteAddress, message)) self.messageContinuation.yield((remoteAddress, message))
} else { } else {
SDLLogger.log("[SDLUDPHole] decode message, get null", for: .punchnet) SDLLogger.log("[SDLUDPHole] decode message, get null", for: .debug)
} }
} catch let err { } catch let err {
SDLLogger.log("[SDLUDPHole] decode message, get error: \(err)", for: .punchnet) SDLLogger.log("[SDLUDPHole] decode message, get error: \(err)", for: .debug)
} }
} }
func channelInactive(context: ChannelHandlerContext) { func channelInactive(context: ChannelHandlerContext) {
self.messageContinuation.finish() self.channel = nil
context.close(promise: nil) context.close(promise: nil)
} }
func errorCaught(context: ChannelHandlerContext, error: any Error) { func errorCaught(context: ChannelHandlerContext, error: any Error) {
self.channel = nil
context.close(promise: nil) context.close(promise: nil)
} }
@ -151,12 +155,10 @@ final class SDLUDPHole: ChannelInboundHandler {
return nil return nil
} }
} }
deinit { deinit {
self.stop() self.stop()
try? self.group.syncShutdownGracefully()
} }
} }