fix资源的清理逻辑
This commit is contained in:
parent
3a6b04aa9b
commit
7ed993a775
@ -305,14 +305,8 @@ actor SDLContextActor {
|
|||||||
|
|
||||||
// 启动udp服务器
|
// 启动udp服务器
|
||||||
let udpHole = try SDLUDPHole()
|
let udpHole = try SDLUDPHole()
|
||||||
try udpHole.start()
|
let localAddress = try udpHole.start()
|
||||||
SDLLogger.log("[SDLContext] udpHole started")
|
SDLLogger.log("[SDLContext] udpHole started, on address: \(localAddress.debugDescription)")
|
||||||
|
|
||||||
// 获取当前udp启动的地址
|
|
||||||
let localAddress = udpHole.getLocalAddress()
|
|
||||||
|
|
||||||
// 阻塞等待udpHole是准备好的状态
|
|
||||||
await udpHole.channelIsActived()
|
|
||||||
|
|
||||||
// 处理心跳逻辑
|
// 处理心跳逻辑
|
||||||
let pingTask = Task.detached {
|
let pingTask = Task.detached {
|
||||||
@ -370,25 +364,34 @@ actor SDLContextActor {
|
|||||||
public func stop() async {
|
public func stop() async {
|
||||||
self.resumeReadyWaiters(.failure(CancellationError()))
|
self.resumeReadyWaiters(.failure(CancellationError()))
|
||||||
self.readyState = .stopped
|
self.readyState = .stopped
|
||||||
|
self.state = .unregistered
|
||||||
|
|
||||||
await self.supervisor.stop()
|
await self.supervisor.stop()
|
||||||
|
|
||||||
self.udpHoleWorkers?.forEach { $0.cancel() }
|
self.udpHoleWorkers?.forEach { $0.cancel() }
|
||||||
self.udpHoleWorkers = nil
|
self.udpHoleWorkers = nil
|
||||||
|
self.udpHole?.stop()
|
||||||
|
self.udpHole = nil
|
||||||
|
|
||||||
self.quicWorker?.cancel()
|
self.quicWorker?.cancel()
|
||||||
self.quicWorker = nil
|
self.quicWorker = nil
|
||||||
|
self.quicClient?.stop()
|
||||||
|
self.quicClient = nil
|
||||||
|
|
||||||
self.dnsClient?.stop()
|
self.dnsClient?.stop()
|
||||||
self.dnsWorker?.cancel()
|
self.dnsWorker?.cancel()
|
||||||
self.dnsWorker = nil
|
self.dnsWorker = nil
|
||||||
|
self.dnsClient = nil
|
||||||
|
|
||||||
self.dnsLocalClient?.stop()
|
self.dnsLocalClient?.stop()
|
||||||
self.dnsLocalWorker?.cancel()
|
self.dnsLocalWorker?.cancel()
|
||||||
self.dnsLocalWorker = nil
|
self.dnsLocalWorker = nil
|
||||||
|
self.dnsLocalClient = nil
|
||||||
|
|
||||||
|
self.monitor?.stop()
|
||||||
self.monitorWorker?.cancel()
|
self.monitorWorker?.cancel()
|
||||||
self.monitorWorker = nil
|
self.monitorWorker = nil
|
||||||
|
self.monitor = nil
|
||||||
|
|
||||||
self.readTask?.cancel()
|
self.readTask?.cancel()
|
||||||
self.readTask = nil
|
self.readTask = nil
|
||||||
@ -398,6 +401,11 @@ actor SDLContextActor {
|
|||||||
|
|
||||||
self.updatePolicyTask?.cancel()
|
self.updatePolicyTask?.cancel()
|
||||||
self.updatePolicyTask = nil
|
self.updatePolicyTask = nil
|
||||||
|
|
||||||
|
self.noticeClient?.stop()
|
||||||
|
self.noticeClient = nil
|
||||||
|
self.sessionToken = nil
|
||||||
|
self.dataCipher = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
private func setNatType(natType: SDLNATProberActor.NatType) {
|
private func setNatType(natType: SDLNATProberActor.NatType) {
|
||||||
|
|||||||
@ -15,6 +15,7 @@ class SDLNetworkMonitor: @unchecked Sendable {
|
|||||||
private var interfaceType: NWInterface.InterfaceType?
|
private var interfaceType: NWInterface.InterfaceType?
|
||||||
private let publisher = PassthroughSubject<NWInterface.InterfaceType, Never>()
|
private let publisher = PassthroughSubject<NWInterface.InterfaceType, Never>()
|
||||||
private var cancel: AnyCancellable?
|
private var cancel: AnyCancellable?
|
||||||
|
private var isStopped = false
|
||||||
|
|
||||||
public let eventStream: AsyncStream<MonitorEvent>
|
public let eventStream: AsyncStream<MonitorEvent>
|
||||||
private let eventContinuation: AsyncStream<MonitorEvent>.Continuation
|
private let eventContinuation: AsyncStream<MonitorEvent>.Continuation
|
||||||
@ -55,10 +56,19 @@ class SDLNetworkMonitor: @unchecked Sendable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
func stop() {
|
||||||
|
guard !self.isStopped else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
self.isStopped = true
|
||||||
self.monitor.cancel()
|
self.monitor.cancel()
|
||||||
self.cancel?.cancel()
|
self.cancel?.cancel()
|
||||||
self.eventContinuation.finish()
|
self.eventContinuation.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deinit {
|
||||||
|
self.stop()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -26,6 +26,7 @@ final class SDLNoticeClient {
|
|||||||
|
|
||||||
private var channel: Channel
|
private var channel: Channel
|
||||||
private let noticePort: Int
|
private let noticePort: Int
|
||||||
|
private var isStopped = false
|
||||||
|
|
||||||
// 启动函数
|
// 启动函数
|
||||||
init(noticePort: Int) throws {
|
init(noticePort: Int) throws {
|
||||||
@ -71,9 +72,20 @@ final class SDLNoticeClient {
|
|||||||
try await self.channel.closeFuture.get()
|
try await self.channel.closeFuture.get()
|
||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
func stop() {
|
||||||
|
guard !self.isStopped else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
self.isStopped = true
|
||||||
self.writeContinuation.finish()
|
self.writeContinuation.finish()
|
||||||
self.task?.cancel()
|
self.task?.cancel()
|
||||||
|
self.task = nil
|
||||||
|
self.channel.close(promise: nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
deinit {
|
||||||
|
self.stop()
|
||||||
try? self.group.syncShutdownGracefully()
|
try? self.group.syncShutdownGracefully()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -21,15 +21,15 @@ final class SDLUDPHole: ChannelInboundHandler {
|
|||||||
private let messageContinuation: AsyncStream<(SocketAddress, SDLHoleMessage)>.Continuation
|
private let messageContinuation: AsyncStream<(SocketAddress, SDLHoleMessage)>.Continuation
|
||||||
|
|
||||||
// 解决channelready的问题
|
// 解决channelready的问题
|
||||||
private var cont: CheckedContinuation<Void, Never>?
|
|
||||||
private var isReady: Bool = false
|
private var isReady: Bool = false
|
||||||
|
private var isStopped: Bool = false
|
||||||
|
|
||||||
// 启动函数
|
// 启动函数
|
||||||
init() throws {
|
init() throws {
|
||||||
(self.messageStream, self.messageContinuation) = AsyncStream.makeStream(of: (SocketAddress, SDLHoleMessage).self, bufferingPolicy: .unbounded)
|
(self.messageStream, self.messageContinuation) = AsyncStream.makeStream(of: (SocketAddress, SDLHoleMessage).self, bufferingPolicy: .unbounded)
|
||||||
}
|
}
|
||||||
|
|
||||||
func start() throws {
|
func start() throws -> SocketAddress? {
|
||||||
let bootstrap = DatagramBootstrap(group: group)
|
let bootstrap = DatagramBootstrap(group: group)
|
||||||
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||||
.channelInitializer { channel in
|
.channelInitializer { channel in
|
||||||
@ -38,31 +38,28 @@ final class SDLUDPHole: ChannelInboundHandler {
|
|||||||
|
|
||||||
let channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait()
|
let channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait()
|
||||||
self.channel = channel
|
self.channel = channel
|
||||||
}
|
|
||||||
|
return channel.localAddress
|
||||||
func channelIsActived() async {
|
|
||||||
await withCheckedContinuation { c in
|
|
||||||
if isReady {
|
|
||||||
c.resume()
|
|
||||||
} else {
|
|
||||||
self.cont = c
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitClose() async throws {
|
func waitClose() async throws {
|
||||||
try await self.channel?.closeFuture.get()
|
try await self.channel?.closeFuture.get()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func stop() {
|
||||||
|
guard !self.isStopped else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
self.isStopped = true
|
||||||
|
self.messageContinuation.finish()
|
||||||
|
self.channel?.close(promise: nil)
|
||||||
|
}
|
||||||
|
|
||||||
// --MARK: ChannelInboundHandler delegate
|
// --MARK: ChannelInboundHandler delegate
|
||||||
|
|
||||||
func channelActive(context: ChannelHandlerContext) {
|
func channelActive(context: ChannelHandlerContext) {
|
||||||
guard !isReady else {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
self.isReady = true
|
self.isReady = true
|
||||||
self.cont?.resume()
|
|
||||||
self.cont = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||||
@ -95,10 +92,6 @@ final class SDLUDPHole: ChannelInboundHandler {
|
|||||||
context.close(promise: nil)
|
context.close(promise: nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getLocalAddress() -> SocketAddress? {
|
|
||||||
return self.channel?.localAddress
|
|
||||||
}
|
|
||||||
|
|
||||||
// MARK: 处理写入逻辑
|
// MARK: 处理写入逻辑
|
||||||
func send(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
|
func send(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
|
||||||
guard let channel = self.channel else {
|
guard let channel = self.channel else {
|
||||||
@ -162,8 +155,8 @@ final class SDLUDPHole: ChannelInboundHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
|
self.stop()
|
||||||
try? self.group.syncShutdownGracefully()
|
try? self.group.syncShutdownGracefully()
|
||||||
self.channel?.close(promise: nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user