Compare commits
2 Commits
2f0f1f6c7c
...
f839a5dd11
| Author | SHA1 | Date | |
|---|---|---|---|
| f839a5dd11 | |||
| 5551d38b88 |
@ -21,6 +21,11 @@ actor SDLContextActor {
|
|||||||
case failed(any Error)
|
case failed(any Error)
|
||||||
case stopped
|
case stopped
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private enum UDPHoleKind: Equatable {
|
||||||
|
case v4
|
||||||
|
case v6
|
||||||
|
}
|
||||||
|
|
||||||
private var readyState: ReadyState = .idle
|
private var readyState: ReadyState = .idle
|
||||||
private var readyWaiters: [CheckedContinuation<Void, Error>] = []
|
private var readyWaiters: [CheckedContinuation<Void, Error>] = []
|
||||||
@ -43,6 +48,9 @@ actor SDLContextActor {
|
|||||||
private var udpHole: SDLUDPHole?
|
private var udpHole: SDLUDPHole?
|
||||||
private var udpHoleWorkers: [Task<Void, Never>]?
|
private var udpHoleWorkers: [Task<Void, Never>]?
|
||||||
private var udpHoleLocalAddress: SocketAddress?
|
private var udpHoleLocalAddress: SocketAddress?
|
||||||
|
private var udpHoleV6: SDLUDPHoleV6?
|
||||||
|
private var udpHoleV6Workers: [Task<Void, Never>]?
|
||||||
|
private var udpHoleV6LocalAddress: SocketAddress?
|
||||||
|
|
||||||
// dns的client对象
|
// dns的client对象
|
||||||
private var dnsClient: DNSCloudClient?
|
private var dnsClient: DNSCloudClient?
|
||||||
@ -96,12 +104,12 @@ actor SDLContextActor {
|
|||||||
self.provider = provider
|
self.provider = provider
|
||||||
self.config = config
|
self.config = config
|
||||||
self.rsaCipher = rsaCipher
|
self.rsaCipher = rsaCipher
|
||||||
|
|
||||||
self.puncherActor = SDLPuncherActor()
|
self.puncherActor = SDLPuncherActor()
|
||||||
self.proberActor = SDLNATProberActor(addressArray: config.stunProbeSocketAddressArray)
|
self.proberActor = SDLNATProberActor(addressArray: config.stunProbeSocketAddressArray)
|
||||||
|
|
||||||
self.arpServer = ArpServer()
|
self.arpServer = ArpServer()
|
||||||
|
|
||||||
// 权限控制
|
// 权限控制
|
||||||
let snapshotPublisher = SnapshotPublisher(initial: IdentitySnapshot.empty())
|
let snapshotPublisher = SnapshotPublisher(initial: IdentitySnapshot.empty())
|
||||||
self.identifyStore = IdentityStore(publisher: snapshotPublisher)
|
self.identifyStore = IdentityStore(publisher: snapshotPublisher)
|
||||||
@ -112,11 +120,11 @@ actor SDLContextActor {
|
|||||||
guard case .idle = self.readyState else {
|
guard case .idle = self.readyState else {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
self.readyState = .starting
|
self.readyState = .starting
|
||||||
self.prepareTunnelNotifier()
|
self.prepareTunnelNotifier()
|
||||||
self.startMonitor()
|
self.startMonitor()
|
||||||
|
|
||||||
// 启动arp的定时清理任务
|
// 启动arp的定时清理任务
|
||||||
await self.puncherActor.start()
|
await self.puncherActor.start()
|
||||||
await self.arpServer.start()
|
await self.arpServer.start()
|
||||||
@ -137,8 +145,15 @@ actor SDLContextActor {
|
|||||||
try await udpHole.waitClose()
|
try await udpHole.waitClose()
|
||||||
SDLLogger.log("[SDLContext] udp closed!!!!")
|
SDLLogger.log("[SDLContext] udp closed!!!!")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await self.supervisor.addWorker(name: "udpHoleV6") {
|
||||||
|
let udpHoleV6 = try await self.startUDPHoleV6()
|
||||||
|
SDLLogger.log("[SDLContext] udp v6 running!!!!")
|
||||||
|
try await udpHoleV6.waitClose()
|
||||||
|
SDLLogger.log("[SDLContext] udp v6 closed!!!!")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public func waitForReady() async throws {
|
public func waitForReady() async throws {
|
||||||
switch self.readyState {
|
switch self.readyState {
|
||||||
case .ready:
|
case .ready:
|
||||||
@ -167,7 +182,7 @@ actor SDLContextActor {
|
|||||||
private func startQUICClient() async throws -> SDLQUICClient {
|
private func startQUICClient() async throws -> SDLQUICClient {
|
||||||
self.quicWorker?.cancel()
|
self.quicWorker?.cancel()
|
||||||
self.quicClient?.stop()
|
self.quicClient?.stop()
|
||||||
|
|
||||||
// 启动monitor
|
// 启动monitor
|
||||||
let quicClient = SDLQUICClient(host: self.config.serverHost, port: 443)
|
let quicClient = SDLQUICClient(host: self.config.serverHost, port: 443)
|
||||||
quicClient.start()
|
quicClient.start()
|
||||||
@ -194,7 +209,7 @@ actor SDLContextActor {
|
|||||||
await self.handleRegisterSuperNak(nakPacket: registerSuperNak)
|
await self.handleRegisterSuperNak(nakPacket: registerSuperNak)
|
||||||
case .peerInfo(let peerInfo):
|
case .peerInfo(let peerInfo):
|
||||||
//SDLLogger.shared.log("[SDLContext] peer message: \(peerInfo)")
|
//SDLLogger.shared.log("[SDLContext] peer message: \(peerInfo)")
|
||||||
await self.puncherActor.handlePeerInfo(using: self.udpHole, peerInfo: peerInfo)
|
await self.puncherActor.handlePeerInfo(using: self.udpHole, udpHoleV6: self.udpHoleV6, peerInfo: peerInfo)
|
||||||
case .event(let event):
|
case .event(let event):
|
||||||
await self.handleEvent(event: event)
|
await self.handleEvent(event: event)
|
||||||
case .policyReponse(let policyResponse):
|
case .policyReponse(let policyResponse):
|
||||||
@ -210,7 +225,7 @@ actor SDLContextActor {
|
|||||||
|
|
||||||
return quicClient
|
return quicClient
|
||||||
}
|
}
|
||||||
|
|
||||||
private func prepareTunnelNotifier() {
|
private func prepareTunnelNotifier() {
|
||||||
// 启动noticeClient
|
// 启动noticeClient
|
||||||
// 旧的 UDP NoticeClient 已移除,改为初始化基于 App Group 的通知通道。
|
// 旧的 UDP NoticeClient 已移除,改为初始化基于 App Group 的通知通道。
|
||||||
@ -316,7 +331,7 @@ actor SDLContextActor {
|
|||||||
// 处理消息流
|
// 处理消息流
|
||||||
let messageStream = udpHole.messageStream
|
let messageStream = udpHole.messageStream
|
||||||
let messageTask = Task.detached {
|
let messageTask = Task.detached {
|
||||||
await self.consumeUDPHoleMessages(stream: messageStream, localAddress: localAddress)
|
await self.consumeUDPHoleMessages(stream: messageStream, localAddress: localAddress, source: .v4)
|
||||||
}
|
}
|
||||||
|
|
||||||
self.udpHole = udpHole
|
self.udpHole = udpHole
|
||||||
@ -329,6 +344,28 @@ actor SDLContextActor {
|
|||||||
return udpHole
|
return udpHole
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func startUDPHoleV6() async throws -> SDLUDPHoleV6 {
|
||||||
|
self.udpHoleV6Workers?.forEach {$0.cancel()}
|
||||||
|
self.udpHoleV6Workers = nil
|
||||||
|
|
||||||
|
// 启动udp服务器
|
||||||
|
let udpHoleV6 = try SDLUDPHoleV6()
|
||||||
|
let localAddress = try udpHoleV6.start()
|
||||||
|
SDLLogger.log("[SDLContext] udpHoleV6 started, on address: \(localAddress)")
|
||||||
|
|
||||||
|
// 处理消息流
|
||||||
|
let messageStream = udpHoleV6.messageStream
|
||||||
|
let messageTask = Task.detached {
|
||||||
|
await self.consumeUDPHoleMessages(stream: messageStream, localAddress: localAddress, source: .v6)
|
||||||
|
}
|
||||||
|
|
||||||
|
self.udpHoleV6 = udpHoleV6
|
||||||
|
self.udpHoleV6LocalAddress = localAddress
|
||||||
|
self.udpHoleV6Workers = [messageTask]
|
||||||
|
|
||||||
|
return udpHoleV6
|
||||||
|
}
|
||||||
|
|
||||||
// 处理context的停止问题
|
// 处理context的停止问题
|
||||||
public func stop() async {
|
public func stop() async {
|
||||||
self.resumeReadyWaiters(.failure(CancellationError()))
|
self.resumeReadyWaiters(.failure(CancellationError()))
|
||||||
@ -344,6 +381,12 @@ actor SDLContextActor {
|
|||||||
self.udpHole = nil
|
self.udpHole = nil
|
||||||
self.udpHoleLocalAddress = nil
|
self.udpHoleLocalAddress = nil
|
||||||
|
|
||||||
|
self.udpHoleV6Workers?.forEach { $0.cancel() }
|
||||||
|
self.udpHoleV6Workers = nil
|
||||||
|
self.udpHoleV6?.stop()
|
||||||
|
self.udpHoleV6 = nil
|
||||||
|
self.udpHoleV6LocalAddress = nil
|
||||||
|
|
||||||
self.quicWorker?.cancel()
|
self.quicWorker?.cancel()
|
||||||
self.quicWorker = nil
|
self.quicWorker = nil
|
||||||
self.quicClient?.stop()
|
self.quicClient?.stop()
|
||||||
@ -502,16 +545,16 @@ actor SDLContextActor {
|
|||||||
|
|
||||||
// 发送给super/stun节点的数据
|
// 发送给super/stun节点的数据
|
||||||
private func sendSuperPacket(type: SDLPacketType, data: Data) {
|
private func sendSuperPacket(type: SDLPacketType, data: Data) {
|
||||||
self.udpHole?.send(type: type, data: data, remoteAddress: self.config.stunSocketAddress)
|
self.sendPacket(type: type, data: data, remoteAddress: self.config.stunSocketAddress)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 发送给peer的数据
|
// 发送给peer的数据
|
||||||
private func sendPeerPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
|
private func sendPeerPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
|
||||||
self.udpHole?.send(type: type, data: data, remoteAddress: remoteAddress)
|
self.sendPacket(type: type, data: data, remoteAddress: remoteAddress)
|
||||||
}
|
}
|
||||||
|
|
||||||
private func makeCurrentV6Info() -> SDLV6Info? {
|
private func makeCurrentV6Info() -> SDLV6Info? {
|
||||||
guard let port = self.udpHoleLocalAddress?.port else {
|
guard let port = self.udpHoleV6LocalAddress?.port else {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -535,6 +578,25 @@ actor SDLContextActor {
|
|||||||
return v6Info
|
return v6Info
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func sendPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
|
||||||
|
switch remoteAddress {
|
||||||
|
case .v4:
|
||||||
|
guard let udpHole = self.udpHole else {
|
||||||
|
SDLLogger.log("[SDLContext] udpHole is nil for remoteAddress: \(remoteAddress)", for: .debug)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
udpHole.send(type: type, data: data, remoteAddress: remoteAddress)
|
||||||
|
case .v6:
|
||||||
|
guard let udpHoleV6 = self.udpHoleV6 else {
|
||||||
|
SDLLogger.log("[SDLContext] udpHoleV6 is nil for remoteAddress: \(remoteAddress)", for: .debug)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
udpHoleV6.send(type: type, data: data, remoteAddress: remoteAddress)
|
||||||
|
default:
|
||||||
|
SDLLogger.log("[SDLContext] unsupported socket family: \(remoteAddress)", for: .debug)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private func spawnLoop(_ body: @escaping () async throws -> Void) -> Task<Void, Never> {
|
private func spawnLoop(_ body: @escaping () async throws -> Void) -> Task<Void, Never> {
|
||||||
return Task.detached {
|
return Task.detached {
|
||||||
while !Task.isCancelled {
|
while !Task.isCancelled {
|
||||||
@ -607,6 +669,8 @@ actor SDLContextActor {
|
|||||||
deinit {
|
deinit {
|
||||||
self.udpHole = nil
|
self.udpHole = nil
|
||||||
self.udpHoleLocalAddress = nil
|
self.udpHoleLocalAddress = nil
|
||||||
|
self.udpHoleV6 = nil
|
||||||
|
self.udpHoleV6LocalAddress = nil
|
||||||
self.dnsClient = nil
|
self.dnsClient = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -768,7 +832,7 @@ extension SDLContextActor {
|
|||||||
// 处理从Hole收到的数据
|
// 处理从Hole收到的数据
|
||||||
extension SDLContextActor {
|
extension SDLContextActor {
|
||||||
|
|
||||||
private func consumeUDPHoleMessages(stream: AsyncStream<(SocketAddress, SDLHoleMessage)>, localAddress: SocketAddress) async {
|
private func consumeUDPHoleMessages(stream: AsyncStream<(SocketAddress, SDLHoleMessage)>, localAddress: SocketAddress, source: UDPHoleKind) async {
|
||||||
for await (remoteAddress, message) in stream {
|
for await (remoteAddress, message) in stream {
|
||||||
if Task.isCancelled {
|
if Task.isCancelled {
|
||||||
break
|
break
|
||||||
@ -776,16 +840,19 @@ extension SDLContextActor {
|
|||||||
|
|
||||||
switch message.inboundMessage {
|
switch message.inboundMessage {
|
||||||
case .control(let controlMessage):
|
case .control(let controlMessage):
|
||||||
await self.handleHoleControlMessage(controlMessage, localAddress: localAddress, remoteAddress: remoteAddress)
|
await self.handleHoleControlMessage(controlMessage, localAddress: localAddress, remoteAddress: remoteAddress, source: source)
|
||||||
case .data(let data):
|
case .data(let data):
|
||||||
try? await self.handleHoleData(data: data)
|
try? await self.handleHoleData(data: data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func handleHoleControlMessage(_ message: SDLHoleControlMessage, localAddress: SocketAddress, remoteAddress: SocketAddress) async {
|
private func handleHoleControlMessage(_ message: SDLHoleControlMessage, localAddress: SocketAddress, remoteAddress: SocketAddress, source: UDPHoleKind) async {
|
||||||
switch message {
|
switch message {
|
||||||
case .stunProbeReply(let probeReply):
|
case .stunProbeReply(let probeReply):
|
||||||
|
guard source == .v4 else {
|
||||||
|
return
|
||||||
|
}
|
||||||
await self.proberActor.handleProbeReply(localAddress: localAddress, reply: probeReply)
|
await self.proberActor.handleProbeReply(localAddress: localAddress, reply: probeReply)
|
||||||
case .register(let register):
|
case .register(let register):
|
||||||
try? self.handleRegister(remoteAddress: remoteAddress, register: register)
|
try? self.handleRegister(remoteAddress: remoteAddress, register: register)
|
||||||
|
|||||||
@ -13,7 +13,7 @@ actor SDLPuncherActor {
|
|||||||
nonisolated private let cooldownInterval: TimeInterval = 10
|
nonisolated private let cooldownInterval: TimeInterval = 10
|
||||||
// 等待peerInfo返回的超时时间
|
// 等待peerInfo返回的超时时间
|
||||||
nonisolated private let peerInfoTimeout: TimeInterval = 3
|
nonisolated private let peerInfoTimeout: TimeInterval = 3
|
||||||
|
|
||||||
struct RegisterRequest {
|
struct RegisterRequest {
|
||||||
let srcMac: Data
|
let srcMac: Data
|
||||||
let dstMac: Data
|
let dstMac: Data
|
||||||
@ -93,7 +93,7 @@ actor SDLPuncherActor {
|
|||||||
quicClient.send(type: .queryInfo, data: queryData)
|
quicClient.send(type: .queryInfo, data: queryData)
|
||||||
}
|
}
|
||||||
|
|
||||||
func handlePeerInfo(using udpHole: SDLUDPHole?, peerInfo: SDLPeerInfo) async {
|
func handlePeerInfo(using udpHole: SDLUDPHole?, udpHoleV6: SDLUDPHoleV6?, peerInfo: SDLPeerInfo) async {
|
||||||
let now = Date()
|
let now = Date()
|
||||||
self.cleanupExpiredEntries(now: now)
|
self.cleanupExpiredEntries(now: now)
|
||||||
|
|
||||||
@ -108,8 +108,8 @@ actor SDLPuncherActor {
|
|||||||
entry.markCoolingDown()
|
entry.markCoolingDown()
|
||||||
self.requestEntries[peerInfo.dstMac] = entry
|
self.requestEntries[peerInfo.dstMac] = entry
|
||||||
|
|
||||||
guard let udpHole else {
|
guard udpHole != nil || udpHoleV6 != nil else {
|
||||||
SDLLogger.log("[SDLPuncherActor] udpHole is nil when peerInfo arrived", for: .debug)
|
SDLLogger.log("[SDLPuncherActor] udpHole and udpHoleV6 are nil when peerInfo arrived", for: .debug)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,7 +127,7 @@ actor SDLPuncherActor {
|
|||||||
if peerInfo.hasV4Info {
|
if peerInfo.hasV4Info {
|
||||||
if let remoteAddress = try? await peerInfo.v4Info.socketAddress() {
|
if let remoteAddress = try? await peerInfo.v4Info.socketAddress() {
|
||||||
SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", for: .debug)
|
SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", for: .debug)
|
||||||
udpHole.send(type: .register, data: registerData, remoteAddress: remoteAddress)
|
self.sendRegister(using: udpHole, udpHoleV6: udpHoleV6, registerData: registerData, remoteAddress: remoteAddress)
|
||||||
} else {
|
} else {
|
||||||
SDLLogger.log("[SDLPuncherActor] failed to resolve peerInfo.v4Info", for: .debug)
|
SDLLogger.log("[SDLPuncherActor] failed to resolve peerInfo.v4Info", for: .debug)
|
||||||
}
|
}
|
||||||
@ -136,7 +136,7 @@ actor SDLPuncherActor {
|
|||||||
if peerInfo.hasV6Info {
|
if peerInfo.hasV6Info {
|
||||||
if let remoteAddress = try? await peerInfo.v6Info.socketAddress() {
|
if let remoteAddress = try? await peerInfo.v6Info.socketAddress() {
|
||||||
SDLLogger.log("[SDLContext] hole sock address v6: \(remoteAddress)", for: .debug)
|
SDLLogger.log("[SDLContext] hole sock address v6: \(remoteAddress)", for: .debug)
|
||||||
udpHole.send(type: .register, data: registerData, remoteAddress: remoteAddress)
|
self.sendRegister(using: udpHole, udpHoleV6: udpHoleV6, registerData: registerData, remoteAddress: remoteAddress)
|
||||||
} else {
|
} else {
|
||||||
SDLLogger.log("[SDLPuncherActor] failed to resolve peerInfo.v6Info", for: .debug)
|
SDLLogger.log("[SDLPuncherActor] failed to resolve peerInfo.v6Info", for: .debug)
|
||||||
}
|
}
|
||||||
@ -156,6 +156,28 @@ actor SDLPuncherActor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func sendRegister(using udpHole: SDLUDPHole?,
|
||||||
|
udpHoleV6: SDLUDPHoleV6?,
|
||||||
|
registerData: Data,
|
||||||
|
remoteAddress: SocketAddress) {
|
||||||
|
switch remoteAddress {
|
||||||
|
case .v4:
|
||||||
|
guard let udpHole else {
|
||||||
|
SDLLogger.log("[SDLPuncherActor] udpHole is nil when v4 peerInfo arrived", for: .debug)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
udpHole.send(type: .register, data: registerData, remoteAddress: remoteAddress)
|
||||||
|
case .v6:
|
||||||
|
guard let udpHoleV6 else {
|
||||||
|
SDLLogger.log("[SDLPuncherActor] udpHoleV6 is nil when v6 peerInfo arrived", for: .debug)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
udpHoleV6.send(type: .register, data: registerData, remoteAddress: remoteAddress)
|
||||||
|
default:
|
||||||
|
SDLLogger.log("[SDLPuncherActor] unsupported peer address family: \(remoteAddress)", for: .debug)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
self.cleanupTask?.cancel()
|
self.cleanupTask?.cancel()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -44,8 +44,8 @@ final class SDLUDPHole: ChannelInboundHandler {
|
|||||||
channel.pipeline.addHandler(self)
|
channel.pipeline.addHandler(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 绑定到IPv6通配地址,依赖SwiftNIO创建dual-stack socket,同时接收IPv4/IPv6流量
|
// 绑定到IPv4通配地址,只处理IPv4流量
|
||||||
let channel = try bootstrap.bind(host: "::", port: 0).wait()
|
let channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait()
|
||||||
self.channel = channel
|
self.channel = channel
|
||||||
self.closeFuture = channel.closeFuture
|
self.closeFuture = channel.closeFuture
|
||||||
self.state = .ready
|
self.state = .ready
|
||||||
@ -70,6 +70,7 @@ final class SDLUDPHole: ChannelInboundHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func stop() {
|
func stop() {
|
||||||
|
SDLLogger.log("[SDLUDPHole] waitClose stop", for: .debug)
|
||||||
switch self.state {
|
switch self.state {
|
||||||
case .stopping, .stopped:
|
case .stopping, .stopped:
|
||||||
return
|
return
|
||||||
@ -116,6 +117,7 @@ final class SDLUDPHole: ChannelInboundHandler {
|
|||||||
self.finishMessageStream()
|
self.finishMessageStream()
|
||||||
self.channel = nil
|
self.channel = nil
|
||||||
self.state = .stopped
|
self.state = .stopped
|
||||||
|
SDLLogger.log("[SDLUDPHole] channelInactive", for: .debug)
|
||||||
}
|
}
|
||||||
|
|
||||||
func errorCaught(context: ChannelHandlerContext, error: any Error) {
|
func errorCaught(context: ChannelHandlerContext, error: any Error) {
|
||||||
@ -125,6 +127,7 @@ final class SDLUDPHole: ChannelInboundHandler {
|
|||||||
self.state = .stopping
|
self.state = .stopping
|
||||||
}
|
}
|
||||||
context.close(promise: nil)
|
context.close(promise: nil)
|
||||||
|
SDLLogger.log("[SDLUDPHole] errorCaught", for: .debug)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: 处理写入逻辑
|
// MARK: 处理写入逻辑
|
||||||
@ -198,6 +201,7 @@ final class SDLUDPHole: ChannelInboundHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
|
SDLLogger.log("[SDLUDPHole] closeWait deinit", for: .debug)
|
||||||
self.stop()
|
self.stop()
|
||||||
try? self.group.syncShutdownGracefully()
|
try? self.group.syncShutdownGracefully()
|
||||||
}
|
}
|
||||||
|
|||||||
205
Tun/Punchnet/SDLUDPHoleV6.swift
Normal file
205
Tun/Punchnet/SDLUDPHoleV6.swift
Normal file
@ -0,0 +1,205 @@
|
|||||||
|
//
|
||||||
|
// SDLUDPHoleV6.swift
|
||||||
|
// Tun
|
||||||
|
//
|
||||||
|
// Created by 安礼成 on 2026/4/15.
|
||||||
|
//
|
||||||
|
|
||||||
|
import Foundation
|
||||||
|
import NIOCore
|
||||||
|
import NIOPosix
|
||||||
|
import SwiftProtobuf
|
||||||
|
|
||||||
|
// 处理和sn-server服务器之间的通讯
|
||||||
|
final class SDLUDPHoleV6: ChannelInboundHandler {
|
||||||
|
typealias InboundIn = AddressedEnvelope<ByteBuffer>
|
||||||
|
|
||||||
|
private enum State: Equatable {
|
||||||
|
case idle
|
||||||
|
case ready
|
||||||
|
case stopping
|
||||||
|
case stopped
|
||||||
|
}
|
||||||
|
|
||||||
|
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
|
private var channel: Channel?
|
||||||
|
private var closeFuture: EventLoopFuture<Void>?
|
||||||
|
private var state: State = .idle
|
||||||
|
private var didFinishMessageStream: Bool = false
|
||||||
|
|
||||||
|
public let messageStream: AsyncStream<(SocketAddress, SDLHoleMessage)>
|
||||||
|
private let messageContinuation: AsyncStream<(SocketAddress, SDLHoleMessage)>.Continuation
|
||||||
|
|
||||||
|
// 启动函数
|
||||||
|
init() throws {
|
||||||
|
let (stream, continuation) = AsyncStream.makeStream(of: (SocketAddress, SDLHoleMessage).self, bufferingPolicy: .bufferingNewest(2048))
|
||||||
|
self.messageStream = stream
|
||||||
|
self.messageContinuation = continuation
|
||||||
|
}
|
||||||
|
|
||||||
|
func start() throws -> SocketAddress {
|
||||||
|
let bootstrap = DatagramBootstrap(group: group)
|
||||||
|
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||||
|
.channelInitializer { channel in
|
||||||
|
channel.pipeline.addHandler(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 绑定到IPv6通配地址,只处理IPv6流量
|
||||||
|
let channel = try bootstrap.bind(host: "::", port: 0).wait()
|
||||||
|
self.channel = channel
|
||||||
|
self.closeFuture = channel.closeFuture
|
||||||
|
self.state = .ready
|
||||||
|
precondition(channel.localAddress != nil, "UDP v6 channel has no localAddress after bind")
|
||||||
|
|
||||||
|
return channel.localAddress!
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitClose() async throws {
|
||||||
|
switch self.state {
|
||||||
|
case .idle:
|
||||||
|
SDLLogger.log("[SDLUDPHoleV6] waitClose11", for: .debug)
|
||||||
|
return
|
||||||
|
case .ready, .stopping, .stopped:
|
||||||
|
guard let closeFuture = self.closeFuture else {
|
||||||
|
SDLLogger.log("[SDLUDPHoleV6] waitClose22", for: .debug)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
try await closeFuture.get()
|
||||||
|
SDLLogger.log("[SDLUDPHoleV6] waitClose33", for: .debug)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func stop() {
|
||||||
|
switch self.state {
|
||||||
|
case .stopping, .stopped:
|
||||||
|
return
|
||||||
|
case .idle:
|
||||||
|
self.state = .stopped
|
||||||
|
self.finishMessageStream()
|
||||||
|
return
|
||||||
|
case .ready:
|
||||||
|
self.state = .stopping
|
||||||
|
}
|
||||||
|
|
||||||
|
self.finishMessageStream()
|
||||||
|
self.channel?.close(promise: nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// --MARK: ChannelInboundHandler delegate
|
||||||
|
|
||||||
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||||
|
guard case .ready = self.state else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
let envelope = unwrapInboundIn(data)
|
||||||
|
|
||||||
|
var buffer = envelope.data
|
||||||
|
let remoteAddress = envelope.remoteAddress
|
||||||
|
|
||||||
|
if let rawBytes = buffer.getBytes(at: buffer.readerIndex, length: buffer.readableBytes) {
|
||||||
|
SDLLogger.log("[SDLUDPHoleV6] get raw bytes: \(rawBytes.count), from: \(remoteAddress)", for: .debug)
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
if let message = try decode(buffer: &buffer) {
|
||||||
|
self.messageContinuation.yield((remoteAddress, message))
|
||||||
|
} else {
|
||||||
|
SDLLogger.log("[SDLUDPHoleV6] decode message, get null", for: .debug)
|
||||||
|
}
|
||||||
|
} catch let err {
|
||||||
|
SDLLogger.log("[SDLUDPHoleV6] decode message, get error: \(err)", for: .debug)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func channelInactive(context: ChannelHandlerContext) {
|
||||||
|
self.finishMessageStream()
|
||||||
|
self.channel = nil
|
||||||
|
self.state = .stopped
|
||||||
|
}
|
||||||
|
|
||||||
|
func errorCaught(context: ChannelHandlerContext, error: any Error) {
|
||||||
|
SDLLogger.log("[SDLUDPHoleV6] channel error: \(error)", for: .debug)
|
||||||
|
self.finishMessageStream()
|
||||||
|
if self.state != .stopped {
|
||||||
|
self.state = .stopping
|
||||||
|
}
|
||||||
|
context.close(promise: nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: 处理写入逻辑
|
||||||
|
func send(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
|
||||||
|
guard case .ready = self.state, let channel = self.channel else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var buffer = channel.allocator.buffer(capacity: data.count + 1)
|
||||||
|
buffer.writeBytes([type.rawValue])
|
||||||
|
buffer.writeBytes(data)
|
||||||
|
|
||||||
|
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)
|
||||||
|
_ = channel.eventLoop.submit {
|
||||||
|
channel.writeAndFlush(envelope, promise: nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --MARK: 编解码器
|
||||||
|
private func decode(buffer: inout ByteBuffer) throws -> SDLHoleMessage? {
|
||||||
|
guard let type = buffer.readInteger(as: UInt8.self),
|
||||||
|
let packetType = SDLPacketType(rawValue: type) else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch packetType {
|
||||||
|
case .data:
|
||||||
|
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
|
||||||
|
let dataPacket = try? SDLData(serializedBytes: bytes) else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return .data(dataPacket)
|
||||||
|
case .register:
|
||||||
|
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
|
||||||
|
let registerPacket = try? SDLRegister(serializedBytes: bytes) else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return .register(registerPacket)
|
||||||
|
case .registerAck:
|
||||||
|
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
|
||||||
|
let registerAck = try? SDLRegisterAck(serializedBytes: bytes) else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return .registerAck(registerAck)
|
||||||
|
case .stunProbeReply:
|
||||||
|
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
|
||||||
|
let stunProbeReply = try? SDLStunProbeReply(serializedBytes: bytes) else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return .stunProbeReply(stunProbeReply)
|
||||||
|
case .stunReply:
|
||||||
|
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
|
||||||
|
let stunReply = try? SDLStunReply(serializedBytes: bytes) else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return .stunReply(stunReply)
|
||||||
|
default:
|
||||||
|
SDLLogger.log("SDLUDPHoleV6 decode miss type: \(type)", for: .debug)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func finishMessageStream() {
|
||||||
|
guard !self.didFinishMessageStream else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
self.didFinishMessageStream = true
|
||||||
|
self.messageContinuation.finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
deinit {
|
||||||
|
self.stop()
|
||||||
|
try? self.group.syncShutdownGracefully()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -1,3 +1,3 @@
|
|||||||
#! /bin/sh
|
#! /bin/sh
|
||||||
|
|
||||||
log stream --predicate 'subsystem == "com.jihe.punchnet"' --info --style compact
|
log stream --predicate 'subsystem == "com.jihe.punchnet.debug"' --info --style compact
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user