调整task

This commit is contained in:
anlicheng 2025-08-01 15:05:03 +08:00
parent f273da3b11
commit 26ee512a9a
3 changed files with 79 additions and 57 deletions

View File

@ -93,28 +93,68 @@ public class SDLContext: @unchecked Sendable {
} }
public func start() async throws { public func start() async throws {
self.udpHole = try await SDLUDPHole()
self.superClient = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort)
self.noticeClient = try await SDLNoticeClient() self.noticeClient = try await SDLNoticeClient()
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
while !Task.isCancelled {
do {
try await self.startUDPHole()
} catch let err {
NSLog("udp Hole get err: \(err)")
}
}
}
group.addTask {
while !Task.isCancelled {
do {
try await self.startSuperClient()
} catch let err {
NSLog("SuperClient get error: \(err)")
await self.arpServer.clear()
try? await Task.sleep(for: .seconds(2))
}
}
}
group.addTask {
try await self.startMonitor()
}
group.addTask {
while !Task.isCancelled {
do {
try await self.noticeClient?.start()
} catch let err {
NSLog("noticeClient get err: \(err)")
}
}
}
try await group.waitForAll()
}
}
public func stop() async {
self.superClient = nil
self.udpHole = nil
self.readTask?.cancel()
}
private func startUDPHole() async throws {
self.udpHole = try await SDLUDPHole()
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.udpHole?.start() try await self.udpHole?.start()
} }
group.addTask { group.addTask {
try await self.superClient?.start() while !Task.isCancelled {
} try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
self.lastCookie = await self.udpHole?.stunRequest(context: self)
group.addTask {
try await self.noticeClient?.start()
}
group.addTask {
if let eventFlow = self.superClient?.eventFlow {
for try await event in eventFlow {
try await self.handleSuperEvent(event: event)
}
} }
} }
@ -125,13 +165,34 @@ public class SDLContext: @unchecked Sendable {
} }
} }
} }
try await group.waitForAll()
}
}
private func startSuperClient() async throws {
self.superClient = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort)
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await self.superClient?.start()
}
group.addTask { group.addTask {
while !Task.isCancelled { if let eventFlow = self.superClient?.eventFlow {
try await Task.sleep(nanoseconds: 5 * 1_000_000_000) for try await event in eventFlow {
self.lastCookie = await self.udpHole?.stunRequest(context: self) try await self.handleSuperEvent(event: event)
}
} }
} }
try await group.waitForAll()
}
}
private func startMonitor() async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await self.noticeClient?.start()
}
group.addTask { group.addTask {
// //
@ -154,13 +215,6 @@ public class SDLContext: @unchecked Sendable {
} }
} }
public func stop() async {
self.superClient = nil
self.udpHole = nil
self.readTask?.cancel()
}
private func handleSuperEvent(event: SDLSuperClient.SuperEvent) async throws { private func handleSuperEvent(event: SDLSuperClient.SuperEvent) async throws {
switch event { switch event {
case .ready: case .ready:
@ -213,14 +267,6 @@ public class SDLContext: @unchecked Sendable {
() ()
} }
case .closed:
NSLog("[SDLContext] super client closed")
await self.arpServer.clear()
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
// Task {@MainActor in
// try await self.startSuperClient()
// }
}
case .event(let evt): case .event(let evt):
switch evt { switch evt {
case .natChanged(let natChangedEvent): case .natChanged(let natChangedEvent):
@ -261,19 +307,6 @@ public class SDLContext: @unchecked Sendable {
} }
private func startUDPHole() async throws {
// self.udpHole = SDLUDPHole()
//
// self.udpCancel?.cancel()
// self.udpCancel = self.udpHole?.eventFlow.sink { event in
// Task.detached {
// await self.handleUDPEvent(event: event)
// }
// }
//
// try await self.udpHole?.start()
}
private func handleUDPEvent(event: SDLUDPHole.UDPEvent) async throws { private func handleUDPEvent(event: SDLUDPHole.UDPEvent) async throws {
switch event { switch event {
case .ready: case .ready:
@ -281,13 +314,6 @@ public class SDLContext: @unchecked Sendable {
//self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config) //self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
SDLLogger.log("[SDLContext] nat type is: \(self.natType)", level: .debug) SDLLogger.log("[SDLContext] nat type is: \(self.natType)", level: .debug)
case .closed:
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
Task {
try await self.startUDPHole()
}
}
case .message(let remoteAddress, let message): case .message(let remoteAddress, let message):
switch message { switch message {
case .register(let register): case .register(let register):

View File

@ -31,7 +31,6 @@ actor SDLSuperClient {
// //
enum SuperEvent { enum SuperEvent {
case ready case ready
case closed
case event(SDLEvent) case event(SDLEvent)
case command(UInt32, SDLCommand) case command(UInt32, SDLCommand)
} }
@ -62,7 +61,6 @@ actor SDLSuperClient {
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.asyncChannel.channel.closeFuture.get() try await self.asyncChannel.channel.closeFuture.get()
self.inboundContinuation.yield(.closed)
self.inboundContinuation.finish() self.inboundContinuation.finish()
} }

View File

@ -30,7 +30,6 @@ actor SDLUDPHole {
// //
enum UDPEvent { enum UDPEvent {
case ready case ready
case closed
case message(SocketAddress, SDLHoleInboundMessage) case message(SocketAddress, SDLHoleInboundMessage)
case data(SDLData) case data(SDLData)
} }
@ -60,7 +59,6 @@ actor SDLUDPHole {
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.asyncChannel.channel.closeFuture.get() try await self.asyncChannel.channel.closeFuture.get()
self.eventContinuation.yield(.closed)
self.eventContinuation.finish() self.eventContinuation.finish()
} }