punchnet-macos/Tun/Punchnet/UDPHole/SDLUDPHole.swift

164 lines
5.2 KiB
Swift
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//
// SDLUDPHole.swift
// Tun
//
// Created by on 2024/1/31.
//
import Foundation
import NIOCore
import NIOPosix
import SwiftProtobuf
// sn-server
/// UDP endpoint built on SwiftNIO: inbound datagrams are decoded into
/// `SDLHoleMessage` values and published on `messageStream`; `send` transmits
/// datagrams through the same socket.
///
/// Lifecycle: `idle` -> `ready` (after `start()`) -> `stopping` -> `stopped`.
///
/// NOTE(review): `state`, `channel` and `didFinishMessageStream` are accessed from
/// caller threads (`start`/`stop`/`send`) as well as from the event loop (handler
/// callbacks) without synchronization — confirm that callers serialize access.
final class SDLUDPHole: ChannelInboundHandler {
    typealias InboundIn = AddressedEnvelope<ByteBuffer>

    /// Errors thrown by `start()`.
    enum StartError: Error {
        /// `start()` was called while the hole was not in its initial idle state.
        case alreadyStarted
    }

    private enum State: Equatable {
        case idle
        case ready
        case stopping
        case stopped
    }

    /// Single-threaded event loop group that drives the UDP channel.
    private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    private var channel: Channel?
    private var closeFuture: EventLoopFuture<Void>?
    private var state: State = .idle
    private var didFinishMessageStream: Bool = false

    /// Stream of successfully decoded inbound messages paired with their sender address.
    public let messageStream: AsyncStream<(SocketAddress, SDLHoleMessage)>
    private let messageContinuation: AsyncStream<(SocketAddress, SDLHoleMessage)>.Continuation

    /// Creates the message stream; no socket is opened until `start()` is called.
    init() throws {
        // Bounded buffer: once 2048 elements are pending, the oldest are discarded.
        let (stream, continuation) = AsyncStream.makeStream(
            of: (SocketAddress, SDLHoleMessage).self,
            bufferingPolicy: .bufferingNewest(2048)
        )
        self.messageStream = stream
        self.messageContinuation = continuation
    }

    /// Binds the UDP socket and moves the hole into the `ready` state.
    ///
    /// - Returns: The local address the socket was bound to.
    /// - Throws: `StartError.alreadyStarted` when the hole is not idle, or the
    ///   error produced by the bind operation.
    func start() throws -> SocketAddress {
        // Binding a second time would attach this handler to another pipeline and
        // overwrite `channel`/`closeFuture`, leaking the first socket.
        guard state == .idle else {
            throw StartError.alreadyStarted
        }

        let bootstrap = DatagramBootstrap(group: group)
            .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
            .channelInitializer { channel in
                channel.pipeline.addHandler(self)
            }

        // Bind on all IPv4 interfaces; port 0 lets the OS choose the port.
        let channel = try bootstrap.bind(host: "0.0.0.0", port: 0).wait()
        self.channel = channel
        self.closeFuture = channel.closeFuture
        self.state = .ready

        guard let localAddress = channel.localAddress else {
            preconditionFailure("UDP channel has no localAddress after bind")
        }
        return localAddress
    }

    /// Suspends until the UDP channel has closed.
    ///
    /// Returns immediately when the hole was never started or no close future
    /// was recorded.
    ///
    /// - Throws: The error the channel's close future fails with, if any.
    func waitClose() async throws {
        switch self.state {
        case .idle:
            SDLLogger.log("[SDLUDPHole] waitClose11", for: .debug)
            return
        case .ready, .stopping, .stopped:
            guard let closeFuture = self.closeFuture else {
                SDLLogger.log("[SDLUDPHole] waitClose22", for: .debug)
                return
            }
            try await closeFuture.get()
            SDLLogger.log("[SDLUDPHole] waitClose33", for: .debug)
        }
    }

    /// Finishes the message stream and closes the channel.
    ///
    /// Calling it again once stopping or stopped is a no-op.
    func stop() {
        SDLLogger.log("[SDLUDPHole] waitClose stop", for: .debug)
        switch self.state {
        case .stopping, .stopped:
            return
        case .idle:
            // Never started: there is no channel to close.
            self.state = .stopped
            self.finishMessageStream()
            return
        case .ready:
            self.state = .stopping
        }
        self.finishMessageStream()
        self.channel?.close(promise: nil)
    }

    // MARK: - ChannelInboundHandler

    /// Decodes one inbound datagram and yields it to `messageStream`.
    ///
    /// Datagrams are ignored unless the hole is `ready`; decode failures are
    /// logged and dropped.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        guard case .ready = self.state else {
            return
        }
        let envelope = unwrapInboundIn(data)
        var buffer = envelope.data
        let remoteAddress = envelope.remoteAddress

        // Only the size is logged, so read it directly instead of copying the payload.
        SDLLogger.log("[SDLUDPHole] get raw bytes: \(buffer.readableBytes), from: \(remoteAddress)", for: .debug)

        do {
            if let message = try SDLHoleMessage.decode(buffer: &buffer) {
                self.messageContinuation.yield((remoteAddress, message))
            } else {
                SDLLogger.log("[SDLUDPHole] decode message, get null", for: .debug)
            }
        } catch let err {
            SDLLogger.log("[SDLUDPHole] decode message, get error: \(err)", for: .debug)
        }
    }

    /// Marks the hole as stopped once the channel becomes inactive.
    func channelInactive(context: ChannelHandlerContext) {
        self.finishMessageStream()
        self.channel = nil
        self.state = .stopped
        SDLLogger.log("[SDLUDPHole] channelInactive", for: .debug)
    }

    /// Treats any channel error as fatal: finishes the stream and closes the channel.
    func errorCaught(context: ChannelHandlerContext, error: any Error) {
        SDLLogger.log("[SDLUDPHole] channel error: \(error)", for: .debug)
        self.finishMessageStream()
        if self.state != .stopped {
            self.state = .stopping
        }
        context.close(promise: nil)
        SDLLogger.log("[SDLUDPHole] errorCaught", for: .debug)
    }

    // MARK: - Sending

    /// Sends one datagram made of the packet type byte followed by `data`.
    ///
    /// The call is silently dropped when the hole is not `ready`.
    ///
    /// - Parameters:
    ///   - type: Packet type; its raw value is written as the first byte.
    ///   - data: Payload appended after the type byte.
    ///   - remoteAddress: Destination of the datagram.
    func send(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
        guard case .ready = self.state, let channel = self.channel else {
            return
        }
        var buffer = channel.allocator.buffer(capacity: data.count + 1)
        buffer.writeInteger(type.rawValue)
        buffer.writeBytes(data)
        let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)
        // `Channel.writeAndFlush` may be called from any thread, so no explicit
        // hop onto the event loop is needed.
        channel.writeAndFlush(envelope, promise: nil)
    }

    /// Finishes `messageStream` at most once.
    private func finishMessageStream() {
        guard !self.didFinishMessageStream else {
            return
        }
        self.didFinishMessageStream = true
        self.messageContinuation.finish()
    }

    deinit {
        SDLLogger.log("[SDLUDPHole] closeWait deinit", for: .debug)
        self.stop()
        // The pipeline holds this handler, so the last reference may be released on
        // the group's own event loop thread; a synchronous shutdown there would wait
        // on the very thread it is blocking. Shut down asynchronously instead.
        self.group.shutdownGracefully { _ in }
    }
}