add udp async actor

This commit is contained in:
anlicheng 2025-08-01 00:21:12 +08:00
parent ee6362a254
commit aae0b333de
4 changed files with 266 additions and 4 deletions

View File

@ -1,5 +1,5 @@
{ {
"originHash" : "c6ae4911d55046c288e0ecdc9de28a95dcb591d3daaa03914664a3d2e069977a", "originHash" : "597c0437584251872938aab4e6fd3cc1fc28e6da31c045adee36661e9c049c5e",
"pins" : [ "pins" : [
{ {
"identity" : "swift-atomics", "identity" : "swift-atomics",
@ -24,8 +24,8 @@
"kind" : "remoteSourceControl", "kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio.git", "location" : "https://github.com/apple/swift-nio.git",
"state" : { "state" : {
"revision" : "ad6b5f17270a7008f60d35ec5378e6144a575162", "revision" : "a5fea865badcb1c993c85b0f0e8d05a4bd2270fb",
"version" : "2.84.0" "version" : "2.85.0"
} }
}, },
{ {

View File

@ -16,7 +16,7 @@ let package = Package(
targets: ["Punchnet"]), targets: ["Punchnet"]),
], ],
dependencies: [ dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.84.0"), .package(url: "https://github.com/apple/swift-nio.git", exact: "2.85.0"),
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.26.0") .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.26.0")
], ],
@ -28,6 +28,7 @@ let package = Package(
dependencies: [ dependencies: [
.product(name: "NIOCore", package: "swift-nio"), .product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"), .product(name: "NIOPosix", package: "swift-nio"),
.product(name: "NIOFoundationCompat", package: "swift-nio"),
.product(name: "SwiftProtobuf", package: "swift-protobuf") .product(name: "SwiftProtobuf", package: "swift-protobuf")
] ]
), ),

View File

