Compare commits
10 Commits
b55d9913cc
...
0fe9b43c08
| Author | SHA1 | Date | |
|---|---|---|---|
| 0fe9b43c08 | |||
| 29b9bebe87 | |||
| 12b1d68635 | |||
| a280a22d3a | |||
| 26ee512a9a | |||
| f273da3b11 | |||
| 3ce4e05613 | |||
| 9a76b61ff0 | |||
| 1f629a58a0 | |||
| d1452ce0b7 |
@ -1,31 +0,0 @@
|
|||||||
//
|
|
||||||
// HolerManager.swift
|
|
||||||
// sdlan
|
|
||||||
//
|
|
||||||
// Created by 安礼成 on 2025/7/14.
|
|
||||||
//
|
|
||||||
import Foundation
|
|
||||||
|
|
||||||
actor HolerManager {
|
|
||||||
private var holers: [Data:Task<(), Never>] = [:]
|
|
||||||
|
|
||||||
func addHoler(dstMac: Data, creator: @escaping () -> Task<(), Never>) {
|
|
||||||
if let task = self.holers[dstMac] {
|
|
||||||
if task.isCancelled {
|
|
||||||
self.holers[dstMac] = creator()
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.holers[dstMac] = creator()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func cleanup() {
|
|
||||||
for holer in holers.values {
|
|
||||||
holer.cancel()
|
|
||||||
}
|
|
||||||
self.holers.removeAll()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@ -47,8 +47,8 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
let rsaCipher: RSACipher
|
let rsaCipher: RSACipher
|
||||||
|
|
||||||
// 依赖的变量
|
// 依赖的变量
|
||||||
var udpHoleActor: SDLUDPHoleActor?
|
var udpHole: SDLUDPHole?
|
||||||
var superClientActor: SDLSuperClientActor?
|
var superClient: SDLSuperClient?
|
||||||
|
|
||||||
// 数据包读取任务
|
// 数据包读取任务
|
||||||
private var readTask: Task<(), Never>?
|
private var readTask: Task<(), Never>?
|
||||||
@ -56,7 +56,6 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
let provider: NEPacketTunnelProvider
|
let provider: NEPacketTunnelProvider
|
||||||
|
|
||||||
private var sessionManager: SessionManager
|
private var sessionManager: SessionManager
|
||||||
private var holerManager: HolerManager
|
|
||||||
private var arpServer: ArpServer
|
private var arpServer: ArpServer
|
||||||
|
|
||||||
// 记录最后发送的stunRequest的cookie
|
// 记录最后发送的stunRequest的cookie
|
||||||
@ -70,12 +69,23 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
private var monitorCancel: AnyCancellable?
|
private var monitorCancel: AnyCancellable?
|
||||||
|
|
||||||
// 内部socket通讯
|
// 内部socket通讯
|
||||||
private var noticeClient: SDLNoticeClient
|
private var noticeClient: SDLNoticeClient?
|
||||||
|
|
||||||
// 流量统计
|
// 流量统计
|
||||||
private var flowTracer = SDLFlowTracerActor()
|
private var flowTracer = SDLFlowTracerActor()
|
||||||
private var flowTracerCancel: AnyCancellable?
|
private var flowTracerCancel: AnyCancellable?
|
||||||
|
|
||||||
|
// 处理holer
|
||||||
|
private var holerPublishers: [Data:PassthroughSubject<RegisterRequest, Never>] = [:]
|
||||||
|
private var bag = Set<AnyCancellable>()
|
||||||
|
private var locker = NSLock()
|
||||||
|
|
||||||
|
struct RegisterRequest {
|
||||||
|
let srcMac: Data
|
||||||
|
let dstMac: Data
|
||||||
|
let networkId: UInt32
|
||||||
|
}
|
||||||
|
|
||||||
public init(provider: NEPacketTunnelProvider, config: SDLConfiguration, rsaCipher: RSACipher, aesCipher: AESCipher) {
|
public init(provider: NEPacketTunnelProvider, config: SDLConfiguration, rsaCipher: RSACipher, aesCipher: AESCipher) {
|
||||||
self.config = config
|
self.config = config
|
||||||
self.rsaCipher = rsaCipher
|
self.rsaCipher = rsaCipher
|
||||||
@ -88,73 +98,137 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
|
|
||||||
self.provider = provider
|
self.provider = provider
|
||||||
self.sessionManager = SessionManager()
|
self.sessionManager = SessionManager()
|
||||||
self.holerManager = HolerManager()
|
|
||||||
self.arpServer = ArpServer(known_macs: [:])
|
self.arpServer = ArpServer(known_macs: [:])
|
||||||
self.noticeClient = SDLNoticeClient()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public func start() async throws {
|
public func start() async throws {
|
||||||
self.udpHoleActor = try await SDLUDPHoleActor()
|
self.noticeClient = try await SDLNoticeClient()
|
||||||
self.superClientActor = try await SDLSuperClientActor(host: self.config.superHost, port: self.config.superPort)
|
|
||||||
|
|
||||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
group.addTask {
|
group.addTask {
|
||||||
try await self.udpHoleActor?.start()
|
while !Task.isCancelled {
|
||||||
|
do {
|
||||||
|
try await self.startUDPHole()
|
||||||
|
} catch let err {
|
||||||
|
NSLog("udp Hole get err: \(err)")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
group.addTask {
|
group.addTask {
|
||||||
try await self.superClientActor?.start()
|
while !Task.isCancelled {
|
||||||
|
do {
|
||||||
|
try await self.startSuperClient()
|
||||||
|
} catch let err {
|
||||||
|
NSLog("SuperClient get error: \(err)")
|
||||||
|
await self.arpServer.clear()
|
||||||
|
try? await Task.sleep(for: .seconds(2))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
group.addTask {
|
group.addTask {
|
||||||
if let eventFlow = self.superClientActor?.eventFlow {
|
try await self.startMonitor()
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
while !Task.isCancelled {
|
||||||
|
do {
|
||||||
|
try await self.noticeClient?.start()
|
||||||
|
} catch let err {
|
||||||
|
NSLog("noticeClient get err: \(err)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try await group.waitForAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public func stop() async {
|
||||||
|
self.superClient = nil
|
||||||
|
self.udpHole = nil
|
||||||
|
|
||||||
|
self.readTask?.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
private func startUDPHole() async throws {
|
||||||
|
self.udpHole = try await SDLUDPHole()
|
||||||
|
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
|
group.addTask {
|
||||||
|
try await self.udpHole?.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
while !Task.isCancelled {
|
||||||
|
try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
|
||||||
|
self.lastCookie = await self.udpHole?.stunRequest(context: self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
if let eventFlow = self.udpHole?.eventFlow {
|
||||||
|
for try await event in eventFlow {
|
||||||
|
try await self.handleUDPEvent(event: event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try await group.waitForAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func startSuperClient() async throws {
|
||||||
|
self.superClient = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort)
|
||||||
|
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
|
group.addTask {
|
||||||
|
try await self.superClient?.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
if let eventFlow = self.superClient?.eventFlow {
|
||||||
for try await event in eventFlow {
|
for try await event in eventFlow {
|
||||||
try await self.handleSuperEvent(event: event)
|
try await self.handleSuperEvent(event: event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
try await group.waitForAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func startMonitor() async throws {
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
|
group.addTask {
|
||||||
|
try await self.noticeClient?.start()
|
||||||
|
}
|
||||||
|
|
||||||
group.addTask {
|
group.addTask {
|
||||||
if let eventFlow = self.udpHoleActor?.eventFlow {
|
// 启动网络监控
|
||||||
for try await event in eventFlow {
|
self.monitorCancel = self.monitor.eventFlow.sink { event in
|
||||||
try await self.handleUDPEvent(event: event)
|
switch event {
|
||||||
|
case .changed:
|
||||||
|
// 需要重新探测网络的nat类型
|
||||||
|
Task {
|
||||||
|
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
|
||||||
|
NSLog("didNetworkPathChanged, nat type is: \(self.natType)")
|
||||||
|
}
|
||||||
|
case .unreachable:
|
||||||
|
NSLog("didNetworkPathUnreachable")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.monitor.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
try await group.waitForAll()
|
try await group.waitForAll()
|
||||||
}
|
}
|
||||||
|
|
||||||
// self.noticeClient.start()
|
|
||||||
// // 启动网络监控
|
|
||||||
// self.monitorCancel = self.monitor.eventFlow.sink { event in
|
|
||||||
// switch event {
|
|
||||||
// case .changed:
|
|
||||||
// // 需要重新探测网络的nat类型
|
|
||||||
// Task {
|
|
||||||
// self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
|
|
||||||
// NSLog("didNetworkPathChanged, nat type is: \(self.natType)")
|
|
||||||
// }
|
|
||||||
// case .unreachable:
|
|
||||||
// NSLog("didNetworkPathUnreachable")
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// self.monitor.start()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public func stop() async {
|
private func handleSuperEvent(event: SDLSuperClient.SuperEvent) async throws {
|
||||||
self.superClientActor = nil
|
|
||||||
self.udpHoleActor = nil
|
|
||||||
|
|
||||||
self.readTask?.cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
private func handleSuperEvent(event: SDLSuperClientActor.SuperEvent) async throws {
|
|
||||||
switch event {
|
switch event {
|
||||||
case .ready:
|
case .ready:
|
||||||
NSLog("[SDLContext] get registerSuper, mac address: \(Self.formatMacAddress(mac: self.devAddr.mac))")
|
NSLog("[SDLContext] get registerSuper, mac address: \(Self.formatMacAddress(mac: self.devAddr.mac))")
|
||||||
guard let message = try await self.superClientActor?.registerSuper(context: self) else {
|
guard let message = try await self.superClient?.registerSuper(context: self).get() else {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,7 +243,7 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
|
|
||||||
if upgradeType == .force {
|
if upgradeType == .force {
|
||||||
let forceUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
|
let forceUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
|
||||||
self.noticeClient.send(data: forceUpgrade.binaryData)
|
await self.noticeClient?.send(data: forceUpgrade.binaryData)
|
||||||
exit(-1)
|
exit(-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -179,7 +253,7 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
|
|
||||||
if upgradeType == .normal {
|
if upgradeType == .normal {
|
||||||
let normalUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
|
let normalUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
|
||||||
self.noticeClient.send(data: normalUpgrade.binaryData)
|
await self.noticeClient?.send(data: normalUpgrade.binaryData)
|
||||||
}
|
}
|
||||||
|
|
||||||
case .registerSuperNak(let nakPacket):
|
case .registerSuperNak(let nakPacket):
|
||||||
@ -191,25 +265,17 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
switch errorCode {
|
switch errorCode {
|
||||||
case .invalidToken, .nodeDisabled:
|
case .invalidToken, .nodeDisabled:
|
||||||
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
|
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
|
||||||
self.noticeClient.send(data: alertNotice.binaryData)
|
await self.noticeClient?.send(data: alertNotice.binaryData)
|
||||||
exit(-1)
|
exit(-1)
|
||||||
case .noIpAddress, .networkFault, .internalFault:
|
case .noIpAddress, .networkFault, .internalFault:
|
||||||
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
|
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
|
||||||
self.noticeClient.send(data: alertNotice.binaryData)
|
await self.noticeClient?.send(data: alertNotice.binaryData)
|
||||||
}
|
}
|
||||||
NSLog("[SDLContext] Get a SuperNak message exit")
|
NSLog("[SDLContext] Get a SuperNak message exit")
|
||||||
default:
|
default:
|
||||||
()
|
()
|
||||||
}
|
}
|
||||||
|
|
||||||
case .closed:
|
|
||||||
NSLog("[SDLContext] super client closed")
|
|
||||||
await self.arpServer.clear()
|
|
||||||
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
|
|
||||||
// Task {@MainActor in
|
|
||||||
// try await self.startSuperClient()
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
case .event(let evt):
|
case .event(let evt):
|
||||||
switch evt {
|
switch evt {
|
||||||
case .natChanged(let natChangedEvent):
|
case .natChanged(let natChangedEvent):
|
||||||
@ -221,12 +287,12 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp)
|
let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp)
|
||||||
if let remoteAddress = try? SocketAddress.makeAddressResolvingHost(address, port: Int(sendRegisterEvent.natPort)) {
|
if let remoteAddress = try? SocketAddress.makeAddressResolvingHost(address, port: Int(sendRegisterEvent.natPort)) {
|
||||||
// 发送register包
|
// 发送register包
|
||||||
await self.udpHoleActor?.sendRegister(context: self, remoteAddress: remoteAddress, dst_mac: sendRegisterEvent.dstMac)
|
await self.udpHole?.sendRegister(remoteAddress: remoteAddress, networkId: self.devAddr.networkID, srcMac: self.devAddr.mac, dst_mac: sendRegisterEvent.dstMac)
|
||||||
}
|
}
|
||||||
|
|
||||||
case .networkShutdown(let shutdownEvent):
|
case .networkShutdown(let shutdownEvent):
|
||||||
let alertNotice = NoticeMessage.AlertMessage(alert: shutdownEvent.message)
|
let alertNotice = NoticeMessage.AlertMessage(alert: shutdownEvent.message)
|
||||||
self.noticeClient.send(data: alertNotice.binaryData)
|
await self.noticeClient?.send(data: alertNotice.binaryData)
|
||||||
exit(-1)
|
exit(-1)
|
||||||
}
|
}
|
||||||
case .command(let packetId, let command):
|
case .command(let packetId, let command):
|
||||||
@ -244,44 +310,19 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
var commandAck = SDLCommandAck()
|
var commandAck = SDLCommandAck()
|
||||||
commandAck.status = true
|
commandAck.status = true
|
||||||
|
|
||||||
await self.superClientActor?.commandAck(packetId: packetId, ack: commandAck)
|
await self.superClient?.commandAck(packetId: packetId, ack: commandAck)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private func startUDPHole() async throws {
|
private func handleUDPEvent(event: SDLUDPHole.UDPEvent) async throws {
|
||||||
// self.udpHole = SDLUDPHole()
|
|
||||||
//
|
|
||||||
// self.udpCancel?.cancel()
|
|
||||||
// self.udpCancel = self.udpHole?.eventFlow.sink { event in
|
|
||||||
// Task.detached {
|
|
||||||
// await self.handleUDPEvent(event: event)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// try await self.udpHole?.start()
|
|
||||||
}
|
|
||||||
|
|
||||||
private func handleUDPEvent(event: SDLUDPHoleActor.UDPEvent) async throws {
|
|
||||||
switch event {
|
switch event {
|
||||||
case .ready:
|
case .ready:
|
||||||
// 获取当前网络的类型
|
// 获取当前网络的类型
|
||||||
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
|
//self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
|
||||||
SDLLogger.log("[SDLContext] nat type is: \(self.natType)", level: .debug)
|
SDLLogger.log("[SDLContext] nat type is: \(self.natType)", level: .debug)
|
||||||
|
|
||||||
let timer = Timer.publish(every: 5.0, on: .main, in: .common).autoconnect()
|
|
||||||
// self.stunCancel = Just(Date()).merge(with: timer).sink { _ in
|
|
||||||
// self.lastCookie = await self.udpHoleActor?.stunRequest(context: self)
|
|
||||||
// }
|
|
||||||
|
|
||||||
case .closed:
|
|
||||||
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
|
|
||||||
Task {
|
|
||||||
try await self.startUDPHole()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case .message(let remoteAddress, let message):
|
case .message(let remoteAddress, let message):
|
||||||
switch message {
|
switch message {
|
||||||
case .register(let register):
|
case .register(let register):
|
||||||
@ -289,7 +330,7 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
// 判断目标地址是否是tun的网卡地址, 并且是在同一个网络下
|
// 判断目标地址是否是tun的网卡地址, 并且是在同一个网络下
|
||||||
if register.dstMac == self.devAddr.mac && register.networkID == self.devAddr.networkID {
|
if register.dstMac == self.devAddr.mac && register.networkID == self.devAddr.networkID {
|
||||||
// 回复ack包
|
// 回复ack包
|
||||||
self.udpHoleActor?.sendRegisterAck(context: self, remoteAddress: remoteAddress, dst_mac: register.srcMac)
|
await self.udpHole?.sendRegisterAck(context: self, remoteAddress: remoteAddress, dst_mac: register.srcMac)
|
||||||
// 这里需要建立到来源的会话, 在复杂网络下,通过super-node查询到的nat地址不一定靠谱,需要通过udp包的来源地址作为nat地址
|
// 这里需要建立到来源的会话, 在复杂网络下,通过super-node查询到的nat地址不一定靠谱,需要通过udp包的来源地址作为nat地址
|
||||||
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
|
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
|
||||||
await self.sessionManager.addSession(session: session)
|
await self.sessionManager.addSession(session: session)
|
||||||
@ -371,18 +412,18 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 流量统计
|
// 流量统计
|
||||||
public func flowReportTask() {
|
// public func flowReportTask() {
|
||||||
Task {
|
// Task {
|
||||||
// 每分钟汇报一次
|
// // 每分钟汇报一次
|
||||||
self.flowTracerCancel = Timer.publish(every: 60.0, on: .main, in: .common).autoconnect()
|
// self.flowTracerCancel = Timer.publish(every: 60.0, on: .main, in: .common).autoconnect()
|
||||||
.sink { _ in
|
// .sink { _ in
|
||||||
Task {
|
// Task {
|
||||||
let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset()
|
// let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset()
|
||||||
await self.superClientActor?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum)
|
// await self.superClient?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum)
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
// 网络改变时需要重新配置网络信息
|
// 网络改变时需要重新配置网络信息
|
||||||
private func didNetworkConfigChanged(devAddr: SDLDevAddr, dnsServers: [String]? = nil) async {
|
private func didNetworkConfigChanged(devAddr: SDLDevAddr, dnsServers: [String]? = nil) async {
|
||||||
@ -412,8 +453,6 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
// 网卡配置设置必须成功
|
// 网卡配置设置必须成功
|
||||||
do {
|
do {
|
||||||
try await self.provider.setTunnelNetworkSettings(networkSettings)
|
try await self.provider.setTunnelNetworkSettings(networkSettings)
|
||||||
|
|
||||||
await self.holerManager.cleanup()
|
|
||||||
self.startReader()
|
self.startReader()
|
||||||
|
|
||||||
NSLog("[SDLContext] setTunnelNetworkSettings success, start read packet")
|
NSLog("[SDLContext] setTunnelNetworkSettings success, start read packet")
|
||||||
@ -454,7 +493,7 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
else {
|
else {
|
||||||
// 构造arp请求
|
// 构造arp请求
|
||||||
let broadcastMac = Data([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])
|
let broadcastMac = Data([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])
|
||||||
let arpReqeust: ARPPacket = ARPPacket.arpRequest(senderIP: self.devAddr.netAddr, senderMAC: self.devAddr.mac, targetIP: dstIp)
|
let arpReqeust = ARPPacket.arpRequest(senderIP: self.devAddr.netAddr, senderMAC: self.devAddr.mac, targetIP: dstIp)
|
||||||
await self.routeLayerPacket(dstMac: broadcastMac, type: .arp, data: arpReqeust.marshal())
|
await self.routeLayerPacket(dstMac: broadcastMac, type: .arp, data: arpReqeust.marshal())
|
||||||
|
|
||||||
NSLog("[SDLContext] dstIp: \(dstIp) arp query not found")
|
NSLog("[SDLContext] dstIp: \(dstIp) arp query not found")
|
||||||
@ -478,26 +517,47 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
// 通过session发送到对端
|
// 通过session发送到对端
|
||||||
if let session = await self.sessionManager.getSession(toAddress: dstMac) {
|
if let session = await self.sessionManager.getSession(toAddress: dstMac) {
|
||||||
NSLog("[SDLContext] send packet by session: \(session)")
|
NSLog("[SDLContext] send packet by session: \(session)")
|
||||||
await self.udpHoleActor?.sendPacket(context: self, session: session, data: encodedPacket)
|
await self.udpHole?.sendPacket(context: self, session: session, data: encodedPacket)
|
||||||
|
|
||||||
await self.flowTracer.inc(num: data.count, type: .p2p)
|
await self.flowTracer.inc(num: data.count, type: .p2p)
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// 通过super_node进行转发
|
// 通过super_node进行转发
|
||||||
await self.udpHoleActor?.forwardPacket(context: self, dst_mac: dstMac, data: encodedPacket)
|
await self.udpHole?.forwardPacket(context: self, dst_mac: dstMac, data: encodedPacket)
|
||||||
// 流量统计
|
// 流量统计
|
||||||
await self.flowTracer.inc(num: data.count, type: .forward)
|
await self.flowTracer.inc(num: data.count, type: .forward)
|
||||||
|
|
||||||
// 尝试打洞
|
// 尝试打洞
|
||||||
await self.holerManager.addHoler(dstMac: dstMac) {
|
let registerRequest = RegisterRequest(srcMac: self.devAddr.mac, dstMac: dstMac, networkId: self.devAddr.networkID)
|
||||||
self.holerTask(dstMac: dstMac)
|
self.submitRegisterRequest(request: registerRequest)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func holerTask(dstMac: Data) -> Task<(), Never> {
|
private func submitRegisterRequest(request: RegisterRequest) {
|
||||||
return Task {
|
self.locker.lock()
|
||||||
guard let message = try? await self.superClientActor?.queryInfo(context: self, dst_mac: dstMac) else {
|
defer {
|
||||||
|
self.locker.unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
let dstMac = request.dstMac
|
||||||
|
if let publisher = self.holerPublishers[dstMac] {
|
||||||
|
publisher.send(request)
|
||||||
|
} else {
|
||||||
|
let publisher = PassthroughSubject<RegisterRequest, Never>()
|
||||||
|
publisher.debounce(for: .seconds(5), scheduler: DispatchQueue.global())
|
||||||
|
.sink { request in
|
||||||
|
Task {
|
||||||
|
await self.tryHole(request: request)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.store(in: &self.bag)
|
||||||
|
|
||||||
|
self.holerPublishers[dstMac] = publisher
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func tryHole(request: RegisterRequest) async {
|
||||||
|
guard let message = try? await self.superClient?.queryInfo(dst_mac: request.dstMac).get() else {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -508,7 +568,7 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
if let remoteAddress = peerInfo.v4Info.socketAddress() {
|
if let remoteAddress = peerInfo.v4Info.socketAddress() {
|
||||||
SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", level: .warning)
|
SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", level: .warning)
|
||||||
// 发送register包
|
// 发送register包
|
||||||
await self.udpHoleActor?.sendRegister(context: self, remoteAddress: remoteAddress, dst_mac: dstMac)
|
await self.udpHole?.sendRegister(remoteAddress: remoteAddress, networkId: request.networkId, srcMac: request.srcMac, dst_mac: request.dstMac)
|
||||||
} else {
|
} else {
|
||||||
SDLLogger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning)
|
SDLLogger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning)
|
||||||
}
|
}
|
||||||
@ -516,12 +576,11 @@ public class SDLContext: @unchecked Sendable {
|
|||||||
SDLLogger.log("[SDLContext] hole query_info is packet: \(message)", level: .warning)
|
SDLLogger.log("[SDLContext] hole query_info is packet: \(message)", level: .warning)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
self.stunCancel?.cancel()
|
self.stunCancel?.cancel()
|
||||||
self.udpHoleActor = nil
|
self.udpHole = nil
|
||||||
self.superClientActor = nil
|
self.superClient = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -54,7 +54,7 @@ enum SDLUpgradeType: UInt32 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Id生成器
|
// Id生成器
|
||||||
struct SDLIdGenerator {
|
struct SDLIdGenerator: Sendable {
|
||||||
// 消息体id
|
// 消息体id
|
||||||
private var packetId: UInt32
|
private var packetId: UInt32
|
||||||
|
|
||||||
|
|||||||
@ -34,7 +34,7 @@ struct SDLNatProber {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 网络没有在nat下
|
// 网络没有在nat下
|
||||||
if natAddress1 == udpHole.localAddress {
|
if await natAddress1 == udpHole.localAddress {
|
||||||
return .noNat
|
return .noNat
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,7 +68,7 @@ struct SDLNatProber {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static func getNatAddress(_ udpHole: SDLUDPHole, remoteAddress: SocketAddress, attr: SDLProbeAttr) async -> SocketAddress? {
|
private static func getNatAddress(_ udpHole: SDLUDPHole, remoteAddress: SocketAddress, attr: SDLProbeAttr) async -> SocketAddress? {
|
||||||
let stunProbeReply = await udpHole.stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: 5)
|
let stunProbeReply = try? await udpHole.stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: 5)
|
||||||
|
|
||||||
return stunProbeReply?.socketAddress()
|
return stunProbeReply?.socketAddress()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,72 +15,59 @@ import Foundation
|
|||||||
//
|
//
|
||||||
|
|
||||||
import Foundation
|
import Foundation
|
||||||
@preconcurrency import NIOCore
|
import NIOCore
|
||||||
import NIOPosix
|
import NIOPosix
|
||||||
|
|
||||||
// 处理和sn-server服务器之间的通讯
|
// 处理和sn-server服务器之间的通讯
|
||||||
class SDLNoticeClient: ChannelInboundHandler, @unchecked Sendable {
|
actor SDLNoticeClient {
|
||||||
public typealias InboundIn = AddressedEnvelope<ByteBuffer>
|
|
||||||
public typealias OutboundOut = AddressedEnvelope<ByteBuffer>
|
|
||||||
|
|
||||||
var channel: Channel?
|
|
||||||
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
|
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
|
||||||
private let remoteAddress: SocketAddress
|
private let remoteAddress: SocketAddress
|
||||||
|
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded)
|
||||||
init() {
|
|
||||||
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: 50195)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 启动函数
|
// 启动函数
|
||||||
func start() {
|
init() async throws {
|
||||||
|
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: 50195)
|
||||||
|
|
||||||
let bootstrap = DatagramBootstrap(group: self.group)
|
let bootstrap = DatagramBootstrap(group: self.group)
|
||||||
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||||
.channelInitializer { channel in
|
|
||||||
// 接收缓冲区
|
self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0)
|
||||||
channel.pipeline.addHandler(self)
|
.flatMapThrowing {channel in
|
||||||
|
return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
|
||||||
|
inboundType: AddressedEnvelope<ByteBuffer>.self,
|
||||||
|
outboundType: AddressedEnvelope<ByteBuffer>.self
|
||||||
|
))
|
||||||
|
}
|
||||||
|
.get()
|
||||||
|
|
||||||
|
SDLLogger.log("[SDLNoticeClient] started and listening on: \(self.asyncChannel.channel.localAddress!)", level: .debug)
|
||||||
}
|
}
|
||||||
|
|
||||||
self.channel = try! bootstrap.bind(host: "0.0.0.0", port: 0).wait()
|
func start() async throws {
|
||||||
SDLLogger.log("[SDLNoticeClient] started and listening on: \(self.channel?.localAddress!)", level: .debug)
|
try await self.asyncChannel.executeThenClose { inbound, outbound in
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
|
group.addTask {
|
||||||
|
defer {
|
||||||
|
self.writeContinuation.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
// -- MARK: ChannelInboundHandler Methods
|
for try await message in self.writeStream {
|
||||||
|
let buf = self.asyncChannel.channel.allocator.buffer(bytes: message)
|
||||||
public func channelActive(context: ChannelHandlerContext) {
|
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: self.remoteAddress, data: buf)
|
||||||
|
|
||||||
|
try await outbound.write(envelope)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 接收到的消息, 消息需要根据类型分流
|
try await group.waitForAll()
|
||||||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
||||||
context.fireChannelRead(data)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public func errorCaught(context: ChannelHandlerContext, error: Error) {
|
|
||||||
// As we are not really interested getting notified on success or failure we just pass nil as promise to
|
|
||||||
// reduce allocations.
|
|
||||||
context.close(promise: nil)
|
|
||||||
self.channel = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public func channelInactive(context: ChannelHandlerContext) {
|
|
||||||
self.channel = nil
|
|
||||||
context.close(promise: nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理写入逻辑
|
// 处理写入逻辑
|
||||||
func send(data: Data) {
|
func send(data: Data) {
|
||||||
guard let channel = self.channel else {
|
self.writeContinuation.yield(data)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
let remoteAddress = self.remoteAddress
|
|
||||||
let allocator = channel.allocator
|
|
||||||
|
|
||||||
channel.eventLoop.execute { [allocator] in
|
|
||||||
let buffer = allocator.buffer(bytes: data)
|
|
||||||
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)
|
|
||||||
channel.writeAndFlush(self.wrapOutboundOut(envelope), promise: nil)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
|
|||||||
@ -10,7 +10,7 @@ import NIOCore
|
|||||||
import NIOPosix
|
import NIOPosix
|
||||||
|
|
||||||
// --MARK: 和SuperNode的客户端
|
// --MARK: 和SuperNode的客户端
|
||||||
actor SDLSuperClientActor {
|
actor SDLSuperClient {
|
||||||
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
private let asyncChannel: NIOAsyncChannel<ByteBuffer,ByteBuffer>
|
private let asyncChannel: NIOAsyncChannel<ByteBuffer,ByteBuffer>
|
||||||
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: TcpMessage.self, bufferingPolicy: .unbounded)
|
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: TcpMessage.self, bufferingPolicy: .unbounded)
|
||||||
@ -31,7 +31,6 @@ actor SDLSuperClientActor {
|
|||||||
// 定义事件类型
|
// 定义事件类型
|
||||||
enum SuperEvent {
|
enum SuperEvent {
|
||||||
case ready
|
case ready
|
||||||
case closed
|
|
||||||
case event(SDLEvent)
|
case event(SDLEvent)
|
||||||
case command(UInt32, SDLCommand)
|
case command(UInt32, SDLCommand)
|
||||||
}
|
}
|
||||||
@ -58,12 +57,15 @@ actor SDLSuperClientActor {
|
|||||||
|
|
||||||
func start() async throws {
|
func start() async throws {
|
||||||
try await self.asyncChannel.executeThenClose { inbound, outbound in
|
try await self.asyncChannel.executeThenClose { inbound, outbound in
|
||||||
|
self.inboundContinuation.yield(.ready)
|
||||||
|
|
||||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
group.addTask {
|
group.addTask {
|
||||||
defer {
|
try await self.asyncChannel.channel.closeFuture.get()
|
||||||
self.inboundContinuation.finish()
|
NSLog("[SDLSuperClient] socket closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
for try await var packet in inbound {
|
for try await var packet in inbound {
|
||||||
if let message = SDLSuperClientDecoder.decode(buffer: &packet) {
|
if let message = SDLSuperClientDecoder.decode(buffer: &packet) {
|
||||||
SDLLogger.log("[SDLSuperTransport] read message: \(message)", level: .warning)
|
SDLLogger.log("[SDLSuperTransport] read message: \(message)", level: .warning)
|
||||||
@ -80,10 +82,6 @@ actor SDLSuperClientActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
group.addTask {
|
group.addTask {
|
||||||
defer {
|
|
||||||
self.writeContinuation.finish()
|
|
||||||
}
|
|
||||||
|
|
||||||
for try await message in self.writeStream {
|
for try await message in self.writeStream {
|
||||||
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5)
|
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5)
|
||||||
buffer.writeInteger(message.packetId, as: UInt32.self)
|
buffer.writeInteger(message.packetId, as: UInt32.self)
|
||||||
@ -116,7 +114,7 @@ actor SDLSuperClientActor {
|
|||||||
self.send(type: .commandAck, packetId: packetId, data: data)
|
self.send(type: .commandAck, packetId: packetId, data: data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerSuper(context ctx: SDLContext) async throws -> SDLSuperInboundMessage {
|
func registerSuper(context ctx: SDLContext) throws -> EventLoopFuture<SDLSuperInboundMessage> {
|
||||||
var registerSuper = SDLRegisterSuper()
|
var registerSuper = SDLRegisterSuper()
|
||||||
registerSuper.version = UInt32(ctx.config.version)
|
registerSuper.version = UInt32(ctx.config.version)
|
||||||
registerSuper.clientID = ctx.config.clientId
|
registerSuper.clientID = ctx.config.clientId
|
||||||
@ -126,15 +124,15 @@ actor SDLSuperClientActor {
|
|||||||
|
|
||||||
let data = try! registerSuper.serializedData()
|
let data = try! registerSuper.serializedData()
|
||||||
|
|
||||||
return try await self.write(type: .registerSuper, data: data).get()
|
return self.write(type: .registerSuper, data: data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 查询目标服务器的相关信息
|
// 查询目标服务器的相关信息
|
||||||
func queryInfo(context ctx: SDLContext, dst_mac: Data) async throws -> SDLSuperInboundMessage {
|
func queryInfo(dst_mac: Data) async throws -> EventLoopFuture<SDLSuperInboundMessage> {
|
||||||
var queryInfo = SDLQueryInfo()
|
var queryInfo = SDLQueryInfo()
|
||||||
queryInfo.dstMac = dst_mac
|
queryInfo.dstMac = dst_mac
|
||||||
|
|
||||||
return try await self.write(type: .queryInfo, data: try! queryInfo.serializedData()).get()
|
return self.write(type: .queryInfo, data: try! queryInfo.serializedData())
|
||||||
}
|
}
|
||||||
|
|
||||||
func unregister(context ctx: SDLContext) throws {
|
func unregister(context ctx: SDLContext) throws {
|
||||||
@ -154,17 +152,18 @@ actor SDLSuperClientActor {
|
|||||||
self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData())
|
self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData())
|
||||||
}
|
}
|
||||||
|
|
||||||
func write(type: SDLPacketType, data: Data) -> EventLoopFuture<SDLSuperInboundMessage> {
|
private func write(type: SDLPacketType, data: Data) -> EventLoopFuture<SDLSuperInboundMessage> {
|
||||||
SDLLogger.log("[SDLSuperTransport] will write data: \(data)", level: .debug)
|
SDLLogger.log("[SDLSuperTransport] will write data: \(data)", level: .debug)
|
||||||
let packetId = idGenerator.nextId()
|
let packetId = idGenerator.nextId()
|
||||||
let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLSuperInboundMessage.self)
|
let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLSuperInboundMessage.self)
|
||||||
self.callbackPromises[packetId] = promise
|
self.callbackPromises[packetId] = promise
|
||||||
|
|
||||||
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
|
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
|
||||||
|
|
||||||
return promise.futureResult
|
return promise.futureResult
|
||||||
}
|
}
|
||||||
|
|
||||||
func send(type: SDLPacketType, packetId: UInt32, data: Data) {
|
private func send(type: SDLPacketType, packetId: UInt32, data: Data) {
|
||||||
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
|
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -180,6 +179,7 @@ actor SDLSuperClientActor {
|
|||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
try! group.syncShutdownGracefully()
|
try! group.syncShutdownGracefully()
|
||||||
|
self.inboundContinuation.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -9,28 +9,27 @@ import Foundation
|
|||||||
import NIOCore
|
import NIOCore
|
||||||
import NIOPosix
|
import NIOPosix
|
||||||
|
|
||||||
struct UDPMessage {
|
|
||||||
let remoteAddress: SocketAddress
|
|
||||||
let type: SDLPacketType
|
|
||||||
let data: Data
|
|
||||||
}
|
|
||||||
|
|
||||||
// 处理和sn-server服务器之间的通讯
|
// 处理和sn-server服务器之间的通讯
|
||||||
actor SDLUDPHoleActor {
|
actor SDLUDPHole {
|
||||||
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
|
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
|
||||||
private let (writeStream, continuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded)
|
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded)
|
||||||
|
|
||||||
private var cookieGenerator = SDLIdGenerator(seed: 1)
|
private var cookieGenerator = SDLIdGenerator(seed: 1)
|
||||||
private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:]
|
private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:]
|
||||||
public var localAddress: SocketAddress?
|
public var localAddress: SocketAddress?
|
||||||
|
|
||||||
public let (eventFlow, inboundContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)
|
public let (eventFlow, eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)
|
||||||
|
|
||||||
|
struct UDPMessage {
|
||||||
|
let remoteAddress: SocketAddress
|
||||||
|
let type: SDLPacketType
|
||||||
|
let data: Data
|
||||||
|
}
|
||||||
|
|
||||||
// 定义事件类型
|
// 定义事件类型
|
||||||
enum UDPEvent {
|
enum UDPEvent {
|
||||||
case ready
|
case ready
|
||||||
case closed
|
|
||||||
case message(SocketAddress, SDLHoleInboundMessage)
|
case message(SocketAddress, SDLHoleInboundMessage)
|
||||||
case data(SDLData)
|
case data(SDLData)
|
||||||
}
|
}
|
||||||
@ -55,12 +54,15 @@ actor SDLUDPHoleActor {
|
|||||||
|
|
||||||
func start() async throws {
|
func start() async throws {
|
||||||
try await self.asyncChannel.executeThenClose { inbound, outbound in
|
try await self.asyncChannel.executeThenClose { inbound, outbound in
|
||||||
|
self.eventContinuation.yield(.ready)
|
||||||
|
|
||||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
group.addTask {
|
group.addTask {
|
||||||
defer {
|
try await self.asyncChannel.channel.closeFuture.get()
|
||||||
self.inboundContinuation.finish()
|
NSLog("[UDPHole] channel closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
for try await envelope in inbound {
|
for try await envelope in inbound {
|
||||||
var buffer = envelope.data
|
var buffer = envelope.data
|
||||||
let remoteAddress = envelope.remoteAddress
|
let remoteAddress = envelope.remoteAddress
|
||||||
@ -69,12 +71,12 @@ actor SDLUDPHoleActor {
|
|||||||
switch message {
|
switch message {
|
||||||
case .data(let data):
|
case .data(let data):
|
||||||
SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
|
SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
|
||||||
self.inboundContinuation.yield(.data(data))
|
self.eventContinuation.yield(.data(data))
|
||||||
case .stunProbeReply(let probeReply):
|
case .stunProbeReply(let probeReply):
|
||||||
// 执行并移除回调
|
// 执行并移除回调
|
||||||
await self.trigger(probeReply: probeReply)
|
await self.trigger(probeReply: probeReply)
|
||||||
default:
|
default:
|
||||||
self.inboundContinuation.yield(.message(remoteAddress, message))
|
self.eventContinuation.yield(.message(remoteAddress, message))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning)
|
SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning)
|
||||||
@ -86,10 +88,6 @@ actor SDLUDPHoleActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
group.addTask {
|
group.addTask {
|
||||||
defer {
|
|
||||||
self.continuation.finish()
|
|
||||||
}
|
|
||||||
|
|
||||||
for try await message in self.writeStream {
|
for try await message in self.writeStream {
|
||||||
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
|
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
|
||||||
buffer.writeBytes([message.type.rawValue])
|
buffer.writeBytes([message.type.rawValue])
|
||||||
@ -100,8 +98,6 @@ actor SDLUDPHoleActor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//eventFlow.send(.ready)
|
|
||||||
|
|
||||||
try await group.waitForAll()
|
try await group.waitForAll()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -194,14 +190,14 @@ actor SDLUDPHoleActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 发送register包
|
// 发送register包
|
||||||
func sendRegister(context ctx: SDLContext, remoteAddress: SocketAddress, dst_mac: Data) {
|
func sendRegister(remoteAddress: SocketAddress, networkId: UInt32, srcMac: Data, dst_mac: Data) {
|
||||||
var register = SDLRegister()
|
var register = SDLRegister()
|
||||||
register.networkID = ctx.devAddr.networkID
|
register.networkID = networkId
|
||||||
register.srcMac = ctx.devAddr.mac
|
register.srcMac = srcMac
|
||||||
register.dstMac = dst_mac
|
register.dstMac = dst_mac
|
||||||
|
|
||||||
if let packet = try? register.serializedData() {
|
if let packet = try? register.serializedData() {
|
||||||
SDLLogger.log("[SDLUDPHole] SendRegister: \(remoteAddress), src_mac: \(LayerPacket.MacAddress.description(data: ctx.devAddr.mac)), dst_mac: \(LayerPacket.MacAddress.description(data: dst_mac))", level: .debug)
|
SDLLogger.log("[SDLUDPHole] SendRegister: \(remoteAddress), src_mac: \(LayerPacket.MacAddress.description(data: srcMac)), dst_mac: \(LayerPacket.MacAddress.description(data: dst_mac))", level: .debug)
|
||||||
self.send(remoteAddress: remoteAddress, type: .register, data: packet)
|
self.send(remoteAddress: remoteAddress, type: .register, data: packet)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -222,7 +218,7 @@ actor SDLUDPHoleActor {
|
|||||||
// 处理写入逻辑
|
// 处理写入逻辑
|
||||||
private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) {
|
private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) {
|
||||||
let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data)
|
let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data)
|
||||||
self.continuation.yield(message)
|
self.writeContinuation.yield(message)
|
||||||
}
|
}
|
||||||
|
|
||||||
//--MARK: 编解码器
|
//--MARK: 编解码器
|
||||||
@ -257,6 +253,7 @@ actor SDLUDPHoleActor {
|
|||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
try? self.group.syncShutdownGracefully()
|
try? self.group.syncShutdownGracefully()
|
||||||
|
self.writeContinuation.finish()
|
||||||
|
self.eventContinuation.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user