Compare commits

..

No commits in common. "0fe9b43c087b5dacb6cb24fea347dee86b86ac3f" and "b55d9913cc4ce93a61aa6869f17173e1a7214108" have entirely different histories.

7 changed files with 260 additions and 272 deletions

View File

@ -0,0 +1,31 @@
//
// HolerManager.swift
// sdlan
//
// Created by on 2025/7/14.
//
import Foundation
actor HolerManager {
private var holers: [Data:Task<(), Never>] = [:]
func addHoler(dstMac: Data, creator: @escaping () -> Task<(), Never>) {
if let task = self.holers[dstMac] {
if task.isCancelled {
self.holers[dstMac] = creator()
}
} else {
self.holers[dstMac] = creator()
}
}
func cleanup() {
for holer in holers.values {
holer.cancel()
}
self.holers.removeAll()
}
}

View File

@ -47,8 +47,8 @@ public class SDLContext: @unchecked Sendable {
let rsaCipher: RSACipher let rsaCipher: RSACipher
// //
var udpHole: SDLUDPHole? var udpHoleActor: SDLUDPHoleActor?
var superClient: SDLSuperClient? var superClientActor: SDLSuperClientActor?
// //
private var readTask: Task<(), Never>? private var readTask: Task<(), Never>?
@ -56,6 +56,7 @@ public class SDLContext: @unchecked Sendable {
let provider: NEPacketTunnelProvider let provider: NEPacketTunnelProvider
private var sessionManager: SessionManager private var sessionManager: SessionManager
private var holerManager: HolerManager
private var arpServer: ArpServer private var arpServer: ArpServer
// stunRequestcookie // stunRequestcookie
@ -69,23 +70,12 @@ public class SDLContext: @unchecked Sendable {
private var monitorCancel: AnyCancellable? private var monitorCancel: AnyCancellable?
// socket // socket
private var noticeClient: SDLNoticeClient? private var noticeClient: SDLNoticeClient
// //
private var flowTracer = SDLFlowTracerActor() private var flowTracer = SDLFlowTracerActor()
private var flowTracerCancel: AnyCancellable? private var flowTracerCancel: AnyCancellable?
// holer
private var holerPublishers: [Data:PassthroughSubject<RegisterRequest, Never>] = [:]
private var bag = Set<AnyCancellable>()
private var locker = NSLock()
struct RegisterRequest {
let srcMac: Data
let dstMac: Data
let networkId: UInt32
}
public init(provider: NEPacketTunnelProvider, config: SDLConfiguration, rsaCipher: RSACipher, aesCipher: AESCipher) { public init(provider: NEPacketTunnelProvider, config: SDLConfiguration, rsaCipher: RSACipher, aesCipher: AESCipher) {
self.config = config self.config = config
self.rsaCipher = rsaCipher self.rsaCipher = rsaCipher
@ -98,137 +88,73 @@ public class SDLContext: @unchecked Sendable {
self.provider = provider self.provider = provider
self.sessionManager = SessionManager() self.sessionManager = SessionManager()
self.holerManager = HolerManager()
self.arpServer = ArpServer(known_macs: [:]) self.arpServer = ArpServer(known_macs: [:])
self.noticeClient = SDLNoticeClient()
} }
public func start() async throws { public func start() async throws {
self.noticeClient = try await SDLNoticeClient() self.udpHoleActor = try await SDLUDPHoleActor()
self.superClientActor = try await SDLSuperClientActor(host: self.config.superHost, port: self.config.superPort)
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
while !Task.isCancelled { try await self.udpHoleActor?.start()
do {
try await self.startUDPHole()
} catch let err {
NSLog("udp Hole get err: \(err)")
}
}
} }
group.addTask { group.addTask {
while !Task.isCancelled { try await self.superClientActor?.start()
do {
try await self.startSuperClient()
} catch let err {
NSLog("SuperClient get error: \(err)")
await self.arpServer.clear()
try? await Task.sleep(for: .seconds(2))
}
}
} }
group.addTask { group.addTask {
try await self.startMonitor() if let eventFlow = self.superClientActor?.eventFlow {
}
group.addTask {
while !Task.isCancelled {
do {
try await self.noticeClient?.start()
} catch let err {
NSLog("noticeClient get err: \(err)")
}
}
}
try await group.waitForAll()
}
}
public func stop() async {
self.superClient = nil
self.udpHole = nil
self.readTask?.cancel()
}
private func startUDPHole() async throws {
self.udpHole = try await SDLUDPHole()
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await self.udpHole?.start()
}
group.addTask {
while !Task.isCancelled {
try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
self.lastCookie = await self.udpHole?.stunRequest(context: self)
}
}
group.addTask {
if let eventFlow = self.udpHole?.eventFlow {
for try await event in eventFlow {
try await self.handleUDPEvent(event: event)
}
}
}
try await group.waitForAll()
}
}
private func startSuperClient() async throws {
self.superClient = try await SDLSuperClient(host: self.config.superHost, port: self.config.superPort)
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await self.superClient?.start()
}
group.addTask {
if let eventFlow = self.superClient?.eventFlow {
for try await event in eventFlow { for try await event in eventFlow {
try await self.handleSuperEvent(event: event) try await self.handleSuperEvent(event: event)
} }
} }
} }
try await group.waitForAll()
group.addTask {
if let eventFlow = self.udpHoleActor?.eventFlow {
for try await event in eventFlow {
try await self.handleUDPEvent(event: event)
} }
} }
private func startMonitor() async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await self.noticeClient?.start()
}
group.addTask {
//
self.monitorCancel = self.monitor.eventFlow.sink { event in
switch event {
case .changed:
// nat
Task {
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
NSLog("didNetworkPathChanged, nat type is: \(self.natType)")
}
case .unreachable:
NSLog("didNetworkPathUnreachable")
}
}
self.monitor.start()
} }
try await group.waitForAll() try await group.waitForAll()
} }
// self.noticeClient.start()
// //
// self.monitorCancel = self.monitor.eventFlow.sink { event in
// switch event {
// case .changed:
// // nat
// Task {
// self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
// NSLog("didNetworkPathChanged, nat type is: \(self.natType)")
// }
// case .unreachable:
// NSLog("didNetworkPathUnreachable")
// }
// }
// self.monitor.start()
} }
private func handleSuperEvent(event: SDLSuperClient.SuperEvent) async throws { public func stop() async {
self.superClientActor = nil
self.udpHoleActor = nil
self.readTask?.cancel()
}
private func handleSuperEvent(event: SDLSuperClientActor.SuperEvent) async throws {
switch event { switch event {
case .ready: case .ready:
NSLog("[SDLContext] get registerSuper, mac address: \(Self.formatMacAddress(mac: self.devAddr.mac))") NSLog("[SDLContext] get registerSuper, mac address: \(Self.formatMacAddress(mac: self.devAddr.mac))")
guard let message = try await self.superClient?.registerSuper(context: self).get() else { guard let message = try await self.superClientActor?.registerSuper(context: self) else {
return return
} }
@ -243,7 +169,7 @@ public class SDLContext: @unchecked Sendable {
if upgradeType == .force { if upgradeType == .force {
let forceUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress) let forceUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
await self.noticeClient?.send(data: forceUpgrade.binaryData) self.noticeClient.send(data: forceUpgrade.binaryData)
exit(-1) exit(-1)
} }
@ -253,7 +179,7 @@ public class SDLContext: @unchecked Sendable {
if upgradeType == .normal { if upgradeType == .normal {
let normalUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress) let normalUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
await self.noticeClient?.send(data: normalUpgrade.binaryData) self.noticeClient.send(data: normalUpgrade.binaryData)
} }
case .registerSuperNak(let nakPacket): case .registerSuperNak(let nakPacket):
@ -265,17 +191,25 @@ public class SDLContext: @unchecked Sendable {
switch errorCode { switch errorCode {
case .invalidToken, .nodeDisabled: case .invalidToken, .nodeDisabled:
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage) let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
await self.noticeClient?.send(data: alertNotice.binaryData) self.noticeClient.send(data: alertNotice.binaryData)
exit(-1) exit(-1)
case .noIpAddress, .networkFault, .internalFault: case .noIpAddress, .networkFault, .internalFault:
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage) let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
await self.noticeClient?.send(data: alertNotice.binaryData) self.noticeClient.send(data: alertNotice.binaryData)
} }
NSLog("[SDLContext] Get a SuperNak message exit") NSLog("[SDLContext] Get a SuperNak message exit")
default: default:
() ()
} }
case .closed:
NSLog("[SDLContext] super client closed")
await self.arpServer.clear()
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
// Task {@MainActor in
// try await self.startSuperClient()
// }
}
case .event(let evt): case .event(let evt):
switch evt { switch evt {
case .natChanged(let natChangedEvent): case .natChanged(let natChangedEvent):
@ -287,12 +221,12 @@ public class SDLContext: @unchecked Sendable {
let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp) let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp)
if let remoteAddress = try? SocketAddress.makeAddressResolvingHost(address, port: Int(sendRegisterEvent.natPort)) { if let remoteAddress = try? SocketAddress.makeAddressResolvingHost(address, port: Int(sendRegisterEvent.natPort)) {
// register // register
await self.udpHole?.sendRegister(remoteAddress: remoteAddress, networkId: self.devAddr.networkID, srcMac: self.devAddr.mac, dst_mac: sendRegisterEvent.dstMac) await self.udpHoleActor?.sendRegister(context: self, remoteAddress: remoteAddress, dst_mac: sendRegisterEvent.dstMac)
} }
case .networkShutdown(let shutdownEvent): case .networkShutdown(let shutdownEvent):
let alertNotice = NoticeMessage.AlertMessage(alert: shutdownEvent.message) let alertNotice = NoticeMessage.AlertMessage(alert: shutdownEvent.message)
await self.noticeClient?.send(data: alertNotice.binaryData) self.noticeClient.send(data: alertNotice.binaryData)
exit(-1) exit(-1)
} }
case .command(let packetId, let command): case .command(let packetId, let command):
@ -310,19 +244,44 @@ public class SDLContext: @unchecked Sendable {
var commandAck = SDLCommandAck() var commandAck = SDLCommandAck()
commandAck.status = true commandAck.status = true
await self.superClient?.commandAck(packetId: packetId, ack: commandAck) await self.superClientActor?.commandAck(packetId: packetId, ack: commandAck)
} }
} }
} }
private func handleUDPEvent(event: SDLUDPHole.UDPEvent) async throws { private func startUDPHole() async throws {
// self.udpHole = SDLUDPHole()
//
// self.udpCancel?.cancel()
// self.udpCancel = self.udpHole?.eventFlow.sink { event in
// Task.detached {
// await self.handleUDPEvent(event: event)
// }
// }
//
// try await self.udpHole?.start()
}
private func handleUDPEvent(event: SDLUDPHoleActor.UDPEvent) async throws {
switch event { switch event {
case .ready: case .ready:
// //
//self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config) self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
SDLLogger.log("[SDLContext] nat type is: \(self.natType)", level: .debug) SDLLogger.log("[SDLContext] nat type is: \(self.natType)", level: .debug)
let timer = Timer.publish(every: 5.0, on: .main, in: .common).autoconnect()
// self.stunCancel = Just(Date()).merge(with: timer).sink { _ in
// self.lastCookie = await self.udpHoleActor?.stunRequest(context: self)
// }
case .closed:
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
Task {
try await self.startUDPHole()
}
}
case .message(let remoteAddress, let message): case .message(let remoteAddress, let message):
switch message { switch message {
case .register(let register): case .register(let register):
@ -330,7 +289,7 @@ public class SDLContext: @unchecked Sendable {
// tun, // tun,
if register.dstMac == self.devAddr.mac && register.networkID == self.devAddr.networkID { if register.dstMac == self.devAddr.mac && register.networkID == self.devAddr.networkID {
// ack // ack
await self.udpHole?.sendRegisterAck(context: self, remoteAddress: remoteAddress, dst_mac: register.srcMac) self.udpHoleActor?.sendRegisterAck(context: self, remoteAddress: remoteAddress, dst_mac: register.srcMac)
// , super-nodenatudpnat // , super-nodenatudpnat
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress) let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
await self.sessionManager.addSession(session: session) await self.sessionManager.addSession(session: session)
@ -412,18 +371,18 @@ public class SDLContext: @unchecked Sendable {
} }
// //
// public func flowReportTask() { public func flowReportTask() {
// Task { Task {
// // //
// self.flowTracerCancel = Timer.publish(every: 60.0, on: .main, in: .common).autoconnect() self.flowTracerCancel = Timer.publish(every: 60.0, on: .main, in: .common).autoconnect()
// .sink { _ in .sink { _ in
// Task { Task {
// let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset() let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset()
// await self.superClient?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum) await self.superClientActor?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum)
// } }
// } }
// } }
// } }
// //
private func didNetworkConfigChanged(devAddr: SDLDevAddr, dnsServers: [String]? = nil) async { private func didNetworkConfigChanged(devAddr: SDLDevAddr, dnsServers: [String]? = nil) async {
@ -453,6 +412,8 @@ public class SDLContext: @unchecked Sendable {
// //
do { do {
try await self.provider.setTunnelNetworkSettings(networkSettings) try await self.provider.setTunnelNetworkSettings(networkSettings)
await self.holerManager.cleanup()
self.startReader() self.startReader()
NSLog("[SDLContext] setTunnelNetworkSettings success, start read packet") NSLog("[SDLContext] setTunnelNetworkSettings success, start read packet")
@ -493,7 +454,7 @@ public class SDLContext: @unchecked Sendable {
else { else {
// arp // arp
let broadcastMac = Data([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]) let broadcastMac = Data([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])
let arpReqeust = ARPPacket.arpRequest(senderIP: self.devAddr.netAddr, senderMAC: self.devAddr.mac, targetIP: dstIp) let arpReqeust: ARPPacket = ARPPacket.arpRequest(senderIP: self.devAddr.netAddr, senderMAC: self.devAddr.mac, targetIP: dstIp)
await self.routeLayerPacket(dstMac: broadcastMac, type: .arp, data: arpReqeust.marshal()) await self.routeLayerPacket(dstMac: broadcastMac, type: .arp, data: arpReqeust.marshal())
NSLog("[SDLContext] dstIp: \(dstIp) arp query not found") NSLog("[SDLContext] dstIp: \(dstIp) arp query not found")
@ -517,47 +478,26 @@ public class SDLContext: @unchecked Sendable {
// session // session
if let session = await self.sessionManager.getSession(toAddress: dstMac) { if let session = await self.sessionManager.getSession(toAddress: dstMac) {
NSLog("[SDLContext] send packet by session: \(session)") NSLog("[SDLContext] send packet by session: \(session)")
await self.udpHole?.sendPacket(context: self, session: session, data: encodedPacket) await self.udpHoleActor?.sendPacket(context: self, session: session, data: encodedPacket)
await self.flowTracer.inc(num: data.count, type: .p2p) await self.flowTracer.inc(num: data.count, type: .p2p)
} }
else { else {
// super_node // super_node
await self.udpHole?.forwardPacket(context: self, dst_mac: dstMac, data: encodedPacket) await self.udpHoleActor?.forwardPacket(context: self, dst_mac: dstMac, data: encodedPacket)
// //
await self.flowTracer.inc(num: data.count, type: .forward) await self.flowTracer.inc(num: data.count, type: .forward)
// //
let registerRequest = RegisterRequest(srcMac: self.devAddr.mac, dstMac: dstMac, networkId: self.devAddr.networkID) await self.holerManager.addHoler(dstMac: dstMac) {
self.submitRegisterRequest(request: registerRequest) self.holerTask(dstMac: dstMac)
}
} }
} }
private func submitRegisterRequest(request: RegisterRequest) { func holerTask(dstMac: Data) -> Task<(), Never> {
self.locker.lock() return Task {
defer { guard let message = try? await self.superClientActor?.queryInfo(context: self, dst_mac: dstMac) else {
self.locker.unlock()
}
let dstMac = request.dstMac
if let publisher = self.holerPublishers[dstMac] {
publisher.send(request)
} else {
let publisher = PassthroughSubject<RegisterRequest, Never>()
publisher.debounce(for: .seconds(5), scheduler: DispatchQueue.global())
.sink { request in
Task {
await self.tryHole(request: request)
}
}
.store(in: &self.bag)
self.holerPublishers[dstMac] = publisher
}
}
private func tryHole(request: RegisterRequest) async {
guard let message = try? await self.superClient?.queryInfo(dst_mac: request.dstMac).get() else {
return return
} }
@ -568,7 +508,7 @@ public class SDLContext: @unchecked Sendable {
if let remoteAddress = peerInfo.v4Info.socketAddress() { if let remoteAddress = peerInfo.v4Info.socketAddress() {
SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", level: .warning) SDLLogger.log("[SDLContext] hole sock address: \(remoteAddress)", level: .warning)
// register // register
await self.udpHole?.sendRegister(remoteAddress: remoteAddress, networkId: request.networkId, srcMac: request.srcMac, dst_mac: request.dstMac) await self.udpHoleActor?.sendRegister(context: self, remoteAddress: remoteAddress, dst_mac: dstMac)
} else { } else {
SDLLogger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning) SDLLogger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning)
} }
@ -576,11 +516,12 @@ public class SDLContext: @unchecked Sendable {
SDLLogger.log("[SDLContext] hole query_info is packet: \(message)", level: .warning) SDLLogger.log("[SDLContext] hole query_info is packet: \(message)", level: .warning)
} }
} }
}
deinit { deinit {
self.stunCancel?.cancel() self.stunCancel?.cancel()
self.udpHole = nil self.udpHoleActor = nil
self.superClient = nil self.superClientActor = nil
} }
} }

View File

@ -54,7 +54,7 @@ enum SDLUpgradeType: UInt32 {
} }
// Id // Id
struct SDLIdGenerator: Sendable { struct SDLIdGenerator {
// id // id
private var packetId: UInt32 private var packetId: UInt32

View File

@ -34,7 +34,7 @@ struct SDLNatProber {
} }
// nat // nat
if await natAddress1 == udpHole.localAddress { if natAddress1 == udpHole.localAddress {
return .noNat return .noNat
} }
@ -68,7 +68,7 @@ struct SDLNatProber {
} }
private static func getNatAddress(_ udpHole: SDLUDPHole, remoteAddress: SocketAddress, attr: SDLProbeAttr) async -> SocketAddress? { private static func getNatAddress(_ udpHole: SDLUDPHole, remoteAddress: SocketAddress, attr: SDLProbeAttr) async -> SocketAddress? {
let stunProbeReply = try? await udpHole.stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: 5) let stunProbeReply = await udpHole.stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: 5)
return stunProbeReply?.socketAddress() return stunProbeReply?.socketAddress()
} }

View File

@ -15,59 +15,72 @@ import Foundation
// //
import Foundation import Foundation
import NIOCore @preconcurrency import NIOCore
import NIOPosix import NIOPosix
// sn-server // sn-server
actor SDLNoticeClient { class SDLNoticeClient: ChannelInboundHandler, @unchecked Sendable {
public typealias InboundIn = AddressedEnvelope<ByteBuffer>
public typealias OutboundOut = AddressedEnvelope<ByteBuffer>
var channel: Channel?
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
private let remoteAddress: SocketAddress private let remoteAddress: SocketAddress
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded)
init() {
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: 50195)
}
// //
init() async throws { func start() {
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: 50195)
let bootstrap = DatagramBootstrap(group: self.group) let bootstrap = DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelInitializer { channel in
self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0) //
.flatMapThrowing {channel in channel.pipeline.addHandler(self)
return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
inboundType: AddressedEnvelope<ByteBuffer>.self,
outboundType: AddressedEnvelope<ByteBuffer>.self
))
}
.get()
SDLLogger.log("[SDLNoticeClient] started and listening on: \(self.asyncChannel.channel.localAddress!)", level: .debug)
} }
func start() async throws { self.channel = try! bootstrap.bind(host: "0.0.0.0", port: 0).wait()
try await self.asyncChannel.executeThenClose { inbound, outbound in SDLLogger.log("[SDLNoticeClient] started and listening on: \(self.channel?.localAddress!)", level: .debug)
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
defer {
self.writeContinuation.finish()
} }
for try await message in self.writeStream { // -- MARK: ChannelInboundHandler Methods
let buf = self.asyncChannel.channel.allocator.buffer(bytes: message)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: self.remoteAddress, data: buf) public func channelActive(context: ChannelHandlerContext) {
try await outbound.write(envelope)
}
} }
try await group.waitForAll() // ,
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
context.fireChannelRead(data)
} }
public func errorCaught(context: ChannelHandlerContext, error: Error) {
// As we are not really interested getting notified on success or failure we just pass nil as promise to
// reduce allocations.
context.close(promise: nil)
self.channel = nil
} }
public func channelInactive(context: ChannelHandlerContext) {
self.channel = nil
context.close(promise: nil)
} }
// //
func send(data: Data) { func send(data: Data) {
self.writeContinuation.yield(data) guard let channel = self.channel else {
return
}
let remoteAddress = self.remoteAddress
let allocator = channel.allocator
channel.eventLoop.execute { [allocator] in
let buffer = allocator.buffer(bytes: data)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)
channel.writeAndFlush(self.wrapOutboundOut(envelope), promise: nil)
}
} }
deinit { deinit {

View File

@ -10,7 +10,7 @@ import NIOCore
import NIOPosix import NIOPosix
// --MARK: SuperNode // --MARK: SuperNode
actor SDLSuperClient { actor SDLSuperClientActor {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let asyncChannel: NIOAsyncChannel<ByteBuffer,ByteBuffer> private let asyncChannel: NIOAsyncChannel<ByteBuffer,ByteBuffer>
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: TcpMessage.self, bufferingPolicy: .unbounded) private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: TcpMessage.self, bufferingPolicy: .unbounded)
@ -31,6 +31,7 @@ actor SDLSuperClient {
// //
enum SuperEvent { enum SuperEvent {
case ready case ready
case closed
case event(SDLEvent) case event(SDLEvent)
case command(UInt32, SDLCommand) case command(UInt32, SDLCommand)
} }
@ -57,15 +58,12 @@ actor SDLSuperClient {
func start() async throws { func start() async throws {
try await self.asyncChannel.executeThenClose { inbound, outbound in try await self.asyncChannel.executeThenClose { inbound, outbound in
self.inboundContinuation.yield(.ready)
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.asyncChannel.channel.closeFuture.get() defer {
NSLog("[SDLSuperClient] socket closed") self.inboundContinuation.finish()
} }
group.addTask {
for try await var packet in inbound { for try await var packet in inbound {
if let message = SDLSuperClientDecoder.decode(buffer: &packet) { if let message = SDLSuperClientDecoder.decode(buffer: &packet) {
SDLLogger.log("[SDLSuperTransport] read message: \(message)", level: .warning) SDLLogger.log("[SDLSuperTransport] read message: \(message)", level: .warning)
@ -82,6 +80,10 @@ actor SDLSuperClient {
} }
group.addTask { group.addTask {
defer {
self.writeContinuation.finish()
}
for try await message in self.writeStream { for try await message in self.writeStream {
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5) var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 5)
buffer.writeInteger(message.packetId, as: UInt32.self) buffer.writeInteger(message.packetId, as: UInt32.self)
@ -114,7 +116,7 @@ actor SDLSuperClient {
self.send(type: .commandAck, packetId: packetId, data: data) self.send(type: .commandAck, packetId: packetId, data: data)
} }
func registerSuper(context ctx: SDLContext) throws -> EventLoopFuture<SDLSuperInboundMessage> { func registerSuper(context ctx: SDLContext) async throws -> SDLSuperInboundMessage {
var registerSuper = SDLRegisterSuper() var registerSuper = SDLRegisterSuper()
registerSuper.version = UInt32(ctx.config.version) registerSuper.version = UInt32(ctx.config.version)
registerSuper.clientID = ctx.config.clientId registerSuper.clientID = ctx.config.clientId
@ -124,15 +126,15 @@ actor SDLSuperClient {
let data = try! registerSuper.serializedData() let data = try! registerSuper.serializedData()
return self.write(type: .registerSuper, data: data) return try await self.write(type: .registerSuper, data: data).get()
} }
// //
func queryInfo(dst_mac: Data) async throws -> EventLoopFuture<SDLSuperInboundMessage> { func queryInfo(context ctx: SDLContext, dst_mac: Data) async throws -> SDLSuperInboundMessage {
var queryInfo = SDLQueryInfo() var queryInfo = SDLQueryInfo()
queryInfo.dstMac = dst_mac queryInfo.dstMac = dst_mac
return self.write(type: .queryInfo, data: try! queryInfo.serializedData()) return try await self.write(type: .queryInfo, data: try! queryInfo.serializedData()).get()
} }
func unregister(context ctx: SDLContext) throws { func unregister(context ctx: SDLContext) throws {
@ -152,18 +154,17 @@ actor SDLSuperClient {
self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData()) self.send(type: .flowTracer, packetId: 0, data: try! flow.serializedData())
} }
private func write(type: SDLPacketType, data: Data) -> EventLoopFuture<SDLSuperInboundMessage> { func write(type: SDLPacketType, data: Data) -> EventLoopFuture<SDLSuperInboundMessage> {
SDLLogger.log("[SDLSuperTransport] will write data: \(data)", level: .debug) SDLLogger.log("[SDLSuperTransport] will write data: \(data)", level: .debug)
let packetId = idGenerator.nextId() let packetId = idGenerator.nextId()
let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLSuperInboundMessage.self) let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLSuperInboundMessage.self)
self.callbackPromises[packetId] = promise self.callbackPromises[packetId] = promise
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data)) self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
return promise.futureResult return promise.futureResult
} }
private func send(type: SDLPacketType, packetId: UInt32, data: Data) { func send(type: SDLPacketType, packetId: UInt32, data: Data) {
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data)) self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
} }
@ -179,7 +180,6 @@ actor SDLSuperClient {
deinit { deinit {
try! group.syncShutdownGracefully() try! group.syncShutdownGracefully()
self.inboundContinuation.finish()
} }
} }

View File

@ -9,27 +9,28 @@ import Foundation
import NIOCore import NIOCore
import NIOPosix import NIOPosix
// sn-server
actor SDLUDPHole {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded)
private var cookieGenerator = SDLIdGenerator(seed: 1)
private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:]
public var localAddress: SocketAddress?
public let (eventFlow, eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)
struct UDPMessage { struct UDPMessage {
let remoteAddress: SocketAddress let remoteAddress: SocketAddress
let type: SDLPacketType let type: SDLPacketType
let data: Data let data: Data
} }
// sn-server
actor SDLUDPHoleActor {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
private let (writeStream, continuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded)
private var cookieGenerator = SDLIdGenerator(seed: 1)
private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:]
public var localAddress: SocketAddress?
public let (eventFlow, inboundContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)
// //
enum UDPEvent { enum UDPEvent {
case ready case ready
case closed
case message(SocketAddress, SDLHoleInboundMessage) case message(SocketAddress, SDLHoleInboundMessage)
case data(SDLData) case data(SDLData)
} }
@ -54,15 +55,12 @@ actor SDLUDPHole {
func start() async throws { func start() async throws {
try await self.asyncChannel.executeThenClose { inbound, outbound in try await self.asyncChannel.executeThenClose { inbound, outbound in
self.eventContinuation.yield(.ready)
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.asyncChannel.channel.closeFuture.get() defer {
NSLog("[UDPHole] channel closed") self.inboundContinuation.finish()
} }
group.addTask {
for try await envelope in inbound { for try await envelope in inbound {
var buffer = envelope.data var buffer = envelope.data
let remoteAddress = envelope.remoteAddress let remoteAddress = envelope.remoteAddress
@ -71,12 +69,12 @@ actor SDLUDPHole {
switch message { switch message {
case .data(let data): case .data(let data):
SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug) SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
self.eventContinuation.yield(.data(data)) self.inboundContinuation.yield(.data(data))
case .stunProbeReply(let probeReply): case .stunProbeReply(let probeReply):
// //
await self.trigger(probeReply: probeReply) await self.trigger(probeReply: probeReply)
default: default:
self.eventContinuation.yield(.message(remoteAddress, message)) self.inboundContinuation.yield(.message(remoteAddress, message))
} }
} else { } else {
SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning) SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning)
@ -88,6 +86,10 @@ actor SDLUDPHole {
} }
group.addTask { group.addTask {
defer {
self.continuation.finish()
}
for try await message in self.writeStream { for try await message in self.writeStream {
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1) var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
buffer.writeBytes([message.type.rawValue]) buffer.writeBytes([message.type.rawValue])
@ -98,6 +100,8 @@ actor SDLUDPHole {
} }
} }
//eventFlow.send(.ready)
try await group.waitForAll() try await group.waitForAll()
} }
} }
@ -190,14 +194,14 @@ actor SDLUDPHole {
} }
// register // register
func sendRegister(remoteAddress: SocketAddress, networkId: UInt32, srcMac: Data, dst_mac: Data) { func sendRegister(context ctx: SDLContext, remoteAddress: SocketAddress, dst_mac: Data) {
var register = SDLRegister() var register = SDLRegister()
register.networkID = networkId register.networkID = ctx.devAddr.networkID
register.srcMac = srcMac register.srcMac = ctx.devAddr.mac
register.dstMac = dst_mac register.dstMac = dst_mac
if let packet = try? register.serializedData() { if let packet = try? register.serializedData() {
SDLLogger.log("[SDLUDPHole] SendRegister: \(remoteAddress), src_mac: \(LayerPacket.MacAddress.description(data: srcMac)), dst_mac: \(LayerPacket.MacAddress.description(data: dst_mac))", level: .debug) SDLLogger.log("[SDLUDPHole] SendRegister: \(remoteAddress), src_mac: \(LayerPacket.MacAddress.description(data: ctx.devAddr.mac)), dst_mac: \(LayerPacket.MacAddress.description(data: dst_mac))", level: .debug)
self.send(remoteAddress: remoteAddress, type: .register, data: packet) self.send(remoteAddress: remoteAddress, type: .register, data: packet)
} }
} }
@ -218,7 +222,7 @@ actor SDLUDPHole {
// //
private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) { private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) {
let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data) let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data)
self.writeContinuation.yield(message) self.continuation.yield(message)
} }
//--MARK: //--MARK:
@ -253,7 +257,6 @@ actor SDLUDPHole {
deinit { deinit {
try? self.group.syncShutdownGracefully() try? self.group.syncShutdownGracefully()
self.writeContinuation.finish()
self.eventContinuation.finish()
} }
} }