diff --git a/Tun/Punchnet/SDLAddressResolver.swift b/Tun/Punchnet/SDLAddressResolver.swift new file mode 100644 index 0000000..ce40988 --- /dev/null +++ b/Tun/Punchnet/SDLAddressResolver.swift @@ -0,0 +1,42 @@ +// +// SDLAddressResolverPool.swift +// Tun +// +// Created by 安礼成 on 2026/2/3. +// + +import Foundation +import NIOCore +import NIOPosix + +actor SDLAddressResolver { + static let shared = SDLAddressResolver(threads: System.coreCount) + + private let pool: NIOThreadPool + private var cache: [String: SocketAddress] = [:] + + private init(threads: Int = 2) { + self.pool = NIOThreadPool(numberOfThreads: threads) + self.pool.start() + } + + func resolve(host: String, port: Int) async throws -> SocketAddress { + let key = "\(host):\(port)" + + if let cached = cache[key] { + return cached + } + + let address = try await pool.runIfActive { + try SocketAddress.makeAddressResolvingHost(host, port: port) + } + + cache[key] = address + return address + } + + deinit { + pool.shutdownGracefully { _ in } + } + +} diff --git a/Tun/Punchnet/SDLContextActor.swift b/Tun/Punchnet/SDLContextActor.swift index 939a9cd..07913a8 100644 --- a/Tun/Punchnet/SDLContextActor.swift +++ b/Tun/Punchnet/SDLContextActor.swift @@ -78,18 +78,21 @@ actor SDLContextActor { self.loopChildWorkers.append(spawnLoop { let noticeClient = try self.startNoticeClient() + self.logger.log("[SDLContext] noticeClient running!!!!") try await noticeClient.waitClose() self.logger.log("[SDLContext] noticeClient closed!!!!") }) self.loopChildWorkers.append(spawnLoop { let dnsClient = try await self.startDnsClient() + self.logger.log("[SDLContext] dns running!!!!") try await dnsClient.waitClose() self.logger.log("[SDLContext] dns closed!!!!") }) self.loopChildWorkers.append(spawnLoop { let udpHole = try await self.startUDPHole() + self.logger.log("[SDLContext] udp running!!!!") try await udpHole.waitClose() self.logger.log("[SDLContext] udp closed!!!!") }) @@ -98,6 +101,8 @@ actor SDLContextActor { private func startNoticeClient() throws -> SDLNoticeClient { // 启动noticeClient let noticeClient = try SDLNoticeClient(noticePort: self.config.noticePort, logger: self.logger) + noticeClient.start() + self.logger.log("[SDLContext] noticeClient started") self.noticeClient = noticeClient @@ -114,13 +119,13 @@ actor SDLContextActor { self.logger.log("[SDLContext] monitor started") self.monitor = monitor - self.monitorWorker = Task { + self.monitorWorker = Task.detached { for await event in monitor.eventStream { switch event { case .changed: // 需要重新探测网络的nat类型 //self.natType = await self.getNatType() - self.logger.log("didNetworkPathChanged, nat type is: \(self.natType)", level: .info) + self.logger.log("didNetworkPathChanged, nat type is:", level: .info) case .unreachable: self.logger.log("didNetworkPathUnreachable", level: .warning) } @@ -138,7 +143,7 @@ actor SDLContextActor { try dnsClient.start() self.logger.log("[SDLContext] dnsClient started") self.dnsClient = dnsClient - self.dnsWorker = Task { + self.dnsWorker = Task.detached { // 处理事件流 for await packet in dnsClient.packetFlow { if Task.isCancelled { @@ -184,17 +189,17 @@ actor SDLContextActor { } // 处理数据流 - let dataTask = Task { + let dataTask = Task.detached { for await data in udpHole.dataStream { if Task.isCancelled { break } - try? self.handleData(data: data) + try? await self.handleData(data: data) } } // 处理控制信号 - let signalTask = Task { + let signalTask = Task.detached { for await(remoteAddress, signal) in udpHole.signalStream { if Task.isCancelled { break @@ -204,17 +209,17 @@ actor SDLContextActor { case .registerSuperAck(let registerSuperAck): await self.handleRegisterSuperAck(registerSuperAck: registerSuperAck) case .registerSuperNak(let registerSuperNak): - self.handleRegisterSuperNak(nakPacket: registerSuperNak) + await self.handleRegisterSuperNak(nakPacket: registerSuperNak) case .peerInfo(let peerInfo): await self.puncherActor?.handlePeerInfo(peerInfo: peerInfo) case .event(let event): - try? self.handleEvent(event: event) + try? await self.handleEvent(event: event) case .stunProbeReply(let probeReply): await self.proberActor?.handleProbeReply(reply: probeReply) case .register(let register): - try? self.handleRegister(remoteAddress: remoteAddress, register: register) + try? await self.handleRegister(remoteAddress: remoteAddress, register: register) case .registerAck(let registerAck): - self.handleRegisterAck(remoteAddress: remoteAddress, registerAck: registerAck) + await self.handleRegisterAck(remoteAddress: remoteAddress, registerAck: registerAck) } } } @@ -305,9 +310,11 @@ actor SDLContextActor { // 服务器分配的tun网卡信息 do { let ipAddress = try await self.providerAdapter.setNetworkSettings(networkAddress: self.config.networkAddress, dnsServer: SDLDNSClient.Helper.dnsServer) + self.logger.log("[SDLContext] setNetworkSettings successed") self.noticeClient?.send(data: NoticeMessage.ipAdress(ip: ipAddress)) - + self.logger.log("[SDLContext] send ip successed") self.startReader() + self.logger.log("[SDLContext] reader started") } catch let err { self.logger.log("[SDLContext] setTunnelNetworkSettings get error: \(err)", level: .error) exit(-1) @@ -464,7 +471,7 @@ actor SDLContextActor { self.readTask?.cancel() // 开启新的任务 - self.readTask = Task(priority: .high) { + self.readTask = Task.detached(priority: .high) { while true { if Task.isCancelled { return @@ -473,7 +480,7 @@ actor SDLContextActor { let packets = await self.providerAdapter.readPackets() let ipPackets = packets.compactMap { IPPacket($0) } for ipPacket in ipPackets { - self.dealPacket(packet: ipPacket) + await self.dealPacket(packet: ipPacket) } } } diff --git a/Tun/Punchnet/SDLNoticeClient.swift b/Tun/Punchnet/SDLNoticeClient.swift index 06b991b..aa97253 100644 --- a/Tun/Punchnet/SDLNoticeClient.swift +++ b/Tun/Punchnet/SDLNoticeClient.swift @@ -21,6 +21,9 @@ import NIOPosix // 处理和sn-server服务器之间的通讯 final class SDLNoticeClient { private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let (writeStream, writeContinuation) = AsyncStream.makeStream(of: Data.self) + private var task: Task? + private var channel: Channel private let logger: SDLLogger private let noticePort: Int @@ -40,15 +43,37 @@ final class SDLNoticeClient { self.logger.log("[SDLNoticeClient] started", level: .debug) } - // 处理写入逻辑 - func send(data: Data) { - if let remoteAddress = try? SocketAddress(ipAddress: "127.0.0.1", port: noticePort) { - let buf = channel.allocator.buffer(bytes: data) - let envelope = AddressedEnvelope(remoteAddress: remoteAddress, data: buf) - self.channel.eventLoop.execute { - self.channel.writeAndFlush(envelope, promise: nil) + func start() { + let channel = self.channel + self.task = Task.detached { + self.logger.log("[SDLNoticeClient] task 11", level: .debug) + guard let remoteAddress = try? await SDLAddressResolver.shared.resolve(host: "127.0.0.1", port: self.noticePort) else { + self.logger.log("[SDLNoticeClient] task 22", level: .debug) + return + } + + for await data in self.writeStream { + if Task.isCancelled { + self.logger.log("[SDLNoticeClient] task 33", level: .debug) + break + } + self.logger.log("[SDLNoticeClient] task 44", level: .debug) + + self.logger.log("[SDLNoticeClient] send packet", level: .debug) + + let buf = channel.allocator.buffer(bytes: data) + let envelope = AddressedEnvelope(remoteAddress: remoteAddress, data: buf) + channel.eventLoop.execute { + channel.writeAndFlush(envelope, promise: nil) + } } } + + } + + // 处理写入逻辑 + func send(data: Data) { + self.writeContinuation.yield(data) } func waitClose() async throws { @@ -56,6 +81,8 @@ final class SDLNoticeClient { } deinit { + self.writeContinuation.finish() + self.task?.cancel() try? self.group.syncShutdownGracefully() } diff --git a/Tun/Punchnet/SDLTunnelProviderAdapter.swift b/Tun/Punchnet/SDLTunnelProviderAdapter.swift index b472f00..e5140c1 100644 --- a/Tun/Punchnet/SDLTunnelProviderAdapter.swift +++ b/Tun/Punchnet/SDLTunnelProviderAdapter.swift @@ -60,7 +60,7 @@ final class SDLTunnelProviderAdapter { dnsSettings.matchDomains = [networkDomain] dnsSettings.matchDomainsNoSearch = false networkSettings.dnsSettings = dnsSettings - self.logger.log("[SDLContext] Tun started at network ip: \(netAddress.ipAddress), mask: \(netAddress.maskAddress)", level: .info) + self.logger.log("[SDLTunnelProviderAdapter] Tun started at network ip: \(netAddress.ipAddress), mask: \(netAddress.maskAddress)", level: .info) let ipv4Settings = NEIPv4Settings(addresses: [netAddress.ipAddress], subnetMasks: [netAddress.maskAddress]) // 设置路由表 diff --git a/Tun/Punchnet/SDLUDPHole.swift b/Tun/Punchnet/SDLUDPHole.swift index 31c6433..08bfb10 100644 --- a/Tun/Punchnet/SDLUDPHole.swift +++ b/Tun/Punchnet/SDLUDPHole.swift @@ -133,7 +133,7 @@ final class SDLUDPHole: ChannelInboundHandler { buffer.writeBytes(data) let envelope = AddressedEnvelope(remoteAddress: remoteAddress, data: buffer) - channel.eventLoop.execute { + _ = channel.eventLoop.submit { channel.writeAndFlush(envelope, promise: nil) } }