fix SDLUDPHole

This commit is contained in:
anlicheng 2026-04-14 15:47:02 +08:00
parent 9be6402a63
commit 9d3a8af0aa

View File

@ -14,7 +14,7 @@ import SwiftProtobuf
final class SDLUDPHole: ChannelInboundHandler { final class SDLUDPHole: ChannelInboundHandler {
typealias InboundIn = AddressedEnvelope<ByteBuffer> typealias InboundIn = AddressedEnvelope<ByteBuffer>
private enum State { private enum State: Equatable {
case idle case idle
case ready case ready
case stopping case stopping
@ -23,21 +23,15 @@ final class SDLUDPHole: ChannelInboundHandler {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private var channel: Channel? private var channel: Channel?
private var closeFuture: EventLoopFuture<Void>?
private var state: State = .idle private var state: State = .idle
private var didFinishMessageStream: Bool = false private var didFinishMessageStream: Bool = false
private let closeStream: AsyncStream<Void>
private let closeContinuation: AsyncStream<Void>.Continuation
public let messageStream: AsyncStream<(SocketAddress, SDLHoleMessage)> public let messageStream: AsyncStream<(SocketAddress, SDLHoleMessage)>
private let messageContinuation: AsyncStream<(SocketAddress, SDLHoleMessage)>.Continuation private let messageContinuation: AsyncStream<(SocketAddress, SDLHoleMessage)>.Continuation
// //
init() throws { init() throws {
let (closeStream, closeContinuation) = AsyncStream.makeStream(of: Void.self, bufferingPolicy: .bufferingNewest(1))
self.closeStream = closeStream
self.closeContinuation = closeContinuation
let (stream, continuation) = AsyncStream.makeStream(of: (SocketAddress, SDLHoleMessage).self, bufferingPolicy: .bufferingNewest(2048)) let (stream, continuation) = AsyncStream.makeStream(of: (SocketAddress, SDLHoleMessage).self, bufferingPolicy: .bufferingNewest(2048))
self.messageStream = stream self.messageStream = stream
self.messageContinuation = continuation self.messageContinuation = continuation
@ -65,6 +59,7 @@ final class SDLUDPHole: ChannelInboundHandler {
let channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait() let channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait()
self.channel = channel self.channel = channel
self.closeFuture = channel.closeFuture
self.state = .ready self.state = .ready
precondition(channel.localAddress != nil, "UDP channel has no localAddress after bind") precondition(channel.localAddress != nil, "UDP channel has no localAddress after bind")
@ -72,7 +67,15 @@ final class SDLUDPHole: ChannelInboundHandler {
} }
func waitClose() async throws { func waitClose() async throws {
for await _ in self.closeStream { } switch self.state {
case .idle:
return
case .ready, .stopping, .stopped:
guard let closeFuture = self.closeFuture else {
return
}
try await closeFuture.get()
}
} }
func stop() { func stop() {
@ -82,7 +85,6 @@ final class SDLUDPHole: ChannelInboundHandler {
case .idle: case .idle:
self.state = .stopped self.state = .stopped
self.finishMessageStream() self.finishMessageStream()
self.closeContinuation.finish()
return return
case .ready: case .ready:
self.state = .stopping self.state = .stopping
@ -90,7 +92,6 @@ final class SDLUDPHole: ChannelInboundHandler {
self.finishMessageStream() self.finishMessageStream()
self.channel?.close(promise: nil) self.channel?.close(promise: nil)
self.channel = nil
} }
// --MARK: ChannelInboundHandler delegate // --MARK: ChannelInboundHandler delegate
@ -124,17 +125,14 @@ final class SDLUDPHole: ChannelInboundHandler {
self.finishMessageStream() self.finishMessageStream()
self.channel = nil self.channel = nil
self.state = .stopped self.state = .stopped
self.closeContinuation.yield(())
self.closeContinuation.finish()
context.close(promise: nil)
} }
func errorCaught(context: ChannelHandlerContext, error: any Error) { func errorCaught(context: ChannelHandlerContext, error: any Error) {
SDLLogger.log("[SDLUDPHole] channel error: \(error)", for: .debug)
self.finishMessageStream() self.finishMessageStream()
self.channel = nil if self.state != .stopped {
self.state = .stopped self.state = .stopping
self.closeContinuation.yield(()) }
self.closeContinuation.finish()
context.close(promise: nil) context.close(promise: nil)
} }