Compare commits

...

2 Commits

Author SHA1 Message Date
f839a5dd11 支持ipv6 2026-04-15 17:15:36 +08:00
5551d38b88 Tun/Punchnet/SDLUDPHoleV6.swift 2026-04-15 17:14:46 +08:00
5 changed files with 323 additions and 25 deletions

View File

@ -22,6 +22,11 @@ actor SDLContextActor {
case stopped case stopped
} }
private enum UDPHoleKind: Equatable {
case v4
case v6
}
private var readyState: ReadyState = .idle private var readyState: ReadyState = .idle
private var readyWaiters: [CheckedContinuation<Void, Error>] = [] private var readyWaiters: [CheckedContinuation<Void, Error>] = []
@ -43,6 +48,9 @@ actor SDLContextActor {
private var udpHole: SDLUDPHole? private var udpHole: SDLUDPHole?
private var udpHoleWorkers: [Task<Void, Never>]? private var udpHoleWorkers: [Task<Void, Never>]?
private var udpHoleLocalAddress: SocketAddress? private var udpHoleLocalAddress: SocketAddress?
private var udpHoleV6: SDLUDPHoleV6?
private var udpHoleV6Workers: [Task<Void, Never>]?
private var udpHoleV6LocalAddress: SocketAddress?
// dnsclient // dnsclient
private var dnsClient: DNSCloudClient? private var dnsClient: DNSCloudClient?
@ -137,6 +145,13 @@ actor SDLContextActor {
try await udpHole.waitClose() try await udpHole.waitClose()
SDLLogger.log("[SDLContext] udp closed!!!!") SDLLogger.log("[SDLContext] udp closed!!!!")
} }
await self.supervisor.addWorker(name: "udpHoleV6") {
let udpHoleV6 = try await self.startUDPHoleV6()
SDLLogger.log("[SDLContext] udp v6 running!!!!")
try await udpHoleV6.waitClose()
SDLLogger.log("[SDLContext] udp v6 closed!!!!")
}
} }
public func waitForReady() async throws { public func waitForReady() async throws {
@ -194,7 +209,7 @@ actor SDLContextActor {
await self.handleRegisterSuperNak(nakPacket: registerSuperNak) await self.handleRegisterSuperNak(nakPacket: registerSuperNak)
case .peerInfo(let peerInfo): case .peerInfo(let peerInfo):
//SDLLogger.shared.log("[SDLContext] peer message: \(peerInfo)") //SDLLogger.shared.log("[SDLContext] peer message: \(peerInfo)")
await self.puncherActor.handlePeerInfo(using: self.udpHole, peerInfo: peerInfo) await self.puncherActor.handlePeerInfo(using: self.udpHole, udpHoleV6: self.udpHoleV6, peerInfo: peerInfo)
case .event(let event): case .event(let event):
await self.handleEvent(event: event) await self.handleEvent(event: event)
case .policyReponse(let policyResponse): case .policyReponse(let policyResponse):
@ -316,7 +331,7 @@ actor SDLContextActor {
// //
let messageStream = udpHole.messageStream let messageStream = udpHole.messageStream
let messageTask = Task.detached { let messageTask = Task.detached {
await self.consumeUDPHoleMessages(stream: messageStream, localAddress: localAddress) await self.consumeUDPHoleMessages(stream: messageStream, localAddress: localAddress, source: .v4)
} }
self.udpHole = udpHole self.udpHole = udpHole
@ -329,6 +344,28 @@ actor SDLContextActor {
return udpHole return udpHole
} }
private func startUDPHoleV6() async throws -> SDLUDPHoleV6 {
self.udpHoleV6Workers?.forEach {$0.cancel()}
self.udpHoleV6Workers = nil
// udp
let udpHoleV6 = try SDLUDPHoleV6()
let localAddress = try udpHoleV6.start()
SDLLogger.log("[SDLContext] udpHoleV6 started, on address: \(localAddress)")
//
let messageStream = udpHoleV6.messageStream
let messageTask = Task.detached {
await self.consumeUDPHoleMessages(stream: messageStream, localAddress: localAddress, source: .v6)
}
self.udpHoleV6 = udpHoleV6
self.udpHoleV6LocalAddress = localAddress
self.udpHoleV6Workers = [messageTask]
return udpHoleV6
}
// context // context
public func stop() async { public func stop() async {
self.resumeReadyWaiters(.failure(CancellationError())) self.resumeReadyWaiters(.failure(CancellationError()))
@ -344,6 +381,12 @@ actor SDLContextActor {
self.udpHole = nil self.udpHole = nil
self.udpHoleLocalAddress = nil self.udpHoleLocalAddress = nil
self.udpHoleV6Workers?.forEach { $0.cancel() }
self.udpHoleV6Workers = nil
self.udpHoleV6?.stop()
self.udpHoleV6 = nil
self.udpHoleV6LocalAddress = nil
self.quicWorker?.cancel() self.quicWorker?.cancel()
self.quicWorker = nil self.quicWorker = nil
self.quicClient?.stop() self.quicClient?.stop()
@ -502,16 +545,16 @@ actor SDLContextActor {
// super/stun // super/stun
private func sendSuperPacket(type: SDLPacketType, data: Data) { private func sendSuperPacket(type: SDLPacketType, data: Data) {
self.udpHole?.send(type: type, data: data, remoteAddress: self.config.stunSocketAddress) self.sendPacket(type: type, data: data, remoteAddress: self.config.stunSocketAddress)
} }
// peer // peer
private func sendPeerPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) { private func sendPeerPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
self.udpHole?.send(type: type, data: data, remoteAddress: remoteAddress) self.sendPacket(type: type, data: data, remoteAddress: remoteAddress)
} }
private func makeCurrentV6Info() -> SDLV6Info? { private func makeCurrentV6Info() -> SDLV6Info? {
guard let port = self.udpHoleLocalAddress?.port else { guard let port = self.udpHoleV6LocalAddress?.port else {
return nil return nil
} }
@ -535,6 +578,25 @@ actor SDLContextActor {
return v6Info return v6Info
} }
private func sendPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
switch remoteAddress {
case .v4:
guard let udpHole = self.udpHole else {
SDLLogger.log("[SDLContext] udpHole is nil for remoteAddress: \(remoteAddress)", for: .debug)
return
}
udpHole.send(type: type, data: data, remoteAddress: remoteAddress)
case .v6:
guard let udpHoleV6 = self.udpHoleV6 else {
SDLLogger.log("[SDLContext] udpHoleV6 is nil for remoteAddress: \(remoteAddress)", for: .debug)
return
}
udpHoleV6.send(type: type, data: data, remoteAddress: remoteAddress)
default:
SDLLogger.log("[SDLContext] unsupported socket family: \(remoteAddress)", for: .debug)
}
}
private func spawnLoop(_ body: @escaping () async throws -> Void) -> Task<Void, Never> { private func spawnLoop(_ body: @escaping () async throws -> Void) -> Task<Void, Never> {
return Task.detached { return Task.detached {
while !Task.isCancelled { while !Task.isCancelled {
@ -607,6 +669,8 @@ actor SDLContextActor {
deinit { deinit {
self.udpHole = nil self.udpHole = nil
self.udpHoleLocalAddress = nil self.udpHoleLocalAddress = nil
self.udpHoleV6 = nil
self.udpHoleV6LocalAddress = nil
self.dnsClient = nil self.dnsClient = nil
} }
} }
@ -768,7 +832,7 @@ extension SDLContextActor {
// Hole // Hole
extension SDLContextActor { extension SDLContextActor {
private func consumeUDPHoleMessages(stream: AsyncStream<(SocketAddress, SDLHoleMessage)>, localAddress: SocketAddress) async { private func consumeUDPHoleMessages(stream: AsyncStream<(SocketAddress, SDLHoleMessage)>, localAddress: SocketAddress, source: UDPHoleKind) async {
for await (remoteAddress, message) in stream { for await (remoteAddress, message) in stream {
if Task.isCancelled { if Task.isCancelled {
break break
@ -776,16 +840,19 @@ extension SDLContextActor {
switch message.inboundMessage { switch message.inboundMessage {
case .control(let controlMessage): case .control(let controlMessage):
await self.handleHoleControlMessage(controlMessage, localAddress: localAddress, remoteAddress: remoteAddress) await self.handleHoleControlMessage(controlMessage, localAddress: localAddress, remoteAddress: remoteAddress, source: source)
case .data(let data): case .data(let data):
try? await self.handleHoleData(data: data) try? await self.handleHoleData(data: data)
} }
} }
} }
private func handleHoleControlMessage(_ message: SDLHoleControlMessage, localAddress: SocketAddress, remoteAddress: SocketAddress) async { private func handleHoleControlMessage(_ message: SDLHoleControlMessage, localAddress: SocketAddress, remoteAddress: SocketAddress, source: UDPHoleKind) async {
switch message { switch message {
case .stunProbeReply(let probeReply): case .stunProbeReply(let probeReply):
guard source == .v4 else {
return
}
await self.proberActor.handleProbeReply(localAddress: localAddress, reply: probeReply) await self.proberActor.handleProbeReply(localAddress: localAddress, reply: probeReply)
case .register(let register): case .register(let register):
try? self.handleRegister(remoteAddress: remoteAddress, register: register) try? self.handleRegister(remoteAddress: remoteAddress, register: register)

View File

@ -93,7 +93,7 @@ actor SDLPuncherActor {
quicClient.send(type: .queryInfo, data: queryData) quicClient.send(type: .queryInfo, data: queryData)
} }
func handlePeerInfo(using udpHole: SDLUDPHole?, peerInfo: SDLPeerInfo) async { func handlePeerInfo(using udpHole: SDLUDPHole?, udpHoleV6: SDLUDPHoleV6?, peerInfo: SDLPeerInfo) async {
let now = Date() let now = Date()
self.cleanupExpiredEntries(now: now) self.cleanupExpiredEntries(now: now)
@ -108,8 +108,8 @@ actor SDLPuncherActor {
entry.markCoolingDown() entry.markCoolingDown()
self.requestEntries[peerInfo.dstMac] = entry self.requestEntries[peerInfo.dstMac] = entry
guard let udpHole else { guard udpHole != nil || udpHoleV6 != nil else {
SDLLogger.log("[SDLPuncherActor] udpHole is nil when peerInfo arrived", for: .debug) SDLLogger.log("[SDLPuncherActor] udpHole and udpHoleV6 are nil when peerInfo arrived", for: .debug)
return return
} }
@ -127,7 +127,7 @@ actor SDLPuncherActor {
if peerInfo.hasV4Info { if peerInfo.hasV4Info {
if let remoteAddress = try? await peerInfo.v4Info.socketAddress() { if let remoteAddress = try? await peerInfo.v4Info.socketAddress() {
SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", for: .debug) SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", for: .debug)
udpHole.send(type: .register, data: registerData, remoteAddress: remoteAddress) self.sendRegister(using: udpHole, udpHoleV6: udpHoleV6, registerData: registerData, remoteAddress: remoteAddress)
} else { } else {
SDLLogger.log("[SDLPuncherActor] failed to resolve peerInfo.v4Info", for: .debug) SDLLogger.log("[SDLPuncherActor] failed to resolve peerInfo.v4Info", for: .debug)
} }
@ -136,7 +136,7 @@ actor SDLPuncherActor {
if peerInfo.hasV6Info { if peerInfo.hasV6Info {
if let remoteAddress = try? await peerInfo.v6Info.socketAddress() { if let remoteAddress = try? await peerInfo.v6Info.socketAddress() {
SDLLogger.log("[SDLContext] hole sock address v6: \(remoteAddress)", for: .debug) SDLLogger.log("[SDLContext] hole sock address v6: \(remoteAddress)", for: .debug)
udpHole.send(type: .register, data: registerData, remoteAddress: remoteAddress) self.sendRegister(using: udpHole, udpHoleV6: udpHoleV6, registerData: registerData, remoteAddress: remoteAddress)
} else { } else {
SDLLogger.log("[SDLPuncherActor] failed to resolve peerInfo.v6Info", for: .debug) SDLLogger.log("[SDLPuncherActor] failed to resolve peerInfo.v6Info", for: .debug)
} }
@ -156,6 +156,28 @@ actor SDLPuncherActor {
} }
} }
private func sendRegister(using udpHole: SDLUDPHole?,
udpHoleV6: SDLUDPHoleV6?,
registerData: Data,
remoteAddress: SocketAddress) {
switch remoteAddress {
case .v4:
guard let udpHole else {
SDLLogger.log("[SDLPuncherActor] udpHole is nil when v4 peerInfo arrived", for: .debug)
return
}
udpHole.send(type: .register, data: registerData, remoteAddress: remoteAddress)
case .v6:
guard let udpHoleV6 else {
SDLLogger.log("[SDLPuncherActor] udpHoleV6 is nil when v6 peerInfo arrived", for: .debug)
return
}
udpHoleV6.send(type: .register, data: registerData, remoteAddress: remoteAddress)
default:
SDLLogger.log("[SDLPuncherActor] unsupported peer address family: \(remoteAddress)", for: .debug)
}
}
deinit { deinit {
self.cleanupTask?.cancel() self.cleanupTask?.cancel()
} }

View File

@ -44,8 +44,8 @@ final class SDLUDPHole: ChannelInboundHandler {
channel.pipeline.addHandler(self) channel.pipeline.addHandler(self)
} }
// IPv6SwiftNIOdual-stack socketIPv4/IPv6 // IPv4IPv4
let channel = try bootstrap.bind(host: "::", port: 0).wait() let channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait()
self.channel = channel self.channel = channel
self.closeFuture = channel.closeFuture self.closeFuture = channel.closeFuture
self.state = .ready self.state = .ready
@ -70,6 +70,7 @@ final class SDLUDPHole: ChannelInboundHandler {
} }
func stop() { func stop() {
SDLLogger.log("[SDLUDPHole] waitClose stop", for: .debug)
switch self.state { switch self.state {
case .stopping, .stopped: case .stopping, .stopped:
return return
@ -116,6 +117,7 @@ final class SDLUDPHole: ChannelInboundHandler {
self.finishMessageStream() self.finishMessageStream()
self.channel = nil self.channel = nil
self.state = .stopped self.state = .stopped
SDLLogger.log("[SDLUDPHole] channelInactive", for: .debug)
} }
func errorCaught(context: ChannelHandlerContext, error: any Error) { func errorCaught(context: ChannelHandlerContext, error: any Error) {
@ -125,6 +127,7 @@ final class SDLUDPHole: ChannelInboundHandler {
self.state = .stopping self.state = .stopping
} }
context.close(promise: nil) context.close(promise: nil)
SDLLogger.log("[SDLUDPHole] errorCaught", for: .debug)
} }
// MARK: // MARK:
@ -198,6 +201,7 @@ final class SDLUDPHole: ChannelInboundHandler {
} }
deinit { deinit {
SDLLogger.log("[SDLUDPHole] closeWait deinit", for: .debug)
self.stop() self.stop()
try? self.group.syncShutdownGracefully() try? self.group.syncShutdownGracefully()
} }

View File

@ -0,0 +1,205 @@
//
// SDLUDPHoleV6.swift
// Tun
//
// Created by on 2026/4/15.
//
import Foundation
import NIOCore
import NIOPosix
import SwiftProtobuf
// sn-server
final class SDLUDPHoleV6: ChannelInboundHandler {
typealias InboundIn = AddressedEnvelope<ByteBuffer>
private enum State: Equatable {
case idle
case ready
case stopping
case stopped
}
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private var channel: Channel?
private var closeFuture: EventLoopFuture<Void>?
private var state: State = .idle
private var didFinishMessageStream: Bool = false
public let messageStream: AsyncStream<(SocketAddress, SDLHoleMessage)>
private let messageContinuation: AsyncStream<(SocketAddress, SDLHoleMessage)>.Continuation
//
init() throws {
let (stream, continuation) = AsyncStream.makeStream(of: (SocketAddress, SDLHoleMessage).self, bufferingPolicy: .bufferingNewest(2048))
self.messageStream = stream
self.messageContinuation = continuation
}
func start() throws -> SocketAddress {
let bootstrap = DatagramBootstrap(group: group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelInitializer { channel in
channel.pipeline.addHandler(self)
}
// IPv6IPv6
let channel = try bootstrap.bind(host: "::", port: 0).wait()
self.channel = channel
self.closeFuture = channel.closeFuture
self.state = .ready
precondition(channel.localAddress != nil, "UDP v6 channel has no localAddress after bind")
return channel.localAddress!
}
func waitClose() async throws {
switch self.state {
case .idle:
SDLLogger.log("[SDLUDPHoleV6] waitClose11", for: .debug)
return
case .ready, .stopping, .stopped:
guard let closeFuture = self.closeFuture else {
SDLLogger.log("[SDLUDPHoleV6] waitClose22", for: .debug)
return
}
try await closeFuture.get()
SDLLogger.log("[SDLUDPHoleV6] waitClose33", for: .debug)
}
}
func stop() {
switch self.state {
case .stopping, .stopped:
return
case .idle:
self.state = .stopped
self.finishMessageStream()
return
case .ready:
self.state = .stopping
}
self.finishMessageStream()
self.channel?.close(promise: nil)
}
// --MARK: ChannelInboundHandler delegate
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
guard case .ready = self.state else {
return
}
let envelope = unwrapInboundIn(data)
var buffer = envelope.data
let remoteAddress = envelope.remoteAddress
if let rawBytes = buffer.getBytes(at: buffer.readerIndex, length: buffer.readableBytes) {
SDLLogger.log("[SDLUDPHoleV6] get raw bytes: \(rawBytes.count), from: \(remoteAddress)", for: .debug)
}
do {
if let message = try decode(buffer: &buffer) {
self.messageContinuation.yield((remoteAddress, message))
} else {
SDLLogger.log("[SDLUDPHoleV6] decode message, get null", for: .debug)
}
} catch let err {
SDLLogger.log("[SDLUDPHoleV6] decode message, get error: \(err)", for: .debug)
}
}
func channelInactive(context: ChannelHandlerContext) {
self.finishMessageStream()
self.channel = nil
self.state = .stopped
}
func errorCaught(context: ChannelHandlerContext, error: any Error) {
SDLLogger.log("[SDLUDPHoleV6] channel error: \(error)", for: .debug)
self.finishMessageStream()
if self.state != .stopped {
self.state = .stopping
}
context.close(promise: nil)
}
// MARK:
func send(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
guard case .ready = self.state, let channel = self.channel else {
return
}
var buffer = channel.allocator.buffer(capacity: data.count + 1)
buffer.writeBytes([type.rawValue])
buffer.writeBytes(data)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)
_ = channel.eventLoop.submit {
channel.writeAndFlush(envelope, promise: nil)
}
}
// --MARK:
private func decode(buffer: inout ByteBuffer) throws -> SDLHoleMessage? {
guard let type = buffer.readInteger(as: UInt8.self),
let packetType = SDLPacketType(rawValue: type) else {
return nil
}
switch packetType {
case .data:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let dataPacket = try? SDLData(serializedBytes: bytes) else {
return nil
}
return .data(dataPacket)
case .register:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let registerPacket = try? SDLRegister(serializedBytes: bytes) else {
return nil
}
return .register(registerPacket)
case .registerAck:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let registerAck = try? SDLRegisterAck(serializedBytes: bytes) else {
return nil
}
return .registerAck(registerAck)
case .stunProbeReply:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let stunProbeReply = try? SDLStunProbeReply(serializedBytes: bytes) else {
return nil
}
return .stunProbeReply(stunProbeReply)
case .stunReply:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let stunReply = try? SDLStunReply(serializedBytes: bytes) else {
return nil
}
return .stunReply(stunReply)
default:
SDLLogger.log("SDLUDPHoleV6 decode miss type: \(type)", for: .debug)
return nil
}
}
private func finishMessageStream() {
guard !self.didFinishMessageStream else {
return
}
self.didFinishMessageStream = true
self.messageContinuation.finish()
}
deinit {
self.stop()
try? self.group.syncShutdownGracefully()
}
}

View File

@ -1,3 +1,3 @@
#! /bin/sh #! /bin/sh
log stream --predicate 'subsystem == "com.jihe.punchnet"' --info --style compact log stream --predicate 'subsystem == "com.jihe.punchnet.debug"' --info --style compact