From aae0b333de796e13f2b730c93c09bf6697d291d2 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 1 Aug 2025 00:21:12 +0800 Subject: [PATCH] add udp async actor --- Package.resolved | 6 +- Package.swift | 3 +- Sources/Punchnet/SDLUDPHole.swift | 1 + Sources/Punchnet/SDLUDPHoleActor.swift | 260 +++++++++++++++++++++++++ 4 files changed, 266 insertions(+), 4 deletions(-) create mode 100644 Sources/Punchnet/SDLUDPHoleActor.swift diff --git a/Package.resolved b/Package.resolved index 0821caa..2d41a25 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "c6ae4911d55046c288e0ecdc9de28a95dcb591d3daaa03914664a3d2e069977a", + "originHash" : "597c0437584251872938aab4e6fd3cc1fc28e6da31c045adee36661e9c049c5e", "pins" : [ { "identity" : "swift-atomics", @@ -24,8 +24,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/apple/swift-nio.git", "state" : { - "revision" : "ad6b5f17270a7008f60d35ec5378e6144a575162", - "version" : "2.84.0" + "revision" : "a5fea865badcb1c993c85b0f0e8d05a4bd2270fb", + "version" : "2.85.0" } }, { diff --git a/Package.swift b/Package.swift index b51db5c..9113375 100644 --- a/Package.swift +++ b/Package.swift @@ -16,7 +16,7 @@ let package = Package( targets: ["Punchnet"]), ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", from: "2.84.0"), + .package(url: "https://github.com/apple/swift-nio.git", exact: "2.85.0"), .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.26.0") ], @@ -28,6 +28,7 @@ let package = Package( dependencies: [ .product(name: "NIOCore", package: "swift-nio"), .product(name: "NIOPosix", package: "swift-nio"), + .product(name: "NIOFoundationCompat", package: "swift-nio"), .product(name: "SwiftProtobuf", package: "swift-protobuf") ] ), diff --git a/Sources/Punchnet/SDLUDPHole.swift b/Sources/Punchnet/SDLUDPHole.swift index 37001d2..a0c8022 100644 --- a/Sources/Punchnet/SDLUDPHole.swift +++ b/Sources/Punchnet/SDLUDPHole.swift @@ -147,6 +147,7 @@ class SDLUDPHole: ChannelInboundHandler, @unchecked Sendable { // 启动函数 func start() async throws { + let bootstrap = DatagramBootstrap(group: self.group) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelInitializer { channel in diff --git a/Sources/Punchnet/SDLUDPHoleActor.swift b/Sources/Punchnet/SDLUDPHoleActor.swift new file mode 100644 index 0000000..d93e222 --- /dev/null +++ b/Sources/Punchnet/SDLUDPHoleActor.swift @@ -0,0 +1,260 @@ +// +// SDLanServer.swift +// Tun +// +// Created by 安礼成 on 2024/1/31. +// + +import Foundation +import NIOCore +import NIOPosix + +struct UDPMessage { + let remoteAddress: SocketAddress + let type: SDLPacketType + let data: Data +} + +// 处理和sn-server服务器之间的通讯 +actor SDLUDPHoleActor { + private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + private let asyncChannel: NIOAsyncChannel, AddressedEnvelope> + private let (writeStream, continuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded) + + private var cookieGenerator = SDLIdGenerator(seed: 1) + private var promises: [UInt32:EventLoopPromise] = [:] + public var localAddress: SocketAddress? + + public let (eventFlow, inboundContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded) + + // 定义事件类型 + enum UDPEvent { + case ready + case closed + case message(SocketAddress, SDLHoleInboundMessage) + case data(SDLData) + } + + // 启动函数 + init() async throws { + let bootstrap = DatagramBootstrap(group: group) + .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + + self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0) + .flatMapThrowing { channel in + return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init( + inboundType: AddressedEnvelope.self, + outboundType: AddressedEnvelope.self + )) + } + .get() + + self.localAddress = self.asyncChannel.channel.localAddress + SDLLogger.log("[UDPHole] started and listening on: \(self.localAddress!)", level: .debug) + + try await self.asyncChannel.executeThenClose { inbound, outbound in + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + defer { + self.inboundContinuation.finish() + } + + for try await envelope in inbound { + var buffer = envelope.data + let remoteAddress = envelope.remoteAddress + do { + if let message = try Self.decode(buffer: &buffer) { + switch message { + case .data(let data): + SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug) + self.inboundContinuation.yield(.data(data)) + case .stunProbeReply(let probeReply): + // 执行并移除回调 + await self.trigger(probeReply: probeReply) + default: + self.inboundContinuation.yield(.message(remoteAddress, message)) + } + } else { + SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning) + } + } catch let err { + SDLLogger.log("[SDLUDPHole] decode message, get error: \(err)", level: .debug) + } + } + } + + group.addTask { + defer { + self.continuation.finish() + } + + for try await message in self.writeStream { + var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1) + buffer.writeBytes([message.type.rawValue]) + buffer.writeBytes(message.data) + + let envelope = AddressedEnvelope(remoteAddress: message.remoteAddress, data: buffer) + try await outbound.write(envelope) + } + } + + //eventFlow.send(.ready) + + try await group.waitForAll() + } + } + } + + // MARK: super_node apis + + func stunRequest(context ctx: SDLContext) -> UInt32 { + let cookie = self.cookieGenerator.nextId() + let remoteAddress = ctx.config.stunSocketAddress + + var stunRequest = SDLStunRequest() + stunRequest.cookie = cookie + stunRequest.clientID = ctx.config.clientId + stunRequest.networkID = ctx.devAddr.networkID + stunRequest.ip = ctx.devAddr.netAddr + stunRequest.mac = ctx.devAddr.mac + stunRequest.natType = UInt32(ctx.natType.rawValue) + + SDLLogger.log("[SDLUDPHole] stunRequest: \(remoteAddress), host: \(ctx.config.stunServers[0].host):\(ctx.config.stunServers[0].ports[0])", level: .warning) + + self.send(remoteAddress: remoteAddress, type: .stunRequest, data: try! stunRequest.serializedData()) + + return cookie + } + + // 探测tun信息 + func stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int = 5) async throws -> SDLStunProbeReply { + return try await self._stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: timeout).get() + } + + private func _stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int) -> EventLoopFuture { + let cookie = self.cookieGenerator.nextId() + var stunProbe = SDLStunProbe() + stunProbe.cookie = cookie + stunProbe.attr = UInt32(attr.rawValue) + self.send(remoteAddress: remoteAddress, type: .stunProbe, data: try! stunProbe.serializedData()) + SDLLogger.log("[SDLUDPHole] stunProbe: \(remoteAddress)", level: .warning) + + let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLStunProbeReply.self) + self.promises[cookie] = promise + + return promise.futureResult + } + + private func trigger(probeReply: SDLStunProbeReply) { + let id = probeReply.cookie + // 执行并移除回调 + if let promise = self.promises[id] { + self.asyncChannel.channel.eventLoop.execute { + promise.succeed(probeReply) + } + self.promises.removeValue(forKey: id) + } + } + + // MARK: client-client apis + + // 发送数据包到其他session + func sendPacket(context ctx: SDLContext, session: Session, data: Data) { + let remoteAddress = session.natAddress + + var dataPacket = SDLData() + dataPacket.networkID = ctx.devAddr.networkID + dataPacket.srcMac = ctx.devAddr.mac + dataPacket.dstMac = session.dstMac + dataPacket.ttl = 255 + dataPacket.data = data + if let packet = try? dataPacket.serializedData() { + SDLLogger.log("[SDLUDPHole] sendPacket: \(remoteAddress), count: \(packet.count)", level: .debug) + self.send(remoteAddress: remoteAddress, type: .data, data: packet) + } + } + + // 通过sn服务器转发数据包, data已经是加密过后的数据 + func forwardPacket(context ctx: SDLContext, dst_mac: Data, data: Data) { + let remoteAddress = ctx.config.stunSocketAddress + + var dataPacket = SDLData() + dataPacket.networkID = ctx.devAddr.networkID + dataPacket.srcMac = ctx.devAddr.mac + dataPacket.dstMac = dst_mac + dataPacket.ttl = 255 + dataPacket.data = data + + if let packet = try? dataPacket.serializedData() { + NSLog("[SDLContext] forward packet, remoteAddress: \(remoteAddress), data size: \(packet.count)") + self.send(remoteAddress: remoteAddress, type: .data, data: packet) + } + } + + // 发送register包 + func sendRegister(context ctx: SDLContext, remoteAddress: SocketAddress, dst_mac: Data) { + var register = SDLRegister() + register.networkID = ctx.devAddr.networkID + register.srcMac = ctx.devAddr.mac + register.dstMac = dst_mac + + if let packet = try? register.serializedData() { + SDLLogger.log("[SDLUDPHole] SendRegister: \(remoteAddress), src_mac: \(LayerPacket.MacAddress.description(data: ctx.devAddr.mac)), dst_mac: \(LayerPacket.MacAddress.description(data: dst_mac))", level: .debug) + self.send(remoteAddress: remoteAddress, type: .register, data: packet) + } + } + + // 回复registerAck + func sendRegisterAck(context ctx: SDLContext, remoteAddress: SocketAddress, dst_mac: Data) { + var registerAck = SDLRegisterAck() + registerAck.networkID = ctx.devAddr.networkID + registerAck.srcMac = ctx.devAddr.mac + registerAck.dstMac = dst_mac + + if let packet = try? registerAck.serializedData() { + SDLLogger.log("[SDLUDPHole] SendRegisterAck: \(remoteAddress), \(registerAck)", level: .debug) + self.send(remoteAddress: remoteAddress, type: .registerAck, data: packet) + } + } + + // 处理写入逻辑 + private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) { + let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data) + self.continuation.yield(message) + } + + //--MARK: 编解码器 + private static func decode(buffer: inout ByteBuffer) throws -> SDLHoleInboundMessage? { + guard let type = buffer.readInteger(as: UInt8.self), + let packetType = SDLPacketType(rawValue: type), + let bytes = buffer.readBytes(length: buffer.readableBytes) else { + SDLLogger.log("[SDLUDPHole] decode error", level: .error) + return nil + } + + switch packetType { + case .data: + let dataPacket = try SDLData(serializedBytes: bytes) + return .data(dataPacket) + case .register: + let registerPacket = try SDLRegister(serializedBytes: bytes) + return .register(registerPacket) + case .registerAck: + let registerAck = try SDLRegisterAck(serializedBytes: bytes) + return .registerAck(registerAck) + case .stunReply: + let stunReply = try SDLStunReply(serializedBytes: bytes) + return .stunReply(stunReply) + case .stunProbeReply: + let stunProbeReply = try SDLStunProbeReply(serializedBytes: bytes) + return .stunProbeReply(stunProbeReply) + default: + return nil + } + } + + deinit { + try? self.group.syncShutdownGracefully() + } + +}