fix context

This commit is contained in:
anlicheng 2026-01-29 00:17:23 +08:00
parent ce0f3fa29d
commit d15240a3a7
3 changed files with 60 additions and 98 deletions

View File

@ -22,13 +22,13 @@ import NIOPosix
actor SDLNoticeClientActor {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private var channel: Channel?
private let remoteAddress: SocketAddress
private let logger: SDLLogger
private let noticePort: Int
//
init(noticePort: Int, logger: SDLLogger) throws {
self.logger = logger
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: noticePort)
self.noticePort = noticePort
}
func start() throws {
@ -48,10 +48,12 @@ actor SDLNoticeClientActor {
return
}
let buf = channel.allocator.buffer(bytes: data)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: self.remoteAddress, data: buf)
channel.eventLoop.execute {
channel.writeAndFlush(envelope, promise: nil)
if let remoteAddress = try? SocketAddress(ipAddress: "127.0.0.1", port: noticePort) {
let buf = channel.allocator.buffer(bytes: data)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buf)
channel.eventLoop.execute {
channel.writeAndFlush(envelope, promise: nil)
}
}
}

View File

@ -72,7 +72,6 @@ public class SDLContext {
private var flowTracerCancel: AnyCancellable?
private let logger: SDLLogger
private var rootTask: Task<Void, Error>?
public init(provider: NEPacketTunnelProvider, config: SDLConfiguration, rsaCipher: RSACipher, aesCipher: AESCipher, logger: SDLLogger) {
self.logger = logger
@ -88,72 +87,29 @@ public class SDLContext {
}
public func start() async throws {
self.rootTask = Task {
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
while !Task.isCancelled {
do {
try await self.startDnsClient()
} catch let err {
self.logger.log("[SDLContext] UDPHole get err: \(err)", level: .warning)
try await Task.sleep(for: .seconds(2))
}
}
}
group.addTask {
while !Task.isCancelled {
do {
try await self.startUDPHole()
} catch let err {
self.logger.log("[SDLContext] UDPHole get err: \(err)", level: .warning)
try await Task.sleep(for: .seconds(2))
}
}
}
// group.addTask {
// await self.startMonitor()
// }
// group.addTask {
// while !Task.isCancelled {
// do {
// try await self.startNoticeClient()
// } catch let err {
// self.logger.log("[SDLContext] noticeClient get err: \(err)", level: .warning)
// try await Task.sleep(for: .seconds(2))
// }
// }
// }
try await group.waitForAll()
}
}
try await self.rootTask?.value
}
public func stop() async {
self.rootTask?.cancel()
self.udpHoleActor = nil
self.noticeClientActor = nil
self.readTask?.cancel()
}
private func startNoticeClient() async throws {
self.noticeClientActor = try SDLNoticeClientActor(noticePort: self.config.noticePort, logger: self.logger)
try await self.noticeClientActor?.start()
self.logger.log("[SDLContext] notice_client task cancel", level: .warning)
}
private func startUDPHole() async throws {
// udp
self.udpHoleActor = try SDLUDPHoleActor(logger: self.logger)
try await self.udpHoleActor?.start()
self.logger.log("[SDLContext] udpHole started")
// dns
let dnsSocketAddress = try SocketAddress.makeAddressResolvingHost(self.config.remoteDnsServer, port: 15353)
self.dnsClientActor = try await SDLDNSClientActor(dnsServerAddress: dnsSocketAddress, logger: self.logger)
try await self.dnsClientActor?.start()
self.logger.log("[SDLContext] dnsClient started")
// noticeClient
self.noticeClientActor = try SDLNoticeClientActor(noticePort: self.config.noticePort, logger: self.logger)
try await self.noticeClientActor?.start()
self.logger.log("[SDLContext] noticeClient started")
// monitor
self.monitor = SDLNetworkMonitor()
self.monitor?.start()
self.logger.log("[SDLContext] monitor started")
try await withThrowingTaskGroup(of: Void.self) { group in
// UDP
group.addTask {
while true {
try Task.checkCancellation()
@ -172,40 +128,42 @@ public class SDLContext {
}
}
// DNS
group.addTask {
if let packetFlow = self.dnsClientActor?.packetFlow {
for await packet in packetFlow {
let nePacket = NEPacket(data: packet, protocolFamily: 2)
await self.providerActor.writePackets(packets: [nePacket])
}
}
}
// Monitor
group.addTask {
for await event in self.monitor!.eventStream {
switch event {
case .changed:
// nat
//self.natType = await self.getNatType()
self.logger.log("didNetworkPathChanged, nat type is: \(self.natType)", level: .info)
case .unreachable:
self.logger.log("didNetworkPathUnreachable", level: .warning)
}
}
}
if let _ = try await group.next() {
self.logger.log("[SDLContext] taskGroup cancel")
group.cancelAll()
}
}
}
private func startMonitor() async {
self.monitor = SDLNetworkMonitor()
for await event in self.monitor!.eventStream {
switch event {
case .changed:
// nat
//self.natType = await self.getNatType()
self.logger.log("didNetworkPathChanged, nat type is: \(self.natType)", level: .info)
case .unreachable:
self.logger.log("didNetworkPathUnreachable", level: .warning)
}
}
}
private func startDnsClient() async throws {
let remoteDnsServer = config.remoteDnsServer
let dnsSocketAddress = try SocketAddress.makeAddressResolvingHost(remoteDnsServer, port: 15353)
self.dnsClientActor = try await SDLDNSClientActor(dnsServerAddress: dnsSocketAddress, logger: self.logger)
try await self.dnsClientActor?.start()
if let packetFlow = self.dnsClientActor?.packetFlow {
for await packet in packetFlow {
let nePacket = NEPacket(data: packet, protocolFamily: 2)
await self.providerActor.writePackets(packets: [nePacket])
}
}
public func stop() async {
self.udpHoleActor = nil
self.noticeClientActor = nil
self.readTask?.cancel()
}
private func handleUDPHoleReady() async throws {
@ -532,7 +490,6 @@ public class SDLContext {
}
deinit {
self.rootTask?.cancel()
self.udpHoleActor = nil
self.dnsClientActor = nil
}

3
tracelog.sh Executable file
View File

@ -0,0 +1,3 @@
#! /bin/sh
log stream --predicate 'subsystem == "com.jihe.punchnet"' --info