fix dns client
This commit is contained in:
parent
e36ecd0c29
commit
047f5b90ec
@ -13,9 +13,9 @@ import NIOPosix
|
||||
@available(macOS 14, *)
|
||||
actor SDLDNSClientActor {
|
||||
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
|
||||
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded)
|
||||
|
||||
private var channel: Channel?
|
||||
private let logger: SDLLogger
|
||||
private let dnsServerAddress: SocketAddress
|
||||
|
||||
@ -26,72 +26,30 @@ actor SDLDNSClientActor {
|
||||
init(dnsServerAddress: SocketAddress, logger: SDLLogger) async throws {
|
||||
self.dnsServerAddress = dnsServerAddress
|
||||
self.logger = logger
|
||||
|
||||
(self.packetFlow, self.packetContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded)
|
||||
}
|
||||
|
||||
func start() throws {
|
||||
let bootstrap = DatagramBootstrap(group: group)
|
||||
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||
|
||||
self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0)
|
||||
.flatMapThrowing { channel in
|
||||
return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
|
||||
inboundType: AddressedEnvelope<ByteBuffer>.self,
|
||||
outboundType: AddressedEnvelope<ByteBuffer>.self
|
||||
))
|
||||
}
|
||||
.get()
|
||||
.channelInitializer { channel in
|
||||
channel.pipeline.addHandler(SDLDNSInboundHandler(packetContinuation: self.packetContinuation, logger: self.logger))
|
||||
}
|
||||
|
||||
func start() async throws {
|
||||
try await withTaskCancellationHandler {
|
||||
try await self.asyncChannel.executeThenClose {inbound, outbound in
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
group.addTask {
|
||||
defer {
|
||||
self.logger.log("[DNSClient] inbound closed", level: .warning)
|
||||
}
|
||||
|
||||
for try await envelope in inbound {
|
||||
try Task.checkCancellation()
|
||||
var buffer = envelope.data
|
||||
let remoteAddress = envelope.remoteAddress
|
||||
self.logger.log("[DNSClient] read data: \(buffer), from: \(remoteAddress)", level: .debug)
|
||||
|
||||
let len = buffer.readableBytes
|
||||
if let bytes = buffer.readBytes(length: len) {
|
||||
self.packetContinuation.yield(Data(bytes))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
group.addTask {
|
||||
defer {
|
||||
self.logger.log("[DNSClient] outbound closed", level: .warning)
|
||||
}
|
||||
|
||||
for await message in self.writeStream {
|
||||
try Task.checkCancellation()
|
||||
|
||||
let buffer = self.asyncChannel.channel.allocator.buffer(bytes: message)
|
||||
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: self.dnsServerAddress, data: buffer)
|
||||
try await outbound.write(envelope)
|
||||
}
|
||||
}
|
||||
|
||||
if let _ = try await group.next() {
|
||||
group.cancelAll()
|
||||
}
|
||||
}
|
||||
}
|
||||
} onCancel: {
|
||||
self.writeContinuation.finish()
|
||||
self.packetContinuation.finish()
|
||||
self.logger.log("[DNSClient] withTaskCancellationHandler cancel")
|
||||
}
|
||||
self.channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait()
|
||||
self.logger.log("[UDPHole] started", level: .debug)
|
||||
}
|
||||
|
||||
func forward(ipPacket: IPPacket) {
|
||||
self.writeContinuation.yield(ipPacket.data)
|
||||
guard let channel = self.channel else {
|
||||
return
|
||||
}
|
||||
|
||||
let buffer = channel.allocator.buffer(bytes: ipPacket.data)
|
||||
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: self.dnsServerAddress, data: buffer)
|
||||
channel.pipeline.eventLoop.execute {
|
||||
channel.writeAndFlush(envelope, promise: nil)
|
||||
}
|
||||
}
|
||||
|
||||
deinit {
|
||||
@ -103,6 +61,37 @@ actor SDLDNSClientActor {
|
||||
|
||||
extension SDLDNSClientActor {
|
||||
|
||||
private final class SDLDNSInboundHandler: ChannelInboundHandler {
|
||||
typealias InboundIn = AddressedEnvelope<ByteBuffer>
|
||||
|
||||
private var packetContinuation: AsyncStream<Data>.Continuation
|
||||
private var logger: SDLLogger
|
||||
|
||||
// --MARK: ChannelInboundHandler delegate
|
||||
|
||||
init(packetContinuation: AsyncStream<Data>.Continuation, logger: SDLLogger) {
|
||||
self.packetContinuation = packetContinuation
|
||||
self.logger = logger
|
||||
}
|
||||
|
||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let envelope = unwrapInboundIn(data)
|
||||
|
||||
var buffer = envelope.data
|
||||
let remoteAddress = envelope.remoteAddress
|
||||
self.logger.log("[DNSClient] read data: \(buffer), from: \(remoteAddress)", level: .debug)
|
||||
|
||||
let len = buffer.readableBytes
|
||||
if let bytes = buffer.readBytes(length: len) {
|
||||
self.packetContinuation.yield(Data(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
func channelInactive(context: ChannelHandlerContext) {
|
||||
self.packetContinuation.finish()
|
||||
}
|
||||
}
|
||||
|
||||
struct Helper {
|
||||
static let dnsServer: String = "100.100.100.100"
|
||||
// dns请求包的目标地址
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user