Skip to content

Commit

Permalink
Merge pull request #1204 from zacYL/issue_1195
Browse files Browse the repository at this point in the history
feat: 监听节点重命名事件,并更新对应目录信息#1195
  • Loading branch information
owenlxu authored Oct 10, 2023
2 parents f727bcc + 5ac8afe commit ad8eca1
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ import com.tencent.bkrepo.common.artifact.event.node.NodeCopiedEvent
import com.tencent.bkrepo.common.artifact.event.node.NodeCreatedEvent
import com.tencent.bkrepo.common.artifact.event.node.NodeDeletedEvent
import com.tencent.bkrepo.common.artifact.event.node.NodeMovedEvent
import com.tencent.bkrepo.common.artifact.event.node.NodeRenamedEvent
import com.tencent.bkrepo.common.artifact.path.PathUtils
import com.tencent.bkrepo.common.artifact.path.PathUtils.combineFullPath
import com.tencent.bkrepo.repository.dao.NodeDao
import com.tencent.bkrepo.repository.model.TNode
import com.tencent.bkrepo.repository.service.node.NodeService
Expand All @@ -48,6 +50,7 @@ import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.and
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.data.mongodb.core.query.where
import org.springframework.scheduling.annotation.Async
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.time.LocalDateTime
Expand Down Expand Up @@ -86,10 +89,12 @@ class NodeModifyEventListener(
EventType.NODE_COPIED,
EventType.NODE_CREATED,
EventType.NODE_DELETED,
EventType.NODE_MOVED
EventType.NODE_MOVED,
EventType.NODE_RENAMED
)


