add supervisor

This commit is contained in:
anlicheng 2026-03-10 16:30:31 +08:00
parent 10278dfef0
commit d930dbafad
2 changed files with 70 additions and 39 deletions

View File

@ -19,10 +19,9 @@ actor SDLContextActor {
case registered
}
nonisolated let config: SDLConfiguration
private var state: State = .unregistered
nonisolated let config: SDLConfiguration
// nat
var natType: SDLNATProberActor.NatType = .blocked
@ -48,7 +47,7 @@ actor SDLContextActor {
private var quicClient: SDLQUICClient?
private var quicWorkers: [Task<Void, Never>]?
nonisolated private let puncherActor: SDLPuncherActor
//
nonisolated private let proberActor: SDLNATProberActor
@ -70,7 +69,8 @@ actor SDLContextActor {
nonisolated private let flowTracer = SDLFlowTracer()
//
private var loopChildWorkers: [Task<Void, Never>] = []
private var supervisor = SDLSupervisor()
private let provider: NEPacketTunnelProvider
//
@ -102,37 +102,37 @@ actor SDLContextActor {
self.snapshotPublisher = snapshotPublisher
}
public func start() {
public func start() async {
self.startMonitor()
self.loopChildWorkers.append(spawnLoop {
await self.supervisor.addWorker(name: "quicClient") {
SDLLogger.shared.log("[SDLContext] try start quicClient")
let quicClient = try await self.startQUICClient()
SDLLogger.shared.log("[SDLContext] quicClient running!!!!")
await quicClient.waitClose()
SDLLogger.shared.log("[SDLContext] quicClient closed!!!!")
})
}
self.loopChildWorkers.append(spawnLoop {
await self.supervisor.addWorker(name: "noticeClient") {
let noticeClient = try self.startNoticeClient()
SDLLogger.shared.log("[SDLContext] noticeClient running!!!!")
try await noticeClient.waitClose()
SDLLogger.shared.log("[SDLContext] noticeClient closed!!!!")
})
}
self.loopChildWorkers.append(spawnLoop {
await self.supervisor.addWorker(name: "dnsClient") {
let dnsClient = try await self.startDnsClient()
SDLLogger.shared.log("[SDLContext] dns running!!!!")
try await dnsClient.waitClose()
SDLLogger.shared.log("[SDLContext] dns closed!!!!")
})
}
self.loopChildWorkers.append(spawnLoop {
await self.supervisor.addWorker(name: "udpHole") {
let udpHole = try await self.startUDPHole()
SDLLogger.shared.log("[SDLContext] udp running!!!!")
try await udpHole.waitClose()
SDLLogger.shared.log("[SDLContext] udp closed!!!!")
})
}
}
private func startQUICClient() async throws -> SDLQUICClient {
@ -299,14 +299,13 @@ actor SDLContextActor {
// nat
self.probeNatType()
return udpHole
}
// context
public func stop() async {
self.loopChildWorkers.forEach { $0.cancel() }
self.loopChildWorkers.removeAll()
await self.supervisor.stop()
self.udpHoleWorkers?.forEach { $0.cancel() }
self.udpHoleWorkers = nil
@ -403,7 +402,7 @@ actor SDLContextActor {
self.provider.cancelTunnelWithError(err)
}
}
private func handleRegisterSuperNak(nakPacket: SDLRegisterSuperNak) {
let errorMessage = nakPacket.errorMessage
guard let errorCode = SDLNAKErrorCode(rawValue: UInt8(nakPacket.errorCode)) else {
@ -428,9 +427,9 @@ actor SDLContextActor {
private func handleEvent(event: SDLEvent) async {
switch event {
// case .dropMacs(let dropMacsEvent):
// SDLLogger.shared.log("[SDLContext] drop macs", level: .info)
// await self.arpServer.dropMacs(macs: dropMacsEvent.macs)
// case .dropMacs(let dropMacsEvent):
// SDLLogger.shared.log("[SDLContext] drop macs", level: .info)
// await self.arpServer.dropMacs(macs: dropMacsEvent.macs)
case .natChanged(let natChangedEvent):
let dstMac = natChangedEvent.mac
SDLLogger.shared.log("[SDLContext] natChangedEvent, dstMac: \(dstMac)", level: .info)
@ -576,7 +575,7 @@ actor SDLContextActor {
self.flowSessionManager.updateSession(reverseFlowSession)
return true
}
//
let proto = ipPacket.header.proto
// 访
@ -594,23 +593,23 @@ actor SDLContextActor {
default:
return false
}
return false
}
//
// public func flowReportTask() {
// Task {
// //
// self.flowTracerCancel = Timer.publish(every: 60.0, on: .main, in: .common).autoconnect()
// .sink { _ in
// Task {
// let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset()
// await self.superClient?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum)
// }
// }
// }
// }
// public func flowReportTask() {
// Task {
// //
// self.flowTracerCancel = Timer.publish(every: 60.0, on: .main, in: .common).autoconnect()
// .sink { _ in
// Task {
// let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset()
// await self.superClient?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum)
// }
// }
// }
// }
// , 线packetFlow
private func startReader() {
@ -663,9 +662,9 @@ actor SDLContextActor {
}
else {
SDLLogger.shared.log("[SDLContext] dstIp: \(dstIp.asIpAddress()) arp query not found, broadcast", level: .debug)
// // arp广
// let arpReqeust = ARPPacket.arpRequest(senderIP: networkAddr.ip, senderMAC: networkAddr.mac, targetIP: dstIp)
// await self.routeLayerPacket(dstMac: ARPPacket.broadcastMac , type: .arp, data: arpReqeust.marshal())
// // arp广
// let arpReqeust = ARPPacket.arpRequest(senderIP: networkAddr.ip, senderMAC: networkAddr.mac, targetIP: dstIp)
// await self.routeLayerPacket(dstMac: ARPPacket.broadcastMac , type: .arp, data: arpReqeust.marshal())
try? await self.arpServer.arpRequest(targetIp: dstIp, use: self.quicClient)
}

View File

@ -0,0 +1,32 @@
//
// SDLSupervisor.swift
// punchnet
//
// Created by on 2026/3/10.
//
actor SDLSupervisor {
private var loopChildWorkers: [Task<Void, Never>] = []
func addWorker(name: String, _ body: @escaping () async throws -> Void, retryDelay: Duration = .seconds(2)) {
let worker = Task(name: name) {
while !Task.isCancelled {
do {
try await body()
} catch is CancellationError {
break
} catch let err {
SDLLogger.shared.log("[Supervisor] worker \(name) crashed: \(err.localizedDescription)")
try? await Task.sleep(for: retryDelay)
}
}
}
self.loopChildWorkers.append(worker)
}
func stop() {
self.loopChildWorkers.forEach { $0.cancel() }
self.loopChildWorkers.removeAll()
}
}