Skip to content

Commit

Permalink
Merge pull request #1022 from yaoxuwan/issue_1017
Browse files Browse the repository at this point in the history
feat: 优化清理已删除节点任务速度 #1017
  • Loading branch information
owenlxu authored Aug 3, 2023
2 parents 4398fd7 + 2520c46 commit ca8417f
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package com.tencent.bkrepo.job.batch

import com.mongodb.client.result.DeleteResult
import com.tencent.bkrepo.common.mongo.constant.ID
import com.tencent.bkrepo.common.service.cluster.ClusterProperties
import com.tencent.bkrepo.fs.server.constant.FAKE_SHA256
Expand All @@ -44,9 +45,12 @@ import org.slf4j.LoggerFactory
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.data.domain.PageRequest
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.findOne
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.Update
import org.springframework.data.mongodb.core.query.and
import org.springframework.data.mongodb.core.query.inValues
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.data.mongodb.core.query.where
import org.springframework.stereotype.Component
Expand Down Expand Up @@ -83,6 +87,12 @@ class DeletedNodeCleanupJob(
val deleted: LocalDateTime?
)

data class FileReference(
val sha256: String,
val credentialsKey: String,
val count: String
)

override fun getLockAtMostFor(): Duration = Duration.ofDays(7)

override fun createJobContext(): DeletedNodeCleanupJobContext {
Expand Down Expand Up @@ -121,14 +131,10 @@ class DeletedNodeCleanupJob(
val deletedNodeList =
mongoTemplate.find(query, Node::class.java, nodeCollectionName).takeIf { it.isNotEmpty() } ?: break
logger.info("Retrieved [${deletedNodeList.size}] deleted records from ${row.projectId}/${row.name}")
deletedNodeList.forEach { node ->
cleanUpNode(row, node, nodeCollectionName)
if (node.folder) {
context.folderCount.incrementAndGet()
} else {
context.fileCount.incrementAndGet()
}
}
val folderNodeList = deletedNodeList.filter { it.folder }
cleanupFolderNode(context, folderNodeList, nodeCollectionName)
val fileNodeList = deletedNodeList.filter { !it.folder }
cleanUpFileNode(context, row, fileNodeList, nodeCollectionName)
}
// 仓库被标记为已删除,且该仓库下不存在任何节点时,删除仓库
if (row.deleted != null &&
Expand All @@ -141,24 +147,76 @@ class DeletedNodeCleanupJob(
}
}

private fun cleanUpNode(repo: Repository, node: Node, nodeCollectionName: String) {
var fileReferenceChanged = false
private fun cleanupFolderNode(
context: DeletedNodeCleanupJobContext,
nodes: List<Node>,
nodeCollectionName: String
) {
val idList = nodes.map { it.id }
val query = Query.query(Criteria.where(ID).inValues(idList))
val result = mongoTemplate.remove(query, nodeCollectionName)
context.folderCount.addAndGet(result.deletedCount)
}

private fun cleanUpFileNode(
context: DeletedNodeCleanupJobContext,
repo: Repository,
nodes: List<Node>,
nodeCollectionName: String
) {
val idList = nodes.filter { it.clusterNames == null || it.clusterNames.contains(clusterProperties.self.name) }
.map { it.id }
val query = Query.query(Criteria.where(ID).inValues(idList))
val sha256List = nodes.filter { !it.sha256.isNullOrBlank() && it.sha256 != FAKE_SHA256 }
.map { it.sha256!! }.distinct()

var result: DeleteResult? = null
try {
val nodeQuery = Query.query(Criteria.where(ID).isEqualTo(node.id))
mongoTemplate.remove(nodeQuery, nodeCollectionName)
if (!node.folder
&& (node.clusterNames == null || node.clusterNames.contains(clusterProperties.self.name))
) {
fileReferenceChanged = decrementFileReference(node, repo)
}
result = mongoTemplate.remove(query, nodeCollectionName)
decrementFileReferences(sha256List, repo.credentialsKey)
} catch (ignored: Exception) {
logger.error("Clean up deleted node[$node] failed in collection[$nodeCollectionName].", ignored)
if (fileReferenceChanged) {
incrementFileReference(node, repo)
logger.error("Clean up deleted nodes[$idList] failed in collection[$nodeCollectionName].", ignored)
}

context.fileCount.addAndGet(result?.deletedCount ?: 0)
}

private fun decrementFileReferences(sha256List: List<String>, credentialsKey: String?) {
sha256List.forEach {
val collectionName = COLLECTION_FILE_REFERENCE + MongoShardingUtils.shardingSequence(it, SHARDING_COUNT)
val criteria = buildCriteria(it, credentialsKey)
criteria.and(FileReference::count.name).gt(0)
val query = Query(criteria)
val update = Update().apply { inc(FileReference::count.name, -1) }
val result = mongoTemplate.updateFirst(query, update, collectionName)

if (result.modifiedCount == 1L) {
logger.info("Decrement references of file [$it] on credentialsKey [$credentialsKey].")
return@forEach
}

val newQuery = Query(buildCriteria(it, credentialsKey))
mongoTemplate.findOne<FileReference>(newQuery, collectionName) ?: run {
logger.error("Failed to decrement reference of file [$it] on credentialsKey [$credentialsKey]")
return@forEach
}

logger.error(
"Failed to decrement reference of file [$it] on credentialsKey [$credentialsKey]: " +
"reference count is 0."
)
}
}

private fun buildCriteria(
it: String,
credentialsKey: String?
): Criteria {
val criteria = Criteria.where(FileReference::sha256.name).`is`(it)
criteria.and(FileReference::credentialsKey.name).`is`(credentialsKey)
return criteria
}

private fun buildNodeQuery(projectId: String, repoName: String, deletedBefore: LocalDateTime? = null): Query {
val criteria = where(Node::projectId).isEqualTo(projectId)
.and(Node::repoName).isEqualTo(repoName)
Expand All @@ -183,6 +241,7 @@ class DeletedNodeCleanupJob(
companion object {
private val logger = LoggerFactory.getLogger(DeletedNodeCleanupJob::class.java)
private const val COLLECTION_NODE_PREFIX = "node_"
private const val COLLECTION_FILE_REFERENCE = "file_reference_"
private const val COLLECTION_REPOSITORY = "repository"
private const val PAGE_SIZE = 1000
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,28 @@ class FileReferenceServiceImpl(
}

override fun decrement(sha256: String, credentialsKey: String?): Boolean {
val query = buildQuery(sha256, credentialsKey)
val fileReference = fileReferenceDao.findOne(query) ?: run {
val criteria = Criteria.where(TFileReference::sha256.name).`is`(sha256)
criteria.and(TFileReference::credentialsKey.name).`is`(credentialsKey)
criteria.and(TFileReference::count.name).gt(0)
val query = Query(criteria)
val update = Update().apply { inc(TFileReference::count.name, -1) }
val result = fileReferenceDao.updateFirst(query, update)

if (result.modifiedCount == 1L) {
logger.info("Decrement references of file [$sha256] on credentialsKey [$credentialsKey].")
return true
}

fileReferenceDao.findOne(buildQuery(sha256, credentialsKey)) ?: run {
logger.error("Failed to decrement reference of file [$sha256] on credentialsKey [$credentialsKey]")
return false
}

return if (fileReference.count >= 1) {
val update = Update().apply { inc(TFileReference::count.name, -1) }
fileReferenceDao.upsert(query, update)
logger.info("Decrement references of file [$sha256] on credentialsKey [$credentialsKey].")
true
} else {
logger.error(
"Failed to decrement reference of file [$sha256] on credentialsKey [$credentialsKey]: " +
"reference count is 0."
)
false
}
logger.error(
"Failed to decrement reference of file [$sha256] on credentialsKey [$credentialsKey]: " +
"reference count is 0."
)
return false
}

override fun count(sha256: String, credentialsKey: String?): Long {
Expand Down

0 comments on commit ca8417f

Please sign in to comment.