This commit is contained in:
anlicheng 2025-08-01 15:40:05 +08:00
parent 26ee512a9a
commit a280a22d3a
3 changed files with 29 additions and 14 deletions

View File

@ -219,7 +219,7 @@ public class SDLContext: @unchecked Sendable {
switch event { switch event {
case .ready: case .ready:
NSLog("[SDLContext] get registerSuper, mac address: \(Self.formatMacAddress(mac: self.devAddr.mac))") NSLog("[SDLContext] get registerSuper, mac address: \(Self.formatMacAddress(mac: self.devAddr.mac))")
guard let message = try await self.superClient?.registerSuper(context: self) else { guard let message = try await self.superClient?.registerSuper(context: self).get() else {
return return
} }
@ -529,7 +529,7 @@ public class SDLContext: @unchecked Sendable {
func holerTask(dstMac: Data) -> Task<(), Never> { func holerTask(dstMac: Data) -> Task<(), Never> {
return Task { return Task {
guard let message = try? await self.superClient?.queryInfo(context: self, dst_mac: dstMac) else { guard let message = try? await self.superClient?.queryInfo(context: self, dst_mac: dstMac).get() else {
return return
} }

View File

@ -117,7 +117,7 @@ actor SDLSuperClient {
self.send(type: .commandAck, packetId: packetId, data: data) self.send(type: .commandAck, packetId: packetId, data: data)
} }
func registerSuper(context ctx: SDLContext) async throws -> SDLSuperInboundMessage { func registerSuper(context ctx: SDLContext) throws -> EventLoopFuture<SDLSuperInboundMessage> {
var registerSuper = SDLRegisterSuper() var registerSuper = SDLRegisterSuper()
registerSuper.version = UInt32(ctx.config.version) registerSuper.version = UInt32(ctx.config.version)
registerSuper.clientID = ctx.config.clientId registerSuper.clientID = ctx.config.clientId
@ -127,15 +127,15 @@ actor SDLSuperClient {
let data = try! registerSuper.serializedData() let data = try! registerSuper.serializedData()
return try await self.write(type: .registerSuper, data: data).get() return self.write(type: .registerSuper, data: data)
} }
// //
func queryInfo(context ctx: SDLContext, dst_mac: Data) async throws -> SDLSuperInboundMessage { func queryInfo(context ctx: SDLContext, dst_mac: Data) async throws -> EventLoopFuture<SDLSuperInboundMessage> {
var queryInfo = SDLQueryInfo() var queryInfo = SDLQueryInfo()
queryInfo.dstMac = dst_mac queryInfo.dstMac = dst_mac
return try await self.write(type: .queryInfo, data: try! queryInfo.serializedData()).get() return self.write(type: .queryInfo, data: try! queryInfo.serializedData())
} }
func unregister(context ctx: SDLContext) throws { func unregister(context ctx: SDLContext) throws {
@ -155,17 +155,18 @@ actor SDLSuperClient {
self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData()) self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData())
} }
func write(type: SDLPacketType, data: Data) -> EventLoopFuture<SDLSuperInboundMessage> { private func write(type: SDLPacketType, data: Data) -> EventLoopFuture<SDLSuperInboundMessage> {
SDLLogger.log("[SDLSuperTransport] will write data: \(data)", level: .debug) SDLLogger.log("[SDLSuperTransport] will write data: \(data)", level: .debug)
let packetId = idGenerator.nextId() let packetId = idGenerator.nextId()
let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLSuperInboundMessage.self) let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLSuperInboundMessage.self)
self.callbackPromises[packetId] = promise self.callbackPromises[packetId] = promise
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data)) self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
return promise.futureResult return promise.futureResult
} }
func send(type: SDLPacketType, packetId: UInt32, data: Data) { private func send(type: SDLPacketType, packetId: UInt32, data: Data) {
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data)) self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
} }

View File

@ -9,12 +9,6 @@ import Foundation
import NIOCore import NIOCore
import NIOPosix import NIOPosix
struct UDPMessage {
let remoteAddress: SocketAddress
let type: SDLPacketType
let data: Data
}
// sn-server // sn-server
actor SDLUDPHole { actor SDLUDPHole {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
@ -27,6 +21,15 @@ actor SDLUDPHole {
public let (eventFlow, eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded) public let (eventFlow, eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)
//
private var isClosed: Bool = true
struct UDPMessage {
let remoteAddress: SocketAddress
let type: SDLPacketType
let data: Data
}
// //
enum UDPEvent { enum UDPEvent {
case ready case ready
@ -55,10 +58,12 @@ actor SDLUDPHole {
func start() async throws { func start() async throws {
try await self.asyncChannel.executeThenClose { inbound, outbound in try await self.asyncChannel.executeThenClose { inbound, outbound in
self.eventContinuation.yield(.ready) self.eventContinuation.yield(.ready)
self.closeChannel(closed: false)
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.asyncChannel.channel.closeFuture.get() try await self.asyncChannel.channel.closeFuture.get()
await self.closeChannel(closed: true)
self.eventContinuation.finish() self.eventContinuation.finish()
} }
@ -101,6 +106,7 @@ actor SDLUDPHole {
try await outbound.write(envelope) try await outbound.write(envelope)
} }
} }
try await group.waitForAll() try await group.waitForAll()
} }
} }
@ -157,6 +163,10 @@ actor SDLUDPHole {
} }
} }
private func closeChannel(closed: Bool) {
self.isClosed = closed
}
// MARK: client-client apis // MARK: client-client apis
// session // session
@ -220,6 +230,10 @@ actor SDLUDPHole {
// //
private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) { private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) {
guard !self.isClosed else {
return
}
let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data) let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data)
self.writeContinuation.yield(message) self.writeContinuation.yield(message)
} }