punchnet-macos/Tun/Punchnet/Actors/SDLContextActor.swift
2026-04-16 20:17:49 +08:00

1057 lines
40 KiB
Swift
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//
// SDLContext.swift
// Tun
//
// Created by on 2024/2/29.
//
import Foundation
import NetworkExtension
import NIOCore
//
/*
1. rsa
*/
actor SDLContextActor {
enum ReadyState {
case idle
case starting
case ready
case failed(any Error)
case stopped
}
private enum UDPHoleKind: Equatable {
case v4
case v6
func convertAddressType() -> Session.AddressType {
switch self {
case .v4:
return .v4
case .v6:
return .v6
}
}
}
private var readyState: ReadyState = .idle
private var readyWaiters: [CheckedContinuation<Void, Error>] = []
var config: SDLConfiguration
// nat
var natType: SDLNATProberActor.NatType = .blocked
// AES
private var dataCipher: CCDataCipher?
// session token
private var sessionToken: Data?
// rsa, public_key
//
nonisolated let rsaCipher: RSACipher
//
private var udpHole: SDLUDPHole?
private var udpHoleWorkers: [Task<Void, Never>]?
private var udpHoleLocalAddress: SocketAddress?
private var udpHoleV6: SDLUDPHoleV6?
private var udpHoleV6Workers: [Task<Void, Never>]?
private var udpHoleV6LocalAddress: SocketAddress?
// dnsclient
private var dnsClient: DNSCloudClient?
private var dnsWorker: Task<Void, Never>?
// Localdnsclient
private var dnsLocalClient: DNSLocalClient?
private var dnsLocalWorker: Task<Void, Never>?
private var quicClient: SDLQUICClient?
private var quicWorker: Task<Void, Never>?
nonisolated private let puncherActor: SDLPuncherActor
//
nonisolated private let proberActor: SDLNATProberActor
// ipv6
private var ipv6AssistClient: SDLIPV6AssistClient?
//
private var readTask: Task<(), Never>?
private let sessionManager = SessionManager()
nonisolated private let arpServer: ArpServer
//
private var monitor: SDLNetworkMonitor?
private var monitorWorker: Task<Void, Never>?
// socket
// App Group + Darwin Notification
//
nonisolated private let flowTracer = SDLFlowTracer()
//
private var supervisor = SDLSupervisor()
nonisolated private let provider: NEPacketTunnelProvider
//
private let identifyStore: IdentityStore
private var updatePolicyTask: Task<Void, Never>?
private let snapshotPublisher: SnapshotPublisher<IdentitySnapshot>
// Flow : 180
private let flowSessionManager = SDLFlowSessionManager(sessionTimeout: 180)
//
private var registerTask: Task<Void, Never>?
// stunRequest
private var stunRequestTask: Task<Void, Never>?
private let superRegistrationStateMachine = SDLSuperRegistrationStateMachine()
public init(provider: NEPacketTunnelProvider, config: SDLConfiguration, rsaCipher: RSACipher) {
self.provider = provider
self.config = config
self.rsaCipher = rsaCipher
self.puncherActor = SDLPuncherActor()
self.proberActor = SDLNATProberActor(addressArray: config.stunProbeSocketAddressArray)
self.arpServer = ArpServer()
//
let snapshotPublisher = SnapshotPublisher(initial: IdentitySnapshot.empty())
self.identifyStore = IdentityStore(publisher: snapshotPublisher)
self.snapshotPublisher = snapshotPublisher
}
public func start() async {
guard case .idle = self.readyState else {
return
}
self.readyState = .starting
self.prepareTunnelNotifier()
self.startMonitor()
// arp
await self.puncherActor.start()
await self.arpServer.start()
await self.startDnsClient()
await self.startDnsLocalClient()
await self.supervisor.addWorker(name: "quicClient") {
SDLLogger.log("[SDLContext] try start quicClient", for: .debug)
let quicClient = try await self.startQUICClient()
SDLLogger.log("[SDLContext] quicClient running!!!!")
await quicClient.waitClose()
SDLLogger.log("[SDLContext] quicClient closed!!!!")
}
await self.supervisor.addWorker(name: "udpHole") {
let udpHole = try await self.startUDPHole()
SDLLogger.log("[SDLContext] udp running!!!!")
try await udpHole.waitClose()
SDLLogger.log("[SDLContext] udp closed!!!!")
}
await self.supervisor.addWorker(name: "udpHoleV6") {
let udpHoleV6 = try await self.startUDPHoleV6()
SDLLogger.log("[SDLContext] udp v6 running!!!!")
try await udpHoleV6.waitClose()
SDLLogger.log("[SDLContext] udp v6 closed!!!!")
}
}
public func waitForReady() async throws {
switch self.readyState {
case .ready:
return
case .failed(let error):
throw error
case .stopped:
throw CancellationError()
case .idle, .starting:
try await withCheckedThrowingContinuation { continuation in
self.readyWaiters.append(continuation)
}
}
}
// ip: 0.0.0.0
public func updateExitNode(exitNodeIp: String) async throws {
if let ip = SDLUtil.ipv4StrToInt32(exitNodeIp), ip > 0 {
self.config.exitNode = .init(exitNodeIp: ip)
} else {
self.config.exitNode = nil
}
try await self.setNetworkSettings(config: config, dnsServer: DNSHelper.dnsServer)
}
private func startQUICClient() async throws -> SDLQUICClient {
self.quicWorker?.cancel()
self.quicClient?.stop()
// monitor
let quicClient = SDLQUICClient(host: self.config.serverHost, port: 443)
quicClient.start()
// quic
try await quicClient.waitReady()
// quic
try await Task.sleep(for: .seconds(0.2))
SDLLogger.log("[SDLContext] start quic client: \(self.config.serverHost)")
self.quicWorker = Task.detached {
for await message in quicClient.messageStream {
switch message {
case .welcome(let welcome):
SDLLogger.log("[SDLContext] quic welcome: \(welcome)")
//
await self.startRegisterLoop()
// stun
await self.startStunRequestTask(welcome: welcome)
case .pong:
//SDLLogger.shared.log("[SDLContext] quic pong")
()
case .registerSuperAck(let registerSuperAck):
await self.handleRegisterSuperAck(registerSuperAck: registerSuperAck)
case .registerSuperNak(let registerSuperNak):
await self.handleRegisterSuperNak(nakPacket: registerSuperNak)
case .peerInfo(let peerInfo):
//SDLLogger.shared.log("[SDLContext] peer message: \(peerInfo)")
await self.puncherActor.handlePeerInfo(using: self.udpHole, udpHoleV6: self.udpHoleV6, peerInfo: peerInfo)
case .event(let event):
await self.handleEvent(event: event)
case .policyReponse(let policyResponse):
//
await self.identifyStore.applyPolicyResponse(policyResponse)
case .arpResponse(let arpResponse):
//SDLLogger.shared.log("[SDLContext] get arp response: \(arpResponse)")
await self.arpServer.handleArpResponse(arpResponse: arpResponse)
}
}
}
self.quicClient = quicClient
return quicClient
}
private func prepareTunnelNotifier() {
// noticeClient
// UDP NoticeClient App Group
SDLTunnelAppNotifier.shared.clear()
SDLLogger.log("[SDLContext] tunnelAppNotifier ready")
}
private func startMonitor() {
self.monitorWorker?.cancel()
self.monitorWorker = nil
// monitor
let monitor = SDLNetworkMonitor()
monitor.start()
SDLLogger.log("[SDLContext] monitor started")
self.monitor = monitor
self.monitorWorker = Task.detached {
for await event in monitor.eventStream {
switch event {
case .changed:
// nat
await self.probeNatType()
SDLLogger.log("didNetworkPathChanged, nat type is:")
case .unreachable:
SDLLogger.log("didNetworkPathUnreachable")
}
}
}
}
private func startDnsClient() async {
self.dnsWorker?.cancel()
self.dnsWorker = nil
// dns
let dnsClient = DNSCloudClient(host: self.config.serverIp, port: 15353)
await dnsClient.start()
SDLLogger.log("[SDLContext] dnsClient started")
self.dnsClient = dnsClient
let packetFlow = dnsClient.packetFlow
self.dnsWorker = Task.detached {
//
for await packet in packetFlow {
if Task.isCancelled {
break
}
let nePacket = NEPacket(data: packet, protocolFamily: 2)
self.provider.packetFlow.writePacketObjects([nePacket])
}
}
}
private func startDnsLocalClient() async {
self.dnsLocalWorker?.cancel()
self.dnsLocalWorker = nil
// dns
let dnsLocalClient = DNSLocalClient()
await dnsLocalClient.start()
SDLLogger.log("[SDLContext] dnsClient started")
self.dnsLocalClient = dnsLocalClient
let packetFlow = dnsLocalClient.packetFlow
self.dnsLocalWorker = Task.detached {
//
for await packet in packetFlow {
if Task.isCancelled {
break
}
// Ip
let nePacket = NEPacket(data: packet, protocolFamily: 2)
self.provider.packetFlow.writePacketObjects([nePacket])
}
}
}
private func startUDPHole() async throws -> SDLUDPHole {
self.udpHoleWorkers?.forEach {$0.cancel()}
self.udpHoleWorkers = nil
// udp
let udpHole = try SDLUDPHole()
let localAddress = try udpHole.start()
SDLLogger.log("[SDLContext] udpHole started, on address: \(localAddress)")
//
let messageStream = udpHole.messageStream
let messageTask = Task.detached {
await self.consumeUDPHoleMessages(stream: messageStream, localAddress: localAddress, source: .v4)
}
self.udpHole = udpHole
self.udpHoleLocalAddress = localAddress
self.udpHoleWorkers = [messageTask]
// nat
await self.probeNatType()
return udpHole
}
private func startUDPHoleV6() async throws -> SDLUDPHoleV6 {
self.udpHoleV6Workers?.forEach {$0.cancel()}
self.udpHoleV6Workers = nil
// udp
let udpHoleV6 = try SDLUDPHoleV6()
let localAddress = try udpHoleV6.start()
SDLLogger.log("[SDLContext] udpHoleV6 started, on address: \(localAddress)")
//
let messageStream = udpHoleV6.messageStream
let messageTask = Task.detached {
await self.consumeUDPHoleMessages(stream: messageStream, localAddress: localAddress, source: .v6)
}
self.udpHoleV6 = udpHoleV6
self.udpHoleV6LocalAddress = localAddress
self.udpHoleV6Workers = [messageTask]
return udpHoleV6
}
// context
public func stop() async {
self.resumeReadyWaiters(.failure(CancellationError()))
self.readyState = .stopped
self.superRegistrationStateMachine.reset()
await self.supervisor.stop()
await self.puncherActor.stop()
self.udpHoleWorkers?.forEach { $0.cancel() }
self.udpHoleWorkers = nil
self.udpHole?.stop()
self.udpHole = nil
self.udpHoleLocalAddress = nil
self.udpHoleV6Workers?.forEach { $0.cancel() }
self.udpHoleV6Workers = nil
self.udpHoleV6?.stop()
self.udpHoleV6 = nil
self.udpHoleV6LocalAddress = nil
self.quicWorker?.cancel()
self.quicWorker = nil
self.quicClient?.stop()
self.quicClient = nil
await self.dnsClient?.stop()
self.dnsWorker?.cancel()
self.dnsWorker = nil
self.dnsClient = nil
await self.dnsLocalClient?.stop()
self.dnsLocalWorker?.cancel()
self.dnsLocalWorker = nil
self.dnsLocalClient = nil
self.monitor?.stop()
self.monitorWorker?.cancel()
self.monitorWorker = nil
self.monitor = nil
self.readTask?.cancel()
self.readTask = nil
self.registerTask?.cancel()
self.registerTask = nil
self.updatePolicyTask?.cancel()
self.updatePolicyTask = nil
self.sessionToken = nil
self.dataCipher = nil
await self.ipv6AssistClient?.stop()
self.ipv6AssistClient = nil
}
private func publishTunnelEvent(code: Int? = nil, message: String) {
SDLTunnelAppNotifier.shared.publish(code: code, message: message)
}
private func setNatType(natType: SDLNATProberActor.NatType) {
self.natType = natType
}
// super
private func whenRegistedSuper() async {
self.updatePolicyTask?.cancel()
self.updatePolicyTask = Task {
while !Task.isCancelled {
try? await Task.sleep(for: .seconds(300))
SDLLogger.log("[SDLContext] updatePolicyTask execute")
await self.identifyStore.batUpdatePolicy(using: self.quicClient, dstIdentityID: self.config.identityId)
}
}
}
// MARK: -- StunRequestTask
private func startStunRequestTask(welcome: SDLWelcome) async {
await self.ipv6AssistClient?.stop()
self.ipv6AssistClient = SDLIPV6AssistClient(assistServerInfo: welcome.ipv6Assist)
// welcome使ipv6
//
self.stunRequestTask = Task.detached {
let timerStream = SDLAsyncTimerStream()
timerStream.start(interval: .seconds(8))
for await _ in timerStream.stream {
let probeReply = try? await self.ipv6AssistClient?.probe(requestTimeout: .seconds(3))
if let v6Info = probeReply.v6Info {
let v6Address = SDLUtil.ipv6DataToString(v6Info.v6)
SDLLogger.log("[SDLContext] probe ipv6 address: \(v6Address)")
}
await self.sendStunRequest(probeReply: probeReply)
}
SDLLogger.log("[SDLContext] udp stunRequestTask cancel")
}
}
private func sendStunRequest(probeReply: SDLV6AssistProbeReply?) {
guard let sessionToken = self.sessionToken else {
return
}
var stunRequest = SDLStunRequest()
stunRequest.clientID = self.config.clientId
stunRequest.networkID = self.config.networkAddress.networkId
stunRequest.ip = self.config.networkAddress.ip
stunRequest.mac = self.config.networkAddress.mac
stunRequest.natType = UInt32(self.natType.rawValue)
stunRequest.sessionToken = sessionToken
if let v6Info = probeReply?.v6Info {
stunRequest.v6Info = v6Info
}
if let stunData = try? stunRequest.serializedData() {
self.sendSuperPacket(type: .stunRequest, data: stunData)
}
}
// , 线packetFlow
private func startReader() {
//
self.readTask?.cancel()
//
self.readTask = Task.detached(priority: .high) {
while true {
if Task.isCancelled {
return
}
let (packets, numbers) = await self.provider.packetFlow.readPackets()
for (data, number) in zip(packets, numbers) where number == 2 {
if let ipPacket = IPPacket(data) {
await self.dealTunPacket(packet: ipPacket)
}
}
}
}
}
// MARK:
private func setNetworkSettings(config: SDLConfiguration, dnsServer: String) async throws {
let networkAddress = config.networkAddress
//
var routes: [NEIPv4Route] = [
NEIPv4Route(destinationAddress: networkAddress.netAddress, subnetMask: networkAddress.maskAddress),
NEIPv4Route(destinationAddress: dnsServer, subnetMask: "255.255.255.255"),
]
//
if config.exitNode != nil {
routes.append(.default())
}
// Add code here to start the process of connecting the tunnel.
let networkSettings = NEPacketTunnelNetworkSettings(tunnelRemoteAddress: "8.8.8.8")
networkSettings.mtu = 1250
// DNS
let networkDomain = networkAddress.networkDomain
let dnsSettings = NEDNSSettings(servers: [dnsServer])
dnsSettings.searchDomains = [networkDomain]
dnsSettings.matchDomains = [networkDomain, ""]
// false Search Domain
dnsSettings.matchDomainsNoSearch = false
networkSettings.dnsSettings = dnsSettings
let ipv4Settings = NEIPv4Settings(addresses: [networkAddress.ipAddress], subnetMasks: [networkAddress.maskAddress])
//
ipv4Settings.includedRoutes = routes
//
ipv4Settings.excludedRoutes = self.getIpv4ExcludeRoutes()
networkSettings.ipv4Settings = ipv4Settings
//
try await self.provider.setTunnelNetworkSettings(networkSettings)
}
//
private func probeNatType() async {
guard let udpHole = self.udpHole else {
return
}
// nat
self.natType = await self.proberActor.probeNatType(using: udpHole)
SDLLogger.log("[SDLContext] nat_type is: \(natType)")
}
// super/stun
private func sendSuperPacket(type: SDLPacketType, data: Data) {
self.sendPacket(type: type, data: data, remoteAddress: self.config.stunSocketAddress)
}
// peer
private func sendPeerPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
self.sendPacket(type: type, data: data, remoteAddress: remoteAddress)
}
private func sendPacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
switch remoteAddress {
case .v4:
guard let udpHole = self.udpHole else {
SDLLogger.log("[SDLContext] udpHole is nil for remoteAddress: \(remoteAddress)", for: .debug)
return
}
udpHole.send(type: type, data: data, remoteAddress: remoteAddress)
case .v6:
guard let udpHoleV6 = self.udpHoleV6 else {
SDLLogger.log("[SDLContext] udpHoleV6 is nil for remoteAddress: \(remoteAddress)", for: .debug)
return
}
udpHoleV6.send(type: type, data: data, remoteAddress: remoteAddress)
default:
SDLLogger.log("[SDLContext] unsupported socket family: \(remoteAddress)", for: .debug)
}
}
private func spawnLoop(_ body: @escaping () async throws -> Void) -> Task<Void, Never> {
return Task.detached {
while !Task.isCancelled {
do {
try await body()
} catch is CancellationError {
break
} catch {
try? await Task.sleep(nanoseconds: 2_000_000_000)
}
}
}
}
private func getIpv4ExcludeRoutes() -> [NEIPv4Route] {
//
let dnsServers = SDLUtil.getMacOSSystemDnsServers()
var ipv4DnsServers = dnsServers.filter {!$0.contains(":")}
// dns
let commonDnsServers = [
"8.8.8.8",
"8.8.4.4",
"223.5.5.5",
"223.6.6.6",
"114.114.114.114"
]
for ip in commonDnsServers {
if !ipv4DnsServers.contains(ip) {
ipv4DnsServers.append(ip)
}
}
return ipv4DnsServers.map { NEIPv4Route(destinationAddress: $0, subnetMask: "255.255.255.255") }
}
private func markReady() {
guard case .starting = self.readyState else {
return
}
self.readyState = .ready
self.resumeReadyWaiters(.success(()))
}
private func failReady(_ error: any Error) {
switch self.readyState {
case .ready, .failed, .stopped:
return
case .idle, .starting:
self.readyState = .failed(error)
self.resumeReadyWaiters(.failure(error))
}
}
private func resumeReadyWaiters(_ result: Result<Void, any Error>) {
let waiters = self.readyWaiters
self.readyWaiters.removeAll()
waiters.forEach { continuation in
switch result {
case .success:
continuation.resume()
case .failure(let error):
continuation.resume(throwing: error)
}
}
}
deinit {
self.udpHole = nil
self.udpHoleLocalAddress = nil
self.udpHoleV6 = nil
self.udpHoleV6LocalAddress = nil
self.dnsClient = nil
}
}
// Super
extension SDLContextActor {
private func makeSuperEventProcessor() -> SDLSuperEventProcessor {
return .init(networkAddress: self.config.networkAddress)
}
//
private func startRegisterLoop() {
guard self.registerTask == nil,
self.superRegistrationStateMachine.beginLoop() else {
return
}
self.registerTask = Task {
defer {
self.registerTask = nil
}
while !Task.isCancelled {
switch self.superRegistrationStateMachine.makeLoopAction() {
case .sendRegister:
self.doRegisterSuper()
case .stop:
return
}
try? await Task.sleep(for: .seconds(5))
switch self.superRegistrationStateMachine.makeWaitDecision() {
case .registered:
await self.whenRegistedSuper()
return
case .retry:
SDLLogger.log("[SDLContext] register super failed, retry")
case .stop:
return
}
}
}
}
private func handleRegisterSuperAck(registerSuperAck: SDLRegisterSuperAck) async {
// rsa
guard let key = try? self.rsaCipher.decode(data: Data(registerSuperAck.key)) else {
self.superRegistrationStateMachine.handleFailure()
SDLLogger.log("[SDLContext] registerSuperAck invalid key")
let error = SDLError.invalidKey
self.failReady(error)
self.provider.cancelTunnelWithError(error)
return
}
let algorithm = registerSuperAck.algorithm.lowercased()
let regionId = registerSuperAck.regionID
self.sessionToken = registerSuperAck.sessionToken
switch algorithm {
case "aes":
self.dataCipher = CCAESChiper(key: key)
case "chacha20":
self.dataCipher = CCChaCha20Cipher(regionId: regionId, keyData: key)
default:
self.superRegistrationStateMachine.handleFailure()
SDLLogger.log("[SDLContext] registerSuperAck invalid algorithm \(algorithm)")
let error = SDLError.unsupportedAlgorithm(algorithm: algorithm)
self.failReady(error)
self.provider.cancelTunnelWithError(error)
return
}
SDLLogger.log("[SDLContext] registerSuperAck, use algorithm \(algorithm), key len: \(key.count)")
// tun
do {
try await self.setNetworkSettings(config: self.config, dnsServer: DNSHelper.dnsServer)
SDLLogger.log("[SDLContext] setNetworkSettings successed")
self.superRegistrationStateMachine.handleRegisterSuperAck()
self.startReader()
self.markReady()
} catch let err {
self.superRegistrationStateMachine.handleFailure()
SDLLogger.log("[SDLContext] setTunnelNetworkSettings get error: \(err)")
self.failReady(err)
self.provider.cancelTunnelWithError(err)
}
}
private func handleRegisterSuperNak(nakPacket: SDLRegisterSuperNak) {
let errorMessage = nakPacket.errorMessage
guard let errorCode = SDLNAKErrorCode(rawValue: UInt8(nakPacket.errorCode)) else {
return
}
switch errorCode {
case .invalidToken, .nodeDisabled:
self.superRegistrationStateMachine.handleFailure()
self.publishTunnelEvent(code: Int(errorCode.rawValue), message: errorMessage)
// 退
let error = NSError(domain: "com.jihe.punchnet.tun", code: -1)
self.failReady(error)
self.provider.cancelTunnelWithError(error)
case .noIpAddress, .networkFault, .internalFault:
self.superRegistrationStateMachine.handleRetryableNak()
self.publishTunnelEvent(code: Int(errorCode.rawValue), message: errorMessage)
}
SDLLogger.log("[SDLContext] Get a SuperNak message exit")
}
private func handleEvent(event: SDLEvent) async {
let processor = self.makeSuperEventProcessor()
let plan = await processor.makeProcessingPlan(event: event)
if let logMessage = plan.logMessage {
SDLLogger.log(logMessage)
}
switch plan.action {
case .removeSession(let dstMac):
await self.sessionManager.removeSession(dstMac: dstMac)
case .sendRegister(let registerData, let remoteAddresses):
remoteAddresses.forEach { remoteAddress in
self.sendPeerPacket(type: .register, data: registerData, remoteAddress: remoteAddress)
}
case .shutdown(let message):
self.publishTunnelEvent(message: message)
// 退
let error = NSError(domain: "com.jihe.punchnet.tun", code: -2)
self.failReady(error)
self.provider.cancelTunnelWithError(error)
case .none:
()
}
}
private func doRegisterSuper() {
//
var registerSuper = SDLRegisterSuper()
registerSuper.clientID = self.config.clientId
registerSuper.networkID = self.config.networkAddress.networkId
registerSuper.mac = self.config.networkAddress.mac
registerSuper.ip = self.config.networkAddress.ip
registerSuper.maskLen = UInt32(self.config.networkAddress.maskLen)
registerSuper.hostname = self.config.hostname
registerSuper.pubKey = self.rsaCipher.pubKey
registerSuper.accessToken = self.config.accessToken
if let registerSuperData = try? registerSuper.serializedData() {
SDLLogger.log("[SDLContext] will send register super")
self.quicClient?.send(type: .registerSuper, data: registerSuperData)
}
}
}
// Hole
extension SDLContextActor {
private func consumeUDPHoleMessages(stream: AsyncStream<(SocketAddress, SDLHoleMessage)>, localAddress: SocketAddress, source: UDPHoleKind) async {
for await (remoteAddress, message) in stream {
if Task.isCancelled {
break
}
switch message.inboundMessage {
case .control(let controlMessage):
await self.handleHoleControlMessage(controlMessage, localAddress: localAddress, remoteAddress: remoteAddress, source: source)
case .data(let data):
try? await self.handleHoleData(data: data)
}
}
}
private func handleHoleControlMessage(_ message: SDLHoleControlMessage, localAddress: SocketAddress, remoteAddress: SocketAddress, source: UDPHoleKind) async {
switch message {
case .stunReply(_):
guard source == .v4 else {
return
}
SDLLogger.log("[SDLContext] get a stunReply", for: .debug)
case .stunProbeReply(let probeReply):
guard source == .v4 else {
return
}
await self.proberActor.handleProbeReply(localAddress: localAddress, reply: probeReply)
case .register(let register):
try? await self.handleRegister(remoteAddress: remoteAddress, register: register, source: source)
case .registerAck(let registerAck):
await self.handleRegisterAck(remoteAddress: remoteAddress, registerAck: registerAck, source: source)
}
}
private func handleRegister(remoteAddress: SocketAddress, register: SDLRegister, source: UDPHoleKind) async throws {
let networkAddr = config.networkAddress
SDLLogger.log("[SDLContext] register packet: \(register), network_address: \(networkAddr)")
// tun,
if register.dstMac == networkAddr.mac && register.networkID == networkAddr.networkId {
// ack
var registerAck = SDLRegisterAck()
registerAck.networkID = networkAddr.networkId
registerAck.srcMac = networkAddr.mac
registerAck.dstMac = register.srcMac
self.sendPeerPacket(type: .registerAck, data: try registerAck.serializedData(), remoteAddress: remoteAddress)
// , super-nodenatudpnat
if let session = Session(dstMac: register.srcMac, natAddress: remoteAddress, addressType: source.convertAddressType()) {
await self.sessionManager.addSession(session: session)
} else {
SDLLogger.log("[SDLContext] didReadRegister get unsupported remoteAddress: \(remoteAddress)", for: .debug)
}
} else {
SDLLogger.log("[SDLContext] didReadRegister get a invalid packet, because dst_ip not matched: \(register.dstMac)")
}
}
private func handleRegisterAck(remoteAddress: SocketAddress, registerAck: SDLRegisterAck, source: UDPHoleKind) async {
// tun,
let networkAddr = config.networkAddress
if registerAck.dstMac == networkAddr.mac && registerAck.networkID == networkAddr.networkId {
if let session = Session(dstMac: registerAck.srcMac, natAddress: remoteAddress, addressType: source.convertAddressType()) {
await self.sessionManager.addSession(session: session)
} else {
SDLLogger.log("[SDLContext] didReadRegisterAck get unsupported remoteAddress: \(remoteAddress)", for: .debug)
}
} else {
SDLLogger.log("[SDLContext] didReadRegisterAck get a invalid packet, because dst_mac not matched: \(registerAck.dstMac)")
}
}
private func makeHoleDataProcessor() -> SDLHoleDataProcessor {
return .init(
networkAddress: self.config.networkAddress,
dataCipher: self.dataCipher,
snapshotPublisher: self.snapshotPublisher,
flowSessionManager: self.flowSessionManager
)
}
private func handleHoleData(data: SDLData) async throws {
let processor = self.makeHoleDataProcessor()
guard let plan = try processor.makeProcessingPlan(data: data) else {
return
}
self.flowTracer.inc(num: plan.inboundBytes, type: .inbound)
switch plan.action {
case .sendARPReply(let dstMac, let responseData):
SDLLogger.log("[SDLContext] get arp request packet")
await self.routeLayerPacket(dstMac: dstMac, type: .arp, data: responseData)
case .appendARP(let ip, let mac):
SDLLogger.log("[SDLContext] get arp response packet")
await self.arpServer.append(ip: ip, mac: mac)
case .writeToTun(let packetData, let identityID):
let packet = NEPacket(data: packetData, protocolFamily: 2)
self.provider.packetFlow.writePacketObjects([packet])
SDLLogger.log("[SDLContext] hole identity: \(identityID), allow, data count: \(packetData.count)", for: .trace)
case .requestPolicy(let srcIdentityID):
SDLLogger.log("[SDLContext] not found identity: \(srcIdentityID) ruleMap", for: .debug)
//
await self.identifyStore.policyRequest(srcIdentityId: srcIdentityID, dstIdentityId: self.config.identityId, using: self.quicClient)
case .none:
()
}
}
}
private extension UInt32 {
// ip
func asIpAddress() -> String {
return SDLUtil.int32ToIp(self)
}
}
// Tun
extension SDLContextActor {
// , Tun
private func dealTunPacket(packet: IPPacket) async {
let router = SDLTunPacketRouter(networkAddress: self.config.networkAddress, exitNode: self.config.exitNode)
let decision = router.route(packet: packet)
// FlowSession
//
if decision.shouldTrackFlow, let flowSession = packet.flowSession() {
self.flowSessionManager.updateSession(flowSession)
//SDLLogger.shared.log("[SDLContext] flow_session: \(flowSession)", level: .debug)
}
await self.handleTunRouteDecision(decision)
}
private func handleTunRouteDecision(_ decision: SDLTunPacketRouter.RouteDecision) async {
switch decision {
case .loopback(let ipPacketData):
let nePacket = NEPacket(data: ipPacketData, protocolFamily: 2)
self.provider.packetFlow.writePacketObjects([nePacket])
case .cloudDNS(let name, let ipPacketData):
SDLLogger.log("[SDLContext] get cloud dns request: \(name)")
await self.dnsClient?.forward(ipPacketData: ipPacketData)
case .localDNS(let name, let payload, let tracker):
SDLLogger.log("[SDLContext] get local dns request: \(name)")
await self.dnsLocalClient?.query(tracker: tracker, dnsPayload: payload)
case .forwardToNextHop(let ip, let type, let data, let kind):
await self.forwardPacketToNextHop(ip: ip, type: type, data: data, kind: kind)
case .drop(let reason):
SDLLogger.log("[SDLContext] drop tun packet, reason: \(reason.rawValue)", for: .trace)
}
}
private func forwardPacketToNextHop(ip: UInt32, type: LayerPacket.PacketType, data: Data, kind: SDLTunPacketRouter.ForwardKind) async {
switch kind {
case .sameNetwork:
SDLLogger.log("[SDLContext] dstIp: \(ip.asIpAddress()) same network", for: .trace)
case .exitNode, .dnsExitNode:
SDLLogger.log("[SDLContext] use exit_node: \(ip.asIpAddress())", for: .trace)
}
// arpmac
if let dstMac = await self.arpServer.query(ip: ip) {
SDLLogger.log("[SDLContext] dstIp: \(ip.asIpAddress()), dst_mac is: \(SDLUtil.formatMacAddress(mac: dstMac))", for: .trace)
await self.routeLayerPacket(dstMac: dstMac, type: type, data: data)
}
else {
SDLLogger.log("[SDLContext] dstIp: \(ip.asIpAddress()) arp query not found, broadcast", for: .trace)
// // arp广
// let arpReqeust = ARPPacket.arpRequest(senderIP: networkAddr.ip, senderMAC: networkAddr.mac, targetIP: dstIp)
// await self.routeLayerPacket(dstMac: ARPPacket.broadcastMac , type: .arp, data: arpReqeust.marshal())
try? await self.arpServer.arpRequest(targetIp: ip, use: self.quicClient)
}
}
private func makeLayerPacketForwarder() -> SDLLayerPacketForwarder {
return .init(
networkAddress: self.config.networkAddress,
identityID: self.config.identityId,
dataCipher: self.dataCipher,
sessionManager: self.sessionManager
)
}
private func routeLayerPacket(dstMac: Data, type: LayerPacket.PacketType, data: Data) async {
// 2
//
let forwarder = self.makeLayerPacketForwarder()
guard let plan = try? await forwarder.makeDeliveryPlan(dstMac: dstMac, type: type, data: data) else {
return
}
// 广
switch plan {
case .superNode(let payload):
// super_node
self.sendSuperPacket(type: .data, data: payload)
case .peer(let payload, let session):
// session
SDLLogger.log("[SDLContext] step 5 send packet by session: \(session)", for: .trace)
self.sendPeerPacket(type: .data, data: payload, remoteAddress: session.natAddress)
self.flowTracer.inc(num: payload.count, type: .p2p)
case .superNodeAndPunch(let payload, let request):
// super_node
self.sendSuperPacket(type: .data, data: payload)
SDLLogger.log("[SDLContext] step 5 send packet by super: \(self.config.stunSocketAddress)", for: .trace)
//
self.flowTracer.inc(num: payload.count, type: .forward)
//
await self.puncherActor.submitRegisterRequest(quicClient: self.quicClient, request: request)
}
}
}