This commit is contained in:
anlicheng 2026-01-30 13:39:58 +08:00
parent e40a266b13
commit d4390f5117
3 changed files with 38 additions and 19 deletions

View File

@ -13,7 +13,7 @@ actor SDLPuncherActor {
private var coolingDown: Set<Data> = [] private var coolingDown: Set<Data> = []
private let cooldown: Duration = .seconds(5) private let cooldown: Duration = .seconds(5)
private var udpHole: SDLUDPHole? private var udpHole: SDLUDPHole
private var pktId: UInt32 = 1 private var pktId: UInt32 = 1
// //
@ -23,22 +23,31 @@ actor SDLPuncherActor {
private var logger: SDLLogger private var logger: SDLLogger
private var querySocketAddress: SocketAddress private var querySocketAddress: SocketAddress
private let(requestStream, requestContinuation) = AsyncStream.makeStream(of: RegisterRequest.self, bufferingPolicy: .unbounded)
struct RegisterRequest { struct RegisterRequest {
let srcMac: Data let srcMac: Data
let dstMac: Data let dstMac: Data
let networkId: UInt32 let networkId: UInt32
} }
init(querySocketAddress: SocketAddress, logger: SDLLogger) { init(udpHole: SDLUDPHole, querySocketAddress: SocketAddress, logger: SDLLogger) {
self.udpHole = udpHole
self.querySocketAddress = querySocketAddress self.querySocketAddress = querySocketAddress
self.logger = logger self.logger = logger
} }
func setUDPHoleActor(udpHole: SDLUDPHole?) { nonisolated func submitRegisterRequest(request: RegisterRequest) {
self.udpHole = udpHole self.requestContinuation.yield(request)
} }
func submitRegisterRequest(request: RegisterRequest) { func startConsuming() async {
for await request in self.requestStream {
self.submitRegisterRequest(request: request)
}
}
private func handleRegisterRequest(request: RegisterRequest) {
let dstMac = request.dstMac let dstMac = request.dstMac
guard !coolingDown.contains(dstMac) else { guard !coolingDown.contains(dstMac) else {
@ -73,7 +82,7 @@ actor SDLPuncherActor {
register.srcMac = request.srcMac register.srcMac = request.srcMac
register.dstMac = request.dstMac register.dstMac = request.dstMac
self.udpHole?.send(type: .register, data: try! register.serializedData(), remoteAddress: remoteAddress) self.udpHole.send(type: .register, data: try! register.serializedData(), remoteAddress: remoteAddress)
} else { } else {
self.logger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning) self.logger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning)
} }
@ -95,7 +104,7 @@ actor SDLPuncherActor {
self.pendingRequests[pktId] = request self.pendingRequests[pktId] = request
if let queryData = try? queryInfo.serializedData() { if let queryData = try? queryInfo.serializedData() {
self.udpHole?.send(type: .queryInfo, data: queryData, remoteAddress: self.querySocketAddress) self.udpHole.send(type: .queryInfo, data: queryData, remoteAddress: self.querySocketAddress)
} }
} }
} }

View File

