Skip to content

Commit

Permalink
feat: 添加限流模块 #1539
Browse files Browse the repository at this point in the history
* feat: 增加限流功能#1539

* feat: 添加执行机器判断#1539

* feat: 代码调整#1539

* feat: 代码调整#1539

* feat: 针对带宽限流调整#1539

* feat: 只针对上传流量进行判断#1539

* feat: 添加测试代码#1539

* feat: 添加限流相关测试用例#1539

* feat: 测试代码调整#1539

* feat: 代码调整#1539

* feat: 上传限流调整#1539

* feat: 上传下载限流异常捕获#1539

* feat: 测试代码调整#1539

* feat: 测试代码调整#1539

* feat: 配置参数调整#1539

* feat: 由于redis expire只支持秒为单位,所以周期最小单位为秒#1539

* feat: contentLengthLong为-1的情况特殊处理#1539

* feat: 去掉多余引用#1539

* feat: 测试用例调整#1539

* feat: 时间参数从redisserver读取#1539

* feat: embeddedredis版本调整#1539

* feat: 测试用例调整#1539

* feat: 支持对url进行仓库维度限频#1539

* feat: 代码调整#1539

* feat: 后台增加限流配置页面 #2655

* feat: 后台增加限流配置页面 #2655

* feat: 后台增加限流配置页面 #2655

* feat: 后台增加限流配置页面 #2655

* feat: 后台增加限流配置页面 #2655

* feat: 代码回滚,因redis版本问题,获取统一时间逻辑调整#1539

* feat: 代码调整#1539

* feat: 后台增加限流配置页面 #2655

* feat: 异常显示调整#1539

* feat: 后台增加限流配置页面 #2655

* feat: 正则匹配规则修改#1539

* feat: 正则匹配规则修改#1539

* feat: 下载容量限制添加#1539

* feat: 后台增加限流配置页面 #2655

* feat: 后台增加限流配置页面 #2655

* feat: 代码调整#1539

* feat: 滑动窗口脚本调整#1539

* feat: 后台增加限流配置页面 #2655

* feat: targets过滤调整#1539

* feat: 代码调整#1539

* feat: 后台增加限流配置页面 #2655

* feat: 读取项目仓库信息增加异常捕获#1539

* feat: 读取项目仓库信息增加异常捕获#1539

* feat: 使用globalexceptionhandler处理#1539

* feat: targets校验调整#1539

* feat: 测试代码调整#1539

* feat: 后台增加限流配置页面 #2655

* feat: 代码调整#1539

* feat: 代码调整#1539

* feat: 代码调整#1539

---------

Co-authored-by: lannoy0523 <46735290+lannoy0523@users.noreply.github.com>
Co-authored-by: lannoy0523 <935275025@qq.com>
  • Loading branch information
3 people authored Nov 5, 2024
1 parent a96e0fe commit 4401758
Show file tree
Hide file tree
Showing 139 changed files with 14,911 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.common.api.exception

import com.tencent.bkrepo.common.api.constant.HttpStatus
import com.tencent.bkrepo.common.api.message.CommonMessageCode

