diff --git a/Sources/Punchnet/SDLContext.swift b/Sources/Punchnet/SDLContext.swift index 8d321a8..9ee5d23 100644 --- a/Sources/Punchnet/SDLContext.swift +++ b/Sources/Punchnet/SDLContext.swift @@ -124,7 +124,6 @@ public class SDLContext: @unchecked Sendable { try await self.handleUDPEvent(event: event) } } - } group.addTask { diff --git a/Sources/Punchnet/SDLSuperClient.swift b/Sources/Punchnet/SDLSuperClient.swift index 5a0999b..7493e74 100644 --- a/Sources/Punchnet/SDLSuperClient.swift +++ b/Sources/Punchnet/SDLSuperClient.swift @@ -58,12 +58,15 @@ actor SDLSuperClient { func start() async throws { try await self.asyncChannel.executeThenClose { inbound, outbound in + self.inboundContinuation.yield(.ready) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { - defer { - self.inboundContinuation.finish() - } - + try await self.asyncChannel.channel.closeFuture.get() + self.inboundContinuation.yield(.closed) + self.inboundContinuation.finish() + } + + group.addTask { for try await var packet in inbound { if let message = SDLSuperClientDecoder.decode(buffer: &packet) { SDLLogger.log("[SDLSuperTransport] read message: \(message)", level: .warning) diff --git a/Sources/Punchnet/SDLUDPHole.swift b/Sources/Punchnet/SDLUDPHole.swift index a6db05b..b591eef 100644 --- a/Sources/Punchnet/SDLUDPHole.swift +++ b/Sources/Punchnet/SDLUDPHole.swift @@ -19,13 +19,13 @@ struct UDPMessage { actor SDLUDPHole { private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) private let asyncChannel: NIOAsyncChannel, AddressedEnvelope> - private let (writeStream, continuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded) + private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded) private var cookieGenerator = SDLIdGenerator(seed: 1) private var promises: [UInt32:EventLoopPromise] = [:] public var localAddress: SocketAddress? - public let (eventFlow, inboundContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded) + public let (eventFlow, eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded) // 定义事件类型 enum UDPEvent { @@ -55,12 +55,16 @@ actor SDLUDPHole { func start() async throws { try await self.asyncChannel.executeThenClose { inbound, outbound in + self.eventContinuation.yield(.ready) + try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { - defer { - self.inboundContinuation.finish() - } - + try await self.asyncChannel.channel.closeFuture.get() + self.eventContinuation.yield(.closed) + self.eventContinuation.finish() + } + + group.addTask { for try await envelope in inbound { var buffer = envelope.data let remoteAddress = envelope.remoteAddress @@ -69,12 +73,12 @@ actor SDLUDPHole { switch message { case .data(let data): SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug) - self.inboundContinuation.yield(.data(data)) + self.eventContinuation.yield(.data(data)) case .stunProbeReply(let probeReply): // 执行并移除回调 await self.trigger(probeReply: probeReply) default: - self.inboundContinuation.yield(.message(remoteAddress, message)) + self.eventContinuation.yield(.message(remoteAddress, message)) } } else { SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning) @@ -87,7 +91,7 @@ actor SDLUDPHole { group.addTask { defer { - self.continuation.finish() + self.writeContinuation.finish() } for try await message in self.writeStream { @@ -99,11 +103,6 @@ actor SDLUDPHole { try await outbound.write(envelope) } } - - - - //eventFlow.send(.ready) - try await group.waitForAll() } } @@ -224,7 +223,7 @@ actor SDLUDPHole { // 处理写入逻辑 private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) { let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data) - self.continuation.yield(message) + self.writeContinuation.yield(message) } //--MARK: 编解码器 @@ -260,5 +259,4 @@ actor SDLUDPHole { deinit { try? self.group.syncShutdownGracefully() } - }