减少actor的使用
This commit is contained in:
parent
d930dbafad
commit
182f6ffd17
@ -46,7 +46,7 @@ actor SDLContextActor {
|
|||||||
private var dnsWorker: Task<Void, Never>?
|
private var dnsWorker: Task<Void, Never>?
|
||||||
|
|
||||||
private var quicClient: SDLQUICClient?
|
private var quicClient: SDLQUICClient?
|
||||||
private var quicWorkers: [Task<Void, Never>]?
|
private var quicWorker: Task<Void, Never>?
|
||||||
|
|
||||||
nonisolated private let puncherActor: SDLPuncherActor
|
nonisolated private let puncherActor: SDLPuncherActor
|
||||||
// 网络探测对象
|
// 网络探测对象
|
||||||
@ -136,7 +136,7 @@ actor SDLContextActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private func startQUICClient() async throws -> SDLQUICClient {
|
private func startQUICClient() async throws -> SDLQUICClient {
|
||||||
self.quicWorkers?.forEach {$0.cancel()}
|
self.quicWorker?.cancel()
|
||||||
self.quicClient?.stop()
|
self.quicClient?.stop()
|
||||||
|
|
||||||
// 启动monitor
|
// 启动monitor
|
||||||
@ -149,7 +149,7 @@ actor SDLContextActor {
|
|||||||
try await Task.sleep(for: .seconds(0.2))
|
try await Task.sleep(for: .seconds(0.2))
|
||||||
SDLLogger.shared.log("[SDLContext] start quic client ready")
|
SDLLogger.shared.log("[SDLContext] start quic client ready")
|
||||||
|
|
||||||
let messageTask = Task.detached {
|
self.quicWorker = Task.detached {
|
||||||
for await message in quicClient.messageStream {
|
for await message in quicClient.messageStream {
|
||||||
switch message {
|
switch message {
|
||||||
case .welcome(let welcome):
|
case .welcome(let welcome):
|
||||||
@ -174,9 +174,7 @@ actor SDLContextActor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.quicClient = quicClient
|
self.quicClient = quicClient
|
||||||
self.quicWorkers = [messageTask]
|
|
||||||
|
|
||||||
return quicClient
|
return quicClient
|
||||||
}
|
}
|
||||||
@ -310,8 +308,8 @@ actor SDLContextActor {
|
|||||||
self.udpHoleWorkers?.forEach { $0.cancel() }
|
self.udpHoleWorkers?.forEach { $0.cancel() }
|
||||||
self.udpHoleWorkers = nil
|
self.udpHoleWorkers = nil
|
||||||
|
|
||||||
self.quicWorkers?.forEach { $0.cancel() }
|
self.quicWorker?.cancel()
|
||||||
self.quicWorkers = nil
|
self.quicWorker = nil
|
||||||
|
|
||||||
self.dnsWorker?.cancel()
|
self.dnsWorker?.cancel()
|
||||||
self.dnsWorker = nil
|
self.dnsWorker = nil
|
||||||
@ -433,7 +431,7 @@ actor SDLContextActor {
|
|||||||
case .natChanged(let natChangedEvent):
|
case .natChanged(let natChangedEvent):
|
||||||
let dstMac = natChangedEvent.mac
|
let dstMac = natChangedEvent.mac
|
||||||
SDLLogger.shared.log("[SDLContext] natChangedEvent, dstMac: \(dstMac)", level: .info)
|
SDLLogger.shared.log("[SDLContext] natChangedEvent, dstMac: \(dstMac)", level: .info)
|
||||||
await sessionManager.removeSession(dstMac: dstMac)
|
sessionManager.removeSession(dstMac: dstMac)
|
||||||
case .sendRegister(let sendRegisterEvent):
|
case .sendRegister(let sendRegisterEvent):
|
||||||
SDLLogger.shared.log("[SDLContext] sendRegisterEvent, ip: \(sendRegisterEvent)", level: .debug)
|
SDLLogger.shared.log("[SDLContext] sendRegisterEvent, ip: \(sendRegisterEvent)", level: .debug)
|
||||||
let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp)
|
let address = SDLUtil.int32ToIp(sendRegisterEvent.natIp)
|
||||||
@ -473,7 +471,7 @@ actor SDLContextActor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func handleRegister(remoteAddress: SocketAddress, register: SDLRegister) async throws {
|
private func handleRegister(remoteAddress: SocketAddress, register: SDLRegister) throws {
|
||||||
let networkAddr = config.networkAddress
|
let networkAddr = config.networkAddress
|
||||||
SDLLogger.shared.log("register packet: \(register), network_address: \(networkAddr)", level: .debug)
|
SDLLogger.shared.log("register packet: \(register), network_address: \(networkAddr)", level: .debug)
|
||||||
|
|
||||||
@ -488,18 +486,18 @@ actor SDLContextActor {
|
|||||||
self.udpHole?.send(type: .registerAck, data: try registerAck.serializedData(), remoteAddress: remoteAddress)
|
self.udpHole?.send(type: .registerAck, data: try registerAck.serializedData(), remoteAddress: remoteAddress)
|
||||||
// 这里需要建立到来源的会话, 在复杂网络下,通过super-node查询到的nat地址不一定靠谱,需要通过udp包的来源地址作为nat地址
|
// 这里需要建立到来源的会话, 在复杂网络下,通过super-node查询到的nat地址不一定靠谱,需要通过udp包的来源地址作为nat地址
|
||||||
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
|
let session = Session(dstMac: register.srcMac, natAddress: remoteAddress)
|
||||||
await self.sessionManager.addSession(session: session)
|
self.sessionManager.addSession(session: session)
|
||||||
} else {
|
} else {
|
||||||
SDLLogger.shared.log("SDLContext didReadRegister get a invalid packet, because dst_ip not matched: \(register.dstMac)", level: .warning)
|
SDLLogger.shared.log("SDLContext didReadRegister get a invalid packet, because dst_ip not matched: \(register.dstMac)", level: .warning)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func handleRegisterAck(remoteAddress: SocketAddress, registerAck: SDLRegisterAck) async {
|
private func handleRegisterAck(remoteAddress: SocketAddress, registerAck: SDLRegisterAck) {
|
||||||
// 判断目标地址是否是tun的网卡地址, 并且是在同一个网络下
|
// 判断目标地址是否是tun的网卡地址, 并且是在同一个网络下
|
||||||
let networkAddr = config.networkAddress
|
let networkAddr = config.networkAddress
|
||||||
if registerAck.dstMac == networkAddr.mac && registerAck.networkID == networkAddr.networkId {
|
if registerAck.dstMac == networkAddr.mac && registerAck.networkID == networkAddr.networkId {
|
||||||
let session = Session(dstMac: registerAck.srcMac, natAddress: remoteAddress)
|
let session = Session(dstMac: registerAck.srcMac, natAddress: remoteAddress)
|
||||||
await self.sessionManager.addSession(session: session)
|
self.sessionManager.addSession(session: session)
|
||||||
} else {
|
} else {
|
||||||
SDLLogger.shared.log("SDLContext didReadRegisterAck get a invalid packet, because dst_mac not matched: \(registerAck.dstMac)", level: .warning)
|
SDLLogger.shared.log("SDLContext didReadRegisterAck get a invalid packet, because dst_mac not matched: \(registerAck.dstMac)", level: .warning)
|
||||||
}
|
}
|
||||||
@ -553,7 +551,7 @@ actor SDLContextActor {
|
|||||||
let identitySnapshot = self.snapshotPublisher.current()
|
let identitySnapshot = self.snapshotPublisher.current()
|
||||||
let ruleMap = identitySnapshot.lookup(data.identityID)
|
let ruleMap = identitySnapshot.lookup(data.identityID)
|
||||||
|
|
||||||
if self.authIPPacket(ipPacket: ipPacket, ruleMap: ruleMap) {
|
if self.checkPolicy(ipPacket: ipPacket, ruleMap: ruleMap) {
|
||||||
let packet = NEPacket(data: ipPacket.data, protocolFamily: 2)
|
let packet = NEPacket(data: ipPacket.data, protocolFamily: 2)
|
||||||
self.provider.packetFlow.writePacketObjects([packet])
|
self.provider.packetFlow.writePacketObjects([packet])
|
||||||
SDLLogger.shared.log("[SDLContext] identity: \(data.identityID), allow", level: .debug)
|
SDLLogger.shared.log("[SDLContext] identity: \(data.identityID), allow", level: .debug)
|
||||||
@ -568,7 +566,7 @@ actor SDLContextActor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func authIPPacket(ipPacket: IPPacket, ruleMap: IdentityRuleMap?) -> Bool {
|
private func checkPolicy(ipPacket: IPPacket, ruleMap: IdentityRuleMap?) -> Bool {
|
||||||
// 进来的数据反转一下,然后再处理
|
// 进来的数据反转一下,然后再处理
|
||||||
if let reverseFlowSession = ipPacket.flowSession()?.reverse(),
|
if let reverseFlowSession = ipPacket.flowSession()?.reverse(),
|
||||||
self.flowSessionManager.hasSession(reverseFlowSession) {
|
self.flowSessionManager.hasSession(reverseFlowSession) {
|
||||||
|
|||||||
@ -28,7 +28,8 @@ struct Session {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
actor SessionManager {
|
class SessionManager {
|
||||||
|
private let locker = NSLock()
|
||||||
private var sessions: [Data:Session] = [:]
|
private var sessions: [Data:Session] = [:]
|
||||||
|
|
||||||
// session的有效时间
|
// session的有效时间
|
||||||
@ -36,6 +37,11 @@ actor SessionManager {
|
|||||||
|
|
||||||
func getSession(toAddress: Data) -> Session? {
|
func getSession(toAddress: Data) -> Session? {
|
||||||
let timestamp = Int32(Date().timeIntervalSince1970)
|
let timestamp = Int32(Date().timeIntervalSince1970)
|
||||||
|
locker.lock()
|
||||||
|
defer {
|
||||||
|
locker.unlock()
|
||||||
|
}
|
||||||
|
|
||||||
if var session = self.sessions[toAddress] {
|
if var session = self.sessions[toAddress] {
|
||||||
if session.lastTimestamp >= timestamp + ttl {
|
if session.lastTimestamp >= timestamp + ttl {
|
||||||
session.updateLastTimestamp(timestamp)
|
session.updateLastTimestamp(timestamp)
|
||||||
@ -50,10 +56,19 @@ actor SessionManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func addSession(session: Session) {
|
func addSession(session: Session) {
|
||||||
|
locker.lock()
|
||||||
|
defer {
|
||||||
|
locker.unlock()
|
||||||
|
}
|
||||||
self.sessions[session.dstMac] = session
|
self.sessions[session.dstMac] = session
|
||||||
}
|
}
|
||||||
|
|
||||||
func removeSession(dstMac: Data) {
|
func removeSession(dstMac: Data) {
|
||||||
|
locker.lock()
|
||||||
|
defer {
|
||||||
|
locker.unlock()
|
||||||
|
}
|
||||||
|
|
||||||
self.sessions.removeValue(forKey: dstMac)
|
self.sessions.removeValue(forKey: dstMac)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user