punchnet-macos/Tun/Punchnet/Actors/SDLUDPHoleActor.swift
2026-01-07 16:29:03 +08:00

211 lines
8.5 KiB
Swift

//
// SDLUDPHoleActor.swift
// Tun
//
// Created on 2024/1/31.
//
import Foundation
import NIOCore
import NIOPosix
// sn-server
/// Actor that owns a single UDP datagram channel bound to `0.0.0.0` on an
/// ephemeral port.
///
/// Inbound datagrams are decoded into `SDLHoleInboundMessage` values and
/// published on `eventFlow`; outbound datagrams are queued through `send`
/// and written by the loop running inside `start()`.
actor SDLUDPHoleActor {
    /// Single-threaded event loop group that drives the datagram channel.
    private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    /// Async wrapper around the bound datagram channel.
    private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
    /// Unbounded queue of outbound messages; drained by the write loop in `start()`.
    private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded)

    /// Source of cookie ids handed out by `getCookieId()`.
    private var cookieGenerator = SDLIdGenerator(seed: 1)
    /// Pending STUN probe promises keyed by cookie. Nothing in this actor
    /// currently inserts into it (the probe API below is commented out).
    private var promises: [UInt32: EventLoopPromise<SDLStunProbeReply>] = [:]

    /// Local address of the bound socket, captured once at the end of `init`.
    public var localAddress: SocketAddress?

    /// Stream of events produced by the receive loop.
    public let eventFlow: AsyncStream<UDPEvent>
    private let eventContinuation: AsyncStream<UDPEvent>.Continuation

    private let logger: SDLLogger

    /// Injectable capabilities. Not referenced by the code visible in this actor.
    struct Capabilities {
        let logger: @Sendable (String) async -> Void
    }

    /// One outbound datagram: destination, packet type and serialized payload.
    struct UDPMessage {
        let remoteAddress: SocketAddress
        let type: SDLPacketType
        let data: Data
    }

    /// Events published on `eventFlow`.
    enum UDPEvent {
        /// Emitted once when `start()` has entered the channel scope.
        case ready
        /// Any decoded message other than `.data` / `.stunProbeReply`, with its sender.
        case message(SocketAddress, SDLHoleInboundMessage)
        /// A decoded data packet.
        case data(SDLData)
    }

    /// Binds the UDP socket and records the local address.
    ///
    /// - Parameter logger: Logger used for all diagnostics of this actor.
    /// - Throws: Any error raised while binding or wrapping the channel.
    init(logger: SDLLogger) async throws {
        self.logger = logger
        (self.eventFlow, self.eventContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)

        let bootstrap = DatagramBootstrap(group: group)
            .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)

        do {
            self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0)
                .flatMapThrowing { channel in
                    return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
                        inboundType: AddressedEnvelope<ByteBuffer>.self,
                        outboundType: AddressedEnvelope<ByteBuffer>.self
                    ))
                }
                .get()
        } catch {
            // The initializer is about to fail, so `deinit` will never run:
            // shut the event loop group down here to avoid leaking its thread.
            group.shutdownGracefully { _ in }
            throw error
        }

        self.localAddress = self.asyncChannel.channel.localAddress
        // Avoid force-unwrapping just to build a log line.
        let boundAddress = self.localAddress.map { "\($0)" } ?? "unknown"
        self.logger.log("[UDPHole] started and listening on: \(boundAddress)", level: .debug)
    }

    /// Runs the receive and send loops until either one ends, the channel
    /// fails, or the surrounding task is cancelled.
    ///
    /// Both `eventFlow` and the outbound queue are finished when this method
    /// exits, whatever the reason.
    ///
    /// - Throws: Errors from the channel or `CancellationError`.
    func start() async throws {
        // Finish the streams on every exit path (not only on cancellation) so
        // that consumers of `eventFlow` are not left waiting after the channel
        // has been closed. `finish()` is safe to call more than once.
        defer {
            self.writeContinuation.finish()
            self.eventContinuation.finish()
        }

        try await withTaskCancellationHandler {
            try await self.asyncChannel.executeThenClose { inbound, outbound in
                self.eventContinuation.yield(.ready)

                try await withThrowingTaskGroup(of: Void.self) { group in
                    // Receive loop: decode each datagram and route it.
                    group.addTask {
                        defer {
                            self.logger.log("[SDLUDPHole] inbound closed", level: .warning)
                        }

                        for try await envelope in inbound {
                            try Task.checkCancellation()

                            var buffer = envelope.data
                            let remoteAddress = envelope.remoteAddress

                            let decoded: SDLHoleInboundMessage?
                            do {
                                decoded = try Self.decode(buffer: &buffer)
                            } catch {
                                // Datagrams come from the network and may be malformed.
                                // Drop the bad packet instead of tearing down the socket.
                                self.logger.log("[SDLUDPHole] decode message, get error: \(error)", level: .warning)
                                continue
                            }

                            guard let message = decoded else {
                                self.logger.log("[SDLUDPHole] decode message, get null", level: .warning)
                                continue
                            }

                            switch message {
                            case .data(let data):
                                self.logger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
                                self.eventContinuation.yield(.data(data))
                            case .stunProbeReply(let probeReply):
                                // Probe replies complete a pending promise instead of
                                // being published on `eventFlow`.
                                await self.trigger(probeReply: probeReply)
                            default:
                                self.eventContinuation.yield(.message(remoteAddress, message))
                            }
                        }
                    }

                    // Send loop: frame each queued message as [type byte][payload].
                    group.addTask {
                        defer {
                            self.logger.log("[SDLUDPHole] outbound closed", level: .warning)
                        }

                        for await message in self.writeStream {
                            try Task.checkCancellation()

                            var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
                            buffer.writeInteger(message.type.rawValue)
                            buffer.writeBytes(message.data)

                            let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: message.remoteAddress, data: buffer)
                            try await outbound.write(envelope)
                        }
                    }

                    // As soon as one loop finishes, stop the other one too.
                    if let _ = try await group.next() {
                        group.cancelAll()
                    }
                }
            }
        } onCancel: {
            self.writeContinuation.finish()
            self.eventContinuation.finish()
            self.logger.log("[SDLUDPHole] withTaskCancellationHandler cancel")
        }
    }

    /// Returns the next id from the cookie generator.
    func getCookieId() -> UInt32 {
        return self.cookieGenerator.nextId()
    }

    // Disabled STUN probe API, kept for reference. While it stays disabled,
    // nothing registers entries in `promises`.
    //
    // // tun
    // func stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int = 5) async throws -> SDLStunProbeReply {
    //     return try await self._stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: timeout).get()
    // }
    //
    // private func _stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int) -> EventLoopFuture<SDLStunProbeReply> {
    //     let cookie = self.cookieGenerator.nextId()
    //     var stunProbe = SDLStunProbe()
    //     stunProbe.cookie = cookie
    //     stunProbe.attr = UInt32(attr.rawValue)
    //     self.send( type: .stunProbe, data: try! stunProbe.serializedData(), remoteAddress: remoteAddress)
    //     self.logger.log("[SDLUDPHole] stunProbe: \(remoteAddress)", level: .debug)
    //
    //     let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLStunProbeReply.self)
    //     self.promises[cookie] = promise
    //
    //     return promise.futureResult
    // }

    /// Completes and forgets the promise registered for the reply's cookie,
    /// if there is one. Replies with an unknown cookie are ignored.
    private func trigger(probeReply: SDLStunProbeReply) {
        guard let promise = self.promises.removeValue(forKey: probeReply.cookie) else {
            return
        }
        // Succeed the promise on the channel's event loop.
        self.asyncChannel.channel.eventLoop.execute {
            promise.succeed(probeReply)
        }
    }

    // MARK: client-client apis

    /// Queues a packet for sending; the write loop in `start()` performs the
    /// actual write.
    ///
    /// - Parameters:
    ///   - type: Packet type, written as the first byte of the datagram.
    ///   - data: Serialized payload that follows the type byte.
    ///   - remoteAddress: Destination of the datagram.
    func send(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
        let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data)
        self.writeContinuation.yield(message)
    }

    // MARK: decoding

    /// Decodes one datagram laid out as `[type byte][serialized payload]`.
    ///
    /// - Parameter buffer: Datagram contents; its readable bytes are consumed.
    /// - Returns: The decoded message, or `nil` when the buffer is empty, the
    ///   type byte is not a known `SDLPacketType`, or the type is not one of
    ///   the cases handled below.
    /// - Throws: Errors raised by the payload initializers when the bytes
    ///   cannot be parsed.
    private static func decode(buffer: inout ByteBuffer) throws -> SDLHoleInboundMessage? {
        guard let type = buffer.readInteger(as: UInt8.self),
              let packetType = SDLPacketType(rawValue: type),
              let bytes = buffer.readBytes(length: buffer.readableBytes) else {
            return nil
        }

        switch packetType {
        case .data:
            return .data(try SDLData(serializedBytes: bytes))
        case .register:
            return .register(try SDLRegister(serializedBytes: bytes))
        case .registerAck:
            return .registerAck(try SDLRegisterAck(serializedBytes: bytes))
        case .stunReply:
            return .stunReply(try SDLStunReply(serializedBytes: bytes))
        case .stunProbeReply:
            return .stunProbeReply(try SDLStunProbeReply(serializedBytes: bytes))
        default:
            return nil
        }
    }

    deinit {
        // Release stream consumers first, then stop the event loop group.
        self.writeContinuation.finish()
        self.eventContinuation.finish()
        try? self.group.syncShutdownGracefully()
    }
}