diff --git a/Sources/Punchnet/SDLLogger.swift b/Sources/Punchnet/SDLLogger.swift index 1724c2d..433432a 100644 --- a/Sources/Punchnet/SDLLogger.swift +++ b/Sources/Punchnet/SDLLogger.swift @@ -4,8 +4,8 @@ // // Created by 安礼成 on 2024/3/13. // - import Foundation +import os.log public class SDLLogger: @unchecked Sendable { public enum Level: Int8, CustomStringConvertible { @@ -29,14 +29,16 @@ public class SDLLogger: @unchecked Sendable { } private let level: Level + private let log: OSLog public init(level: Level) { self.level = level + self.log = OSLog(subsystem: "com.jihe.punchnet", category: "punchnet") } public func log(_ message: String, level: Level = .debug) { if self.level.rawValue <= level.rawValue { - NSLog("\(level.description): \(message)") + os_log("%{public}@: %{public}@", log: self.log, type: .debug, level.description, message) } } diff --git a/Sources/Punchnet/SDLSuperClient.swift b/Sources/Punchnet/SDLSuperClient.swift index 63ad651..7527bb4 100644 --- a/Sources/Punchnet/SDLSuperClient.swift +++ b/Sources/Punchnet/SDLSuperClient.swift @@ -17,7 +17,8 @@ actor SDLSuperClient { private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: TcpMessage.self, bufferingPolicy: .unbounded) private var callbackPromises: [UInt32:EventLoopPromise] = [:] - public let (eventFlow, inboundContinuation) = AsyncStream.makeStream(of: SuperEvent.self, bufferingPolicy: .unbounded) + public let eventFlow: AsyncStream + private let inboundContinuation: AsyncStream.Continuation // id生成器 var idGenerator = SDLIdGenerator(seed: 1) @@ -39,6 +40,8 @@ actor SDLSuperClient { init(host: String, port: Int, logger: SDLLogger) async throws { self.logger = logger + + (self.eventFlow, self.inboundContinuation) = AsyncStream.makeStream(of: SuperEvent.self, bufferingPolicy: .unbounded) let bootstrap = ClientBootstrap(group: self.group) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelInitializer { channel in @@ -59,82 +62,72 @@ actor SDLSuperClient { } func start() async throws { - try await self.asyncChannel.executeThenClose { inbound, outbound in - self.inboundContinuation.yield(.ready) - - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - try await self.asyncChannel.channel.closeFuture.get() - self.logger.log("[SDLSuperClient] socket closed", level: .warning) - throw SDLError.socketClosed - } + try await withTaskCancellationHandler { + try await self.asyncChannel.executeThenClose { inbound, outbound in + self.inboundContinuation.yield(.ready) - group.addTask { - defer { - self.inboundContinuation.finish() - } - - for try await var packet in inbound { - if Task.isCancelled { - break + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + defer { + self.logger.log("[SDLSuperClient] inbound closed", level: .warning) } - if let message = SDLSuperClientDecoder.decode(buffer: &packet) { - self.logger.log("[SDLSuperTransport] read message: \(message)", level: .debug) - switch message.packet { - case .event(let event): - self.inboundContinuation.yield(.event(event)) - case .command(let command): - self.inboundContinuation.yield(.command(message.msgId, command)) - default: - await self.fireCallback(message: message) + for try await var packet in inbound { + try Task.checkCancellation() + + if let message = SDLSuperClientDecoder.decode(buffer: &packet) { + self.logger.log("[SDLSuperTransport] read message: \(message)", level: .debug) + switch message.packet { + case .event(let event): + self.inboundContinuation.yield(.event(event)) + case .command(let command): + self.inboundContinuation.yield(.command(message.msgId, command)) + default: + await self.fireCallback(message: message) + } } } } - self.logger.log("[SDLSuperClient] inbound closed", level: .warning) - } - - group.addTask { - defer { - self.writeContinuation.finish() - } - for try await message in self.writeStream { - if Task.isCancelled { - break + group.addTask { + defer { + self.logger.log("[SDLSuperClient] outbound closed", level: .warning) } - var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5) - buffer.writeInteger(message.packetId, as: UInt32.self) - buffer.writeBytes([message.type.rawValue]) - buffer.writeBytes(message.data) - try await outbound.write(buffer) - } - self.logger.log("[SDLSuperClient] outbound closed", level: .warning) - } - - // --MARK: 心跳机制 - group.addTask { - while !Task.isCancelled { - do { - await self.ping() - try await Task.sleep(nanoseconds: 5 * 1_000_000_000) - } catch let err { - self.logger.log("[SDLSuperClient] heartbeat cancelled with error: \(err)", level: .warning) - break + for try await message in self.writeStream { + try Task.checkCancellation() + + var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5) + buffer.writeInteger(message.packetId, as: UInt32.self) + buffer.writeBytes([message.type.rawValue]) + buffer.writeBytes(message.data) + try await outbound.write(buffer) } } - } - - // 迭代等待所有任务的退出, 第一个异常会被抛出 - for try await _ in group { + // --MARK: 心跳机制 + group.addTask { + defer { + self.logger.log("[SDLSuperClient] ping task closed", level: .warning) + } + + while true { + try Task.checkCancellation() + await self.ping() + try await Task.sleep(nanoseconds: 5 * 1_000_000_000) + } + } + + // 迭代等待所有任务的退出, 第一个异常会被抛出 + for try await _ in group { + + } } - - self.logger.log("[SDLSuperClient] group closed", level: .warning) } + } onCancel: { + self.inboundContinuation.finish() + self.writeContinuation.finish() } - } // -- MARK: apis diff --git a/Sources/Punchnet/SDLUDPHole.swift b/Sources/Punchnet/SDLUDPHole.swift index 143a660..e56a579 100644 --- a/Sources/Punchnet/SDLUDPHole.swift +++ b/Sources/Punchnet/SDLUDPHole.swift @@ -20,7 +20,8 @@ actor SDLUDPHole { private var promises: [UInt32:EventLoopPromise] = [:] public var localAddress: SocketAddress? - public let (eventFlow, eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded) + public let eventFlow: AsyncStream + private let eventContinuation: AsyncStream.Continuation private let logger: SDLLogger @@ -41,6 +42,8 @@ actor SDLUDPHole { init(logger: SDLLogger) async throws { self.logger = logger + (self.eventFlow, self.eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded) + let bootstrap = DatagramBootstrap(group: group) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) @@ -58,62 +61,59 @@ actor SDLUDPHole { } func start() async throws { - try await self.asyncChannel.executeThenClose {inbound, outbound in - self.eventContinuation.yield(.ready) - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - try await self.asyncChannel.channel.closeFuture.get() - self.logger.log("[UDPHole] channel closed", level: .warning) - throw SDLError.socketClosed - } - - group.addTask { - for try await envelope in inbound { - var buffer = envelope.data - let remoteAddress = envelope.remoteAddress - do { - if let message = try Self.decode(buffer: &buffer) { - switch message { - case .data(let data): - self.logger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug) - self.eventContinuation.yield(.data(data)) - case .stunProbeReply(let probeReply): - // 执行并移除回调 - await self.trigger(probeReply: probeReply) - default: - self.eventContinuation.yield(.message(remoteAddress, message)) + try await withTaskCancellationHandler { + try await self.asyncChannel.executeThenClose {inbound, outbound in + self.eventContinuation.yield(.ready) + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + for try await envelope in inbound { + try Task.checkCancellation() + + var buffer = envelope.data + let remoteAddress = envelope.remoteAddress + do { + if let message = try Self.decode(buffer: &buffer) { + switch message { + case .data(let data): + self.logger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug) + self.eventContinuation.yield(.data(data)) + case .stunProbeReply(let probeReply): + // 执行并移除回调 + await self.trigger(probeReply: probeReply) + default: + self.eventContinuation.yield(.message(remoteAddress, message)) + } + } else { + self.logger.log("[SDLUDPHole] decode message, get null", level: .warning) } - } else { - self.logger.log("[SDLUDPHole] decode message, get null", level: .warning) + } catch let err { + self.logger.log("[SDLUDPHole] decode message, get error: \(err)", level: .warning) + throw err } - } catch let err { - self.logger.log("[SDLUDPHole] decode message, get error: \(err)", level: .warning) - throw err } } - } - - group.addTask { - for try await message in self.writeStream { - if Task.isCancelled { - break - } - - var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1) - buffer.writeBytes([message.type.rawValue]) - buffer.writeBytes(message.data) - - let envelope = AddressedEnvelope(remoteAddress: message.remoteAddress, data: buffer) - try await outbound.write(envelope) - } - } - - for try await _ in group { + group.addTask { + for try await message in self.writeStream { + try Task.checkCancellation() + + var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1) + buffer.writeBytes([message.type.rawValue]) + buffer.writeBytes(message.data) + + let envelope = AddressedEnvelope(remoteAddress: message.remoteAddress, data: buffer) + try await outbound.write(envelope) + } + } + + for try await _ in group { } + + self.logger.log("[SDLUDPHole] group closed", level: .warning) } - - self.logger.log("[SDLUDPHole] group closed", level: .warning) } + } onCancel: { + self.writeContinuation.finish() + self.eventContinuation.finish() } }