From fc66d96ca4d39684718ffd71dd045c0f2cc76410 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 14 Apr 2026 14:37:33 +0800 Subject: [PATCH] fix Local DNS --- Tun/Punchnet/DNS/DNSLocalClient.swift | 64 +++++++++++++++++++-------- 1 file changed, 46 insertions(+), 18 deletions(-) diff --git a/Tun/Punchnet/DNS/DNSLocalClient.swift b/Tun/Punchnet/DNS/DNSLocalClient.swift index b9b6aed..a0e052c 100644 --- a/Tun/Punchnet/DNS/DNSLocalClient.swift +++ b/Tun/Punchnet/DNS/DNSLocalClient.swift @@ -22,6 +22,7 @@ actor DNSLocalClient { private var state: State = .idle private var connections: [NWConnection] = [] + private var receiveTasks: [ObjectIdentifier: Task] = [:] private let dnsServers = ["223.5.5.5", "119.29.29.29"] let packetFlow: AsyncStream @@ -35,7 +36,7 @@ actor DNSLocalClient { private var didFinishPacketFlow = false init() { - let (stream, continuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded) + let (stream, continuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .bufferingNewest(256)) self.packetFlow = stream self.packetContinuation = continuation } @@ -104,6 +105,8 @@ actor DNSLocalClient { } self.state = .stopped + self.receiveTasks.values.forEach { $0.cancel() } + self.receiveTasks.removeAll() self.connections.forEach { $0.cancel() } self.connections.removeAll() @@ -122,11 +125,13 @@ actor DNSLocalClient { switch state { case .ready: - self.receiveLoop(for: conn) + self.startReceiveTask(for: conn) case .failed(let error): SDLLogger.log("[DNSLocalClient] failed with error: \(error.localizedDescription)", for: .debug) self.stop() case .cancelled: + let key = ObjectIdentifier(conn) + self.receiveTasks.removeValue(forKey: key)?.cancel() self.connections.removeAll { $0 === conn } if self.connections.isEmpty { self.stop() @@ -136,28 +141,31 @@ actor DNSLocalClient { } } - private func receiveLoop(for conn: NWConnection) { - conn.receiveMessage { [weak self] content, _, _, error in - Task { - await self?.handleReceive(content: content, error: error, for: conn) - } - } - } - - private func handleReceive(content: Data?, error: NWError?, for conn: NWConnection) { - guard case .running = self.state else { + private func startReceiveTask(for conn: NWConnection) { + let key = ObjectIdentifier(conn) + guard self.receiveTasks[key] == nil else { return } - if let data = content { - self.handleResponse(data: data) - } - - if error == nil && conn.state == .ready { - self.receiveLoop(for: conn) + let stream = Self.makeReceiveStream(for: conn) + self.receiveTasks[key] = Task { [weak self] in + guard let self else { + return + } + + for await data in stream { + await self.handleResponse(data: data) + } + + await self.didFinishReceiving(for: conn) } } + private func didFinishReceiving(for conn: NWConnection) { + let key = ObjectIdentifier(conn) + self.receiveTasks.removeValue(forKey: key) + } + private func handleResponse(data: Data) { guard case .running = self.state, let rewrittenTransactionID = Self.readTransactionID(from: data), @@ -235,6 +243,26 @@ actor DNSLocalClient { rewrittenPayload[1] = UInt8(transactionID & 0xFF) return rewrittenPayload } + + private static func makeReceiveStream(for conn: NWConnection) -> AsyncStream { + return AsyncStream(bufferingPolicy: .bufferingNewest(256)) { continuation in + func receiveNext() { + conn.receiveMessage { content, _, _, error in + if let data = content, !data.isEmpty { + continuation.yield(data) + } + + if error == nil && conn.state == .ready { + receiveNext() + } else { + continuation.finish() + } + } + } + + receiveNext() + } + } } extension DNSLocalClient {