fix task group

This commit is contained in:
anlicheng 2025-08-20 15:53:21 +08:00
parent 90939a68c1
commit de79811623
3 changed files with 109 additions and 114 deletions

View File

@ -4,8 +4,8 @@
// //
// Created by on 2024/3/13. // Created by on 2024/3/13.
// //
import Foundation import Foundation
import os.log
public class SDLLogger: @unchecked Sendable { public class SDLLogger: @unchecked Sendable {
public enum Level: Int8, CustomStringConvertible { public enum Level: Int8, CustomStringConvertible {
@ -29,14 +29,16 @@ public class SDLLogger: @unchecked Sendable {
} }
private let level: Level private let level: Level
private let log: OSLog
public init(level: Level) { public init(level: Level) {
self.level = level self.level = level
self.log = OSLog(subsystem: "com.jihe.punchnet", category: "punchnet")
} }
public func log(_ message: String, level: Level = .debug) { public func log(_ message: String, level: Level = .debug) {
if self.level.rawValue <= level.rawValue { if self.level.rawValue <= level.rawValue {
NSLog("\(level.description): \(message)") os_log("%{public}@: %{public}@", log: self.log, type: .debug, level.description, message)
} }
} }

View File

@ -17,7 +17,8 @@ actor SDLSuperClient {
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: TcpMessage.self, bufferingPolicy: .unbounded) private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: TcpMessage.self, bufferingPolicy: .unbounded)
private var callbackPromises: [UInt32:EventLoopPromise<SDLSuperInboundMessage>] = [:] private var callbackPromises: [UInt32:EventLoopPromise<SDLSuperInboundMessage>] = [:]
public let (eventFlow, inboundContinuation) = AsyncStream.makeStream(of: SuperEvent.self, bufferingPolicy: .unbounded) public let eventFlow: AsyncStream<SuperEvent>
private let inboundContinuation: AsyncStream<SuperEvent>.Continuation
// id // id
var idGenerator = SDLIdGenerator(seed: 1) var idGenerator = SDLIdGenerator(seed: 1)
@ -39,6 +40,8 @@ actor SDLSuperClient {
init(host: String, port: Int, logger: SDLLogger) async throws { init(host: String, port: Int, logger: SDLLogger) async throws {
self.logger = logger self.logger = logger
(self.eventFlow, self.inboundContinuation) = AsyncStream.makeStream(of: SuperEvent.self, bufferingPolicy: .unbounded)
let bootstrap = ClientBootstrap(group: self.group) let bootstrap = ClientBootstrap(group: self.group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelInitializer { channel in .channelInitializer { channel in
@ -59,82 +62,72 @@ actor SDLSuperClient {
} }
func start() async throws { func start() async throws {
try await self.asyncChannel.executeThenClose { inbound, outbound in try await withTaskCancellationHandler {
self.inboundContinuation.yield(.ready) try await self.asyncChannel.executeThenClose { inbound, outbound in
self.inboundContinuation.yield(.ready)
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await self.asyncChannel.channel.closeFuture.get()
self.logger.log("[SDLSuperClient] socket closed", level: .warning)
throw SDLError.socketClosed
}
group.addTask { try await withThrowingTaskGroup(of: Void.self) { group in
defer { group.addTask {
self.inboundContinuation.finish() defer {
} self.logger.log("[SDLSuperClient] inbound closed", level: .warning)
for try await var packet in inbound {
if Task.isCancelled {
break
} }
if let message = SDLSuperClientDecoder.decode(buffer: &packet) { for try await var packet in inbound {
self.logger.log("[SDLSuperTransport] read message: \(message)", level: .debug) try Task.checkCancellation()
switch message.packet {
case .event(let event): if let message = SDLSuperClientDecoder.decode(buffer: &packet) {
self.inboundContinuation.yield(.event(event)) self.logger.log("[SDLSuperTransport] read message: \(message)", level: .debug)
case .command(let command): switch message.packet {
self.inboundContinuation.yield(.command(message.msgId, command)) case .event(let event):
default: self.inboundContinuation.yield(.event(event))
await self.fireCallback(message: message) case .command(let command):
self.inboundContinuation.yield(.command(message.msgId, command))
default:
await self.fireCallback(message: message)
}
} }
} }
} }
self.logger.log("[SDLSuperClient] inbound closed", level: .warning)
}
group.addTask {
defer {
self.writeContinuation.finish()
}
for try await message in self.writeStream { group.addTask {
if Task.isCancelled { defer {
break self.logger.log("[SDLSuperClient] outbound closed", level: .warning)
} }
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5) for try await message in self.writeStream {
buffer.writeInteger(message.packetId, as: UInt32.self) try Task.checkCancellation()
buffer.writeBytes([message.type.rawValue])
buffer.writeBytes(message.data) var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5)
try await outbound.write(buffer) buffer.writeInteger(message.packetId, as: UInt32.self)
} buffer.writeBytes([message.type.rawValue])
self.logger.log("[SDLSuperClient] outbound closed", level: .warning) buffer.writeBytes(message.data)
} try await outbound.write(buffer)
// --MARK:
group.addTask {
while !Task.isCancelled {
do {
await self.ping()
try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
} catch let err {
self.logger.log("[SDLSuperClient] heartbeat cancelled with error: \(err)", level: .warning)
break
} }
} }
}
// 退,
for try await _ in group {
// --MARK:
group.addTask {
defer {
self.logger.log("[SDLSuperClient] ping task closed", level: .warning)
}
while true {
try Task.checkCancellation()
await self.ping()
try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
}
}
// 退,
for try await _ in group {
}
} }
self.logger.log("[SDLSuperClient] group closed", level: .warning)
} }
} onCancel: {
self.inboundContinuation.finish()
self.writeContinuation.finish()
} }
} }
// -- MARK: apis // -- MARK: apis

View File

@ -20,7 +20,8 @@ actor SDLUDPHole {
private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:] private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:]
public var localAddress: SocketAddress? public var localAddress: SocketAddress?
public let (eventFlow, eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded) public let eventFlow: AsyncStream<UDPEvent>
private let eventContinuation: AsyncStream<UDPEvent>.Continuation
private let logger: SDLLogger private let logger: SDLLogger
@ -41,6 +42,8 @@ actor SDLUDPHole {
init(logger: SDLLogger) async throws { init(logger: SDLLogger) async throws {
self.logger = logger self.logger = logger
(self.eventFlow, self.eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)
let bootstrap = DatagramBootstrap(group: group) let bootstrap = DatagramBootstrap(group: group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
@ -58,62 +61,59 @@ actor SDLUDPHole {
} }
func start() async throws { func start() async throws {
try await self.asyncChannel.executeThenClose {inbound, outbound in try await withTaskCancellationHandler {
self.eventContinuation.yield(.ready) try await self.asyncChannel.executeThenClose {inbound, outbound in
try await withThrowingTaskGroup(of: Void.self) { group in self.eventContinuation.yield(.ready)
group.addTask { try await withThrowingTaskGroup(of: Void.self) { group in
try await self.asyncChannel.channel.closeFuture.get() group.addTask {
self.logger.log("[UDPHole] channel closed", level: .warning) for try await envelope in inbound {
throw SDLError.socketClosed try Task.checkCancellation()
}
var buffer = envelope.data
group.addTask { let remoteAddress = envelope.remoteAddress
for try await envelope in inbound { do {
var buffer = envelope.data if let message = try Self.decode(buffer: &buffer) {
let remoteAddress = envelope.remoteAddress switch message {
do { case .data(let data):
if let message = try Self.decode(buffer: &buffer) { self.logger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
switch message { self.eventContinuation.yield(.data(data))
case .data(let data): case .stunProbeReply(let probeReply):
self.logger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug) //
self.eventContinuation.yield(.data(data)) await self.trigger(probeReply: probeReply)
case .stunProbeReply(let probeReply): default:
// self.eventContinuation.yield(.message(remoteAddress, message))
await self.trigger(probeReply: probeReply) }
default: } else {
self.eventContinuation.yield(.message(remoteAddress, message)) self.logger.log("[SDLUDPHole] decode message, get null", level: .warning)
} }
} else { } catch let err {
self.logger.log("[SDLUDPHole] decode message, get null", level: .warning) self.logger.log("[SDLUDPHole] decode message, get error: \(err)", level: .warning)
throw err
} }
} catch let err {
self.logger.log("[SDLUDPHole] decode message, get error: \(err)", level: .warning)
throw err
} }
} }
}
group.addTask {
for try await message in self.writeStream {
if Task.isCancelled {
break
}
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
buffer.writeBytes([message.type.rawValue])
buffer.writeBytes(message.data)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: message.remoteAddress, data: buffer)
try await outbound.write(envelope)
}
}
for try await _ in group {
group.addTask {
for try await message in self.writeStream {
try Task.checkCancellation()
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
buffer.writeBytes([message.type.rawValue])
buffer.writeBytes(message.data)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: message.remoteAddress, data: buffer)
try await outbound.write(envelope)
}
}
for try await _ in group { }
self.logger.log("[SDLUDPHole] group closed", level: .warning)
} }
self.logger.log("[SDLUDPHole] group closed", level: .warning)
} }
} onCancel: {
self.writeContinuation.finish()
self.eventContinuation.finish()
} }
} }