// // SDLContext.swift // Tun // // Created by 安礼成 on 2024/2/29. // import Foundation import NetworkExtension import NIOCore // 上下文环境变量,全局共享 /* 1. 处理rsa的加解密逻辑 */ actor SDLContextActor { enum ReadyState { case idle case starting case ready case failed(any Error) case stopped } private enum UDPHoleKind: Equatable { case v4 case v6 } private var readyState: ReadyState = .idle private var readyWaiters: [CheckedContinuation] = [] var config: SDLConfiguration // nat的网络类型 var natType: SDLNATProberActor.NatType = .blocked // AES加密,授权通过后,对象才会被创建 private var dataCipher: CCDataCipher? // session token private var sessionToken: Data? // rsa的相关配置, public_key是本地生成的 // 加密算法相关 nonisolated let rsaCipher: RSACipher // 依赖的变量 private var udpHole: SDLUDPHole? private var udpHoleWorkers: [Task]? private var udpHoleLocalAddress: SocketAddress? private var udpHoleV6: SDLUDPHoleV6? private var udpHoleV6Workers: [Task]? private var udpHoleV6LocalAddress: SocketAddress? // dns的client对象 private var dnsClient: DNSCloudClient? private var dnsWorker: Task? // Localdns的client对象 private var dnsLocalClient: DNSLocalClient? private var dnsLocalWorker: Task? private var quicClient: SDLQUICClient? private var quicWorker: Task? nonisolated private let puncherActor: SDLPuncherActor // 网络探测对象 nonisolated private let proberActor: SDLNATProberActor // 数据包读取任务 private var readTask: Task<(), Never>? nonisolated private let sessionManager = SessionManager() nonisolated private let arpServer: ArpServer // 网络状态变化的健康 private var monitor: SDLNetworkMonitor? private var monitorWorker: Task? // 内部socket通讯 // 改为基于 App Group + Darwin Notification 的通知 // 流量统计 nonisolated private let flowTracer = SDLFlowTracer() // 处理内部的需要长时间运行的任务 private var supervisor = SDLSupervisor() nonisolated private let provider: NEPacketTunnelProvider // 处理权限控制 private let identifyStore: IdentityStore private var updatePolicyTask: Task? private let snapshotPublisher: SnapshotPublisher // Flow流会话管理, 过期时间为: 180秒 private let flowSessionManager = SDLFlowSessionManager(sessionTimeout: 180) // 注册任务 private var registerTask: Task? private let superRegistrationStateMachine = SDLSuperRegistrationStateMachine() public init(provider: NEPacketTunnelProvider, config: SDLConfiguration, rsaCipher: RSACipher) { self.provider = provider self.config = config self.rsaCipher = rsaCipher self.puncherActor = SDLPuncherActor() self.proberActor = SDLNATProberActor(addressArray: config.stunProbeSocketAddressArray) self.arpServer = ArpServer() // 权限控制 let snapshotPublisher = SnapshotPublisher(initial: IdentitySnapshot.empty()) self.identifyStore = IdentityStore(publisher: snapshotPublisher) self.snapshotPublisher = snapshotPublisher } public func start() async { guard case .idle = self.readyState else { return } self.readyState = .starting self.prepareTunnelNotifier() self.startMonitor() // 启动arp的定时清理任务 await self.puncherActor.start() await self.arpServer.start() await self.startDnsClient() await self.startDnsLocalClient() await self.supervisor.addWorker(name: "quicClient") { SDLLogger.log("[SDLContext] try start quicClient", for: .debug) let quicClient = try await self.startQUICClient() SDLLogger.log("[SDLContext] quicClient running!!!!") await quicClient.waitClose() SDLLogger.log("[SDLContext] quicClient closed!!!!") } await self.supervisor.addWorker(name: "udpHole") { let udpHole = try await self.startUDPHole() SDLLogger.log("[SDLContext] udp running!!!!") try await udpHole.waitClose() SDLLogger.log("[SDLContext] udp closed!!!!") } await self.supervisor.addWorker(name: "udpHoleV6") { let udpHoleV6 = try await self.startUDPHoleV6() SDLLogger.log("[SDLContext] udp v6 running!!!!") try await udpHoleV6.waitClose() SDLLogger.log("[SDLContext] udp v6 closed!!!!") } } public func waitForReady() async throws { switch self.readyState { case .ready: return case .failed(let error): throw error case .stopped: throw CancellationError() case .idle, .starting: try await withCheckedThrowingContinuation { continuation in self.readyWaiters.append(continuation) } } } // 取消出口节点的时候,ip地址为: 0.0.0.0 public func updateExitNode(exitNodeIp: String) async throws { if let ip = SDLUtil.ipv4StrToInt32(exitNodeIp), ip > 0 { self.config.exitNode = .init(exitNodeIp: ip) } else { self.config.exitNode = nil } try await self.setNetworkSettings(config: config, dnsServer: DNSHelper.dnsServer) } private func startQUICClient() async throws -> SDLQUICClient { self.quicWorker?.cancel() self.quicClient?.stop() // 启动monitor let quicClient = SDLQUICClient(host: self.config.serverHost, port: 443) quicClient.start() // 等待quic准备好 try await quicClient.waitReady() // 这里必须等待quic的协商完成 try await Task.sleep(for: .seconds(0.2)) SDLLogger.log("[SDLContext] start quic client: \(self.config.serverHost)") self.quicWorker = Task.detached { for await message in quicClient.messageStream { switch message { case .welcome(let welcome): SDLLogger.log("[SDLContext] quic welcome: \(welcome)") // 注册 await self.startRegisterLoop() case .pong: //SDLLogger.shared.log("[SDLContext] quic pong") () case .registerSuperAck(let registerSuperAck): await self.handleRegisterSuperAck(registerSuperAck: registerSuperAck) case .registerSuperNak(let registerSuperNak): await self.handleRegisterSuperNak(nakPacket: registerSuperNak) case .peerInfo(let peerInfo): //SDLLogger.shared.log("[SDLContext] peer message: \(peerInfo)") await self.puncherActor.handlePeerInfo(using: self.udpHole, udpHoleV6: self.udpHoleV6, peerInfo: peerInfo) case .event(let event): await self.handleEvent(event: event) case .policyReponse(let policyResponse): // 处理权限的请求问题 await self.identifyStore.applyPolicyResponse(policyResponse) case .arpResponse(let arpResponse): //SDLLogger.shared.log("[SDLContext] get arp response: \(arpResponse)") await self.arpServer.handleArpResponse(arpResponse: arpResponse) } } } self.quicClient = quicClient return quicClient } private func prepareTunnelNotifier() { // 启动noticeClient // 旧的 UDP NoticeClient 已移除,改为初始化基于 App Group 的通知通道。 SDLTunnelAppNotifier.shared.clear() SDLLogger.log("[SDLContext] tunnelAppNotifier ready") } private func startMonitor() { self.monitorWorker?.cancel() self.monitorWorker = nil // 启动monitor let monitor = SDLNetworkMonitor() monitor.start() SDLLogger.log("[SDLContext] monitor started") self.monitor = monitor self.monitorWorker = Task.detached { for await event in monitor.eventStream { switch event { case .changed: // 需要重新探测网络的nat类型 await self.probeNatType() SDLLogger.log("didNetworkPathChanged, nat type is:") case .unreachable: SDLLogger.log("didNetworkPathUnreachable") } } } } private func startDnsClient() async { self.dnsWorker?.cancel() self.dnsWorker = nil // 启动dns服务 let dnsClient = DNSCloudClient(host: self.config.serverIp, port: 15353) await dnsClient.start() SDLLogger.log("[SDLContext] dnsClient started") self.dnsClient = dnsClient let packetFlow = dnsClient.packetFlow self.dnsWorker = Task.detached { // 处理事件流 for await packet in packetFlow { if Task.isCancelled { break } let nePacket = NEPacket(data: packet, protocolFamily: 2) self.provider.packetFlow.writePacketObjects([nePacket]) } } } private func startDnsLocalClient() async { self.dnsLocalWorker?.cancel() self.dnsLocalWorker = nil // 启动dns服务 let dnsLocalClient = DNSLocalClient() await dnsLocalClient.start() SDLLogger.log("[SDLContext] dnsClient started") self.dnsLocalClient = dnsLocalClient let packetFlow = dnsLocalClient.packetFlow self.dnsLocalWorker = Task.detached { // 处理事件流 for await packet in packetFlow { if Task.isCancelled { break } // 要想办法构造一个完整的Ip包 let nePacket = NEPacket(data: packet, protocolFamily: 2) self.provider.packetFlow.writePacketObjects([nePacket]) } } } private func startUDPHole() async throws -> SDLUDPHole { self.udpHoleWorkers?.forEach {$0.cancel()} self.udpHoleWorkers = nil // 启动udp服务器 let udpHole = try SDLUDPHole() let localAddress = try udpHole.start() SDLLogger.log("[SDLContext] udpHole started, on address: \(localAddress)") // 处理心跳逻辑 let pingTask = Task.detached { let timerStream = SDLAsyncTimerStream() timerStream.start(interval: .seconds(5)) for await _ in timerStream.stream { if Task.isCancelled { break } await self.sendStunRequest() } SDLLogger.log("[SDLContext] udp pingTask cancel") } // 处理消息流 let messageStream = udpHole.messageStream let messageTask = Task.detached { await self.consumeUDPHoleMessages(stream: messageStream, localAddress: localAddress, source: .v4) } self.udpHole = udpHole self.udpHoleLocalAddress = localAddress self.udpHoleWorkers = [pingTask, messageTask] // 开始探测nat的类型 await self.probeNatType() return udpHole } private func startUDPHoleV6() async throws -> SDLUDPHoleV6 { self.udpHoleV6Workers?.forEach {$0.cancel()} self.udpHoleV6Workers = nil // 启动udp服务器 let udpHoleV6 = try SDLUDPHoleV6() let localAddress = try udpHoleV6.start() SDLLogger.log("[SDLContext] udpHoleV6 started, on address: \(localAddress)") // 处理消息流 let messageStream = udpHoleV6.messageStream let messageTask = Task.detached { await self.consumeUDPHoleMessages(stream: messageStream, localAddress: localAddress, source: .v6) } self.udpHoleV6 = udpHoleV6 self.udpHoleV6LocalAddress = localAddress self.udpHoleV6Workers = [messageTask] return udpHoleV6 } // 处理context的停止问题 public func stop() async { self.resumeReadyWaiters(.failure(CancellationError())) self.readyState = .stopped self.superRegistrationStateMachine.reset() await self.supervisor.stop() await self.puncherActor.stop() self.udpHoleWorkers?.forEach { $0.cancel() } self.udpHoleWorkers = nil self.udpHole?.stop() self.udpHole = nil self.udpHoleLocalAddress = nil self.udpHoleV6Workers?.forEach { $0.cancel() } self.udpHoleV6Workers = nil self.udpHoleV6?.stop() self.udpHoleV6 = nil self.udpHoleV6LocalAddress = nil self.quicWorker?.cancel() self.quicWorker = nil self.quicClient?.stop() self.quicClient = nil await self.dnsClient?.stop() self.dnsWorker?.cancel() self.dnsWorker = nil self.dnsClient = nil await self.dnsLocalClient?.stop() self.dnsLocalWorker?.cancel() self.dnsLocalWorker = nil self.dnsLocalClient = nil self.monitor?.stop() self.monitorWorker?.cancel() self.monitorWorker = nil self.monitor = nil self.readTask?.cancel() self.readTask = nil self.registerTask?.cancel() self.registerTask = nil self.updatePolicyTask?.cancel() self.updatePolicyTask = nil self.sessionToken = nil self.dataCipher = nil } private func publishTunnelEvent(code: Int? = nil, message: String) { SDLTunnelAppNotifier.shared.publish(code: code, message: message) } private func setNatType(natType: SDLNATProberActor.NatType) { self.natType = natType } // 注册成功super的回调函数 private func whenRegistedSuper() async { self.updatePolicyTask?.cancel() self.updatePolicyTask = Task { while !Task.isCancelled { try? await Task.sleep(for: .seconds(300)) SDLLogger.log("[SDLContext] updatePolicyTask execute") await self.identifyStore.batUpdatePolicy(using: self.quicClient, dstIdentityID: self.config.identityId) } } } private func sendStunRequest() { guard let sessionToken = self.sessionToken else { return } var stunRequest = SDLStunRequest() stunRequest.clientID = self.config.clientId stunRequest.networkID = self.config.networkAddress.networkId stunRequest.ip = self.config.networkAddress.ip stunRequest.mac = self.config.networkAddress.mac stunRequest.natType = UInt32(self.natType.rawValue) stunRequest.sessionToken = sessionToken if let v6Info = self.makeCurrentV6Info() { stunRequest.v6Info = v6Info } if let stunData = try? stunRequest.serializedData() { self.sendSuperPacket(type: .stunRequest, data: stunData) } } // 开始读取数据, 用单独的线程处理packetFlow private func startReader() { // 停止之前的任务 self.readTask?.cancel() // 开启新的任务 self.readTask = Task.detached(priority: .high) { while true { if Task.isCancelled { return } let (packets, numbers) = await self.provider.packetFlow.readPackets() for (data, number) in zip(packets, numbers) where number == 2 { SDLLogger.log("[SDLContext] read Tun packet step 1, data count: \(data.count)", for: .trace) if let ipPacket = IPPacket(data) { SDLLogger.log("[SDLContext] read Tun packet step 2, data count: \(ipPacket.data.count)", for: .trace) await self.dealTunPacket(packet: ipPacket) } } } } } // MARK: 网络改变时需要重新配置网络信息 private func setNetworkSettings(config: SDLConfiguration, dnsServer: String) async throws { let networkAddress = config.networkAddress // 配置路由规则 var routes: [NEIPv4Route] = [ NEIPv4Route(destinationAddress: networkAddress.netAddress, subnetMask: networkAddress.maskAddress), NEIPv4Route(destinationAddress: dnsServer, subnetMask: "255.255.255.255"), ] // 如果存在出口节点配置,则接管系统默认留有 if config.exitNode != nil { routes.append(.default()) } // Add code here to start the process of connecting the tunnel. let networkSettings = NEPacketTunnelNetworkSettings(tunnelRemoteAddress: "8.8.8.8") networkSettings.mtu = 1250 // 设置网卡的DNS解析 let networkDomain = networkAddress.networkDomain let dnsSettings = NEDNSSettings(servers: [dnsServer]) dnsSettings.searchDomains = [networkDomain] dnsSettings.matchDomains = [networkDomain, ""] // 设置为 false 允许系统在补全 Search Domain 时也能匹配到此设置 dnsSettings.matchDomainsNoSearch = false networkSettings.dnsSettings = dnsSettings let ipv4Settings = NEIPv4Settings(addresses: [networkAddress.ipAddress], subnetMasks: [networkAddress.maskAddress]) // 设置路由表 ipv4Settings.includedRoutes = routes // 配置要排除的路由 ipv4Settings.excludedRoutes = self.getIpv4ExcludeRoutes() networkSettings.ipv4Settings = ipv4Settings // 网卡配置设置必须成功 try await self.provider.setTunnelNetworkSettings(networkSettings) } // 探测当前网络的类型 private func probeNatType() async { guard let udpHole = self.udpHole else { return } // 开始探测nat的类型 self.natType = await self.proberActor.probeNatType(using: udpHole) SDLLogger.log("[SDLContext] nat_type is: \(natType)") } // 发送给super/stun节点的数据 private func sendSuperPacket(type: SDLPacketType, data: Data) { self.sendPacket(type: type, data: data, remoteAddress: self.config.stunSocketAddress) } // 发送给peer的数据 private func sendPeerPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) { self.sendPacket(type: type, data: data, remoteAddress: remoteAddress) } private func makeCurrentV6Info() -> SDLV6Info? { guard let port = self.udpHoleV6LocalAddress?.port, let ip = NetworkInterfaceManager.getPublicIPv6Address(), let ipData = SDLUtil.ipv6StrToData(ip) else { return nil } var v6Info = SDLV6Info() v6Info.port = UInt32(port) v6Info.v6 = ipData return v6Info } private func sendPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) { switch remoteAddress { case .v4: guard let udpHole = self.udpHole else { SDLLogger.log("[SDLContext] udpHole is nil for remoteAddress: \(remoteAddress)", for: .debug) return } udpHole.send(type: type, data: data, remoteAddress: remoteAddress) case .v6: guard let udpHoleV6 = self.udpHoleV6 else { SDLLogger.log("[SDLContext] udpHoleV6 is nil for remoteAddress: \(remoteAddress)", for: .debug) return } udpHoleV6.send(type: type, data: data, remoteAddress: remoteAddress) default: SDLLogger.log("[SDLContext] unsupported socket family: \(remoteAddress)", for: .debug) } } private func spawnLoop(_ body: @escaping () async throws -> Void) -> Task { return Task.detached { while !Task.isCancelled { do { try await body() } catch is CancellationError { break } catch { try? await Task.sleep(nanoseconds: 2_000_000_000) } } } } private func getIpv4ExcludeRoutes() -> [NEIPv4Route] { // 要排除的路由表 let dnsServers = SDLUtil.getMacOSSystemDnsServers() var ipv4DnsServers = dnsServers.filter {!$0.contains(":")} // 增加常见的dns服务 let commonDnsServers = [ "8.8.8.8", "8.8.4.4", "223.5.5.5", "223.6.6.6", "114.114.114.114" ] for ip in commonDnsServers { if !ipv4DnsServers.contains(ip) { ipv4DnsServers.append(ip) } } return ipv4DnsServers.map { NEIPv4Route(destinationAddress: $0, subnetMask: "255.255.255.255") } } private func markReady() { guard case .starting = self.readyState else { return } self.readyState = .ready self.resumeReadyWaiters(.success(())) } private func failReady(_ error: any Error) { switch self.readyState { case .ready, .failed, .stopped: return case .idle, .starting: self.readyState = .failed(error) self.resumeReadyWaiters(.failure(error)) } } private func resumeReadyWaiters(_ result: Result) { let waiters = self.readyWaiters self.readyWaiters.removeAll() waiters.forEach { continuation in switch result { case .success: continuation.resume() case .failure(let error): continuation.resume(throwing: error) } } } deinit { self.udpHole = nil self.udpHoleLocalAddress = nil self.udpHoleV6 = nil self.udpHoleV6LocalAddress = nil self.dnsClient = nil } } // 处理和Super之间的通讯 extension SDLContextActor { private func makeSuperEventProcessor() -> SDLSuperEventProcessor { return .init(networkAddress: self.config.networkAddress) } // 开启注册任务 private func startRegisterLoop() { guard self.registerTask == nil, self.superRegistrationStateMachine.beginLoop() else { return } self.registerTask = Task { defer { self.registerTask = nil } while !Task.isCancelled { switch self.superRegistrationStateMachine.makeLoopAction() { case .sendRegister: self.doRegisterSuper() case .stop: return } try? await Task.sleep(for: .seconds(5)) switch self.superRegistrationStateMachine.makeWaitDecision() { case .registered: await self.whenRegistedSuper() return case .retry: SDLLogger.log("[SDLContext] register super failed, retry") case .stop: return } } } } private func handleRegisterSuperAck(registerSuperAck: SDLRegisterSuperAck) async { // 需要对数据通过rsa的私钥解码 guard let key = try? self.rsaCipher.decode(data: Data(registerSuperAck.key)) else { self.superRegistrationStateMachine.handleFailure() SDLLogger.log("[SDLContext] registerSuperAck invalid key") let error = SDLError.invalidKey self.failReady(error) self.provider.cancelTunnelWithError(error) return } let algorithm = registerSuperAck.algorithm.lowercased() let regionId = registerSuperAck.regionID self.sessionToken = registerSuperAck.sessionToken switch algorithm { case "aes": self.dataCipher = CCAESChiper(key: key) case "chacha20": self.dataCipher = CCChaCha20Cipher(regionId: regionId, keyData: key) default: self.superRegistrationStateMachine.handleFailure() SDLLogger.log("[SDLContext] registerSuperAck invalid algorithm \(algorithm)") let error = SDLError.unsupportedAlgorithm(algorithm: algorithm) self.failReady(error) self.provider.cancelTunnelWithError(error) return } SDLLogger.log("[SDLContext] registerSuperAck, use algorithm \(algorithm), key len: \(key.count)") // 服务器分配的tun网卡信息 do { try await self.setNetworkSettings(config: self.config, dnsServer: DNSHelper.dnsServer) SDLLogger.log("[SDLContext] setNetworkSettings successed") self.superRegistrationStateMachine.handleRegisterSuperAck() self.startReader() self.markReady() } catch let err { self.superRegistrationStateMachine.handleFailure() SDLLogger.log("[SDLContext] setTunnelNetworkSettings get error: \(err)") self.failReady(err) self.provider.cancelTunnelWithError(err) } } private func handleRegisterSuperNak(nakPacket: SDLRegisterSuperNak) { let errorMessage = nakPacket.errorMessage guard let errorCode = SDLNAKErrorCode(rawValue: UInt8(nakPacket.errorCode)) else { return } switch errorCode { case .invalidToken, .nodeDisabled: self.superRegistrationStateMachine.handleFailure() self.publishTunnelEvent(code: Int(errorCode.rawValue), message: errorMessage) // 报告错误并退出 let error = NSError(domain: "com.jihe.punchnet.tun", code: -1) self.failReady(error) self.provider.cancelTunnelWithError(error) case .noIpAddress, .networkFault, .internalFault: self.superRegistrationStateMachine.handleRetryableNak() self.publishTunnelEvent(code: Int(errorCode.rawValue), message: errorMessage) } SDLLogger.log("[SDLContext] Get a SuperNak message exit") } private func handleEvent(event: SDLEvent) async { let processor = self.makeSuperEventProcessor() let plan = await processor.makeProcessingPlan(event: event) if let logMessage = plan.logMessage { SDLLogger.log(logMessage) } switch plan.action { case .removeSession(let dstMac): await self.sessionManager.removeSession(dstMac: dstMac) case .sendRegister(let registerData, let remoteAddresses): remoteAddresses.forEach { remoteAddress in self.sendPeerPacket(type: .register, data: registerData, remoteAddress: remoteAddress) } case .shutdown(let message): self.publishTunnelEvent(message: message) // 报告错误并退出 let error = NSError(domain: "com.jihe.punchnet.tun", code: -2) self.failReady(error) self.provider.cancelTunnelWithError(error) case .none: () } } private func doRegisterSuper() { // 注册 var registerSuper = SDLRegisterSuper() registerSuper.clientID = self.config.clientId registerSuper.networkID = self.config.networkAddress.networkId registerSuper.mac = self.config.networkAddress.mac registerSuper.ip = self.config.networkAddress.ip registerSuper.maskLen = UInt32(self.config.networkAddress.maskLen) registerSuper.hostname = self.config.hostname registerSuper.pubKey = self.rsaCipher.pubKey registerSuper.accessToken = self.config.accessToken if let registerSuperData = try? registerSuper.serializedData() { SDLLogger.log("[SDLContext] will send register super") self.quicClient?.send(type: .registerSuper, data: registerSuperData) } } } // 处理从Hole收到的数据 extension SDLContextActor { private func consumeUDPHoleMessages(stream: AsyncStream<(SocketAddress, SDLHoleMessage)>, localAddress: SocketAddress, source: UDPHoleKind) async { for await (remoteAddress, message) in stream { if Task.isCancelled { break } switch message.inboundMessage { case .control(let controlMessage): await self.handleHoleControlMessage(controlMessage, localAddress: localAddress, remoteAddress: remoteAddress, source: source) case .data(let data): try? await self.handleHoleData(data: data) } } } private func handleHoleControlMessage(_ message: SDLHoleControlMessage, localAddress: SocketAddress, remoteAddress: SocketAddress, source: UDPHoleKind) async { switch message { case .stunReply(_): guard source == .v4 else { return } SDLLogger.log("[SDLContext] get a stunReply", for: .debug) case .stunProbeReply(let probeReply): guard source == .v4 else { return } await self.proberActor.handleProbeReply(localAddress: localAddress, reply: probeReply) case .register(let register): try? await self.handleRegister(remoteAddress: remoteAddress, register: register) case .registerAck(let registerAck): await self.handleRegisterAck(remoteAddress: remoteAddress, registerAck: registerAck) } } private func handleRegister(remoteAddress: SocketAddress, register: SDLRegister) async throws { let networkAddr = config.networkAddress SDLLogger.log("[SDLContext] register packet: \(register), network_address: \(networkAddr)") // 判断目标地址是否是tun的网卡地址, 并且是在同一个网络下 if register.dstMac == networkAddr.mac && register.networkID == networkAddr.networkId { // 回复ack包 var registerAck = SDLRegisterAck() registerAck.networkID = networkAddr.networkId registerAck.srcMac = networkAddr.mac registerAck.dstMac = register.srcMac self.sendPeerPacket(type: .registerAck, data: try registerAck.serializedData(), remoteAddress: remoteAddress) // 这里需要建立到来源的会话, 在复杂网络下,通过super-node查询到的nat地址不一定靠谱,需要通过udp包的来源地址作为nat地址 if let session = Session(dstMac: register.srcMac, natAddress: remoteAddress) { await self.sessionManager.addSession(session: session) } else { SDLLogger.log("[SDLContext] didReadRegister get unsupported remoteAddress: \(remoteAddress)", for: .debug) } } else { SDLLogger.log("[SDLContext] didReadRegister get a invalid packet, because dst_ip not matched: \(register.dstMac)") } } private func handleRegisterAck(remoteAddress: SocketAddress, registerAck: SDLRegisterAck) async { // 判断目标地址是否是tun的网卡地址, 并且是在同一个网络下 let networkAddr = config.networkAddress if registerAck.dstMac == networkAddr.mac && registerAck.networkID == networkAddr.networkId { if let session = Session(dstMac: registerAck.srcMac, natAddress: remoteAddress) { await self.sessionManager.addSession(session: session) } else { SDLLogger.log("[SDLContext] didReadRegisterAck get unsupported remoteAddress: \(remoteAddress)", for: .debug) } } else { SDLLogger.log("[SDLContext] didReadRegisterAck get a invalid packet, because dst_mac not matched: \(registerAck.dstMac)") } } private func makeHoleDataProcessor() -> SDLHoleDataProcessor { return .init( networkAddress: self.config.networkAddress, dataCipher: self.dataCipher, snapshotPublisher: self.snapshotPublisher, flowSessionManager: self.flowSessionManager ) } private func handleHoleData(data: SDLData) async throws { let processor = self.makeHoleDataProcessor() guard let plan = try processor.makeProcessingPlan(data: data) else { return } self.flowTracer.inc(num: plan.inboundBytes, type: .inbound) switch plan.action { case .sendARPReply(let dstMac, let responseData): SDLLogger.log("[SDLContext] get arp request packet") await self.routeLayerPacket(dstMac: dstMac, type: .arp, data: responseData) case .appendARP(let ip, let mac): SDLLogger.log("[SDLContext] get arp response packet") await self.arpServer.append(ip: ip, mac: mac) case .writeToTun(let packetData, let identityID): let packet = NEPacket(data: packetData, protocolFamily: 2) self.provider.packetFlow.writePacketObjects([packet]) SDLLogger.log("[SDLContext] hole identity: \(identityID), allow, data count: \(packetData.count)", for: .trace) case .requestPolicy(let srcIdentityID): SDLLogger.log("[SDLContext] not found identity: \(srcIdentityID) ruleMap", for: .debug) // 向服务器请求权限逻辑 await self.identifyStore.policyRequest(srcIdentityId: srcIdentityID, dstIdentityId: self.config.identityId, using: self.quicClient) case .none: () } } } private extension UInt32 { // 转换成ip地址 func asIpAddress() -> String { return SDLUtil.int32ToIp(self) } } // 处理从Tun收到的数据 extension SDLContextActor { // 处理读取的每个数据包, Tun收到的包的一级路由 private func dealTunPacket(packet: IPPacket) async { let router = SDLTunPacketRouter(networkAddress: self.config.networkAddress, exitNode: self.config.exitNode) let decision = router.route(packet: packet) // 外部出去的数据,需要建立FlowSession // 外部数据进来的时候需要查找 if decision.shouldTrackFlow, let flowSession = packet.flowSession() { self.flowSessionManager.updateSession(flowSession) //SDLLogger.shared.log("[SDLContext] flow_session: \(flowSession)", level: .debug) } await self.handleTunRouteDecision(decision) } private func handleTunRouteDecision(_ decision: SDLTunPacketRouter.RouteDecision) async { switch decision { case .loopback(let ipPacketData): let nePacket = NEPacket(data: ipPacketData, protocolFamily: 2) self.provider.packetFlow.writePacketObjects([nePacket]) case .cloudDNS(let name, let ipPacketData): SDLLogger.log("[SDLContext] get cloud dns request: \(name)") await self.dnsClient?.forward(ipPacketData: ipPacketData) case .localDNS(let name, let payload, let tracker): SDLLogger.log("[SDLContext] get local dns request: \(name)") await self.dnsLocalClient?.query(tracker: tracker, dnsPayload: payload) case .forwardToNextHop(let ip, let type, let data, let kind): await self.forwardPacketToNextHop(ip: ip, type: type, data: data, kind: kind) case .drop(let reason): SDLLogger.log("[SDLContext] drop tun packet, reason: \(reason.rawValue)", for: .trace) } } private func forwardPacketToNextHop(ip: UInt32, type: LayerPacket.PacketType, data: Data, kind: SDLTunPacketRouter.ForwardKind) async { switch kind { case .sameNetwork: SDLLogger.log("[SDLContext] dstIp: \(ip.asIpAddress()) same network", for: .trace) case .exitNode, .dnsExitNode: SDLLogger.log("[SDLContext] use exit_node: \(ip.asIpAddress())", for: .trace) } // 查找arp缓存中是否有目标mac地址 if let dstMac = await self.arpServer.query(ip: ip) { SDLLogger.log("[SDLContext] dstIp: \(ip.asIpAddress()), dst_mac is: \(SDLUtil.formatMacAddress(mac: dstMac))", for: .trace) await self.routeLayerPacket(dstMac: dstMac, type: type, data: data) } else { SDLLogger.log("[SDLContext] dstIp: \(ip.asIpAddress()) arp query not found, broadcast", for: .trace) // // 构造arp广播 // let arpReqeust = ARPPacket.arpRequest(senderIP: networkAddr.ip, senderMAC: networkAddr.mac, targetIP: dstIp) // await self.routeLayerPacket(dstMac: ARPPacket.broadcastMac , type: .arp, data: arpReqeust.marshal()) try? await self.arpServer.arpRequest(targetIp: ip, use: self.quicClient) } } private func makeLayerPacketForwarder() -> SDLLayerPacketForwarder { return .init( networkAddress: self.config.networkAddress, identityID: self.config.identityId, dataCipher: self.dataCipher, sessionManager: self.sessionManager ) } private func routeLayerPacket(dstMac: Data, type: LayerPacket.PacketType, data: Data) async { // 将数据封装层2层的数据包 // 构造数据包 let forwarder = self.makeLayerPacketForwarder() guard let plan = try? await forwarder.makeDeliveryPlan(dstMac: dstMac, type: type, data: data) else { return } // 广播地址不要去尝试打洞 switch plan { case .superNode(let payload): // 通过super_node进行转发 self.sendSuperPacket(type: .data, data: payload) case .peer(let payload, let session): // 通过session发送到对端 SDLLogger.log("[SDLContext] step 5 send packet by session: \(session)", for: .trace) self.sendPeerPacket(type: .data, data: payload, remoteAddress: session.natAddress) self.flowTracer.inc(num: payload.count, type: .p2p) case .superNodeAndPunch(let payload, let request): // 通过super_node进行转发 self.sendSuperPacket(type: .data, data: payload) SDLLogger.log("[SDLContext] step 5 send packet by super: \(self.config.stunSocketAddress)", for: .trace) // 流量统计 self.flowTracer.inc(num: payload.count, type: .forward) // 尝试打洞 await self.puncherActor.submitRegisterRequest(quicClient: self.quicClient, request: request) } } }