@ -45,7 +45,7 @@ public class SDLContext {
// //
var udpHole: SDLUDPHole? var udpHole: SDLUDPHole?
var providerAdapter: SDLTunnelProviderAdapter var providerAdapter: SDLTunnelProviderAdapter
var puncherActor: SDLPuncherActor var puncherActor: SDLPuncherActor?
// dnsclient // dnsclient
var dnsClient: SDLDNSClient? var dnsClient: SDLDNSClient?
@ -68,7 +68,7 @@ public class SDLContext {
private var noticeClient: SDLNoticeClient? private var noticeClient: SDLNoticeClient?
// //
private var flowTracer = SDLFlowTracerActor() private var flowTracer = SDLFlowTracer()
private var flowTracerCancel: AnyCancellable? private var flowTracerCancel: AnyCancellable?
private let logger: SDLLogger private let logger: SDLLogger
@ -83,7 +83,6 @@ public class SDLContext {
self.sessionManager = SessionManager() self.sessionManager = SessionManager()
self.arpServer = ArpServer(known_macs: [:]) self.arpServer = ArpServer(known_macs: [:])
self.providerAdapter = SDLTunnelProviderAdapter(provider: provider, logger: logger) self.providerAdapter = SDLTunnelProviderAdapter(provider: provider, logger: logger)
self.puncherActor = SDLPuncherActor(querySocketAddress: config.stunSocketAddress, logger: logger)
} }
public func start() async throws { public func start() async throws {
@ -160,7 +159,7 @@ public class SDLContext {
case .registerSuperNak(let registerSuperNak): case .registerSuperNak(let registerSuperNak):
await self.handleRegisterSuperNak(nakPacket: registerSuperNak) await self.handleRegisterSuperNak(nakPacket: registerSuperNak)
case .peerInfo(let peerInfo): case .peerInfo(let peerInfo):
await self.puncherActor.handlePeerInfo(peerInfo: peerInfo) await self.puncherActor?.handlePeerInfo(peerInfo: peerInfo)
case .event(let event): case .event(let event):
try await self.handleEvent(event: event) try await self.handleEvent(event: event)
case .stunProbeReply(let probeReply): case .stunProbeReply(let probeReply):
@ -214,7 +213,9 @@ public class SDLContext {
} }
private func handleUDPHoleReady() async throws { private func handleUDPHoleReady() async throws {
await self.puncherActor.setUDPHoleActor(udpHole: self.udpHole) if let udpHole = self.udpHole {
self.puncherActor = SDLPuncherActor(udpHole: udpHole, querySocketAddress: config.stunSocketAddress, logger: logger)
}
await withTaskGroup(of: Void.self) { group in await withTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
@ -369,7 +370,7 @@ public class SDLContext {
let layerPacket = try LayerPacket(layerData: decyptedData) let layerPacket = try LayerPacket(layerData: decyptedData)
await self.flowTracer.inc(num: decyptedData.count, type: .inbound) self.flowTracer.inc(num: decyptedData.count, type: .inbound)
// arp // arp
switch layerPacket.type { switch layerPacket.type {
case .arp: case .arp:
@ -507,15 +508,15 @@ public class SDLContext {
if let session = self.sessionManager.getSession(toAddress: dstMac) { if let session = self.sessionManager.getSession(toAddress: dstMac) {
self.logger.log("[SDLContext] send packet by session: \(session)", level: .debug) self.logger.log("[SDLContext] send packet by session: \(session)", level: .debug)
self.udpHole?.send(type: .data, data: data, remoteAddress: session.natAddress) self.udpHole?.send(type: .data, data: data, remoteAddress: session.natAddress)
await self.flowTracer.inc(num: data.count, type: .p2p) self.flowTracer.inc(num: data.count, type: .p2p)
} }
else { else {
// super_node // super_node
self.udpHole?.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress) self.udpHole?.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
// //
await self.flowTracer.inc(num: data.count, type: .forward) self.flowTracer.inc(num: data.count, type: .forward)
// //
await self.puncherActor.submitRegisterRequest(request: .init(srcMac: networkAddr.mac, dstMac: dstMac, networkId: networkAddr.networkId)) self.puncherActor?.submitRegisterRequest(request: .init(srcMac: networkAddr.mac, dstMac: dstMac, networkId: networkAddr.networkId))
} }
} }
} }

View File

@ -6,9 +6,10 @@
// //
import Foundation import Foundation
import Darwin
// //
actor SDLFlowTracerActor { final class SDLFlowTracer {
enum FlowType { enum FlowType {
case forward case forward
case p2p case p2p
@ -19,7 +20,14 @@ actor SDLFlowTracerActor {
private var p2pFlowBytes: UInt32 = 0 private var p2pFlowBytes: UInt32 = 0
private var inFlowBytes: UInt32 = 0 private var inFlowBytes: UInt32 = 0
private let lock = NSLock()
func inc(num: Int, type: FlowType) { func inc(num: Int, type: FlowType) {
lock.lock()
defer {
lock.unlock()
}
switch type { switch type {
case .inbound: case .inbound:
self.inFlowBytes += UInt32(num) self.inFlowBytes += UInt32(num)
@ -31,13 +39,14 @@ actor SDLFlowTracerActor {
} }
func reset() -> (UInt32, UInt32, UInt32) { func reset() -> (UInt32, UInt32, UInt32) {
lock.lock()
defer { defer {
self.forwardFlowBytes = 0 self.forwardFlowBytes = 0
self.inFlowBytes = 0 self.inFlowBytes = 0
self.p2pFlowBytes = 0 self.p2pFlowBytes = 0
lock.unlock()
} }
return (forwardFlowBytes, p2pFlowBytes, inFlowBytes) return (forwardFlowBytes, p2pFlowBytes, inFlowBytes)
} }
} }