202 lines
7.8 KiB
Swift
202 lines
7.8 KiB
Swift
//
|
|
// SDLanServer.swift
|
|
// Tun
|
|
//
|
|
// Created by 安礼成 on 2024/1/31.
|
|
//
|
|
|
|
import Foundation
|
|
import NIOCore
|
|
import NIOPosix
|
|
import SwiftProtobuf
|
|
|
|
// 处理和sn-server服务器之间的通讯
|
|
/// Actor that owns one UDP socket and exposes it through async streams.
///
/// Inbound datagrams are decoded into `SDLHoleInboundMessage` values and published on
/// `eventStream`. Outbound messages queued with `send(type:data:remoteAddress:)` are framed
/// as `[1-byte packet type][payload]` and written to the socket while `start()` is running.
actor SDLUDPHoleActor {
    /// Single-threaded event loop group that drives the UDP channel.
    private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    /// Async wrapper around the bound datagram channel.
    private let asyncChannel: NIOAsyncChannel<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>
    /// Unbounded queue of outbound messages; drained by the writer task in `start()`.
    private let (writeStream, writeContinuation) = AsyncStream.makeStream(of: UDPHoleOutboundMessage.self, bufferingPolicy: .unbounded)

    /// Local address reported by the channel after binding, if any.
    public var localAddress: SocketAddress?
    /// Stream of lifecycle and inbound-message events produced by this actor.
    public let eventStream: AsyncStream<UDPHoleEvent>
    private let eventContinuation: AsyncStream<UDPHoleEvent>.Continuation

    private let logger: SDLLogger

    /// An outbound datagram waiting to be written.
    struct UDPHoleOutboundMessage {
        let remoteAddress: SocketAddress
        let type: SDLPacketType
        let data: Data
    }

    /// Events published on `eventStream`.
    enum UDPHoleEvent {
        /// Emitted once when `start()` begins servicing the channel.
        case ready
        /// A successfully decoded inbound message together with the sender's address.
        case message(SocketAddress, SDLHoleInboundMessage)
    }

    /// Creates the actor and binds a UDP socket on `0.0.0.0` with port `0`.
    ///
    /// - Parameter logger: Logger used for diagnostics.
    /// - Throws: Any error raised while binding or wrapping the channel.
    init(logger: SDLLogger) async throws {
        self.logger = logger

        (self.eventStream, self.eventContinuation) = AsyncStream.makeStream(of: UDPHoleEvent.self, bufferingPolicy: .unbounded)

        let bootstrap = DatagramBootstrap(group: group)
            .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)

        do {
            self.asyncChannel = try await bootstrap.bind(host: "0.0.0.0", port: 0)
                .flatMapThrowing { channel in
                    return try NIOAsyncChannel(wrappingChannelSynchronously: channel, configuration: .init(
                        inboundType: AddressedEnvelope<ByteBuffer>.self,
                        outboundType: AddressedEnvelope<ByteBuffer>.self
                    ))
                }
                .get()
        } catch {
            // `deinit` does not run when `init` throws, so release the event loop
            // group here; otherwise it would never be shut down.
            try? await group.shutdownGracefully()
            throw error
        }

        let boundAddress = self.asyncChannel.channel.localAddress
        self.localAddress = boundAddress
        // Avoid force-unwrapping: a missing local address must not crash start-up.
        let addressDescription = boundAddress.map { "\($0)" } ?? "unknown"
        self.logger.log("[UDPHole] started and listening on: \(addressDescription)", level: .debug)
    }

    /// Services the channel until either direction finishes or fails, then closes it.
    ///
    /// Yields `.ready` on `eventStream`, then runs two child tasks: a reader that decodes
    /// inbound datagrams into `.message` events, and a writer that drains the outbound
    /// queue. When the first child task completes, the other one is cancelled.
    ///
    /// - Throws: Errors from the channel, from decoding an inbound datagram, or
    ///   `CancellationError` when the surrounding task is cancelled.
    func start() async throws {
        try await self.asyncChannel.executeThenClose { inbound, outbound in
            self.eventContinuation.yield(.ready)

            try await withThrowingTaskGroup(of: Void.self) { group in
                // Reader: socket -> eventStream.
                group.addTask {
                    defer {
                        self.logger.log("[SDLUDPHole] inbound closed", level: .warning)
                    }

                    for try await envelope in inbound {
                        try Task.checkCancellation()

                        var buffer = envelope.data
                        let remoteAddress = envelope.remoteAddress
                        do {
                            if let message = try Self.decode(buffer: &buffer) {
                                self.eventContinuation.yield(.message(remoteAddress, message))
                            } else {
                                self.logger.log("[SDLUDPHole] decode message, get null", level: .warning)
                            }
                        } catch let err {
                            self.logger.log("[SDLUDPHole] decode message, get error: \(err)", level: .warning)
                            // NOTE(review): rethrowing ends the whole receive loop on a single
                            // undecodable datagram. Kept as-is to preserve existing behavior;
                            // confirm this is intended for traffic from untrusted peers.
                            throw err
                        }
                    }
                }

                // Writer: outbound queue -> socket.
                group.addTask {
                    defer {
                        self.logger.log("[SDLUDPHole] outbound closed", level: .warning)
                    }

                    for await message in self.writeStream {
                        try Task.checkCancellation()

                        // Wire format: one packet-type byte followed by the payload.
                        var buffer = self.asyncChannel.channel.allocator.buffer(capacity: message.data.count + 1)
                        buffer.writeInteger(message.type.rawValue)
                        buffer.writeBytes(message.data)

                        let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: message.remoteAddress, data: buffer)
                        try await outbound.write(envelope)
                    }
                }

                // As soon as one direction finishes, stop the other one.
                if let _ = try await group.next() {
                    group.cancelAll()
                }
            }
        }
    }

    /// Returns the local address recorded when the socket was bound.
    func getLocalAddress() -> SocketAddress? {
        return self.localAddress
    }

    // MARK: client-client apis

    /// Queues a message for sending; the actual write happens in the writer task of `start()`.
    ///
    /// - Parameters:
    ///   - type: Packet type, written as the first byte of the datagram.
    ///   - data: Payload written after the type byte.
    ///   - remoteAddress: Destination of the datagram.
    func send(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
        let message = UDPHoleOutboundMessage(remoteAddress: remoteAddress, type: type, data: data)
        self.writeContinuation.yield(message)
    }

    // MARK: codec

    /// Decodes one inbound datagram.
    ///
    /// The first byte selects the packet type and the rest is the serialized payload.
    /// The payload is read exactly once per case. Reading it before the `switch` and again
    /// inside a case would hand the second reader an already-drained buffer.
    ///
    /// - Parameter buffer: Datagram contents; consumed by this call.
    /// - Returns: The decoded message, or `nil` when the packet type is missing, unknown,
    ///   or not handled here, or when a super-ack / super-nak / peer-info / event payload
    ///   cannot be parsed.
    /// - Throws: The parsing error for `.data`, `.register`, `.registerAck`, `.stunReply`
    ///   and `.stunProbeReply` payloads.
    private static func decode(buffer: inout ByteBuffer) throws -> SDLHoleInboundMessage? {
        guard let type = buffer.readInteger(as: UInt8.self),
              let packetType = SDLPacketType(rawValue: type) else {
            return nil
        }

        switch packetType {
        case .data:
            let bytes = Self.remainingBytes(of: &buffer)
            let dataPacket = try SDLData(serializedBytes: bytes)
            return .data(dataPacket)
        case .register:
            let bytes = Self.remainingBytes(of: &buffer)
            let registerPacket = try SDLRegister(serializedBytes: bytes)
            return .register(registerPacket)
        case .registerAck:
            let bytes = Self.remainingBytes(of: &buffer)
            let registerAck = try SDLRegisterAck(serializedBytes: bytes)
            return .registerAck(registerAck)
        case .stunReply:
            let bytes = Self.remainingBytes(of: &buffer)
            let stunReply = try SDLStunReply(serializedBytes: bytes)
            return .stunReply(stunReply)
        case .stunProbeReply:
            let bytes = Self.remainingBytes(of: &buffer)
            let stunProbeReply = try SDLStunProbeReply(serializedBytes: bytes)
            return .stunProbeReply(stunProbeReply)
        case .registerSuperAck:
            let bytes = Self.remainingBytes(of: &buffer)
            guard let registerSuperAck = try? SDLRegisterSuperAck(serializedBytes: bytes) else {
                return nil
            }
            return .registerSuperAck(registerSuperAck)
        case .registerSuperNak:
            let bytes = Self.remainingBytes(of: &buffer)
            guard let registerSuperNak = try? SDLRegisterSuperNak(serializedBytes: bytes) else {
                return nil
            }
            return .registerSuperNak(registerSuperNak)
        case .peerInfo:
            let bytes = Self.remainingBytes(of: &buffer)
            guard let peerInfo = try? SDLPeerInfo(serializedBytes: bytes) else {
                return nil
            }
            return .peerInfo(peerInfo)
        case .event:
            return Self.decodeEvent(buffer: &buffer)
        default:
            return nil
        }
    }

    /// Decodes the body of an `.event` packet: one event-type byte followed by the payload.
    ///
    /// - Returns: The decoded event message, or `nil` when the event type is missing or
    ///   unknown, or the payload cannot be parsed.
    private static func decodeEvent(buffer: inout ByteBuffer) -> SDLHoleInboundMessage? {
        guard let eventVal = buffer.readInteger(as: UInt8.self),
              let event = SDLEventType(rawValue: eventVal) else {
            return nil
        }
        let bytes = Self.remainingBytes(of: &buffer)

        switch event {
        case .natChanged:
            guard let natChangedEvent = try? SDLNatChangedEvent(serializedBytes: bytes) else {
                return nil
            }
            return .event(.natChanged(natChangedEvent))
        case .sendRegister:
            guard let sendRegisterEvent = try? SDLSendRegisterEvent(serializedBytes: bytes) else {
                return nil
            }
            return .event(.sendRegister(sendRegisterEvent))
        case .networkShutdown:
            guard let networkShutdownEvent = try? SDLNetworkShutdownEvent(serializedBytes: bytes) else {
                return nil
            }
            return .event(.networkShutdown(networkShutdownEvent))
        }
    }

    /// Consumes and returns every byte still readable in `buffer` (empty when none remain).
    private static func remainingBytes(of buffer: inout ByteBuffer) -> [UInt8] {
        return buffer.readBytes(length: buffer.readableBytes) ?? []
    }

    deinit {
        // Finish the streams first so consumers are released before the blocking shutdown.
        self.writeContinuation.finish()
        self.eventContinuation.finish()
        try? self.group.syncShutdownGracefully()
    }
}
|