//
// SDLContext.swift
// Tun
//
// Created by 安礼成 on 2024/2/29.
//
|
||
import Foundation
|
||
import NetworkExtension
|
||
import NIOCore
|
||
|
||
// 上下文环境变量,全局共享
|
||
/*
|
||
1. 处理rsa的加解密逻辑
|
||
*/
|
||
actor SDLContextActor {
|
||
/// Lifecycle of the context as observed through `waitForReady()`.
///
/// - `idle`: initial value; `start()` only proceeds from this state.
/// - `starting`: set by `start()`.
/// - `ready`: set by `markReady()`.
/// - `failed`: set by `failReady(_:)`, carrying the error handed to waiters.
/// - `stopped`: set by `stop()`.
enum ReadyState {
    case idle, starting, ready
    case failed(any Error)
    case stopped
}
|
||
|
||
/// Tags which UDP hole (IPv4 or IPv6) an inbound message was read from.
private enum UDPHoleKind: Equatable {
    case v4, v6

    /// Maps this kind onto the `Session.AddressType` case of the same name.
    func convertAddressType() -> Session.AddressType {
        switch self {
        case .v4: return .v4
        case .v6: return .v6
        }
    }
}
|
||
|
||
// MARK: - Readiness tracking

/// Lifecycle state read and written by `start()`, `waitForReady()`, `markReady()`, `failReady(_:)` and `stop()`.
private var readyState: ReadyState = .idle
/// Continuations parked by `waitForReady()` and drained by `resumeReadyWaiters(_:)`.
private var readyWaiters: [CheckedContinuation<Void, Error>] = []

// MARK: - Configuration and crypto

var config: SDLConfiguration
/// NAT type of the current network; stays `.blocked` until `probeNatType()` stores a result.
var natType: SDLNATProberActor.NatType = .blocked

/// Data cipher; only assigned once a register-super acknowledgement has been accepted.
private var dataCipher: CCDataCipher?

/// Session token taken from the register-super acknowledgement.
private var sessionToken: Data?

/// RSA cipher: its public key is sent in the register-super request and it decodes
/// the key carried by the acknowledgement.
nonisolated let rsaCipher: RSACipher

// MARK: - Transports

private var udpHole: SDLUDPHole?
private var udpHoleWorkers: [Task<Void, Never>]?
private var udpHoleLocalAddress: SocketAddress?

private var udpHoleV6: SDLUDPHoleV6?
private var udpHoleV6Workers: [Task<Void, Never>]?
private var udpHoleV6LocalAddress: SocketAddress?

/// Cloud DNS client and the task that writes its packets back to the tunnel.
private var dnsClient: DNSCloudClient?
private var dnsWorker: Task<Void, Never>?

/// Local DNS client and the task that writes its packets back to the tunnel.
private var dnsLocalClient: DNSLocalClient?
private var dnsLocalWorker: Task<Void, Never>?

private var quicClient: SDLQUICClient?
private var quicWorker: Task<Void, Never>?

nonisolated private let puncherActor: SDLPuncherActor
/// NAT type prober.
nonisolated private let proberActor: SDLNATProberActor

/// Client used to probe the local IPv6 address information.
private var ipv6AssistClient: SDLIPV6AssistClient?

/// Task that reads packets from the tunnel's packet flow.
private var readTask: Task<(), Never>?

private let sessionManager = SessionManager()
nonisolated private let arpServer: ArpServer

/// Network path monitor and the task consuming its events.
private var monitor: SDLNetworkMonitor?
private var monitorWorker: Task<Void, Never>?

// Internal app notification no longer uses a socket; it goes through
// App Group + Darwin Notification (see `SDLTunnelAppNotifier`).

/// Traffic statistics.
nonisolated private let flowTracer = SDLFlowTracer()

/// Runs the long-lived workers registered in `start()`.
private var supervisor = SDLSupervisor()

nonisolated private let provider: NEPacketTunnelProvider

// MARK: - Access control

private let identifyStore: IdentityStore
private var updatePolicyTask: Task<Void, Never>?
private let snapshotPublisher: SnapshotPublisher<IdentitySnapshot>

/// Flow session manager, constructed with a session timeout of 180.
private let flowSessionManager = SDLFlowSessionManager(sessionTimeout: 180)

// MARK: - Registration

/// Task running the register-super loop.
private var registerTask: Task<Void, Never>?

/// Task that periodically sends STUN requests.
private var stunRequestTask: Task<Void, Never>?

private let superRegistrationStateMachine = SDLSuperRegistrationStateMachine()
|
||
|
||
/// Creates the context and its collaborators; nothing is started until `start()` is called.
///
/// - Parameters:
///   - provider: Packet tunnel provider whose packet flow and network settings this context drives.
///   - config: Initial configuration; its `stunProbeSocketAddressArray` seeds the NAT prober.
///   - rsaCipher: RSA cipher used during super-node registration.
public init(provider: NEPacketTunnelProvider, config: SDLConfiguration, rsaCipher: RSACipher) {
    self.rsaCipher = rsaCipher
    self.config = config
    self.provider = provider

    self.puncherActor = SDLPuncherActor()
    self.proberActor = SDLNATProberActor(addressArray: config.stunProbeSocketAddressArray)
    self.arpServer = ArpServer()

    // Access control: the identity store and this actor share one snapshot publisher.
    let identityPublisher = SnapshotPublisher(initial: IdentitySnapshot.empty())
    self.identifyStore = IdentityStore(publisher: identityPublisher)
    self.snapshotPublisher = identityPublisher
}
|
||
|
||
/// Starts the context. Does nothing unless the state is still `.idle`.
///
/// Moves the state to `.starting`, prepares the app notifier and the network monitor,
/// starts the puncher, the ARP server and both DNS clients, then registers the
/// `quicClient`, `udpHole` and `udpHoleV6` workers with the supervisor.
public func start() async {
    guard case .idle = readyState else { return }
    readyState = .starting

    prepareTunnelNotifier()
    startMonitor()

    await puncherActor.start()
    await arpServer.start()
    await startDnsClient()
    await startDnsLocalClient()

    // Each worker body returns once its transport has closed.
    await supervisor.addWorker(name: "quicClient") {
        SDLLogger.log("[SDLContext] try start quicClient", for: .debug)
        let client = try await self.startQUICClient()
        SDLLogger.log("[SDLContext] quicClient running!!!!")
        await client.waitClose()
        SDLLogger.log("[SDLContext] quicClient closed!!!!")
    }

    await supervisor.addWorker(name: "udpHole") {
        let hole = try await self.startUDPHole()
        SDLLogger.log("[SDLContext] udp running!!!!")
        try await hole.waitClose()
        SDLLogger.log("[SDLContext] udp closed!!!!")
    }

    await supervisor.addWorker(name: "udpHoleV6") {
        let holeV6 = try await self.startUDPHoleV6()
        SDLLogger.log("[SDLContext] udp v6 running!!!!")
        try await holeV6.waitClose()
        SDLLogger.log("[SDLContext] udp v6 closed!!!!")
    }
}
|
||
|
||
/// Suspends until the context has become ready.
///
/// Returns immediately when already `.ready`. While `.idle` or `.starting`, the caller
/// is parked in `readyWaiters` until `markReady()`, `failReady(_:)` or `stop()` resumes it.
///
/// - Throws: The stored error when the state is `.failed`; `CancellationError` when `.stopped`;
///   otherwise whatever error the waiter is resumed with.
public func waitForReady() async throws {
    if case .ready = readyState {
        return
    }
    if case .failed(let error) = readyState {
        throw error
    }
    if case .stopped = readyState {
        throw CancellationError()
    }

    // Remaining states: .idle and .starting.
    try await withCheckedThrowingContinuation { (waiter: CheckedContinuation<Void, Error>) in
        self.readyWaiters.append(waiter)
    }
}
|
||
|
||
// 取消出口节点的时候,ip地址为: 0.0.0.0
|
||
/// Sets or clears the exit node and re-applies the tunnel network settings.
///
/// - Parameter exitNodeIp: IPv4 address string of the exit node. A value that does not
///   parse, or that parses to 0 (e.g. "0.0.0.0"), clears the exit node.
/// - Throws: Any error from applying the network settings.
public func updateExitNode(exitNodeIp: String) async throws {
    switch SDLUtil.ipv4StrToInt32(exitNodeIp) {
    case .some(let ip) where ip > 0:
        config.exitNode = .init(exitNodeIp: ip)
    default:
        config.exitNode = nil
    }

    try await setNetworkSettings(config: config, dnsServer: DNSHelper.dnsServer)
}
|
||
|
||
/// Replaces the current QUIC client with a freshly connected one and starts consuming its messages.
///
/// The previous message worker is cancelled and the previous client stopped first.
/// If the new client fails to become ready (or the task is cancelled while waiting),
/// the new client is stopped before the error is rethrown, so a failed attempt does
/// not leave a started client behind.
///
/// - Returns: The connected client, also stored in `quicClient`.
/// - Throws: Errors from `waitReady()` or from the post-handshake sleep.
private func startQUICClient() async throws -> SDLQUICClient {
    self.quicWorker?.cancel()
    self.quicClient?.stop()

    let quicClient = SDLQUICClient(host: self.config.serverHost, port: 443)
    quicClient.start()

    do {
        // Wait until QUIC reports ready.
        try await quicClient.waitReady()
        // The QUIC negotiation must have completed before the client is used.
        try await Task.sleep(for: .seconds(0.2))
    } catch {
        // This client was started but never published; stop it so it is not leaked.
        quicClient.stop()
        throw error
    }
    SDLLogger.log("[SDLContext] start quic client: \(self.config.serverHost)")

    self.quicWorker = Task.detached {
        for await message in quicClient.messageStream {
            switch message {
            case .welcome(let welcome):
                SDLLogger.log("[SDLContext] quic welcome: \(welcome)")
                // Register with the super node.
                await self.startRegisterLoop()
                // Start the periodic STUN request task.
                await self.startStunRequestTask(welcome: welcome)

            case .pong:
                ()
            case .registerSuperAck(let registerSuperAck):
                await self.handleRegisterSuperAck(registerSuperAck: registerSuperAck)
            case .registerSuperNak(let registerSuperNak):
                await self.handleRegisterSuperNak(nakPacket: registerSuperNak)
            case .peerInfo(let peerInfo):
                await self.puncherActor.handlePeerInfo(using: self.udpHole, udpHoleV6: self.udpHoleV6, peerInfo: peerInfo)
            case .event(let event):
                await self.handleEvent(event: event)
            case .policyReponse(let policyResponse):
                // Apply the access-control policy returned by the server.
                await self.identifyStore.applyPolicyResponse(policyResponse)
            case .arpResponse(let arpResponse):
                await self.arpServer.handleArpResponse(arpResponse: arpResponse)
            }
        }
    }
    self.quicClient = quicClient

    return quicClient
}
|
||
|
||
/// Clears the shared tunnel-to-app notifier before the context starts publishing events.
///
/// The old UDP notice client has been removed; notifications go through the App Group based channel.
private func prepareTunnelNotifier() {
    let notifier = SDLTunnelAppNotifier.shared
    notifier.clear()

    SDLLogger.log("[SDLContext] tunnelAppNotifier ready")
}
|
||
|
||
/// Starts a network monitor and a detached worker that reacts to its events.
///
/// Any previous monitor worker is cancelled first. On a `.changed` event the NAT type
/// is probed again; `.unreachable` is only logged.
private func startMonitor() {
    monitorWorker?.cancel()
    monitorWorker = nil

    let networkMonitor = SDLNetworkMonitor()
    networkMonitor.start()
    SDLLogger.log("[SDLContext] monitor started")
    monitor = networkMonitor

    monitorWorker = Task.detached {
        for await pathEvent in networkMonitor.eventStream {
            switch pathEvent {
            case .unreachable:
                SDLLogger.log("didNetworkPathUnreachable")
            case .changed:
                // The network path changed, so the NAT type has to be detected again.
                await self.probeNatType()
                SDLLogger.log("didNetworkPathChanged, nat type is:")
            }
        }
    }
}
|
||
|
||
/// Starts the cloud DNS client and a detached worker that writes its packets to the tunnel.
///
/// The client targets `config.serverIp` on port 15353. Any previous worker is cancelled first.
private func startDnsClient() async {
    dnsWorker?.cancel()
    dnsWorker = nil

    let cloudClient = DNSCloudClient(host: config.serverIp, port: 15353)
    await cloudClient.start()
    SDLLogger.log("[SDLContext] dnsClient started")
    dnsClient = cloudClient

    let replies = cloudClient.packetFlow
    dnsWorker = Task.detached {
        for await reply in replies {
            guard !Task.isCancelled else { break }
            // Same protocol family value (2) that the reader accepts for packets from the tunnel.
            self.provider.packetFlow.writePacketObjects([NEPacket(data: reply, protocolFamily: 2)])
        }
    }
}
|
||
|
||
/// Starts the local DNS client and a detached worker that writes its packets to the tunnel.
///
/// Any previous worker is cancelled first. The start-up log line names the local client
/// so it can be told apart from the cloud DNS client's log line.
private func startDnsLocalClient() async {
    self.dnsLocalWorker?.cancel()
    self.dnsLocalWorker = nil

    let dnsLocalClient = DNSLocalClient()
    await dnsLocalClient.start()
    // Previously logged "dnsClient started", identical to the cloud client's message.
    SDLLogger.log("[SDLContext] dnsLocalClient started")
    self.dnsLocalClient = dnsLocalClient

    let packetFlow = dnsLocalClient.packetFlow
    self.dnsLocalWorker = Task.detached {
        for await packet in packetFlow {
            if Task.isCancelled {
                break
            }

            // NOTE(review): the original comment says a complete IP packet still has to be
            // built here - confirm what `packetFlow` yields for the local client.
            let nePacket = NEPacket(data: packet, protocolFamily: 2)
            self.provider.packetFlow.writePacketObjects([nePacket])
        }
    }
}
|
||
|
||
/// Creates and starts the IPv4 UDP hole, spawns its message consumer and probes the NAT type.
///
/// Workers of a previous hole are cancelled first.
///
/// - Returns: The started hole, also stored in `udpHole`.
/// - Throws: Errors from creating or starting the hole.
private func startUDPHole() async throws -> SDLUDPHole {
    udpHoleWorkers?.forEach { $0.cancel() }
    udpHoleWorkers = nil

    let hole = try SDLUDPHole()
    let boundAddress = try hole.start()
    SDLLogger.log("[SDLContext] udpHole started, on address: \(boundAddress)")

    // Consume inbound messages on a detached task.
    let inbound = hole.messageStream
    let consumer = Task.detached {
        await self.consumeUDPHoleMessages(stream: inbound, localAddress: boundAddress, source: .v4)
    }

    udpHole = hole
    udpHoleLocalAddress = boundAddress
    udpHoleWorkers = [consumer]

    // Probing needs `udpHole` to be set, so it runs last.
    await probeNatType()

    return hole
}
|
||
|
||
/// Creates and starts the IPv6 UDP hole and spawns its message consumer.
///
/// Workers of a previous IPv6 hole are cancelled first.
///
/// - Returns: The started hole, also stored in `udpHoleV6`.
/// - Throws: Errors from creating or starting the hole.
private func startUDPHoleV6() async throws -> SDLUDPHoleV6 {
    udpHoleV6Workers?.forEach { $0.cancel() }
    udpHoleV6Workers = nil

    let holeV6 = try SDLUDPHoleV6()
    let boundAddress = try holeV6.start()
    SDLLogger.log("[SDLContext] udpHoleV6 started, on address: \(boundAddress)")

    // Consume inbound messages on a detached task.
    let inbound = holeV6.messageStream
    let consumer = Task.detached {
        await self.consumeUDPHoleMessages(stream: inbound, localAddress: boundAddress, source: .v6)
    }

    udpHoleV6 = holeV6
    udpHoleV6LocalAddress = boundAddress
    udpHoleV6Workers = [consumer]

    return holeV6
}
|
||
|
||
// 处理context的停止问题
|
||
/// Stops the context and releases everything it started.
///
/// Pending `waitForReady()` callers are resumed with `CancellationError`, the state becomes
/// `.stopped`, the registration state machine is reset, and the supervisor, puncher,
/// transports, DNS clients, monitor and all background tasks are stopped or cancelled.
/// This includes the periodic STUN request task, which would otherwise keep running
/// after the context has stopped.
public func stop() async {
    self.resumeReadyWaiters(.failure(CancellationError()))
    self.readyState = .stopped
    self.superRegistrationStateMachine.reset()

    await self.supervisor.stop()
    await self.puncherActor.stop()

    self.udpHoleWorkers?.forEach { $0.cancel() }
    self.udpHoleWorkers = nil
    self.udpHole?.stop()
    self.udpHole = nil
    self.udpHoleLocalAddress = nil

    self.udpHoleV6Workers?.forEach { $0.cancel() }
    self.udpHoleV6Workers = nil
    self.udpHoleV6?.stop()
    self.udpHoleV6 = nil
    self.udpHoleV6LocalAddress = nil

    self.quicWorker?.cancel()
    self.quicWorker = nil
    self.quicClient?.stop()
    self.quicClient = nil

    await self.dnsClient?.stop()
    self.dnsWorker?.cancel()
    self.dnsWorker = nil
    self.dnsClient = nil

    await self.dnsLocalClient?.stop()
    self.dnsLocalWorker?.cancel()
    self.dnsLocalWorker = nil
    self.dnsLocalClient = nil

    self.monitor?.stop()
    self.monitorWorker?.cancel()
    self.monitorWorker = nil
    self.monitor = nil

    self.readTask?.cancel()
    self.readTask = nil

    self.registerTask?.cancel()
    self.registerTask = nil

    // The STUN loop is a detached task holding `self`; it must be cancelled explicitly.
    self.stunRequestTask?.cancel()
    self.stunRequestTask = nil

    self.updatePolicyTask?.cancel()
    self.updatePolicyTask = nil

    self.sessionToken = nil
    self.dataCipher = nil

    await self.ipv6AssistClient?.stop()
    self.ipv6AssistClient = nil
}
|
||
|
||
/// Publishes an event to the host app through the shared tunnel notifier.
///
/// - Parameters:
///   - code: Optional numeric code attached to the event.
///   - message: Text of the event.
private func publishTunnelEvent(code: Int? = nil, message: String) {
    let notifier = SDLTunnelAppNotifier.shared
    notifier.publish(code: code, message: message)
}
|
||
|
||
/// Stores the given NAT type in `natType`.
///
/// - Parameter natType: The NAT type to record.
private func setNatType(natType: SDLNATProberActor.NatType) {
    let detected = natType
    self.natType = detected
}
|
||
|
||
// 注册成功super的回调函数
|
||
/// Called once registration with the super node has succeeded; (re)starts the periodic policy refresh.
///
/// Any previous refresh task is cancelled. The new task asks the identity store for a
/// batch policy update every 300 seconds. When the task is cancelled during its sleep it
/// exits immediately instead of performing one more update.
private func whenRegistedSuper() async {
    self.updatePolicyTask?.cancel()

    self.updatePolicyTask = Task {
        while !Task.isCancelled {
            do {
                try await Task.sleep(for: .seconds(300))
            } catch {
                // The sleep was interrupted by cancellation: stop without one last update.
                break
            }
            SDLLogger.log("[SDLContext] updatePolicyTask execute")
            await self.identifyStore.batUpdatePolicy(using: self.quicClient, dstIdentityID: self.config.identityId)
        }
    }
}
|
||
|
||
// MARK: -- StunRequestTask
|
||
/// Restarts the IPv6 assist client and the periodic STUN request loop for a QUIC welcome message.
///
/// The previous STUN loop is cancelled before a new one is created, so repeated welcome
/// messages (for example after the QUIC client reconnects) do not accumulate loops.
/// Every 8 seconds the loop probes the IPv6 assist server (3 second timeout) and sends
/// a STUN request carrying the probe result.
///
/// - Parameter welcome: Welcome message whose `ipv6Assist` configures the assist client.
private func startStunRequestTask(welcome: SDLWelcome) async {
    // Stop the old loop first so it no longer probes through a client that is being replaced.
    self.stunRequestTask?.cancel()
    self.stunRequestTask = nil

    await self.ipv6AssistClient?.stop()
    self.ipv6AssistClient = SDLIPV6AssistClient(assistServerInfo: welcome.ipv6Assist)
    await self.ipv6AssistClient?.start()

    SDLLogger.log("[SDLContext] v6 assist client: \(self.ipv6AssistClient)")

    // Another call may have run during the awaits above; cancel whatever it installed
    // so only one loop stays alive.
    self.stunRequestTask?.cancel()
    self.stunRequestTask = Task.detached {
        let timerStream = SDLAsyncTimerStream()
        timerStream.start(interval: .seconds(8))

        for await _ in timerStream.stream {
            let probeReply = try? await self.ipv6AssistClient?.probe(requestTimeout: .seconds(3))

            SDLLogger.log("[SDLContext] probe ipv6 address: \(probeReply)")
            if let v6Info = probeReply?.v6Info, let v6Address = SDLUtil.ipv6DataToString(v6Info.v6) {
                SDLLogger.log("[SDLContext] probe ipv6 address: \(v6Address)")
            } else {
                SDLLogger.log("[SDLContext] probe ipv6 address: empty")
            }
            await self.sendStunRequest(probeReply: probeReply)
        }

        SDLLogger.log("[SDLContext] udp stunRequestTask cancel")
    }
}
|
||
|
||
/// Builds a STUN request from the current configuration and sends it to the super/STUN node.
///
/// Nothing is sent while there is no session token, or when the request cannot be serialized.
///
/// - Parameter probeReply: Latest IPv6 assist probe reply; its `v6Info` is attached when present.
private func sendStunRequest(probeReply: SDLV6AssistProbeReply?) {
    guard let token = sessionToken else { return }

    let address = config.networkAddress
    var request = SDLStunRequest()
    request.clientID = config.clientId
    request.networkID = address.networkId
    request.ip = address.ip
    request.mac = address.mac
    request.natType = UInt32(natType.rawValue)
    request.sessionToken = token

    if let v6Info = probeReply?.v6Info {
        request.v6Info = v6Info
    }

    guard let payload = try? request.serializedData() else { return }
    sendSuperPacket(type: .stunRequest, data: payload)
}
|
||
|
||
// 开始读取数据, 用单独的线程处理packetFlow
|
||
/// Starts the task that reads packets from the tunnel's packet flow.
///
/// Any previous reader is cancelled first. The new detached, high-priority task reads
/// batches until cancelled; packets whose protocol number is 2 and that parse as an
/// `IPPacket` are handed to `dealTunPacket(packet:)`.
private func startReader() {
    readTask?.cancel()

    readTask = Task.detached(priority: .high) {
        while !Task.isCancelled {
            let (payloads, families) = await self.provider.packetFlow.readPackets()
            for (payload, family) in zip(payloads, families) {
                guard family == 2, let ipPacket = IPPacket(payload) else { continue }
                await self.dealTunPacket(packet: ipPacket)
            }
        }
    }
}
|
||
|
||
// MARK: 网络改变时需要重新配置网络信息
|
||
/// Builds the tunnel network settings from `config` and applies them to the provider.
///
/// Included routes cover the virtual network and the DNS server, plus the default route
/// when an exit node is configured. Excluded routes come from `getIpv4ExcludeRoutes()`.
///
/// - Parameters:
///   - config: Configuration providing the network address and the optional exit node.
///   - dnsServer: DNS server address installed for the tunnel and routed through it.
/// - Throws: Any error from `setTunnelNetworkSettings`; applying the settings must succeed.
private func setNetworkSettings(config: SDLConfiguration, dnsServer: String) async throws {
    let address = config.networkAddress

    // Routes handled by the tunnel.
    var includedRoutes: [NEIPv4Route] = [
        NEIPv4Route(destinationAddress: address.netAddress, subnetMask: address.maskAddress),
        NEIPv4Route(destinationAddress: dnsServer, subnetMask: "255.255.255.255"),
    ]
    // With an exit node configured the tunnel also takes over the system default route.
    if config.exitNode != nil {
        includedRoutes.append(.default())
    }

    // DNS resolution for the tunnel interface.
    let domain = address.networkDomain
    let dns = NEDNSSettings(servers: [dnsServer])
    dns.searchDomains = [domain]
    dns.matchDomains = [domain, ""]
    // false lets names completed via the search domain match these settings too.
    dns.matchDomainsNoSearch = false

    let ipv4 = NEIPv4Settings(addresses: [address.ipAddress], subnetMasks: [address.maskAddress])
    ipv4.includedRoutes = includedRoutes
    ipv4.excludedRoutes = getIpv4ExcludeRoutes()

    let settings = NEPacketTunnelNetworkSettings(tunnelRemoteAddress: "8.8.8.8")
    settings.mtu = 1250
    settings.dnsSettings = dns
    settings.ipv4Settings = ipv4

    try await provider.setTunnelNetworkSettings(settings)
}
|
||
|
||
// 探测当前网络的类型
|
||
/// Probes the NAT type through the IPv4 UDP hole and stores the result in `natType`.
///
/// Does nothing while no IPv4 hole exists.
private func probeNatType() async {
    guard let hole = udpHole else { return }

    let detected = await proberActor.probeNatType(using: hole)
    natType = detected
    SDLLogger.log("[SDLContext] nat_type is: \(natType)")
}
|
||
|
||
// 发送给super/stun节点的数据
|
||
/// Sends a packet to the super/STUN node at `config.stunSocketAddress`.
///
/// - Parameters:
///   - type: Packet type.
///   - data: Serialized payload.
private func sendSuperPacket(type: SDLPacketType, data: Data) {
    let superAddress = config.stunSocketAddress
    sendPacket(type: type, data: data, remoteAddress: superAddress)
}
|
||
|
||
// 发送给peer的数据
|
||
/// Sends a packet to a peer.
///
/// - Parameters:
///   - type: Packet type.
///   - data: Serialized payload.
///   - remoteAddress: Address of the peer.
private func sendPeerPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
    sendPacket(type: type, data: data, remoteAddress: remoteAddress)
}
|
||
|
||
/// Sends a packet through the UDP hole matching the address family of `remoteAddress`.
///
/// The packet is dropped with a debug log when the matching hole does not exist or the
/// address family is neither IPv4 nor IPv6.
private func sendPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
    switch remoteAddress {
    case .v4:
        if let hole = udpHole {
            hole.send(type: type, data: data, remoteAddress: remoteAddress)
        } else {
            SDLLogger.log("[SDLContext] udpHole is nil for remoteAddress: \(remoteAddress)", for: .debug)
        }
    case .v6:
        if let holeV6 = udpHoleV6 {
            holeV6.send(type: type, data: data, remoteAddress: remoteAddress)
        } else {
            SDLLogger.log("[SDLContext] udpHoleV6 is nil for remoteAddress: \(remoteAddress)", for: .debug)
        }
    default:
        SDLLogger.log("[SDLContext] unsupported socket family: \(remoteAddress)", for: .debug)
    }
}
|
||
|
||
/// Runs `body` repeatedly on a detached task until the task is cancelled.
///
/// A `CancellationError` thrown by `body` ends the loop; any other error is followed by
/// a 2 second pause before the next iteration.
///
/// - Parameter body: The work to repeat.
/// - Returns: The detached task running the loop.
private func spawnLoop(_ body: @escaping () async throws -> Void) -> Task<Void, Never> {
    let retryDelay: UInt64 = 2_000_000_000
    return Task.detached {
        while true {
            if Task.isCancelled {
                return
            }
            do {
                try await body()
            } catch is CancellationError {
                return
            } catch {
                try? await Task.sleep(nanoseconds: retryDelay)
            }
        }
    }
}
|
||
|
||
/// Builds the host routes that must stay outside the tunnel.
///
/// The list holds the system DNS servers without a ":" in their address (i.e. the
/// non-IPv6 ones), followed by a fixed set of common public resolvers not already present.
///
/// - Returns: One /32 route per address.
private func getIpv4ExcludeRoutes() -> [NEIPv4Route] {
    // Common public DNS servers that are always excluded.
    let commonResolvers = [
        "8.8.8.8",
        "8.8.4.4",
        "223.5.5.5",
        "223.6.6.6",
        "114.114.114.114"
    ]

    var excluded = SDLUtil.getMacOSSystemDnsServers().filter { !$0.contains(":") }
    for resolver in commonResolvers where !excluded.contains(resolver) {
        excluded.append(resolver)
    }

    return excluded.map { address in
        NEIPv4Route(destinationAddress: address, subnetMask: "255.255.255.255")
    }
}
|
||
|
||
/// Moves the state from `.starting` to `.ready` and resumes all waiters successfully.
///
/// Has no effect in any other state.
private func markReady() {
    if case .starting = readyState {
        readyState = .ready
        resumeReadyWaiters(.success(()))
    }
}
|
||
|
||
/// Records a start-up failure and resumes all waiters with `error`.
///
/// Only acts while the state is `.idle` or `.starting`; once the context is `.ready`,
/// `.failed` or `.stopped` the call is ignored.
///
/// - Parameter error: The error handed to current and future `waitForReady()` callers.
private func failReady(_ error: any Error) {
    switch readyState {
    case .idle, .starting:
        readyState = .failed(error)
        resumeReadyWaiters(.failure(error))
    case .ready, .failed, .stopped:
        break
    }
}
|
||
|
||
/// Resumes every parked `waitForReady()` continuation with `result`.
///
/// The waiter list is emptied before any continuation is resumed.
///
/// - Parameter result: Success resumes the waiters normally; failure makes them throw the error.
private func resumeReadyWaiters(_ result: Result<Void, any Error>) {
    let pending = readyWaiters
    readyWaiters.removeAll()

    for waiter in pending {
        waiter.resume(with: result)
    }
}
|
||
|
||
/// Drops the references to the UDP holes, their local addresses and the cloud DNS client.
deinit {
    dnsClient = nil

    udpHoleV6LocalAddress = nil
    udpHoleV6 = nil

    udpHoleLocalAddress = nil
    udpHole = nil
}
|
||
}
|
||
|
||
// 处理和Super之间的通讯
|
||
extension SDLContextActor {
|
||
|
||
/// Creates an event processor bound to the current network address.
private func makeSuperEventProcessor() -> SDLSuperEventProcessor {
    let address = config.networkAddress
    return SDLSuperEventProcessor(networkAddress: address)
}
|
||
|
||
// 开启注册任务
|
||
/// Starts the loop that registers this node with the super node.
///
/// Does nothing when a register task already exists or the state machine refuses to
/// begin a loop. Each round sends a register request when the state machine asks for
/// it, waits 5 seconds, then acts on the state machine's decision: finish on
/// `.registered` (after calling `whenRegistedSuper()`), go round again on `.retry`,
/// or stop. If the task is cancelled during the wait, the loop exits without
/// consulting the state machine again.
private func startRegisterLoop() {
    guard self.registerTask == nil,
          self.superRegistrationStateMachine.beginLoop() else {
        return
    }

    self.registerTask = Task {
        defer {
            self.registerTask = nil
        }

        while !Task.isCancelled {
            switch self.superRegistrationStateMachine.makeLoopAction() {
            case .sendRegister:
                self.doRegisterSuper()
            case .stop:
                return
            }

            do {
                try await Task.sleep(for: .seconds(5))
            } catch {
                // Cancelled while waiting: do not act on the state machine afterwards.
                return
            }

            switch self.superRegistrationStateMachine.makeWaitDecision() {
            case .registered:
                await self.whenRegistedSuper()
                return
            case .retry:
                SDLLogger.log("[SDLContext] register super failed, retry")
            case .stop:
                return
            }
        }
    }
}
|
||
|
||
/// Handles the super node's acknowledgement of a register request.
///
/// The key in the acknowledgement is decoded with the RSA cipher and the data cipher is
/// created for the announced algorithm ("aes" or "chacha20"). The tunnel network settings
/// are then applied, the packet reader is started and the context is marked ready.
/// An undecodable key, an unsupported algorithm or a failure to apply the settings fails
/// the readiness state and cancels the tunnel with that error.
private func handleRegisterSuperAck(registerSuperAck: SDLRegisterSuperAck) async {
    // The key has to be decoded with the RSA private key.
    guard let key = try? rsaCipher.decode(data: Data(registerSuperAck.key)) else {
        superRegistrationStateMachine.handleFailure()
        SDLLogger.log("[SDLContext] registerSuperAck invalid key")
        let keyError = SDLError.invalidKey
        failReady(keyError)
        provider.cancelTunnelWithError(keyError)
        return
    }

    let algorithm = registerSuperAck.algorithm.lowercased()
    let regionId = registerSuperAck.regionID
    sessionToken = registerSuperAck.sessionToken

    switch algorithm {
    case "aes":
        dataCipher = CCAESChiper(key: key)
    case "chacha20":
        dataCipher = CCChaCha20Cipher(regionId: regionId, keyData: key)
    default:
        superRegistrationStateMachine.handleFailure()
        SDLLogger.log("[SDLContext] registerSuperAck invalid algorithm \(algorithm)")
        let algorithmError = SDLError.unsupportedAlgorithm(algorithm: algorithm)
        failReady(algorithmError)
        provider.cancelTunnelWithError(algorithmError)
        return
    }

    SDLLogger.log("[SDLContext] registerSuperAck, use algorithm \(algorithm), key len: \(key.count)")

    // Apply the tun interface settings for the registered configuration.
    do {
        try await setNetworkSettings(config: config, dnsServer: DNSHelper.dnsServer)
        SDLLogger.log("[SDLContext] setNetworkSettings successed")
        superRegistrationStateMachine.handleRegisterSuperAck()
        startReader()
        markReady()
    } catch {
        superRegistrationStateMachine.handleFailure()
        SDLLogger.log("[SDLContext] setTunnelNetworkSettings get error: \(error)")
        failReady(error)
        provider.cancelTunnelWithError(error)
    }
}
|
||
|
||
/// Handles the super node's rejection of a register request.
///
/// `invalidToken` and `nodeDisabled` are fatal: the event is published, readiness fails
/// and the tunnel is cancelled. `noIpAddress`, `networkFault` and `internalFault` are
/// published and handed to the state machine as retryable. An error code that is out of
/// range for `UInt8` or unknown to `SDLNAKErrorCode` is logged and ignored; the range
/// check avoids the runtime trap a plain `UInt8(...)` conversion causes for values above 255.
///
/// - Parameter nakPacket: The NAK packet received from the super node.
private func handleRegisterSuperNak(nakPacket: SDLRegisterSuperNak) {
    let errorMessage = nakPacket.errorMessage
    // The code comes from the network, so it is converted with a checked initializer.
    guard let rawCode = UInt8(exactly: nakPacket.errorCode),
          let errorCode = SDLNAKErrorCode(rawValue: rawCode) else {
        SDLLogger.log("[SDLContext] Get a SuperNak message with unknown error code: \(nakPacket.errorCode)")
        return
    }

    switch errorCode {
    case .invalidToken, .nodeDisabled:
        self.superRegistrationStateMachine.handleFailure()
        self.publishTunnelEvent(code: Int(errorCode.rawValue), message: errorMessage)
        // Report the error and shut the tunnel down.
        let error = NSError(domain: "com.jihe.punchnet.tun", code: -1)
        self.failReady(error)
        self.provider.cancelTunnelWithError(error)

    case .noIpAddress, .networkFault, .internalFault:
        self.superRegistrationStateMachine.handleRetryableNak()
        self.publishTunnelEvent(code: Int(errorCode.rawValue), message: errorMessage)
    }
    SDLLogger.log("[SDLContext] Get a SuperNak message exit")
}
|
||
|
||
/// Handles an event pushed by the super node.
///
/// The event is turned into a processing plan; the plan's log message (if any) is
/// logged and its action carried out: removing a session, sending register packets to
/// the given addresses, or shutting the tunnel down.
private func handleEvent(event: SDLEvent) async {
    let plan = await makeSuperEventProcessor().makeProcessingPlan(event: event)
    if let text = plan.logMessage {
        SDLLogger.log(text)
    }

    switch plan.action {
    case .none:
        break
    case .removeSession(let dstMac):
        await sessionManager.removeSession(dstMac: dstMac)
    case .sendRegister(let registerData, let remoteAddresses):
        for remoteAddress in remoteAddresses {
            sendPeerPacket(type: .register, data: registerData, remoteAddress: remoteAddress)
        }
    case .shutdown(let message):
        publishTunnelEvent(message: message)

        // Report the error and shut the tunnel down.
        let shutdownError = NSError(domain: "com.jihe.punchnet.tun", code: -2)
        failReady(shutdownError)
        provider.cancelTunnelWithError(shutdownError)
    }
}
|
||
|
||
/// Builds a register-super request from the configuration and sends it over QUIC.
///
/// Nothing is sent when the request cannot be serialized or no QUIC client exists.
private func doRegisterSuper() {
    let address = config.networkAddress

    var request = SDLRegisterSuper()
    request.clientID = config.clientId
    request.networkID = address.networkId
    request.mac = address.mac
    request.ip = address.ip
    request.maskLen = UInt32(address.maskLen)
    request.hostname = config.hostname
    request.pubKey = rsaCipher.pubKey
    request.accessToken = config.accessToken

    guard let payload = try? request.serializedData() else { return }
    SDLLogger.log("[SDLContext] will send register super")
    quicClient?.send(type: .registerSuper, data: payload)
}
|
||
|
||
}
|
||
|
||
// 处理从Hole收到的数据
|
||
extension SDLContextActor {
|
||
|
||
/// Consumes the inbound message stream of a UDP hole until it ends or the task is cancelled.
///
/// Control messages go to `handleHoleControlMessage`; data messages go to
/// `handleHoleData`, whose errors are ignored.
///
/// - Parameters:
///   - stream: Pairs of sender address and received message.
///   - localAddress: Local address the hole is bound to.
///   - source: Which hole (IPv4 or IPv6) the stream belongs to.
private func consumeUDPHoleMessages(stream: AsyncStream<(SocketAddress, SDLHoleMessage)>, localAddress: SocketAddress, source: UDPHoleKind) async {
    for await (sender, holeMessage) in stream {
        guard !Task.isCancelled else { break }

        switch holeMessage.inboundMessage {
        case .data(let payload):
            try? await handleHoleData(data: payload)
        case .control(let control):
            await handleHoleControlMessage(control, localAddress: localAddress, remoteAddress: sender, source: source)
        }
    }
}
|
||
|
||
/// Dispatches a control message received on a UDP hole.
///
/// STUN replies and STUN probe replies are only processed for the IPv4 hole; register
/// and register-ack messages are processed for both. Errors from handling a register
/// message are ignored.
private func handleHoleControlMessage(_ message: SDLHoleControlMessage, localAddress: SocketAddress, remoteAddress: SocketAddress, source: UDPHoleKind) async {
    switch message {
    case .stunReply(_):
        if source != .v4 { return }
        SDLLogger.log("[SDLContext] get a stunReply", for: .debug)
    case .stunProbeReply(let reply):
        if source != .v4 { return }
        await proberActor.handleProbeReply(localAddress: localAddress, reply: reply)
    case .register(let request):
        try? await handleRegister(remoteAddress: remoteAddress, register: request, source: source)
    case .registerAck(let ack):
        await handleRegisterAck(remoteAddress: remoteAddress, registerAck: ack, source: source)
    }
}
|
||
|
||
/// Handles a register packet sent by a peer.
///
/// When the packet targets this node's MAC address on the same network, a register-ack
/// is sent back and a session to the sender is stored. The session uses the UDP source
/// address as the NAT address, because the address obtained through the super node is
/// not always reliable on complex networks. Packets for another MAC or network are only
/// logged; the log now says the MAC did not match, which is what the check compares.
///
/// - Throws: Errors from serializing the register-ack.
private func handleRegister(remoteAddress: SocketAddress, register: SDLRegister, source: UDPHoleKind) async throws {
    let networkAddr = config.networkAddress
    SDLLogger.log("[SDLContext] register packet: \(register), network_address: \(networkAddr)")

    // The destination must be the tun interface's MAC on the same network.
    if register.dstMac == networkAddr.mac && register.networkID == networkAddr.networkId {
        // Reply with an ack packet.
        var registerAck = SDLRegisterAck()
        registerAck.networkID = networkAddr.networkId
        registerAck.srcMac = networkAddr.mac
        registerAck.dstMac = register.srcMac

        self.sendPeerPacket(type: .registerAck, data: try registerAck.serializedData(), remoteAddress: remoteAddress)
        // Store a session towards the sender, keyed by its MAC.
        if let session = Session(dstMac: register.srcMac, natAddress: remoteAddress, addressType: source.convertAddressType()) {
            await self.sessionManager.addSession(session: session)
        } else {
            SDLLogger.log("[SDLContext] didReadRegister get unsupported remoteAddress: \(remoteAddress)", for: .debug)
        }
    } else {
        SDLLogger.log("[SDLContext] didReadRegister get a invalid packet, because dst_mac not matched: \(register.dstMac)")
    }
}
|
||
|
||
/// Handles a register-ack packet sent by a peer.
///
/// When the ack targets this node's MAC address on the same network, a session to the
/// sender is stored using the UDP source address as NAT address; otherwise the packet
/// is only logged.
private func handleRegisterAck(remoteAddress: SocketAddress, registerAck: SDLRegisterAck, source: UDPHoleKind) async {
    let local = config.networkAddress
    let isForThisNode = registerAck.dstMac == local.mac && registerAck.networkID == local.networkId

    guard isForThisNode else {
        SDLLogger.log("[SDLContext] didReadRegisterAck get a invalid packet, because dst_mac not matched: \(registerAck.dstMac)")
        return
    }

    guard let session = Session(dstMac: registerAck.srcMac, natAddress: remoteAddress, addressType: source.convertAddressType()) else {
        SDLLogger.log("[SDLContext] didReadRegisterAck get unsupported remoteAddress: \(remoteAddress)", for: .debug)
        return
    }
    await sessionManager.addSession(session: session)
}
|
||
|
||
/// Creates a processor for inbound hole data from the current network address, data
/// cipher, identity snapshot publisher and flow session manager.
private func makeHoleDataProcessor() -> SDLHoleDataProcessor {
    let processor = SDLHoleDataProcessor(
        networkAddress: config.networkAddress,
        dataCipher: dataCipher,
        snapshotPublisher: snapshotPublisher,
        flowSessionManager: flowSessionManager
    )
    return processor
}
|
||
|
||
/// Processes a data message received on a UDP hole.
///
/// The processor turns the message into a plan. Its inbound byte count is added to the
/// flow statistics and its action executed: answering an ARP request, recording an ARP
/// entry, writing the packet to the tunnel, or requesting a policy from the server.
/// Nothing happens when the processor returns no plan.
///
/// - Throws: Errors from building the processing plan.
private func handleHoleData(data: SDLData) async throws {
    guard let plan = try makeHoleDataProcessor().makeProcessingPlan(data: data) else {
        return
    }

    flowTracer.inc(num: plan.inboundBytes, type: .inbound)

    switch plan.action {
    case .none:
        break
    case .sendARPReply(let dstMac, let responseData):
        SDLLogger.log("[SDLContext] get arp request packet")
        await routeLayerPacket(dstMac: dstMac, type: .arp, data: responseData)
    case .appendARP(let ip, let mac):
        SDLLogger.log("[SDLContext] get arp response packet")
        await arpServer.append(ip: ip, mac: mac)
    case .writeToTun(let packetData, let identityID):
        provider.packetFlow.writePacketObjects([NEPacket(data: packetData, protocolFamily: 2)])
        SDLLogger.log("[SDLContext] hole identity: \(identityID), allow, data count: \(packetData.count)", for: .trace)
    case .requestPolicy(let srcIdentityID):
        SDLLogger.log("[SDLContext] not found identity: \(srcIdentityID) ruleMap", for: .debug)
        // Ask the server for the policy of this identity.
        await identifyStore.policyRequest(srcIdentityId: srcIdentityID, dstIdentityId: config.identityId, using: quicClient)
    }
}
|
||
|
||
}
|
||
|
||
private extension UInt32 {
    /// Formats this value as an IP address string via `SDLUtil.int32ToIp`.
    func asIpAddress() -> String {
        let text = SDLUtil.int32ToIp(self)
        return text
    }
}
|
||
|
||
|
||
// 处理从Tun收到的数据
|
||
extension SDLContextActor {
|
||
|
||
// 处理读取的每个数据包, Tun收到的包的一级路由
|
||
/// First-level routing for a packet read from the tunnel.
///
/// A router built from the current network address and exit node decides what to do
/// with the packet. When the decision asks for flow tracking and the packet yields a
/// flow session, that session is recorded before the decision is executed.
private func dealTunPacket(packet: IPPacket) async {
    let decision = SDLTunPacketRouter(
        networkAddress: config.networkAddress,
        exitNode: config.exitNode
    ).route(packet: packet)

    // Outbound traffic records a flow session so inbound traffic can be matched against it.
    if decision.shouldTrackFlow, let session = packet.flowSession() {
        flowSessionManager.updateSession(session)
    }

    await handleTunRouteDecision(decision)
}
|
||
|
||
/// Executes a routing decision for a packet read from the tunnel: write it back, hand it
/// to the cloud or local DNS client, forward it to the next hop, or drop it with a trace log.
private func handleTunRouteDecision(_ decision: SDLTunPacketRouter.RouteDecision) async {
    switch decision {
    case .drop(let reason):
        SDLLogger.log("[SDLContext] drop tun packet, reason: \(reason.rawValue)", for: .trace)
    case .loopback(let ipPacketData):
        provider.packetFlow.writePacketObjects([NEPacket(data: ipPacketData, protocolFamily: 2)])
    case .cloudDNS(_, let ipPacketData):
        await dnsClient?.forward(ipPacketData: ipPacketData)
    case .localDNS(_, let payload, let tracker):
        await dnsLocalClient?.query(tracker: tracker, dnsPayload: payload)
    case .forwardToNextHop(let ip, let type, let data, let kind):
        await forwardPacketToNextHop(ip: ip, type: type, data: data, kind: kind)
    }
}
|
||
|
||
/// Forwards a packet to the node owning `ip`.
///
/// When the ARP server knows the MAC for `ip`, the packet is routed to it. Otherwise an
/// ARP request is issued through the QUIC client (errors ignored) and the packet is not sent.
///
/// - Parameters:
///   - ip: Next-hop IP address.
///   - type: Layer packet type.
///   - data: Packet payload.
///   - kind: Why the packet is forwarded; only used for logging.
private func forwardPacketToNextHop(ip: UInt32, type: LayerPacket.PacketType, data: Data, kind: SDLTunPacketRouter.ForwardKind) async {
    switch kind {
    case .exitNode, .dnsExitNode:
        SDLLogger.log("[SDLContext] use exit_node: \(ip.asIpAddress())", for: .trace)
    case .sameNetwork:
        SDLLogger.log("[SDLContext] dstIp: \(ip.asIpAddress()) same network", for: .trace)
    }

    // Look the destination MAC up in the ARP cache.
    guard let dstMac = await arpServer.query(ip: ip) else {
        SDLLogger.log("[SDLContext] dstIp: \(ip.asIpAddress()) arp query not found, broadcast", for: .trace)
        try? await arpServer.arpRequest(targetIp: ip, use: quicClient)
        return
    }

    SDLLogger.log("[SDLContext] dstIp: \(ip.asIpAddress()), dst_mac is: \(SDLUtil.formatMacAddress(mac: dstMac))", for: .trace)
    await routeLayerPacket(dstMac: dstMac, type: type, data: data)
}
|
||
|
||
/// Creates a layer-packet forwarder from the current network address, identity id,
/// data cipher and session manager.
private func makeLayerPacketForwarder() -> SDLLayerPacketForwarder {
    let forwarder = SDLLayerPacketForwarder(
        networkAddress: config.networkAddress,
        identityID: config.identityId,
        dataCipher: dataCipher,
        sessionManager: sessionManager
    )
    return forwarder
}
|
||
|
||
/// Wraps `data` into a layer-2 packet for `dstMac` and delivers it according to the forwarder's plan.
///
/// The plan either relays through the super node, sends directly to a peer session, or
/// relays through the super node while submitting a hole-punching request. The packet is
/// dropped silently when no plan can be built.
private func routeLayerPacket(dstMac: Data, type: LayerPacket.PacketType, data: Data) async {
    let plan = try? await makeLayerPacketForwarder().makeDeliveryPlan(dstMac: dstMac, type: type, data: data)
    guard let plan = plan else { return }

    switch plan {
    case .peer(let payload, let session):
        // Deliver directly to the peer through its session.
        SDLLogger.log("[SDLContext] step 5 send packet by session: \(session)", for: .trace)
        sendPeerPacket(type: .data, data: payload, remoteAddress: session.natAddress)
        flowTracer.inc(num: payload.count, type: .p2p)

    case .superNode(let payload):
        // Relay through the super node.
        sendSuperPacket(type: .data, data: payload)

    case .superNodeAndPunch(let payload, let request):
        // Relay through the super node and count it as forwarded traffic.
        sendSuperPacket(type: .data, data: payload)
        SDLLogger.log("[SDLContext] step 5 send packet by super: \(self.config.stunSocketAddress)", for: .trace)
        flowTracer.inc(num: payload.count, type: .forward)

        // Try to punch a hole to the peer.
        await puncherActor.submitRegisterRequest(quicClient: quicClient, request: request)
    }
}
|
||
|
||
}
|