处理周期行的权限更新问题
This commit is contained in:
parent
4e781e881c
commit
5f8647d402
@ -75,6 +75,7 @@ actor SDLContextActor {
|
||||
|
||||
// 处理权限控制
|
||||
private let identifyStore: IdentityStore
|
||||
private var updatePolicyTask: Task<Void, Never>?
|
||||
private let snapshotPublisher: SnapshotPublisher<IdentitySnapshot>
|
||||
|
||||
// 注册任务
|
||||
@ -164,7 +165,7 @@ actor SDLContextActor {
|
||||
await self.handleEvent(event: event)
|
||||
case .policyReponse(let policyResponse):
|
||||
// 处理权限的请求问题
|
||||
await self.identifyStore.apply(policyResponse: policyResponse)
|
||||
await self.identifyStore.applyPolicyResponse(policyResponse)
|
||||
case .arpResponse(let arpResponse):
|
||||
await self.arpServer.handleArpResponse(arpResponse: arpResponse)
|
||||
}
|
||||
@ -336,6 +337,9 @@ actor SDLContextActor {
|
||||
|
||||
self.registerTask?.cancel()
|
||||
self.registerTask = nil
|
||||
|
||||
self.updatePolicyTask?.cancel()
|
||||
self.updatePolicyTask = nil
|
||||
}
|
||||
|
||||
private func setNatType(natType: SDLNATProberActor.NatType) {
|
||||
@ -353,6 +357,7 @@ actor SDLContextActor {
|
||||
self.doRegisterSuper()
|
||||
try? await Task.sleep(for: .seconds(5))
|
||||
if self.state == .registered {
|
||||
await self.whenRegistedSuper()
|
||||
break
|
||||
}
|
||||
SDLLogger.shared.log("[SDLContext] register super failed, retry")
|
||||
@ -361,6 +366,19 @@ actor SDLContextActor {
|
||||
}
|
||||
}
|
||||
|
||||
// 注册成功super的回调函数
|
||||
private func whenRegistedSuper() async {
|
||||
self.updatePolicyTask?.cancel()
|
||||
|
||||
self.updatePolicyTask = Task {
|
||||
while !Task.isCancelled {
|
||||
try? await Task.sleep(for: .seconds(300))
|
||||
SDLLogger.shared.log("[SDLContext] updatePolicyTask execute")
|
||||
await self.identifyStore.batUpdatePolicy(using: self.quicClient, dstIdentityID: self.config.identityId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func sendStunRequest() {
|
||||
guard let sessionToken = self.sessionToken else {
|
||||
return
|
||||
@ -572,11 +590,7 @@ actor SDLContextActor {
|
||||
} else {
|
||||
SDLLogger.shared.log("[SDLContext] not found identity: \(data.identityID) ruleMap", level: .debug)
|
||||
// 向服务器请求权限逻辑
|
||||
var policyRequest = SDLPolicyRequest()
|
||||
policyRequest.srcIdentityID = data.identityID
|
||||
policyRequest.dstIdentityID = self.config.identityId
|
||||
|
||||
await self.identifyStore.policyRequest(using: self.quicClient, request: &policyRequest)
|
||||
await self.identifyStore.policyRequest(srcIdentityId: data.identityID, dstIdentityId: self.config.identityId, using: self.quicClient)
|
||||
}
|
||||
default:
|
||||
SDLLogger.shared.log("[SDLContext] get invalid packet", level: .debug)
|
||||
|
||||
@ -1,14 +1,13 @@
|
||||
//
|
||||
// IdentityStore.swift
|
||||
// punchnet
|
||||
//
|
||||
// 1. 需要增加规则基于轮训更新的逻辑
|
||||
// Created by 安礼成 on 2026/2/5.
|
||||
//
|
||||
import Foundation
|
||||
import NIO
|
||||
|
||||
actor IdentityStore {
|
||||
typealias IdentityID = UInt32
|
||||
|
||||
// 处理权限的请求问题
|
||||
nonisolated private let cooldown: Duration = .seconds(5)
|
||||
@ -20,39 +19,58 @@ actor IdentityStore {
|
||||
nonisolated private let alloctor = ByteBufferAllocator()
|
||||
|
||||
private let publisher: SnapshotPublisher<IdentitySnapshot>
|
||||
private var identityMap: [IdentityID: IdentityRuleMap] = [:]
|
||||
private var identityMap: [UInt32: IdentityRuleMap] = [:]
|
||||
|
||||
init(publisher: SnapshotPublisher<IdentitySnapshot>) {
|
||||
self.publisher = publisher
|
||||
}
|
||||
|
||||
// 提交权限请求
|
||||
func policyRequest(using quicClient: SDLQUICClient?, request: inout SDLPolicyRequest) {
|
||||
let identityId = request.srcIdentityID
|
||||
guard let quicClient, !coolingDown.contains(identityId) else {
|
||||
// 批量更新, 有外部任务驱动,因为这里依赖于当前的quicClient
|
||||
func batUpdatePolicy(using quicClient: SDLQUICClient?, dstIdentityID: UInt32) {
|
||||
guard let quicClient else {
|
||||
return
|
||||
}
|
||||
|
||||
// 触发一次打洞
|
||||
coolingDown.insert(identityId)
|
||||
self.identityMap.keys.forEach { identityId in
|
||||
var policyRequest = SDLPolicyRequest()
|
||||
policyRequest.srcIdentityID = identityId
|
||||
policyRequest.dstIdentityID = dstIdentityID
|
||||
policyRequest.version = self.nextVersion(identityId: identityId)
|
||||
|
||||
// 发送请求
|
||||
if let queryData = try? policyRequest.serializedData() {
|
||||
quicClient.send(type: .policyRequest, data: queryData)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 提交权限请求
|
||||
func policyRequest(srcIdentityId: UInt32, dstIdentityId: UInt32, using quicClient: SDLQUICClient?) {
|
||||
guard let quicClient, !coolingDown.contains(srcIdentityId) else {
|
||||
return
|
||||
}
|
||||
|
||||
let version = self.versions[identityId, default: 1]
|
||||
request.version = version
|
||||
// 更新请求的版本问题
|
||||
self.versions[identityId] = version + 1
|
||||
var policyRequest = SDLPolicyRequest()
|
||||
policyRequest.srcIdentityID = srcIdentityId
|
||||
policyRequest.dstIdentityID = dstIdentityId
|
||||
policyRequest.version = self.nextVersion(identityId: srcIdentityId)
|
||||
|
||||
// 触发一次打洞
|
||||
coolingDown.insert(srcIdentityId)
|
||||
// 发送请求
|
||||
if let queryData = try? request.serializedData() {
|
||||
if let queryData = try? policyRequest.serializedData() {
|
||||
quicClient.send(type: .policyRequest, data: queryData)
|
||||
}
|
||||
|
||||
Task {
|
||||
// 启动冷却期
|
||||
try? await Task.sleep(for: .seconds(5))
|
||||
self.endCooldown(for: identityId)
|
||||
self.endCooldown(for: srcIdentityId)
|
||||
}
|
||||
}
|
||||
|
||||
func apply(policyResponse: SDLPolicyResponse) {
|
||||
// 处理权限的响应
|
||||
func applyPolicyResponse(_ policyResponse: SDLPolicyResponse) {
|
||||
let id = policyResponse.srcIdentityID
|
||||
let version = policyResponse.version
|
||||
|
||||
@ -72,8 +90,6 @@ actor IdentityStore {
|
||||
}
|
||||
self.identityMap[id] = IdentityRuleMap(version: version, ruleMap: ruleMap)
|
||||
|
||||
SDLLogger.shared.log("[IdentitySession] get compile Snapshot rules nums: \(self.identityMap[id]?.ruleMap.count), success: \(self.identityMap[id]?.isAllow(proto: 1, port: 80))")
|
||||
|
||||
// 发布新的快照信息
|
||||
let snapshot = compileSnapshot()
|
||||
publisher.publish(snapshot)
|
||||
@ -87,4 +103,11 @@ actor IdentityStore {
|
||||
self.coolingDown.remove(key)
|
||||
}
|
||||
|
||||
private func nextVersion(identityId: UInt32) -> UInt32 {
|
||||
let version = self.versions[identityId, default: 1]
|
||||
// 更新请求的版本问题
|
||||
self.versions[identityId] = version + 1
|
||||
|
||||
return version
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user