fix actor

This commit is contained in:
anlicheng 2025-08-01 12:15:46 +08:00
parent 1f629a58a0
commit 9a76b61ff0

View File

@ -47,8 +47,8 @@ public class SDLContext: @unchecked Sendable {
let rsaCipher: RSACipher let rsaCipher: RSACipher
// //
var udpHoleActor: SDLUDPHole? var udpHole: SDLUDPHole?
var superClientActor: SDLSuperClient? var superClient: SDLSuperClient?
// //
private var readTask: Task<(), Never>? private var readTask: Task<(), Never>?
@ -93,17 +93,17 @@ public class SDLContext: @unchecked Sendable {
} }
public func start() async throws { public func start() async throws {
self.udpHoleActor = try await SDLUDPHole() self.udpHole = try await SDLUDPHole()
self.superClientActor = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort) self.superClient = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort)
self.noticeClient = try await SDLNoticeClient() self.noticeClient = try await SDLNoticeClient()
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.udpHoleActor?.start() try await self.udpHole?.start()
} }
group.addTask { group.addTask {
try await self.superClientActor?.start() try await self.superClient?.start()
} }
group.addTask { group.addTask {
@ -111,7 +111,7 @@ public class SDLContext: @unchecked Sendable {
} }
group.addTask { group.addTask {
if let eventFlow = self.superClientActor?.eventFlow { if let eventFlow = self.superClient?.eventFlow {
for try await event in eventFlow { for try await event in eventFlow {
try await self.handleSuperEvent(event: event) try await self.handleSuperEvent(event: event)
} }
@ -119,7 +119,7 @@ public class SDLContext: @unchecked Sendable {
} }
group.addTask { group.addTask {
if let eventFlow = self.udpHoleActor?.eventFlow { if let eventFlow = self.udpHole?.eventFlow {
for try await event in eventFlow { for try await event in eventFlow {
try await self.handleUDPEvent(event: event) try await self.handleUDPEvent(event: event)
} }
@ -130,32 +130,32 @@ public class SDLContext: @unchecked Sendable {
group.addTask { group.addTask {
while !Task.isCancelled { while !Task.isCancelled {
try await Task.sleep(nanoseconds: 5 * 1_000_000_000) try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
self.lastCookie = await self.udpHoleActor?.stunRequest(context: self) self.lastCookie = await self.udpHole?.stunRequest(context: self)
} }
} }
try await group.waitForAll() try await group.waitForAll()
} }
// // //
// self.monitorCancel = self.monitor.eventFlow.sink { event in self.monitorCancel = self.monitor.eventFlow.sink { event in
// switch event { switch event {
// case .changed: case .changed:
// // nat // nat
// Task { Task {
// self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config) self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
// NSLog("didNetworkPathChanged, nat type is: \(self.natType)") NSLog("didNetworkPathChanged, nat type is: \(self.natType)")
// } }
// case .unreachable: case .unreachable:
// NSLog("didNetworkPathUnreachable") NSLog("didNetworkPathUnreachable")
// } }
// } }
// self.monitor.start() self.monitor.start()
} }
public func stop() async { public func stop() async {
self.superClientActor = nil self.superClient = nil
self.udpHoleActor = nil self.udpHole = nil
self.readTask?.cancel() self.readTask?.cancel()
} }
@ -164,7 +164,7 @@ public class SDLContext: @unchecked Sendable {
switch event { switch event {
case .ready: case .ready:
NSLog("[SDLContext] get registerSuper, mac address: \(Self.formatMacAddress(mac: self.devAddr.mac))") NSLog("[SDLContext] get registerSuper, mac address: \(Self.formatMacAddress(mac: self.devAddr.mac))")
guard let message = try await self.superClientActor?.registerSuper(context: self) else { guard let message = try await self.superClient?.registerSuper(context: self) else {
return return
} }
@ -231,7 +231,7 @@ public class SDLContext: @unchecked Sendable {
let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp) let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp)
if let remoteAddress = try? SocketAddress.makeAddressResolvingHost(address, port: Int(sendRegisterEvent.natPort)) { if let remoteAddress = try? SocketAddress.makeAddressResolvingHost(address, port: Int(sendRegisterEvent.natPort)) {
// register // register
await self.udpHoleActor?.sendRegister(context: self, remoteAddress: remoteAddress, dst_mac: sendRegisterEvent.dstMac) await self.udpHole?.sendRegister(context: self, remoteAddress: remoteAddress, dst_mac: sendRegisterEvent.dstMac)
} }
case .networkShutdown(let shutdownEvent): case .networkShutdown(let shutdownEvent):
@ -254,7 +254,7 @@ public class SDLContext: @unchecked Sendable {
var commandAck = SDLCommandAck() var commandAck = SDLCommandAck()
commandAck.status = true commandAck.status = true
await self.superClientActor?.commandAck(packetId: packetId, ack: commandAck) await self.superClient?.commandAck(packetId: packetId, ack: commandAck)
} }
} }
@ -294,7 +294,7 @@ public class SDLContext: @unchecked Sendable {
// tun, // tun,
if register.dstMac == self.devAddr.mac && register.networkID == self.devAddr.networkID { if register.dstMac == self.devAddr.mac && register.networkID == self.devAddr.networkID {
// ack // ack
await self.udpHoleActor?.sendRegisterAck(context: self, remoteAddress: remoteAddress, dst_mac: register.srcMac) await self.udpHole?.sendRegisterAck(context: self, remoteAddress: remoteAddress, dst_mac: register.srcMac)
// , super-nodenatudpnat // , super-nodenatudpnat
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress) let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
await self.sessionManager.addSession(session: session) await self.sessionManager.addSession(session: session)
@ -383,7 +383,7 @@ public class SDLContext: @unchecked Sendable {
.sink { _ in .sink { _ in
Task { Task {
let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset() let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset()
await self.superClientActor?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum) await self.superClient?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum)
} }
} }
} }
@ -483,13 +483,13 @@ public class SDLContext: @unchecked Sendable {
// session // session
if let session = await self.sessionManager.getSession(toAddress: dstMac) { if let session = await self.sessionManager.getSession(toAddress: dstMac) {
NSLog("[SDLContext] send packet by session: \(session)") NSLog("[SDLContext] send packet by session: \(session)")
await self.udpHoleActor?.sendPacket(context: self, session: session, data: encodedPacket) await self.udpHole?.sendPacket(context: self, session: session, data: encodedPacket)
await self.flowTracer.inc(num: data.count, type: .p2p) await self.flowTracer.inc(num: data.count, type: .p2p)
} }
else { else {
// super_node // super_node
await self.udpHoleActor?.forwardPacket(context: self, dst_mac: dstMac, data: encodedPacket) await self.udpHole?.forwardPacket(context: self, dst_mac: dstMac, data: encodedPacket)
// //
await self.flowTracer.inc(num: data.count, type: .forward) await self.flowTracer.inc(num: data.count, type: .forward)
@ -502,7 +502,7 @@ public class SDLContext: @unchecked Sendable {
func holerTask(dstMac: Data) -> Task<(), Never> { func holerTask(dstMac: Data) -> Task<(), Never> {
return Task { return Task {
guard let message = try? await self.superClientActor?.queryInfo(context: self, dst_mac: dstMac) else { guard let message = try? await self.superClient?.queryInfo(context: self, dst_mac: dstMac) else {
return return
} }
@ -513,7 +513,7 @@ public class SDLContext: @unchecked Sendable {
if let remoteAddress = peerInfo.v4Info.socketAddress() { if let remoteAddress = peerInfo.v4Info.socketAddress() {
SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", level: .warning) SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", level: .warning)
// register // register
await self.udpHoleActor?.sendRegister(context: self, remoteAddress: remoteAddress, dst_mac: dstMac) await self.udpHole?.sendRegister(context: self, remoteAddress: remoteAddress, dst_mac: dstMac)
} else { } else {
SDLLogger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning) SDLLogger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning)
} }
@ -525,8 +525,8 @@ public class SDLContext: @unchecked Sendable {
deinit { deinit {
self.stunCancel?.cancel() self.stunCancel?.cancel()
self.udpHoleActor = nil self.udpHole = nil
self.superClientActor = nil self.superClient = nil
} }
} }