diff --git a/README.md b/README.md index 32b862b..77ee7a0 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,7 @@ _Supported Deno verisons:_ **^1.40.0** - [Zod](#zod) - [zodModel()](#zodmodel) - [Kv-Schemas](#kv-schemas) + - [Blob Storage](#blob-storage) - [Development](#development) - [License](#license) @@ -1293,6 +1294,25 @@ const PostSchema = z.object({ }) ``` +## Blob Storage + +To store large blob sizes, and bypass the data limit of a single atomic +operation, a combination of serialized collections and the `atomicBatchSize` +option for write operations can be used. + +```ts +import { collection, kvdex, model } from "jsr:@olli/kvdex" + +const kv = await Deno.openKv() +const db = kvdex(kv, { + blobs: collection(model(), { serialize: "json" }), +}) + +const blob = // read from disk, etc. + +const result = await db.blobs.add(blob, { atomicBatchSize: 10 }) +``` + ## Development Any contributions are welcomed and appreciated. How to contribute: diff --git a/deno.json b/deno.json index 7bdb3ef..8307655 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@olli/kvdex", - "version": "0.33.0", + "version": "0.33.1", "exports": { ".": "./mod.ts", "./ext/zod": "./ext/zod.ts" diff --git a/src/atomic_pool.ts b/src/atomic_pool.ts new file mode 100644 index 0000000..07327f7 --- /dev/null +++ b/src/atomic_pool.ts @@ -0,0 +1,63 @@ +import type { AtomicSetOptions } from "./types.ts" + +export class AtomicPool implements Deno.AtomicOperation { + private pool: Array<(op: Deno.AtomicOperation) => Deno.AtomicOperation> + + constructor() { + this.pool = [] + } + + set(key: Deno.KvKey, value: unknown, options?: AtomicSetOptions) { + this.pool.push((op) => op.set(key, value, options)) + return this + } + + delete(key: Deno.KvKey) { + this.pool.push((op) => op.delete(key)) + return this + } + + mutate(...mutations: Deno.KvMutation[]) { + this.pool.push((op) => op.mutate(...mutations)) + return this + } + + check(...checks: Deno.AtomicCheck[]) { + this.pool.push((op) => op.check(...checks)) + return this + } + + sum(key: Deno.KvKey, n: bigint) { + this.pool.push((op) => op.sum(key, n)) + return this + } + + max(key: Deno.KvKey, n: bigint) { + this.pool.push((op) => op.max(key, n)) + return this + } + + min(key: Deno.KvKey, n: bigint): this { + this.pool.push((op) => op.min(key, n)) + return this + } + + enqueue( + value: unknown, + options?: { + delay?: number | undefined + keysIfUndelivered?: Deno.KvKey[] | undefined + } | undefined, + ) { + this.pool.push((op) => op.enqueue(value, options)) + return this + } + + commit(): Promise { + throw Error("Not Implemented") + } + + bindTo(atomic: Deno.AtomicOperation) { + this.pool.forEach((mutation) => mutation(atomic)) + } +} diff --git a/src/atomic_wrapper.ts b/src/atomic_wrapper.ts index e37d253..1884621 100644 --- a/src/atomic_wrapper.ts +++ b/src/atomic_wrapper.ts @@ -1,5 +1,5 @@ import { ATOMIC_OPERATION_MUTATION_LIMIT } from "./constants.ts" -import type { SetOptions } from "./types.ts" +import type { AtomicSetOptions } from "./types.ts" import { clamp } from "./utils.ts" export class AtomicWrapper implements Deno.AtomicOperation { @@ -24,7 +24,7 @@ export class AtomicWrapper implements Deno.AtomicOperation { ) } - set(key: Deno.KvKey, value: unknown, options?: SetOptions) { + set(key: Deno.KvKey, value: unknown, options?: AtomicSetOptions) { this.addMutation((op) => op.set(key, value, options)) return this } diff --git a/src/collection.ts b/src/collection.ts index 1773a9e..f9f559c 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -43,6 +43,7 @@ import type { import { allFulfilled, checkIndices, + clamp, compress, createHandlerId, createListOptions, @@ -64,6 +65,7 @@ import { setIndices, } from "./utils.ts" import { + ATOMIC_OPERATION_MUTATION_LIMIT, DEFAULT_UPDATE_STRATEGY, HISTORY_KEY_PREFIX, ID_KEY_PREFIX, @@ -75,6 +77,7 @@ import { UNDELIVERED_KEY_PREFIX, } from "./constants.ts" import { AtomicWrapper } from "./atomic_wrapper.ts" +import { AtomicPool } from "./atomic_pool.ts" import { Document } from "./document.ts" import { model } from "./model.ts" import { concat, deepMerge, ulid } from "./deps.ts" @@ -1905,15 +1908,20 @@ export class Collection< options: SetOptions | undefined, ): Promise | Deno.KvCommitError> { // Initialize atomic operation and keys list - const atomic = this.kv.atomic() const ids: KvId[] = [] let docValue: any = value const isUint8Array = value instanceof Uint8Array const timeId = ulid() + const atomicBatchSize = options?.atomicBatchSize && + clamp(1, options?.atomicBatchSize, ATOMIC_OPERATION_MUTATION_LIMIT) + + const operationPool = new AtomicPool() + const indexOperationPool = new AtomicPool() + // Check for id collision if (!options?.overwrite) { - atomic.check({ + operationPool.check({ key: idKey, versionstamp: null, }) @@ -1932,7 +1940,8 @@ export class Collection< const part = compressed.subarray(i, i + UINT8ARRAY_LENGTH_LIMIT) const key = extendKey(this._keys.segment, docId, index) ids.push(index) - atomic.set(key, part, options) + + operationPool.set(key, part, options) // Set history segments if keeps history if (this._keepsHistory) { @@ -1943,7 +1952,7 @@ export class Collection< index, ) - atomic.set(historySegmentKey, part) + operationPool.set(historySegmentKey, part) } index++ @@ -1959,7 +1968,7 @@ export class Collection< } // Set document entry - atomic.set(idKey, docValue, options) + operationPool.set(idKey, docValue, options) // Set history entry if keeps history if (this._keepsHistory) { @@ -1971,7 +1980,7 @@ export class Collection< value: docValue, } - atomic.set(historyKey, historyEntry) + operationPool.set(historyKey, historyEntry) } // Set indices if is indexable @@ -1980,17 +1989,69 @@ export class Collection< docId, value as KvObject, docValue, - atomic, + indexOperationPool, this, options, ) } - // Commit atomic operation - const cr = await atomic.commit() + // Initialize index check, commit result and atomic operation + let indexCheck = false + let cr: Deno.KvCommitResult | Deno.KvCommitError = { ok: false } + + const atomic = atomicBatchSize + ? new AtomicWrapper(this.kv, atomicBatchSize) + : this.kv.atomic() + + // Perform index mutations first if operation is batched, else bind all mutations to main operation + if (atomicBatchSize) { + const indexAtomic = this.kv.atomic() + indexOperationPool.bindTo(indexAtomic) + const indexCr = await indexAtomic.commit() + indexCheck = indexCr.ok + } else { + indexOperationPool.bindTo(atomic) + } + + // Bind remaining mutations to main operation + operationPool.bindTo(atomic) + + // Commit operation if not batched or if index setters completed successfully + if (!atomicBatchSize || indexCheck) { + cr = await atomic.commit() + } // Handle failed operation if (!cr.ok) { + // Delete any entries upon failed batched operation + if (atomicBatchSize && indexCheck) { + const failedAtomic = new AtomicWrapper(this.kv, atomicBatchSize) + + if (this._keepsHistory) { + const historyKey = extendKey(this._keys.history, docId, timeId) + failedAtomic.delete(historyKey) + } + + if (this._isSerialized) { + const { ids } = docValue as SerializedEntry + ids.forEach((id) => + failedAtomic.delete(extendKey(this._keys.segment, docId, id)) + ) + } + + if (this._isIndexable) { + deleteIndices( + docId, + value as KvObject, + failedAtomic, + this, + ) + } + + await failedAtomic.commit() + } + + // Return commit error if no remaining retry attempts const retry = options?.retry ?? 0 if (!retry) { return { @@ -1998,6 +2059,7 @@ export class Collection< } } + // Retry operation and decrement retry count return await this.setDoc(docId, idKey, value, { ...options, retry: retry - 1, diff --git a/src/types.ts b/src/types.ts index 1f3613e..24d3206 100644 --- a/src/types.ts +++ b/src/types.ts @@ -363,6 +363,13 @@ export type SetOptions = NonNullable["2"]> & { * @default false */ overwrite?: boolean + + /** + * The max number of mutations to be batched in a single atomic operation. + * + * If not set, the operation will not be batched, and will pool all mutations into a single atomic operation. + */ + atomicBatchSize?: number } export type ListOptions = Deno.KvListOptions & { @@ -393,7 +400,7 @@ export type HandleManyOptions = ListOptions & { } export type AtomicBatchOptions = { - /** Batch size of atomic operations where applicable */ + /** Max number of mutations to be batched in a single atomic operation */ atomicBatchSize?: number }