This commit is contained in:
anlicheng 2026-04-14 14:47:45 +08:00
parent fc66d96ca4
commit 36e6b9f6d9
2 changed files with 144 additions and 37 deletions

View File

@ -261,12 +261,13 @@ actor SDLContextActor {
// dns // dns
let dnsClient = DNSCloudClient(host: self.config.serverIp, port: 15353) let dnsClient = DNSCloudClient(host: self.config.serverIp, port: 15353)
dnsClient.start() await dnsClient.start()
SDLLogger.log("[SDLContext] dnsClient started") SDLLogger.log("[SDLContext] dnsClient started")
self.dnsClient = dnsClient self.dnsClient = dnsClient
let packetFlow = dnsClient.packetFlow
self.dnsWorker = Task.detached { self.dnsWorker = Task.detached {
// //
for await packet in dnsClient.packetFlow { for await packet in packetFlow {
if Task.isCancelled { if Task.isCancelled {
break break
} }
@ -285,7 +286,7 @@ actor SDLContextActor {
await dnsLocalClient.start() await dnsLocalClient.start()
SDLLogger.log("[SDLContext] dnsClient started") SDLLogger.log("[SDLContext] dnsClient started")
self.dnsLocalClient = dnsLocalClient self.dnsLocalClient = dnsLocalClient
let packetFlow = await dnsLocalClient.packetFlow let packetFlow = dnsLocalClient.packetFlow
self.dnsLocalWorker = Task.detached { self.dnsLocalWorker = Task.detached {
// //
for await packet in packetFlow { for await packet in packetFlow {
@ -379,7 +380,9 @@ actor SDLContextActor {
self.quicClient?.stop() self.quicClient?.stop()
self.quicClient = nil self.quicClient = nil
self.dnsClient?.stop() if let dnsClient = self.dnsClient {
await dnsClient.stop()
}
self.dnsWorker?.cancel() self.dnsWorker?.cancel()
self.dnsWorker = nil self.dnsWorker = nil
self.dnsClient = nil self.dnsClient = nil
@ -765,7 +768,9 @@ actor SDLContextActor {
// ip // ip
if name.contains(self.config.networkAddress.networkDomain) { if name.contains(self.config.networkAddress.networkDomain) {
SDLLogger.log("[SDLContext] get cloud dns request: \(name)") SDLLogger.log("[SDLContext] get cloud dns request: \(name)")
self.dnsClient?.forward(ipPacketData: packet.data) if let dnsClient = self.dnsClient {
await dnsClient.forward(ipPacketData: packet.data)
}
} }
// //
else if let exitNode = config.exitNode { else if let exitNode = config.exitNode {

View File

@ -7,28 +7,49 @@
import Foundation import Foundation
import Network import Network
final class DNSCloudClient { actor DNSCloudClient {
private enum State {
case idle
case running
case stopped
}
private var state: State = .idle
private var connection: NWConnection? private var connection: NWConnection?
private var receiveTask: Task<Void, Never>?
private let dnsServerAddress: NWEndpoint private let dnsServerAddress: NWEndpoint
// DNS // DNS
public let packetFlow: AsyncStream<Data> public let packetFlow: AsyncStream<Data>
private let packetContinuation: AsyncStream<Data>.Continuation private let packetContinuation: AsyncStream<Data>.Continuation
private var didFinishPacketFlow = false
// //
private let (closeStream, closeContinuation) = AsyncStream.makeStream(of: Void.self) private let closeStream: AsyncStream<Void>
private let closeContinuation: AsyncStream<Void>.Continuation
private var didFinishCloseStream = false
/// - Parameter host: sn-server ( "8.8.8.8") /// - Parameter host: sn-server ( "8.8.8.8")
/// - Parameter port: ( 53) /// - Parameter port: ( 53)
init(host: String, port: UInt16 ) { init(host: String, port: UInt16 ) {
self.dnsServerAddress = .hostPort(host: NWEndpoint.Host(host), port: NWEndpoint.Port(integerLiteral: port)) self.dnsServerAddress = .hostPort(host: NWEndpoint.Host(host), port: NWEndpoint.Port(integerLiteral: port))
let (stream, continuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded) let (packetStream, packetContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .bufferingNewest(256))
self.packetFlow = stream self.packetFlow = packetStream
self.packetContinuation = continuation self.packetContinuation = packetContinuation
let (closeStream, closeContinuation) = AsyncStream.makeStream(of: Void.self, bufferingPolicy: .bufferingNewest(1))
self.closeStream = closeStream
self.closeContinuation = closeContinuation
} }
func start() { func start() {
guard case .idle = self.state else {
return
}
self.state = .running
// 1. // 1.
let parameters = NWParameters.udp let parameters = NWParameters.udp
@ -42,18 +63,8 @@ final class DNSCloudClient {
self.connection = connection self.connection = connection
connection.stateUpdateHandler = { [weak self] state in connection.stateUpdateHandler = { [weak self] state in
switch state { Task {
case .ready: await self?.handleConnectionStateUpdate(state, for: connection)
SDLLogger.log("[DNSClient] Connection ready", for: .debug)
self?.receiveLoop() //
case .failed(let error):
SDLLogger.log("[DNSClient] Connection failed: \(error)", for: .debug)
self?.stop()
case .cancelled:
self?.packetContinuation.finish()
self?.closeContinuation.finish()
default:
break
} }
} }
@ -62,26 +73,34 @@ final class DNSCloudClient {
} }
public func waitClose() async { public func waitClose() async {
for await _ in closeStream { } for await _ in self.closeStream { }
} }
/// ///
private func receiveLoop() { private static func makeReceiveStream(for connection: NWConnection) -> AsyncStream<Data> {
connection?.receiveMessage { [weak self] content, _, isComplete, error in return AsyncStream(bufferingPolicy: .bufferingNewest(256)) { continuation in
if let data = content, !data.isEmpty { func receiveNext() {
// DNS AsyncStream connection.receiveMessage { content, _, _, error in
self?.packetContinuation.yield(data) if let data = content, !data.isEmpty {
// DNS AsyncStream
continuation.yield(data)
}
if error == nil && connection.state == .ready {
receiveNext() //
} else {
continuation.finish()
}
}
} }
if error == nil && self?.connection?.state == .ready { receiveNext()
self?.receiveLoop() //
}
} }
} }
/// DNS TUN IP /// DNS TUN IP
func forward(ipPacketData: Data) { func forward(ipPacketData: Data) {
guard let connection = self.connection, connection.state == .ready else { guard case .running = self.state, let connection = self.connection, connection.state == .ready else {
return return
} }
@ -93,13 +112,96 @@ final class DNSCloudClient {
} }
func stop() { func stop() {
connection?.cancel() guard self.state != .stopped else {
connection = nil return
}
self.state = .stopped
self.receiveTask?.cancel()
self.receiveTask = nil
self.connection?.cancel()
self.connection = nil
self.finishPacketFlowIfNeeded()
self.finishCloseStreamIfNeeded()
}
private func handleConnectionStateUpdate(_ state: NWConnection.State, for connection: NWConnection) {
guard case .running = self.state else {
return
}
switch state {
case .ready:
SDLLogger.log("[DNSClient] Connection ready", for: .debug)
self.startReceiveTask(for: connection)
case .failed(let error):
SDLLogger.log("[DNSClient] Connection failed: \(error)", for: .debug)
self.stop()
case .cancelled:
self.stop()
default:
break
}
}
private func startReceiveTask(for connection: NWConnection) {
guard self.receiveTask == nil else {
return
}
let stream = Self.makeReceiveStream(for: connection)
self.receiveTask = Task { [weak self] in
guard let self else {
return
}
for await data in stream {
await self.handleReceivedPacket(data)
}
await self.didFinishReceiving(for: connection)
}
}
private func handleReceivedPacket(_ data: Data) {
guard case .running = self.state else {
return
}
self.packetContinuation.yield(data)
}
private func didFinishReceiving(for connection: NWConnection) {
guard case .running = self.state else {
return
}
if self.connection === connection, connection.state != .ready {
self.stop()
} else {
self.receiveTask = nil
}
}
private func finishPacketFlowIfNeeded() {
guard !self.didFinishPacketFlow else {
return
}
self.didFinishPacketFlow = true
self.packetContinuation.finish()
}
private func finishCloseStreamIfNeeded() {
guard !self.didFinishCloseStream else {
return
}
self.didFinishCloseStream = true
self.closeContinuation.finish()
} }
deinit { deinit {
stop() self.connection?.cancel()
} }
} }