/**
* 超过限流配置异常
*/
class OverloadException(
val resource: String
) : ErrorCodeException(CommonMessageCode.RATE_LIMITER_OVERLOAD, resource, status = HttpStatus.TOO_MANY_REQUESTS)
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ enum class CommonMessageCode(private val key: String) : MessageCode {
MEDIA_TYPE_UNACCEPTABLE("system.media-type.unacceptable"),
TOO_MANY_REQUESTS("too.many.requests"),
PIPELINE_NOT_RUNNING("pipeline.not-running"),
INVALID_CONFIG("system.config.invalid"),
ACQUIRE_LOCK_FAILED("acquire.lock.failed"),
RATE_LIMITER_OVERLOAD("rate.limiter.overload")
;

override fun getBusinessCode() = ordinal + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ operation.cross-cluster.not-allowed=Cross location operation is not allowed
system.media-type.unacceptable=Unacceptable Media Type
too.many.requests=Too Many Requests: {0}
pipeline.not-running=Pipeline[{0}] is not running status
system.config.invalid=Config [{0}] is invalid
acquire.lock.failed=acquire lock failed:[{0}]
rate.limiter.overload=resource requests reached rate limit:[{0}]
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ operation.cross-cluster.not-allowed=不允许跨地点操作
system.media-type.unacceptable=不接受的Media Type
too.many.requests=请求过多: {0}
pipeline.not-running=流水线[{0}]不是运行状态
system.config.invalid=配置[{0}]无效
acquire.lock.failed=获取锁失败: [{0}]
rate.limiter.overload=资源请求量超过限流值: [{0}]
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ operation.cross-cluster.not-allowed=不允許跨地點操作
system.media-type.unacceptable=不接受的Media Type
too.many.requests=請求過多: {0}
pipeline.not-running=流水線[{0}]不是運行狀態
system.config.invalid=配置[{0}]無效
acquire.lock.failed=獲取鎖失敗: [{0}]
rate.limiter.overload=資源請求量超過限流值: [{0}]
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies {
api(project(":common:common-security"))
api(project(":common:common-artifact:artifact-api"))
api(project(":common:common-storage:storage-service"))
api(project(":common:common-ratelimiter"))
api(project(":common:common-stream"))
api(project(":common:common-metrics-push"))
api(project(":common:common-metadata:metadata-service"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import com.tencent.bkrepo.common.artifact.resolve.path.ResolverMap
import com.tencent.bkrepo.common.artifact.resolve.response.ArtifactResourceWriter
import com.tencent.bkrepo.common.artifact.resolve.response.DefaultArtifactResourceWriter
import com.tencent.bkrepo.common.storage.config.StorageProperties
import com.tencent.bkrepo.common.ratelimiter.service.RequestLimitCheckService
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand Down Expand Up @@ -84,8 +85,14 @@ class ArtifactResolverConfiguration {

@Bean
@ConditionalOnMissingBean(ArtifactResourceWriter::class)
fun artifactResourceWriter(storageProperties: StorageProperties): ArtifactResourceWriter {
return DefaultArtifactResourceWriter(storageProperties)
fun artifactResourceWriter(
storageProperties: StorageProperties,
requestLimitCheckService: RequestLimitCheckService
): ArtifactResourceWriter {
return DefaultArtifactResourceWriter(
storageProperties,
requestLimitCheckService
)
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@
package com.tencent.bkrepo.common.artifact.resolve.file

import com.tencent.bkrepo.common.api.constant.retry
import com.tencent.bkrepo.common.api.exception.OverloadException
import com.tencent.bkrepo.common.artifact.exception.ArtifactReceiveException
import com.tencent.bkrepo.common.artifact.hash.sha256
import com.tencent.bkrepo.common.artifact.metrics.ArtifactMetrics
import com.tencent.bkrepo.common.artifact.metrics.TrafficHandler
import com.tencent.bkrepo.common.artifact.stream.DigestCalculateListener
import com.tencent.bkrepo.common.artifact.stream.rateLimit
import com.tencent.bkrepo.common.artifact.util.http.IOExceptionUtils
import com.tencent.bkrepo.common.ratelimiter.service.RequestLimitCheckService
import com.tencent.bkrepo.common.ratelimiter.stream.CommonRateLimitInputStream
import com.tencent.bkrepo.common.storage.config.MonitorProperties
import com.tencent.bkrepo.common.storage.config.ReceiveProperties
import com.tencent.bkrepo.common.storage.core.locator.HashFileLocator
import com.tencent.bkrepo.common.storage.config.MonitorProperties
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor
import com.tencent.bkrepo.common.storage.monitor.Throughput
import com.tencent.bkrepo.common.storage.util.createFile
import com.tencent.bkrepo.common.storage.util.delete
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.IOException
Expand All @@ -57,6 +59,7 @@ import java.security.SecureRandom
import java.time.Duration
import kotlin.math.abs
import kotlin.system.measureTimeMillis
import org.slf4j.LoggerFactory

/**
* artifact数据接收类,作用:
Expand All @@ -75,6 +78,8 @@ class ArtifactDataReceiver(
private val filename: String = generateRandomName(),
private val randomPath: Boolean = false,
private val originPath: Path = path,
private val requestLimitCheckService: RequestLimitCheckService? = null,
private val contentLength: Long? = null,
) : StorageHealthMonitor.Observer, AutoCloseable {

/**
Expand Down Expand Up @@ -187,9 +192,15 @@ class ArtifactDataReceiver(
startTime = System.nanoTime()
}
try {
requestLimitCheckService?.uploadBandwidthCheck(
length.toLong(),
receiveProperties.circuitBreakerThreshold
)
writeData(chunk, offset, length)
} catch (exception: IOException) {
handleIOException(exception)
} catch (overloadEx: OverloadException) {
handleOverloadException(overloadEx)
}
}

Expand All @@ -203,13 +214,18 @@ class ArtifactDataReceiver(
startTime = System.nanoTime()
}
try {
requestLimitCheckService?.uploadBandwidthCheck(
1, receiveProperties.circuitBreakerThreshold
)
checkFallback()
outputStream.write(b)
listener.data(b)
received += 1
checkThreshold()
} catch (exception: IOException) {
handleIOException(exception)
} catch (overloadEx: OverloadException) {
handleOverloadException(overloadEx)
}
}

Expand All @@ -222,8 +238,13 @@ class ArtifactDataReceiver(
if (startTime == 0L) {
startTime = System.nanoTime()
}
var rateLimitFlag = false
var exp: Exception? = null
try {
val input = source.rateLimit(receiveProperties.rateLimit.toBytes())
val input = requestLimitCheckService?.bandwidthCheck(
source, receiveProperties.circuitBreakerThreshold, contentLength
) ?: source.rateLimit(receiveProperties.rateLimit.toBytes())
rateLimitFlag = input is CommonRateLimitInputStream
val buffer = ByteArray(bufferSize)
input.use {
var bytes = input.read(buffer)
Expand All @@ -233,7 +254,15 @@ class ArtifactDataReceiver(
}
}
} catch (exception: IOException) {
exp = exception
handleIOException(exception)
} catch (overloadEx: OverloadException) {
exp = overloadEx
handleOverloadException(overloadEx)
} finally {
if (rateLimitFlag) {
requestLimitCheckService?.bandwidthFinish(exp)
}
}
}

Expand Down Expand Up @@ -322,16 +351,28 @@ class ArtifactDataReceiver(
* 处理IO异常
*/
private fun handleIOException(exception: IOException) {
finished = true
endTime = System.nanoTime()
close()
finishWithException()
if (IOExceptionUtils.isClientBroken(exception)) {
throw ArtifactReceiveException(exception.message.orEmpty())
} else {
throw exception
}
}

/**
* 处理限流请求
*/
private fun handleOverloadException(exception: OverloadException) {
finishWithException()
throw exception
}

private fun finishWithException() {
finished = true
endTime = System.nanoTime()
close()
}

/**
* 检查是否需要fall back操作
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.tencent.bkrepo.common.artifact.resolve.file.multipart.MultipartArtifa
import com.tencent.bkrepo.common.artifact.resolve.file.stream.StreamArtifactFile
import com.tencent.bkrepo.common.bksync.BlockChannel
import com.tencent.bkrepo.common.storage.config.StorageProperties
import com.tencent.bkrepo.common.ratelimiter.service.RequestLimitCheckService
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitorHelper
Expand All @@ -54,17 +55,20 @@ import java.io.InputStream
class ArtifactFileFactory(
storageProperties: StorageProperties,
storageHealthMonitorHelper: StorageHealthMonitorHelper,
private val limitCheckService: RequestLimitCheckService
) {

init {
monitorHelper = storageHealthMonitorHelper
properties = storageProperties
requestLimitCheckService = limitCheckService
}

companion object {

private lateinit var monitorHelper: StorageHealthMonitorHelper
private lateinit var properties: StorageProperties
private lateinit var requestLimitCheckService: RequestLimitCheckService

const val ARTIFACT_FILES = "artifact.files"

Expand All @@ -89,34 +93,49 @@ class ArtifactFileFactory(
* 构造分块接收数据的artifact file
*/
fun buildChunked(): ChunkedArtifactFile {
return ChunkedArtifactFile(getMonitor(), properties, getStorageCredentials()).apply {
return ChunkedArtifactFile(
getMonitor(), properties, getStorageCredentials(),
).apply {
track(this)
}
}

fun buildChunked(storageCredentials: StorageCredentials): ChunkedArtifactFile {
return ChunkedArtifactFile(getMonitor(storageCredentials), properties, storageCredentials).apply {
return ChunkedArtifactFile(
getMonitor(storageCredentials), properties, storageCredentials,
).apply {
track(this)
}
}

fun buildDfsArtifactFile(): RandomAccessArtifactFile {
return RandomAccessArtifactFile(getMonitor(), getStorageCredentials(), properties).apply {
return RandomAccessArtifactFile(
getMonitor(), getStorageCredentials(), properties,
).apply {
track(this)
}
}

/**
* 通过输入流构造artifact file, 主要针对上传请求对其做限流操作
* @param inputStream 输入流
*/
fun buildWithRateLimiter(inputStream: InputStream, contentLength: Long? = null): ArtifactFile {
return StreamArtifactFile(
inputStream, getMonitor(), properties, getStorageCredentials(), contentLength,
requestLimitCheckService = requestLimitCheckService
).apply {
track(this)
}
}

/**
* 通过输入流构造artifact file
* 通过输入流构造artifact file,服务内部输入流转换成文件使用
* @param inputStream 输入流
*/
fun build(inputStream: InputStream, contentLength: Long? = null): ArtifactFile {
return StreamArtifactFile(
inputStream,
getMonitor(),
properties,
getStorageCredentials(),
contentLength,
inputStream, getMonitor(), properties, getStorageCredentials(), contentLength
).apply {
track(this)
}
Expand All @@ -137,10 +156,8 @@ class ArtifactFileFactory(
*/
fun build(multipartFile: MultipartFile, storageCredentials: StorageCredentials): ArtifactFile {
return MultipartArtifactFile(
multipartFile,
getMonitor(storageCredentials),
properties,
storageCredentials,
multipartFile, getMonitor(storageCredentials), properties, storageCredentials,
requestLimitCheckService = requestLimitCheckService
).apply {
track(this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.nio.file.NoSuchFileException
class RandomAccessArtifactFile(
private val monitor: StorageHealthMonitor,
private val storageCredentials: StorageCredentials,
storageProperties: StorageProperties
storageProperties: StorageProperties,
) : ArtifactFile {

/**
Expand All @@ -43,7 +43,9 @@ class RandomAccessArtifactFile(

init {
val path = storageCredentials.upload.location.toPath()
receiver = ArtifactDataReceiver(storageProperties.receive, storageProperties.monitor, path)
receiver = ArtifactDataReceiver(
storageProperties.receive, storageProperties.monitor, path,
)
monitor.add(receiver)
if (!monitor.healthy.get()) {
receiver.unhealthy(monitor.getFallbackPath(), monitor.fallBackReason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package com.tencent.bkrepo.common.artifact.resolve.file.multipart

import com.tencent.bkrepo.common.artifact.resolve.file.stream.StreamArtifactFile
import com.tencent.bkrepo.common.storage.config.StorageProperties
import com.tencent.bkrepo.common.ratelimiter.service.RequestLimitCheckService
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor
import org.springframework.web.multipart.MultipartFile
Expand All @@ -37,9 +38,11 @@ class MultipartArtifactFile(
private val multipartFile: MultipartFile,
monitor: StorageHealthMonitor,
storageProperties: StorageProperties,
storageCredentials: StorageCredentials
storageCredentials: StorageCredentials,
requestLimitCheckService: RequestLimitCheckService
) : StreamArtifactFile(
multipartFile.inputStream, monitor, storageProperties, storageCredentials, multipartFile.size
multipartFile.inputStream, monitor, storageProperties, storageCredentials, multipartFile.size,
requestLimitCheckService
) {
fun getOriginalFilename() = multipartFile.originalFilename.orEmpty()
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ class ArtifactFileMethodArgumentResolver : HandlerMethodArgumentResolver {
}

private fun resolveOctetStream(request: HttpServletRequest): ArtifactFile {
return ArtifactFileFactory.build(request.inputStream, request.contentLengthLong)
return ArtifactFileFactory.buildWithRateLimiter(request.inputStream, request.contentLengthLong)
}
}
Loading

0 comments on commit 4401758

Please sign in to comment.