解决通讯模型的问题

This commit is contained in:
anlicheng 2026-01-28 18:53:03 +08:00
parent 047f5b90ec
commit d74bc61060
8 changed files with 120 additions and 217 deletions

View File

@ -13,7 +13,6 @@ import NIOPosix
@available(macOS 14, *)
actor SDLDNSClientActor {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded)
private var channel: Channel?
private let logger: SDLLogger
@ -54,7 +53,7 @@ actor SDLDNSClientActor {
deinit {
try? self.group.syncShutdownGracefully()
self.writeContinuation.finish()
self.packetContinuation.finish()
}
}

View File

@ -94,7 +94,7 @@ actor SDLNATProberActor {
}
/// UDP STUN
func handleProbeReply(from address: SocketAddress, reply: SDLStunProbeReply) async {
func handleProbeReply(reply: SDLStunProbeReply) async {
guard let session = self.sessions[reply.cookie] else {
return
}
@ -104,7 +104,7 @@ actor SDLNATProberActor {
// 退nat
if let step1 = session.replies[1] {
let localAddress = await self.udpHole.getLocalAddress()
if address == localAddress {
if reply.socketAddress() == localAddress {
finish(cookie: session.cookieId, .noNat)
return
}

View File

@ -0,0 +1,70 @@
//
// SDLNoticeClient.swift
// Tun
//
// Created by on 2024/5/20.
//
import Foundation
//
// SDLanServer.swift
// Tun
//
// Created by on 2024/1/31.
//
import Foundation
import NIOCore
import NIOPosix
// sn-server
actor SDLNoticeClientActor {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private var channel: Channel?
private let remoteAddress: SocketAddress
private let logger: SDLLogger
//
init(noticePort: Int, logger: SDLLogger) throws {
self.logger = logger
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: noticePort)
}
func start() throws {
let bootstrap = DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelInitializer { channel in
channel.pipeline.addHandler(SDLNoticeClientInboundHandler())
}
self.channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait()
self.logger.log("[SDLNoticeClient] started", level: .debug)
}
//
func send(data: Data) {
guard let channel = self.channel else {
return
}
let buf = channel.allocator.buffer(bytes: data)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: self.remoteAddress, data: buf)
channel.eventLoop.execute {
channel.writeAndFlush(envelope, promise: nil)
}
}
deinit {
try? self.group.syncShutdownGracefully()
}
}
extension SDLNoticeClientActor {
private class SDLNoticeClientInboundHandler: ChannelInboundHandler {
typealias InboundIn = AddressedEnvelope<ByteBuffer>
}
}

View File

@ -138,9 +138,6 @@ extension SDLUDPHoleActor {
case .registerAck:
let registerAck = try SDLRegisterAck(serializedBytes: bytes)
return .registerAck(registerAck)
case .stunReply:
let stunReply = try SDLStunReply(serializedBytes: bytes)
return .stunReply(stunReply)
case .stunProbeReply:
let stunProbeReply = try SDLStunProbeReply(serializedBytes: bytes)
return .stunProbeReply(stunProbeReply)

View File

@ -65,7 +65,7 @@ public class SDLContext {
private var monitor: SDLNetworkMonitor?
// socket
private var noticeClient: SDLNoticeClient?
private var noticeClientActor: SDLNoticeClientActor?
//
private var flowTracer = SDLFlowTracerActor()
@ -137,14 +137,14 @@ public class SDLContext {
public func stop() async {
self.rootTask?.cancel()
self.udpHoleActor = nil
self.noticeClient = nil
self.noticeClientActor = nil
self.readTask?.cancel()
}
private func startNoticeClient() async throws {
self.noticeClient = try await SDLNoticeClient(noticePort: self.config.noticePort, logger: self.logger)
try await self.noticeClient?.start()
self.noticeClientActor = try SDLNoticeClientActor(noticePort: self.config.noticePort, logger: self.logger)
try await self.noticeClientActor?.start()
self.logger.log("[SDLContext] notice_client task cancel", level: .warning)
}
@ -274,10 +274,8 @@ public class SDLContext {
await self.puncherActor.handlePeerInfo(peerInfo: peerInfo)
case .event(let event):
try await self.handleEvent(event: event)
case .stunReply(let stunReply):
await self.handleStunReply(stunReply: stunReply)
case .stunProbeReply(_):
()
case .stunProbeReply(let probeReply):
await self.proberActor?.handleProbeReply(reply: probeReply)
case .data(let data):
try await self.handleData(data: data)
case .register(let register):
@ -296,7 +294,7 @@ public class SDLContext {
// tun
do {
let ipAddress = try await self.providerActor.setNetworkSettings(networkAddress: self.config.networkAddress, dnsServer: SDLDNSClientActor.Helper.dnsServer)
await self.noticeClient?.send(data: NoticeMessage.ipAdress(ip: ipAddress))
await self.noticeClientActor?.send(data: NoticeMessage.ipAdress(ip: ipAddress))
self.startReader()
} catch let err {
@ -316,11 +314,11 @@ public class SDLContext {
switch errorCode {
case .invalidToken, .nodeDisabled:
let alertNotice = NoticeMessage.alert(alert: errorMessage)
await self.noticeClient?.send(data: alertNotice)
await self.noticeClientActor?.send(data: alertNotice)
exit(-1)
case .noIpAddress, .networkFault, .internalFault:
let alertNotice = NoticeMessage.alert(alert: errorMessage)
await self.noticeClient?.send(data: alertNotice)
await self.noticeClientActor?.send(data: alertNotice)
}
self.logger.log("[SDLContext] Get a SuperNak message exit", level: .warning)
@ -346,18 +344,11 @@ public class SDLContext {
case .networkShutdown(let shutdownEvent):
let alertNotice = NoticeMessage.alert(alert: shutdownEvent.message)
await self.noticeClient?.send(data: alertNotice)
await self.noticeClientActor?.send(data: alertNotice)
exit(-1)
}
}
//// case .ready:
//// await self.puncherActor.setUDPHoleActor(udpHoleActor: self.udpHoleActor)
//// //
//// self.natType = await getNatType()
//// self.logger.log("[SDLContext] broadcast is: \(self.natType)", level: .debug)
//
private func handleRegister(remoteAddress: SocketAddress, register: SDLRegister) async throws {
let networkAddr = config.networkAddress
self.logger.log("register packet: \(register), network_address: \(networkAddr)", level: .debug)
@ -390,15 +381,6 @@ public class SDLContext {
}
}
private func handleStunReply(stunReply: SDLStunReply) async {
// let cookie = stunReply.cookie
// if cookie == self.lastCookie {
// // nat
// //self.natAddress = stunReply.natAddress
// self.logger.log("[SDLContext] get a stunReply: \(try! stunReply.jsonString())", level: .debug)
// }
}
private func handleData(data: SDLData) async throws {
let mac = LayerPacket.MacAddress(data: data.dstMac)
@ -470,46 +452,55 @@ public class SDLContext {
self.readTask = Task(priority: .high) {
repeat {
let packets = await self.providerActor.readPackets()
for packet in packets {
await self.dealPacket(data: packet)
}
let ipPackets = packets.compactMap { IPPacket($0) }
await self.batchProcessPackets(batchSize: 20, packets: ipPackets)
} while true
}
}
//
private func dealPacket(data: Data) async {
guard let packet = IPPacket(data) else {
return
// ip
private func batchProcessPackets(batchSize: Int, packets: [IPPacket]) async {
for startIndex in stride(from: 0, to: packets.count, by: batchSize) {
let endIndex = Swift.min(startIndex + batchSize, packets.count)
let chunkPackets = packets[startIndex..<endIndex]
await withTaskGroup(of: Void.self) { group in
for packet in chunkPackets {
group.addTask {
await self.dealPacket(packet: packet)
}
}
}
}
}
//
private func dealPacket(packet: IPPacket) async {
let networkAddr = self.config.networkAddress
if SDLDNSClientActor.Helper.isDnsRequestPacket(ipPacket: packet) {
let destIp = packet.header.destination_ip
self.logger.log("[DNSQuery] destIp: \(destIp), int: \(packet.header.destination.asIpAddress())", level: .debug)
await self.dnsClientActor?.forward(ipPacket: packet)
return
}
let dstIp = packet.header.destination
// , ip
if dstIp == networkAddr.ip {
let nePacket = NEPacket(data: packet.data, protocolFamily: 2)
await self.providerActor.writePackets(packets: [nePacket])
return
}
// arpmac
if let dstMac = await self.arpServer.query(ip: dstIp) {
await self.routeLayerPacket(dstMac: dstMac, type: .ipv4, data: packet.data)
}
else {
Task.detached {
let dstIp = packet.header.destination
// , ip
if dstIp == networkAddr.ip {
let nePacket = NEPacket(data: packet.data, protocolFamily: 2)
await self.providerActor.writePackets(packets: [nePacket])
return
}
// arpmac
if let dstMac = await self.arpServer.query(ip: dstIp) {
await self.routeLayerPacket(dstMac: dstMac, type: .ipv4, data: packet.data)
}
else {
self.logger.log("[SDLContext] dstIp: \(dstIp.asIpAddress()) arp query not found, broadcast", level: .debug)
// arp广
let arpReqeust = ARPPacket.arpRequest(senderIP: networkAddr.ip, senderMAC: networkAddr.mac, targetIP: dstIp)
await self.routeLayerPacket(dstMac: ARPPacket.broadcastMac , type: .arp, data: arpReqeust.marshal())
}
}
self.logger.log("[SDLContext] dstIp: \(dstIp.asIpAddress()) arp query not found, broadcast", level: .debug)
// arp广
let arpReqeust = ARPPacket.arpRequest(senderIP: networkAddr.ip, senderMAC: networkAddr.mac, targetIP: dstIp)
await self.routeLayerPacket(dstMac: ARPPacket.broadcastMac , type: .arp, data: arpReqeust.marshal())
}
}

View File

@ -313,22 +313,6 @@ struct SDLStunRequest: @unchecked Sendable {
fileprivate var _v6Info: SDLV6Info? = nil
}
struct SDLStunReply: Sendable {
// SwiftProtobuf.Message conformance is added in an extension below. See the
// `Message` and `Message+*Additions` files in the SwiftProtobuf library for
// methods supported on all messages.
var networkID: UInt32 = 0
var code: UInt32 = 0
var message: String = String()
var unknownFields = SwiftProtobuf.UnknownStorage()
init() {}
}
struct SDLData: @unchecked Sendable {
// SwiftProtobuf.Message conformance is added in an extension below. See the
// `Message` and `Message+*Additions` files in the SwiftProtobuf library for
@ -1155,50 +1139,6 @@ extension SDLStunRequest: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
}
}
extension SDLStunReply: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding {
static let protoMessageName: String = "SDLStunReply"
static let _protobuf_nameMap: SwiftProtobuf._NameMap = [
1: .standard(proto: "network_id"),
2: .same(proto: "code"),
3: .same(proto: "message"),
]
mutating func decodeMessage<D: SwiftProtobuf.Decoder>(decoder: inout D) throws {
while let fieldNumber = try decoder.nextFieldNumber() {
// The use of inline closures is to circumvent an issue where the compiler
// allocates stack space for every case branch when no optimizations are
// enabled. https://github.com/apple/swift-protobuf/issues/1034
switch fieldNumber {
case 1: try { try decoder.decodeSingularUInt32Field(value: &self.networkID) }()
case 2: try { try decoder.decodeSingularUInt32Field(value: &self.code) }()
case 3: try { try decoder.decodeSingularStringField(value: &self.message) }()
default: break
}
}
}
func traverse<V: SwiftProtobuf.Visitor>(visitor: inout V) throws {
if self.networkID != 0 {
try visitor.visitSingularUInt32Field(value: self.networkID, fieldNumber: 1)
}
if self.code != 0 {
try visitor.visitSingularUInt32Field(value: self.code, fieldNumber: 2)
}
if !self.message.isEmpty {
try visitor.visitSingularStringField(value: self.message, fieldNumber: 3)
}
try unknownFields.traverse(visitor: &visitor)
}
static func ==(lhs: SDLStunReply, rhs: SDLStunReply) -> Bool {
if lhs.networkID != rhs.networkID {return false}
if lhs.code != rhs.code {return false}
if lhs.message != rhs.message {return false}
if lhs.unknownFields != rhs.unknownFields {return false}
return true
}
}
extension SDLData: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding {
static let protoMessageName: String = "SDLData"
static let _protobuf_nameMap: SwiftProtobuf._NameMap = [

View File

@ -27,10 +27,6 @@ enum SDLPacketType: UInt8 {
//
case event = 0x10
// ,
case command = 0x11
case commandAck = 0x12
//
case flowTracer = 0x15
@ -38,7 +34,6 @@ enum SDLPacketType: UInt8 {
case registerAck = 0x21
case stunRequest = 0x30
case stunReply = 0x31
case stunProbe = 0x32
case stunProbeReply = 0x33
@ -106,7 +101,6 @@ enum SDLHoleInboundMessage {
case peerInfo(SDLPeerInfo)
case event(SDLEvent)
case stunReply(SDLStunReply)
case stunProbeReply(SDLStunProbeReply)
case data(SDLData)

View File

@ -1,88 +0,0 @@
//
// SDLNoticeClient.swift
// Tun
//
// Created by on 2024/5/20.
//
import Foundation
//
// SDLanServer.swift
// Tun
//
// Created by on 2024/1/31.
//
import Foundation
import NIOCore
import NIOPosix
// sn-server
actor SDLNoticeClient {
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
private let remoteAddress: SocketAddress
private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: Data.self, bufferingPolicy: .unbounded)
private let logger: SDLLogger
//
init(noticePort: Int, logger: SDLLogger) async throws {
self.logger = logger
self.remoteAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: noticePort)
let bootstrap = DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0)
.flatMapThrowing {channel in
return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
inboundType: AddressedEnvelope<ByteBuffer>.self,
outboundType: AddressedEnvelope<ByteBuffer>.self
))
}
.get()
self.logger.log("[SDLNoticeClient] started and listening on: \(self.asyncChannel.channel.localAddress!)", level: .debug)
}
func start() async throws {
try await self.asyncChannel.executeThenClose { inbound, outbound in
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await self.asyncChannel.channel.closeFuture.get()
throw SDLError.socketClosed
}
group.addTask {
defer {
self.writeContinuation.finish()
}
for try await message in self.writeStream {
let buf = self.asyncChannel.channel.allocator.buffer(bytes: message)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: self.remoteAddress, data: buf)
try await outbound.write(envelope)
}
}
for try await _ in group {
}
}
}
}
//
func send(data: Data) {
self.writeContinuation.yield(data)
}
deinit {
try? self.group.syncShutdownGracefully()
self.writeContinuation.finish()
}
}