This commit is contained in:
anlicheng 2026-02-20 00:20:11 +08:00
parent 54d62eaba8
commit c12d2216df
2 changed files with 76 additions and 116 deletions

View File

@ -144,28 +144,24 @@ actor SDLContextActor {
SDLLogger.shared.log("[SDLContext] start quic client ready") SDLLogger.shared.log("[SDLContext] start quic client ready")
self.quicWorker = Task.detached { self.quicWorker = Task.detached {
let reader = quicClient.getReader() let stream = await quicClient.messageStream()
while let frame = try? await reader.next() { for await message in stream {
if let message = SDLQUICCodec.decode(frame: frame) { switch message {
switch message { case .welcome(let welcome):
case .welcome(let welcome): SDLLogger.shared.log("[SDLContext] quic welcome: \(welcome)")
SDLLogger.shared.log("[SDLContext] quic welcome: \(welcome)") case .registerSuperAck(let registerSuperAck):
case .registerSuperAck(let registerSuperAck): await self.handleRegisterSuperAck(registerSuperAck: registerSuperAck)
await self.handleRegisterSuperAck(registerSuperAck: registerSuperAck) case .registerSuperNak(let registerSuperNak):
case .registerSuperNak(let registerSuperNak): await self.handleRegisterSuperNak(nakPacket: registerSuperNak)
await self.handleRegisterSuperNak(nakPacket: registerSuperNak) case .peerInfo(let peerInfo):
case .peerInfo(let peerInfo): SDLLogger.shared.log("[SDLContext] peer message: \(peerInfo)")
SDLLogger.shared.log("[SDLContext] peer message: \(peerInfo)") case .event(let event):
case .event(let event): await self.handleEvent(event: event)
await self.handleEvent(event: event) case .policyReponse(let policyResponse):
case .policyReponse(let policyResponse): //
// await self.identifyStore.apply(policyResponse: policyResponse)
await self.identifyStore.apply(policyResponse: policyResponse)
}
} }
} }
} }
self.quicClient = quicClient self.quicClient = quicClient

View File

@ -19,60 +19,18 @@ enum SDLQUICError: Error {
} }
final class SDLQUICClient { final class SDLQUICClient {
private let transport: SDLQUICTransport private let connection: NWConnection
private let queue = DispatchQueue(label: "com.sdl.QUICClient.queue") // 线 private let queue = DispatchQueue(label: "com.sdl.QUICClient.queue") // 线
private let (closeStream, closeCont) = AsyncStream.makeStream(of: Void.self) private let (closeStream, closeCont) = AsyncStream.makeStream(of: Void.self)
private let (readyStream, readyCont) = AsyncStream.makeStream(of: Void.self) private let (readyStream, readyCont) = AsyncStream.makeStream(of: Void.self)
init(host: String, port: UInt16) {
self.transport = SDLQUICTransport(host: host, port: port)
}
func start() {
self.transport.start(queue: self.queue) { event in
switch event {
case .ready:
self.readyCont.yield()
self.readyCont.finish()
case .failed(_), .cancelled:
self.closeCont.yield()
self.closeCont.finish()
}
}
}
func getReader() -> SDLQUICReader {
return transport.getReader()
}
func send(type: SDLPacketType, data: Data) {
self.transport.send(type: type, data: data)
}
func waitReady() async throws {
for await _ in readyStream {}
}
func waitClose() async {
for await _ in closeStream {}
}
func stop() {
self.transport.stop()
}
}
final class SDLQUICTransport {
enum Event { enum Event {
case ready case ready
case failed(Error) case failed(Error)
case cancelled case cancelled
} }
private let connection: NWConnection
init(host: String, port: UInt16) { init(host: String, port: UInt16) {
let options = NWProtocolQUIC.Options(alpn: ["punchnet/1.0"]) let options = NWProtocolQUIC.Options(alpn: ["punchnet/1.0"])
@ -90,24 +48,31 @@ final class SDLQUICTransport {
self.connection = NWConnection(host: .init(host), port: .init(rawValue: port)!, using: params) self.connection = NWConnection(host: .init(host), port: .init(rawValue: port)!, using: params)
} }
func start(queue: DispatchQueue, onEvent: @escaping (Event) -> Void) { func start() {
SDLLogger.shared.log("[SDLQUICTransport] call start") SDLLogger.shared.log("[SDLQUICTransport] call start")
connection.stateUpdateHandler = { state in connection.stateUpdateHandler = { state in
SDLLogger.shared.log("[SDLQUICTransport] new state: \(state)") SDLLogger.shared.log("[SDLQUICTransport] new state: \(state)")
switch state { switch state {
case .ready: onEvent(.ready) case .ready:
case .failed(let e): onEvent(.failed(e)) self.readyCont.yield()
case .cancelled: onEvent(.cancelled) self.readyCont.finish()
default: break case .failed(_), .cancelled:
self.closeCont.yield()
self.closeCont.finish()
default:
()
} }
} }
connection.start(queue: queue) connection.start(queue: self.queue)
} }
func getReader() -> SDLQUICReader { func messageStream() async -> AsyncStream<SDLQUICInboundMessage> {
return SDLQUICReader(connection: self.connection) let reader = SDLQUICReader(connection: self.connection)
await reader.start()
return await reader.messageStream
} }
func send(type: SDLPacketType, data: Data) { func send(type: SDLPacketType, data: Data) {
var len = UInt16(data.count + 1).bigEndian var len = UInt16(data.count + 1).bigEndian
@ -117,75 +82,70 @@ final class SDLQUICTransport {
connection.send(content: packet, completion: .contentProcessed { _ in }) connection.send(content: packet, completion: .contentProcessed { _ in })
} }
func waitReady() async throws {
for await _ in readyStream {}
}
func waitClose() async {
for await _ in closeStream {}
}
func stop() { func stop() {
connection.cancel() self.connection.cancel()
} }
} }
actor SDLQUICReader: AsyncIteratorProtocol { actor SDLQUICReader {
typealias Element = ByteBuffer
private let allocator = ByteBufferAllocator() private let allocator = ByteBufferAllocator()
private var buffer: ByteBuffer
//
private var packets: [ByteBuffer] = []
// 64K // 64K
private let maxPacketSize: Int private let maxPacketSize: Int
// 2M // 2M
private let maxBufferSize: Int private let maxBufferSize: Int
// public var messageStream: AsyncStream<SDLQUICInboundMessage>
private var isComplete: Bool = false private let messageCont: AsyncStream<SDLQUICInboundMessage>.Continuation
private var readTask: Task<Void, Never>?
private let connection: NWConnection private let connection: NWConnection
init(connection: NWConnection, maxPacketSize: Int = 64 * 1024, maxBufferSize: Int = 2 * 1024 * 1024) { init(connection: NWConnection, maxPacketSize: Int = 64 * 1024, maxBufferSize: Int = 2 * 1024 * 1024) {
self.connection = connection self.connection = connection
self.maxBufferSize = maxBufferSize self.maxBufferSize = maxBufferSize
self.maxPacketSize = maxPacketSize self.maxPacketSize = maxPacketSize
self.buffer = allocator.buffer(capacity: maxBufferSize) (self.messageStream, self.messageCont) = AsyncStream.makeStream(of: SDLQUICInboundMessage.self)
} }
func next() async throws -> ByteBuffer? { func start() {
// self.readTask = Task {
if !self.packets.isEmpty { var buffer: ByteBuffer = allocator.buffer(capacity: self.maxBufferSize)
return self.packets.removeFirst() do {
} while !Task.isCancelled {
let (isComplete, data) = try await self.readOnce()
// if !data.isEmpty {
self.packets = try await self.readPacket() buffer.writeBytes(data)
if !self.packets.isEmpty { let frames = try parseFrames(buffer: &buffer)
return self.packets.removeFirst() for frame in frames {
} else { if let message = SDLQUICCodec.decode(frame: frame) {
return nil self.messageCont.yield(message)
} }
} }
}
private func readPacket() async throws -> [ByteBuffer] {
while true { if isComplete {
if self.isComplete { break
return try parseFrames() }
}
let (isComplete, data) = try await readOnce()
self.isComplete = isComplete
if !data.isEmpty {
buffer.writeBytes(data)
//
let packets = try parseFrames()
if !packets.isEmpty {
return packets
} }
self.messageCont.finish()
} catch {
self.messageCont.finish()
} }
} }
} }
// //
private func parseFrames() throws -> [ByteBuffer] { private func parseFrames(buffer: inout ByteBuffer) throws -> [ByteBuffer] {
guard buffer.readableBytes >= 2 else { guard buffer.readableBytes >= 2 else {
return [] return []
} }
@ -236,6 +196,11 @@ actor SDLQUICReader: AsyncIteratorProtocol {
} }
} }
deinit {
self.readTask?.cancel()
self.messageCont.finish()
}
} }
struct SDLQUICCodec { struct SDLQUICCodec {
@ -314,4 +279,3 @@ struct SDLQUICCodec {
} }
} }
} }