This commit is contained in:
anlicheng 2025-08-01 12:01:37 +08:00
parent b55d9913cc
commit d1452ce0b7
5 changed files with 64 additions and 70 deletions

View File

@ -70,7 +70,7 @@ public class SDLContext: @unchecked Sendable {
private var monitorCancel: AnyCancellable? private var monitorCancel: AnyCancellable?
// socket // socket
private var noticeClient: SDLNoticeClient private var noticeClient: SDLNoticeClient?
// //
private var flowTracer = SDLFlowTracerActor() private var flowTracer = SDLFlowTracerActor()
@ -90,13 +90,13 @@ public class SDLContext: @unchecked Sendable {
self.sessionManager = SessionManager() self.sessionManager = SessionManager()
self.holerManager = HolerManager() self.holerManager = HolerManager()
self.arpServer = ArpServer(known_macs: [:]) self.arpServer = ArpServer(known_macs: [:])
self.noticeClient = SDLNoticeClient()
} }
public func start() async throws { public func start() async throws {
self.udpHoleActor = try await SDLUDPHoleActor() self.udpHoleActor = try await SDLUDPHoleActor()
self.superClientActor = try await SDLSuperClientActor(host: self.config.superHost, port: self.config.superPort) self.superClientActor = try await SDLSuperClientActor(host: self.config.superHost, port: self.config.superPort)
self.noticeClient = try await SDLNoticeClient()
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
try await self.udpHoleActor?.start() try await self.udpHoleActor?.start()
@ -106,6 +106,10 @@ public class SDLContext: @unchecked Sendable {
try await self.superClientActor?.start() try await self.superClientActor?.start()
} }
group.addTask {
try await self.noticeClient?.start()
}
group.addTask { group.addTask {
if let eventFlow = self.superClientActor?.eventFlow { if let eventFlow = self.superClientActor?.eventFlow {
for try await event in eventFlow { for try await event in eventFlow {
@ -123,10 +127,16 @@ public class SDLContext: @unchecked Sendable {
} }
group.addTask {
while !Task.isCancelled {
try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
self.lastCookie = await self.udpHoleActor?.stunRequest(context: self)
}
}
try await group.waitForAll() try await group.waitForAll()
} }
// self.noticeClient.start()
// // // //
// self.monitorCancel = self.monitor.eventFlow.sink { event in // self.monitorCancel = self.monitor.eventFlow.sink { event in
// switch event { // switch event {
@ -169,7 +179,7 @@ public class SDLContext: @unchecked Sendable {
if upgradeType == .force { if upgradeType == .force {
let forceUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress) let forceUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
self.noticeClient.send(data: forceUpgrade.binaryData) await self.noticeClient?.send(data: forceUpgrade.binaryData)
exit(-1) exit(-1)
} }
@ -179,7 +189,7 @@ public class SDLContext: @unchecked Sendable {
if upgradeType == .normal { if upgradeType == .normal {
let normalUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress) let normalUpgrade = NoticeMessage.UpgradeMessage(prompt: registerSuperAck.upgradePrompt, address: registerSuperAck.upgradeAddress)
self.noticeClient.send(data: normalUpgrade.binaryData) await self.noticeClient?.send(data: normalUpgrade.binaryData)
} }
case .registerSuperNak(let nakPacket): case .registerSuperNak(let nakPacket):
@ -191,11 +201,11 @@ public class SDLContext: @unchecked Sendable {
switch errorCode { switch errorCode {
case .invalidToken, .nodeDisabled: case .invalidToken, .nodeDisabled:
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage) let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
self.noticeClient.send(data: alertNotice.binaryData) await self.noticeClient?.send(data: alertNotice.binaryData)
exit(-1) exit(-1)
case .noIpAddress, .networkFault, .internalFault: case .noIpAddress, .networkFault, .internalFault:
let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage) let alertNotice = NoticeMessage.AlertMessage(alert: errorMessage)
self.noticeClient.send(data: alertNotice.binaryData) await self.noticeClient?.send(data: alertNotice.binaryData)
} }
NSLog("[SDLContext] Get a SuperNak message exit") NSLog("[SDLContext] Get a SuperNak message exit")
default: default:
@ -226,7 +236,7 @@ public class SDLContext: @unchecked Sendable {
case .networkShutdown(let shutdownEvent): case .networkShutdown(let shutdownEvent):
let alertNotice = NoticeMessage.AlertMessage(alert: shutdownEvent.message) let alertNotice = NoticeMessage.AlertMessage(alert: shutdownEvent.message)
self.noticeClient.send(data: alertNotice.binaryData) await self.noticeClient?.send(data: alertNotice.binaryData)
exit(-1) exit(-1)
} }
case .command(let packetId, let command): case .command(let packetId, let command):
@ -267,14 +277,9 @@ public class SDLContext: @unchecked Sendable {
switch event { switch event {
case .ready: case .ready:
// //
self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config) //self.natType = await SDLNatProber.getNatType(udpHole: self.udpHole, config: self.config)
SDLLogger.log("[SDLContext] nat type is: \(self.natType)", level: .debug) SDLLogger.log("[SDLContext] nat type is: \(self.natType)", level: .debug)
let timer = Timer.publish(every: 5.0, on: .main, in: .common).autoconnect()
// self.stunCancel = Just(Date()).merge(with: timer).sink { _ in
// self.lastCookie = await self.udpHoleActor?.stunRequest(context: self)
// }
case .closed: case .closed:
DispatchQueue.main.asyncAfter(deadline: .now() + 5) { DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
Task { Task {
@ -289,7 +294,7 @@ public class SDLContext: @unchecked Sendable {
// tun, // tun,
if register.dstMac == self.devAddr.mac && register.networkID == self.devAddr.networkID { if register.dstMac == self.devAddr.mac && register.networkID == self.devAddr.networkID {
// ack // ack
self.udpHoleActor?.sendRegisterAck(context: self, remoteAddress: remoteAddress, dst_mac: register.srcMac) await self.udpHoleActor?.sendRegisterAck(context: self, remoteAddress: remoteAddress, dst_mac: register.srcMac)
// , super-nodenatudpnat // , super-nodenatudpnat
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress) let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
await self.sessionManager.addSession(session: session) await self.sessionManager.addSession(session: session)

View File

@ -54,7 +54,7 @@ enum SDLUpgradeType: UInt32 {
} }
// Id // Id
struct SDLIdGenerator { struct SDLIdGenerator: Sendable {
// id // id
private var packetId: UInt32 private var packetId: UInt32

View File

@ -22,7 +22,7 @@ struct SDLNatProber {
} }
// nat // nat
static func getNatType(udpHole: SDLUDPHole?, config: SDLConfiguration) async -> NatType { static func getNatType(udpHole: SDLUDPHoleActor?, config: SDLConfiguration) async -> NatType {
guard let udpHole else { guard let udpHole else {
return .blocked return .blocked
} }
@ -34,7 +34,7 @@ struct SDLNatProber {
} }
// nat // nat
if natAddress1 == udpHole.localAddress { if await natAddress1 == udpHole.localAddress {
return .noNat return .noNat
} }
@ -67,8 +67,8 @@ struct SDLNatProber {
} }
} }
private static func getNatAddress(_ udpHole: SDLUDPHole, remoteAddress: SocketAddress, attr: SDLProbeAttr) async -> SocketAddress? { private static func getNatAddress(_ udpHole: SDLUDPHoleActor, remoteAddress: SocketAddress, attr: SDLProbeAttr) async -> SocketAddress? {
let stunProbeReply = await udpHole.stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: 5) let stunProbeReply = try? await udpHole.stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: 5)
return stunProbeReply?.socketAddress() return stunProbeReply?.socketAddress()
} }

View File

@ -15,72 +15,59 @@ import Foundation
// //
import Foundation import Foundation
@preconcurrency import NIOCore import NIOCore
import NIOPosix import NIOPosix
// sn-server // sn-server
class SDLNoticeClient: ChannelInboundHandler, @unchecked Sendable { actor SDLNoticeClient {
public typealias InboundIn = AddressedEnvelope<ByteBuffer>
public typealias OutboundOut = AddressedEnvelope<ByteBuffer>
var channel: Channel?
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
private let remoteAddress: SocketAddress private let remoteAddress: SocketAddress
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded)
init() {
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: 50195)
}
// //
func start() { init() async throws {
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: 50195)
let bootstrap = DatagramBootstrap(group: self.group) let bootstrap = DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelInitializer { channel in
// self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0)
channel.pipeline.addHandler(self) .flatMapThrowing {channel in
return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
inboundType: AddressedEnvelope<ByteBuffer>.self,
outboundType: AddressedEnvelope<ByteBuffer>.self
))
} }
.get()
self.channel = try! bootstrap.bind(host: "0.0.0.0", port: 0).wait() SDLLogger.log("[SDLNoticeClient] started and listening on: \(self.asyncChannel.channel.localAddress!)", level: .debug)
SDLLogger.log("[SDLNoticeClient] started and listening on: \(self.channel?.localAddress!)", level: .debug)
} }
// -- MARK: ChannelInboundHandler Methods func start() async throws {
try await self.asyncChannel.executeThenClose { inbound, outbound in
public func channelActive(context: ChannelHandlerContext) { try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
} defer {
self.writeContinuation.finish()
// , }
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
context.fireChannelRead(data) for try await message in self.writeStream {
} let buf = self.asyncChannel.channel.allocator.buffer(bytes: message)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: self.remoteAddress, data: buf)
public func errorCaught(context: ChannelHandlerContext, error: Error) {
// As we are not really interested getting notified on success or failure we just pass nil as promise to try await outbound.write(envelope)
// reduce allocations. }
context.close(promise: nil) }
self.channel = nil
} try await group.waitForAll()
}
public func channelInactive(context: ChannelHandlerContext) { }
self.channel = nil
context.close(promise: nil)
} }
// //
func send(data: Data) { func send(data: Data) {
guard let channel = self.channel else { self.writeContinuation.yield(data)
return
}
let remoteAddress = self.remoteAddress
let allocator = channel.allocator
channel.eventLoop.execute { [allocator] in
let buffer = allocator.buffer(bytes: data)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)
channel.writeAndFlush(self.wrapOutboundOut(envelope), promise: nil)
}
} }
deinit { deinit {

View File

@ -100,6 +100,8 @@ actor SDLUDPHoleActor {
} }
} }
//eventFlow.send(.ready) //eventFlow.send(.ready)
try await group.waitForAll() try await group.waitForAll()