@ -147,6 +147,7 @@ class SDLUDPHole: ChannelInboundHandler, @unchecked Sendable {
// //
func start() async throws { func start() async throws {
let bootstrap = DatagramBootstrap(group: self.group) let bootstrap = DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelInitializer { channel in .channelInitializer { channel in

View File

@ -0,0 +1,260 @@
//
// SDLanServer.swift
// Tun
//
// Created by on 2024/1/31.
//
import Foundation
import NIOCore
import NIOPosix
struct UDPMessage {
let remoteAddress: SocketAddress
let type: SDLPacketType
let data: Data
}
// sn-server
actor SDLUDPHoleActor {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
private let (writeStream, continuation) = AsyncStream.makeStream(of: UDPMessage.self, bufferingPolicy: .unbounded)
private var cookieGenerator = SDLIdGenerator(seed: 1)
private var promises: [UInt32:EventLoopPromise<SDLStunProbeReply>] = [:]
public var localAddress: SocketAddress?
public let (eventFlow, inboundContinuation) = AsyncStream.makeStream(of: UDPEvent.self, bufferingPolicy: .unbounded)
//
enum UDPEvent {
case ready
case closed
case message(SocketAddress, SDLHoleInboundMessage)
case data(SDLData)
}
//
init() async throws {
let bootstrap = DatagramBootstrap(group: group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0)
.flatMapThrowing { channel in
return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
inboundType: AddressedEnvelope<ByteBuffer>.self,
outboundType: AddressedEnvelope<ByteBuffer>.self
))
}
.get()
self.localAddress = self.asyncChannel.channel.localAddress
SDLLogger.log("[UDPHole] started and listening on: \(self.localAddress!)", level: .debug)
try await self.asyncChannel.executeThenClose { inbound, outbound in
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
defer {
self.inboundContinuation.finish()
}
for try await envelope in inbound {
var buffer = envelope.data
let remoteAddress = envelope.remoteAddress
do {
if let message = try Self.decode(buffer: &buffer) {
switch message {
case .data(let data):
SDLLogger.log("[SDLUDPHole] read data: \(data.format()), from: \(remoteAddress)", level: .debug)
self.inboundContinuation.yield(.data(data))
case .stunProbeReply(let probeReply):
//
await self.trigger(probeReply: probeReply)
default:
self.inboundContinuation.yield(.message(remoteAddress, message))
}
} else {
SDLLogger.log("[SDLUDPHole] decode message, get null", level: .warning)
}
} catch let err {
SDLLogger.log("[SDLUDPHole] decode message, get error: \(err)", level: .debug)
}
}
}
group.addTask {
defer {
self.continuation.finish()
}
for try await message in self.writeStream {
var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
buffer.writeBytes([message.type.rawValue])
buffer.writeBytes(message.data)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: message.remoteAddress, data: buffer)
try await outbound.write(envelope)
}
}
//eventFlow.send(.ready)
try await group.waitForAll()
}
}
}
// MARK: super_node apis
func stunRequest(context ctx: SDLContext) -> UInt32 {
let cookie = self.cookieGenerator.nextId()
let remoteAddress = ctx.config.stunSocketAddress
var stunRequest = SDLStunRequest()
stunRequest.cookie = cookie
stunRequest.clientID = ctx.config.clientId
stunRequest.networkID = ctx.devAddr.networkID
stunRequest.ip = ctx.devAddr.netAddr
stunRequest.mac = ctx.devAddr.mac
stunRequest.natType = UInt32(ctx.natType.rawValue)
SDLLogger.log("[SDLUDPHole] stunRequest: \(remoteAddress), host: \(ctx.config.stunServers[0].host):\(ctx.config.stunServers[0].ports[0])", level: .warning)
self.send(remoteAddress: remoteAddress, type: .stunRequest, data: try! stunRequest.serializedData())
return cookie
}
// tun
func stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int = 5) async throws -> SDLStunProbeReply {
return try await self._stunProbe(remoteAddress: remoteAddress, attr: attr, timeout: timeout).get()
}
private func _stunProbe(remoteAddress: SocketAddress, attr: SDLProbeAttr = .none, timeout: Int) -> EventLoopFuture<SDLStunProbeReply> {
let cookie = self.cookieGenerator.nextId()
var stunProbe = SDLStunProbe()
stunProbe.cookie = cookie
stunProbe.attr = UInt32(attr.rawValue)
self.send(remoteAddress: remoteAddress, type: .stunProbe, data: try! stunProbe.serializedData())
SDLLogger.log("[SDLUDPHole] stunProbe: \(remoteAddress)", level: .warning)
let promise = self.asyncChannel.channel.eventLoop.makePromise(of: SDLStunProbeReply.self)
self.promises[cookie] = promise
return promise.futureResult
}
private func trigger(probeReply: SDLStunProbeReply) {
let id = probeReply.cookie
//
if let promise = self.promises[id] {
self.asyncChannel.channel.eventLoop.execute {
promise.succeed(probeReply)
}
self.promises.removeValue(forKey: id)
}
}
// MARK: client-client apis
// session
func sendPacket(context ctx: SDLContext, session: Session, data: Data) {
let remoteAddress = session.natAddress
var dataPacket = SDLData()
dataPacket.networkID = ctx.devAddr.networkID
dataPacket.srcMac = ctx.devAddr.mac
dataPacket.dstMac = session.dstMac
dataPacket.ttl = 255
dataPacket.data = data
if let packet = try? dataPacket.serializedData() {
SDLLogger.log("[SDLUDPHole] sendPacket: \(remoteAddress), count: \(packet.count)", level: .debug)
self.send(remoteAddress: remoteAddress, type: .data, data: packet)
}
}
// sn, data
func forwardPacket(context ctx: SDLContext, dst_mac: Data, data: Data) {
let remoteAddress = ctx.config.stunSocketAddress
var dataPacket = SDLData()
dataPacket.networkID = ctx.devAddr.networkID
dataPacket.srcMac = ctx.devAddr.mac
dataPacket.dstMac = dst_mac
dataPacket.ttl = 255
dataPacket.data = data
if let packet = try? dataPacket.serializedData() {
NSLog("[SDLContext] forward packet, remoteAddress: \(remoteAddress), data size: \(packet.count)")
self.send(remoteAddress: remoteAddress, type: .data, data: packet)
}
}
// register
func sendRegister(context ctx: SDLContext, remoteAddress: SocketAddress, dst_mac: Data) {
var register = SDLRegister()
register.networkID = ctx.devAddr.networkID
register.srcMac = ctx.devAddr.mac
register.dstMac = dst_mac
if let packet = try? register.serializedData() {
SDLLogger.log("[SDLUDPHole] SendRegister: \(remoteAddress), src_mac: \(LayerPacket.MacAddress.description(data: ctx.devAddr.mac)), dst_mac: \(LayerPacket.MacAddress.description(data: dst_mac))", level: .debug)
self.send(remoteAddress: remoteAddress, type: .register, data: packet)
}
}
// registerAck
func sendRegisterAck(context ctx: SDLContext, remoteAddress: SocketAddress, dst_mac: Data) {
var registerAck = SDLRegisterAck()
registerAck.networkID = ctx.devAddr.networkID
registerAck.srcMac = ctx.devAddr.mac
registerAck.dstMac = dst_mac
if let packet = try? registerAck.serializedData() {
SDLLogger.log("[SDLUDPHole] SendRegisterAck: \(remoteAddress), \(registerAck)", level: .debug)
self.send(remoteAddress: remoteAddress, type: .registerAck, data: packet)
}
}
//
private func send(remoteAddress: SocketAddress, type: SDLPacketType, data: Data) {
let message = UDPMessage(remoteAddress: remoteAddress, type: type, data: data)
self.continuation.yield(message)
}
//--MARK:
private static func decode(buffer: inout ByteBuffer) throws -> SDLHoleInboundMessage? {
guard let type = buffer.readInteger(as: UInt8.self),
let packetType = SDLPacketType(rawValue: type),
let bytes = buffer.readBytes(length: buffer.readableBytes) else {
SDLLogger.log("[SDLUDPHole] decode error", level: .error)
return nil
}
switch packetType {
case .data:
let dataPacket = try SDLData(serializedBytes: bytes)
return .data(dataPacket)
case .register:
let registerPacket = try SDLRegister(serializedBytes: bytes)
return .register(registerPacket)
case .registerAck:
let registerAck = try SDLRegisterAck(serializedBytes: bytes)
return .registerAck(registerAck)
case .stunReply:
let stunReply = try SDLStunReply(serializedBytes: bytes)
return .stunReply(stunReply)
case .stunProbeReply:
let stunProbeReply = try SDLStunProbeReply(serializedBytes: bytes)
return .stunProbeReply(stunProbeReply)
default:
return nil
}
}
deinit {
try? self.group.syncShutdownGracefully()
}
}