debug task group

This commit is contained in:
anlicheng 2025-08-19 23:31:06 +08:00
parent 77a631a001
commit 90939a68c1
4 changed files with 85 additions and 74 deletions

View File

@ -62,12 +62,8 @@ public class SDLContext: @unchecked Sendable {
// stunRequestcookie // stunRequestcookie
private var lastCookie: UInt32? = 0 private var lastCookie: UInt32? = 0
//
private var stunCancel: AnyCancellable?
// //
private var monitor = SDLNetworkMonitor() private var monitor: SDLNetworkMonitor?
private var monitorCancel: AnyCancellable?
// socket // socket
private var noticeClient: SDLNoticeClient? private var noticeClient: SDLNoticeClient?
@ -83,6 +79,8 @@ public class SDLContext: @unchecked Sendable {
private let logger: SDLLogger private let logger: SDLLogger
private var rootTask: Task<Void, Error>?
struct RegisterRequest { struct RegisterRequest {
let srcMac: Data let srcMac: Data
let dstMac: Data let dstMac: Data
@ -107,59 +105,68 @@ public class SDLContext: @unchecked Sendable {
} }
public func start() async throws { public func start() async throws {
self.noticeClient = try await SDLNoticeClient(logger: self.logger) self.rootTask = Task {
try await withThrowingTaskGroup(of: Void.self) { group in
try await withThrowingTaskGroup(of: Void.self) { group in group.addTask {
group.addTask { while !Task.isCancelled {
while !Task.isCancelled { do {
do { try await self.startUDPHole()
try await self.startUDPHole() } catch let err {
} catch let err { self.logger.log("[SDLContext] UDPHole get err: \(err)", level: .warning)
self.logger.log("[SDLContext] UDPHole get err: \(err)", level: .warning) }
} }
} }
}
group.addTask {
group.addTask { while !Task.isCancelled {
while !Task.isCancelled { do {
do { try await self.startSuperClient()
try await self.startSuperClient() } catch let err {
} catch let err { self.logger.log("[SDLContext] SuperClient get error: \(err), will restart", level: .warning)
self.logger.log("[SDLContext] SuperClient get error: \(err), will restart", level: .warning) await self.arpServer.clear()
await self.arpServer.clear() try? await Task.sleep(for: .seconds(2))
try? await Task.sleep(for: .seconds(2)) }
} }
} }
}
group.addTask {
group.addTask { await self.startMonitor()
try await self.startMonitor() }
}
group.addTask {
group.addTask { while !Task.isCancelled {
while !Task.isCancelled { do {
do { try await self.startNoticeClient()
try await self.noticeClient?.start() } catch let err {
} catch let err { self.logger.log("[SDLContext] noticeClient get err: \(err)", level: .warning)
self.logger.log("[SDLContext] noticeClient get err: \(err)", level: .warning) }
} }
} }
try await group.waitForAll()
} }
try await group.waitForAll()
} }
try await self.rootTask?.value
} }
public func stop() async { public func stop() async {
self.rootTask?.cancel()
self.superClient = nil self.superClient = nil
self.udpHole = nil self.udpHole = nil
self.noticeClient = nil
self.readTask?.cancel() self.readTask?.cancel()
} }
private func startNoticeClient() async throws {
self.noticeClient = try await SDLNoticeClient(logger: self.logger)
try await self.noticeClient?.start()
self.logger.log("[SDLContext] notice_client task cancel", level: .warning)
}
private func startUDPHole() async throws { private func startUDPHole() async throws {
self.udpHole = try await SDLUDPHole(logger: self.logger) self.udpHole = try await SDLUDPHole(logger: self.logger)
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.udpHole?.start() try await self.udpHole?.start()
@ -180,12 +187,13 @@ public class SDLContext: @unchecked Sendable {
} }
} }
try await group.waitForAll() try await group.waitForAll()
self.logger.log("[SDLContext] udp_hole task cancel", level: .warning)
} }
} }
private func startSuperClient() async throws { private func startSuperClient() async throws {
self.superClient = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort, logger: self.logger) self.superClient = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort, logger: self.logger)
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.superClient?.start() try await self.superClient?.start()
@ -199,33 +207,21 @@ public class SDLContext: @unchecked Sendable {
} }
} }
try await group.waitForAll() try await group.waitForAll()
self.logger.log("[SDLContext] super client task cancel", level: .warning)
} }
} }
private func startMonitor() async throws { private func startMonitor() async {
try await withThrowingTaskGroup(of: Void.self) { group in self.monitor = SDLNetworkMonitor()
group.addTask { for await event in self.monitor!.eventStream {
try await self.noticeClient?.start() switch event {
case .changed:
// nat
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config, logger: self.logger)
self.logger.log("didNetworkPathChanged, nat type is: \(self.natType)", level: .info)
case .unreachable:
self.logger.log("didNetworkPathUnreachable", level: .warning)
} }
group.addTask {
//
self.monitorCancel = self.monitor.eventFlow.sink { event in
switch event {
case .changed:
// nat
Task {
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config, logger: self.logger)
self.logger.log("didNetworkPathChanged, nat type is: \(self.natType)", level: .info)
}
case .unreachable:
self.logger.log("didNetworkPathUnreachable", level: .warning)
}
}
self.monitor.start()
}
try await group.waitForAll()
} }
} }
@ -579,7 +575,7 @@ public class SDLContext: @unchecked Sendable {
} }
deinit { deinit {
self.stunCancel?.cancel() self.rootTask?.cancel()
self.udpHole = nil self.udpHole = nil
self.superClient = nil self.superClient = nil
} }

