fix quicReader

This commit is contained in:
anlicheng 2026-02-19 23:50:18 +08:00
parent d8c6eb67a6
commit 54d62eaba8
3 changed files with 241 additions and 157 deletions

View File

@ -131,18 +131,41 @@ actor SDLContextActor {
} }
private func startQUICClient() async throws -> SDLQUICClient { private func startQUICClient() async throws -> SDLQUICClient {
self.quicWorker?.cancel()
self.quicClient?.stop()
// monitor // monitor
let quicClient = SDLQUICClient(host: "118.178.229.213", port: 1365) let quicClient = SDLQUICClient(host: "118.178.229.213", port: 1365)
quicClient.start() quicClient.start()
// quic // quic
try await quicClient.waitReady() try await quicClient.waitReady()
try await Task.sleep(for: .seconds(0.2))
SDLLogger.shared.log("[SDLContext] start quic client ready") SDLLogger.shared.log("[SDLContext] start quic client ready")
self.quicWorker = Task.detached { self.quicWorker = Task.detached {
for await message in quicClient.receiveStream(maxLen: 86400) { let reader = quicClient.getReader()
SDLLogger.shared.log("[SDLContext] quic client receive message: \(message)") while let frame = try? await reader.next() {
if let message = SDLQUICCodec.decode(frame: frame) {
switch message {
case .welcome(let welcome):
SDLLogger.shared.log("[SDLContext] quic welcome: \(welcome)")
case .registerSuperAck(let registerSuperAck):
await self.handleRegisterSuperAck(registerSuperAck: registerSuperAck)
case .registerSuperNak(let registerSuperNak):
await self.handleRegisterSuperNak(nakPacket: registerSuperNak)
case .peerInfo(let peerInfo):
SDLLogger.shared.log("[SDLContext] peer message: \(peerInfo)")
case .event(let event):
await self.handleEvent(event: event)
case .policyReponse(let policyResponse):
//
await self.identifyStore.apply(policyResponse: policyResponse)
}
}
} }
} }
self.quicClient = quicClient self.quicClient = quicClient
@ -258,11 +281,6 @@ actor SDLContextActor {
await self.handleRegisterAck(remoteAddress: remoteAddress, registerAck: registerAck) await self.handleRegisterAck(remoteAddress: remoteAddress, registerAck: registerAck)
case .data(let data): case .data(let data):
try? await self.handleData(data: data) try? await self.handleData(data: data)
// case .policyReponse(let policyResponse):
// SDLLogger.shared.log("[SDLContext] get a policyResponse: \(policyResponse.totalNum) of \(policyResponse.index), bytes: \(policyResponse.rules.count)")
// //
// await self.identifyStore.apply(policyResponse: policyResponse)
} }
} }
@ -278,8 +296,6 @@ actor SDLContextActor {
self.setNatType(natType: natType) self.setNatType(natType: natType)
SDLLogger.shared.log("[SDLContext] nat_type is: \(natType)") SDLLogger.shared.log("[SDLContext] nat_type is: \(natType)")
} }
return udpHole return udpHole
} }
@ -367,7 +383,7 @@ actor SDLContextActor {
} }
private func handleEvent(event: SDLEvent) async throws { private func handleEvent(event: SDLEvent) async {
switch event { switch event {
// case .dropMacs(let dropMacsEvent): // case .dropMacs(let dropMacsEvent):
// SDLLogger.shared.log("[SDLContext] drop macs", level: .info) // SDLLogger.shared.log("[SDLContext] drop macs", level: .info)
@ -385,7 +401,7 @@ actor SDLContextActor {
register.networkID = self.config.networkAddress.networkId register.networkID = self.config.networkAddress.networkId
register.srcMac = self.config.networkAddress.mac register.srcMac = self.config.networkAddress.mac
register.dstMac = sendRegisterEvent.dstMac register.dstMac = sendRegisterEvent.dstMac
self.udpHole?.send(type: .register, data: try register.serializedData(), remoteAddress: remoteAddress) self.udpHole?.send(type: .register, data: try! register.serializedData(), remoteAddress: remoteAddress)
} }
case .networkShutdown(let shutdownEvent): case .networkShutdown(let shutdownEvent):
let alertNotice = NoticeMessage.alert(alert: shutdownEvent.message) let alertNotice = NoticeMessage.alert(alert: shutdownEvent.message)
@ -576,10 +592,12 @@ actor SDLContextActor {
// //
private func dealPacket(packet: IPPacket) async { private func dealPacket(packet: IPPacket) async {
let networkAddr = self.config.networkAddress let networkAddr = self.config.networkAddress
// TODO
if SDLDNSClient.Helper.isDnsRequestPacket(ipPacket: packet) { if SDLDNSClient.Helper.isDnsRequestPacket(ipPacket: packet) {
let destIp = packet.header.destination_ip let destIp = packet.header.destination_ip
SDLLogger.shared.log("[DNSQuery] destIp: \(destIp), int: \(packet.header.destination.asIpAddress())", level: .debug) SDLLogger.shared.log("[DNSQuery] destIp: \(destIp), int: \(packet.header.destination.asIpAddress())", level: .debug)
self.dnsClient?.forward(ipPacket: packet) //self.dnsClient?.forward(ipPacket: packet)
return return
} }

View File

@ -15,15 +15,15 @@ enum SDLQUICError: Error {
case connectionCancelled case connectionCancelled
case timeout case timeout
case decodeError(String) case decodeError(String)
case packetTooLarge
} }
final class SDLQUICClient { final class SDLQUICClient {
private let transport: SDLQUICTransport private let transport: SDLQUICTransport
private let allocator = ByteBufferAllocator()
private let queue = DispatchQueue(label: "com.sdl.QUICClient.queue") // 线 private let queue = DispatchQueue(label: "com.sdl.QUICClient.queue") // 线
private var closeCont: CheckedContinuation<Void, Never>? private let (closeStream, closeCont) = AsyncStream.makeStream(of: Void.self)
private var readyCont: CheckedContinuation<Void, Error>? private let (readyStream, readyCont) = AsyncStream.makeStream(of: Void.self)
init(host: String, port: UInt16) { init(host: String, port: UInt16) {
self.transport = SDLQUICTransport(host: host, port: port) self.transport = SDLQUICTransport(host: host, port: port)
@ -33,23 +33,17 @@ final class SDLQUICClient {
self.transport.start(queue: self.queue) { event in self.transport.start(queue: self.queue) { event in
switch event { switch event {
case .ready: case .ready:
self.readyCont?.resume() self.readyCont.yield()
self.readyCont = nil self.readyCont.finish()
case .failed(_): case .failed(_), .cancelled:
self.closeCont?.resume() self.closeCont.yield()
self.closeCont = nil self.closeCont.finish()
case .cancelled:
self.closeCont?.resume()
self.closeCont = nil
} }
} }
} }
func receiveStream(maxLen: Int) -> AsyncCompactMapSequence<AsyncStream<Data>, SDLQUICInboundMessage> { func getReader() -> SDLQUICReader {
return transport.receiveMessageStream(maxLen: maxLen).compactMap { data in return transport.getReader()
var buf = self.allocator.buffer(bytes: data)
return try? QUICCodec.decode(buffer: &buf)
}
} }
func send(type: SDLPacketType, data: Data) { func send(type: SDLPacketType, data: Data) {
@ -57,23 +51,14 @@ final class SDLQUICClient {
} }
func waitReady() async throws { func waitReady() async throws {
return try await withCheckedThrowingContinuation { cont in for await _ in readyStream {}
self.readyCont = cont
}
} }
func waitClose() async { func waitClose() async {
return await withCheckedContinuation { cont in for await _ in closeStream {}
self.closeCont = cont
}
} }
deinit { func stop() {
self.readyCont?.resume(throwing: SDLQUICError.connectionCancelled)
self.readyCont = nil
self.closeCont?.resume()
self.closeCont = nil
self.transport.stop() self.transport.stop()
} }
@ -89,7 +74,19 @@ final class SDLQUICTransport {
private let connection: NWConnection private let connection: NWConnection
init(host: String, port: UInt16) { init(host: String, port: UInt16) {
let params = NWParameters(quic: .init(alpn: ["punchnet/1.0"])) let options = NWProtocolQUIC.Options(alpn: ["punchnet/1.0"])
// TODO
sec_protocol_options_set_verify_block(
options.securityProtocolOptions,
{ metadata, trust, complete in
//
complete(true) // true =
},
DispatchQueue.global()
)
let params = NWParameters(quic: options)
self.connection = NWConnection(host: .init(host), port: .init(rawValue: port)!, using: params) self.connection = NWConnection(host: .init(host), port: .init(rawValue: port)!, using: params)
} }
@ -107,56 +104,13 @@ final class SDLQUICTransport {
connection.start(queue: queue) connection.start(queue: queue)
} }
func receiveMessageStream(maxLen: Int) -> AsyncStream<Data> { func getReader() -> SDLQUICReader {
let connection = self.connection return SDLQUICReader(connection: self.connection)
return AsyncStream { continuation in
var buffer = Data()
func tryParse() {
while true {
//
guard buffer.count >= 2 else {
return
}
let len0 = UInt16(bigEndian: buffer.withUnsafeBytes { $0.load(as: UInt16.self) })
let len = Int(len0)
//
guard buffer.count >= 2 + len else {
return
}
// body
let body = buffer.subdata(in: 2 ..< 2 + len)
continuation.yield(body)
//
buffer.removeSubrange(0 ..< 2 + len)
}
}
func loopReceive() {
connection.receive(minimumIncompleteLength: 1, maximumLength: maxLen) { data, _, _, error in
if let data, !data.isEmpty {
buffer.append(data)
tryParse()
}
if error == nil {
loopReceive()
} else {
continuation.finish()
}
}
}
loopReceive()
}
} }
func send(type: SDLPacketType, data: Data) { func send(type: SDLPacketType, data: Data) {
var len = UInt16(data.count).bigEndian var len = UInt16(data.count + 1).bigEndian
var packet = Data(Data(bytes: &len, count: 2)) var packet = Data(Data(bytes: &len, count: 2))
packet.append(type.rawValue) packet.append(type.rawValue)
packet.append(data) packet.append(data)
@ -170,82 +124,194 @@ final class SDLQUICTransport {
} }
extension SDLQUICClient { actor SDLQUICReader: AsyncIteratorProtocol {
struct QUICCodec { typealias Element = ByteBuffer
// --MARK:
public static func decode(buffer: inout ByteBuffer) throws -> SDLQUICInboundMessage? { private let allocator = ByteBufferAllocator()
guard let type = buffer.readInteger(as: UInt8.self), private var buffer: ByteBuffer
let packetType = SDLPacketType(rawValue: type) else { //
return nil private var packets: [ByteBuffer] = []
// 64K
private let maxPacketSize: Int
// 2M
private let maxBufferSize: Int
//
private var isComplete: Bool = false
private let connection: NWConnection
init(connection: NWConnection, maxPacketSize: Int = 64 * 1024, maxBufferSize: Int = 2 * 1024 * 1024) {
self.connection = connection
self.maxBufferSize = maxBufferSize
self.maxPacketSize = maxPacketSize
self.buffer = allocator.buffer(capacity: maxBufferSize)
}
func next() async throws -> ByteBuffer? {
//
if !self.packets.isEmpty {
return self.packets.removeFirst()
}
//
self.packets = try await self.readPacket()
if !self.packets.isEmpty {
return self.packets.removeFirst()
} else {
return nil
}
}
private func readPacket() async throws -> [ByteBuffer] {
while true {
if self.isComplete {
return try parseFrames()
} }
switch packetType { let (isComplete, data) = try await readOnce()
case .welcome: self.isComplete = isComplete
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let welcome = try? SDLWelcome(serializedBytes: bytes) else { if !data.isEmpty {
return nil buffer.writeBytes(data)
//
let packets = try parseFrames()
if !packets.isEmpty {
return packets
} }
return .welcome(welcome) }
}
case .registerSuperAck: }
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let registerSuperAck = try? SDLRegisterSuperAck(serializedBytes: bytes) else { //
return nil private func parseFrames() throws -> [ByteBuffer] {
} guard buffer.readableBytes >= 2 else {
return .registerSuperAck(registerSuperAck) return []
case .registerSuperNak: }
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let registerSuperNak = try? SDLRegisterSuperNak(serializedBytes: bytes) else { var frames: [ByteBuffer] = []
return nil while true {
} guard let len = buffer.getInteger(at: buffer.readerIndex, endianness: .big, as: UInt16.self) else {
return .registerSuperNak(registerSuperNak) break
case .peerInfo: }
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let peerInfo = try? SDLPeerInfo(serializedBytes: bytes) else { if len > self.maxPacketSize {
return nil throw SDLQUICError.packetTooLarge
} }
return .peerInfo(peerInfo)
case .policyResponse: guard buffer.readableBytes >= len + 2 else {
guard let bytes = buffer.readBytes(length: buffer.readableBytes), break
let policyResponse = try? SDLPolicyResponse(serializedBytes: bytes) else { }
return nil
} buffer.moveReaderIndex(forwardBy: 2)
return .policyReponse(policyResponse) if let buf = buffer.readSlice(length: Int(len)) {
case .event: frames.append(buf)
guard let eventVal = buffer.readInteger(as: UInt8.self), }
let event = SDLEventType(rawValue: eventVal), }
let bytes = buffer.readBytes(length: buffer.readableBytes) else {
SDLLogger.shared.log("[SDLUDPHole] decode error 15") if buffer.readerIndex > maxBufferSize / 10 * 6 {
return nil buffer.discardReadBytes()
}
return frames
}
//
private func readOnce() async throws -> (Bool, Data) {
return try await withCheckedThrowingContinuation { cont in
connection.receive(minimumIncompleteLength: 1, maximumLength: maxPacketSize) { data, _, isComplete, error in
if let error {
cont.resume(throwing: error)
return
} }
switch event { if let data, !data.isEmpty {
case .natChanged: SDLLogger.shared.log("[SDLQUICTransport] read bytes: \(data.count)")
guard let natChangedEvent = try? SDLNatChangedEvent(serializedBytes: bytes) else { cont.resume(returning: (isComplete, data))
SDLLogger.shared.log("[SDLUDPHole] decode error 16") } else {
return nil cont.resume(returning: (isComplete, Data()))
}
return .event(.natChanged(natChangedEvent))
case .sendRegister:
guard let sendRegisterEvent = try? SDLSendRegisterEvent(serializedBytes: bytes) else {
SDLLogger.shared.log("[SDLUDPHole] decode error 17")
return nil
}
return .event(.sendRegister(sendRegisterEvent))
case .networkShutdown:
guard let networkShutdownEvent = try? SDLNetworkShutdownEvent(serializedBytes: bytes) else {
SDLLogger.shared.log("[SDLUDPHole] decode error 18")
return nil
}
return .event(.networkShutdown(networkShutdownEvent))
} }
default:
SDLLogger.shared.log("SDLUDPHole decode miss type: \(type)")
return nil
} }
} }
} }
} }
struct SDLQUICCodec {
// --MARK:
public static func decode(frame: ByteBuffer) -> SDLQUICInboundMessage? {
var buffer = frame
guard let type = buffer.readInteger(as: UInt8.self),
let packetType = SDLPacketType(rawValue: type) else {
return nil
}
switch packetType {
case .welcome:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let welcome = try? SDLWelcome(serializedBytes: bytes) else {
return nil
}
return .welcome(welcome)
case .registerSuperAck:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let registerSuperAck = try? SDLRegisterSuperAck(serializedBytes: bytes) else {
return nil
}
return .registerSuperAck(registerSuperAck)
case .registerSuperNak:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let registerSuperNak = try? SDLRegisterSuperNak(serializedBytes: bytes) else {
return nil
}
return .registerSuperNak(registerSuperNak)
case .peerInfo:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let peerInfo = try? SDLPeerInfo(serializedBytes: bytes) else {
return nil
}
return .peerInfo(peerInfo)
case .policyResponse:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let policyResponse = try? SDLPolicyResponse(serializedBytes: bytes) else {
return nil
}
return .policyReponse(policyResponse)
case .event:
guard let eventVal = buffer.readInteger(as: UInt8.self),
let event = SDLEventType(rawValue: eventVal),
let bytes = buffer.readBytes(length: buffer.readableBytes) else {
SDLLogger.shared.log("[SDLUDPHole] decode error 15")
return nil
}
switch event {
case .natChanged:
guard let natChangedEvent = try? SDLNatChangedEvent(serializedBytes: bytes) else {
SDLLogger.shared.log("[SDLUDPHole] decode error 16")
return nil
}
return .event(.natChanged(natChangedEvent))
case .sendRegister:
guard let sendRegisterEvent = try? SDLSendRegisterEvent(serializedBytes: bytes) else {
SDLLogger.shared.log("[SDLUDPHole] decode error 17")
return nil
}
return .event(.sendRegister(sendRegisterEvent))
case .networkShutdown:
guard let networkShutdownEvent = try? SDLNetworkShutdownEvent(serializedBytes: bytes) else {
SDLLogger.shared.log("[SDLUDPHole] decode error 18")
return nil
}
return .event(.networkShutdown(networkShutdownEvent))
}
default:
SDLLogger.shared.log("SDLUDPHole decode miss type: \(type)")
return nil
}
}
}

View File

@ -1,3 +1,3 @@
#! /bin/sh #! /bin/sh
log stream --predicate 'subsystem == "com.jihe.punchnet"' --info log stream --predicate 'subsystem == "com.jihe.punchnet"' --info --style compact