Skip to content

Commit

Permalink
Merge pull request #170 from oliver-oloughlin/feature/optional.atomic…
Browse files Browse the repository at this point in the history
…-batching

Feature/optional.atomic batching
  • Loading branch information
oliver-oloughlin authored Feb 9, 2024
2 parents bad472d + 28345f8 commit 97ea549
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 13 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ _Supported Deno verisons:_ **^1.40.0**
- [Zod](#zod)
- [zodModel()](#zodmodel)
- [Kv-Schemas](#kv-schemas)
- [Blob Storage](#blob-storage)
- [Development](#development)
- [License](#license)

Expand Down Expand Up @@ -1293,6 +1294,25 @@ const PostSchema = z.object({
})
```

## Blob Storage

To store large blob sizes, and bypass the data limit of a single atomic
operation, a combination of serialized collections and the `atomicBatchSize`
option for write operations can be used.

```ts
import { collection, kvdex, model } from "jsr:@olli/kvdex"

const kv = await Deno.openKv()
const db = kvdex(kv, {
blobs: collection(model<Uint8Array>(), { serialize: "json" }),
})

const blob = // read from disk, etc.

const result = await db.blobs.add(blob, { atomicBatchSize: 10 })
```

## Development

Any contributions are welcomed and appreciated. How to contribute:
Expand Down
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@olli/kvdex",
"version": "0.33.0",
"version": "0.33.1",
"exports": {
".": "./mod.ts",
"./ext/zod": "./ext/zod.ts"
Expand Down
63 changes: 63 additions & 0 deletions src/atomic_pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import type { AtomicSetOptions } from "./types.ts"

export class AtomicPool implements Deno.AtomicOperation {
private pool: Array<(op: Deno.AtomicOperation) => Deno.AtomicOperation>

constructor() {
this.pool = []
}

set(key: Deno.KvKey, value: unknown, options?: AtomicSetOptions) {
this.pool.push((op) => op.set(key, value, options))
return this
}

delete(key: Deno.KvKey) {
this.pool.push((op) => op.delete(key))
return this
}

mutate(...mutations: Deno.KvMutation[]) {
this.pool.push((op) => op.mutate(...mutations))
return this
}

check(...checks: Deno.AtomicCheck[]) {
this.pool.push((op) => op.check(...checks))
return this
}

sum(key: Deno.KvKey, n: bigint) {
this.pool.push((op) => op.sum(key, n))
return this
}

max(key: Deno.KvKey, n: bigint) {
this.pool.push((op) => op.max(key, n))
return this
}

min(key: Deno.KvKey, n: bigint): this {
this.pool.push((op) => op.min(key, n))
return this
}

enqueue(
value: unknown,
options?: {
delay?: number | undefined
keysIfUndelivered?: Deno.KvKey[] | undefined
} | undefined,
) {
this.pool.push((op) => op.enqueue(value, options))
return this
}

commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
throw Error("Not Implemented")
}

bindTo(atomic: Deno.AtomicOperation) {
this.pool.forEach((mutation) => mutation(atomic))
}
}
4 changes: 2 additions & 2 deletions src/atomic_wrapper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ATOMIC_OPERATION_MUTATION_LIMIT } from "./constants.ts"
import type { SetOptions } from "./types.ts"
import type { AtomicSetOptions } from "./types.ts"
import { clamp } from "./utils.ts"

export class AtomicWrapper implements Deno.AtomicOperation {
Expand All @@ -24,7 +24,7 @@ export class AtomicWrapper implements Deno.AtomicOperation {
)
}

set(key: Deno.KvKey, value: unknown, options?: SetOptions) {
set(key: Deno.KvKey, value: unknown, options?: AtomicSetOptions) {
this.addMutation((op) => op.set(key, value, options))
return this
}
Expand Down
80 changes: 71 additions & 9 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import type {
import {
allFulfilled,
checkIndices,
clamp,
compress,
createHandlerId,
createListOptions,
Expand All @@ -64,6 +65,7 @@ import {
setIndices,
} from "./utils.ts"
import {
ATOMIC_OPERATION_MUTATION_LIMIT,
DEFAULT_UPDATE_STRATEGY,
HISTORY_KEY_PREFIX,
ID_KEY_PREFIX,
Expand All @@ -75,6 +77,7 @@ import {
UNDELIVERED_KEY_PREFIX,
} from "./constants.ts"
import { AtomicWrapper } from "./atomic_wrapper.ts"
import { AtomicPool } from "./atomic_pool.ts"
import { Document } from "./document.ts"
import { model } from "./model.ts"
import { concat, deepMerge, ulid } from "./deps.ts"
Expand Down Expand Up @@ -1905,15 +1908,20 @@ export class Collection<
options: SetOptions | undefined,
): Promise<CommitResult<TOutput> | Deno.KvCommitError> {
// Initialize atomic operation and keys list
const atomic = this.kv.atomic()
const ids: KvId[] = []
let docValue: any = value
const isUint8Array = value instanceof Uint8Array
const timeId = ulid()

const atomicBatchSize = options?.atomicBatchSize &&
clamp(1, options?.atomicBatchSize, ATOMIC_OPERATION_MUTATION_LIMIT)

const operationPool = new AtomicPool()
const indexOperationPool = new AtomicPool()

// Check for id collision
if (!options?.overwrite) {
atomic.check({
operationPool.check({
key: idKey,
versionstamp: null,
})
Expand All @@ -1932,7 +1940,8 @@ export class Collection<
const part = compressed.subarray(i, i + UINT8ARRAY_LENGTH_LIMIT)
const key = extendKey(this._keys.segment, docId, index)
ids.push(index)
atomic.set(key, part, options)

operationPool.set(key, part, options)

// Set history segments if keeps history
if (this._keepsHistory) {
Expand All @@ -1943,7 +1952,7 @@ export class Collection<
index,
)

atomic.set(historySegmentKey, part)
operationPool.set(historySegmentKey, part)
}

index++
Expand All @@ -1959,7 +1968,7 @@ export class Collection<
}

// Set document entry
atomic.set(idKey, docValue, options)
operationPool.set(idKey, docValue, options)

// Set history entry if keeps history
if (this._keepsHistory) {
Expand All @@ -1971,7 +1980,7 @@ export class Collection<
value: docValue,
}

atomic.set(historyKey, historyEntry)
operationPool.set(historyKey, historyEntry)
}

// Set indices if is indexable
Expand All @@ -1980,24 +1989,77 @@ export class Collection<
docId,
value as KvObject,
docValue,
atomic,
indexOperationPool,
this,
options,
)
}

// Commit atomic operation
const cr = await atomic.commit()
// Initialize index check, commit result and atomic operation
let indexCheck = false
let cr: Deno.KvCommitResult | Deno.KvCommitError = { ok: false }

const atomic = atomicBatchSize
? new AtomicWrapper(this.kv, atomicBatchSize)
: this.kv.atomic()

// Perform index mutations first if operation is batched, else bind all mutations to main operation
if (atomicBatchSize) {
const indexAtomic = this.kv.atomic()
indexOperationPool.bindTo(indexAtomic)
const indexCr = await indexAtomic.commit()
indexCheck = indexCr.ok
} else {
indexOperationPool.bindTo(atomic)
}

// Bind remaining mutations to main operation
operationPool.bindTo(atomic)

// Commit operation if not batched or if index setters completed successfully
if (!atomicBatchSize || indexCheck) {
cr = await atomic.commit()
}

// Handle failed operation
if (!cr.ok) {
// Delete any entries upon failed batched operation
if (atomicBatchSize && indexCheck) {
const failedAtomic = new AtomicWrapper(this.kv, atomicBatchSize)

if (this._keepsHistory) {
const historyKey = extendKey(this._keys.history, docId, timeId)
failedAtomic.delete(historyKey)
}

if (this._isSerialized) {
const { ids } = docValue as SerializedEntry
ids.forEach((id) =>
failedAtomic.delete(extendKey(this._keys.segment, docId, id))
)
}

if (this._isIndexable) {
deleteIndices(
docId,
value as KvObject,
failedAtomic,
this,
)
}

await failedAtomic.commit()
}

// Return commit error if no remaining retry attempts
const retry = options?.retry ?? 0
if (!retry) {
return {
ok: false,
}
}

// Retry operation and decrement retry count
return await this.setDoc(docId, idKey, value, {
...options,
retry: retry - 1,
Expand Down
9 changes: 8 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,13 @@ export type SetOptions = NonNullable<Parameters<Deno.Kv["set"]>["2"]> & {
* @default false
*/
overwrite?: boolean

/**
* The max number of mutations to be batched in a single atomic operation.
*
* If not set, the operation will not be batched, and will pool all mutations into a single atomic operation.
*/
atomicBatchSize?: number
}

export type ListOptions<T> = Deno.KvListOptions & {
Expand Down Expand Up @@ -393,7 +400,7 @@ export type HandleManyOptions<T> = ListOptions<T> & {
}

export type AtomicBatchOptions = {
/** Batch size of atomic operations where applicable */
/** Max number of mutations to be batched in a single atomic operation */
atomicBatchSize?: number
}

Expand Down

0 comments on commit 97ea549

Please sign in to comment.