diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedNodeCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedNodeCleanupJob.kt index 1f94b5c6ad..a1fe31dd59 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedNodeCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedNodeCleanupJob.kt @@ -27,6 +27,7 @@ package com.tencent.bkrepo.job.batch +import com.mongodb.client.result.DeleteResult import com.tencent.bkrepo.common.mongo.constant.ID import com.tencent.bkrepo.common.service.cluster.ClusterProperties import com.tencent.bkrepo.fs.server.constant.FAKE_SHA256 @@ -44,9 +45,12 @@ import org.slf4j.LoggerFactory import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.data.domain.PageRequest import org.springframework.data.mongodb.core.MongoTemplate +import org.springframework.data.mongodb.core.findOne import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query +import org.springframework.data.mongodb.core.query.Update import org.springframework.data.mongodb.core.query.and +import org.springframework.data.mongodb.core.query.inValues import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.data.mongodb.core.query.where import org.springframework.stereotype.Component @@ -83,6 +87,12 @@ class DeletedNodeCleanupJob( val deleted: LocalDateTime? ) + data class FileReference( + val sha256: String, + val credentialsKey: String, + val count: String + ) + override fun getLockAtMostFor(): Duration = Duration.ofDays(7) override fun createJobContext(): DeletedNodeCleanupJobContext { @@ -121,14 +131,10 @@ class DeletedNodeCleanupJob( val deletedNodeList = mongoTemplate.find(query, Node::class.java, nodeCollectionName).takeIf { it.isNotEmpty() } ?: break logger.info("Retrieved [${deletedNodeList.size}] deleted records from ${row.projectId}/${row.name}") - deletedNodeList.forEach { node -> - cleanUpNode(row, node, nodeCollectionName) - if (node.folder) { - context.folderCount.incrementAndGet() - } else { - context.fileCount.incrementAndGet() - } - } + val folderNodeList = deletedNodeList.filter { it.folder } + cleanupFolderNode(context, folderNodeList, nodeCollectionName) + val fileNodeList = deletedNodeList.filter { !it.folder } + cleanUpFileNode(context, row, fileNodeList, nodeCollectionName) } // 仓库被标记为已删除,且该仓库下不存在任何节点时,删除仓库 if (row.deleted != null && @@ -141,24 +147,76 @@ class DeletedNodeCleanupJob( } } - private fun cleanUpNode(repo: Repository, node: Node, nodeCollectionName: String) { - var fileReferenceChanged = false + private fun cleanupFolderNode( + context: DeletedNodeCleanupJobContext, + nodes: List, + nodeCollectionName: String + ) { + val idList = nodes.map { it.id } + val query = Query.query(Criteria.where(ID).inValues(idList)) + val result = mongoTemplate.remove(query, nodeCollectionName) + context.folderCount.addAndGet(result.deletedCount) + } + + private fun cleanUpFileNode( + context: DeletedNodeCleanupJobContext, + repo: Repository, + nodes: List, + nodeCollectionName: String + ) { + val idList = nodes.filter { it.clusterNames == null || it.clusterNames.contains(clusterProperties.self.name) } + .map { it.id } + val query = Query.query(Criteria.where(ID).inValues(idList)) + val sha256List = nodes.filter { !it.sha256.isNullOrBlank() && it.sha256 != FAKE_SHA256 } + .map { it.sha256!! }.distinct() + + var result: DeleteResult? = null try { - val nodeQuery = Query.query(Criteria.where(ID).isEqualTo(node.id)) - mongoTemplate.remove(nodeQuery, nodeCollectionName) - if (!node.folder - && (node.clusterNames == null || node.clusterNames.contains(clusterProperties.self.name)) - ) { - fileReferenceChanged = decrementFileReference(node, repo) - } + result = mongoTemplate.remove(query, nodeCollectionName) + decrementFileReferences(sha256List, repo.credentialsKey) } catch (ignored: Exception) { - logger.error("Clean up deleted node[$node] failed in collection[$nodeCollectionName].", ignored) - if (fileReferenceChanged) { - incrementFileReference(node, repo) + logger.error("Clean up deleted nodes[$idList] failed in collection[$nodeCollectionName].", ignored) + } + + context.fileCount.addAndGet(result?.deletedCount ?: 0) + } + + private fun decrementFileReferences(sha256List: List, credentialsKey: String?) { + sha256List.forEach { + val collectionName = COLLECTION_FILE_REFERENCE + MongoShardingUtils.shardingSequence(it, SHARDING_COUNT) + val criteria = buildCriteria(it, credentialsKey) + criteria.and(FileReference::count.name).gt(0) + val query = Query(criteria) + val update = Update().apply { inc(FileReference::count.name, -1) } + val result = mongoTemplate.updateFirst(query, update, collectionName) + + if (result.modifiedCount == 1L) { + logger.info("Decrement references of file [$it] on credentialsKey [$credentialsKey].") + return@forEach + } + + val newQuery = Query(buildCriteria(it, credentialsKey)) + mongoTemplate.findOne(newQuery, collectionName) ?: run { + logger.error("Failed to decrement reference of file [$it] on credentialsKey [$credentialsKey]") + return@forEach } + + logger.error( + "Failed to decrement reference of file [$it] on credentialsKey [$credentialsKey]: " + + "reference count is 0." + ) } } + private fun buildCriteria( + it: String, + credentialsKey: String? + ): Criteria { + val criteria = Criteria.where(FileReference::sha256.name).`is`(it) + criteria.and(FileReference::credentialsKey.name).`is`(credentialsKey) + return criteria + } + private fun buildNodeQuery(projectId: String, repoName: String, deletedBefore: LocalDateTime? = null): Query { val criteria = where(Node::projectId).isEqualTo(projectId) .and(Node::repoName).isEqualTo(repoName) @@ -183,6 +241,7 @@ class DeletedNodeCleanupJob( companion object { private val logger = LoggerFactory.getLogger(DeletedNodeCleanupJob::class.java) private const val COLLECTION_NODE_PREFIX = "node_" + private const val COLLECTION_FILE_REFERENCE = "file_reference_" private const val COLLECTION_REPOSITORY = "repository" private const val PAGE_SIZE = 1000 } diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/file/impl/FileReferenceServiceImpl.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/file/impl/FileReferenceServiceImpl.kt index 821dc9953e..2df53820ab 100644 --- a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/file/impl/FileReferenceServiceImpl.kt +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/file/impl/FileReferenceServiceImpl.kt @@ -92,24 +92,28 @@ class FileReferenceServiceImpl( } override fun decrement(sha256: String, credentialsKey: String?): Boolean { - val query = buildQuery(sha256, credentialsKey) - val fileReference = fileReferenceDao.findOne(query) ?: run { + val criteria = Criteria.where(TFileReference::sha256.name).`is`(sha256) + criteria.and(TFileReference::credentialsKey.name).`is`(credentialsKey) + criteria.and(TFileReference::count.name).gt(0) + val query = Query(criteria) + val update = Update().apply { inc(TFileReference::count.name, -1) } + val result = fileReferenceDao.updateFirst(query, update) + + if (result.modifiedCount == 1L) { + logger.info("Decrement references of file [$sha256] on credentialsKey [$credentialsKey].") + return true + } + + fileReferenceDao.findOne(buildQuery(sha256, credentialsKey)) ?: run { logger.error("Failed to decrement reference of file [$sha256] on credentialsKey [$credentialsKey]") return false } - return if (fileReference.count >= 1) { - val update = Update().apply { inc(TFileReference::count.name, -1) } - fileReferenceDao.upsert(query, update) - logger.info("Decrement references of file [$sha256] on credentialsKey [$credentialsKey].") - true - } else { - logger.error( - "Failed to decrement reference of file [$sha256] on credentialsKey [$credentialsKey]: " + - "reference count is 0." - ) - false - } + logger.error( + "Failed to decrement reference of file [$sha256] on credentialsKey [$credentialsKey]: " + + "reference count is 0." + ) + return false } override fun count(sha256: String, credentialsKey: String?): Long {