From 4c815397da87a4baf7925c104972f37881599488 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 1 Aug 2025 10:44:47 +0800 Subject: [PATCH] fix tcp actor --- Sources/Punchnet/SDLSuperClientActor.swift | 64 +++++++++------------- 1 file changed, 25 insertions(+), 39 deletions(-) diff --git a/Sources/Punchnet/SDLSuperClientActor.swift b/Sources/Punchnet/SDLSuperClientActor.swift index f57441f..2c03aa9 100644 --- a/Sources/Punchnet/SDLSuperClientActor.swift +++ b/Sources/Punchnet/SDLSuperClientActor.swift @@ -9,11 +9,7 @@ import Foundation import NIOCore import NIOPosix -struct TcpMessage { - let packetId: UInt32 - let type: SDLPacketType - let data: Data -} + // --MARK: 和SuperNode的客户端 actor SDLSuperClientActor { @@ -27,10 +23,12 @@ actor SDLSuperClientActor { // id生成器 var idGenerator = SDLIdGenerator(seed: 1) - let host: String - let port: Int - - private var pingCancel: AnyCancellable? + // 发送的消息格式 + struct TcpMessage { + let packetId: UInt32 + let type: SDLPacketType + let data: Data + } // 定义事件类型 enum SuperEvent { @@ -40,12 +38,7 @@ actor SDLSuperClientActor { case command(UInt32, SDLCommand) } - init(host: String, port: Int) { - self.host = host - self.port = port - } - - init() async throws { + init(host: String, port: Int) async throws { let bootstrap = ClientBootstrap(group: self.group) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelInitializer { channel in @@ -100,19 +93,19 @@ actor SDLSuperClientActor { } } + // --MARK: 心跳机制 + group.addTask { + while !Task.isCancelled { + await self.ping() + try? await Task.sleep(nanoseconds: 5 * 1_000_000_000) + } + } + try await group.waitForAll() } } } - - private func fireCallback(message: SDLSuperInboundMessage) { - if let promise = self.callbackPromises[message.msgId] { - self.asyncChannel.channel.eventLoop.execute { - promise.succeed(message) - } - self.callbackPromises.removeValue(forKey: message.msgId) - } - } + // -- MARK: apis @@ -162,12 +155,6 @@ actor SDLSuperClientActor { self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData()) } - // --MARK: ChannelInboundHandler - - public func channelActive(context: ChannelHandlerContext) { - self.startPingTicker() - } - func write(type: SDLPacketType, data: Data) -> EventLoopFuture { SDLLogger.log("[SDLSuperTransport] will write data: \(data)", level: .debug) let packetId = idGenerator.nextId() @@ -182,25 +169,24 @@ actor SDLSuperClientActor { self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data)) } - // --MARK: 心跳机制 - - private func startPingTicker() { - self.pingCancel = Timer.publish(every: 5.0, on: .main, in: .common).autoconnect() - .sink { _ in - // 保持和super-node的心跳机制 - self.ping() + // 处理回调函数 + private func fireCallback(message: SDLSuperInboundMessage) { + if let promise = self.callbackPromises[message.msgId] { + self.asyncChannel.channel.eventLoop.execute { + promise.succeed(message) } + self.callbackPromises.removeValue(forKey: message.msgId) + } } deinit { - self.pingCancel?.cancel() try! group.syncShutdownGracefully() } } // --MARK: 编解码器 -struct SDLSuperClientDecoder { +private struct SDLSuperClientDecoder { // 消息格式为: <> static func decode(buffer: inout ByteBuffer) -> SDLSuperInboundMessage? { guard let msgId = buffer.readInteger(as: UInt32.self),