增加事件逻辑处理

This commit is contained in:
anlicheng 2025-08-01 14:39:33 +08:00
parent 3ce4e05613
commit f273da3b11
3 changed files with 21 additions and 21 deletions

View File

@ -124,7 +124,6 @@ public class SDLContext: @unchecked Sendable {
try await self.handleUDPEvent(event: event) try await self.handleUDPEvent(event: event)
} }
} }
} }
group.addTask { group.addTask {

View File

@ -58,12 +58,15 @@ actor SDLSuperClient {
func start() async throws { func start() async throws {
try await self.asyncChannel.executeThenClose { inbound, outbound in try await self.asyncChannel.executeThenClose { inbound, outbound in
self.inboundContinuation.yield(.ready)
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
defer { try await self.asyncChannel.channel.closeFuture.get()
self.inboundContinuation.finish() self.inboundContinuation.yield(.closed)
} self.inboundContinuation.finish()
}
group.addTask {
for try await var packet in inbound { for try await var packet in inbound {
if let message = SDLSuperClientDecoder.decode(buffer: &packet) { if let message = SDLSuperClientDecoder.decode(buffer: &packet) {
SDLLogger.log("[SDLSuperTransport] read message: \(message)", level: .warning) SDLLogger.log("[SDLSuperTransport] read message: \(message)", level: .warning)

View File

@ -19,13 +19,13 @@ struct UDPMessage {
actor SDLUDPHole { actor SDLUDPHole {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>> private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
private let (writeStream, continuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded) private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded)
private var cookieGenerator = SDLIdGenerator(seed: 1) private var cookieGenerator = SDLIdGenerator(seed: 1)
private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:] private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:]
public var localAddress: SocketAddress? public var localAddress: SocketAddress?
public let (eventFlow, inboundContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded) public let (eventFlow, eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)
// //
enum UDPEvent { enum UDPEvent {
@ -55,12 +55,16 @@ actor SDLUDPHole {
func start() async throws { func start() async throws {
try await self.asyncChannel.executeThenClose { inbound, outbound in try await self.asyncChannel.executeThenClose { inbound, outbound in
self.eventContinuation.yield(.ready)
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
defer { try await self.asyncChannel.channel.closeFuture.get()
self.inboundContinuation.finish() self.eventContinuation.yield(.closed)
} self.eventContinuation.finish()
}
group.addTask {
for try await envelope in inbound { for try await envelope in inbound {
var buffer = envelope.data var buffer = envelope.data
let remoteAddress = envelope.remoteAddress let remoteAddress = envelope.remoteAddress
@ -69,12 +73,12 @@ actor SDLUDPHole {
switch message { switch message {
case .data(let data): case .data(let data):
SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug) SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
self.inboundContinuation.yield(.data(data)) self.eventContinuation.yield(.data(data))
case .stunProbeReply(let probeReply): case .stunProbeReply(let probeReply):
// //
await self.trigger(probeReply: probeReply) await self.trigger(probeReply: probeReply)
default: default:
self.inboundContinuation.yield(.message(remoteAddress, message)) self.eventContinuation.yield(.message(remoteAddress, message))
} }
} else { } else {
SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning) SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning)
@ -87,7 +91,7 @@ actor SDLUDPHole {
group.addTask { group.addTask {
defer { defer {
self.continuation.finish() self.writeContinuation.finish()
} }
for try await message in self.writeStream { for try await message in self.writeStream {
@ -99,11 +103,6 @@ actor SDLUDPHole {
try await outbound.write(envelope) try await outbound.write(envelope)
} }
} }
//eventFlow.send(.ready)
try await group.waitForAll() try await group.waitForAll()
} }
} }
@ -224,7 +223,7 @@ actor SDLUDPHole {
// //
private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) { private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) {
let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data) let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data)
self.continuation.yield(message) self.writeContinuation.yield(message)
} }
//--MARK: //--MARK:
@ -260,5 +259,4 @@ actor SDLUDPHole {
deinit { deinit {
try? self.group.syncShutdownGracefully() try? self.group.syncShutdownGracefully()
} }
} }