From b1c6b45f35f042ea0e93544af3e8011ddfebf99e Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 4 Feb 2026 13:56:05 +0800 Subject: [PATCH] =?UTF-8?q?actor=E7=9A=84=E6=9C=80=E5=B0=8F=E4=BE=9D?= =?UTF-8?q?=E8=B5=96=E5=8E=9F=E5=88=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Tun/Punchnet/Actors/SDLNATProberActor.swift | 23 ++++----- Tun/Punchnet/Actors/SDLPuncherActor.swift | 40 +++++----------- Tun/Punchnet/SDLContextActor.swift | 53 +++++++++++---------- 3 files changed, 51 insertions(+), 65 deletions(-) diff --git a/Tun/Punchnet/Actors/SDLNATProberActor.swift b/Tun/Punchnet/Actors/SDLNATProberActor.swift index f9ca19f..93c2c99 100644 --- a/Tun/Punchnet/Actors/SDLNATProberActor.swift +++ b/Tun/Punchnet/Actors/SDLNATProberActor.swift @@ -52,8 +52,7 @@ actor SDLNATProberActor { // MARK: - Dependencies - private let udpHole: SDLUDPHole - private let addressArray: [[SocketAddress]] + nonisolated private let addressArray: [[SocketAddress]] // MARK: - Completion private var cookieId: UInt32 = 1 @@ -62,14 +61,13 @@ actor SDLNATProberActor { // MARK: - Init - init(udpHole: SDLUDPHole, addressArray: [[SocketAddress]]) { - self.udpHole = udpHole + init(addressArray: [[SocketAddress]]) { self.addressArray = addressArray } // MARK: - Public API - func probeNatType() async -> NatType { + func probeNatType(using udpHole: SDLUDPHole) async -> NatType { let cookieId = self.cookieId self.cookieId &+= 1 @@ -86,13 +84,13 @@ actor SDLNATProberActor { ) self.sessions[cookieId] = session Task { - await self.sendProbe(cookie: cookieId) + await self.sendProbe(using: udpHole, cookie: cookieId) } } } /// UDP 层收到 STUN 响应后调用 - func handleProbeReply(reply: SDLStunProbeReply) async { + func handleProbeReply(localAddress: SocketAddress?, reply: SDLStunProbeReply) async { guard let session = self.sessions[reply.cookie] else { return } @@ -101,7 +99,6 @@ actor SDLNATProberActor { // 提前退出的情况,没有nat映射 if session.replies[1] != nil { - let localAddress = self.udpHole.getLocalAddress() if await reply.socketAddress() == localAddress { finish(cookie: session.cookieId, .noNat) return @@ -160,11 +157,11 @@ actor SDLNATProberActor { // MARK: - Internal helpers - private func sendProbe(cookie: UInt32) async { - self.udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 1, attr: .none), remoteAddress: addressArray[0][0]) - self.udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 2, attr: .none), remoteAddress: addressArray[1][1]) - self.udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 3, attr: .peer), remoteAddress: addressArray[0][0]) - self.udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 4, attr: .port), remoteAddress: addressArray[0][0]) + private func sendProbe(using udpHole: SDLUDPHole, cookie: UInt32) async { + udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 1, attr: .none), remoteAddress: addressArray[0][0]) + udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 2, attr: .none), remoteAddress: addressArray[1][1]) + udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 3, attr: .peer), remoteAddress: addressArray[0][0]) + udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 4, attr: .port), remoteAddress: addressArray[0][0]) } private func makeProbePacket(cookieId: UInt32, step: UInt32, attr: SDLProbeAttr) -> Data { diff --git a/Tun/Punchnet/Actors/SDLPuncherActor.swift b/Tun/Punchnet/Actors/SDLPuncherActor.swift index 2bdf295..e88671e 100644 --- a/Tun/Punchnet/Actors/SDLPuncherActor.swift +++ b/Tun/Punchnet/Actors/SDLPuncherActor.swift @@ -9,20 +9,16 @@ import Foundation import NIOCore actor SDLPuncherActor { + nonisolated private let cooldown: Duration = .seconds(5) + // dstMac private var coolingDown: Set = [] - private let cooldown: Duration = .seconds(5) - - private var udpHole: SDLUDPHole - private var pktId: UInt32 = 1 // 提交后还没有响应的请求 - private var pendingRequests: [UInt32:RegisterRequest] = [:] + private var pendingRequests: [UInt32: RegisterRequest] = [:] // 处理holer - private var querySocketAddress: SocketAddress - - nonisolated private let(requestStream, requestContinuation) = AsyncStream.makeStream(of: RegisterRequest.self, bufferingPolicy: .unbounded) + nonisolated private let querySocketAddress: SocketAddress struct RegisterRequest { let srcMac: Data @@ -30,25 +26,13 @@ actor SDLPuncherActor { let networkId: UInt32 } - init(udpHole: SDLUDPHole, querySocketAddress: SocketAddress) { - self.udpHole = udpHole + init(querySocketAddress: SocketAddress) { self.querySocketAddress = querySocketAddress } - nonisolated func submitRegisterRequest(request: RegisterRequest) { - self.requestContinuation.yield(request) - } - - func startConsuming() async { - for await request in self.requestStream { - self.submitRegisterRequest(request: request) - } - } - - private func handleRegisterRequest(request: RegisterRequest) { + func submitRegisterRequest(using udpHole: SDLUDPHole?, request: RegisterRequest) { let dstMac = request.dstMac - - guard !coolingDown.contains(dstMac) else { + guard let udpHole, !coolingDown.contains(dstMac) else { return } @@ -60,9 +44,9 @@ actor SDLPuncherActor { if self.pktId == 0 { self.pktId = 1 } + self.tryHole(using: udpHole, pktId: pktId, request: request) Task { - await self.tryHole(pktId: pktId, request: request) // 启动冷却期 try? await Task.sleep(for: .seconds(5)) self.endCooldown(for: dstMac) @@ -70,7 +54,7 @@ actor SDLPuncherActor { } } - func handlePeerInfo(peerInfo: SDLPeerInfo) async { + func handlePeerInfo(using udpHole: SDLUDPHole, peerInfo: SDLPeerInfo) async { if let request = pendingRequests.removeValue(forKey: peerInfo.pktID) { if let remoteAddress = try? await peerInfo.v4Info.socketAddress() { SDLLogger.shared.log("[SDLContext] hole sock address: \(remoteAddress)", level: .debug) @@ -80,7 +64,7 @@ actor SDLPuncherActor { register.srcMac = request.srcMac register.dstMac = request.dstMac - self.udpHole.send(type: .register, data: try! register.serializedData(), remoteAddress: remoteAddress) + udpHole.send(type: .register, data: try! register.serializedData(), remoteAddress: remoteAddress) } else { SDLLogger.shared.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning) } @@ -95,14 +79,14 @@ actor SDLPuncherActor { self.pendingRequests.removeValue(forKey: pktId) } - private func tryHole(pktId: UInt32, request: RegisterRequest) async { + private func tryHole(using udpHole: SDLUDPHole, pktId: UInt32, request: RegisterRequest) { var queryInfo = SDLQueryInfo() queryInfo.pktID = pktId queryInfo.dstMac = request.dstMac self.pendingRequests[pktId] = request if let queryData = try? queryInfo.serializedData() { - self.udpHole.send(type: .queryInfo, data: queryData, remoteAddress: self.querySocketAddress) + udpHole.send(type: .queryInfo, data: queryData, remoteAddress: self.querySocketAddress) } } } diff --git a/Tun/Punchnet/SDLContextActor.swift b/Tun/Punchnet/SDLContextActor.swift index 5db1c4d..6234900 100644 --- a/Tun/Punchnet/SDLContextActor.swift +++ b/Tun/Punchnet/SDLContextActor.swift @@ -36,13 +36,13 @@ actor SDLContextActor { private var udpHoleWorkers: [Task]? nonisolated let providerAdapter: SDLTunnelProviderAdapter - var puncherActor: SDLPuncherActor? // dns的client对象 private var dnsClient: SDLDNSClient? private var dnsWorker: Task? + nonisolated private let puncherActor: SDLPuncherActor // 网络探测对象 - var proberActor: SDLNATProberActor? + nonisolated private let proberActor: SDLNATProberActor // 数据包读取任务 private var readTask: Task<(), Never>? @@ -71,6 +71,9 @@ actor SDLContextActor { self.sessionManager = SessionManager() self.arpServer = ArpServer(known_macs: [:]) self.providerAdapter = SDLTunnelProviderAdapter(provider: provider) + + self.puncherActor = SDLPuncherActor(querySocketAddress: config.stunSocketAddress) + self.proberActor = SDLNATProberActor(addressArray: config.stunProbeSocketAddressArray) } public func start() { @@ -165,6 +168,10 @@ actor SDLContextActor { let udpHole = try SDLUDPHole() try udpHole.start() SDLLogger.shared.log("[SDLContext] udpHole started") + + // 获取当前udp启动的地址 + let localAddress = udpHole.getLocalAddress() + self.udpHole = udpHole await udpHole.channelIsActived() @@ -209,11 +216,11 @@ actor SDLContextActor { case .registerSuperNak(let registerSuperNak): await self.handleRegisterSuperNak(nakPacket: registerSuperNak) case .peerInfo(let peerInfo): - await self.puncherActor?.handlePeerInfo(peerInfo: peerInfo) + await self.puncherActor.handlePeerInfo(using: udpHole, peerInfo: peerInfo) case .event(let event): try? await self.handleEvent(event: event) case .stunProbeReply(let probeReply): - await self.proberActor?.handleProbeReply(reply: probeReply) + await self.proberActor.handleProbeReply(localAddress: localAddress, reply: probeReply) case .register(let register): try? await self.handleRegister(remoteAddress: remoteAddress, register: register) case .registerAck(let registerAck): @@ -255,16 +262,11 @@ actor SDLContextActor { return } - self.puncherActor = SDLPuncherActor(udpHole: udpHole, querySocketAddress: config.stunSocketAddress) - self.proberActor = SDLNATProberActor(udpHole: udpHole, addressArray: self.config.stunProbeSocketAddressArray) - // 开始探测nat的类型 - Task.detached { - await Task.yield() - if let natType = await self.proberActor?.probeNatType() { - await self.setNatType(natType: natType) - SDLLogger.shared.log("[SDLContext] nat_type is: \(natType)") - } + Task { + let natType = await self.proberActor.probeNatType(using: udpHole) + self.setNatType(natType: natType) + SDLLogger.shared.log("[SDLContext] nat_type is: \(natType)") } // 注册 @@ -410,7 +412,7 @@ actor SDLContextActor { } } - private func handleData(data: SDLData) throws { + private func handleData(data: SDLData) async throws { guard let aesKey = self.aesKey else { return } @@ -439,7 +441,7 @@ actor SDLContextActor { case .request: SDLLogger.shared.log("[SDLContext] get arp request packet", level: .debug) let response = ARPPacket.arpResponse(for: arpPacket, mac: networkAddr.mac, ip: networkAddr.ip) - self.routeLayerPacket(dstMac: arpPacket.senderMAC, type: .arp, data: response.marshal()) + await self.routeLayerPacket(dstMac: arpPacket.senderMAC, type: .arp, data: response.marshal()) case .response: SDLLogger.shared.log("[SDLContext] get arp response packet", level: .debug) self.arpServer.append(ip: arpPacket.senderIP, mac: arpPacket.senderMAC) @@ -498,7 +500,7 @@ actor SDLContextActor { } // 处理读取的每个数据包 - private func dealPacket(packet: IPPacket) { + private func dealPacket(packet: IPPacket) async { let networkAddr = self.config.networkAddress if SDLDNSClient.Helper.isDnsRequestPacket(ipPacket: packet) { let destIp = packet.header.destination_ip @@ -517,21 +519,21 @@ actor SDLContextActor { // 查找arp缓存中是否有目标mac地址 if let dstMac = self.arpServer.query(ip: dstIp) { - self.routeLayerPacket(dstMac: dstMac, type: .ipv4, data: packet.data) + await self.routeLayerPacket(dstMac: dstMac, type: .ipv4, data: packet.data) } else { SDLLogger.shared.log("[SDLContext] dstIp: \(dstIp.asIpAddress()) arp query not found, broadcast", level: .debug) // 构造arp广播 let arpReqeust = ARPPacket.arpRequest(senderIP: networkAddr.ip, senderMAC: networkAddr.mac, targetIP: dstIp) - self.routeLayerPacket(dstMac: ARPPacket.broadcastMac , type: .arp, data: arpReqeust.marshal()) + await self.routeLayerPacket(dstMac: ARPPacket.broadcastMac , type: .arp, data: arpReqeust.marshal()) } } - private func routeLayerPacket(dstMac: Data, type: LayerPacket.PacketType, data: Data) { + private func routeLayerPacket(dstMac: Data, type: LayerPacket.PacketType, data: Data) async { let networkAddr = self.config.networkAddress // 将数据封装层2层的数据包 let layerPacket = LayerPacket(dstMac: dstMac, srcMac: networkAddr.mac, type: type, data: data) - guard let aesKey = self.aesKey, let encodedPacket = try? self.aesCipher.encrypt(aesKey: aesKey, data: layerPacket.marshal()) else { + guard let udpHole = self.udpHole, let aesKey = self.aesKey, let encodedPacket = try? self.aesCipher.encrypt(aesKey: aesKey, data: layerPacket.marshal()) else { return } @@ -547,22 +549,25 @@ actor SDLContextActor { // 广播地址不要去尝试打洞 if ARPPacket.isBroadcastMac(dstMac) { // 通过super_node进行转发 - self.udpHole?.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress) + udpHole.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress) } else { // 通过session发送到对端 if let session = self.sessionManager.getSession(toAddress: dstMac) { SDLLogger.shared.log("[SDLContext] send packet by session: \(session)", level: .debug) - self.udpHole?.send(type: .data, data: data, remoteAddress: session.natAddress) + udpHole.send(type: .data, data: data, remoteAddress: session.natAddress) self.flowTracer.inc(num: data.count, type: .p2p) } else { // 通过super_node进行转发 - self.udpHole?.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress) + udpHole.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress) // 流量统计 self.flowTracer.inc(num: data.count, type: .forward) + // 尝试打洞 - self.puncherActor?.submitRegisterRequest(request: .init(srcMac: networkAddr.mac, dstMac: dstMac, networkId: networkAddr.networkId)) + Task.detached { + await self.puncherActor.submitRegisterRequest(using: udpHole, request: .init(srcMac: networkAddr.mac, dstMac: dstMac, networkId: networkAddr.networkId)) + } } } }