View File

@ -15,9 +15,9 @@ class SDLNetworkMonitor: @unchecked Sendable {
private var interfaceType: NWInterface.InterfaceType? private var interfaceType: NWInterface.InterfaceType?
private let publisher = PassthroughSubject<NWInterface.InterfaceType, Never>() private let publisher = PassthroughSubject<NWInterface.InterfaceType, Never>()
private var cancel: AnyCancellable? private var cancel: AnyCancellable?
private let queue = DispatchQueue(label: "networkMonitorQueue")
public let eventFlow = PassthroughSubject<MonitorEvent, Never>() public let eventStream: AsyncStream<MonitorEvent>
private let eventContinuation: AsyncStream<MonitorEvent>.Continuation
enum MonitorEvent { enum MonitorEvent {
case changed case changed
@ -26,6 +26,7 @@ class SDLNetworkMonitor: @unchecked Sendable {
init() { init() {
self.monitor = NWPathMonitor() self.monitor = NWPathMonitor()
(self.eventStream , self.eventContinuation) = AsyncStream.makeStream(of: MonitorEvent.self, bufferingPolicy: .unbounded)
} }
func start() { func start() {
@ -39,16 +40,16 @@ class SDLNetworkMonitor: @unchecked Sendable {
self.publisher.send(.wiredEthernet) self.publisher.send(.wiredEthernet)
} }
} else { } else {
self.eventFlow.send(.unreachable) self.eventContinuation.yield(.unreachable)
self.interfaceType = nil self.interfaceType = nil
} }
} }
self.monitor.start(queue: self.queue) self.monitor.start(queue: DispatchQueue.global())
self.cancel = publisher.throttle(for: 5.0, scheduler: self.queue, latest: true) self.cancel = publisher.throttle(for: 5.0, scheduler: DispatchQueue.global(), latest: true)
.sink { type in .sink { type in
if self.interfaceType != nil && self.interfaceType != type { if self.interfaceType != nil && self.interfaceType != type {
self.eventFlow.send(.changed) self.eventContinuation.yield(.changed)
} }
self.interfaceType = type self.interfaceType = type
} }
@ -57,6 +58,7 @@ class SDLNetworkMonitor: @unchecked Sendable {
deinit { deinit {
self.monitor.cancel() self.monitor.cancel()
self.cancel?.cancel() self.cancel?.cancel()
self.eventContinuation.finish()
} }
} }

View File

@ -75,6 +75,10 @@ actor SDLSuperClient {
} }
for try await var packet in inbound { for try await var packet in inbound {
if Task.isCancelled {
break
}
if let message = SDLSuperClientDecoder.decode(buffer: &packet) { if let message = SDLSuperClientDecoder.decode(buffer: &packet) {
self.logger.log("[SDLSuperTransport] read message: \(message)", level: .debug) self.logger.log("[SDLSuperTransport] read message: \(message)", level: .debug)
switch message.packet { switch message.packet {
@ -96,6 +100,10 @@ actor SDLSuperClient {
} }
for try await message in self.writeStream { for try await message in self.writeStream {
if Task.isCancelled {
break
}
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5) var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5)
buffer.writeInteger(message.packetId, as: UInt32.self) buffer.writeInteger(message.packetId, as: UInt32.self)
buffer.writeBytes([message.type.rawValue]) buffer.writeBytes([message.type.rawValue])
@ -107,7 +115,7 @@ actor SDLSuperClient {
// --MARK: // --MARK:
group.addTask { group.addTask {
while true { while !Task.isCancelled {
do { do {
await self.ping() await self.ping()
try await Task.sleep(nanoseconds: 5 * 1_000_000_000) try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
@ -126,6 +134,7 @@ actor SDLSuperClient {
self.logger.log("[SDLSuperClient] group closed", level: .warning) self.logger.log("[SDLSuperClient] group closed", level: .warning)
} }
} }
} }
// -- MARK: apis // -- MARK: apis

View File

@ -58,9 +58,8 @@ actor SDLUDPHole {
} }
func start() async throws { func start() async throws {
try await self.asyncChannel.executeThenClose { inbound, outbound in try await self.asyncChannel.executeThenClose {inbound, outbound in
self.eventContinuation.yield(.ready) self.eventContinuation.yield(.ready)
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.asyncChannel.channel.closeFuture.get() try await self.asyncChannel.channel.closeFuture.get()
@ -96,6 +95,10 @@ actor SDLUDPHole {
group.addTask { group.addTask {
for try await message in self.writeStream { for try await message in self.writeStream {
if Task.isCancelled {
break
}
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1) var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
buffer.writeBytes([message.type.rawValue]) buffer.writeBytes([message.type.rawValue])
buffer.writeBytes(message.data) buffer.writeBytes(message.data)
@ -266,4 +269,5 @@ actor SDLUDPHole {
self.writeContinuation.finish() self.writeContinuation.finish()
self.eventContinuation.finish() self.eventContinuation.finish()
} }
} }