From d74bc610609a25a15559a3518ea8ee5551513853 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 28 Jan 2026 18:53:03 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E9=80=9A=E8=AE=AF=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Tun/Punchnet/Actors/SDLDNSClientActor.swift | 3 +- Tun/Punchnet/Actors/SDLNATProberActor.swift | 4 +- .../Actors/SDLNoticeClientActor.swift | 70 ++++++++++++ Tun/Punchnet/Actors/SDLUDPHoleActor.swift | 3 - Tun/Punchnet/SDLContext.swift | 103 ++++++++---------- Tun/Punchnet/SDLMessage.pb.swift | 60 ---------- Tun/Punchnet/SDLMessage.swift | 6 - Tun/Punchnet/SDLNoticeClient.swift | 88 --------------- 8 files changed, 120 insertions(+), 217 deletions(-) create mode 100644 Tun/Punchnet/Actors/SDLNoticeClientActor.swift delete mode 100644 Tun/Punchnet/SDLNoticeClient.swift diff --git a/Tun/Punchnet/Actors/SDLDNSClientActor.swift b/Tun/Punchnet/Actors/SDLDNSClientActor.swift index 88fc261..3fd4702 100644 --- a/Tun/Punchnet/Actors/SDLDNSClientActor.swift +++ b/Tun/Punchnet/Actors/SDLDNSClientActor.swift @@ -13,7 +13,6 @@ import NIOPosix @available(macOS 14, *) actor SDLDNSClientActor { private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) - private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded) private var channel: Channel? private let logger: SDLLogger @@ -54,7 +53,7 @@ actor SDLDNSClientActor { deinit { try? self.group.syncShutdownGracefully() - self.writeContinuation.finish() + self.packetContinuation.finish() } } diff --git a/Tun/Punchnet/Actors/SDLNATProberActor.swift b/Tun/Punchnet/Actors/SDLNATProberActor.swift index 56e7d0c..8fe78ac 100644 --- a/Tun/Punchnet/Actors/SDLNATProberActor.swift +++ b/Tun/Punchnet/Actors/SDLNATProberActor.swift @@ -94,7 +94,7 @@ actor SDLNATProberActor { } /// UDP 层收到 STUN 响应后调用 - func handleProbeReply(from address: SocketAddress, reply: SDLStunProbeReply) async { + func handleProbeReply(reply: SDLStunProbeReply) async { guard let session = self.sessions[reply.cookie] else { return } @@ -104,7 +104,7 @@ actor SDLNATProberActor { // 提前退出的情况,没有nat映射 if let step1 = session.replies[1] { let localAddress = await self.udpHole.getLocalAddress() - if address == localAddress { + if reply.socketAddress() == localAddress { finish(cookie: session.cookieId, .noNat) return } diff --git a/Tun/Punchnet/Actors/SDLNoticeClientActor.swift b/Tun/Punchnet/Actors/SDLNoticeClientActor.swift new file mode 100644 index 0000000..7351a9c --- /dev/null +++ b/Tun/Punchnet/Actors/SDLNoticeClientActor.swift @@ -0,0 +1,70 @@ +// +// SDLNoticeClient.swift +// Tun +// +// Created by 安礼成 on 2024/5/20. +// + +import Foundation + +// +// SDLanServer.swift +// Tun +// +// Created by 安礼成 on 2024/1/31. +// + +import Foundation +import NIOCore +import NIOPosix + +// 处理和sn-server服务器之间的通讯 +actor SDLNoticeClientActor { + private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + private var channel: Channel? + private let remoteAddress: SocketAddress + private let logger: SDLLogger + + // 启动函数 + init(noticePort: Int, logger: SDLLogger) throws { + self.logger = logger + self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: noticePort) + } + + func start() throws { + let bootstrap = DatagramBootstrap(group: self.group) + .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + .channelInitializer { channel in + channel.pipeline.addHandler(SDLNoticeClientInboundHandler()) + } + + self.channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait() + self.logger.log("[SDLNoticeClient] started", level: .debug) + } + + // 处理写入逻辑 + func send(data: Data) { + guard let channel = self.channel else { + return + } + + let buf = channel.allocator.buffer(bytes: data) + let envelope = AddressedEnvelope(remoteAddress: self.remoteAddress, data: buf) + channel.eventLoop.execute { + channel.writeAndFlush(envelope, promise: nil) + } + } + + deinit { + try? self.group.syncShutdownGracefully() + } + +} + +extension SDLNoticeClientActor { + + private class SDLNoticeClientInboundHandler: ChannelInboundHandler { + typealias InboundIn = AddressedEnvelope + } + +} diff --git a/Tun/Punchnet/Actors/SDLUDPHoleActor.swift b/Tun/Punchnet/Actors/SDLUDPHoleActor.swift index c3f4dd2..73a8d8f 100644 --- a/Tun/Punchnet/Actors/SDLUDPHoleActor.swift +++ b/Tun/Punchnet/Actors/SDLUDPHoleActor.swift @@ -138,9 +138,6 @@ extension SDLUDPHoleActor { case .registerAck: let registerAck = try SDLRegisterAck(serializedBytes: bytes) return .registerAck(registerAck) - case .stunReply: - let stunReply = try SDLStunReply(serializedBytes: bytes) - return .stunReply(stunReply) case .stunProbeReply: let stunProbeReply = try SDLStunProbeReply(serializedBytes: bytes) return .stunProbeReply(stunProbeReply) diff --git a/Tun/Punchnet/SDLContext.swift b/Tun/Punchnet/SDLContext.swift index 3f4b8b6..59e0667 100644 --- a/Tun/Punchnet/SDLContext.swift +++ b/Tun/Punchnet/SDLContext.swift @@ -65,7 +65,7 @@ public class SDLContext { private var monitor: SDLNetworkMonitor? // 内部socket通讯 - private var noticeClient: SDLNoticeClient? + private var noticeClientActor: SDLNoticeClientActor? // 流量统计 private var flowTracer = SDLFlowTracerActor() @@ -137,14 +137,14 @@ public class SDLContext { public func stop() async { self.rootTask?.cancel() self.udpHoleActor = nil - self.noticeClient = nil + self.noticeClientActor = nil self.readTask?.cancel() } private func startNoticeClient() async throws { - self.noticeClient = try await SDLNoticeClient(noticePort: self.config.noticePort, logger: self.logger) - try await self.noticeClient?.start() + self.noticeClientActor = try SDLNoticeClientActor(noticePort: self.config.noticePort, logger: self.logger) + try await self.noticeClientActor?.start() self.logger.log("[SDLContext] notice_client task cancel", level: .warning) } @@ -274,10 +274,8 @@ public class SDLContext { await self.puncherActor.handlePeerInfo(peerInfo: peerInfo) case .event(let event): try await self.handleEvent(event: event) - case .stunReply(let stunReply): - await self.handleStunReply(stunReply: stunReply) - case .stunProbeReply(_): - () + case .stunProbeReply(let probeReply): + await self.proberActor?.handleProbeReply(reply: probeReply) case .data(let data): try await self.handleData(data: data) case .register(let register): @@ -296,7 +294,7 @@ public class SDLContext { // 服务器分配的tun网卡信息 do { let ipAddress = try await self.providerActor.setNetworkSettings(networkAddress: self.config.networkAddress, dnsServer: SDLDNSClientActor.Helper.dnsServer) - await self.noticeClient?.send(data: NoticeMessage.ipAdress(ip: ipAddress)) + await self.noticeClientActor?.send(data: NoticeMessage.ipAdress(ip: ipAddress)) self.startReader() } catch let err { @@ -316,11 +314,11 @@ public class SDLContext { switch errorCode { case .invalidToken, .nodeDisabled: let alertNotice = NoticeMessage.alert(alert: errorMessage) - await self.noticeClient?.send(data: alertNotice) + await self.noticeClientActor?.send(data: alertNotice) exit(-1) case .noIpAddress, .networkFault, .internalFault: let alertNotice = NoticeMessage.alert(alert: errorMessage) - await self.noticeClient?.send(data: alertNotice) + await self.noticeClientActor?.send(data: alertNotice) } self.logger.log("[SDLContext] Get a SuperNak message exit", level: .warning) @@ -346,18 +344,11 @@ public class SDLContext { case .networkShutdown(let shutdownEvent): let alertNotice = NoticeMessage.alert(alert: shutdownEvent.message) - await self.noticeClient?.send(data: alertNotice) + await self.noticeClientActor?.send(data: alertNotice) exit(-1) } } -//// case .ready: -//// await self.puncherActor.setUDPHoleActor(udpHoleActor: self.udpHoleActor) -//// // 获取当前网络的类型 -//// self.natType = await getNatType() -//// self.logger.log("[SDLContext] broadcast is: \(self.natType)", level: .debug) -// - private func handleRegister(remoteAddress: SocketAddress, register: SDLRegister) async throws { let networkAddr = config.networkAddress self.logger.log("register packet: \(register), network_address: \(networkAddr)", level: .debug) @@ -390,15 +381,6 @@ public class SDLContext { } } - private func handleStunReply(stunReply: SDLStunReply) async { -// let cookie = stunReply.cookie -// if cookie == self.lastCookie { -// // 记录下当前在nat上的映射信息,暂时没有用;后续会用来判断网络类型 -// //self.natAddress = stunReply.natAddress -// self.logger.log("[SDLContext] get a stunReply: \(try! stunReply.jsonString())", level: .debug) -// } - } - private func handleData(data: SDLData) async throws { let mac = LayerPacket.MacAddress(data: data.dstMac) @@ -470,46 +452,55 @@ public class SDLContext { self.readTask = Task(priority: .high) { repeat { let packets = await self.providerActor.readPackets() - for packet in packets { - await self.dealPacket(data: packet) - } + let ipPackets = packets.compactMap { IPPacket($0) } + await self.batchProcessPackets(batchSize: 20, packets: ipPackets) } while true } } - // 处理读取的每个数据包 - private func dealPacket(data: Data) async { - guard let packet = IPPacket(data) else { - return + // 批量分发ip数据包 + private func batchProcessPackets(batchSize: Int, packets: [IPPacket]) async { + for startIndex in stride(from: 0, to: packets.count, by: batchSize) { + let endIndex = Swift.min(startIndex + batchSize, packets.count) + + let chunkPackets = packets[startIndex..(decoder: inout D) throws { - while let fieldNumber = try decoder.nextFieldNumber() { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every case branch when no optimizations are - // enabled. https://github.com/apple/swift-protobuf/issues/1034 - switch fieldNumber { - case 1: try { try decoder.decodeSingularUInt32Field(value: &self.networkID) }() - case 2: try { try decoder.decodeSingularUInt32Field(value: &self.code) }() - case 3: try { try decoder.decodeSingularStringField(value: &self.message) }() - default: break - } - } - } - - func traverse(visitor: inout V) throws { - if self.networkID != 0 { - try visitor.visitSingularUInt32Field(value: self.networkID, fieldNumber: 1) - } - if self.code != 0 { - try visitor.visitSingularUInt32Field(value: self.code, fieldNumber: 2) - } - if !self.message.isEmpty { - try visitor.visitSingularStringField(value: self.message, fieldNumber: 3) - } - try unknownFields.traverse(visitor: &visitor) - } - - static func ==(lhs: SDLStunReply, rhs: SDLStunReply) -> Bool { - if lhs.networkID != rhs.networkID {return false} - if lhs.code != rhs.code {return false} - if lhs.message != rhs.message {return false} - if lhs.unknownFields != rhs.unknownFields {return false} - return true - } -} - extension SDLData: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { static let protoMessageName: String = "SDLData" static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ diff --git a/Tun/Punchnet/SDLMessage.swift b/Tun/Punchnet/SDLMessage.swift index 0a67f0c..079a5de 100644 --- a/Tun/Punchnet/SDLMessage.swift +++ b/Tun/Punchnet/SDLMessage.swift @@ -27,10 +27,6 @@ enum SDLPacketType: UInt8 { // 事件类型 case event = 0x10 - // 推送命令消息, 需要返回值 - case command = 0x11 - case commandAck = 0x12 - // 流量统计 case flowTracer = 0x15 @@ -38,7 +34,6 @@ enum SDLPacketType: UInt8 { case registerAck = 0x21 case stunRequest = 0x30 - case stunReply = 0x31 case stunProbe = 0x32 case stunProbeReply = 0x33 @@ -106,7 +101,6 @@ enum SDLHoleInboundMessage { case peerInfo(SDLPeerInfo) case event(SDLEvent) - case stunReply(SDLStunReply) case stunProbeReply(SDLStunProbeReply) case data(SDLData) diff --git a/Tun/Punchnet/SDLNoticeClient.swift b/Tun/Punchnet/SDLNoticeClient.swift deleted file mode 100644 index 7155538..0000000 --- a/Tun/Punchnet/SDLNoticeClient.swift +++ /dev/null @@ -1,88 +0,0 @@ -// -// SDLNoticeClient.swift -// Tun -// -// Created by 安礼成 on 2024/5/20. -// - -import Foundation - -// -// SDLanServer.swift -// Tun -// -// Created by 安礼成 on 2024/1/31. -// - -import Foundation -import NIOCore -import NIOPosix - -// 处理和sn-server服务器之间的通讯 -actor SDLNoticeClient { - private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) - private let asyncChannel: NIOAsyncChannel, AddressedEnvelope> - private let remoteAddress: SocketAddress - private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded) - - private let logger: SDLLogger - - // 启动函数 - init(noticePort: Int, logger: SDLLogger) async throws { - self.logger = logger - - self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: noticePort) - - let bootstrap = DatagramBootstrap(group: self.group) - .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) - - self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0) - .flatMapThrowing {channel in - return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init( - inboundType: AddressedEnvelope.self, - outboundType: AddressedEnvelope.self - )) - } - .get() - - self.logger.log("[SDLNoticeClient] started and listening on: \(self.asyncChannel.channel.localAddress!)", level: .debug) - } - - func start() async throws { - try await self.asyncChannel.executeThenClose { inbound, outbound in - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - try await self.asyncChannel.channel.closeFuture.get() - throw SDLError.socketClosed - } - - group.addTask { - defer { - self.writeContinuation.finish() - } - - for try await message in self.writeStream { - let buf = self.asyncChannel.channel.allocator.buffer(bytes: message) - let envelope = AddressedEnvelope(remoteAddress: self.remoteAddress, data: buf) - - try await outbound.write(envelope) - } - } - - for try await _ in group { - - } - } - } - } - - // 处理写入逻辑 - func send(data: Data) { - self.writeContinuation.yield(data) - } - - deinit { - try? self.group.syncShutdownGracefully() - self.writeContinuation.finish() - } -}