fix tcp actor

This commit is contained in:
anlicheng 2025-08-01 10:44:47 +08:00
parent 3a0c21280c
commit 4c815397da

View File

@ -9,11 +9,7 @@ import Foundation
import NIOCore import NIOCore
import NIOPosix import NIOPosix
struct TcpMessage {
let packetId: UInt32
let type: SDLPacketType
let data: Data
}
// --MARK: SuperNode // --MARK: SuperNode
actor SDLSuperClientActor { actor SDLSuperClientActor {
@ -27,10 +23,12 @@ actor SDLSuperClientActor {
// id // id
var idGenerator = SDLIdGenerator(seed: 1) var idGenerator = SDLIdGenerator(seed: 1)
let host: String //
let port: Int struct TcpMessage {
let packetId: UInt32
private var pingCancel: AnyCancellable? let type: SDLPacketType
let data: Data
}
// //
enum SuperEvent { enum SuperEvent {
@ -40,12 +38,7 @@ actor SDLSuperClientActor {
case command(UInt32, SDLCommand) case command(UInt32, SDLCommand)
} }
init(host: String, port: Int) { init(host: String, port: Int) async throws {
self.host = host
self.port = port
}
init() async throws {
let bootstrap = ClientBootstrap(group: self.group) let bootstrap = ClientBootstrap(group: self.group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelInitializer { channel in .channelInitializer { channel in
@ -100,19 +93,19 @@ actor SDLSuperClientActor {
} }
} }
// --MARK:
group.addTask {
while !Task.isCancelled {
await self.ping()
try? await Task.sleep(nanoseconds: 5 * 1_000_000_000)
}
}
try await group.waitForAll() try await group.waitForAll()
} }
} }
} }
private func fireCallback(message: SDLSuperInboundMessage) {
if let promise = self.callbackPromises[message.msgId] {
self.asyncChannel.channel.eventLoop.execute {
promise.succeed(message)
}
self.callbackPromises.removeValue(forKey: message.msgId)
}
}
// -- MARK: apis // -- MARK: apis
@ -162,12 +155,6 @@ actor SDLSuperClientActor {
self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData()) self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData())
} }
// --MARK: ChannelInboundHandler
public func channelActive(context: ChannelHandlerContext) {
self.startPingTicker()
}
func write(type: SDLPacketType, data: Data) -> EventLoopFuture<SDLSuperInboundMessage> { func write(type: SDLPacketType, data: Data) -> EventLoopFuture<SDLSuperInboundMessage> {
SDLLogger.log("[SDLSuperTransport] will write data: \(data)", level: .debug) SDLLogger.log("[SDLSuperTransport] will write data: \(data)", level: .debug)
let packetId = idGenerator.nextId() let packetId = idGenerator.nextId()
@ -182,25 +169,24 @@ actor SDLSuperClientActor {
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data)) self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
} }
// --MARK: //
private func fireCallback(message: SDLSuperInboundMessage) {
private func startPingTicker() { if let promise = self.callbackPromises[message.msgId] {
self.pingCancel = Timer.publish(every: 5.0, on: .main, in: .common).autoconnect() self.asyncChannel.channel.eventLoop.execute {
.sink { _ in promise.succeed(message)
// super-node }
self.ping() self.callbackPromises.removeValue(forKey: message.msgId)
} }
} }
deinit { deinit {
self.pingCancel?.cancel()
try! group.syncShutdownGracefully() try! group.syncShutdownGracefully()
} }
} }
// --MARK: // --MARK:
struct SDLSuperClientDecoder { private struct SDLSuperClientDecoder {
// : <<MsgId:32, Type:8, Body/binary>> // : <<MsgId:32, Type:8, Body/binary>>
static func decode(buffer: inout ByteBuffer) -> SDLSuperInboundMessage? { static func decode(buffer: inout ByteBuffer) -> SDLSuperInboundMessage? {
guard let msgId = buffer.readInteger(as: UInt32.self), guard let msgId = buffer.readInteger(as: UInt32.self),