remove superClient

This commit is contained in:
anlicheng 2026-01-27 21:53:09 +08:00
parent 20993dd923
commit 21b8585d3c
5 changed files with 776 additions and 1183 deletions

View File

@ -1,312 +0,0 @@
//
// SDLWebsocketClient.swift
// Tun
//
// Created by on 2024/3/28.
//
import Foundation
import NIOCore
import NIOPosix
// --MARK: SuperNode
actor SDLSuperClientActor {
//
private typealias TcpMessage = (packetId: UInt32, type: SDLPacketType, data: Data)
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let asyncChannel: NIOAsyncChannel<ByteBuffer,ByteBuffer>
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: TcpMessage.self, bufferingPolicy: .unbounded)
private var continuations: [UInt32:CheckedContinuation<SDLSuperInboundMessage, Error>] = [:]
public let eventFlow: AsyncStream<SuperEvent>
private let inboundContinuation: AsyncStream<SuperEvent>.Continuation
// id
var idGenerator = SDLIdGenerator(seed: 1)
private let logger: SDLLogger
//
enum SuperEvent {
case ready
case event(SDLEvent)
case command(UInt32, SDLCommand)
}
enum SuperClientError: Error {
case timeout
case connectionClosed
case cancelled
}
init(host: String, port: Int, logger: SDLLogger) async throws {
self.logger = logger
(self.eventFlow, self.inboundContinuation) = AsyncStream.makeStream(of: SuperEvent.self, bufferingPolicy: .unbounded)
let bootstrap = ClientBootstrap(group: self.group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelInitializer { channel in
return channel.pipeline.addHandlers([
ByteToMessageHandler(FixedHeaderDecoder()),
MessageToByteHandler(FixedHeaderEncoder())
])
}
self.asyncChannel = try await bootstrap.connect(host: host, port: port)
.flatMapThrowing { channel in
return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
inboundType: ByteBuffer.self,
outboundType: ByteBuffer.self
))
}
.get()
}
func start() async throws {
try await withTaskCancellationHandler {
try await self.asyncChannel.executeThenClose { inbound, outbound in
self.inboundContinuation.yield(.ready)
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
defer {
self.logger.log("[SDLSuperClient] inbound closed", level: .warning)
}
for try await var packet in inbound {
try Task.checkCancellation()
if let message = SDLSuperClientDecoder.decode(buffer: &packet) {
if !message.isPong() {
self.logger.log("[SDLSuperClient] read message: \(message)", level: .debug)
}
switch message.packet {
case .event(let event):
self.inboundContinuation.yield(.event(event))
case .command(let command):
self.inboundContinuation.yield(.command(message.msgId, command))
default:
await self.fireCallback(message: message)
}
}
}
}
group.addTask {
defer {
self.logger.log("[SDLSuperClient] outbound closed", level: .warning)
}
for await (packetId, type, data) in self.writeStream {
try Task.checkCancellation()
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: data.count + 5)
buffer.writeInteger(packetId, as: UInt32.self)
buffer.writeBytes([type.rawValue])
buffer.writeBytes(data)
try await outbound.write(buffer)
}
}
// --MARK:
group.addTask {
defer {
self.logger.log("[SDLSuperClient] ping task closed", level: .warning)
}
while true {
try Task.checkCancellation()
await self.ping()
try await Task.sleep(nanoseconds: 5 * 1_000_000_000)
}
}
// 退,
if let _ = try await group.next() {
group.cancelAll()
}
}
}
} onCancel: {
self.inboundContinuation.finish()
self.writeContinuation.finish()
self.logger.log("[SDLSuperClient] withTaskCancellationHandler cancel")
Task {
await self.failAllContinuations(SuperClientError.cancelled)
}
}
}
// -- MARK: apis
func unregister() throws {
self.send(type: .unregisterSuper, packetId: 0, data: Data())
}
private func ping() {
self.send(type: .ping, packetId: 0, data: Data())
}
func request(type: SDLPacketType, data: Data, timeout: Duration = .seconds(5)) async throws -> SDLSuperInboundMessage {
let packetId = idGenerator.nextId()
return try await withCheckedThrowingContinuation { cont in
self.continuations[packetId] = cont
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
Task {
try? await Task.sleep(for: timeout)
self.timeout(packetId: packetId)
}
}
}
func send(type: SDLPacketType, packetId: UInt32, data: Data) {
self.writeContinuation.yield(TcpMessage(packetId: packetId, type: type, data: data))
}
//
private func fireCallback(message: SDLSuperInboundMessage) {
guard let cont = self.continuations.removeValue(forKey: message.msgId) else {
return
}
cont.resume(returning: message)
}
private func failAllContinuations(_ error: Error) {
let all = continuations
continuations.removeAll()
for (_, cont) in all {
cont.resume(throwing: error)
}
}
private func timeout(packetId: UInt32) {
guard let cont = self.continuations.removeValue(forKey: packetId) else {
return
}
cont.resume(throwing: SuperClientError.timeout)
}
deinit {
try! group.syncShutdownGracefully()
}
}
// --MARK:
private struct SDLSuperClientDecoder {
// : <<MsgId:32, Type:8, Body/binary>>
static func decode(buffer: inout ByteBuffer) -> SDLSuperInboundMessage? {
guard let msgId = buffer.readInteger(as: UInt32.self),
let type = buffer.readInteger(as: UInt8.self),
let messageType = SDLPacketType(rawValue: type) else {
return nil
}
switch messageType {
case .empty:
return .init(msgId: msgId, packet: .empty)
case .registerSuperAck:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let registerSuperAck = try? SDLRegisterSuperAck(serializedBytes: bytes) else {
return nil
}
return .init(msgId: msgId, packet: .registerSuperAck(registerSuperAck))
case .registerSuperNak:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let registerSuperNak = try? SDLRegisterSuperNak(serializedBytes: bytes) else {
return nil
}
return .init(msgId: msgId, packet: .registerSuperNak(registerSuperNak))
case .peerInfo:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let peerInfo = try? SDLPeerInfo(serializedBytes: bytes) else {
return nil
}
return .init(msgId: msgId, packet: .peerInfo(peerInfo))
case .pong:
return .init(msgId: msgId, packet: .pong)
case .command:
guard let commandVal = buffer.readInteger(as: UInt8.self),
let command = SDLCommandType(rawValue: commandVal),
let bytes = buffer.readBytes(length: buffer.readableBytes) else {
return nil
}
switch command {
case .changeNetwork:
guard let changeNetworkCommand = try? SDLChangeNetworkCommand(serializedBytes: bytes) else {
return nil
}
return .init(msgId: msgId, packet: .command(.changeNetwork(changeNetworkCommand)))
}
case .event:
guard let eventVal = buffer.readInteger(as: UInt8.self),
let event = SDLEventType(rawValue: eventVal),
let bytes = buffer.readBytes(length: buffer.readableBytes) else {
return nil
}
switch event {
case .natChanged:
guard let natChangedEvent = try? SDLNatChangedEvent(serializedBytes: bytes) else {
return nil
}
return .init(msgId: msgId, packet: .event(.natChanged(natChangedEvent)))
case .sendRegister:
guard let sendRegisterEvent = try? SDLSendRegisterEvent(serializedBytes: bytes) else {
return nil
}
return .init(msgId: msgId, packet: .event(.sendRegister(sendRegisterEvent)))
case .networkShutdown:
guard let networkShutdownEvent = try? SDLNetworkShutdownEvent(serializedBytes: bytes) else {
return nil
}
return .init(msgId: msgId, packet: .event(.networkShutdown(networkShutdownEvent)))
}
default:
return nil
}
}
}
private final class FixedHeaderEncoder: MessageToByteEncoder, @unchecked Sendable {
typealias InboundIn = ByteBuffer
typealias InboundOut = ByteBuffer
func encode(data: ByteBuffer, out: inout ByteBuffer) throws {
let len = data.readableBytes
out.writeInteger(UInt16(len))
out.writeBytes(data.readableBytesView)
}
}
private final class FixedHeaderDecoder: ByteToMessageDecoder, @unchecked Sendable {
typealias InboundIn = ByteBuffer
typealias InboundOut = ByteBuffer
func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
guard let len = buffer.getInteger(at: buffer.readerIndex, endianness: .big, as: UInt16.self) else {
return .needMoreData
}
if buffer.readableBytes >= len + 2 {
buffer.moveReaderIndex(forwardBy: 2)
if let bytes = buffer.readBytes(length: Int(len)) {
context.fireChannelRead(self.wrapInboundOut(ByteBuffer(bytes: bytes)))
}
return .continue
} else {
return .needMoreData
}
}
}

