Skip to content

Commit

Permalink
fix: 修复预加载计划生成时查询node超时 #2658
Browse files Browse the repository at this point in the history
* fix: 修复预加载计划生成时查询node超时 #2658

* fix: 修复预加载计划生成时查询node超时 #2658

* fix: 增加预加载策略生成相关debug日志 #2658
  • Loading branch information
cnlkl authored Nov 25, 2024
1 parent cf0f44b commit 311cb3f
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ import com.tencent.bkrepo.common.metadata.service.repo.RepositoryService
import com.tencent.bkrepo.common.mongo.dao.util.Pages
import com.tencent.bkrepo.repository.pojo.node.NodeDetail
import com.tencent.bkrepo.repository.pojo.node.NodeInfo
import com.tencent.bkrepo.repository.pojo.node.NodeListOption
import com.tencent.bkrepo.repository.pojo.repo.RepositoryInfo
import org.slf4j.LoggerFactory
import org.springframework.data.domain.PageRequest
Expand Down Expand Up @@ -112,13 +111,20 @@ class ArtifactPreloadPlanServiceImpl(
if (!properties.enabled) {
return
}
val option = NodeListOption(pageSize = properties.maxNodes, includeFolder = false)
val res = nodeService.listNodePageBySha256(sha256, option)
val nodes = res.records
val nodes = nodeService.listNodeBySha256(
sha256 = sha256,
limit = properties.maxNodes,
includeMetadata = false,
includeDeleted = false,
tillLimit = false
)
if (nodes.size >= properties.maxNodes) {
// 限制查询出来的最大node数量,避免预加载计划创建时间过久
logger.warn("nodes of sha256[$sha256] exceed max page size[${properties.maxNodes}]")
return
} else if (nodes.isEmpty()) {
logger.debug("nodes of sha256[$sha256] found")
return
}
// node属于同一项目仓库的概率较大,缓存避免频繁查询策略
val strategyCache = HashMap<String, List<ArtifactPreloadStrategy>>()
Expand All @@ -127,11 +133,15 @@ class ArtifactPreloadPlanServiceImpl(
for (node in nodes) {
val repo = repositoryCache.get(buildRepoId(node.projectId, node.repoName))
if (repo.storageCredentialsKey != credentialsKey) {
logger.debug("credentialsKey of repo[${repo.name}] not match dst credentialsKey[${credentialsKey}]")
continue
}
val strategies = strategyCache.getOrPut(buildRepoId(node.projectId, node.repoName)) {
strategyService.list(node.projectId, node.repoName)
}
if (strategies.isEmpty()) {
logger.debug("preload strategy of repo[${repo.projectId}/${repo.name}] is empty")
}
strategies.forEach { strategy ->
matchAndGeneratePlan(strategy, node, repo.storageCredentialsKey)?.let { plans.add(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers.anyBoolean
import org.mockito.ArgumentMatchers.anyString
import org.mockito.kotlin.any
import org.mockito.kotlin.anyOrNull
Expand Down Expand Up @@ -135,9 +136,9 @@ class ArtifactPreloadPlanServiceImplTest @Autowired constructor(
node.copy(fullPath = "test.txt"),
node.copy(fullPath = "test2.txt"),
)
whenever(nodeService.listNodePageBySha256(anyString(), any())).thenReturn(
Pages.ofResponse(Pages.ofRequest(0, 2000), nodes.size.toLong(), nodes)
)
whenever(
nodeService.listNodeBySha256(anyString(), any(), anyBoolean(), anyBoolean(), anyBoolean())
).thenReturn(nodes)

preloadPlanService.generatePlan(null, UT_SHA256)
plans = preloadPlanService.plans(UT_PROJECT_ID, UT_REPO_NAME, Pages.ofRequest(0, 10)).records
Expand Down Expand Up @@ -177,9 +178,9 @@ class ArtifactPreloadPlanServiceImplTest @Autowired constructor(
for (i in 0..1000) {
nodes.add(buildNodeInfo())
}
whenever(nodeService.listNodePageBySha256(anyString(), any())).thenReturn(
Pages.ofResponse(Pages.ofRequest(0, 2000), nodes.size.toLong(), nodes)
)
whenever(
nodeService.listNodeBySha256(anyString(), any(), anyBoolean(), anyBoolean(), anyBoolean())
).thenReturn(nodes)
preloadPlanService.generatePlan(null, UT_SHA256)
val plans = preloadPlanService.plans(UT_PROJECT_ID, UT_REPO_NAME, Pages.ofRequest(0, 10)).records
assertEquals(0, plans.size)
Expand Down Expand Up @@ -218,9 +219,9 @@ class ArtifactPreloadPlanServiceImplTest @Autowired constructor(
buildRepo(projectId = projectId, repoName = repoName)
)
val nodes = listOf(buildNodeInfo(projectId, repoName))
whenever(nodeService.listNodePageBySha256(anyString(), any())).thenReturn(
Pages.ofResponse(Pages.ofRequest(0, 20), 1L, nodes)
)
whenever(
nodeService.listNodeBySha256(anyString(), any(), anyBoolean(), anyBoolean(), anyBoolean())
).thenReturn(nodes)
}

// 构造测试数据
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package com.tencent.bkrepo.common.metadata.dao.node

import com.tencent.bkrepo.common.api.constant.DEFAULT_PAGE_SIZE
import com.tencent.bkrepo.common.api.constant.StringPool
import com.tencent.bkrepo.common.artifact.path.PathUtils
import com.tencent.bkrepo.common.metadata.condition.SyncCondition
Expand Down Expand Up @@ -145,6 +146,53 @@ class NodeDao : HashShardingMongoDao<TNode>() {
return pageWithoutShardingKey(pageRequest, query)
}

/**
* 根据[sha256]查询node列表,用于不需要分页的场景提高查询速度
*
* @param sha256 待查询sha256
* @param limit 查询选项
* @param includeMetadata 是否包含元数据
* @param includeDeleted 是否包含被删除的制品
* @param tillLimit 为true时将遍历所有分表直到查询到的结果数量达到limit
*
* @return 指定sha256的node列表
*/
fun listBySha256(
sha256: String,
limit: Int = DEFAULT_PAGE_SIZE,
includeMetadata: Boolean =false,
includeDeleted: Boolean = true,
tillLimit: Boolean = true,
): List<TNode> {
// 构造查询条件
val criteria = where(TNode::sha256).isEqualTo(sha256).and(TNode::folder).isEqualTo(false)
if (!includeDeleted) {
criteria.and(TNode::deleted).isEqualTo(null)
}
val query = Query(criteria)
if (!includeMetadata) {
query.fields().exclude(TNode::metadata.name)
}

if (shardingCount <= 0 || shardingCount > MAX_SHARDING_COUNT_OF_PAGE_QUERY) {
throw UnsupportedOperationException()
}

// 遍历所有分表进行查询
val template = determineMongoTemplate()
val result = ArrayList<TNode>()
for (sequence in 0 until shardingCount) {
query.limit(limit - result.size)
val collectionName = parseSequenceToCollectionName(sequence)
result.addAll(template.find(query, classType, collectionName))
if (result.isNotEmpty() && !tillLimit || result.size == limit) {
break
}
}

return result
}

companion object {
fun buildRootNode(projectId: String, repoName: String): TNode {
return TNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package com.tencent.bkrepo.common.metadata.service.node

import com.tencent.bkrepo.common.api.constant.DEFAULT_PAGE_SIZE
import com.tencent.bkrepo.common.api.pojo.Page
import com.tencent.bkrepo.common.artifact.api.ArtifactInfo
import com.tencent.bkrepo.repository.pojo.node.NodeDetail
Expand Down Expand Up @@ -66,6 +67,17 @@ interface NodeBaseOperation {
*/
fun listNodePageBySha256(sha256: String, option: NodeListOption): Page<NodeInfo>

/**
* 根据sha256列出指定数量的节点
*/
fun listNodeBySha256(
sha256: String,
limit: Int = DEFAULT_PAGE_SIZE,
includeMetadata: Boolean =false,
includeDeleted: Boolean = true,
tillLimit: Boolean = true,
): List<NodeInfo>

/**
* 判断节点是否存在
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ abstract class NodeBaseService(
)
}

override fun listNodeBySha256(
sha256: String,
limit: Int,
includeMetadata: Boolean,
includeDeleted: Boolean,
tillLimit: Boolean
): List<NodeInfo> {
return nodeDao.listBySha256(sha256, limit, includeMetadata, includeDeleted, tillLimit).map { convert(it)!! }
}

override fun checkExist(artifact: ArtifactInfo): Boolean {
return nodeDao.exists(artifact.projectId, artifact.repoName, artifact.getArtifactFullPath())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,6 @@ abstract class ShardingMongoDao<E> : AbstractMongoDao<E>() {
companion object {

private val logger = LoggerFactory.getLogger(ShardingMongoDao::class.java)
private const val MAX_SHARDING_COUNT_OF_PAGE_QUERY = 256
const val MAX_SHARDING_COUNT_OF_PAGE_QUERY = 256
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import com.tencent.bkrepo.repository.pojo.node.service.NodeCreateRequest
import com.tencent.bkrepo.common.metadata.service.node.NodeService
import com.tencent.bkrepo.common.metadata.service.project.ProjectService
import com.tencent.bkrepo.common.metadata.service.repo.RepositoryService
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
Expand All @@ -61,68 +62,90 @@ class NodeQueryWithoutShardingKeyTest @Autowired constructor(
private val nodeService: NodeService
) : ServiceBaseTest() {

// 创建测试数据sha256为sha256-0,sha256-1...sha256-8,sha256-9
// sha256-0和sha256-1有18条数据,其余sha256各有15条数据
val generateSha256Func = { i: Int -> "sha256-${i % 10}" }

@BeforeAll
fun beforeAll() {
initMock()
generateTestData(52, generateSha256Func)
}

@Test
@DisplayName("测试按SHA256分页查询")
fun testListNodePageBySha256() {
// 创建测试数据sha256为sha256-0,sha256-1...sha256-8,sha256-9
// sha256-0和sha256-1有18条数据,其余sha256各有15条数据
val generateSha256Func = { i: Int -> "sha256-${i % 10}" }
generateTestData(52, generateSha256Func)
val option = NodeListOption(1, 5, includeMetadata = true, sort = true)
// 测试获取不存在的Node列表
nodeService.listNodePageBySha256("notExistsSha256", option).apply {
Assertions.assertEquals(0L, totalRecords)
Assertions.assertEquals(0L, totalPages)
Assertions.assertTrue(records.isEmpty())
assertEquals(0L, totalRecords)
assertEquals(0L, totalPages)
assertTrue(records.isEmpty())
}

// 测试数据量小于pageSize的情况
nodeService.listNodePageBySha256(generateSha256Func(1), option.copy(pageSize = 20)).apply {
Assertions.assertEquals(18, totalRecords)
Assertions.assertEquals(1, totalPages)
Assertions.assertEquals(18, records.size)
assertEquals(18, totalRecords)
assertEquals(1, totalPages)
assertEquals(18, records.size)
}

// 测试获取数据量等于pageSize的页
nodeService.listNodePageBySha256(generateSha256Func(1), option.copy(pageSize = 4)).apply {
Assertions.assertEquals(18, totalRecords)
Assertions.assertEquals(5, totalPages)
Assertions.assertEquals(4, records.size)
assertEquals(18, totalRecords)
assertEquals(5, totalPages)
assertEquals(4, records.size)
}


// 测试获数据量小于pageSize的页
nodeService.listNodePageBySha256(generateSha256Func(1), option.copy(pageSize = 4, pageNumber = 5)).apply {
Assertions.assertEquals(18, totalRecords)
Assertions.assertEquals(5, totalPages)
Assertions.assertEquals(2, records.size)
assertEquals(18, totalRecords)
assertEquals(5, totalPages)
assertEquals(2, records.size)
}

// 测试获取不存在的页
nodeService.listNodePageBySha256(generateSha256Func(1), option.copy(pageSize = 4, pageNumber = 6)).apply {
Assertions.assertEquals(18, totalRecords)
Assertions.assertEquals(5, totalPages)
Assertions.assertEquals(0, records.size)
assertEquals(18, totalRecords)
assertEquals(5, totalPages)
assertEquals(0, records.size)
}

// 测试分页数据在两个分表的情况,[0,1,2,3,4, 5][6,7,8,9, 10,11][12,13,14,15,16,17]
nodeService.listNodePageBySha256(generateSha256Func(1), option.copy(pageSize = 5, pageNumber = 2)).apply {
Assertions.assertEquals(18, totalRecords)
Assertions.assertEquals(4, totalPages)
Assertions.assertEquals(5, records.size)
Assertions.assertEquals(PROJECT_SHARDING_207, records[0].projectId)
Assertions.assertEquals(PROJECT_SHARDING_208, records[1].projectId)
Assertions.assertEquals(PROJECT_SHARDING_208, records[2].projectId)
Assertions.assertEquals(PROJECT_SHARDING_208, records[3].projectId)
Assertions.assertEquals(PROJECT_SHARDING_208, records[4].projectId)
assertEquals(18, totalRecords)
assertEquals(4, totalPages)
assertEquals(5, records.size)
assertEquals(PROJECT_SHARDING_207, records[0].projectId)
assertEquals(PROJECT_SHARDING_208, records[1].projectId)
assertEquals(PROJECT_SHARDING_208, records[2].projectId)
assertEquals(PROJECT_SHARDING_208, records[3].projectId)
assertEquals(PROJECT_SHARDING_208, records[4].projectId)
}
}

@Test
@DisplayName("测试按SHA256查询")
fun testListNodeBySha256() {
val sha256 = generateSha256Func(1)

// 测试获取不存在的Node列表
nodeService.listNodeBySha256("notExistsSha256").apply { assertEquals(0, size) }

// 测试单表数据量小于limit的情况
nodeService.listNodeBySha256(sha256, 20, tillLimit = false).apply { assertEquals(6, size) }
nodeService.listNodeBySha256(sha256, 100, tillLimit = true).apply { assertEquals(18, size) }

// 测试单表数据量等于limit的页
nodeService.listNodeBySha256(sha256, 6, tillLimit = true).apply { assertEquals(6, size) }
nodeService.listNodeBySha256(sha256, 6, tillLimit = false).apply { assertEquals(6, size) }

// 测试获数据量小于pageSize的页
nodeService.listNodeBySha256(sha256, 3, tillLimit = true).apply { assertEquals(3, size) }
nodeService.listNodeBySha256(sha256, 3, tillLimit = false).apply { assertEquals(3, size) }
}

private fun generateTestData(size: Int, generateSha256Func: (Int) -> String) {
for (projectId in arrayOf(PROJECT_SHARDING_207, PROJECT_SHARDING_208, PROJECT_SHARDING_209)) {
createProject(projectService, projectId)
Expand Down

0 comments on commit 311cb3f

Please sign in to comment.