//
//  SDLanServer.swift
//  Tun
//
//  Created by 安礼成 on 2024/1/31.
//

import Foundation
import NIOCore
import NIOPosix
import SwiftProtobuf

/// Handles the UDP communication with the sn-server.
///
/// The actor binds a single UDP socket, decodes every inbound datagram into an
/// `SDLHoleInboundMessage` that is published on `eventStream`, and drains an
/// internal queue of outbound messages onto the socket.
actor SDLUDPHoleActor {
    private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>

    // Queue of outbound messages; `send` yields into it and `start` drains it.
    private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPHoleOutboundMessage.self, bufferingPolicy: .unbounded)

    /// Address the UDP socket is bound to, as reported by the channel after binding.
    public var localAddress: SocketAddress?

    /// Stream of lifecycle and inbound-message events produced by `start()`.
    public let eventStream: AsyncStream<UDPHoleEvent>
    private let eventContinuation: AsyncStream<UDPHoleEvent>.Continuation

    private let logger: SDLLogger

    /// A message queued for sending: one packet-type byte followed by `data`.
    struct UDPHoleOutboundMessage {
        let remoteAddress: SocketAddress
        let type: SDLPacketType
        let data: Data
    }

    /// Events published on `eventStream`.
    enum UDPHoleEvent {
        /// Emitted once when `start()` begins servicing the channel.
        case ready
        /// A successfully decoded datagram together with its sender address.
        case message(SocketAddress, SDLHoleInboundMessage)
    }

    /// Binds a UDP socket on `0.0.0.0` with an ephemeral port (port 0).
    ///
    /// - Parameter logger: Logger used for diagnostics.
    /// - Throws: Any error raised while binding or wrapping the channel.
    init(logger: SDLLogger) async throws {
        self.logger = logger

        (self.eventStream, self.eventContinuation) = AsyncStream.makeStream(of: UDPHoleEvent.self, bufferingPolicy: .unbounded)

        let bootstrap = DatagramBootstrap(group: group)
            .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)

        let channel = try await bootstrap.bind(host: "0.0.0.0", port: 0)
            .flatMapThrowing { channel in
                return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
                    inboundType: AddressedEnvelope<ByteBuffer>.self,
                    outboundType: AddressedEnvelope<ByteBuffer>.self
                ))
            }
            .get()

        self.asyncChannel = channel

        let boundAddress = channel.channel.localAddress
        self.localAddress = boundAddress

        // Avoid force-unwrapping: a missing local address must not crash the initializer.
        let addressDescription = boundAddress.map { "\($0)" } ?? "unknown"
        logger.log("[UDPHole] started and listening on: \(addressDescription)", level: .debug)
    }

    /// Runs the inbound and outbound loops until one of them finishes or throws,
    /// then cancels the other; the channel is closed when this method returns.
    ///
    /// - Throws: Errors from reading, decoding, or writing datagrams, and
    ///   `CancellationError` when the surrounding task is cancelled.
    func start() async throws {
        try await self.asyncChannel.executeThenClose { inbound, outbound in
            self.eventContinuation.yield(.ready)

            try await withThrowingTaskGroup(of: Void.self) { group in
                // Inbound: decode each datagram and publish it as an event.
                group.addTask {
                    defer {
                        self.logger.log("[SDLUDPHole] inbound closed", level: .warning)
                    }

                    for try await envelope in inbound {
                        try Task.checkCancellation()

                        var buffer = envelope.data
                        let remoteAddress = envelope.remoteAddress
                        do {
                            if let message = try Self.decode(buffer: &buffer) {
                                self.eventContinuation.yield(.message(remoteAddress, message))
                            } else {
                                self.logger.log("[SDLUDPHole] decode message, get null", level: .warning)
                            }
                        } catch let err {
                            self.logger.log("[SDLUDPHole] decode message, get error: \(err)", level: .warning)
                            // NOTE(review): rethrowing ends the inbound loop, so a single
                            // undecodable datagram stops the whole actor. Confirm this is intended.
                            throw err
                        }
                    }
                }

                // Outbound: frame each queued message as [type byte][payload] and send it.
                group.addTask {
                    defer {
                        self.logger.log("[SDLUDPHole] outbound closed", level: .warning)
                    }

                    for await message in self.writeStream {
                        try Task.checkCancellation()

                        var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
                        buffer.writeInteger(message.type.rawValue)
                        buffer.writeBytes(message.data)

                        let envelope = AddressedEnvelope(remoteAddress: message.remoteAddress, data: buffer)
                        try await outbound.write(envelope)
                    }
                }

                // As soon as either loop ends, stop the other one.
                if let _ = try await group.next() {
                    group.cancelAll()
                }
            }
        }
    }

    /// Returns the address the UDP socket is bound to, if known.
    func getLocalAddress() -> SocketAddress? {
        return self.localAddress
    }

    // MARK: client-client apis

    /// Queues a message for sending; the outbound loop in `start()` performs the write.
    ///
    /// - Parameters:
    ///   - type: Packet type written as the first byte of the datagram.
    ///   - data: Serialized payload that follows the type byte.
    ///   - remoteAddress: Destination of the datagram.
    func send(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
        let message = UDPHoleOutboundMessage(remoteAddress: remoteAddress, type: type, data: data)
        self.writeContinuation.yield(message)
    }

    // MARK: - Codec

    /// Consumes and returns every byte that is still readable in `buffer`.
    private static func readPayload(from buffer: inout ByteBuffer) -> [UInt8] {
        return buffer.readBytes(length: buffer.readableBytes) ?? []
    }

    /// Decodes one datagram: a packet-type byte followed by a protobuf payload.
    /// Event packets carry an additional event-type byte before the payload.
    ///
    /// The payload is read per case, after the header bytes of that case have
    /// been consumed, so every branch sees the bytes that belong to it.
    ///
    /// - Parameter buffer: Datagram contents; its reader index is advanced.
    /// - Returns: The decoded message, or `nil` when the type byte is missing or
    ///   unknown, the packet type is not handled here, or a payload parsed with
    ///   `try?` fails to decode.
    /// - Throws: The protobuf decoding error for `.data`, `.register`,
    ///   `.registerAck`, `.stunReply`, and `.stunProbeReply` payloads.
    private static func decode(buffer: inout ByteBuffer) throws -> SDLHoleInboundMessage? {
        guard let rawType = buffer.readInteger(as: UInt8.self),
              let packetType = SDLPacketType(rawValue: rawType) else {
            return nil
        }

        switch packetType {
        case .data:
            let dataPacket = try SDLData(serializedBytes: Self.readPayload(from: &buffer))
            return .data(dataPacket)

        case .register:
            let registerPacket = try SDLRegister(serializedBytes: Self.readPayload(from: &buffer))
            return .register(registerPacket)

        case .registerAck:
            let registerAck = try SDLRegisterAck(serializedBytes: Self.readPayload(from: &buffer))
            return .registerAck(registerAck)

        case .stunReply:
            let stunReply = try SDLStunReply(serializedBytes: Self.readPayload(from: &buffer))
            return .stunReply(stunReply)

        case .stunProbeReply:
            let stunProbeReply = try SDLStunProbeReply(serializedBytes: Self.readPayload(from: &buffer))
            return .stunProbeReply(stunProbeReply)

        case .registerSuperAck:
            guard let registerSuperAck = try? SDLRegisterSuperAck(serializedBytes: Self.readPayload(from: &buffer)) else {
                return nil
            }
            return .registerSuperAck(registerSuperAck)

        case .registerSuperNak:
            guard let registerSuperNak = try? SDLRegisterSuperNak(serializedBytes: Self.readPayload(from: &buffer)) else {
                return nil
            }
            return .registerSuperNak(registerSuperNak)

        case .peerInfo:
            guard let peerInfo = try? SDLPeerInfo(serializedBytes: Self.readPayload(from: &buffer)) else {
                return nil
            }
            return .peerInfo(peerInfo)

        case .event:
            // The event-type byte must be read before the payload is drained.
            guard let rawEvent = buffer.readInteger(as: UInt8.self),
                  let event = SDLEventType(rawValue: rawEvent) else {
                return nil
            }
            let bytes = Self.readPayload(from: &buffer)

            switch event {
            case .natChanged:
                guard let natChangedEvent = try? SDLNatChangedEvent(serializedBytes: bytes) else {
                    return nil
                }
                return .event(.natChanged(natChangedEvent))

            case .sendRegister:
                guard let sendRegisterEvent = try? SDLSendRegisterEvent(serializedBytes: bytes) else {
                    return nil
                }
                return .event(.sendRegister(sendRegisterEvent))

            case .networkShutdown:
                guard let networkShutdownEvent = try? SDLNetworkShutdownEvent(serializedBytes: bytes) else {
                    return nil
                }
                return .event(.networkShutdown(networkShutdownEvent))
            }

        default:
            return nil
        }
    }

    deinit {
        // Finish the streams first so their consumers are released even while
        // the blocking event-loop shutdown below is still in progress.
        self.writeContinuation.finish()
        self.eventContinuation.finish()
        try? self.group.syncShutdownGracefully()
    }
}