debug task group
This commit is contained in:
parent
77a631a001
commit
d69c31921e
@ -62,12 +62,8 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
// 记录最后发送的stunRequest的cookie
|
// 记录最后发送的stunRequest的cookie
|
||||||
private var lastCookie: UInt32? = 0
|
private var lastCookie: UInt32? = 0
|
||||||
|
|
||||||
// 定时器
|
|
||||||
private var stunCancel: AnyCancellable?
|
|
||||||
|
|
||||||
// 网络状态变化的健康
|
// 网络状态变化的健康
|
||||||
private var monitor = SDLNetworkMonitor()
|
private var monitor: SDLNetworkMonitor?
|
||||||
private var monitorCancel: AnyCancellable?
|
|
||||||
|
|
||||||
// 内部socket通讯
|
// 内部socket通讯
|
||||||
private var noticeClient: SDLNoticeClient?
|
private var noticeClient: SDLNoticeClient?
|
||||||
@ -83,6 +79,8 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
|
|
||||||
private let logger: SDLLogger
|
private let logger: SDLLogger
|
||||||
|
|
||||||
|
private var rootTask: Task<Void, Error>?
|
||||||
|
|
||||||
struct RegisterRequest {
|
struct RegisterRequest {
|
||||||
let srcMac: Data
|
let srcMac: Data
|
||||||
let dstMac: Data
|
let dstMac: Data
|
||||||
@ -107,59 +105,68 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public func start() async throws {
|
public func start() async throws {
|
||||||
self.noticeClient = try await SDLNoticeClient(logger: self.logger)
|
self.rootTask = Task {
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
group.addTask {
|
||||||
group.addTask {
|
while !Task.isCancelled {
|
||||||
while !Task.isCancelled {
|
do {
|
||||||
do {
|
try await self.startUDPHole()
|
||||||
try await self.startUDPHole()
|
} catch let err {
|
||||||
} catch let err {
|
self.logger.log("[SDLContext] UDPHole get err: \(err)", level: .warning)
|
||||||
self.logger.log("[SDLContext] UDPHole get err: \(err)", level: .warning)
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
group.addTask {
|
||||||
group.addTask {
|
while !Task.isCancelled {
|
||||||
while !Task.isCancelled {
|
do {
|
||||||
do {
|
try await self.startSuperClient()
|
||||||
try await self.startSuperClient()
|
} catch let err {
|
||||||
} catch let err {
|
self.logger.log("[SDLContext] SuperClient get error: \(err), will restart", level: .warning)
|
||||||
self.logger.log("[SDLContext] SuperClient get error: \(err), will restart", level: .warning)
|
await self.arpServer.clear()
|
||||||
await self.arpServer.clear()
|
try? await Task.sleep(for: .seconds(2))
|
||||||
try? await Task.sleep(for: .seconds(2))
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
group.addTask {
|
||||||
group.addTask {
|
await self.startMonitor()
|
||||||
try await self.startMonitor()
|
}
|
||||||
}
|
|
||||||
|
group.addTask {
|
||||||
group.addTask {
|
while !Task.isCancelled {
|
||||||
while !Task.isCancelled {
|
do {
|
||||||
do {
|
try await self.startNoticeClient()
|
||||||
try await self.noticeClient?.start()
|
} catch let err {
|
||||||
} catch let err {
|
self.logger.log("[SDLContext] noticeClient get err: \(err)", level: .warning)
|
||||||
self.logger.log("[SDLContext] noticeClient get err: \(err)", level: .warning)
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try await group.waitForAll()
|
||||||
}
|
}
|
||||||
|
|
||||||
try await group.waitForAll()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try await self.rootTask?.value
|
||||||
}
|
}
|
||||||
|
|
||||||
public func stop() async {
|
public func stop() async {
|
||||||
|
self.rootTask?.cancel()
|
||||||
self.superClient = nil
|
self.superClient = nil
|
||||||
self.udpHole = nil
|
self.udpHole = nil
|
||||||
|
self.noticeClient = nil
|
||||||
|
|
||||||
self.readTask?.cancel()
|
self.readTask?.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func startNoticeClient() async throws {
|
||||||
|
self.noticeClient = try await SDLNoticeClient(logger: self.logger)
|
||||||
|
try await self.noticeClient?.start()
|
||||||
|
self.logger.log("[SDLContext] notice_client task cancel", level: .warning)
|
||||||
|
}
|
||||||
|
|
||||||
private func startUDPHole() async throws {
|
private func startUDPHole() async throws {
|
||||||
self.udpHole = try await SDLUDPHole(logger: self.logger)
|
self.udpHole = try await SDLUDPHole(logger: self.logger)
|
||||||
|
|
||||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
group.addTask {
|
group.addTask {
|
||||||
try await self.udpHole?.start()
|
try await self.udpHole?.start()
|
||||||
@ -180,12 +187,13 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
try await group.waitForAll()
|
try await group.waitForAll()
|
||||||
|
self.logger.log("[SDLContext] udp_hole task cancel", level: .warning)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private func startSuperClient() async throws {
|
private func startSuperClient() async throws {
|
||||||
self.superClient = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort, logger: self.logger)
|
self.superClient = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort, logger: self.logger)
|
||||||
|
|
||||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
group.addTask {
|
group.addTask {
|
||||||
try await self.superClient?.start()
|
try await self.superClient?.start()
|
||||||
@ -199,33 +207,21 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
try await group.waitForAll()
|
try await group.waitForAll()
|
||||||
|
self.logger.log("[SDLContext] super client task cancel", level: .warning)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func startMonitor() async throws {
|
private func startMonitor() async {
|
||||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
self.monitor = SDLNetworkMonitor()
|
||||||
group.addTask {
|
for await event in self.monitor!.eventStream {
|
||||||
try await self.noticeClient?.start()
|
switch event {
|
||||||
|
case .changed:
|
||||||
|
// 需要重新探测网络的nat类型
|
||||||
|
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config, logger: self.logger)
|
||||||
|
self.logger.log("didNetworkPathChanged, nat type is: \(self.natType)", level: .info)
|
||||||
|
case .unreachable:
|
||||||
|
self.logger.log("didNetworkPathUnreachable", level: .warning)
|
||||||
}
|
}
|
||||||
|
|
||||||
group.addTask {
|
|
||||||
// 启动网络监控
|
|
||||||
self.monitorCancel = self.monitor.eventFlow.sink { event in
|
|
||||||
switch event {
|
|
||||||
case .changed:
|
|
||||||
// 需要重新探测网络的nat类型
|
|
||||||
Task {
|
|
||||||
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config, logger: self.logger)
|
|
||||||
self.logger.log("didNetworkPathChanged, nat type is: \(self.natType)", level: .info)
|
|
||||||
}
|
|
||||||
case .unreachable:
|
|
||||||
self.logger.log("didNetworkPathUnreachable", level: .warning)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.monitor.start()
|
|
||||||
}
|
|
||||||
|
|
||||||
try await group.waitForAll()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -579,7 +575,7 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
self.stunCancel?.cancel()
|
self.rootTask?.cancel()
|
||||||
self.udpHole = nil
|
self.udpHole = nil
|
||||||
self.superClient = nil
|
self.superClient = nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,9 +15,9 @@ class SDLNetworkMonitor: @unchecked Sendable {
|
|||||||
private var interfaceType: NWInterface.InterfaceType?
|
private var interfaceType: NWInterface.InterfaceType?
|
||||||
private let publisher = PassthroughSubject<NWInterface.InterfaceType, Never>()
|
private let publisher = PassthroughSubject<NWInterface.InterfaceType, Never>()
|
||||||
private var cancel: AnyCancellable?
|
private var cancel: AnyCancellable?
|
||||||
private let queue = DispatchQueue(label: "networkMonitorQueue")
|
|
||||||
|
|
||||||
public let eventFlow = PassthroughSubject<MonitorEvent, Never>()
|
public let eventStream: AsyncStream<MonitorEvent>
|
||||||
|
private let eventContinuation: AsyncStream<MonitorEvent>.Continuation
|
||||||
|
|
||||||
enum MonitorEvent {
|
enum MonitorEvent {
|
||||||
case changed
|
case changed
|
||||||
@ -26,6 +26,7 @@ class SDLNetworkMonitor: @unchecked Sendable {
|
|||||||
|
|
||||||
init() {
|
init() {
|
||||||
self.monitor = NWPathMonitor()
|
self.monitor = NWPathMonitor()
|
||||||
|
(self.eventStream , self.eventContinuation) = AsyncStream.makeStream(of: MonitorEvent.self, bufferingPolicy: .unbounded)
|
||||||
}
|
}
|
||||||
|
|
||||||
func start() {
|
func start() {
|
||||||
@ -39,16 +40,16 @@ class SDLNetworkMonitor: @unchecked Sendable {
|
|||||||
self.publisher.send(.wiredEthernet)
|
self.publisher.send(.wiredEthernet)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
self.eventFlow.send(.unreachable)
|
self.eventContinuation.yield(.unreachable)
|
||||||
self.interfaceType = nil
|
self.interfaceType = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.monitor.start(queue: self.queue)
|
self.monitor.start(queue: DispatchQueue.global())
|
||||||
|
|
||||||
self.cancel = publisher.throttle(for: 5.0, scheduler: self.queue, latest: true)
|
self.cancel = publisher.throttle(for: 5.0, scheduler: DispatchQueue.global(), latest: true)
|
||||||
.sink { type in
|
.sink { type in
|
||||||
if self.interfaceType != nil && self.interfaceType != type {
|
if self.interfaceType != nil && self.interfaceType != type {
|
||||||
self.eventFlow.send(.changed)
|
self.eventContinuation.yield(.changed)
|
||||||
}
|
}
|
||||||
self.interfaceType = type
|
self.interfaceType = type
|
||||||
}
|
}
|
||||||
@ -57,6 +58,7 @@ class SDLNetworkMonitor: @unchecked Sendable {
|
|||||||
deinit {
|
deinit {
|
||||||
self.monitor.cancel()
|
self.monitor.cancel()
|
||||||
self.cancel?.cancel()
|
self.cancel?.cancel()
|
||||||
|
self.eventContinuation.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -75,6 +75,10 @@ actor SDLSuperClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for try await var packet in inbound {
|
for try await var packet in inbound {
|
||||||
|
if Task.isCancelled {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
if let message = SDLSuperClientDecoder.decode(buffer: &packet) {
|
if let message = SDLSuperClientDecoder.decode(buffer: &packet) {
|
||||||
self.logger.log("[SDLSuperTransport] read message: \(message)", level: .debug)
|
self.logger.log("[SDLSuperTransport] read message: \(message)", level: .debug)
|
||||||
switch message.packet {
|
switch message.packet {
|
||||||
@ -96,6 +100,10 @@ actor SDLSuperClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for try await message in self.writeStream {
|
for try await message in self.writeStream {
|
||||||
|
if Task.isCancelled {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5)
|
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5)
|
||||||
buffer.writeInteger(message.packetId, as: UInt32.self)
|
buffer.writeInteger(message.packetId, as: UInt32.self)
|
||||||
buffer.writeBytes([message.type.rawValue])
|
buffer.writeBytes([message.type.rawValue])
|
||||||
@ -107,7 +115,7 @@ actor SDLSuperClient {
|
|||||||
|
|
||||||
// --MARK: 心跳机制
|
// --MARK: 心跳机制
|
||||||
group.addTask {
|
group.addTask {
|
||||||
while true {
|
while !Task.isCancelled {
|
||||||
do {
|
do {
|
||||||
await self.ping()
|
await self.ping()
|
||||||
try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
|
try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
|
||||||
@ -126,6 +134,7 @@ actor SDLSuperClient {
|
|||||||
self.logger.log("[SDLSuperClient] group closed", level: .warning)
|
self.logger.log("[SDLSuperClient] group closed", level: .warning)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// -- MARK: apis
|
// -- MARK: apis
|
||||||
|
|||||||
@ -58,9 +58,8 @@ actor SDLUDPHole {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func start() async throws {
|
func start() async throws {
|
||||||
try await self.asyncChannel.executeThenClose { inbound, outbound in
|
try await self.asyncChannel.executeThenClose {inbound, outbound in
|
||||||
self.eventContinuation.yield(.ready)
|
self.eventContinuation.yield(.ready)
|
||||||
|
|
||||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
group.addTask {
|
group.addTask {
|
||||||
try await self.asyncChannel.channel.closeFuture.get()
|
try await self.asyncChannel.channel.closeFuture.get()
|
||||||
@ -96,6 +95,10 @@ actor SDLUDPHole {
|
|||||||
|
|
||||||
group.addTask {
|
group.addTask {
|
||||||
for try await message in self.writeStream {
|
for try await message in self.writeStream {
|
||||||
|
if Task.isCancelled {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
|
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
|
||||||
buffer.writeBytes([message.type.rawValue])
|
buffer.writeBytes([message.type.rawValue])
|
||||||
buffer.writeBytes(message.data)
|
buffer.writeBytes(message.data)
|
||||||
@ -266,4 +269,5 @@ actor SDLUDPHole {
|
|||||||
self.writeContinuation.finish()
|
self.writeContinuation.finish()
|
||||||
self.eventContinuation.finish()
|
self.eventContinuation.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user