Skip to content

Commit

Permalink
feat: 分块上传校验range #1552 (#1570)
Browse files Browse the repository at this point in the history
* feat: 分块上传校验range #1552

* feat: 代码调整 #1552

* feat: 代码调整 #1552

* feat: 注释调整 #1552

* feat: 注释调整 #1552
  • Loading branch information
zacYL authored Dec 27, 2023
1 parent 9cdcdc8 commit be9bc6a
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ abstract class FileBlockSupport : CleanupSupport() {
}
}

override fun findLengthOfAppendFile(appendId: String, storageCredentials: StorageCredentials?): Long {
val credentials = getCredentialsOrDefault(storageCredentials)
val tempClient = getTempClient(credentials)
try {
return tempClient.length(CURRENT_PATH, appendId)
} catch (exception: Exception) {
logger.error("Failed to read length of id [$appendId] on [${credentials.key}]", exception)
throw StorageErrorException(StorageMessageCode.STORE_ERROR)
}
}

override fun append(appendId: String, artifactFile: ArtifactFile, storageCredentials: StorageCredentials?): Long {
val credentials = getCredentialsOrDefault(storageCredentials)
val tempClient = getTempClient(credentials)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ interface FileBlockOperation {
*/
fun createAppendId(storageCredentials: StorageCredentials?): String

/**
* 查询追加文件长度
*/
fun findLengthOfAppendFile(appendId: String, storageCredentials: StorageCredentials?): Long

/**
* 追加文件,返回当前文件长度
* appendId: 文件追加Id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import java.io.IOException
import java.io.InputStream
import java.nio.channels.FileChannel
import java.nio.channels.ReadableByteChannel
import java.nio.file.FileAlreadyExistsException
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.FileAlreadyExistsException

/**
* 本地文件存储客户端
Expand Down Expand Up @@ -297,6 +297,17 @@ class FileSystemClient(private val root: String) {
}
}

/**
* 获取文件大小
*/
fun length(dir: String, filename: String): Long {
val filePath = Paths.get(this.root, dir, filename)
if (!Files.isRegularFile(filePath)) {
throw IllegalArgumentException("[$filePath] is not a regular file.")
}
return Files.size(filePath)
}

private fun transfer(input: ReadableByteChannel, output: FileChannel, size: Long, append: Boolean = false) {
val startPosition: Long = if (append) output.size() else 0L
var bytesCopied: Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.tencent.bkrepo.common.artifact.api.ArtifactFile
import com.tencent.bkrepo.common.service.util.HttpContextHolder
import com.tencent.bkrepo.common.storage.core.StorageService
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.message.StorageErrorException
import com.tencent.bkrepo.common.storage.pojo.FileInfo
import com.tencent.bkrepo.replication.constant.BOLBS_UPLOAD_FIRST_STEP_URL_STRING
import com.tencent.bkrepo.replication.exception.ReplicationMessageCode
Expand Down Expand Up @@ -74,36 +75,45 @@ class BlobChunkedServiceImpl(
) {
val range = HttpContextHolder.getRequest().getHeader("Content-Range")
val length = HttpContextHolder.getRequest().contentLength
if (!range.isNullOrEmpty() && length > -1) {
logger.info("range $range, length $length, uuid $uuid")
val (start, end) = getRangeInfo(range)
// 判断要上传的长度是否超长
if (end - start > length - 1) {
buildBlobUploadPatchResponse(

val lengthOfAppendFile = storageService.findLengthOfAppendFile(uuid, credentials)
logger.info("current length of append file is $lengthOfAppendFile")
val (patchLen, status) = when (chunkedRequestCheck(
uuid = uuid,
locationStr = buildLocationUrl(uuid, projectId, repoName),
response = HttpContextHolder.getResponse(),
range = length.toLong(),
status = HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE
lengthOfAppendFile = lengthOfAppendFile,
range = range,
contentLength = length
)) {
RangeStatus.ILLEGAL_RANGE -> {
Pair(length.toLong(), HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
}
RangeStatus.READY_TO_APPEND -> {
val patchLen = storageService.append(
appendId = uuid,
artifactFile = artifactFile,
storageCredentials = credentials
)
logger.info(
"Part of file with sha256 $sha256 in repo $projectId|$repoName " +
"has been uploaded, size pf append file is $patchLen and uuid: $uuid"
)
return
Pair(patchLen, HttpStatus.ACCEPTED)
}
else -> {
logger.info(
"Part of file with sha256 $sha256 in repo $projectId|$repoName " +
"already appended, size pf append file is $lengthOfAppendFile and uuid: $uuid")
Pair(lengthOfAppendFile, HttpStatus.ACCEPTED)
}
}
val patchLen = storageService.append(
appendId = uuid,
artifactFile = artifactFile,
storageCredentials = credentials
)
logger.info(
"Part of file with sha256 $sha256 in repo $projectId|$repoName " +
"has been uploaded, uploaded size is $patchLen uuid: $uuid,"
)
buildBlobUploadPatchResponse(
uuid = uuid,
locationStr = buildLocationUrl(uuid, projectId, repoName),
response = HttpContextHolder.getResponse(),
range = patchLen
range = patchLen,
status = status
)

}

override fun finishChunkedUpload(
Expand All @@ -123,7 +133,11 @@ class BlobChunkedServiceImpl(
} else {
null
}
val fileInfo = storageService.finishAppend(uuid, credentials, originalFileInfo)
val fileInfo = try {
storageService.finishAppend(uuid, credentials, originalFileInfo)
} catch (e: StorageErrorException) {
throw BadRequestException(ReplicationMessageCode.REPLICA_ARTIFACT_BROKEN, sha256)
}
logger.info(
"The file with sha256 $sha256 in repo $projectId|$repoName has been uploaded with uuid: $uuid," +
" received sha256 of file is ${fileInfo.sha256}")
Expand All @@ -142,11 +156,55 @@ class BlobChunkedServiceImpl(
)
}

private fun chunkedRequestCheck(
contentLength: Int,
range: String?,
uuid: String,
lengthOfAppendFile: Long
): RangeStatus {
// 当range不存在或者length < 0时
if (!validateValue(contentLength, range)) {
return RangeStatus.ILLEGAL_RANGE
}
logger.info("range $range, length $contentLength, uuid $uuid")
val (start, end) = getRangeInfo(range!!)
// 当上传的长度和range内容不匹配时
return if ((end - start) != (contentLength - 1).toLong()) {
RangeStatus.ILLEGAL_RANGE
} else {
// 当追加的文件大小和range的起始大小一致时代表写入正常
if (start == lengthOfAppendFile) {
RangeStatus.READY_TO_APPEND
} else if (start > lengthOfAppendFile) {
// 当追加的文件大小比start小时,说明文件写入有误
RangeStatus.ILLEGAL_RANGE
} else {
// 当追加的文件大小==end+1时,可能存在重试导致已经写入一次
if (lengthOfAppendFile == end + 1) {
RangeStatus.ALREADY_APPENDED
} else {
// 当追加的文件大小大于start时,并且不等于end+1时,文件已损坏
RangeStatus.ILLEGAL_RANGE
}
}
}
}

private fun validateValue(contentLength: Int, range: String?): Boolean {
return !(range.isNullOrEmpty() || contentLength < 0)
}

private fun buildLocationUrl(uuid: String, projectId: String, repoName: String) : String {
val path = BOLBS_UPLOAD_FIRST_STEP_URL_STRING.format(projectId, repoName)
return serviceName+path+uuid
}

enum class RangeStatus {
ILLEGAL_RANGE,
ALREADY_APPENDED,
READY_TO_APPEND;
}

companion object {
private val logger = LoggerFactory.getLogger(BlobChunkedServiceImpl::class.java)
}
Expand Down

0 comments on commit be9bc6a

Please sign in to comment.