View File

@ -11,6 +11,8 @@ import NIOPosix
// sn-server // sn-server
actor SDLUDPHoleActor { actor SDLUDPHoleActor {
typealias HoleMessage = (SocketAddress, SDLHoleInboundMessage)
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>> private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded) private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded)
@ -19,35 +21,22 @@ actor SDLUDPHoleActor {
private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:] private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:]
public var localAddress: SocketAddress? public var localAddress: SocketAddress?
public let eventFlow: AsyncStream<UDPEvent> public let messageStream: AsyncStream<HoleMessage>
private let eventContinuation: AsyncStream<UDPEvent>.Continuation private let messageContinuation: AsyncStream<HoleMessage>.Continuation
private let logger: SDLLogger private let logger: SDLLogger
//
struct Capabilities {
let logger: @Sendable (String) async -> Void
}
struct UDPMessage { struct UDPMessage {
let remoteAddress: SocketAddress let remoteAddress: SocketAddress
let type: SDLPacketType let type: SDLPacketType
let data: Data let data: Data
} }
//
enum UDPEvent {
case ready
case message(SocketAddress, SDLHoleInboundMessage)
case data(SDLData)
}
// //
init(logger: SDLLogger) async throws { init(logger: SDLLogger) async throws {
self.logger = logger self.logger = logger
(self.eventFlow, self.eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded) (self.messageStream, self.messageContinuation) = AsyncStream.makeStream(of: HoleMessage.self, bufferingPolicy: .unbounded)
let bootstrap = DatagramBootstrap(group: group) let bootstrap = DatagramBootstrap(group: group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
@ -68,7 +57,6 @@ actor SDLUDPHoleActor {
func start() async throws { func start() async throws {
try await withTaskCancellationHandler { try await withTaskCancellationHandler {
try await self.asyncChannel.executeThenClose {inbound, outbound in try await self.asyncChannel.executeThenClose {inbound, outbound in
self.eventContinuation.yield(.ready)
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { group.addTask {
defer { defer {
@ -85,12 +73,12 @@ actor SDLUDPHoleActor {
switch message { switch message {
case .data(let data): case .data(let data):
self.logger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug) self.logger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
self.eventContinuation.yield(.data(data)) self.messageContinuation.yield((remoteAddress, .data(data)))
case .stunProbeReply(let probeReply): case .stunProbeReply(let probeReply):
// //
await self.trigger(probeReply: probeReply) await self.trigger(probeReply: probeReply)
default: default:
self.eventContinuation.yield(.message(remoteAddress, message)) ()
} }
} else { } else {
self.logger.log("[SDLUDPHole] decode message, get null", level: .warning) self.logger.log("[SDLUDPHole] decode message, get null", level: .warning)
@ -126,7 +114,7 @@ actor SDLUDPHoleActor {
} }
} onCancel: { } onCancel: {
self.writeContinuation.finish() self.writeContinuation.finish()
self.eventContinuation.finish() self.messageContinuation.finish()
self.logger.log("[SDLUDPHole] withTaskCancellationHandler cancel") self.logger.log("[SDLUDPHole] withTaskCancellationHandler cancel")
} }
} }
@ -196,6 +184,52 @@ actor SDLUDPHoleActor {
case .stunProbeReply: case .stunProbeReply:
let stunProbeReply = try SDLStunProbeReply(serializedBytes: bytes) let stunProbeReply = try SDLStunProbeReply(serializedBytes: bytes)
return .stunProbeReply(stunProbeReply) return .stunProbeReply(stunProbeReply)
case .registerSuperAck:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let registerSuperAck = try? SDLRegisterSuperAck(serializedBytes: bytes) else {
return nil
}
return .registerSuperAck(registerSuperAck)
case .registerSuperNak:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let registerSuperNak = try? SDLRegisterSuperNak(serializedBytes: bytes) else {
return nil
}
return .registerSuperNak(registerSuperNak)
case .peerInfo:
guard let bytes = buffer.readBytes(length: buffer.readableBytes),
let peerInfo = try? SDLPeerInfo(serializedBytes: bytes) else {
return nil
}
return .peerInfo(peerInfo)
case .event:
guard let eventVal = buffer.readInteger(as: UInt8.self),
let event = SDLEventType(rawValue: eventVal),
let bytes = buffer.readBytes(length: buffer.readableBytes) else {
return nil
}
switch event {
case .natChanged:
guard let natChangedEvent = try? SDLNatChangedEvent(serializedBytes: bytes) else {
return nil
}
return .event(.natChanged(natChangedEvent))
case .sendRegister:
guard let sendRegisterEvent = try? SDLSendRegisterEvent(serializedBytes: bytes) else {
return nil
}
return .event(.sendRegister(sendRegisterEvent))
case .networkShutdown:
guard let networkShutdownEvent = try? SDLNetworkShutdownEvent(serializedBytes: bytes) else {
return nil
}
return .event(.networkShutdown(networkShutdownEvent))
}
default: default:
return nil return nil
} }
@ -204,7 +238,7 @@ actor SDLUDPHoleActor {
deinit { deinit {
try? self.group.syncShutdownGracefully() try? self.group.syncShutdownGracefully()
self.writeContinuation.finish() self.writeContinuation.finish()
self.eventContinuation.finish() self.messageContinuation.finish()
} }
} }

View File

@ -49,7 +49,6 @@ public class SDLContext {
// //
var udpHoleActor: SDLUDPHoleActor? var udpHoleActor: SDLUDPHoleActor?
var superClientActor: SDLSuperClientActor?
var providerActor: SDLTunnelProviderActor var providerActor: SDLTunnelProviderActor
var puncherActor: SDLPuncherActor var puncherActor: SDLPuncherActor
// dnsclient // dnsclient
@ -156,7 +155,6 @@ public class SDLContext {
public func stop() async { public func stop() async {
self.rootTask?.cancel() self.rootTask?.cancel()
self.superClientActor = nil
self.udpHoleActor = nil self.udpHoleActor = nil
self.noticeClient = nil self.noticeClient = nil
@ -215,31 +213,6 @@ public class SDLContext {
} }
private func startSuperClient() async throws {
self.superClientActor = try await SDLSuperClientActor(host: self.config.superHost, port: self.config.superPort, logger: self.logger)
try await withThrowingTaskGroup(of: Void.self) { group in
defer {
self.logger.log("[SDLContext] super client task cancel", level: .warning)
}
group.addTask {
try await self.superClientActor?.start()
}
group.addTask {
if let eventFlow = self.superClientActor?.eventFlow {
for try await event in eventFlow {
try await self.handleSuperEvent(event: event)
}
}
}
if let _ = try await group.next() {
group.cancelAll()
}
}
}
private func startMonitor() async { private func startMonitor() async {
self.monitor = SDLNetworkMonitor() self.monitor = SDLNetworkMonitor()
for await event in self.monitor!.eventStream { for await event in self.monitor!.eventStream {

File diff suppressed because it is too large Load Diff

View File

@ -46,13 +46,6 @@ enum SDLPacketType: UInt8 {
case data = 0xFF case data = 0xFF
} }
//
enum SDLUpgradeType: UInt32 {
case none = 0
case normal = 1
case force = 2
}
// Id // Id
struct SDLIdGenerator: Sendable { struct SDLIdGenerator: Sendable {
// id // id
@ -71,29 +64,6 @@ struct SDLIdGenerator: Sendable {
// //
//
enum SDLEventType: UInt8 {
case natChanged = 0x03
case sendRegister = 0x04
case networkShutdown = 0xFF
}
enum SDLEvent {
case natChanged(SDLNatChangedEvent)
case sendRegister(SDLSendRegisterEvent)
case networkShutdown(SDLNetworkShutdownEvent)
}
// --MARK:
enum SDLCommandType: UInt8 {
case changeNetwork = 0x01
}
enum SDLCommand {
case changeNetwork(SDLChangeNetworkCommand)
}
// --MARK: // --MARK:
// Attr // Attr
enum SDLProbeAttr: UInt8 { enum SDLProbeAttr: UInt8 {
@ -130,6 +100,14 @@ extension SDLStunProbeReply {
// --MARK: , // --MARK: ,
enum SDLHoleInboundMessage { enum SDLHoleInboundMessage {
case registerSuperAck(SDLRegisterSuperAck)
case registerSuperNak(SDLRegisterSuperNak)
case peerInfo(SDLPeerInfo)
case pong
case event(SDLEvent)
case stunReply(SDLStunReply) case stunReply(SDLStunReply)
case stunProbeReply(SDLStunProbeReply) case stunProbeReply(SDLStunProbeReply)
@ -138,29 +116,15 @@ enum SDLHoleInboundMessage {
case registerAck(SDLRegisterAck) case registerAck(SDLRegisterAck)
} }
// --MARK: //
enum SDLEventType: UInt8 {
struct SDLSuperInboundMessage { case natChanged = 0x03
let msgId: UInt32 case sendRegister = 0x04
let packet: InboundPacket case networkShutdown = 0xFF
enum InboundPacket {
case empty
case registerSuperAck(SDLRegisterSuperAck)
case registerSuperNak(SDLRegisterSuperNak)
case peerInfo(SDLPeerInfo)
case pong
case event(SDLEvent)
case command(SDLCommand)
}
func isPong() -> Bool {
switch self.packet {
case .pong:
return true
default:
return false
}
} }
enum SDLEvent {
case natChanged(SDLNatChangedEvent)
case sendRegister(SDLSendRegisterEvent)
case networkShutdown(SDLNetworkShutdownEvent)
} }