fix udpHole
This commit is contained in:
parent
d15240a3a7
commit
2abef3d0bf
@ -52,7 +52,7 @@ actor SDLNATProberActor {
|
|||||||
|
|
||||||
// MARK: - Dependencies
|
// MARK: - Dependencies
|
||||||
|
|
||||||
private let udpHole: SDLUDPHoleActor
|
private let udpHole: SDLUDPHole
|
||||||
private let addressArray: [[SocketAddress]]
|
private let addressArray: [[SocketAddress]]
|
||||||
private let logger: SDLLogger
|
private let logger: SDLLogger
|
||||||
|
|
||||||
@ -63,7 +63,7 @@ actor SDLNATProberActor {
|
|||||||
|
|
||||||
// MARK: - Init
|
// MARK: - Init
|
||||||
|
|
||||||
init(udpHole: SDLUDPHoleActor, addressArray: [[SocketAddress]], logger: SDLLogger) {
|
init(udpHole: SDLUDPHole, addressArray: [[SocketAddress]], logger: SDLLogger) {
|
||||||
self.udpHole = udpHole
|
self.udpHole = udpHole
|
||||||
self.addressArray = addressArray
|
self.addressArray = addressArray
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
|
|||||||
@ -13,7 +13,7 @@ actor SDLPuncherActor {
|
|||||||
private var coolingDown: Set<Data> = []
|
private var coolingDown: Set<Data> = []
|
||||||
private let cooldown: Duration = .seconds(5)
|
private let cooldown: Duration = .seconds(5)
|
||||||
|
|
||||||
private var udpHoleActor: SDLUDPHoleActor?
|
private var udpHole: SDLUDPHole?
|
||||||
|
|
||||||
private var pktId: UInt32 = 1
|
private var pktId: UInt32 = 1
|
||||||
// 提交后还没有响应的请求
|
// 提交后还没有响应的请求
|
||||||
@ -34,8 +34,8 @@ actor SDLPuncherActor {
|
|||||||
self.logger = logger
|
self.logger = logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func setUDPHoleActor(udpHoleActor: SDLUDPHoleActor?) {
|
func setUDPHoleActor(udpHole: SDLUDPHole?) {
|
||||||
self.udpHoleActor = udpHoleActor
|
self.udpHole = udpHole
|
||||||
}
|
}
|
||||||
|
|
||||||
func submitRegisterRequest(request: RegisterRequest) {
|
func submitRegisterRequest(request: RegisterRequest) {
|
||||||
@ -73,7 +73,7 @@ actor SDLPuncherActor {
|
|||||||
register.srcMac = request.srcMac
|
register.srcMac = request.srcMac
|
||||||
register.dstMac = request.dstMac
|
register.dstMac = request.dstMac
|
||||||
|
|
||||||
await self.udpHoleActor?.send(type: .register, data: try! register.serializedData(), remoteAddress: remoteAddress)
|
self.udpHole?.send(type: .register, data: try! register.serializedData(), remoteAddress: remoteAddress)
|
||||||
} else {
|
} else {
|
||||||
self.logger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning)
|
self.logger.log("[SDLContext] hole sock address is invalid: \(peerInfo.v4Info)", level: .warning)
|
||||||
}
|
}
|
||||||
@ -95,7 +95,7 @@ actor SDLPuncherActor {
|
|||||||
self.pendingRequests[pktId] = request
|
self.pendingRequests[pktId] = request
|
||||||
|
|
||||||
if let queryData = try? queryInfo.serializedData() {
|
if let queryData = try? queryInfo.serializedData() {
|
||||||
await self.udpHoleActor?.send(type: .queryInfo, data: queryData, remoteAddress: self.querySocketAddress)
|
self.udpHole?.send(type: .queryInfo, data: queryData, remoteAddress: self.querySocketAddress)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,7 +19,7 @@ import NIOPosix
|
|||||||
import SwiftProtobuf
|
import SwiftProtobuf
|
||||||
|
|
||||||
// 处理和sn-server服务器之间的通讯
|
// 处理和sn-server服务器之间的通讯
|
||||||
actor SDLUDPHoleActor {
|
final class SDLUDPHole {
|
||||||
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
private var channel: Channel?
|
private var channel: Channel?
|
||||||
|
|
||||||
@ -77,7 +77,7 @@ actor SDLUDPHoleActor {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
extension SDLUDPHoleActor {
|
extension SDLUDPHole {
|
||||||
|
|
||||||
final class SDLUDPHoleHandler: ChannelInboundHandler {
|
final class SDLUDPHoleHandler: ChannelInboundHandler {
|
||||||
typealias InboundIn = AddressedEnvelope<ByteBuffer>
|
typealias InboundIn = AddressedEnvelope<ByteBuffer>
|
||||||
@ -43,7 +43,7 @@ public class SDLContext {
|
|||||||
let rsaCipher: RSACipher
|
let rsaCipher: RSACipher
|
||||||
|
|
||||||
// 依赖的变量
|
// 依赖的变量
|
||||||
var udpHoleActor: SDLUDPHoleActor?
|
var udpHole: SDLUDPHole?
|
||||||
var providerActor: SDLTunnelProviderActor
|
var providerActor: SDLTunnelProviderActor
|
||||||
var puncherActor: SDLPuncherActor
|
var puncherActor: SDLPuncherActor
|
||||||
// dns的client对象
|
// dns的client对象
|
||||||
@ -88,8 +88,8 @@ public class SDLContext {
|
|||||||
|
|
||||||
public func start() async throws {
|
public func start() async throws {
|
||||||
// 启动udp服务器
|
// 启动udp服务器
|
||||||
self.udpHoleActor = try SDLUDPHoleActor(logger: self.logger)
|
self.udpHole = try SDLUDPHole(logger: self.logger)
|
||||||
try await self.udpHoleActor?.start()
|
try self.udpHole?.start()
|
||||||
self.logger.log("[SDLContext] udpHole started")
|
self.logger.log("[SDLContext] udpHole started")
|
||||||
|
|
||||||
// 启动dns服务
|
// 启动dns服务
|
||||||
@ -120,10 +120,12 @@ public class SDLContext {
|
|||||||
}
|
}
|
||||||
|
|
||||||
group.addTask {
|
group.addTask {
|
||||||
if let eventStream = self.udpHoleActor?.eventStream {
|
if let eventStream = self.udpHole?.eventStream {
|
||||||
for try await event in eventStream {
|
for try await event in eventStream {
|
||||||
try Task.checkCancellation()
|
try Task.checkCancellation()
|
||||||
try? await self.dispatchEvent(event: event)
|
Task {
|
||||||
|
try await self.dispatchEvent(event: event)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -160,35 +162,43 @@ public class SDLContext {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public func stop() async {
|
public func stop() async {
|
||||||
self.udpHoleActor = nil
|
self.udpHole = nil
|
||||||
self.noticeClientActor = nil
|
self.noticeClientActor = nil
|
||||||
|
|
||||||
self.readTask?.cancel()
|
self.readTask?.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
private func handleUDPHoleReady() async throws {
|
private func handleUDPHoleReady() async throws {
|
||||||
await self.puncherActor.setUDPHoleActor(udpHoleActor: self.udpHoleActor)
|
await self.puncherActor.setUDPHoleActor(udpHole: self.udpHole)
|
||||||
|
|
||||||
// 开始探测nat的类型
|
await withTaskGroup(of: Void.self) { group in
|
||||||
if let udpHoleActor = self.udpHoleActor {
|
group.addTask {
|
||||||
self.proberActor = SDLNATProberActor(udpHole: udpHoleActor, addressArray: self.config.stunProbeSocketAddressArray, logger: self.logger)
|
// 开始探测nat的类型
|
||||||
self.natType = await self.proberActor!.probeNatType()
|
if let udpHoleActor = self.udpHole {
|
||||||
|
self.proberActor = SDLNATProberActor(udpHole: udpHoleActor, addressArray: self.config.stunProbeSocketAddressArray, logger: self.logger)
|
||||||
|
self.natType = await self.proberActor!.probeNatType()
|
||||||
|
self.logger.log("[SDLContext] nat_type is: \(self.natType)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
var registerSuper = SDLRegisterSuper()
|
||||||
|
registerSuper.pktID = 0
|
||||||
|
registerSuper.clientID = self.config.clientId
|
||||||
|
registerSuper.networkID = self.config.networkAddress.networkId
|
||||||
|
registerSuper.mac = self.config.networkAddress.mac
|
||||||
|
registerSuper.ip = self.config.networkAddress.ip
|
||||||
|
registerSuper.maskLen = UInt32(self.config.networkAddress.maskLen)
|
||||||
|
registerSuper.hostname = self.config.hostname
|
||||||
|
registerSuper.pubKey = self.rsaCipher.pubKey
|
||||||
|
registerSuper.accessToken = self.config.accessToken
|
||||||
|
|
||||||
|
if let registerSuperData = try? registerSuper.serializedData() {
|
||||||
|
self.logger.log("[SDLContext] will send register super")
|
||||||
|
self.udpHole?.send(type: .registerSuper, data: registerSuperData, remoteAddress: self.config.stunSocketAddress)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var registerSuper = SDLRegisterSuper()
|
|
||||||
registerSuper.pktID = 0
|
|
||||||
registerSuper.clientID = self.config.clientId
|
|
||||||
registerSuper.networkID = self.config.networkAddress.networkId
|
|
||||||
registerSuper.mac = self.config.networkAddress.mac
|
|
||||||
registerSuper.ip = self.config.networkAddress.ip
|
|
||||||
registerSuper.maskLen = UInt32(self.config.networkAddress.maskLen)
|
|
||||||
registerSuper.hostname = self.config.hostname
|
|
||||||
registerSuper.pubKey = self.rsaCipher.pubKey
|
|
||||||
registerSuper.accessToken = self.config.accessToken
|
|
||||||
|
|
||||||
self.logger.log("will send register super")
|
|
||||||
|
|
||||||
await self.udpHoleActor?.send(type: .registerSuper, data: try registerSuper.serializedData(), remoteAddress: self.config.stunSocketAddress)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private func sendStunRequest() async {
|
private func sendStunRequest() async {
|
||||||
@ -201,11 +211,11 @@ public class SDLContext {
|
|||||||
|
|
||||||
if let stunData = try? stunRequest.serializedData() {
|
if let stunData = try? stunRequest.serializedData() {
|
||||||
let remoteAddress = self.config.stunSocketAddress
|
let remoteAddress = self.config.stunSocketAddress
|
||||||
await self.udpHoleActor?.send(type: .stunRequest, data: stunData, remoteAddress: remoteAddress)
|
self.udpHole?.send(type: .stunRequest, data: stunData, remoteAddress: remoteAddress)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func dispatchEvent(event: SDLUDPHoleActor.UDPHoleEvent) async throws {
|
private func dispatchEvent(event: SDLUDPHole.UDPHoleEvent) async throws {
|
||||||
switch event {
|
switch event {
|
||||||
case .ready:
|
case .ready:
|
||||||
try await self.handleUDPHoleReady()
|
try await self.handleUDPHoleReady()
|
||||||
@ -284,7 +294,7 @@ public class SDLContext {
|
|||||||
register.networkID = self.config.networkAddress.networkId
|
register.networkID = self.config.networkAddress.networkId
|
||||||
register.srcMac = self.config.networkAddress.mac
|
register.srcMac = self.config.networkAddress.mac
|
||||||
register.dstMac = sendRegisterEvent.dstMac
|
register.dstMac = sendRegisterEvent.dstMac
|
||||||
await self.udpHoleActor?.send(type: .register, data: try register.serializedData(), remoteAddress: remoteAddress)
|
self.udpHole?.send(type: .register, data: try register.serializedData(), remoteAddress: remoteAddress)
|
||||||
}
|
}
|
||||||
|
|
||||||
case .networkShutdown(let shutdownEvent):
|
case .networkShutdown(let shutdownEvent):
|
||||||
@ -306,7 +316,7 @@ public class SDLContext {
|
|||||||
registerAck.srcMac = networkAddr.mac
|
registerAck.srcMac = networkAddr.mac
|
||||||
registerAck.dstMac = register.srcMac
|
registerAck.dstMac = register.srcMac
|
||||||
|
|
||||||
await self.udpHoleActor?.send(type: .registerAck, data: try registerAck.serializedData(), remoteAddress: remoteAddress)
|
self.udpHole?.send(type: .registerAck, data: try registerAck.serializedData(), remoteAddress: remoteAddress)
|
||||||
// 这里需要建立到来源的会话, 在复杂网络下,通过super-node查询到的nat地址不一定靠谱,需要通过udp包的来源地址作为nat地址
|
// 这里需要建立到来源的会话, 在复杂网络下,通过super-node查询到的nat地址不一定靠谱,需要通过udp包的来源地址作为nat地址
|
||||||
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
|
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
|
||||||
await self.sessionManager.addSession(session: session)
|
await self.sessionManager.addSession(session: session)
|
||||||
@ -397,8 +407,10 @@ public class SDLContext {
|
|||||||
self.readTask = Task(priority: .high) {
|
self.readTask = Task(priority: .high) {
|
||||||
repeat {
|
repeat {
|
||||||
let packets = await self.providerActor.readPackets()
|
let packets = await self.providerActor.readPackets()
|
||||||
let ipPackets = packets.compactMap { IPPacket($0) }
|
Task {
|
||||||
await self.batchProcessPackets(batchSize: 20, packets: ipPackets)
|
let ipPackets = packets.compactMap { IPPacket($0) }
|
||||||
|
await self.batchProcessPackets(batchSize: 20, packets: ipPackets)
|
||||||
|
}
|
||||||
} while true
|
} while true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -469,18 +481,18 @@ public class SDLContext {
|
|||||||
// 广播地址不要去尝试打洞
|
// 广播地址不要去尝试打洞
|
||||||
if ARPPacket.isBroadcastMac(dstMac) {
|
if ARPPacket.isBroadcastMac(dstMac) {
|
||||||
// 通过super_node进行转发
|
// 通过super_node进行转发
|
||||||
await self.udpHoleActor?.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
|
self.udpHole?.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// 通过session发送到对端
|
// 通过session发送到对端
|
||||||
if let session = await self.sessionManager.getSession(toAddress: dstMac) {
|
if let session = await self.sessionManager.getSession(toAddress: dstMac) {
|
||||||
self.logger.log("[SDLContext] send packet by session: \(session)", level: .debug)
|
self.logger.log("[SDLContext] send packet by session: \(session)", level: .debug)
|
||||||
await self.udpHoleActor?.send(type: .data, data: data, remoteAddress: session.natAddress)
|
self.udpHole?.send(type: .data, data: data, remoteAddress: session.natAddress)
|
||||||
await self.flowTracer.inc(num: data.count, type: .p2p)
|
await self.flowTracer.inc(num: data.count, type: .p2p)
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// 通过super_node进行转发
|
// 通过super_node进行转发
|
||||||
await self.udpHoleActor?.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
|
self.udpHole?.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
|
||||||
// 流量统计
|
// 流量统计
|
||||||
await self.flowTracer.inc(num: data.count, type: .forward)
|
await self.flowTracer.inc(num: data.count, type: .forward)
|
||||||
// 尝试打洞
|
// 尝试打洞
|
||||||
@ -490,7 +502,7 @@ public class SDLContext {
|
|||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
self.udpHoleActor = nil
|
self.udpHole = nil
|
||||||
self.dnsClientActor = nil
|
self.dnsClientActor = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user