Compare commits
38 Commits
main
...
async_chan
| Author | SHA1 | Date | |
|---|---|---|---|
| 8932fc8b9d | |||
| 5d87075108 | |||
| f410433aac | |||
| 075010703a | |||
| de79811623 | |||
| 90939a68c1 | |||
| 77a631a001 | |||
| 2c05d71dbe | |||
| 325a1674d3 | |||
| de16c1ad18 | |||
| 45fda4ce9b | |||
| 6cb8641869 | |||
| 2d6adba8a8 | |||
| c8f1787ea2 | |||
| 95760764a5 | |||
| 5756422ebb | |||
| e6fc903439 | |||
| d775f5ba96 | |||
| a292bec2c4 | |||
| 3e96169690 | |||
| 7af3b235e7 | |||
| 5b57087ea2 | |||
| b03159c693 | |||
| 0fe9b43c08 | |||
| 29b9bebe87 | |||
| 12b1d68635 | |||
| a280a22d3a | |||
| 26ee512a9a | |||
| f273da3b11 | |||
| 3ce4e05613 | |||
| 9a76b61ff0 | |||
| 1f629a58a0 | |||
| d1452ce0b7 | |||
| b55d9913cc | |||
| cd24d2ac7c | |||
| 4c815397da | |||
| 3a0c21280c | |||
| aae0b333de |
@ -1,5 +1,5 @@
|
|||||||
{
|
{
|
||||||
"originHash" : "c6ae4911d55046c288e0ecdc9de28a95dcb591d3daaa03914664a3d2e069977a",
|
"originHash" : "597c0437584251872938aab4e6fd3cc1fc28e6da31c045adee36661e9c049c5e",
|
||||||
"pins" : [
|
"pins" : [
|
||||||
{
|
{
|
||||||
"identity" : "swift-atomics",
|
"identity" : "swift-atomics",
|
||||||
@ -24,8 +24,8 @@
|
|||||||
"kind" : "remoteSourceControl",
|
"kind" : "remoteSourceControl",
|
||||||
"location" : "https://github.com/apple/swift-nio.git",
|
"location" : "https://github.com/apple/swift-nio.git",
|
||||||
"state" : {
|
"state" : {
|
||||||
"revision" : "ad6b5f17270a7008f60d35ec5378e6144a575162",
|
"revision" : "a5fea865badcb1c993c85b0f0e8d05a4bd2270fb",
|
||||||
"version" : "2.84.0"
|
"version" : "2.85.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@ -7,7 +7,7 @@ let package = Package(
|
|||||||
name: "sdlan",
|
name: "sdlan",
|
||||||
platforms: [
|
platforms: [
|
||||||
.iOS(.v17),
|
.iOS(.v17),
|
||||||
.macOS(.v10_13)
|
.macOS(.v14)
|
||||||
],
|
],
|
||||||
products: [
|
products: [
|
||||||
// Products define the executables and libraries a package produces, making them visible to other packages.
|
// Products define the executables and libraries a package produces, making them visible to other packages.
|
||||||
@ -16,7 +16,7 @@ let package = Package(
|
|||||||
targets: ["Punchnet"]),
|
targets: ["Punchnet"]),
|
||||||
],
|
],
|
||||||
dependencies: [
|
dependencies: [
|
||||||
.package(url: "https://github.com/apple/swift-nio.git", from: "2.84.0"),
|
.package(url: "https://github.com/apple/swift-nio.git", exact: "2.85.0"),
|
||||||
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.26.0")
|
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.26.0")
|
||||||
|
|
||||||
],
|
],
|
||||||
@ -28,6 +28,7 @@ let package = Package(
|
|||||||
dependencies: [
|
dependencies: [
|
||||||
.product(name: "NIOCore", package: "swift-nio"),
|
.product(name: "NIOCore", package: "swift-nio"),
|
||||||
.product(name: "NIOPosix", package: "swift-nio"),
|
.product(name: "NIOPosix", package: "swift-nio"),
|
||||||
|
.product(name: "NIOFoundationCompat", package: "swift-nio"),
|
||||||
.product(name: "SwiftProtobuf", package: "swift-protobuf")
|
.product(name: "SwiftProtobuf", package: "swift-protobuf")
|
||||||
]
|
]
|
||||||
),
|
),
|
||||||
|
|||||||
@ -6,7 +6,14 @@
|
|||||||
//
|
//
|
||||||
import Foundation
|
import Foundation
|
||||||
|
|
||||||
struct ARPPacket {
|
struct ARPPacket: CustomStringConvertible {
|
||||||
|
var description: String {
|
||||||
|
return """
|
||||||
|
opcode: \(self.opcode), sender_ip: \(SDLUtil.int32ToIp(self.senderIP)), sender_mac: \(SDLUtil.formatMacAddress(mac: senderMAC)),
|
||||||
|
target_ip: \(SDLUtil.int32ToIp(self.targetIP)), target_mac: \(SDLUtil.formatMacAddress(mac: targetMAC))
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
|
||||||
// ARP操作码
|
// ARP操作码
|
||||||
enum Opcode: UInt16 {
|
enum Opcode: UInt16 {
|
||||||
case request = 0x01
|
case request = 0x01
|
||||||
@ -31,8 +38,15 @@ struct ARPPacket {
|
|||||||
var targetMAC: Data
|
var targetMAC: Data
|
||||||
var targetIP: UInt32
|
var targetIP: UInt32
|
||||||
|
|
||||||
init(hardwareType: UInt16, protocolType: UInt16, hardwareSize: UInt8, protocolSize: UInt8, opcode: Opcode,
|
init(hardwareType: UInt16,
|
||||||
senderMAC: Data, senderIP: UInt32, targetMAC: Data, targetIP: UInt32) {
|
protocolType: UInt16,
|
||||||
|
hardwareSize: UInt8,
|
||||||
|
protocolSize: UInt8,
|
||||||
|
opcode: Opcode,
|
||||||
|
senderMAC: Data,
|
||||||
|
senderIP: UInt32,
|
||||||
|
targetMAC: Data,
|
||||||
|
targetIP: UInt32) {
|
||||||
|
|
||||||
self.hardwareType = hardwareType
|
self.hardwareType = hardwareType
|
||||||
self.protocolType = protocolType
|
self.protocolType = protocolType
|
||||||
@ -47,7 +61,6 @@ struct ARPPacket {
|
|||||||
|
|
||||||
init?(data: Data) {
|
init?(data: Data) {
|
||||||
guard data.count >= 28 else {
|
guard data.count >= 28 else {
|
||||||
NSLog("length < 28: len: \(data.count)")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,7 +69,6 @@ struct ARPPacket {
|
|||||||
self.hardwareSize = data[4]
|
self.hardwareSize = data[4]
|
||||||
self.protocolSize = data[5]
|
self.protocolSize = data[5]
|
||||||
guard let opcode = Opcode(rawValue: UInt16(data[6]) << 8 | UInt16(data[7])) else {
|
guard let opcode = Opcode(rawValue: UInt16(data[6]) << 8 | UInt16(data[7])) else {
|
||||||
NSLog("opcode error")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,31 +0,0 @@
|
|||||||
//
|
|
||||||
// HolerManager.swift
|
|
||||||
// sdlan
|
|
||||||
//
|
|
||||||
// Created by 安礼成 on 2025/7/14.
|
|
||||||
//
|
|
||||||
import Foundation
|
|
||||||
|
|
||||||
actor HolerManager {
|
|
||||||
private var holers: [Data:Task<(), Never>] = [:]
|
|
||||||
|
|
||||||
func addHoler(dstMac: Data, creator: @escaping () -> Task<(), Never>) {
|
|
||||||
if let task = self.holers[dstMac] {
|
|
||||||
if task.isCancelled {
|
|
||||||
self.holers[dstMac] = creator()
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.holers[dstMac] = creator()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func cleanup() {
|
|
||||||
for holer in holers.values {
|
|
||||||
holer.cancel()
|
|
||||||
}
|
|
||||||
self.holers.removeAll()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@ -1,67 +0,0 @@
|
|||||||
//
|
|
||||||
// NetworkInterface.swift
|
|
||||||
// Tun
|
|
||||||
//
|
|
||||||
// Created by 安礼成 on 2024/1/19.
|
|
||||||
//
|
|
||||||
|
|
||||||
import Foundation
|
|
||||||
|
|
||||||
public struct NetworkInterface {
|
|
||||||
public let name: String
|
|
||||||
public let ip: String
|
|
||||||
public let netmask: String
|
|
||||||
}
|
|
||||||
|
|
||||||
public struct NetworkInterfaceManager {
|
|
||||||
/**
|
|
||||||
获取网卡信息, (let name: String let ip: String let netmask: String)
|
|
||||||
*/
|
|
||||||
public static func getInterfaces() -> [NetworkInterface] {
|
|
||||||
var interfaces: [NetworkInterface] = []
|
|
||||||
|
|
||||||
// Get list of all interfaces on the local machine:
|
|
||||||
var ifaddr : UnsafeMutablePointer<ifaddrs>? = nil
|
|
||||||
if getifaddrs(&ifaddr) == 0 {
|
|
||||||
|
|
||||||
// For each interface ...
|
|
||||||
var ptr = ifaddr
|
|
||||||
while( ptr != nil) {
|
|
||||||
|
|
||||||
let flags = Int32(ptr!.pointee.ifa_flags)
|
|
||||||
var addr = ptr!.pointee.ifa_addr.pointee
|
|
||||||
|
|
||||||
// Check for running IPv4, IPv6 interfaces. Skip the loopback interface.
|
|
||||||
if (flags & (IFF_UP|IFF_RUNNING|IFF_LOOPBACK)) == (IFF_UP|IFF_RUNNING) {
|
|
||||||
if addr.sa_family == UInt8(AF_INET) || addr.sa_family == UInt8(AF_INET6) {
|
|
||||||
|
|
||||||
var mask = ptr!.pointee.ifa_netmask.pointee
|
|
||||||
|
|
||||||
// Convert interface address to a human readable string:
|
|
||||||
let zero = CChar(0)
|
|
||||||
var hostname = [CChar](repeating: zero, count: Int(NI_MAXHOST))
|
|
||||||
var netmask = [CChar](repeating: zero, count: Int(NI_MAXHOST))
|
|
||||||
if (getnameinfo(&addr, socklen_t(addr.sa_len), &hostname, socklen_t(hostname.count),
|
|
||||||
nil, socklen_t(0), NI_NUMERICHOST) == 0) {
|
|
||||||
let address = String(cString: hostname)
|
|
||||||
|
|
||||||
let name = ptr!.pointee.ifa_name!
|
|
||||||
let ifname = String(cString: name)
|
|
||||||
|
|
||||||
if (getnameinfo(&mask, socklen_t(mask.sa_len), &netmask, socklen_t(netmask.count), nil, socklen_t(0), NI_NUMERICHOST) == 0) {
|
|
||||||
let netmaskIP = String(cString: netmask)
|
|
||||||
|
|
||||||
interfaces.append(NetworkInterface(name: ifname, ip: address, netmask: netmaskIP))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ptr = ptr!.pointee.ifa_next
|
|
||||||
}
|
|
||||||
freeifaddrs(ifaddr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return interfaces
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -15,6 +15,7 @@ import Combine
|
|||||||
1. 处理rsa的加解密逻辑
|
1. 处理rsa的加解密逻辑
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@available(macOS 14, *)
|
||||||
public class SDLContext: @unchecked Sendable {
|
public class SDLContext: @unchecked Sendable {
|
||||||
|
|
||||||
// 路由信息
|
// 路由信息
|
||||||
@ -48,10 +49,7 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
|
|
||||||
// 依赖的变量
|
// 依赖的变量
|
||||||
var udpHole: SDLUDPHole?
|
var udpHole: SDLUDPHole?
|
||||||
private var udpCancel: AnyCancellable?
|
|
||||||
|
|
||||||
var superClient: SDLSuperClient?
|
var superClient: SDLSuperClient?
|
||||||
private var superCancel: AnyCancellable?
|
|
||||||
|
|
||||||
// 数据包读取任务
|
// 数据包读取任务
|
||||||
private var readTask: Task<(), Never>?
|
private var readTask: Task<(), Never>?
|
||||||
@ -59,27 +57,39 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
let provider: NEPacketTunnelProvider
|
let provider: NEPacketTunnelProvider
|
||||||
|
|
||||||
private var sessionManager: SessionManager
|
private var sessionManager: SessionManager
|
||||||
private var holerManager: HolerManager
|
|
||||||
private var arpServer: ArpServer
|
private var arpServer: ArpServer
|
||||||
|
|
||||||
// 记录最后发送的stunRequest的cookie
|
// 记录最后发送的stunRequest的cookie
|
||||||
private var lastCookie: UInt32? = 0
|
private var lastCookie: UInt32? = 0
|
||||||
|
|
||||||
// 定时器
|
|
||||||
private var stunCancel: AnyCancellable?
|
|
||||||
|
|
||||||
// 网络状态变化的健康
|
// 网络状态变化的健康
|
||||||
private var monitor = SDLNetworkMonitor()
|
private var monitor: SDLNetworkMonitor?
|
||||||
private var monitorCancel: AnyCancellable?
|
|
||||||
|
|
||||||
// 内部socket通讯
|
// 内部socket通讯
|
||||||
private var noticeClient: SDLNoticeClient
|
private var noticeClient: SDLNoticeClient?
|
||||||
|
|
||||||
// 流量统计
|
// 流量统计
|
||||||
private var flowTracer = SDLFlowTracerActor()
|
private var flowTracer = SDLFlowTracerActor()
|
||||||
private var flowTracerCancel: AnyCancellable?
|
private var flowTracerCancel: AnyCancellable?
|
||||||
|
|
||||||
public init(provider: NEPacketTunnelProvider, config: SDLConfiguration, rsaCipher: RSACipher, aesCipher: AESCipher) {
|
// 处理holer
|
||||||
|
private var holerPublishers: [Data:PassthroughSubject<RegisterRequest, Never>] = [:]
|
||||||
|
private var bag = Set<AnyCancellable>()
|
||||||
|
private var locker = NSLock()
|
||||||
|
|
||||||
|
private let logger: SDLLogger
|
||||||
|
|
||||||
|
private var rootTask: Task<Void, Error>?
|
||||||
|
|
||||||
|
struct RegisterRequest {
|
||||||
|
let srcMac: Data
|
||||||
|
let dstMac: Data
|
||||||
|
let networkId: UInt32
|
||||||
|
}
|
||||||
|
|
||||||
|
public init(provider: NEPacketTunnelProvider, config: SDLConfiguration, rsaCipher: RSACipher, aesCipher: AESCipher, logger: SDLLogger) {
|
||||||
|
self.logger = logger
|
||||||
|
|
||||||
self.config = config
|
self.config = config
|
||||||
self.rsaCipher = rsaCipher
|
self.rsaCipher = rsaCipher
|
||||||
self.aesCipher = aesCipher
|
self.aesCipher = aesCipher
|
||||||
@ -91,59 +101,155 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
|
|
||||||
self.provider = provider
|
self.provider = provider
|
||||||
self.sessionManager = SessionManager()
|
self.sessionManager = SessionManager()
|
||||||
self.holerManager = HolerManager()
|
|
||||||
self.arpServer = ArpServer(known_macs: [:])
|
self.arpServer = ArpServer(known_macs: [:])
|
||||||
self.noticeClient = SDLNoticeClient()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public func start() async throws {
|
public func start() async throws {
|
||||||
try await self.startSuperClient()
|
self.rootTask = Task {
|
||||||
try await self.startUDPHole()
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
self.noticeClient.start()
|
group.addTask {
|
||||||
|
defer {
|
||||||
|
self.logger.log("[SDLContext] UDPHole exit", level: .debug)
|
||||||
|
}
|
||||||
|
|
||||||
// 启动网络监控
|
while !Task.isCancelled {
|
||||||
self.monitorCancel = self.monitor.eventFlow.sink { event in
|
do {
|
||||||
switch event {
|
try await self.startUDPHole()
|
||||||
case .changed:
|
} catch let err {
|
||||||
// 需要重新探测网络的nat类型
|
self.logger.log("[SDLContext] UDPHole get err: \(err)", level: .warning)
|
||||||
Task {
|
try await Task.sleep(for: .seconds(2))
|
||||||
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
|
}
|
||||||
NSLog("didNetworkPathChanged, nat type is: \(self.natType)")
|
}
|
||||||
}
|
}
|
||||||
case .unreachable:
|
|
||||||
NSLog("didNetworkPathUnreachable")
|
group.addTask {
|
||||||
|
defer {
|
||||||
|
self.logger.log("[SDLContext] SuperClient exit", level: .debug)
|
||||||
|
}
|
||||||
|
|
||||||
|
while !Task.isCancelled {
|
||||||
|
do {
|
||||||
|
try await self.startSuperClient()
|
||||||
|
} catch let err {
|
||||||
|
self.logger.log("[SDLContext] SuperClient get error: \(err), will restart", level: .warning)
|
||||||
|
await self.arpServer.clear()
|
||||||
|
try? await Task.sleep(for: .seconds(2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
await self.startMonitor()
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
defer {
|
||||||
|
self.logger.log("[SDLContext] noticeClient exit", level: .debug)
|
||||||
|
}
|
||||||
|
|
||||||
|
while !Task.isCancelled {
|
||||||
|
do {
|
||||||
|
try await self.startNoticeClient()
|
||||||
|
} catch let err {
|
||||||
|
self.logger.log("[SDLContext] noticeClient get err: \(err)", level: .warning)
|
||||||
|
try await Task.sleep(for: .seconds(2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try await group.waitForAll()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.monitor.start()
|
|
||||||
|
try await self.rootTask?.value
|
||||||
}
|
}
|
||||||
|
|
||||||
public func stop() async {
|
public func stop() async {
|
||||||
self.superCancel?.cancel()
|
self.rootTask?.cancel()
|
||||||
self.superClient = nil
|
self.superClient = nil
|
||||||
|
|
||||||
self.udpCancel?.cancel()
|
|
||||||
self.udpHole = nil
|
self.udpHole = nil
|
||||||
|
self.noticeClient = nil
|
||||||
|
|
||||||
self.readTask?.cancel()
|
self.readTask?.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
private func startSuperClient() async throws {
|
private func startNoticeClient() async throws {
|
||||||
self.superClient = SDLSuperClient(host: config.superHost, port: config.superPort)
|
self.noticeClient = try await SDLNoticeClient(logger: self.logger)
|
||||||
// 建立super的绑定关系
|
try await self.noticeClient?.start()
|
||||||
self.superCancel?.cancel()
|
self.logger.log("[SDLContext] notice_client task cancel", level: .warning)
|
||||||
self.superCancel = self.superClient?.eventFlow.sink { event in
|
|
||||||
Task {
|
|
||||||
await self.handleSuperEvent(event: event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try await self.superClient?.start()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private func handleSuperEvent(event: SDLSuperClient.SuperEvent) async {
|
private func startUDPHole() async throws {
|
||||||
|
self.udpHole = try await SDLUDPHole(logger: self.logger)
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
|
defer {
|
||||||
|
self.logger.log("[SDLContext] udp_hole task cancel", level: .warning)
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
try await self.udpHole?.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
while !Task.isCancelled {
|
||||||
|
try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
|
||||||
|
self.lastCookie = await self.udpHole?.stunRequest(context: self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
if let eventFlow = self.udpHole?.eventFlow {
|
||||||
|
for try await event in eventFlow {
|
||||||
|
try await self.handleUDPEvent(event: event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try await group.waitForAll()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private func startSuperClient() async throws {
|
||||||
|
self.superClient = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort, logger: self.logger)
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
|
defer {
|
||||||
|
self.logger.log("[SDLContext] super client task cancel", level: .warning)
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
try await self.superClient?.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
if let eventFlow = self.superClient?.eventFlow {
|
||||||
|
for try await event in eventFlow {
|
||||||
|
try await self.handleSuperEvent(event: event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try await group.waitForAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func startMonitor() async {
|
||||||
|
self.monitor = SDLNetworkMonitor()
|
||||||
|
for await event in self.monitor!.eventStream {
|
||||||
|
switch event {
|
||||||
|
case .changed:
|
||||||
|
// 需要重新探测网络的nat类型
|
||||||
|
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config, logger: self.logger)
|
||||||
|
self.logger.log("didNetworkPathChanged, nat type is: \(self.natType)", level: .info)
|
||||||
|
case .unreachable:
|
||||||
|
self.logger.log("didNetworkPathUnreachable", level: .warning)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func handleSuperEvent(event: SDLSuperClient.SuperEvent) async throws {
|
||||||
switch event {
|
switch event {
|
||||||
case .ready:
|
case .ready:
|
||||||
NSLog("[SDLContext] get registerSuper, mac address: \(Self.formatMacAddress(mac: self.devAddr.mac))")
|
self.logger.log("[SDLContext] get registerSuper, mac address: \(SDLUtil.formatMacAddress(mac: self.devAddr.mac))", level: .debug)
|
||||||
guard let message = await self.superClient?.registerSuper(context: self) else {
|
guard let message = try await self.superClient?.registerSuper(context: self).get() else {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -153,12 +259,12 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
let aesKey = try! self.rsaCipher.decode(data: Data(registerSuperAck.aesKey))
|
let aesKey = try! self.rsaCipher.decode(data: Data(registerSuperAck.aesKey))
|
||||||
let upgradeType = SDLUpgradeType(rawValue: registerSuperAck.upgradeType)
|
let upgradeType = SDLUpgradeType(rawValue: registerSuperAck.upgradeType)
|
||||||
|
|
||||||
NSLog("[SDLContext] get registerSuperAck, aes_key len: \(aesKey.count)")
|
self.logger.log("[SDLContext] get registerSuperAck, aes_key len: \(aesKey.count)", level: .info)
|
||||||
self.devAddr = registerSuperAck.devAddr
|
self.devAddr = registerSuperAck.devAddr
|
||||||
|
|
||||||
if upgradeType == .force {
|
if upgradeType == .force {
|
||||||
let forceUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
|
let forceUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
|
||||||
self.noticeClient.send(data: forceUpgrade.binaryData)
|
await self.noticeClient?.send(data: forceUpgrade.binaryData)
|
||||||
exit(-1)
|
exit(-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -168,7 +274,7 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
|
|
||||||
if upgradeType == .normal {
|
if upgradeType == .normal {
|
||||||
let normalUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
|
let normalUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
|
||||||
self.noticeClient.send(data: normalUpgrade.binaryData)
|
await self.noticeClient?.send(data: normalUpgrade.binaryData)
|
||||||
}
|
}
|
||||||
|
|
||||||
case .registerSuperNak(let nakPacket):
|
case .registerSuperNak(let nakPacket):
|
||||||
@ -180,42 +286,34 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
switch errorCode {
|
switch errorCode {
|
||||||
case .invalidToken, .nodeDisabled:
|
case .invalidToken, .nodeDisabled:
|
||||||
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
|
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
|
||||||
self.noticeClient.send(data: alertNotice.binaryData)
|
await self.noticeClient?.send(data: alertNotice.binaryData)
|
||||||
exit(-1)
|
exit(-1)
|
||||||
case .noIpAddress, .networkFault, .internalFault:
|
case .noIpAddress, .networkFault, .internalFault:
|
||||||
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
|
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
|
||||||
self.noticeClient.send(data: alertNotice.binaryData)
|
await self.noticeClient?.send(data: alertNotice.binaryData)
|
||||||
}
|
}
|
||||||
NSLog("[SDLContext] Get a SuperNak message exit")
|
self.logger.log("[SDLContext] Get a SuperNak message exit", level: .warning)
|
||||||
default:
|
default:
|
||||||
()
|
()
|
||||||
}
|
}
|
||||||
|
|
||||||
case .closed:
|
|
||||||
NSLog("[SDLContext] super client closed")
|
|
||||||
await self.arpServer.clear()
|
|
||||||
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
|
|
||||||
Task {@MainActor in
|
|
||||||
try await self.startSuperClient()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case .event(let evt):
|
case .event(let evt):
|
||||||
switch evt {
|
switch evt {
|
||||||
case .natChanged(let natChangedEvent):
|
case .natChanged(let natChangedEvent):
|
||||||
let dstMac = natChangedEvent.mac
|
let dstMac = natChangedEvent.mac
|
||||||
NSLog("[SDLContext] natChangedEvent, dstMac: \(dstMac)")
|
self.logger.log("[SDLContext] natChangedEvent, dstMac: \(dstMac)", level: .info)
|
||||||
await sessionManager.removeSession(dstMac: dstMac)
|
await sessionManager.removeSession(dstMac: dstMac)
|
||||||
case .sendRegister(let sendRegisterEvent):
|
case .sendRegister(let sendRegisterEvent):
|
||||||
NSLog("[SDLContext] sendRegisterEvent, ip: \(sendRegisterEvent)")
|
self.logger.log("[SDLContext] sendRegisterEvent, ip: \(sendRegisterEvent)", level: .debug)
|
||||||
let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp)
|
let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp)
|
||||||
if let remoteAddress = try? SocketAddress.makeAddressResolvingHost(address, port: Int(sendRegisterEvent.natPort)) {
|
if let remoteAddress = try? SocketAddress.makeAddressResolvingHost(address, port: Int(sendRegisterEvent.natPort)) {
|
||||||
// 发送register包
|
// 发送register包
|
||||||
self.udpHole?.sendRegister(context: self, remoteAddress: remoteAddress, dst_mac: sendRegisterEvent.dstMac)
|
await self.udpHole?.sendRegister(remoteAddress: remoteAddress, networkId: self.devAddr.networkID, srcMac: self.devAddr.mac, dst_mac: sendRegisterEvent.dstMac)
|
||||||
}
|
}
|
||||||
|
|
||||||
case .networkShutdown(let shutdownEvent):
|
case .networkShutdown(let shutdownEvent):
|
||||||
let alertNotice = NoticeMessage.AlertMessage(alert: shutdownEvent.message)
|
let alertNotice = NoticeMessage.AlertMessage(alert: shutdownEvent.message)
|
||||||
self.noticeClient.send(data: alertNotice.binaryData)
|
await self.noticeClient?.send(data: alertNotice.binaryData)
|
||||||
exit(-1)
|
exit(-1)
|
||||||
}
|
}
|
||||||
case .command(let packetId, let command):
|
case .command(let packetId, let command):
|
||||||
@ -223,7 +321,7 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
case .changeNetwork(let changeNetworkCommand):
|
case .changeNetwork(let changeNetworkCommand):
|
||||||
// 需要对数据通过rsa的私钥解码
|
// 需要对数据通过rsa的私钥解码
|
||||||
let aesKey = try! self.rsaCipher.decode(data: Data(changeNetworkCommand.aesKey))
|
let aesKey = try! self.rsaCipher.decode(data: Data(changeNetworkCommand.aesKey))
|
||||||
NSLog("[SDLContext] change network command get aes_key len: \(aesKey.count)")
|
self.logger.log("[SDLContext] change network command get aes_key len: \(aesKey.count)", level: .info)
|
||||||
self.devAddr = changeNetworkCommand.devAddr
|
self.devAddr = changeNetworkCommand.devAddr
|
||||||
|
|
||||||
// 服务器分配的tun网卡信息
|
// 服务器分配的tun网卡信息
|
||||||
@ -233,57 +331,32 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
var commandAck = SDLCommandAck()
|
var commandAck = SDLCommandAck()
|
||||||
commandAck.status = true
|
commandAck.status = true
|
||||||
|
|
||||||
self.superClient?.commandAck(packetId: packetId, ack: commandAck)
|
await self.superClient?.commandAck(packetId: packetId, ack: commandAck)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private func startUDPHole() async throws {
|
private func handleUDPEvent(event: SDLUDPHole.UDPEvent) async throws {
|
||||||
self.udpHole = SDLUDPHole()
|
|
||||||
|
|
||||||
self.udpCancel?.cancel()
|
|
||||||
self.udpCancel = self.udpHole?.eventFlow.sink { event in
|
|
||||||
Task.detached {
|
|
||||||
await self.handleUDPEvent(event: event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try await self.udpHole?.start()
|
|
||||||
}
|
|
||||||
|
|
||||||
private func handleUDPEvent(event: SDLUDPHole.UDPEvent) async {
|
|
||||||
switch event {
|
switch event {
|
||||||
case .ready:
|
case .ready:
|
||||||
// 获取当前网络的类型
|
// 获取当前网络的类型
|
||||||
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
|
//self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
|
||||||
SDLLogger.log("[SDLContext] nat type is: \(self.natType)", level: .debug)
|
self.logger.log("[SDLContext] nat type is: \(self.natType)", level: .debug)
|
||||||
|
|
||||||
let timer = Timer.publish(every: 5.0, on: .main, in: .common).autoconnect()
|
|
||||||
self.stunCancel = Just(Date()).merge(with: timer).sink { _ in
|
|
||||||
self.lastCookie = self.udpHole?.stunRequest(context: self)
|
|
||||||
}
|
|
||||||
|
|
||||||
case .closed:
|
|
||||||
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
|
|
||||||
Task {
|
|
||||||
try await self.startUDPHole()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case .message(let remoteAddress, let message):
|
case .message(let remoteAddress, let message):
|
||||||
switch message {
|
switch message {
|
||||||
case .register(let register):
|
case .register(let register):
|
||||||
NSLog("register packet: \(register), dev_addr: \(self.devAddr)")
|
self.logger.log("register packet: \(register), dev_addr: \(self.devAddr)", level: .debug)
|
||||||
// 判断目标地址是否是tun的网卡地址, 并且是在同一个网络下
|
// 判断目标地址是否是tun的网卡地址, 并且是在同一个网络下
|
||||||
if register.dstMac == self.devAddr.mac && register.networkID == self.devAddr.networkID {
|
if register.dstMac == self.devAddr.mac && register.networkID == self.devAddr.networkID {
|
||||||
// 回复ack包
|
// 回复ack包
|
||||||
self.udpHole?.sendRegisterAck(context: self, remoteAddress: remoteAddress, dst_mac: register.srcMac)
|
await self.udpHole?.sendRegisterAck(context: self, remoteAddress: remoteAddress, dst_mac: register.srcMac)
|
||||||
// 这里需要建立到来源的会话, 在复杂网络下,通过super-node查询到的nat地址不一定靠谱,需要通过udp包的来源地址作为nat地址
|
// 这里需要建立到来源的会话, 在复杂网络下,通过super-node查询到的nat地址不一定靠谱,需要通过udp包的来源地址作为nat地址
|
||||||
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
|
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
|
||||||
await self.sessionManager.addSession(session: session)
|
await self.sessionManager.addSession(session: session)
|
||||||
} else {
|
} else {
|
||||||
SDLLogger.log("SDLContext didReadRegister get a invalid packet, because dst_ip not matched: \(register.dstMac)", level: .warning)
|
self.logger.log("SDLContext didReadRegister get a invalid packet, because dst_ip not matched: \(register.dstMac)", level: .warning)
|
||||||
}
|
}
|
||||||
case .registerAck(let registerAck):
|
case .registerAck(let registerAck):
|
||||||
// 判断目标地址是否是tun的网卡地址, 并且是在同一个网络下
|
// 判断目标地址是否是tun的网卡地址, 并且是在同一个网络下
|
||||||
@ -291,14 +364,14 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
let session = Session(dstMac: registerAck.srcMac, natAddress: remoteAddress)
|
let session = Session(dstMac: registerAck.srcMac, natAddress: remoteAddress)
|
||||||
await self.sessionManager.addSession(session: session)
|
await self.sessionManager.addSession(session: session)
|
||||||
} else {
|
} else {
|
||||||
SDLLogger.log("SDLContext didReadRegisterAck get a invalid packet, because dst_mac not matched: \(registerAck.dstMac)", level: .warning)
|
self.logger.log("SDLContext didReadRegisterAck get a invalid packet, because dst_mac not matched: \(registerAck.dstMac)", level: .warning)
|
||||||
}
|
}
|
||||||
case .stunReply(let stunReply):
|
case .stunReply(let stunReply):
|
||||||
let cookie = stunReply.cookie
|
let cookie = stunReply.cookie
|
||||||
if cookie == self.lastCookie {
|
if cookie == self.lastCookie {
|
||||||
// 记录下当前在nat上的映射信息,暂时没有用;后续会用来判断网络类型
|
// 记录下当前在nat上的映射信息,暂时没有用;后续会用来判断网络类型
|
||||||
//self.natAddress = stunReply.natAddress
|
//self.natAddress = stunReply.natAddress
|
||||||
SDLLogger.log("[SDLContext] get a stunReply: \(try! stunReply.jsonString())")
|
self.logger.log("[SDLContext] get a stunReply: \(try! stunReply.jsonString())", level: .debug)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
()
|
()
|
||||||
@ -307,12 +380,10 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
case .data(let data):
|
case .data(let data):
|
||||||
let mac = LayerPacket.MacAddress(data: data.dstMac)
|
let mac = LayerPacket.MacAddress(data: data.dstMac)
|
||||||
guard (data.dstMac == self.devAddr.mac || mac.isBroadcast() || mac.isMulticast()) else {
|
guard (data.dstMac == self.devAddr.mac || mac.isBroadcast() || mac.isMulticast()) else {
|
||||||
NSLog("[SDLContext] didReadData 1")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
guard let decyptedData = try? self.aesCipher.decypt(aesKey: self.aesKey, data: Data(data.data)) else {
|
guard let decyptedData = try? self.aesCipher.decypt(aesKey: self.aesKey, data: Data(data.data)) else {
|
||||||
NSLog("[SDLContext] didReadData 2")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -328,50 +399,48 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
if arpPacket.targetIP == self.devAddr.netAddr {
|
if arpPacket.targetIP == self.devAddr.netAddr {
|
||||||
switch arpPacket.opcode {
|
switch arpPacket.opcode {
|
||||||
case .request:
|
case .request:
|
||||||
NSLog("[SDLContext] get arp request packet")
|
self.logger.log("[SDLContext] get arp request packet", level: .debug)
|
||||||
let response = ARPPacket.arpResponse(for: arpPacket, mac: self.devAddr.mac, ip: self.devAddr.netAddr)
|
let response = ARPPacket.arpResponse(for: arpPacket, mac: self.devAddr.mac, ip: self.devAddr.netAddr)
|
||||||
await self.routeLayerPacket(dstMac: arpPacket.senderMAC, type: .arp, data: response.marshal())
|
await self.routeLayerPacket(dstMac: arpPacket.senderMAC, type: .arp, data: response.marshal())
|
||||||
case .response:
|
case .response:
|
||||||
NSLog("[SDLContext] get arp response packet")
|
self.logger.log("[SDLContext] get arp response packet", level: .debug)
|
||||||
await self.arpServer.append(ip: arpPacket.senderIP, mac: arpPacket.senderMAC)
|
await self.arpServer.append(ip: arpPacket.senderIP, mac: arpPacket.senderMAC)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
NSLog("[SDLContext] get invalid arp packet, target_ip: \(arpPacket)")
|
self.logger.log("[SDLContext] get invalid arp packet: \(arpPacket), target_ip: \(SDLUtil.int32ToIp(arpPacket.targetIP)), net ip: \(SDLUtil.int32ToIp(self.devAddr.netAddr))", level: .debug)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
NSLog("[SDLContext] get invalid arp packet")
|
self.logger.log("[SDLContext] get invalid arp packet", level: .debug)
|
||||||
}
|
}
|
||||||
case .ipv4:
|
case .ipv4:
|
||||||
NSLog("[SDLContext] get ipv4 packet")
|
|
||||||
guard let ipPacket = IPPacket(layerPacket.data), ipPacket.header.destination == self.devAddr.netAddr else {
|
guard let ipPacket = IPPacket(layerPacket.data), ipPacket.header.destination == self.devAddr.netAddr else {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
let packet = NEPacket(data: ipPacket.data, protocolFamily: 2)
|
let packet = NEPacket(data: ipPacket.data, protocolFamily: 2)
|
||||||
self.provider.packetFlow.writePacketObjects([packet])
|
self.provider.packetFlow.writePacketObjects([packet])
|
||||||
default:
|
default:
|
||||||
NSLog("[SDLContext] get invalid packet")
|
self.logger.log("[SDLContext] get invalid packet", level: .debug)
|
||||||
}
|
}
|
||||||
} catch let err {
|
} catch let err {
|
||||||
NSLog("[SDLContext] didReadData err: \(err)")
|
self.logger.log("[SDLContext] didReadData err: \(err)", level: .warning)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 流量统计
|
// 流量统计
|
||||||
public func flowReportTask() {
|
// public func flowReportTask() {
|
||||||
Task {
|
// Task {
|
||||||
// 每分钟汇报一次
|
// // 每分钟汇报一次
|
||||||
self.flowTracerCancel = Timer.publish(every: 60.0, on: .main, in: .common).autoconnect()
|
// self.flowTracerCancel = Timer.publish(every: 60.0, on: .main, in: .common).autoconnect()
|
||||||
.sink { _ in
|
// .sink { _ in
|
||||||
Task {
|
// Task {
|
||||||
let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset()
|
// let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset()
|
||||||
self.superClient?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum)
|
// await self.superClient?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum)
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
// 网络改变时需要重新配置网络信息
|
// 网络改变时需要重新配置网络信息
|
||||||
private func didNetworkConfigChanged(devAddr: SDLDevAddr, dnsServers: [String]? = nil) async {
|
private func didNetworkConfigChanged(devAddr: SDLDevAddr, dnsServers: [String]? = nil) async {
|
||||||
@ -389,7 +458,7 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
networkSettings.dnsSettings = NEDNSSettings(servers: ["8.8.8.8", "114.114.114.114"])
|
networkSettings.dnsSettings = NEDNSSettings(servers: ["8.8.8.8", "114.114.114.114"])
|
||||||
}
|
}
|
||||||
|
|
||||||
NSLog("[SDLContext] Tun started at network ip: \(netAddress.ipAddress), mask: \(netAddress.maskAddress)")
|
self.logger.log("[SDLContext] Tun started at network ip: \(netAddress.ipAddress), mask: \(netAddress.maskAddress)", level: .info)
|
||||||
|
|
||||||
let ipv4Settings = NEIPv4Settings(addresses: [netAddress.ipAddress], subnetMasks: [netAddress.maskAddress])
|
let ipv4Settings = NEIPv4Settings(addresses: [netAddress.ipAddress], subnetMasks: [netAddress.maskAddress])
|
||||||
// 设置路由表
|
// 设置路由表
|
||||||
@ -401,13 +470,11 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
// 网卡配置设置必须成功
|
// 网卡配置设置必须成功
|
||||||
do {
|
do {
|
||||||
try await self.provider.setTunnelNetworkSettings(networkSettings)
|
try await self.provider.setTunnelNetworkSettings(networkSettings)
|
||||||
|
|
||||||
await self.holerManager.cleanup()
|
|
||||||
self.startReader()
|
self.startReader()
|
||||||
|
|
||||||
NSLog("[SDLContext] setTunnelNetworkSettings success, start read packet")
|
self.logger.log("[SDLContext] setTunnelNetworkSettings success, start read packet", level: .info)
|
||||||
} catch let err {
|
} catch let err {
|
||||||
NSLog("[SDLContext] setTunnelNetworkSettings get error: \(err)")
|
self.logger.log("[SDLContext] setTunnelNetworkSettings get error: \(err)", level: .error)
|
||||||
exit(-1)
|
exit(-1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -420,10 +487,6 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
// 开启新的任务
|
// 开启新的任务
|
||||||
self.readTask = Task(priority: .high) {
|
self.readTask = Task(priority: .high) {
|
||||||
repeat {
|
repeat {
|
||||||
if Task.isCancelled {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
let (packets, numbers) = await self.provider.packetFlow.readPackets()
|
let (packets, numbers) = await self.provider.packetFlow.readPackets()
|
||||||
for (data, number) in zip(packets, numbers) where number == 2 {
|
for (data, number) in zip(packets, numbers) where number == 2 {
|
||||||
if let packet = IPPacket(data) {
|
if let packet = IPPacket(data) {
|
||||||
@ -443,10 +506,10 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
else {
|
else {
|
||||||
// 构造arp请求
|
// 构造arp请求
|
||||||
let broadcastMac = Data([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])
|
let broadcastMac = Data([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])
|
||||||
let arpReqeust: ARPPacket = ARPPacket.arpRequest(senderIP: self.devAddr.netAddr, senderMAC: self.devAddr.mac, targetIP: dstIp)
|
let arpReqeust = ARPPacket.arpRequest(senderIP: self.devAddr.netAddr, senderMAC: self.devAddr.mac, targetIP: dstIp)
|
||||||
await self.routeLayerPacket(dstMac: broadcastMac, type: .arp, data: arpReqeust.marshal())
|
await self.routeLayerPacket(dstMac: broadcastMac, type: .arp, data: arpReqeust.marshal())
|
||||||
|
|
||||||
NSLog("[SDLContext] dstIp: \(dstIp) arp query not found")
|
self.logger.log("[SDLContext] dstIp: \(dstIp) arp query not found", level: .debug)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -466,59 +529,73 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
|
|
||||||
// 通过session发送到对端
|
// 通过session发送到对端
|
||||||
if let session = await self.sessionManager.getSession(toAddress: dstMac) {
|
if let session = await self.sessionManager.getSession(toAddress: dstMac) {
|
||||||
NSLog("[SDLContext] send packet by session: \(session)")
|
self.logger.log("[SDLContext] send packet by session: \(session)", level: .debug)
|
||||||
self.udpHole?.sendPacket(context: self, session: session, data: encodedPacket)
|
await self.udpHole?.sendPacket(context: self, session: session, data: encodedPacket)
|
||||||
|
|
||||||
await self.flowTracer.inc(num: data.count, type: .p2p)
|
await self.flowTracer.inc(num: data.count, type: .p2p)
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// 通过super_node进行转发
|
// 通过super_node进行转发
|
||||||
self.udpHole?.forwardPacket(context: self, dst_mac: dstMac, data: encodedPacket)
|
await self.udpHole?.forwardPacket(context: self, dst_mac: dstMac, data: encodedPacket)
|
||||||
// 流量统计
|
// 流量统计
|
||||||
await self.flowTracer.inc(num: data.count, type: .forward)
|
await self.flowTracer.inc(num: data.count, type: .forward)
|
||||||
|
|
||||||
// 尝试打洞
|
// 尝试打洞
|
||||||
await self.holerManager.addHoler(dstMac: dstMac) {
|
let registerRequest = RegisterRequest(srcMac: self.devAddr.mac, dstMac: dstMac, networkId: self.devAddr.networkID)
|
||||||
self.holerTask(dstMac: dstMac)
|
self.submitRegisterRequest(request: registerRequest)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func holerTask(dstMac: Data) -> Task<(), Never> {
|
private func submitRegisterRequest(request: RegisterRequest) {
|
||||||
return Task {
|
self.locker.lock()
|
||||||
guard let message = try? await self.superClient?.queryInfo(context: self, dst_mac: dstMac) else {
|
defer {
|
||||||
return
|
self.locker.unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
switch message.packet {
|
let dstMac = request.dstMac
|
||||||
case .empty:
|
if let publisher = self.holerPublishers[dstMac] {
|
||||||
SDLLogger.log("[SDLContext] hole query_info get empty: \(message)", level: .debug)
|
publisher.send(request)
|
||||||
case .peerInfo(let peerInfo):
|
} else {
|
||||||
if let remoteAddress = peerInfo.v4Info.socketAddress() {
|
let publisher = PassthroughSubject<RegisterRequest, Never>()
|
||||||
SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", level: .warning)
|
publisher.throttle(for: .seconds(5), scheduler: DispatchQueue.global(), latest: true)
|
||||||
// 发送register包
|
.sink { request in
|
||||||
self.udpHole?.sendRegister(context: self, remoteAddress: remoteAddress, dst_mac: dstMac)
|
Task {
|
||||||
} else {
|
await self.tryHole(request: request)
|
||||||
SDLLogger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning)
|
}
|
||||||
}
|
}
|
||||||
default:
|
.store(in: &self.bag)
|
||||||
SDLLogger.log("[SDLContext] hole query_info is packet: \(message)", level: .warning)
|
|
||||||
|
self.holerPublishers[dstMac] = publisher
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func tryHole(request: RegisterRequest) async {
|
||||||
|
guard let message = try? await self.superClient?.queryInfo(dst_mac: request.dstMac).get() else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch message.packet {
|
||||||
|
case .empty:
|
||||||
|
self.logger.log("[SDLContext] hole query_info get empty: \(message)", level: .debug)
|
||||||
|
case .peerInfo(let peerInfo):
|
||||||
|
if let remoteAddress = peerInfo.v4Info.socketAddress() {
|
||||||
|
self.logger.log("[SDLContext] hole sock address: \(remoteAddress)", level: .debug)
|
||||||
|
// 发送register包
|
||||||
|
await self.udpHole?.sendRegister(remoteAddress: remoteAddress, networkId: request.networkId, srcMac: request.srcMac, dst_mac: request.dstMac)
|
||||||
|
} else {
|
||||||
|
self.logger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning)
|
||||||
}
|
}
|
||||||
|
default:
|
||||||
|
self.logger.log("[SDLContext] hole query_info is packet: \(message)", level: .warning)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
self.stunCancel?.cancel()
|
self.rootTask?.cancel()
|
||||||
self.udpHole = nil
|
self.udpHole = nil
|
||||||
self.superClient = nil
|
self.superClient = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
//--MARK: 获取设备的UUID
|
|
||||||
|
|
||||||
extension SDLContext {
|
|
||||||
|
|
||||||
public static func getUUID() -> String {
|
public static func getUUID() -> String {
|
||||||
let userDefaults = UserDefaults.standard
|
let userDefaults = UserDefaults.standard
|
||||||
if let uuid = userDefaults.value(forKey: "gClientId") as? String {
|
if let uuid = userDefaults.value(forKey: "gClientId") as? String {
|
||||||
@ -557,11 +634,4 @@ extension SDLContext {
|
|||||||
return Data(macAddress)
|
return Data(macAddress)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 将mac地址转换成字符串
|
|
||||||
private static func formatMacAddress(mac: Data) -> String {
|
|
||||||
let bytes = [UInt8](mac)
|
|
||||||
|
|
||||||
return bytes.map { String(format: "%02X", $0) }.joined(separator: ":").lowercased()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
11
Sources/Punchnet/SDLError.swift
Normal file
11
Sources/Punchnet/SDLError.swift
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
//
|
||||||
|
// SDLError.swift
|
||||||
|
// sdlan
|
||||||
|
//
|
||||||
|
// Created by 安礼成 on 2025/8/2.
|
||||||
|
//
|
||||||
|
|
||||||
|
enum SDLError: Error {
|
||||||
|
case socketClosed
|
||||||
|
case socketError
|
||||||
|
}
|
||||||
@ -4,19 +4,42 @@
|
|||||||
//
|
//
|
||||||
// Created by 安礼成 on 2024/3/13.
|
// Created by 安礼成 on 2024/3/13.
|
||||||
//
|
//
|
||||||
|
|
||||||
import Foundation
|
import Foundation
|
||||||
|
import os.log
|
||||||
|
|
||||||
struct SDLLogger: @unchecked Sendable {
|
public class SDLLogger: @unchecked Sendable {
|
||||||
enum Level {
|
public enum Level: Int8, CustomStringConvertible {
|
||||||
case debug
|
case debug = 0
|
||||||
case info
|
case info = 1
|
||||||
case warning
|
case warning = 2
|
||||||
case error
|
case error = 3
|
||||||
|
|
||||||
|
public var description: String {
|
||||||
|
switch self {
|
||||||
|
case .debug:
|
||||||
|
return "Debug"
|
||||||
|
case .info:
|
||||||
|
return "Info"
|
||||||
|
case .warning:
|
||||||
|
return "Warning"
|
||||||
|
case .error:
|
||||||
|
return "Error"
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static func log(_ message: String, level: Level = .debug) {
|
private let level: Level
|
||||||
NSLog(message)
|
private let log: OSLog
|
||||||
|
|
||||||
|
public init(level: Level) {
|
||||||
|
self.level = level
|
||||||
|
self.log = OSLog(subsystem: "com.jihe.punchnet", category: "punchnet")
|
||||||
|
}
|
||||||
|
|
||||||
|
public func log(_ message: String, level: Level = .debug) {
|
||||||
|
if self.level.rawValue <= level.rawValue {
|
||||||
|
os_log("%{public}@: %{public}@", log: self.log, type: .debug, level.description, message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -54,7 +54,7 @@ enum SDLUpgradeType: UInt32 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Id生成器
|
// Id生成器
|
||||||
struct SDLIdGenerator {
|
struct SDLIdGenerator: Sendable {
|
||||||
// 消息体id
|
// 消息体id
|
||||||
private var packetId: UInt32
|
private var packetId: UInt32
|
||||||
|
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import Foundation
|
|||||||
import NIOCore
|
import NIOCore
|
||||||
|
|
||||||
// 网络类型探测器
|
// 网络类型探测器
|
||||||
|
@available(macOS 14, *)
|
||||||
struct SDLNatProber {
|
struct SDLNatProber {
|
||||||
|
|
||||||
// 定义nat类型
|
// 定义nat类型
|
||||||
@ -22,7 +23,7 @@ struct SDLNatProber {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 获取当前所处的网络的nat类型
|
// 获取当前所处的网络的nat类型
|
||||||
static func getNatType(udpHole: SDLUDPHole?, config: SDLConfiguration) async -> NatType {
|
static func getNatType(udpHole: SDLUDPHole?, config: SDLConfiguration, logger: SDLLogger) async -> NatType {
|
||||||
guard let udpHole else {
|
guard let udpHole else {
|
||||||
return .blocked
|
return .blocked
|
||||||
}
|
}
|
||||||
@ -34,7 +35,7 @@ struct SDLNatProber {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 网络没有在nat下
|
// 网络没有在nat下
|
||||||
if natAddress1 == udpHole.localAddress {
|
if await natAddress1 == udpHole.localAddress {
|
||||||
return .noNat
|
return .noNat
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,7 +46,7 @@ struct SDLNatProber {
|
|||||||
|
|
||||||
// 如果natAddress2 的IP地址与上次回来的IP是不一样的,它就是对称型NAT; 这次的包也一定能发成功并收到
|
// 如果natAddress2 的IP地址与上次回来的IP是不一样的,它就是对称型NAT; 这次的包也一定能发成功并收到
|
||||||
// 如果ip地址变了,这说明{dstIp, dstPort, srcIp, srcPort}, 其中有一个变了;则用新的ip地址
|
// 如果ip地址变了,这说明{dstIp, dstPort, srcIp, srcPort}, 其中有一个变了;则用新的ip地址
|
||||||
NSLog("nat_address1: \(natAddress1), nat_address2: \(natAddress2)")
|
logger.log("[SDLNatProber] nat_address1: \(natAddress1), nat_address2: \(natAddress2)", level: .debug)
|
||||||
if let ipAddress1 = natAddress1.ipAddress, let ipAddress2 = natAddress2.ipAddress, ipAddress1 != ipAddress2 {
|
if let ipAddress1 = natAddress1.ipAddress, let ipAddress2 = natAddress2.ipAddress, ipAddress1 != ipAddress2 {
|
||||||
return .symmetric
|
return .symmetric
|
||||||
}
|
}
|
||||||
@ -53,14 +54,14 @@ struct SDLNatProber {
|
|||||||
// step3: ip1:port1 <---- ip2:port2 (ip地址和port都变的情况)
|
// step3: ip1:port1 <---- ip2:port2 (ip地址和port都变的情况)
|
||||||
// 如果能收到的,说明是完全锥形 说明是IP地址限制锥型NAT,如果不能收到说明是端口限制锥型。
|
// 如果能收到的,说明是完全锥形 说明是IP地址限制锥型NAT,如果不能收到说明是端口限制锥型。
|
||||||
if let natAddress3 = await getNatAddress(udpHole, remoteAddress: addressArray[0][0], attr: .peer) {
|
if let natAddress3 = await getNatAddress(udpHole, remoteAddress: addressArray[0][0], attr: .peer) {
|
||||||
NSLog("nat_address1: \(natAddress1), nat_address2: \(natAddress2), nat_address3: \(natAddress3)")
|
logger.log("[SDLNatProber] nat_address1: \(natAddress1), nat_address2: \(natAddress2), nat_address3: \(natAddress3)", level: .debug)
|
||||||
return .fullCone
|
return .fullCone
|
||||||
}
|
}
|
||||||
|
|
||||||
// step3: ip1:port1 <---- ip1:port2 (port改变情况)
|
// step3: ip1:port1 <---- ip1:port2 (port改变情况)
|
||||||
// 如果能收到的说明是IP地址限制锥型NAT,如果不能收到说明是端口限制锥型。
|
// 如果能收到的说明是IP地址限制锥型NAT,如果不能收到说明是端口限制锥型。
|
||||||
if let natAddress4 = await getNatAddress(udpHole, remoteAddress: addressArray[0][0], attr: .port) {
|
if let natAddress4 = await getNatAddress(udpHole, remoteAddress: addressArray[0][0], attr: .port) {
|
||||||
NSLog("nat_address1: \(natAddress1), nat_address2: \(natAddress2), nat_address4: \(natAddress4)")
|
logger.log("[SDLNatProber] nat_address1: \(natAddress1), nat_address2: \(natAddress2), nat_address4: \(natAddress4)", level: .debug)
|
||||||
return .coneRestricted
|
return .coneRestricted
|
||||||
} else {
|
} else {
|
||||||
return .portRestricted
|
return .portRestricted
|
||||||
@ -68,7 +69,7 @@ struct SDLNatProber {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static func getNatAddress(_ udpHole: SDLUDPHole, remoteAddress: SocketAddress, attr: SDLProbeAttr) async -> SocketAddress? {
|
private static func getNatAddress(_ udpHole: SDLUDPHole, remoteAddress: SocketAddress, attr: SDLProbeAttr) async -> SocketAddress? {
|
||||||
let stunProbeReply = await udpHole.stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: 5)
|
let stunProbeReply = try? await udpHole.stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: 5)
|
||||||
|
|
||||||
return stunProbeReply?.socketAddress()
|
return stunProbeReply?.socketAddress()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,9 +15,9 @@ class SDLNetworkMonitor: @unchecked Sendable {
|
|||||||
private var interfaceType: NWInterface.InterfaceType?
|
private var interfaceType: NWInterface.InterfaceType?
|
||||||
private let publisher = PassthroughSubject<NWInterface.InterfaceType, Never>()
|
private let publisher = PassthroughSubject<NWInterface.InterfaceType, Never>()
|
||||||
private var cancel: AnyCancellable?
|
private var cancel: AnyCancellable?
|
||||||
private let queue = DispatchQueue(label: "networkMonitorQueue")
|
|
||||||
|
|
||||||
public let eventFlow = PassthroughSubject<MonitorEvent, Never>()
|
public let eventStream: AsyncStream<MonitorEvent>
|
||||||
|
private let eventContinuation: AsyncStream<MonitorEvent>.Continuation
|
||||||
|
|
||||||
enum MonitorEvent {
|
enum MonitorEvent {
|
||||||
case changed
|
case changed
|
||||||
@ -26,6 +26,7 @@ class SDLNetworkMonitor: @unchecked Sendable {
|
|||||||
|
|
||||||
init() {
|
init() {
|
||||||
self.monitor = NWPathMonitor()
|
self.monitor = NWPathMonitor()
|
||||||
|
(self.eventStream , self.eventContinuation) = AsyncStream.makeStream(of: MonitorEvent.self, bufferingPolicy: .unbounded)
|
||||||
}
|
}
|
||||||
|
|
||||||
func start() {
|
func start() {
|
||||||
@ -39,16 +40,16 @@ class SDLNetworkMonitor: @unchecked Sendable {
|
|||||||
self.publisher.send(.wiredEthernet)
|
self.publisher.send(.wiredEthernet)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
self.eventFlow.send(.unreachable)
|
self.eventContinuation.yield(.unreachable)
|
||||||
self.interfaceType = nil
|
self.interfaceType = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.monitor.start(queue: self.queue)
|
self.monitor.start(queue: DispatchQueue.global())
|
||||||
|
|
||||||
self.cancel = publisher.throttle(for: 5.0, scheduler: self.queue, latest: true)
|
self.cancel = publisher.throttle(for: 5.0, scheduler: DispatchQueue.global(), latest: true)
|
||||||
.sink { type in
|
.sink { type in
|
||||||
if self.interfaceType != nil && self.interfaceType != type {
|
if self.interfaceType != nil && self.interfaceType != type {
|
||||||
self.eventFlow.send(.changed)
|
self.eventContinuation.yield(.changed)
|
||||||
}
|
}
|
||||||
self.interfaceType = type
|
self.interfaceType = type
|
||||||
}
|
}
|
||||||
@ -57,6 +58,7 @@ class SDLNetworkMonitor: @unchecked Sendable {
|
|||||||
deinit {
|
deinit {
|
||||||
self.monitor.cancel()
|
self.monitor.cancel()
|
||||||
self.cancel?.cancel()
|
self.cancel?.cancel()
|
||||||
|
self.eventContinuation.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,75 +15,75 @@ import Foundation
|
|||||||
//
|
//
|
||||||
|
|
||||||
import Foundation
|
import Foundation
|
||||||
@preconcurrency import NIOCore
|
import NIOCore
|
||||||
import NIOPosix
|
import NIOPosix
|
||||||
|
|
||||||
// 处理和sn-server服务器之间的通讯
|
// 处理和sn-server服务器之间的通讯
|
||||||
class SDLNoticeClient: ChannelInboundHandler, @unchecked Sendable {
|
@available(macOS 14, *)
|
||||||
public typealias InboundIn = AddressedEnvelope<ByteBuffer>
|
actor SDLNoticeClient {
|
||||||
public typealias OutboundOut = AddressedEnvelope<ByteBuffer>
|
|
||||||
|
|
||||||
var channel: Channel?
|
|
||||||
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
|
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
|
||||||
private let remoteAddress: SocketAddress
|
private let remoteAddress: SocketAddress
|
||||||
|
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded)
|
||||||
|
|
||||||
init() {
|
private let logger: SDLLogger
|
||||||
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: 50195)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 启动函数
|
// 启动函数
|
||||||
func start() {
|
init(logger: SDLLogger) async throws {
|
||||||
|
self.logger = logger
|
||||||
|
|
||||||
|
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: 50195)
|
||||||
|
|
||||||
let bootstrap = DatagramBootstrap(group: self.group)
|
let bootstrap = DatagramBootstrap(group: self.group)
|
||||||
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||||
.channelInitializer { channel in
|
|
||||||
// 接收缓冲区
|
self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0)
|
||||||
channel.pipeline.addHandler(self)
|
.flatMapThrowing {channel in
|
||||||
|
return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
|
||||||
|
inboundType: AddressedEnvelope<ByteBuffer>.self,
|
||||||
|
outboundType: AddressedEnvelope<ByteBuffer>.self
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
.get()
|
||||||
|
|
||||||
self.channel = try! bootstrap.bind(host: "0.0.0.0", port: 0).wait()
|
self.logger.log("[SDLNoticeClient] started and listening on: \(self.asyncChannel.channel.localAddress!)", level: .debug)
|
||||||
SDLLogger.log("[SDLNoticeClient] started and listening on: \(self.channel?.localAddress!)", level: .debug)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// -- MARK: ChannelInboundHandler Methods
|
func start() async throws {
|
||||||
|
try await self.asyncChannel.executeThenClose { inbound, outbound in
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
|
group.addTask {
|
||||||
|
try await self.asyncChannel.channel.closeFuture.get()
|
||||||
|
throw SDLError.socketClosed
|
||||||
|
}
|
||||||
|
|
||||||
public func channelActive(context: ChannelHandlerContext) {
|
group.addTask {
|
||||||
|
defer {
|
||||||
|
self.writeContinuation.finish()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
for try await message in self.writeStream {
|
||||||
|
let buf = self.asyncChannel.channel.allocator.buffer(bytes: message)
|
||||||
|
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: self.remoteAddress, data: buf)
|
||||||
|
|
||||||
// 接收到的消息, 消息需要根据类型分流
|
try await outbound.write(envelope)
|
||||||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
}
|
||||||
context.fireChannelRead(data)
|
}
|
||||||
}
|
|
||||||
|
|
||||||
public func errorCaught(context: ChannelHandlerContext, error: Error) {
|
for try await _ in group {
|
||||||
// As we are not really interested getting notified on success or failure we just pass nil as promise to
|
|
||||||
// reduce allocations.
|
|
||||||
context.close(promise: nil)
|
|
||||||
self.channel = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
public func channelInactive(context: ChannelHandlerContext) {
|
}
|
||||||
self.channel = nil
|
}
|
||||||
context.close(promise: nil)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理写入逻辑
|
// 处理写入逻辑
|
||||||
func send(data: Data) {
|
func send(data: Data) {
|
||||||
guard let channel = self.channel else {
|
self.writeContinuation.yield(data)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
let remoteAddress = self.remoteAddress
|
|
||||||
let allocator = channel.allocator
|
|
||||||
|
|
||||||
channel.eventLoop.execute { [allocator] in
|
|
||||||
let buffer = allocator.buffer(bytes: data)
|
|
||||||
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)
|
|
||||||
channel.writeAndFlush(self.wrapOutboundOut(envelope), promise: nil)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
try? self.group.syncShutdownGracefully()
|
try? self.group.syncShutdownGracefully()
|
||||||
|
self.writeContinuation.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,7 +20,6 @@ class SDLQPSCounter: @unchecked Sendable {
|
|||||||
timer.schedule(deadline: .now(), repeating: .seconds(1), leeway: .milliseconds(100))
|
timer.schedule(deadline: .now(), repeating: .seconds(1), leeway: .milliseconds(100))
|
||||||
timer.setEventHandler { [weak self] in
|
timer.setEventHandler { [weak self] in
|
||||||
guard let self = self else { return }
|
guard let self = self else { return }
|
||||||
NSLog("[\(self.label)] QPS: \(self.count)")
|
|
||||||
self.count = 0
|
self.count = 0
|
||||||
}
|
}
|
||||||
timer.resume()
|
timer.resume()
|
||||||
|
|||||||
@ -8,59 +8,126 @@
|
|||||||
import Foundation
|
import Foundation
|
||||||
import NIOCore
|
import NIOCore
|
||||||
import NIOPosix
|
import NIOPosix
|
||||||
import Combine
|
|
||||||
|
|
||||||
// --MARK: 和SuperNode的客户端
|
// --MARK: 和SuperNode的客户端
|
||||||
class SDLSuperClient: ChannelInboundHandler, @unchecked Sendable {
|
@available(macOS 14, *)
|
||||||
public typealias InboundIn = ByteBuffer
|
actor SDLSuperClient {
|
||||||
public typealias OutboundOut = ByteBuffer
|
|
||||||
|
|
||||||
public typealias CallbackFun = (SDLSuperInboundMessage?) -> Void
|
|
||||||
|
|
||||||
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
private var channel: Channel?
|
private let asyncChannel: NIOAsyncChannel<ByteBuffer,ByteBuffer>
|
||||||
|
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: TcpMessage.self, bufferingPolicy: .unbounded)
|
||||||
|
private var callbackPromises: [UInt32:EventLoopPromise<SDLSuperInboundMessage>] = [:]
|
||||||
|
|
||||||
|
public let eventFlow: AsyncStream<SuperEvent>
|
||||||
|
private let inboundContinuation: AsyncStream<SuperEvent>.Continuation
|
||||||
|
|
||||||
// id生成器
|
// id生成器
|
||||||
var idGenerator = SDLIdGenerator(seed: 1)
|
var idGenerator = SDLIdGenerator(seed: 1)
|
||||||
private var callbackManager = SuperCallbackManager()
|
|
||||||
|
|
||||||
let host: String
|
private let logger: SDLLogger
|
||||||
let port: Int
|
// 发送的消息格式
|
||||||
|
struct TcpMessage {
|
||||||
private var pingCancel: AnyCancellable?
|
let packetId: UInt32
|
||||||
|
let type: SDLPacketType
|
||||||
public var eventFlow = PassthroughSubject<SuperEvent, Never>()
|
let data: Data
|
||||||
|
}
|
||||||
|
|
||||||
// 定义事件类型
|
// 定义事件类型
|
||||||
enum SuperEvent {
|
enum SuperEvent {
|
||||||
case ready
|
case ready
|
||||||
case closed
|
|
||||||
case event(SDLEvent)
|
case event(SDLEvent)
|
||||||
case command(UInt32, SDLCommand)
|
case command(UInt32, SDLCommand)
|
||||||
}
|
}
|
||||||
|
|
||||||
init(host: String, port: Int) {
|
init(host: String, port: Int, logger: SDLLogger) async throws {
|
||||||
self.host = host
|
self.logger = logger
|
||||||
self.port = port
|
|
||||||
}
|
|
||||||
|
|
||||||
func start() async throws {
|
(self.eventFlow, self.inboundContinuation) = AsyncStream.makeStream(of: SuperEvent.self, bufferingPolicy: .unbounded)
|
||||||
let bootstrap = ClientBootstrap(group: self.group)
|
let bootstrap = ClientBootstrap(group: self.group)
|
||||||
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||||
.channelInitializer { channel in
|
.channelInitializer { channel in
|
||||||
return channel.pipeline.addHandlers([
|
return channel.pipeline.addHandlers([
|
||||||
ByteToMessageHandler(FixedHeaderDelimiterCoder()),
|
ByteToMessageHandler(FixedHeaderDecoder()),
|
||||||
MessageToByteHandler(FixedHeaderDelimiterCoder()),
|
MessageToByteHandler(FixedHeaderEncoder())
|
||||||
self
|
|
||||||
])
|
])
|
||||||
}
|
}
|
||||||
|
|
||||||
do {
|
self.asyncChannel = try await bootstrap.connect(host: host, port: port)
|
||||||
NSLog("super client connect: \(self.host):\(self.port)")
|
.flatMapThrowing { channel in
|
||||||
self.channel = try await bootstrap.connect(host: self.host, port: self.port).get()
|
return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
|
||||||
} catch let err {
|
inboundType: ByteBuffer.self,
|
||||||
NSLog("super client get error: \(err)")
|
outboundType: ByteBuffer.self
|
||||||
self.eventFlow.send(.closed)
|
))
|
||||||
|
}
|
||||||
|
.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
func start() async throws {
|
||||||
|
try await withTaskCancellationHandler {
|
||||||
|
try await self.asyncChannel.executeThenClose { inbound, outbound in
|
||||||
|
self.inboundContinuation.yield(.ready)
|
||||||
|
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
|
group.addTask {
|
||||||
|
defer {
|
||||||
|
self.logger.log("[SDLSuperClient] inbound closed", level: .warning)
|
||||||
|
}
|
||||||
|
|
||||||
|
for try await var packet in inbound {
|
||||||
|
try Task.checkCancellation()
|
||||||
|
|
||||||
|
if let message = SDLSuperClientDecoder.decode(buffer: &packet) {
|
||||||
|
self.logger.log("[SDLSuperTransport] read message: \(message)", level: .debug)
|
||||||
|
switch message.packet {
|
||||||
|
case .event(let event):
|
||||||
|
self.inboundContinuation.yield(.event(event))
|
||||||
|
case .command(let command):
|
||||||
|
self.inboundContinuation.yield(.command(message.msgId, command))
|
||||||
|
default:
|
||||||
|
await self.fireCallback(message: message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
defer {
|
||||||
|
self.logger.log("[SDLSuperClient] outbound closed", level: .warning)
|
||||||
|
}
|
||||||
|
|
||||||
|
for try await message in self.writeStream {
|
||||||
|
try Task.checkCancellation()
|
||||||
|
|
||||||
|
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5)
|
||||||
|
buffer.writeInteger(message.packetId, as: UInt32.self)
|
||||||
|
buffer.writeBytes([message.type.rawValue])
|
||||||
|
buffer.writeBytes(message.data)
|
||||||
|
try await outbound.write(buffer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --MARK: 心跳机制
|
||||||
|
group.addTask {
|
||||||
|
defer {
|
||||||
|
self.logger.log("[SDLSuperClient] ping task closed", level: .warning)
|
||||||
|
}
|
||||||
|
|
||||||
|
while true {
|
||||||
|
try Task.checkCancellation()
|
||||||
|
await self.ping()
|
||||||
|
try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 迭代等待所有任务的退出, 第一个异常会被抛出
|
||||||
|
for try await _ in group {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} onCancel: {
|
||||||
|
self.inboundContinuation.finish()
|
||||||
|
self.writeContinuation.finish()
|
||||||
|
self.logger.log("[SDLSuperClient] withTaskCancellationHandler cancel")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,15 +141,7 @@ class SDLSuperClient: ChannelInboundHandler, @unchecked Sendable {
|
|||||||
self.send(type: .commandAck, packetId: packetId, data: data)
|
self.send(type: .commandAck, packetId: packetId, data: data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerSuper(context ctx: SDLContext) async -> SDLSuperInboundMessage? {
|
func registerSuper(context ctx: SDLContext) throws -> EventLoopFuture<SDLSuperInboundMessage> {
|
||||||
return await withCheckedContinuation { c in
|
|
||||||
self.registerSuper(context: ctx) { message in
|
|
||||||
c.resume(returning: message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func registerSuper(context ctx: SDLContext, callback: @escaping CallbackFun) {
|
|
||||||
var registerSuper = SDLRegisterSuper()
|
var registerSuper = SDLRegisterSuper()
|
||||||
registerSuper.version = UInt32(ctx.config.version)
|
registerSuper.version = UInt32(ctx.config.version)
|
||||||
registerSuper.clientID = ctx.config.clientId
|
registerSuper.clientID = ctx.config.clientId
|
||||||
@ -92,23 +151,15 @@ class SDLSuperClient: ChannelInboundHandler, @unchecked Sendable {
|
|||||||
|
|
||||||
let data = try! registerSuper.serializedData()
|
let data = try! registerSuper.serializedData()
|
||||||
|
|
||||||
self.write(type: .registerSuper, data: data, callback: callback)
|
return self.write(type: .registerSuper, data: data)
|
||||||
}
|
|
||||||
|
|
||||||
func queryInfo(context ctx: SDLContext, dst_mac: Data) async throws -> SDLSuperInboundMessage? {
|
|
||||||
return await withCheckedContinuation { c in
|
|
||||||
self.queryInfo(context: ctx, dst_mac: dst_mac) { message in
|
|
||||||
c.resume(returning: message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 查询目标服务器的相关信息
|
// 查询目标服务器的相关信息
|
||||||
func queryInfo(context ctx: SDLContext, dst_mac: Data, callback: @escaping CallbackFun) {
|
func queryInfo(dst_mac: Data) async throws -> EventLoopFuture<SDLSuperInboundMessage> {
|
||||||
var queryInfo = SDLQueryInfo()
|
var queryInfo = SDLQueryInfo()
|
||||||
queryInfo.dstMac = dst_mac
|
queryInfo.dstMac = dst_mac
|
||||||
|
|
||||||
self.write(type: .queryInfo, data: try! queryInfo.serializedData(), callback: callback)
|
return self.write(type: .queryInfo, data: try! queryInfo.serializedData())
|
||||||
}
|
}
|
||||||
|
|
||||||
func unregister(context ctx: SDLContext) throws {
|
func unregister(context ctx: SDLContext) throws {
|
||||||
@ -128,156 +179,40 @@ class SDLSuperClient: ChannelInboundHandler, @unchecked Sendable {
|
|||||||
self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData())
|
self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData())
|
||||||
}
|
}
|
||||||
|
|
||||||
// --MARK: ChannelInboundHandler
|
private func write(type: SDLPacketType, data: Data) -> EventLoopFuture<SDLSuperInboundMessage> {
|
||||||
|
|
||||||
public func channelActive(context: ChannelHandlerContext) {
|
|
||||||
self.startPingTicker()
|
|
||||||
self.eventFlow.send(.ready)
|
|
||||||
}
|
|
||||||
|
|
||||||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
||||||
var buffer = self.unwrapInboundIn(data)
|
|
||||||
if let message = decode(buffer: &buffer) {
|
|
||||||
SDLLogger.log("[SDLSuperTransport] read message: \(message)", level: .warning)
|
|
||||||
|
|
||||||
switch message.packet {
|
|
||||||
case .event(let event):
|
|
||||||
self.eventFlow.send(.event(event))
|
|
||||||
case .command(let command):
|
|
||||||
self.eventFlow.send(.command(message.msgId, command))
|
|
||||||
default:
|
|
||||||
self.callbackManager.fireCallback(message: message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public func errorCaught(context: ChannelHandlerContext, error: Error) {
|
|
||||||
SDLLogger.log("[SDLSuperTransport] error: \(error)", level: .warning)
|
|
||||||
self.channel = nil
|
|
||||||
self.eventFlow.send(.closed)
|
|
||||||
context.close(promise: nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
public func channelInactive(context: ChannelHandlerContext) {
|
|
||||||
SDLLogger.log("[SDLSuperTransport] channelInactive", level: .warning)
|
|
||||||
self.channel = nil
|
|
||||||
context.close(promise: nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func write(type: SDLPacketType, data: Data, callback: @escaping CallbackFun) {
|
|
||||||
guard let channel = self.channel else {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
SDLLogger.log("[SDLSuperTransport] will write data: \(data)", level: .debug)
|
|
||||||
|
|
||||||
let packetId = idGenerator.nextId()
|
let packetId = idGenerator.nextId()
|
||||||
self.callbackManager.addCallback(id: packetId, callback: callback)
|
let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLSuperInboundMessage.self)
|
||||||
|
self.callbackPromises[packetId] = promise
|
||||||
|
|
||||||
channel.eventLoop.execute {
|
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
|
||||||
var buffer = channel.allocator.buffer(capacity: data.count + 5)
|
|
||||||
buffer.writeInteger(packetId, as: UInt32.self)
|
|
||||||
buffer.writeBytes([type.rawValue])
|
|
||||||
buffer.writeBytes(data)
|
|
||||||
|
|
||||||
channel.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
|
return promise.futureResult
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func send(type: SDLPacketType, packetId: UInt32, data: Data) {
|
private func send(type: SDLPacketType, packetId: UInt32, data: Data) {
|
||||||
guard let channel = self.channel else {
|
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
channel.eventLoop.execute {
|
|
||||||
var buffer = channel.allocator.buffer(capacity: data.count + 5)
|
|
||||||
buffer.writeInteger(packetId, as: UInt32.self)
|
|
||||||
buffer.writeBytes([type.rawValue])
|
|
||||||
buffer.writeBytes(data)
|
|
||||||
|
|
||||||
channel.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// --MARK: 心跳机制
|
// 处理回调函数
|
||||||
|
private func fireCallback(message: SDLSuperInboundMessage) {
|
||||||
private func startPingTicker() {
|
if let promise = self.callbackPromises[message.msgId] {
|
||||||
self.pingCancel = Timer.publish(every: 5.0, on: .main, in: .common).autoconnect()
|
self.asyncChannel.channel.eventLoop.execute {
|
||||||
.sink { _ in
|
promise.succeed(message)
|
||||||
// 保持和super-node的心跳机制
|
|
||||||
self.ping()
|
|
||||||
}
|
}
|
||||||
|
self.callbackPromises.removeValue(forKey: message.msgId)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
self.pingCancel?.cancel()
|
|
||||||
try! group.syncShutdownGracefully()
|
try! group.syncShutdownGracefully()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// 基于2字节固定长度的分包协议
|
|
||||||
extension SDLSuperClient {
|
|
||||||
private final class FixedHeaderDelimiterCoder: ByteToMessageDecoder, MessageToByteEncoder {
|
|
||||||
typealias InboundIn = ByteBuffer
|
|
||||||
typealias InboundOut = ByteBuffer
|
|
||||||
|
|
||||||
func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
|
|
||||||
guard let len = buffer.getInteger(at: buffer.readerIndex, endianness: .big, as: UInt16.self) else {
|
|
||||||
return .needMoreData
|
|
||||||
}
|
|
||||||
|
|
||||||
if buffer.readableBytes >= len + 2 {
|
|
||||||
buffer.moveReaderIndex(forwardBy: 2)
|
|
||||||
if let bytes = buffer.readBytes(length: Int(len)) {
|
|
||||||
context.fireChannelRead(self.wrapInboundOut(ByteBuffer(bytes: bytes)))
|
|
||||||
}
|
|
||||||
return .continue
|
|
||||||
} else {
|
|
||||||
return .needMoreData
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func encode(data: ByteBuffer, out: inout ByteBuffer) throws {
|
|
||||||
let len = data.readableBytes
|
|
||||||
out.writeInteger(UInt16(len))
|
|
||||||
out.writeBytes(data.readableBytesView)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 回调函数管理器
|
|
||||||
extension SDLSuperClient {
|
|
||||||
|
|
||||||
private struct SuperCallbackManager {
|
|
||||||
// 对应请求体和相应的关系
|
|
||||||
private var callbacks: [UInt32:CallbackFun] = [:]
|
|
||||||
|
|
||||||
mutating func addCallback(id: UInt32, callback: @escaping CallbackFun) {
|
|
||||||
self.callbacks[id] = callback
|
|
||||||
}
|
|
||||||
|
|
||||||
mutating func fireCallback(message: SDLSuperInboundMessage) {
|
|
||||||
if let callback = self.callbacks[message.msgId] {
|
|
||||||
callback(message)
|
|
||||||
self.callbacks.removeValue(forKey: message.msgId)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mutating func fireAllCallbacks(message: SDLSuperInboundMessage) {
|
|
||||||
for (_, callback) in self.callbacks {
|
|
||||||
callback(nil)
|
|
||||||
}
|
|
||||||
self.callbacks.removeAll()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// --MARK: 编解码器
|
// --MARK: 编解码器
|
||||||
extension SDLSuperClient {
|
private struct SDLSuperClientDecoder {
|
||||||
// 消息格式为: <<MsgId:32, Type:8, Body/binary>>
|
// 消息格式为: <<MsgId:32, Type:8, Body/binary>>
|
||||||
func decode(buffer: inout ByteBuffer) -> SDLSuperInboundMessage? {
|
static func decode(buffer: inout ByteBuffer) -> SDLSuperInboundMessage? {
|
||||||
guard let msgId = buffer.readInteger(as: UInt32.self),
|
guard let msgId = buffer.readInteger(as: UInt32.self),
|
||||||
let type = buffer.readInteger(as: UInt8.self),
|
let type = buffer.readInteger(as: UInt8.self),
|
||||||
let messageType = SDLPacketType(rawValue: type) else {
|
let messageType = SDLPacketType(rawValue: type) else {
|
||||||
@ -356,13 +291,36 @@ extension SDLSuperClient {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
extension ByteToMessageHandler: @unchecked @retroactive Sendable {
|
private final class FixedHeaderEncoder: MessageToByteEncoder, @unchecked Sendable {
|
||||||
|
typealias InboundIn = ByteBuffer
|
||||||
|
typealias InboundOut = ByteBuffer
|
||||||
|
|
||||||
|
func encode(data: ByteBuffer, out: inout ByteBuffer) throws {
|
||||||
|
let len = data.readableBytes
|
||||||
|
out.writeInteger(UInt16(len))
|
||||||
|
out.writeBytes(data.readableBytesView)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension MessageToByteHandler: @unchecked @retroactive Sendable {
|
private final class FixedHeaderDecoder: ByteToMessageDecoder, @unchecked Sendable {
|
||||||
|
typealias InboundIn = ByteBuffer
|
||||||
|
typealias InboundOut = ByteBuffer
|
||||||
|
|
||||||
|
func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
|
||||||
|
guard let len = buffer.getInteger(at: buffer.readerIndex, endianness: .big, as: UInt16.self) else {
|
||||||
|
return .needMoreData
|
||||||
|
}
|
||||||
|
|
||||||
|
if buffer.readableBytes >= len + 2 {
|
||||||
|
buffer.moveReaderIndex(forwardBy: 2)
|
||||||
|
if let bytes = buffer.readBytes(length: Int(len)) {
|
||||||
|
context.fireChannelRead(self.wrapInboundOut(ByteBuffer(bytes: bytes)))
|
||||||
|
}
|
||||||
|
return .continue
|
||||||
|
} else {
|
||||||
|
return .needMoreData
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,36 +8,120 @@
|
|||||||
import Foundation
|
import Foundation
|
||||||
import NIOCore
|
import NIOCore
|
||||||
import NIOPosix
|
import NIOPosix
|
||||||
import Combine
|
|
||||||
|
|
||||||
// 处理和sn-server服务器之间的通讯
|
// 处理和sn-server服务器之间的通讯
|
||||||
class SDLUDPHole: ChannelInboundHandler, @unchecked Sendable {
|
@available(macOS 14, *)
|
||||||
public typealias InboundIn = AddressedEnvelope<ByteBuffer>
|
actor SDLUDPHole {
|
||||||
public typealias OutboundOut = AddressedEnvelope<ByteBuffer>
|
|
||||||
|
|
||||||
// 回调函数
|
|
||||||
public typealias CallbackFun = (SDLStunProbeReply?) -> Void
|
|
||||||
|
|
||||||
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
|
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
|
||||||
|
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded)
|
||||||
|
|
||||||
private var cookieGenerator = SDLIdGenerator(seed: 1)
|
private var cookieGenerator = SDLIdGenerator(seed: 1)
|
||||||
private var callbackManager = HoleCallbackManager()
|
private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:]
|
||||||
|
|
||||||
public var localAddress: SocketAddress?
|
public var localAddress: SocketAddress?
|
||||||
public var channel: Channel?
|
|
||||||
|
|
||||||
public var eventFlow = PassthroughSubject<UDPEvent, Never>()
|
public let eventFlow: AsyncStream<UDPEvent>
|
||||||
|
private let eventContinuation: AsyncStream<UDPEvent>.Continuation
|
||||||
|
|
||||||
|
private let logger: SDLLogger
|
||||||
|
|
||||||
|
struct UDPMessage {
|
||||||
|
let remoteAddress: SocketAddress
|
||||||
|
let type: SDLPacketType
|
||||||
|
let data: Data
|
||||||
|
}
|
||||||
|
|
||||||
// 定义事件类型
|
// 定义事件类型
|
||||||
enum UDPEvent {
|
enum UDPEvent {
|
||||||
case ready
|
case ready
|
||||||
case closed
|
|
||||||
case message(SocketAddress, SDLHoleInboundMessage)
|
case message(SocketAddress, SDLHoleInboundMessage)
|
||||||
case data(SDLData)
|
case data(SDLData)
|
||||||
}
|
}
|
||||||
|
|
||||||
init() {
|
// 启动函数
|
||||||
|
init(logger: SDLLogger) async throws {
|
||||||
|
self.logger = logger
|
||||||
|
|
||||||
|
(self.eventFlow, self.eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)
|
||||||
|
|
||||||
|
let bootstrap = DatagramBootstrap(group: group)
|
||||||
|
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||||
|
|
||||||
|
self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0)
|
||||||
|
.flatMapThrowing { channel in
|
||||||
|
return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
|
||||||
|
inboundType: AddressedEnvelope<ByteBuffer>.self,
|
||||||
|
outboundType: AddressedEnvelope<ByteBuffer>.self
|
||||||
|
))
|
||||||
|
}
|
||||||
|
.get()
|
||||||
|
|
||||||
|
self.localAddress = self.asyncChannel.channel.localAddress
|
||||||
|
self.logger.log("[UDPHole] started and listening on: \(self.localAddress!)", level: .debug)
|
||||||
|
}
|
||||||
|
|
||||||
|
func start() async throws {
|
||||||
|
try await withTaskCancellationHandler {
|
||||||
|
try await self.asyncChannel.executeThenClose {inbound, outbound in
|
||||||
|
self.eventContinuation.yield(.ready)
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
|
group.addTask {
|
||||||
|
defer {
|
||||||
|
self.logger.log("[SDLUDPHole] inbound closed", level: .warning)
|
||||||
|
}
|
||||||
|
|
||||||
|
for try await envelope in inbound {
|
||||||
|
try Task.checkCancellation()
|
||||||
|
|
||||||
|
var buffer = envelope.data
|
||||||
|
let remoteAddress = envelope.remoteAddress
|
||||||
|
do {
|
||||||
|
if let message = try Self.decode(buffer: &buffer) {
|
||||||
|
switch message {
|
||||||
|
case .data(let data):
|
||||||
|
self.logger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
|
||||||
|
self.eventContinuation.yield(.data(data))
|
||||||
|
case .stunProbeReply(let probeReply):
|
||||||
|
// 执行并移除回调
|
||||||
|
await self.trigger(probeReply: probeReply)
|
||||||
|
default:
|
||||||
|
self.eventContinuation.yield(.message(remoteAddress, message))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.logger.log("[SDLUDPHole] decode message, get null", level: .warning)
|
||||||
|
}
|
||||||
|
} catch let err {
|
||||||
|
self.logger.log("[SDLUDPHole] decode message, get error: \(err)", level: .warning)
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
defer {
|
||||||
|
self.logger.log("[SDLUDPHole] outbound closed", level: .warning)
|
||||||
|
}
|
||||||
|
|
||||||
|
for try await message in self.writeStream {
|
||||||
|
try Task.checkCancellation()
|
||||||
|
|
||||||
|
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
|
||||||
|
buffer.writeBytes([message.type.rawValue])
|
||||||
|
buffer.writeBytes(message.data)
|
||||||
|
|
||||||
|
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: message.remoteAddress, data: buffer)
|
||||||
|
try await outbound.write(envelope)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for try await _ in group { }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} onCancel: {
|
||||||
|
self.writeContinuation.finish()
|
||||||
|
self.eventContinuation.finish()
|
||||||
|
self.logger.log("[SDLUDPHole] withTaskCancellationHandler cancel")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: super_node apis
|
// MARK: super_node apis
|
||||||
@ -54,7 +138,7 @@ class SDLUDPHole: ChannelInboundHandler, @unchecked Sendable {
|
|||||||
stunRequest.mac = ctx.devAddr.mac
|
stunRequest.mac = ctx.devAddr.mac
|
||||||
stunRequest.natType = UInt32(ctx.natType.rawValue)
|
stunRequest.natType = UInt32(ctx.natType.rawValue)
|
||||||
|
|
||||||
SDLLogger.log("[SDLUDPHole] stunRequest: \(remoteAddress), host: \(ctx.config.stunServers[0].host):\(ctx.config.stunServers[0].ports[0])", level: .warning)
|
self.logger.log("[SDLUDPHole] stunRequest: \(remoteAddress), host: \(ctx.config.stunServers[0].host):\(ctx.config.stunServers[0].ports[0])", level: .debug)
|
||||||
|
|
||||||
self.send(remoteAddress: remoteAddress, type: .stunRequest, data: try! stunRequest.serializedData())
|
self.send(remoteAddress: remoteAddress, type: .stunRequest, data: try! stunRequest.serializedData())
|
||||||
|
|
||||||
@ -62,26 +146,33 @@ class SDLUDPHole: ChannelInboundHandler, @unchecked Sendable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 探测tun信息
|
// 探测tun信息
|
||||||
func stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int = 5) async -> SDLStunProbeReply? {
|
func stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int = 5) async throws -> SDLStunProbeReply {
|
||||||
return await withCheckedContinuation { continuation in
|
return try await self._stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: timeout).get()
|
||||||
self.stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: timeout) { probeReply in
|
|
||||||
continuation.resume(returning: probeReply)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private func stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int, callback: @escaping CallbackFun) {
|
private func _stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int) -> EventLoopFuture<SDLStunProbeReply> {
|
||||||
let cookie = self.cookieGenerator.nextId()
|
let cookie = self.cookieGenerator.nextId()
|
||||||
|
|
||||||
var stunProbe = SDLStunProbe()
|
var stunProbe = SDLStunProbe()
|
||||||
stunProbe.cookie = cookie
|
stunProbe.cookie = cookie
|
||||||
stunProbe.attr = UInt32(attr.rawValue)
|
stunProbe.attr = UInt32(attr.rawValue)
|
||||||
|
|
||||||
self.send(remoteAddress: remoteAddress, type: .stunProbe, data: try! stunProbe.serializedData())
|
self.send(remoteAddress: remoteAddress, type: .stunProbe, data: try! stunProbe.serializedData())
|
||||||
|
self.logger.log("[SDLUDPHole] stunProbe: \(remoteAddress)", level: .debug)
|
||||||
|
|
||||||
SDLLogger.log("[SDLUDPHole] stunProbe: \(remoteAddress)", level: .warning)
|
let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLStunProbeReply.self)
|
||||||
|
self.promises[cookie] = promise
|
||||||
|
|
||||||
self.callbackManager.addCallback(id: cookie, callback: callback)
|
return promise.futureResult
|
||||||
|
}
|
||||||
|
|
||||||
|
private func trigger(probeReply: SDLStunProbeReply) {
|
||||||
|
let id = probeReply.cookie
|
||||||
|
// 执行并移除回调
|
||||||
|
if let promise = self.promises[id] {
|
||||||
|
self.asyncChannel.channel.eventLoop.execute {
|
||||||
|
promise.succeed(probeReply)
|
||||||
|
}
|
||||||
|
self.promises.removeValue(forKey: id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: client-client apis
|
// MARK: client-client apis
|
||||||
@ -96,11 +187,10 @@ class SDLUDPHole: ChannelInboundHandler, @unchecked Sendable {
|
|||||||
dataPacket.dstMac = session.dstMac
|
dataPacket.dstMac = session.dstMac
|
||||||
dataPacket.ttl = 255
|
dataPacket.ttl = 255
|
||||||
dataPacket.data = data
|
dataPacket.data = data
|
||||||
let packet = try! dataPacket.serializedData()
|
if let packet = try? dataPacket.serializedData() {
|
||||||
|
self.logger.log("[SDLUDPHole] sendPacket: \(remoteAddress), count: \(packet.count)", level: .debug)
|
||||||
SDLLogger.log("[SDLUDPHole] sendPacket: \(remoteAddress), count: \(packet.count)", level: .debug)
|
self.send(remoteAddress: remoteAddress, type: .data, data: packet)
|
||||||
|
}
|
||||||
self.send(remoteAddress: remoteAddress, type: .data, data: packet)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 通过sn服务器转发数据包, data已经是加密过后的数据
|
// 通过sn服务器转发数据包, data已经是加密过后的数据
|
||||||
@ -114,23 +204,23 @@ class SDLUDPHole: ChannelInboundHandler, @unchecked Sendable {
|
|||||||
dataPacket.ttl = 255
|
dataPacket.ttl = 255
|
||||||
dataPacket.data = data
|
dataPacket.data = data
|
||||||
|
|
||||||
let packet = try! dataPacket.serializedData()
|
if let packet = try? dataPacket.serializedData() {
|
||||||
|
self.logger.log("[SDLContext] forward packet, remoteAddress: \(remoteAddress), data size: \(packet.count)", level: .debug)
|
||||||
NSLog("[SDLContext] forward packet, remoteAddress: \(remoteAddress), data size: \(packet.count)")
|
self.send(remoteAddress: remoteAddress, type: .data, data: packet)
|
||||||
|
}
|
||||||
self.send(remoteAddress: remoteAddress, type: .data, data: packet)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 发送register包
|
// 发送register包
|
||||||
func sendRegister(context ctx: SDLContext, remoteAddress: SocketAddress, dst_mac: Data) {
|
func sendRegister(remoteAddress: SocketAddress, networkId: UInt32, srcMac: Data, dst_mac: Data) {
|
||||||
var register = SDLRegister()
|
var register = SDLRegister()
|
||||||
register.networkID = ctx.devAddr.networkID
|
register.networkID = networkId
|
||||||
register.srcMac = ctx.devAddr.mac
|
register.srcMac = srcMac
|
||||||
register.dstMac = dst_mac
|
register.dstMac = dst_mac
|
||||||
|
|
||||||
SDLLogger.log("[SDLUDPHole] SendRegister: \(remoteAddress), src_mac: \(LayerPacket.MacAddress.description(data: ctx.devAddr.mac)), dst_mac: \(LayerPacket.MacAddress.description(data: dst_mac))", level: .debug)
|
if let packet = try? register.serializedData() {
|
||||||
|
self.logger.log("[SDLUDPHole] SendRegister: \(remoteAddress), src_mac: \(LayerPacket.MacAddress.description(data: srcMac)), dst_mac: \(LayerPacket.MacAddress.description(data: dst_mac))", level: .debug)
|
||||||
self.send(remoteAddress: remoteAddress, type: .register, data: try! register.serializedData())
|
self.send(remoteAddress: remoteAddress, type: .register, data: packet)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 回复registerAck
|
// 回复registerAck
|
||||||
@ -140,108 +230,23 @@ class SDLUDPHole: ChannelInboundHandler, @unchecked Sendable {
|
|||||||
registerAck.srcMac = ctx.devAddr.mac
|
registerAck.srcMac = ctx.devAddr.mac
|
||||||
registerAck.dstMac = dst_mac
|
registerAck.dstMac = dst_mac
|
||||||
|
|
||||||
SDLLogger.log("[SDLUDPHole] SendRegisterAck: \(remoteAddress), \(registerAck)", level: .debug)
|
if let packet = try? registerAck.serializedData() {
|
||||||
|
self.logger.log("[SDLUDPHole] SendRegisterAck: \(remoteAddress), \(registerAck)", level: .debug)
|
||||||
self.send(remoteAddress: remoteAddress, type: .registerAck, data: try! registerAck.serializedData())
|
self.send(remoteAddress: remoteAddress, type: .registerAck, data: packet)
|
||||||
}
|
|
||||||
|
|
||||||
// 启动函数
|
|
||||||
func start() async throws {
|
|
||||||
let bootstrap = DatagramBootstrap(group: self.group)
|
|
||||||
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
|
||||||
.channelInitializer { channel in
|
|
||||||
// 接收缓冲区
|
|
||||||
return channel.setOption(ChannelOptions.socketOption(.so_rcvbuf), value: 5 * 1024 * 1024)
|
|
||||||
.flatMap {
|
|
||||||
channel.setOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_SNDBUF), value: 5 * 1024 * 1024)
|
|
||||||
}.flatMap {
|
|
||||||
channel.pipeline.addHandler(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let channel = try await bootstrap.bind(host: "0.0.0.0", port: 0).get()
|
|
||||||
|
|
||||||
SDLLogger.log("[UDPHole] started and listening on: \(channel.localAddress!)", level: .debug)
|
|
||||||
self.localAddress = channel.localAddress
|
|
||||||
self.channel = channel
|
|
||||||
}
|
|
||||||
|
|
||||||
// -- MARK: ChannelInboundHandler Methods
|
|
||||||
|
|
||||||
public func channelActive(context: ChannelHandlerContext) {
|
|
||||||
self.eventFlow.send(.ready)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 接收到的消息, 消息需要根据类型分流
|
|
||||||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
||||||
let envelope = self.unwrapInboundIn(data)
|
|
||||||
var buffer = envelope.data
|
|
||||||
let remoteAddress = envelope.remoteAddress
|
|
||||||
|
|
||||||
do {
|
|
||||||
if let message = try decode(buffer: &buffer) {
|
|
||||||
Task {
|
|
||||||
switch message {
|
|
||||||
case .data(let data):
|
|
||||||
SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
|
|
||||||
self.eventFlow.send(.data(data))
|
|
||||||
case .stunProbeReply(let probeReply):
|
|
||||||
self.callbackManager.fireCallback(message: probeReply)
|
|
||||||
default:
|
|
||||||
self.eventFlow.send(.message(remoteAddress, message))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning)
|
|
||||||
}
|
|
||||||
} catch let err {
|
|
||||||
SDLLogger.log("[SDLUDPHole] decode message, get error: \(err)", level: .debug)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public func errorCaught(context: ChannelHandlerContext, error: Error) {
|
|
||||||
SDLLogger.log("[SDLUDPHole] get error: \(error)", level: .error)
|
|
||||||
// As we are not really interested getting notified on success or failure we just pass nil as promise to
|
|
||||||
// reduce allocations.
|
|
||||||
context.close(promise: nil)
|
|
||||||
self.channel = nil
|
|
||||||
self.eventFlow.send(.closed)
|
|
||||||
}
|
|
||||||
|
|
||||||
public func channelInactive(context: ChannelHandlerContext) {
|
|
||||||
self.channel = nil
|
|
||||||
context.close(promise: nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 处理写入逻辑
|
// 处理写入逻辑
|
||||||
func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) {
|
private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) {
|
||||||
guard let channel = self.channel else {
|
let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data)
|
||||||
return
|
self.writeContinuation.yield(message)
|
||||||
}
|
|
||||||
|
|
||||||
channel.eventLoop.execute {
|
|
||||||
var buffer = channel.allocator.buffer(capacity: data.count + 1)
|
|
||||||
buffer.writeBytes([type.rawValue])
|
|
||||||
buffer.writeBytes(data)
|
|
||||||
|
|
||||||
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)
|
|
||||||
channel.writeAndFlush(self.wrapOutboundOut(envelope), promise: nil)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
//--MARK: 编解码器
|
||||||
try? self.group.syncShutdownGracefully()
|
private static func decode(buffer: inout ByteBuffer) throws -> SDLHoleInboundMessage? {
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//--MARK: 编解码器
|
|
||||||
extension SDLUDPHole {
|
|
||||||
|
|
||||||
func decode(buffer: inout ByteBuffer) throws -> SDLHoleInboundMessage? {
|
|
||||||
guard let type = buffer.readInteger(as: UInt8.self),
|
guard let type = buffer.readInteger(as: UInt8.self),
|
||||||
let packetType = SDLPacketType(rawValue: type),
|
let packetType = SDLPacketType(rawValue: type),
|
||||||
let bytes = buffer.readBytes(length: buffer.readableBytes) else {
|
let bytes = buffer.readBytes(length: buffer.readableBytes) else {
|
||||||
SDLLogger.log("[SDLUDPHole] decode error", level: .error)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -265,42 +270,11 @@ extension SDLUDPHole {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// --MARK: 回调函数管理器
|
|
||||||
extension SDLUDPHole {
|
|
||||||
|
|
||||||
private struct HoleCallbackManager {
|
|
||||||
// 存储回调函数和对应的超时任务
|
|
||||||
private var callbacks: [UInt32: CallbackFun] = [:]
|
|
||||||
|
|
||||||
//private var timeoutCallbacks: [UInt32: CallbackFun] = [:]
|
|
||||||
|
|
||||||
// 添加回调并设置超时
|
|
||||||
mutating func addCallback(id: UInt32, callback: @escaping CallbackFun) {
|
|
||||||
// 存储回调
|
|
||||||
self.callbacks[id] = callback
|
|
||||||
}
|
|
||||||
|
|
||||||
// 正常触发回调(收到响应)
|
|
||||||
mutating func fireCallback(message: SDLStunProbeReply) {
|
|
||||||
let id = message.cookie
|
|
||||||
// 执行并移除回调
|
|
||||||
if let callback = callbacks[id] {
|
|
||||||
callback(message)
|
|
||||||
self.callbacks.removeValue(forKey: id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 触发所有回调(清理场景)
|
|
||||||
mutating func fireAllCallbacks(message: SDLSuperInboundMessage) {
|
|
||||||
// 触发所有回调
|
|
||||||
for callback in callbacks.values {
|
|
||||||
callback(nil)
|
|
||||||
}
|
|
||||||
self.callbacks.removeAll()
|
|
||||||
}
|
|
||||||
|
|
||||||
|
deinit {
|
||||||
|
try? self.group.syncShutdownGracefully()
|
||||||
|
self.writeContinuation.finish()
|
||||||
|
self.eventContinuation.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -43,4 +43,10 @@ struct SDLUtil {
|
|||||||
return ip & mask == compareIp & mask
|
return ip & mask == compareIp & mask
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static func formatMacAddress(mac: Data) -> String {
|
||||||
|
let bytes = [UInt8](mac)
|
||||||
|
|
||||||
|
return bytes.map { String(format: "%02X", $0) }.joined(separator: ":").lowercased()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user