actor的最小依赖原则

This commit is contained in:
anlicheng 2026-02-04 13:56:05 +08:00
parent f801344370
commit b1c6b45f35
3 changed files with 51 additions and 65 deletions

View File

@ -52,8 +52,7 @@ actor SDLNATProberActor {
// MARK: - Dependencies
private let udpHole: SDLUDPHole
private let addressArray: [[SocketAddress]]
nonisolated private let addressArray: [[SocketAddress]]
// MARK: - Completion
private var cookieId: UInt32 = 1
@ -62,14 +61,13 @@ actor SDLNATProberActor {
// MARK: - Init
init(udpHole: SDLUDPHole, addressArray: [[SocketAddress]]) {
self.udpHole = udpHole
init(addressArray: [[SocketAddress]]) {
self.addressArray = addressArray
}
// MARK: - Public API
func probeNatType() async -> NatType {
func probeNatType(using udpHole: SDLUDPHole) async -> NatType {
let cookieId = self.cookieId
self.cookieId &+= 1
@ -86,13 +84,13 @@ actor SDLNATProberActor {
)
self.sessions[cookieId] = session
Task {
await self.sendProbe(cookie: cookieId)
await self.sendProbe(using: udpHole, cookie: cookieId)
}
}
}
/// UDP STUN
func handleProbeReply(reply: SDLStunProbeReply) async {
func handleProbeReply(localAddress: SocketAddress?, reply: SDLStunProbeReply) async {
guard let session = self.sessions[reply.cookie] else {
return
}
@ -101,7 +99,6 @@ actor SDLNATProberActor {
// 退nat
if session.replies[1] != nil {
let localAddress = self.udpHole.getLocalAddress()
if await reply.socketAddress() == localAddress {
finish(cookie: session.cookieId, .noNat)
return
@ -160,11 +157,11 @@ actor SDLNATProberActor {
// MARK: - Internal helpers
private func sendProbe(cookie: UInt32) async {
self.udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 1, attr: .none), remoteAddress: addressArray[0][0])
self.udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 2, attr: .none), remoteAddress: addressArray[1][1])
self.udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 3, attr: .peer), remoteAddress: addressArray[0][0])
self.udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 4, attr: .port), remoteAddress: addressArray[0][0])
private func sendProbe(using udpHole: SDLUDPHole, cookie: UInt32) async {
udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 1, attr: .none), remoteAddress: addressArray[0][0])
udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 2, attr: .none), remoteAddress: addressArray[1][1])
udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 3, attr: .peer), remoteAddress: addressArray[0][0])
udpHole.send(type: .stunProbe, data: makeProbePacket(cookieId: cookie, step: 4, attr: .port), remoteAddress: addressArray[0][0])
}
private func makeProbePacket(cookieId: UInt32, step: UInt32, attr: SDLProbeAttr) -> Data {

View File

@ -9,20 +9,16 @@ import Foundation
import NIOCore
actor SDLPuncherActor {
nonisolated private let cooldown: Duration = .seconds(5)
// dstMac
private var coolingDown: Set<Data> = []
private let cooldown: Duration = .seconds(5)
private var udpHole: SDLUDPHole
private var pktId: UInt32 = 1
//
private var pendingRequests: [UInt32:RegisterRequest] = [:]
private var pendingRequests: [UInt32: RegisterRequest] = [:]
// holer
private var querySocketAddress: SocketAddress
nonisolated private let(requestStream, requestContinuation) = AsyncStream.makeStream(of: RegisterRequest.self, bufferingPolicy: .unbounded)
nonisolated private let querySocketAddress: SocketAddress
struct RegisterRequest {
let srcMac: Data
@ -30,25 +26,13 @@ actor SDLPuncherActor {
let networkId: UInt32
}
init(udpHole: SDLUDPHole, querySocketAddress: SocketAddress) {
self.udpHole = udpHole
init(querySocketAddress: SocketAddress) {
self.querySocketAddress = querySocketAddress
}
nonisolated func submitRegisterRequest(request: RegisterRequest) {
self.requestContinuation.yield(request)
}
func startConsuming() async {
for await request in self.requestStream {
self.submitRegisterRequest(request: request)
}
}
private func handleRegisterRequest(request: RegisterRequest) {
func submitRegisterRequest(using udpHole: SDLUDPHole?, request: RegisterRequest) {
let dstMac = request.dstMac
guard !coolingDown.contains(dstMac) else {
guard let udpHole, !coolingDown.contains(dstMac) else {
return
}
@ -60,9 +44,9 @@ actor SDLPuncherActor {
if self.pktId == 0 {
self.pktId = 1
}
self.tryHole(using: udpHole, pktId: pktId, request: request)
Task {
await self.tryHole(pktId: pktId, request: request)
//
try? await Task.sleep(for: .seconds(5))
self.endCooldown(for: dstMac)
@ -70,7 +54,7 @@ actor SDLPuncherActor {
}
}
func handlePeerInfo(peerInfo: SDLPeerInfo) async {
func handlePeerInfo(using udpHole: SDLUDPHole, peerInfo: SDLPeerInfo) async {
if let request = pendingRequests.removeValue(forKey: peerInfo.pktID) {
if let remoteAddress = try? await peerInfo.v4Info.socketAddress() {
SDLLogger.shared.log("[SDLContext] hole sock address: \(remoteAddress)", level: .debug)
@ -80,7 +64,7 @@ actor SDLPuncherActor {
register.srcMac = request.srcMac
register.dstMac = request.dstMac
self.udpHole.send(type: .register, data: try! register.serializedData(), remoteAddress: remoteAddress)
udpHole.send(type: .register, data: try! register.serializedData(), remoteAddress: remoteAddress)
} else {
SDLLogger.shared.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning)
}
@ -95,14 +79,14 @@ actor SDLPuncherActor {
self.pendingRequests.removeValue(forKey: pktId)
}
private func tryHole(pktId: UInt32, request: RegisterRequest) async {
private func tryHole(using udpHole: SDLUDPHole, pktId: UInt32, request: RegisterRequest) {
var queryInfo = SDLQueryInfo()
queryInfo.pktID = pktId
queryInfo.dstMac = request.dstMac
self.pendingRequests[pktId] = request
if let queryData = try? queryInfo.serializedData() {
self.udpHole.send(type: .queryInfo, data: queryData, remoteAddress: self.querySocketAddress)
udpHole.send(type: .queryInfo, data: queryData, remoteAddress: self.querySocketAddress)
}
}
}

View File

@ -36,13 +36,13 @@ actor SDLContextActor {
private var udpHoleWorkers: [Task<Void, Never>]?
nonisolated let providerAdapter: SDLTunnelProviderAdapter
var puncherActor: SDLPuncherActor?
// dnsclient
private var dnsClient: SDLDNSClient?
private var dnsWorker: Task<Void, Never>?
nonisolated private let puncherActor: SDLPuncherActor
//
var proberActor: SDLNATProberActor?
nonisolated private let proberActor: SDLNATProberActor
//
private var readTask: Task<(), Never>?
@ -71,6 +71,9 @@ actor SDLContextActor {
self.sessionManager = SessionManager()
self.arpServer = ArpServer(known_macs: [:])
self.providerAdapter = SDLTunnelProviderAdapter(provider: provider)
self.puncherActor = SDLPuncherActor(querySocketAddress: config.stunSocketAddress)
self.proberActor = SDLNATProberActor(addressArray: config.stunProbeSocketAddressArray)
}
public func start() {
@ -165,6 +168,10 @@ actor SDLContextActor {
let udpHole = try SDLUDPHole()
try udpHole.start()
SDLLogger.shared.log("[SDLContext] udpHole started")
// udp
let localAddress = udpHole.getLocalAddress()
self.udpHole = udpHole
await udpHole.channelIsActived()
@ -209,11 +216,11 @@ actor SDLContextActor {
case .registerSuperNak(let registerSuperNak):
await self.handleRegisterSuperNak(nakPacket: registerSuperNak)
case .peerInfo(let peerInfo):
await self.puncherActor?.handlePeerInfo(peerInfo: peerInfo)
await self.puncherActor.handlePeerInfo(using: udpHole, peerInfo: peerInfo)
case .event(let event):
try? await self.handleEvent(event: event)
case .stunProbeReply(let probeReply):
await self.proberActor?.handleProbeReply(reply: probeReply)
await self.proberActor.handleProbeReply(localAddress: localAddress, reply: probeReply)
case .register(let register):
try? await self.handleRegister(remoteAddress: remoteAddress, register: register)
case .registerAck(let registerAck):
@ -255,16 +262,11 @@ actor SDLContextActor {
return
}
self.puncherActor = SDLPuncherActor(udpHole: udpHole, querySocketAddress: config.stunSocketAddress)
self.proberActor = SDLNATProberActor(udpHole: udpHole, addressArray: self.config.stunProbeSocketAddressArray)
// nat
Task.detached {
await Task.yield()
if let natType = await self.proberActor?.probeNatType() {
await self.setNatType(natType: natType)
SDLLogger.shared.log("[SDLContext] nat_type is: \(natType)")
}
Task {
let natType = await self.proberActor.probeNatType(using: udpHole)
self.setNatType(natType: natType)
SDLLogger.shared.log("[SDLContext] nat_type is: \(natType)")
}
//
@ -410,7 +412,7 @@ actor SDLContextActor {
}
}
private func handleData(data: SDLData) throws {
private func handleData(data: SDLData) async throws {
guard let aesKey = self.aesKey else {
return
}
@ -439,7 +441,7 @@ actor SDLContextActor {
case .request:
SDLLogger.shared.log("[SDLContext] get arp request packet", level: .debug)
let response = ARPPacket.arpResponse(for: arpPacket, mac: networkAddr.mac, ip: networkAddr.ip)
self.routeLayerPacket(dstMac: arpPacket.senderMAC, type: .arp, data: response.marshal())
await self.routeLayerPacket(dstMac: arpPacket.senderMAC, type: .arp, data: response.marshal())
case .response:
SDLLogger.shared.log("[SDLContext] get arp response packet", level: .debug)
self.arpServer.append(ip: arpPacket.senderIP, mac: arpPacket.senderMAC)
@ -498,7 +500,7 @@ actor SDLContextActor {
}
//
private func dealPacket(packet: IPPacket) {
private func dealPacket(packet: IPPacket) async {
let networkAddr = self.config.networkAddress
if SDLDNSClient.Helper.isDnsRequestPacket(ipPacket: packet) {
let destIp = packet.header.destination_ip
@ -517,21 +519,21 @@ actor SDLContextActor {
// arpmac
if let dstMac = self.arpServer.query(ip: dstIp) {
self.routeLayerPacket(dstMac: dstMac, type: .ipv4, data: packet.data)
await self.routeLayerPacket(dstMac: dstMac, type: .ipv4, data: packet.data)
}
else {
SDLLogger.shared.log("[SDLContext] dstIp: \(dstIp.asIpAddress()) arp query not found, broadcast", level: .debug)
// arp广
let arpReqeust = ARPPacket.arpRequest(senderIP: networkAddr.ip, senderMAC: networkAddr.mac, targetIP: dstIp)
self.routeLayerPacket(dstMac: ARPPacket.broadcastMac , type: .arp, data: arpReqeust.marshal())
await self.routeLayerPacket(dstMac: ARPPacket.broadcastMac , type: .arp, data: arpReqeust.marshal())
}
}
private func routeLayerPacket(dstMac: Data, type: LayerPacket.PacketType, data: Data) {
private func routeLayerPacket(dstMac: Data, type: LayerPacket.PacketType, data: Data) async {
let networkAddr = self.config.networkAddress
// 2
let layerPacket = LayerPacket(dstMac: dstMac, srcMac: networkAddr.mac, type: type, data: data)
guard let aesKey = self.aesKey, let encodedPacket = try? self.aesCipher.encrypt(aesKey: aesKey, data: layerPacket.marshal()) else {
guard let udpHole = self.udpHole, let aesKey = self.aesKey, let encodedPacket = try? self.aesCipher.encrypt(aesKey: aesKey, data: layerPacket.marshal()) else {
return
}
@ -547,22 +549,25 @@ actor SDLContextActor {
// 广
if ARPPacket.isBroadcastMac(dstMac) {
// super_node
self.udpHole?.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
udpHole.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
}
else {
// session
if let session = self.sessionManager.getSession(toAddress: dstMac) {
SDLLogger.shared.log("[SDLContext] send packet by session: \(session)", level: .debug)
self.udpHole?.send(type: .data, data: data, remoteAddress: session.natAddress)
udpHole.send(type: .data, data: data, remoteAddress: session.natAddress)
self.flowTracer.inc(num: data.count, type: .p2p)
}
else {
// super_node
self.udpHole?.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
udpHole.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
//
self.flowTracer.inc(num: data.count, type: .forward)
//
self.puncherActor?.submitRegisterRequest(request: .init(srcMac: networkAddr.mac, dstMac: dstMac, networkId: networkAddr.networkId))
Task.detached {
await self.puncherActor.submitRegisterRequest(using: udpHole, request: .init(srcMac: networkAddr.mac, dstMac: dstMac, networkId: networkAddr.networkId))
}
}
}
}