@Async
@EventListener(ArtifactEvent::class)
fun handle(event: ArtifactEvent) {
if (!acceptTypes.contains(event.type)) {
Expand Down Expand Up @@ -133,49 +138,70 @@ class NodeModifyEventListener(
logger.info("event type ${event.type}")
val modifiedNodeList = mutableListOf<ModifiedNodeInfo>()
when (event.type) {
EventType.NODE_MOVED -> {
require(event is NodeMovedEvent)
val dstFullPath = buildDstFullPath(event.dstFullPath, event.resourceKey)
val createdNode = ModifiedNodeInfo(
projectId = event.dstProjectId,
repoName = event.dstRepoName,
fullPath = dstFullPath
)
EventType.NODE_DELETED -> {
require(event is NodeDeletedEvent)
val deletedNode = ModifiedNodeInfo(
projectId = event.projectId,
repoName = event.repoName,
fullPath = event.resourceKey,
deleted = true
)
modifiedNodeList.add(createdNode)
modifiedNodeList.add(deletedNode)
}
EventType.NODE_DELETED -> {
require(event is NodeDeletedEvent)
EventType.NODE_CREATED -> {
require(event is NodeCreatedEvent)
val createdNode = ModifiedNodeInfo(
projectId = event.projectId,
repoName = event.repoName,
fullPath = event.resourceKey
)
modifiedNodeList.add(createdNode)
}
EventType.NODE_RENAMED -> {
require(event is NodeRenamedEvent)
// 节点重命名逻辑和其他操作不同,它会对旧节点下的目录删除,然后新建,但是对于非目录节点是进行更新动作,而不是删除再新建
// 节点重命名操作只需要更新该节点下的子目录的统计信息,不需要更新其上层目录统计信息
val renamedNode = ModifiedNodeInfo(
projectId = event.projectId,
repoName = event.repoName,
fullPath = event.newFullPath,
includePrefix = event.newFullPath
)
modifiedNodeList.add(renamedNode)
}
EventType.NODE_MOVED -> {
require(event is NodeMovedEvent)
// 1 move空目录,2 move到已存在目录, 3 move到新目录 4 同路径,跳过 5 src为dst目录下的子节点,跳过
// 针对1 2 两种情况,需要判断原目录中的节点,然后再进行目标目录的统计信息更新
val createdNode = ModifiedNodeInfo(
projectId = event.dstProjectId,
repoName = event.dstRepoName,
fullPath = event.dstFullPath,
srcProjectId = event.projectId,
srcRepoName = event.repoName,
srcFullPath = event.resourceKey,
srcDeleted = true
)
val deletedNode = ModifiedNodeInfo(
projectId = event.projectId,
repoName = event.repoName,
fullPath = event.resourceKey,
deleted = true
)
modifiedNodeList.add(createdNode)
modifiedNodeList.add(deletedNode)
}
EventType.NODE_COPIED -> {
require(event is NodeCopiedEvent)
val dstFullPath = buildDstFullPath(event.dstFullPath, event.resourceKey)
// 1 copy空目录, 2 copy到已存在目录,3 copy到新目录 4 同路径,跳过 5 src为dst目录下的子节点,跳过
// 针对1 2 两种情况,需要判断原目录中的节点,然后再进行目标目录的统计信息更新
val createdNode = ModifiedNodeInfo(
projectId = event.dstProjectId,
repoName = event.dstRepoName,
fullPath = dstFullPath
)
modifiedNodeList.add(createdNode)
}
EventType.NODE_CREATED -> {
require(event is NodeCreatedEvent)
val createdNode = ModifiedNodeInfo(
projectId = event.projectId,
repoName = event.repoName,
fullPath = event.resourceKey
fullPath = event.dstFullPath,
srcProjectId = event.projectId,
srcRepoName = event.repoName,
srcFullPath = event.resourceKey,
)
modifiedNodeList.add(createdNode)
}
Expand Down Expand Up @@ -203,64 +229,83 @@ class NodeModifyEventListener(
logger.info("start to stat modified node size with fullPath ${node.fullPath}" +
" in repo ${node.projectId}|${node.repoName}")
if (node.folder) {
val sourceNodes = filterSourceNodesFromMoveOrCopy(modifiedNode)
logger.info("the size of node ${modifiedNode.srcFullPath} is ${sourceNodes?.size}" +
" in repo ${modifiedNode.srcProjectId}|${modifiedNode.srcRepoName}")
if (sourceNodes != null && sourceNodes.isEmpty()) return
findAndCacheSubFolders(
artifactInfo = artifactInfo,
deleted = node.nodeInfo.deleted,
deletedFlag = modifiedNode.deleted
deletedFlag = modifiedNode.deleted,
includePrefix = modifiedNode.includePrefix,
sourceNodes = sourceNodes
)
} else {
updateCache(
projectId = artifactInfo.projectId,
repoName = artifactInfo.repoName,
fullPath = artifactInfo.getArtifactFullPath(),
size = node.size,
deleted = modifiedNode.deleted
deleted = modifiedNode.deleted,
includePrefix = modifiedNode.includePrefix
)
}
}

/**
* 更新缓存
* 当要更新包含该文件所有的目录的缓存记录时includePrefix为空
* 当只需要更新特定目录前缀目录的缓存记录时设置includePrefix
*/
private fun updateCache(
projectId: String,
repoName: String,
fullPath: String,
size: Long,
deleted: Boolean = false
deleted: Boolean = false,
includePrefix: String? = null
) {

// 更新当前节点所有上级目录统计信息
PathUtils.resolveAncestorFolder(fullPath).forEach{
if (it != PathUtils.ROOT) {
val key = Triple(projectId, repoName, it)
var (cachedSize, nodeNum) = cache.getIfPresent(key) ?: Pair(0L, 0L)
if (deleted) {
cachedSize -= size
nodeNum -= 1
} else {
cachedSize += size
nodeNum += 1
}
cache.put(key, Pair(cachedSize, nodeNum))
val folderPaths = PathUtils.resolveAncestorFolder(fullPath)
folderPaths.forEach { it ->
if (it == PathUtils.ROOT) return@forEach
// 当只需要更新特定目录前缀目录的缓存记录时设置folderPrefix
if (!includePrefix.isNullOrEmpty() && !it.startsWith(includePrefix)) return@forEach
val key = Triple(projectId, repoName, it)
var (cachedSize, nodeNum) = cache.getIfPresent(key) ?: Pair(0L, 0L)
if (deleted) {
cachedSize -= size
nodeNum -= 1
} else {
cachedSize += size
nodeNum += 1
}
cache.put(key, Pair(cachedSize, nodeNum))
}
}

private fun findAndCacheSubFolders(
artifactInfo: ArtifactInfo,
deleted: String? = null,
deletedFlag: Boolean = false
deletedFlag: Boolean = false,
includePrefix: String? = null,
sourceNodes: List<String>? = null
) {
findAllNodesUnderFolder(
artifactInfo.projectId,
artifactInfo.repoName,
artifactInfo.getArtifactFullPath(),
deleted = deleted
).forEach {
if (!sourceNodes.isNullOrEmpty() && !sourceNodes.contains(it.fullPath)) return@forEach
updateCache(
projectId = artifactInfo.projectId,
repoName = artifactInfo.repoName,
fullPath = it.fullPath.getFolderPath(),
fullPath = it.fullPath,
size = it.size,
deleted = deletedFlag
deleted = deletedFlag,
includePrefix = includePrefix
)
}
}
Expand All @@ -278,6 +323,42 @@ class NodeModifyEventListener(
}


/**
* 针对move/copy情况下目标节点是目录的情况下,过滤出变更的节点信息
* 可能情况:1 源节点为空目录 2 目标节点为已存在的目录,其下可能已经包含文件
*/
private fun filterSourceNodesFromMoveOrCopy(modifiedNode: ModifiedNodeInfo): List<String>? {
if (modifiedNode.srcFullPath.isNullOrEmpty()) return null
val artifactInfo = ArtifactInfo(
projectId = modifiedNode.srcProjectId!!,
repoName = modifiedNode.srcRepoName!!,
artifactUri = modifiedNode.srcFullPath!!
)
val sourceNodes = mutableListOf<String>()
val node = if (modifiedNode.srcDeleted) {
nodeService.getDeletedNodeDetail(artifactInfo).firstOrNull() ?: return emptyList()
} else {
// 查询节点信息,当节点新增,然后删除后可能会找不到节点
nodeService.getNodeDetail(artifactInfo)
?: nodeService.getDeletedNodeDetail(artifactInfo).firstOrNull() ?: return emptyList()
}
val path = PathUtils.resolveParent(modifiedNode.srcFullPath!!)
if (node.folder) {
findAllNodesUnderFolder(
artifactInfo.projectId,
artifactInfo.repoName,
artifactInfo.getArtifactFullPath(),
node.nodeInfo.deleted
).map {
sourceNodes.add(combineFullPath(modifiedNode.fullPath, it.fullPath.removePrefix(path)))
}
} else {
sourceNodes.add(combineFullPath(modifiedNode.fullPath, node.fullPath.removePrefix(path)))
}
return sourceNodes
}


/**
* 查询目录下的节点,排除path为"/"的节点
*/
Expand All @@ -303,28 +384,23 @@ class NodeModifyEventListener(
return Query(criteria).withHint(TNode.FULL_PATH_IDX)
}


private fun buildDstFullPath(dstFullPath: String, srcFullPath: String): String {
val path = PathUtils.toPath(dstFullPath)
val name = PathUtils.resolveName(srcFullPath)
return PathUtils.combineFullPath(path, name)
}

private fun String.getFolderPath(): String {
val path = PathUtils.resolveParent(this)
return PathUtils.normalizeFullPath(path)
}

private data class ModifiedNodeInfo(
var projectId: String,
var repoName: String,
var fullPath: String,
var deleted: Boolean = false
var deleted: Boolean = false,
// 针对重命名去过滤上层目录
var includePrefix: String? = null,
// 针对move/copy 目标节点是目录的情况下去判断来源节点信息
var srcProjectId: String? = null,
var srcRepoName: String? = null,
var srcFullPath: String? = null,
var srcDeleted: Boolean = false
)

companion object {
private val logger = LoggerFactory.getLogger(NodeModifyEventListener::class.java)
private const val FIXED_DELAY = 30000L
private const val FIXED_DELAY = 10000L
private val IGNORE_PROJECT_PREFIX_LIST = listOf("CODE_", "CLOSED_SOURCE_", "git_")
private val IGNORE_REPO_LIST = listOf(REPORT, LOG)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,17 @@ abstract class NodeBaseService(
deletedTime: LocalDateTime?
) {
with(node) {
if (isGenericRepo(repo)) {
publishEvent(buildCreatedEvent(node))
}
reportNode2Bkbase(node)
val createEnd = System.currentTimeMillis()
val timeout = createEnd - createStart > repositoryProperties.nodeCreateTimeout
if (timeout) {
logger.info("Create node[$fullPath] timeout")
rollbackCreate(parents, node, deletedTime)
throw ErrorCodeException(ArtifactMessageCode.NODE_CREATE_TIMEOUT, fullPath)
}
if (isGenericRepo(repo)) {
publishEvent(buildCreatedEvent(node))
}
reportNode2Bkbase(node)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,11 @@ open class NodeMoveCopySupport(

override fun moveNode(moveRequest: NodeMoveCopyRequest) {
moveCopy(moveRequest, true)
publishEvent(NodeEventFactory.buildMovedEvent(moveRequest))
logger.info("Move node success: [$moveRequest]")
}

override fun copyNode(copyRequest: NodeMoveCopyRequest) {
moveCopy(copyRequest, false)
publishEvent(NodeEventFactory.buildCopiedEvent(copyRequest))
logger.info("Copy node success: [$copyRequest]")
}

Expand All @@ -97,6 +95,11 @@ open class NodeMoveCopySupport(
} else {
moveCopyFile(this)
}
if (move) {
publishEvent(NodeEventFactory.buildMovedEvent(request))
} else {
publishEvent(NodeEventFactory.buildCopiedEvent(request))
}
}
}

Expand Down Expand Up @@ -184,6 +187,8 @@ open class NodeMoveCopySupport(
path = dstPath,
name = dstName,
fullPath = dstFullPath,
size = if (node.folder) 0 else node.size,
nodeNum = if (node.folder) null else node.nodeNum,
lastModifiedBy = operator,
lastModifiedDate = LocalDateTime.now()
)
Expand Down

0 comments on commit ad8eca1

Please sign in to comment.