//
//  SDLanServer.swift
//  Tun
//
//  Created by 安礼成 on 2024/1/31.
//

import Foundation
import NIOCore
import NIOPosix

/// An outbound datagram queued for the writer task: destination address,
/// packet type tag (written as the first byte on the wire) and the serialized payload.
struct UDPMessage {
    let remoteAddress: SocketAddress
    let type: SDLPacketType
    let data: Data
}

/// Handles the UDP communication with the sn-server (and the peer-facing packets sent
/// through the same socket).
///
/// Outbound packets are queued on an internal `AsyncStream` and written by `start()`;
/// inbound packets are decoded in `start()` and published on `eventFlow`.
actor SDLUDPHole {
    /// Datagram envelope type used for both directions of the channel.
    private typealias Envelope = AddressedEnvelope<ByteBuffer>

    /// Errors surfaced by this actor.
    enum ProbeError: Error {
        /// No `stunProbeReply` carrying this cookie arrived before the probe's timeout elapsed.
        case timeout(cookie: UInt32)
    }

    /// A probe waiting for its reply, together with the timer that fails it.
    private struct PendingProbe {
        let promise: EventLoopPromise<SDLStunProbeReply>
        let timeoutTask: Scheduled<Void>
    }

    private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    private let asyncChannel: NIOAsyncChannel<Envelope, Envelope>
    private let (writeStream, continuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded)

    private var cookieGenerator = SDLIdGenerator(seed: 1)
    /// Outstanding probes keyed by the cookie sent in the probe.
    private var promises: [UInt32: PendingProbe] = [:]

    public var localAddress: SocketAddress?

    public let (eventFlow, inboundContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)

    /// Events published on `eventFlow`.
    enum UDPEvent {
        case ready
        case closed
        case message(SocketAddress, SDLHoleInboundMessage)
        case data(SDLData)
    }

    /// Binds a UDP socket on 0.0.0.0 with an OS-assigned port and wraps it in an async channel.
    ///
    /// - Throws: Any error produced while binding or wrapping the channel.
    init() async throws {
        let bootstrap = DatagramBootstrap(group: group)
            .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)

        let wrapped = try await bootstrap.bind(host: "0.0.0.0", port: 0)
            .flatMapThrowing { channel in
                try NIOAsyncChannel<Envelope, Envelope>(
                    wrappingChannelSynchronously: channel,
                    configuration: .init(
                        inboundType: Envelope.self,
                        outboundType: Envelope.self
                    )
                )
            }
            .get()

        self.asyncChannel = wrapped
        self.localAddress = wrapped.channel.localAddress

        // Avoid force-unwrapping just to produce a log line.
        if let boundAddress = wrapped.channel.localAddress {
            SDLLogger.log("[UDPHole] started and listening on: \(boundAddress)", level: .debug)
        }
    }

    /// Runs the read and write loops until the channel closes or one of them throws.
    ///
    /// The reader decodes each datagram and either completes a pending probe or yields
    /// an event on `eventFlow`; the writer drains the queued `UDPMessage`s to the socket.
    func start() async throws {
        try await self.asyncChannel.executeThenClose { inbound, outbound in
            try await withThrowingTaskGroup(of: Void.self) { group in
                // Reader: socket -> eventFlow / pending probes.
                group.addTask {
                    defer {
                        self.inboundContinuation.finish()
                    }

                    for try await envelope in inbound {
                        var buffer = envelope.data
                        let remoteAddress = envelope.remoteAddress
                        do {
                            guard let message = try Self.decode(buffer: &buffer) else {
                                SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning)
                                continue
                            }

                            switch message {
                            case .data(let data):
                                SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
                                self.inboundContinuation.yield(.data(data))
                            case .stunProbeReply(let probeReply):
                                // Complete and remove the matching pending probe.
                                await self.trigger(probeReply: probeReply)
                            default:
                                self.inboundContinuation.yield(.message(remoteAddress, message))
                            }
                        } catch let err {
                            SDLLogger.log("[SDLUDPHole] decode message, get error: \(err)", level: .debug)
                        }
                    }
                }

                // Writer: queued messages -> socket. Wire format is [type byte][payload].
                group.addTask {
                    defer {
                        self.continuation.finish()
                    }

                    for await message in self.writeStream {
                        var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
                        buffer.writeInteger(message.type.rawValue)
                        buffer.writeBytes(message.data)

                        let envelope = AddressedEnvelope(remoteAddress: message.remoteAddress, data: buffer)
                        try await outbound.write(envelope)
                    }
                }

                //eventFlow.send(.ready)
                try await group.waitForAll()
            }
        }
    }

    // MARK: super_node apis

    /// Queues a stun request to the configured stun address.
    ///
    /// - Parameter ctx: Source of the client id, device address and nat type placed in the request.
    /// - Returns: The cookie assigned to this request. It is returned even when the request
    ///   could not be serialized (in which case nothing is sent and the failure is logged).
    func stunRequest(context ctx: SDLContext) -> UInt32 {
        let cookie = self.cookieGenerator.nextId()
        let remoteAddress = ctx.config.stunSocketAddress

        var stunRequest = SDLStunRequest()
        stunRequest.cookie = cookie
        stunRequest.clientID = ctx.config.clientId
        stunRequest.networkID = ctx.devAddr.networkID
        stunRequest.ip = ctx.devAddr.netAddr
        stunRequest.mac = ctx.devAddr.mac
        stunRequest.natType = UInt32(ctx.natType.rawValue)

        SDLLogger.log("[SDLUDPHole] stunRequest: \(remoteAddress), host: \(ctx.config.stunServers[0].host):\(ctx.config.stunServers[0].ports[0])", level: .warning)

        if let payload = Self.serialized("stunRequest", { try stunRequest.serializedData() }) {
            self.send(remoteAddress: remoteAddress, type: .stunRequest, data: payload)
        }

        return cookie
    }

    /// Sends a stun probe and waits for the reply carrying the same cookie.
    ///
    /// - Parameters:
    ///   - remoteAddress: Destination of the probe.
    ///   - attr: Probe attribute placed in the request.
    ///   - timeout: Seconds to wait for the reply before failing.
    /// - Returns: The matching probe reply.
    /// - Throws: `ProbeError.timeout` when no reply arrives in time, or the serialization error
    ///   when the probe cannot be encoded.
    func stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int = 5) async throws -> SDLStunProbeReply {
        return try await self._stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: timeout).get()
    }

    /// Builds and queues the probe, registers a pending promise keyed by the probe cookie and
    /// arms a timer that fails the promise after `timeout` seconds.
    private func _stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int) -> EventLoopFuture<SDLStunProbeReply> {
        let eventLoop = self.asyncChannel.channel.eventLoop
        let cookie = self.cookieGenerator.nextId()

        var stunProbe = SDLStunProbe()
        stunProbe.cookie = cookie
        stunProbe.attr = UInt32(attr.rawValue)

        let payload: Data
        do {
            payload = try stunProbe.serializedData()
        } catch {
            SDLLogger.log("[SDLUDPHole] serialize stunProbe failed: \(error)", level: .error)
            return eventLoop.makeFailedFuture(error)
        }

        let promise = eventLoop.makePromise(of: SDLStunProbeReply.self)
        // Without this timer an unanswered probe would suspend its caller forever and
        // leave the promise in `promises`.
        let timeoutTask = eventLoop.scheduleTask(in: .seconds(Int64(max(0, timeout)))) { [weak self] in
            promise.fail(ProbeError.timeout(cookie: cookie))
            guard let self else { return }
            // Hop back onto the actor to drop the bookkeeping entry.
            Task { await self.expireProbe(cookie: cookie) }
        }
        self.promises[cookie] = PendingProbe(promise: promise, timeoutTask: timeoutTask)

        self.send(remoteAddress: remoteAddress, type: .stunProbe, data: payload)
        SDLLogger.log("[SDLUDPHole] stunProbe: \(remoteAddress)", level: .warning)

        return promise.futureResult
    }

    /// Completes and removes the pending probe matching the reply's cookie, if any.
    private func trigger(probeReply: SDLStunProbeReply) {
        guard let pending = self.promises.removeValue(forKey: probeReply.cookie) else {
            return
        }

        pending.timeoutTask.cancel()
        self.asyncChannel.channel.eventLoop.execute {
            pending.promise.succeed(probeReply)
        }
    }

    /// Removes the bookkeeping entry of a probe whose timer fired.
    private func expireProbe(cookie: UInt32) {
        self.promises.removeValue(forKey: cookie)
    }

    // MARK: client-client apis

    /// Sends a data packet to another session at the session's nat address.
    func sendPacket(context ctx: SDLContext, session: Session, data: Data) {
        let remoteAddress = session.natAddress

        var dataPacket = SDLData()
        dataPacket.networkID = ctx.devAddr.networkID
        dataPacket.srcMac = ctx.devAddr.mac
        dataPacket.dstMac = session.dstMac
        dataPacket.ttl = 255
        dataPacket.data = data

        guard let packet = Self.serialized("sendPacket", { try dataPacket.serializedData() }) else {
            return
        }

        SDLLogger.log("[SDLUDPHole] sendPacket: \(remoteAddress), count: \(packet.count)", level: .debug)
        self.send(remoteAddress: remoteAddress, type: .data, data: packet)
    }

    /// Forwards a data packet through the sn server; `data` is already encrypted.
    func forwardPacket(context ctx: SDLContext, dst_mac: Data, data: Data) {
        let remoteAddress = ctx.config.stunSocketAddress

        var dataPacket = SDLData()
        dataPacket.networkID = ctx.devAddr.networkID
        dataPacket.srcMac = ctx.devAddr.mac
        dataPacket.dstMac = dst_mac
        dataPacket.ttl = 255
        dataPacket.data = data

        guard let packet = Self.serialized("forwardPacket", { try dataPacket.serializedData() }) else {
            return
        }

        // Logged through SDLLogger like every other send path in this actor.
        SDLLogger.log("[SDLContext] forward packet, remoteAddress: \(remoteAddress), data size: \(packet.count)", level: .debug)
        self.send(remoteAddress: remoteAddress, type: .data, data: packet)
    }

    /// Sends a register packet.
    func sendRegister(context ctx: SDLContext, remoteAddress: SocketAddress, dst_mac: Data) {
        var register = SDLRegister()
        register.networkID = ctx.devAddr.networkID
        register.srcMac = ctx.devAddr.mac
        register.dstMac = dst_mac

        guard let packet = Self.serialized("sendRegister", { try register.serializedData() }) else {
            return
        }

        SDLLogger.log("[SDLUDPHole] SendRegister: \(remoteAddress), src_mac: \(LayerPacket.MacAddress.description(data: ctx.devAddr.mac)), dst_mac: \(LayerPacket.MacAddress.description(data: dst_mac))", level: .debug)
        self.send(remoteAddress: remoteAddress, type: .register, data: packet)
    }

    /// Replies with a registerAck packet.
    func sendRegisterAck(context ctx: SDLContext, remoteAddress: SocketAddress, dst_mac: Data) {
        var registerAck = SDLRegisterAck()
        registerAck.networkID = ctx.devAddr.networkID
        registerAck.srcMac = ctx.devAddr.mac
        registerAck.dstMac = dst_mac

        guard let packet = Self.serialized("sendRegisterAck", { try registerAck.serializedData() }) else {
            return
        }

        SDLLogger.log("[SDLUDPHole] SendRegisterAck: \(remoteAddress), \(registerAck)", level: .debug)
        self.send(remoteAddress: remoteAddress, type: .registerAck, data: packet)
    }

    /// Queues a message for the writer task started by `start()`.
    private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) {
        let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data)
        self.continuation.yield(message)
    }

    /// Runs `encode` and returns its result, logging (instead of crashing on or silently
    /// dropping) any serialization error.
    ///
    /// - Parameters:
    ///   - label: Name of the calling operation, used in the log line.
    ///   - encode: Closure producing the serialized payload.
    /// - Returns: The payload, or `nil` when `encode` threw.
    private static func serialized(_ label: String, _ encode: () throws -> Data) -> Data? {
        do {
            return try encode()
        } catch {
            SDLLogger.log("[SDLUDPHole] \(label) serialize failed: \(error)", level: .error)
            return nil
        }
    }

    // MARK: codec

    /// Decodes one datagram: the first byte selects the packet type, the remaining bytes
    /// are the serialized message.
    ///
    /// - Returns: The decoded message, or `nil` when the buffer is empty, the type byte is
    ///   unknown, or the packet type is not one handled here.
    /// - Throws: The deserialization error for a recognised type with a malformed payload.
    private static func decode(buffer: inout ByteBuffer) throws -> SDLHoleInboundMessage? {
        guard let type = buffer.readInteger(as: UInt8.self),
              let packetType = SDLPacketType(rawValue: type),
              let bytes = buffer.readBytes(length: buffer.readableBytes) else {
            SDLLogger.log("[SDLUDPHole] decode error", level: .error)
            return nil
        }

        switch packetType {
        case .data:
            let dataPacket = try SDLData(serializedBytes: bytes)
            return .data(dataPacket)
        case .register:
            let registerPacket = try SDLRegister(serializedBytes: bytes)
            return .register(registerPacket)
        case .registerAck:
            let registerAck = try SDLRegisterAck(serializedBytes: bytes)
            return .registerAck(registerAck)
        case .stunReply:
            let stunReply = try SDLStunReply(serializedBytes: bytes)
            return .stunReply(stunReply)
        case .stunProbeReply:
            let stunProbeReply = try SDLStunProbeReply(serializedBytes: bytes)
            return .stunProbeReply(stunProbeReply)
        default:
            return nil
        }
    }

    deinit {
        try? self.group.syncShutdownGracefully()
    }
}