// // SDLanServer.swift // Tun // // Created by 安礼成 on 2024/1/31. // import Foundation import NIOCore import NIOPosix // 处理和sn-server服务器之间的通讯 actor SDLUDPHoleActor { private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) private let asyncChannel: NIOAsyncChannel, AddressedEnvelope> private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded) private var cookieGenerator = SDLIdGenerator(seed: 1) private var promises: [UInt32:EventLoopPromise] = [:] public var localAddress: SocketAddress? public let eventFlow: AsyncStream private let eventContinuation: AsyncStream.Continuation private let logger: SDLLogger // 依赖的外表能力 struct Capabilities { let logger: @Sendable (String) async -> Void } struct UDPMessage { let remoteAddress: SocketAddress let type: SDLPacketType let data: Data } // 定义事件类型 enum UDPEvent { case ready case message(SocketAddress, SDLHoleInboundMessage) case data(SDLData) } // 启动函数 init(logger: SDLLogger) async throws { self.logger = logger (self.eventFlow, self.eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded) let bootstrap = DatagramBootstrap(group: group) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0) .flatMapThrowing { channel in return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init( inboundType: AddressedEnvelope.self, outboundType: AddressedEnvelope.self )) } .get() self.localAddress = self.asyncChannel.channel.localAddress self.logger.log("[UDPHole] started and listening on: \(self.localAddress!)", level: .debug) } func start() async throws { try await withTaskCancellationHandler { try await self.asyncChannel.executeThenClose {inbound, outbound in self.eventContinuation.yield(.ready) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { defer { self.logger.log("[SDLUDPHole] inbound closed", level: .warning) } for try await envelope in inbound { try Task.checkCancellation() var buffer = envelope.data let remoteAddress = envelope.remoteAddress do { if let message = try Self.decode(buffer: &buffer) { switch message { case .data(let data): self.logger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug) self.eventContinuation.yield(.data(data)) case .stunProbeReply(let probeReply): // 执行并移除回调 await self.trigger(probeReply: probeReply) default: self.eventContinuation.yield(.message(remoteAddress, message)) } } else { self.logger.log("[SDLUDPHole] decode message, get null", level: .warning) } } catch let err { self.logger.log("[SDLUDPHole] decode message, get error: \(err)", level: .warning) throw err } } } group.addTask { defer { self.logger.log("[SDLUDPHole] outbound closed", level: .warning) } for await message in self.writeStream { try Task.checkCancellation() var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1) buffer.writeBytes([message.type.rawValue]) buffer.writeBytes(message.data) let envelope = AddressedEnvelope(remoteAddress: message.remoteAddress, data: buffer) try await outbound.write(envelope) } } if let _ = try await group.next() { group.cancelAll() } } } } onCancel: { self.writeContinuation.finish() self.eventContinuation.finish() self.logger.log("[SDLUDPHole] withTaskCancellationHandler cancel") } } func getCookieId() -> UInt32 { return self.cookieGenerator.nextId() } // 探测tun信息 func stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int = 5) async throws -> SDLStunProbeReply { return try await self._stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: timeout).get() } private func _stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int) -> EventLoopFuture { let cookie = self.cookieGenerator.nextId() var stunProbe = SDLStunProbe() stunProbe.cookie = cookie stunProbe.attr = UInt32(attr.rawValue) self.send( type: .stunProbe, data: try! stunProbe.serializedData(), remoteAddress: remoteAddress) self.logger.log("[SDLUDPHole] stunProbe: \(remoteAddress)", level: .debug) let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLStunProbeReply.self) self.promises[cookie] = promise return promise.futureResult } private func trigger(probeReply: SDLStunProbeReply) { let id = probeReply.cookie // 执行并移除回调 if let promise = self.promises[id] { self.asyncChannel.channel.eventLoop.execute { promise.succeed(probeReply) } self.promises.removeValue(forKey: id) } } // MARK: client-client apis // 处理写入逻辑 func send(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) { let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data) self.writeContinuation.yield(message) } //--MARK: 编解码器 private static func decode(buffer: inout ByteBuffer) throws -> SDLHoleInboundMessage? { guard let type = buffer.readInteger(as: UInt8.self), let packetType = SDLPacketType(rawValue: type), let bytes = buffer.readBytes(length: buffer.readableBytes) else { return nil } switch packetType { case .data: let dataPacket = try SDLData(serializedBytes: bytes) return .data(dataPacket) case .register: let registerPacket = try SDLRegister(serializedBytes: bytes) return .register(registerPacket) case .registerAck: let registerAck = try SDLRegisterAck(serializedBytes: bytes) return .registerAck(registerAck) case .stunReply: let stunReply = try SDLStunReply(serializedBytes: bytes) return .stunReply(stunReply) case .stunProbeReply: let stunProbeReply = try SDLStunProbeReply(serializedBytes: bytes) return .stunProbeReply(stunProbeReply) default: return nil } } deinit { try? self.group.syncShutdownGracefully() self.writeContinuation.finish() self.eventContinuation.finish() } }