fix
This commit is contained in:
parent
6b7e02a65b
commit
969921c438
@ -50,6 +50,7 @@ actor SDLContextActor {
|
||||
private var udpHoleWorkers: [Task<Void, Never>]?
|
||||
private var udpHoleV6: SDLUDPHoleV6?
|
||||
private var udpHoleV6Workers: [Task<Void, Never>]?
|
||||
private var udpHoleV6LocalAddress: SocketAddress?
|
||||
|
||||
// dns的client对象
|
||||
private var dnsClient: DNSCloudClient?
|
||||
@ -213,7 +214,7 @@ actor SDLContextActor {
|
||||
await self.handleRegisterSuperNak(nakPacket: registerSuperNak)
|
||||
case .peerInfo(let peerInfo):
|
||||
//SDLLogger.shared.log("[SDLContext] peer message: \(peerInfo)")
|
||||
await self.puncherActor.handlePeerInfo(using: self.udpHole, peerInfo: peerInfo)
|
||||
await self.puncherActor.handlePeerInfo(using: self.udpHole, udpHoleV6: self.udpHoleV6, peerInfo: peerInfo)
|
||||
case .event(let event):
|
||||
await self.handleEvent(event: event)
|
||||
case .policyReponse(let policyResponse):
|
||||
@ -360,7 +361,6 @@ actor SDLContextActor {
|
||||
()
|
||||
}
|
||||
}
|
||||
SDLLogger.log("[SDLContext] udp signalTask cancel")
|
||||
}
|
||||
|
||||
self.udpHole = udpHole
|
||||
@ -381,8 +381,29 @@ actor SDLContextActor {
|
||||
let localAddress = try udpHoleV6.start()
|
||||
SDLLogger.log("[SDLContext] udpHoleV6 started, on address: \(localAddress)")
|
||||
|
||||
// ip地址只会收到到 register:registerAck | data
|
||||
let messageTask = Task.detached {
|
||||
for await (remoteAddress, message) in udpHoleV6.messageStream {
|
||||
if Task.isCancelled {
|
||||
break
|
||||
}
|
||||
|
||||
switch message {
|
||||
case .register(let register):
|
||||
try? await self.handleRegister(remoteAddress: remoteAddress, register: register)
|
||||
case .registerAck(let registerAck):
|
||||
await self.handleRegisterAck(remoteAddress: remoteAddress, registerAck: registerAck)
|
||||
case .data(let data):
|
||||
try? await self.handleHoleData(data: data)
|
||||
default:
|
||||
()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.udpHoleV6 = udpHoleV6
|
||||
self.udpHoleV6Workers = []
|
||||
self.udpHoleV6LocalAddress = localAddress
|
||||
self.udpHoleV6Workers = [messageTask]
|
||||
|
||||
return udpHoleV6
|
||||
}
|
||||
@ -405,6 +426,7 @@ actor SDLContextActor {
|
||||
self.udpHoleV6Workers = nil
|
||||
self.udpHoleV6?.stop()
|
||||
self.udpHoleV6 = nil
|
||||
self.udpHoleV6LocalAddress = nil
|
||||
|
||||
self.quicWorker?.cancel()
|
||||
self.quicWorker = nil
|
||||
@ -494,10 +516,13 @@ actor SDLContextActor {
|
||||
stunRequest.mac = self.config.networkAddress.mac
|
||||
stunRequest.natType = UInt32(self.natType.rawValue)
|
||||
stunRequest.sessionToken = sessionToken
|
||||
if let v6Info = self.makeUDPHoleV6Info() {
|
||||
stunRequest.v6Info = v6Info
|
||||
}
|
||||
|
||||
if let stunData = try? stunRequest.serializedData() {
|
||||
let remoteAddress = self.config.stunSocketAddress
|
||||
self.udpHole?.send(type: .stunRequest, data: stunData, remoteAddress: remoteAddress)
|
||||
self.sendHolePacket(type: .stunRequest, data: stunData, remoteAddress: remoteAddress)
|
||||
}
|
||||
}
|
||||
|
||||
@ -574,14 +599,22 @@ actor SDLContextActor {
|
||||
sessionManager.removeSession(dstMac: dstMac)
|
||||
case .sendRegister(let sendRegisterEvent):
|
||||
SDLLogger.log("[SDLContext] sendRegisterEvent, ip: \(sendRegisterEvent)")
|
||||
let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp)
|
||||
if let remoteAddress = try? SocketAddress.makeAddressResolvingHost(address, port: Int(sendRegisterEvent.natPort)) {
|
||||
// 发送register包
|
||||
var register = SDLRegister()
|
||||
register.networkID = self.config.networkAddress.networkId
|
||||
register.srcMac = self.config.networkAddress.mac
|
||||
register.dstMac = sendRegisterEvent.dstMac
|
||||
self.udpHole?.send(type: .register, data: try! register.serializedData(), remoteAddress: remoteAddress)
|
||||
let registerData = try! register.serializedData()
|
||||
|
||||
if sendRegisterEvent.natIp > 0 && sendRegisterEvent.natPort > 0 {
|
||||
let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp)
|
||||
if let remoteAddress = try? SocketAddress.makeAddressResolvingHost(address, port: Int(sendRegisterEvent.natPort)) {
|
||||
self.sendHolePacket(type: .register, data: registerData, remoteAddress: remoteAddress)
|
||||
}
|
||||
}
|
||||
|
||||
if sendRegisterEvent.hasV6Info, let remoteAddress = try? await sendRegisterEvent.v6Info.socketAddress() {
|
||||
self.sendHolePacket(type: .register, data: registerData, remoteAddress: remoteAddress)
|
||||
}
|
||||
case .shutdown(let shutdownEvent):
|
||||
let alertNotice = NoticeMessage.alert(alert: shutdownEvent.message)
|
||||
@ -626,7 +659,7 @@ actor SDLContextActor {
|
||||
registerAck.srcMac = networkAddr.mac
|
||||
registerAck.dstMac = register.srcMac
|
||||
|
||||
self.udpHole?.send(type: .registerAck, data: try registerAck.serializedData(), remoteAddress: remoteAddress)
|
||||
self.sendHolePacket(type: .registerAck, data: try registerAck.serializedData(), remoteAddress: remoteAddress)
|
||||
// 这里需要建立到来源的会话, 在复杂网络下,通过super-node查询到的nat地址不一定靠谱,需要通过udp包的来源地址作为nat地址
|
||||
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
|
||||
self.sessionManager.addSession(session: session)
|
||||
@ -741,20 +774,6 @@ actor SDLContextActor {
|
||||
return false
|
||||
}
|
||||
|
||||
// 流量统计
|
||||
// public func flowReportTask() {
|
||||
// Task {
|
||||
// // 每分钟汇报一次
|
||||
// self.flowTracerCancel = Timer.publish(every: 60.0, on: .main, in: .common).autoconnect()
|
||||
// .sink { _ in
|
||||
// Task {
|
||||
// let (forwardNum, p2pNum, inboundNum) = await self.flowTracer.reset()
|
||||
// await self.superClient?.flowReport(forwardNum: forwardNum, p2pNum: p2pNum, inboundNum: inboundNum)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// 开始读取数据, 用单独的线程处理packetFlow
|
||||
private func startReader() {
|
||||
// 停止之前的任务
|
||||
@ -874,7 +893,9 @@ actor SDLContextActor {
|
||||
let networkAddr = self.config.networkAddress
|
||||
// 将数据封装层2层的数据包
|
||||
let layerPacket = LayerPacket(dstMac: dstMac, srcMac: networkAddr.mac, type: type, data: data)
|
||||
guard let udpHole = self.udpHole, let dataCipher = self.dataCipher, let encodedPacket = try? dataCipher.encrypt(plainText: layerPacket.marshal()) else {
|
||||
guard (self.udpHole != nil || self.udpHoleV6 != nil),
|
||||
let dataCipher = self.dataCipher,
|
||||
let encodedPacket = try? dataCipher.encrypt(plainText: layerPacket.marshal()) else {
|
||||
return
|
||||
}
|
||||
|
||||
@ -891,18 +912,18 @@ actor SDLContextActor {
|
||||
// 广播地址不要去尝试打洞
|
||||
if ARPPacket.isBroadcastMac(dstMac) {
|
||||
// 通过super_node进行转发
|
||||
udpHole.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
|
||||
self.sendHolePacket(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
|
||||
}
|
||||
else {
|
||||
// 通过session发送到对端
|
||||
if let session = self.sessionManager.getSession(toAddress: dstMac) {
|
||||
SDLLogger.log("[SDLContext] step 5 send packet by session: \(session)", for: .trace)
|
||||
udpHole.send(type: .data, data: data, remoteAddress: session.natAddress)
|
||||
self.sendHolePacket(type: .data, data: data, remoteAddress: session.natAddress)
|
||||
self.flowTracer.inc(num: data.count, type: .p2p)
|
||||
}
|
||||
else {
|
||||
// 通过super_node进行转发
|
||||
udpHole.send(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
|
||||
self.sendHolePacket(type: .data, data: data, remoteAddress: self.config.stunSocketAddress)
|
||||
SDLLogger.log("[SDLContext] step 5 send packet by super: \(self.config.stunSocketAddress)", for: .trace)
|
||||
// 流量统计
|
||||
self.flowTracer.inc(num: data.count, type: .forward)
|
||||
@ -965,6 +986,43 @@ actor SDLContextActor {
|
||||
SDLLogger.log("[SDLContext] nat_type is: \(natType)")
|
||||
}
|
||||
|
||||
// 发送hole的数据
|
||||
private func sendHolePacket(type: SDLPacketType, data: Data, remoteAddress: SocketAddress) {
|
||||
switch remoteAddress {
|
||||
case .v4:
|
||||
self.udpHole?.send(type: type, data: data, remoteAddress: remoteAddress)
|
||||
case .v6:
|
||||
self.udpHoleV6?.send(type: type, data: data, remoteAddress: remoteAddress)
|
||||
default:
|
||||
SDLLogger.log("[SDLContext] unsupported remoteAddress: \(remoteAddress)", for: .debug)
|
||||
}
|
||||
}
|
||||
|
||||
private func makeUDPHoleV6Info() -> SDLV6Info? {
|
||||
guard let port = self.udpHoleV6LocalAddress?.port else {
|
||||
return nil
|
||||
}
|
||||
|
||||
let interfaces = NetworkInterfaceManager.getInterfaces()
|
||||
let interface = interfaces.first { interface in
|
||||
interface.ip.contains(":")
|
||||
&& !interface.name.hasPrefix("utun")
|
||||
&& !interface.ip.hasPrefix("fe80:")
|
||||
&& interface.ip != "::"
|
||||
}
|
||||
|
||||
guard let interface,
|
||||
let ipData = SDLUtil.ipv6StrToData(interface.ip) else {
|
||||
return nil
|
||||
}
|
||||
|
||||
var v6Info = SDLV6Info()
|
||||
v6Info.port = UInt32(port)
|
||||
v6Info.v6 = ipData
|
||||
|
||||
return v6Info
|
||||
}
|
||||
|
||||
private func spawnLoop(_ body: @escaping () async throws -> Void) -> Task<Void, Never> {
|
||||
return Task.detached {
|
||||
while !Task.isCancelled {
|
||||
@ -1037,6 +1095,7 @@ actor SDLContextActor {
|
||||
deinit {
|
||||
self.udpHole = nil
|
||||
self.udpHoleV6 = nil
|
||||
self.udpHoleV6LocalAddress = nil
|
||||
self.dnsClient = nil
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user