Skip to content

Commit

Permalink
feat: 增加ddc服务批量操作接口 #2776
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlkl committed Nov 28, 2024
1 parent 2c19920 commit ebffbe6
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

package com.tencent.bkrepo.ddc.pojo

import com.tencent.bkrepo.common.api.exception.ErrorCodeException
import com.tencent.bkrepo.common.api.message.CommonMessageCode
import com.tencent.bkrepo.ddc.serialization.CbArray
import com.tencent.bkrepo.ddc.serialization.CbField
import com.tencent.bkrepo.ddc.serialization.CbObject
import com.tencent.bkrepo.ddc.utils.isBool
Expand All @@ -43,9 +46,11 @@ data class BatchOps(
) {
companion object {
fun deserialize(byteArray: ByteArray): BatchOps {
val ops = CbObject(ByteBuffer.wrap(byteArray))[BatchOps::ops.name].asArray().map {
deserializeBatchOp(BatchOp(), it.asObject())
val opsCbArray = CbObject(ByteBuffer.wrap(byteArray))[BatchOps::ops.name].asArray()
if (opsCbArray == CbArray.EMPTY) {
throw ErrorCodeException(CommonMessageCode.PARAMETER_INVALID, "ops is empty")
}
val ops = opsCbArray.map { deserializeBatchOp(BatchOp(), it.asObject()) }
return BatchOps(ops)
}

Expand All @@ -60,8 +65,10 @@ data class BatchOps(
field.isObject() -> field.asObject()
else -> throw RuntimeException("unsupported field type ${field.getType()}")
}
cbObject.forEach { field ->
obj.javaClass.getDeclaredField(field.name).set(obj, valOf(field))
cbObject.forEach { cbField ->
val field = obj.javaClass.getDeclaredField(cbField.name)
field.isAccessible = true
field.set(obj, valOf(cbField))
}
return obj
}
Expand All @@ -72,22 +79,22 @@ data class BatchOps(
* 操作
*/
data class BatchOp(
var opId: Int = 0,
var bucket: String = "",
var key: String = "",
var op: String = Operation.INVALID.toString(),
val opId: Int = 0,
val bucket: String = "",
val key: String = "",
val op: String = Operation.INVALID.toString(),
/**
* 是否检查ref引用的所有blob是否存在
*/
var resolveAttachments: Boolean = false,
val resolveAttachments: Boolean = false,
/**
* ref inline blob,op为PUT时有值
*/
var payload: CbObject? = null,
val payload: CbObject? = null,
/**
* ref inline blob hash, op为PUT时有值
*/
var payloadHash: String? = null,
val payloadHash: String? = null,
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.tencent.bkrepo.ddc.serialization.CbFieldType
import com.tencent.bkrepo.ddc.serialization.CbObject
import com.tencent.bkrepo.ddc.utils.beginUniformArray
import com.tencent.bkrepo.ddc.utils.writeInteger
import com.tencent.bkrepo.ddc.utils.writerObject

data class BatchOpsResponse(
val results: List<OpResponse>
Expand All @@ -41,10 +42,7 @@ data class BatchOpsResponse(
results.forEach {
writer.beginObject()
writer.writeInteger(OpResponse::opId.name, it.opId)
// 移除response CbObject的type与name信息,仅保留其payload
val resPayload = it.response.getView()
resPayload.position(1)
writer.writeField(CbFieldType.Object, OpResponse::response.name, resPayload.remaining()).put(resPayload)
writer.writerObject(OpResponse::response.name, it.response)
writer.writeInteger(OpResponse::statusCode.name, it.statusCode)
writer.endObject()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package com.tencent.bkrepo.ddc.utils

import com.tencent.bkrepo.ddc.serialization.CbFieldType
import com.tencent.bkrepo.ddc.serialization.CbObject
import com.tencent.bkrepo.ddc.serialization.CbWriterBase
import com.tencent.bkrepo.ddc.serialization.VarULong
import com.tencent.bkrepo.ddc.utils.BlakeUtils.OUT_LEN
Expand Down Expand Up @@ -124,3 +125,11 @@ fun CbWriterBase.writeBinary(name: String? = null, value: ByteBuffer) {
fun CbWriterBase.writeBinaryArrayValue(value: ByteArray) = writeBinaryValue(ByteBuffer.wrap(value))

fun CbWriterBase.writeBinaryArray(name: String, value: ByteArray) = writeBinary(name, ByteBuffer.wrap(value))

fun CbWriterBase.writerObject(name: String, value: CbObject) {
val view = value.getView()
// 由于类型信息已经包含在view中,此处需要跳过类型信息
view.position(1)
// 写入数据
writeField(CbFieldType.Object, name, view.remaining()).put(view)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.tencent.bkrepo.ddc.pojo

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.tencent.bkrepo.ddc.serialization.CbFieldType
import com.tencent.bkrepo.ddc.serialization.CbObject
import com.tencent.bkrepo.ddc.serialization.CbWriter
import com.tencent.bkrepo.ddc.utils.beginUniformArray
import com.tencent.bkrepo.ddc.utils.writeBool
import com.tencent.bkrepo.ddc.utils.writeInteger
import com.tencent.bkrepo.ddc.utils.writeString
import com.tencent.bkrepo.ddc.utils.writerObject
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

class BatchOpsTest {
@Test
fun test() {
val writer = CbWriter()
writer.beginObject()
writer.beginUniformArray(BatchOps::ops.name, CbFieldType.Object)
writer.beginObject()
writer.writeInteger(BatchOp::opId.name, 0)
writer.writeString(BatchOp::bucket.name, "bucket")
writer.writeString(BatchOp::key.name, "key")
writer.writeString(BatchOp::op.name, Operation.GET.name)
writer.writeBool(BatchOp::resolveAttachments.name, true)
writer.endObject()

writer.beginObject()
writer.writeInteger(BatchOp::opId.name, 0)
writer.writeString(BatchOp::bucket.name, "bucket")
writer.writeString(BatchOp::key.name, "key")
writer.writeString(BatchOp::op.name, Operation.HEAD.name)
writer.endObject()

val payload = CbObject.build { innerWriter -> innerWriter.writeString("test", "test value") }
writer.beginObject()
writer.writeInteger(BatchOp::opId.name, 0)
writer.writeString(BatchOp::bucket.name, "bucket")
writer.writeString(BatchOp::key.name, "key")
writer.writeString(BatchOp::op.name, Operation.PUT.name)
writer.writerObject(BatchOp::payload.name, payload)
writer.writeString(BatchOp::payloadHash.name, "test hash")
writer.endObject()
writer.endArray()
writer.endObject()

val batchOps = BatchOps.deserialize(writer.toByteArray())
println(batchOps.ops[2].payload!!.toJson(jacksonObjectMapper()))
assertEquals(
"{\"test\":\"test value\"}",
batchOps.ops[2].payload!!.toJson(jacksonObjectMapper())
)
}
}

0 comments on commit ebffbe6

Please sign in to comment.