Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Model): add batchWrite Method #290

Merged
merged 4 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 136 additions & 16 deletions src/base-model.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { ObjectSchema } from 'joi';
import { DynamoDBClient, DynamoDBClientConfig } from '@aws-sdk/client-dynamodb';
import { DeleteRequest, DynamoDBClient, DynamoDBClientConfig, PutRequest, WriteRequest } from '@aws-sdk/client-dynamodb';
import {
BatchGetCommand,
BatchGetCommandInput,
BatchWriteCommand,
BatchWriteCommandInput,
BatchWriteCommandOutput,
DeleteCommand,
DeleteCommandInput,
DeleteCommandOutput,
Expand All @@ -28,8 +31,10 @@ type SimpleKey = KeyValue;
type CompositeKey = { pk: KeyValue; sk: KeyValue };
type Keys = SimpleKey[] | CompositeKey[];

const isComposite = (hashKeys_compositeKeys: Keys): hashKeys_compositeKeys is CompositeKey[] =>
(hashKeys_compositeKeys[0] as { pk: string } & unknown).pk !== undefined;
// Type guard: a non-empty key list whose first entry carries a `pk` field is
// treated as a list of composite (pk + sk) keys.
const isCompositeKey = (hashKeys_compositeKeys: Keys): hashKeys_compositeKeys is CompositeKey[] => {
  if (hashKeys_compositeKeys.length === 0) {
    return false;
  }
  const first = hashKeys_compositeKeys[0] as Partial<CompositeKey>;
  return first.pk !== undefined;
};

const isSimpleKey = (hashKeys_compositeKeys: Keys): hashKeys_compositeKeys is SimpleKey[] => hashKeys_compositeKeys.length > 0 && Model.isKey(hashKeys_compositeKeys[0]);

export default abstract class Model<T> {
protected tableName: string | undefined;
Expand Down Expand Up @@ -115,6 +120,18 @@ export default abstract class Model<T> {
}
}

/**
 * Checks that the given item carries every key attribute the model requires:
 * the partition key, and the sort key when the model declares one.
 * Any error raised while extracting a key value is treated as "invalid".
 */
private isValidItem(item: unknown): item is T {
  try {
    this.pkValue(item as T);
    // `!!this.sk` was redundant in a boolean context.
    if (this.sk) {
      this.skValue(item as T);
    }
    return true;
  } catch {
    // Optional catch binding: the original bound an unused `e`.
    return false;
  }
}

// Extracts this item's partition-key value via the static helper, using the
// model's configured `pk` attribute name.
private pkValue(item: T): KeyValue {
  return Model.pkValue(item, this.pk);
}
Expand Down Expand Up @@ -448,7 +465,7 @@ export default abstract class Model<T> {
if (!this.pk) {
throw new Error('Primary key is not defined on your model');
}
if (isComposite(keys)) {
if (isCompositeKey(keys)) {
params = {
RequestItems: {
[this.tableName]: {
Expand Down Expand Up @@ -484,17 +501,9 @@ export default abstract class Model<T> {
if (keys.length === 0) {
return [];
}
const splitBatch = <K>(_keys: K[], CHUNK_SIZE = 100): K[][] => {
return _keys.reduce((batches: K[][], item: K, idx) => {
const chunkIndex = Math.floor(idx / CHUNK_SIZE);
if (!batches[chunkIndex]) batches[chunkIndex] = [];
batches[chunkIndex].push(item);
return batches;
}, []);
};
if (isComposite(keys)) {
if (isCompositeKey(keys)) {
// Split these IDs in batch of 100 as it is AWS DynamoDB batchGetItems operation limit
const batches = splitBatch(keys);
const batches = this.splitBatch(keys, 100);
// Perform the batchGet operation for each batch
const responsesBatches: T[][] = await Promise.all(
batches.map((batch: CompositeKey[]) => this.getSingleBatch(batch, options)),
Expand All @@ -503,7 +512,7 @@ export default abstract class Model<T> {
return responsesBatches.reduce((b1, b2) => b1.concat(b2), []);
}
// Split these IDs in batch of 100 as it is AWS DynamoDB batchGetItems operation limit
const batches = splitBatch(keys);
const batches = this.splitBatch(keys, 100);
// Perform the batchGet operation for each batch
const responsesBatches: T[][] = await Promise.all(
batches.map((batch: KeyValue[]) => this.getSingleBatch(batch, options)),
Expand Down Expand Up @@ -582,4 +591,115 @@ export default abstract class Model<T> {
? options
: (sk_options as Partial<GetCommandInput>);
}
}

/**
 * Splits an array into consecutive chunks of at most CHUNK_SIZE elements.
 * An empty input yields an empty list of batches.
 */
private splitBatch = <K>(_keys: K[], CHUNK_SIZE: number): K[][] => {
  const batches: K[][] = [];
  for (let start = 0; start < _keys.length; start += CHUNK_SIZE) {
    batches.push(_keys.slice(start, start + CHUNK_SIZE));
  }
  return batches;
};

/**
 * Performs a batch write operation beyond the limit of 25 put or delete operations.
 * Requests are split into chunks of 25 (the DynamoDB BatchWriteItem limit) and
 * the chunks are sent in parallel.
 * NOTE(review): UnprocessedItems returned by DynamoDB are not retried here —
 * callers needing at-least-once semantics should inspect each output. TODO confirm
 * whether automatic retry is desired.
 * @param items.put An array of items to be added or updated.
 * @param items.delete Keys or an array of items to be deleted.
 * @param options Additional options supported by the AWS document client.
 * @returns One BatchWriteCommandOutput per chunk of 25 requests.
 */
async batchWrite(items: { put: T[], delete: Keys | T[] }, options?: Partial<BatchWriteCommandInput>): Promise<BatchWriteCommandOutput[]> {
  if (!this.tableName) {
    throw new Error('Table name is not defined on your model');
  }
  if (!this.pk) {
    throw new Error('Primary key is not defined on your model');
  }
  const writeRequests: WriteRequest[] = [];
  // Build put requests, validating that every item carries its key attributes.
  items.put.forEach(item => {
    if (!this.isValidItem(item)) {
      throw new Error("One of the required keys is missing")
    }
    if (this.autoUpdatedAt) {
      item = {
        ...item,
        updatedAt: new Date().toISOString()
      }
    }
    writeRequests.push({
      PutRequest: {
        Item: item
      } as PutRequest
    });
  });
  // Build delete requests: accepts composite keys, simple hash keys, or full items.
  if (isCompositeKey(items.delete as Keys)) {
    (items.delete as CompositeKey[]).forEach(compositeKey => {
      writeRequests.push({
        DeleteRequest: {
          Key: this.buildKeys(compositeKey.pk, compositeKey.sk)
        } as DeleteRequest
      });
    })
  } else if (isSimpleKey(items.delete as Keys)) {
    (items.delete as SimpleKey[]).forEach(hashKey => {
      writeRequests.push({
        DeleteRequest: {
          Key: this.buildKeys(hashKey)
        } as DeleteRequest
      });
    })
  } else {
    // Fall back to treating entries as full items and extracting their keys.
    items.delete.forEach(item => {
      const pk = this.pkValue(item as T);
      const sk = this.sk ? this.skValue(item as T) : undefined;
      writeRequests.push({
        DeleteRequest: {
          Key: this.buildKeys(pk, sk)
        } as DeleteRequest
      });
    })
  }
  // Split the array of operations into batches of 25 (BatchWriteItem limit).
  const batches = this.splitBatch(writeRequests, 25);
  // Make one BatchWrite request per batch of 25 operations, in parallel.
  return Promise.all(
    batches.map(batch => {
      // `params` is scoped per batch: the original hoisted a single shared
      // `let` outside the callbacks and reassigned it on every iteration.
      const params: BatchWriteCommandInput = {
        RequestItems: {
          [this.tableName as string]: batch
        }
      };
      if (options) {
        Object.assign(params, options);
      }
      return this.documentClient.send(new BatchWriteCommand(params));
    })
  );
}

/**
 * Performs a batch put operation beyond the limit of 25 operations.
 * Convenience wrapper around batchWrite with an empty delete list.
 * @param items: An array of items to be added or updated in the database.
 * @param options: Additional options supported by AWS document client.
 * @returns the result of the batch write operation (one output per chunk of 25).
 */
async batchCreate(items: T[], options?: Partial<BatchWriteCommandInput>): Promise<BatchWriteCommandOutput[]> {
  return this.batchWrite({ put: items, delete: [] }, options);
}


/**
 * Performs a batch delete operation beyond the limit of 25 operations.
 * Convenience wrapper around batchWrite with an empty put list; accepts either
 * bare keys (simple or composite) or full items, matching batchWrite's contract.
 * @param items: Keys or an array of items to be deleted from the database.
 * @param options: Additional options supported by AWS document client.
 * @returns the result of the batch write operation (one output per chunk of 25).
 */
async batchDelete(items: Keys | T[], options?: Partial<BatchWriteCommandInput>): Promise<BatchWriteCommandOutput[]> {
  return this.batchWrite({ put: [], delete: items }, options);
}
}
145 changes: 145 additions & 0 deletions test/batch-write.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import { clearTables } from './hooks/create-tables';
import HashKeyModel, { HashKeyEntity } from './models/hashkey';
import CompositeKeyModel, { CompositeKeyEntity } from './models/composite-keys';
import { generateItems, hashOnly } from './factories';
import { KeyValue } from '../src/base-model';
import TimeTrackedModel from './models/autoCreatedAt-autoUpdatedAt';



const hashModel = new HashKeyModel();
const compositeModel = new CompositeKeyModel();
const timeTrackedModel = new TimeTrackedModel();


// Clears the test tables, seeds `nbEntries` hash-key items, and returns their hash keys.
const setupTestData = async (nbEntries: number): Promise<Array<KeyValue>> => {
  await clearTables();
  const seeded = await hashOnly(hashModel, nbEntries);
  const hashKeys = seeded.map((entry) => entry.hashkey as KeyValue);
  return hashKeys;
};

// Clears the test tables, seeds `nbEntries` composite-key items, and returns their pk/sk pairs.
const setupCompositeKeyTestData = async (nbEntries: number) => {
  await clearTables();
  const seeded = await hashOnly(compositeModel, nbEntries);
  return seeded.map((entry) => {
    return {
      pk: entry.hashkey as KeyValue,
      sk: entry.rangekey as KeyValue,
    };
  });
};


// Integration tests for Model.batchWrite: mixed put + delete in a single call.
describe('The batchWrite method', () => {
  beforeEach(async () => {
    await clearTables();
  });

  test('should put a batch of items and delete a batch of items by their hashkeys', async () => {
    const toSave = generateItems(50) as HashKeyEntity[];
    const toDelete = await setupTestData(50);
    await hashModel.batchWrite({ put: toSave, delete: toDelete });
    // Deleted keys must no longer resolve; saved items must all be readable.
    const deleteResult = await hashModel.batchGet(toDelete);
    const putResult = await hashModel.batchGet(toSave.map(item => item.hashkey));
    expect(deleteResult.length).toBe(0);
    expect(putResult.length).toBe(50);
  });

  test('should put a batch of items and delete a batch of items by their compositekeys', async () => {
    const toSave = generateItems(50) as CompositeKeyEntity[];
    const toDelete = await setupCompositeKeyTestData(50);
    await compositeModel.batchWrite({ put: toSave, delete: toDelete });
    const deleteResult = await compositeModel.batchGet(toDelete);
    const putResult = await compositeModel.batchGet(toSave.map(item => ({ pk: item.hashkey, sk: item.rangekey })));
    expect(deleteResult.length).toBe(0);
    expect(putResult.length).toBe(50);
  });

  test('should put and delete a batch of items', async () => {
    const toSave = generateItems(50) as HashKeyEntity[];
    const keys = await setupTestData(50);
    // Deleting by full items (not bare keys) exercises the key-extraction fallback.
    const toDelete = await hashModel.batchGet(keys);
    await hashModel.batchWrite({ put: toSave, delete: toDelete });
    const putResult = await hashModel.batchGet(toSave.map(item => item.hashkey));
    const deleteResult = await hashModel.batchGet(keys);
    expect(deleteResult.length).toBe(0);
    expect(putResult.length).toBe(50);
  });

})

// Integration tests for Model.batchDelete: delete-only batch writes, covering
// all three accepted input shapes (simple keys, composite keys, full items).
describe('The batchDelete method', () => {
  beforeEach(async () => {
    await clearTables();
  });

  test('should delete a batch of items by their hashkeys', async () => {
    const keys = await setupTestData(50);
    await hashModel.batchDelete(keys)
    // All seeded keys must be gone after the delete.
    const result = await hashModel.batchGet(keys);
    expect(result.length).toBe(0);
  })

  test('should delete a batch of items by their compositekeys', async () => {
    const keys = await setupCompositeKeyTestData(50);
    await compositeModel.batchDelete(keys);
    const result = await compositeModel.batchGet(keys);
    expect(result.length).toBe(0);
  })

  test('should delete a batch of items', async () => {
    const keys = await setupTestData(50);
    // Deleting by full items exercises the key-extraction fallback.
    const toDelete = await hashModel.batchGet(keys);
    await hashModel.batchDelete(toDelete);
    const deleteResult = await hashModel.batchGet(keys);
    expect(deleteResult.length).toBe(0);
  })
})

describe('The batchCreate method', () => {
beforeEach(async () => {
await clearTables();
});

test('should throw an error if hash key is not given', async () => {
let toSave = generateItems(50) as HashKeyEntity[];
toSave = toSave.map(item => {
const { hashkey, ...rest } = item;

Check warning on line 105 in test/batch-write.spec.ts

View workflow job for this annotation

GitHub Actions / test

'hashkey' is assigned a value but never used

Check warning on line 105 in test/batch-write.spec.ts

View workflow job for this annotation

GitHub Actions / lint

'hashkey' is assigned a value but never used
return rest;
}) as HashKeyEntity[];
try {
await hashModel.batchCreate(toSave);
} catch (e) {
expect((e as Error).message.includes('One of the required keys is missing')).toBe(true);
}
})

test('should throw an error if range key is not given', async () => {
let toSave = generateItems(2) as CompositeKeyEntity[];
toSave = toSave.map(item => {
const { rangekey, ...rest } = item;

Check warning on line 118 in test/batch-write.spec.ts

View workflow job for this annotation

GitHub Actions / test

'rangekey' is assigned a value but never used

Check warning on line 118 in test/batch-write.spec.ts

View workflow job for this annotation

GitHub Actions / lint

'rangekey' is assigned a value but never used
return rest;
}) as CompositeKeyEntity[];
try {
await compositeModel.batchCreate(toSave);
} catch (e) {
expect((e as Error).message.includes('One of the required keys is missing')).toBe(true);
}
})

test('should put a batch of items', async () => {
const toSave = generateItems(50) as HashKeyEntity[];
await hashModel.batchCreate(toSave);
const putResult = await hashModel.batchGet(toSave.map(item => item.hashkey));
const allHaveUpdatedAtProperty = putResult.every(item => item.hasOwnProperty('updatedAt'));
expect(putResult.length).toBe(50);
expect(allHaveUpdatedAtProperty).toBe(false);
})

test('should put a batch of items and add the updatedAt field when autoUpdatedAt is enabled', async () => {
const toSave = generateItems(50) as HashKeyEntity[];
await timeTrackedModel.batchCreate(toSave);
const putResult = await timeTrackedModel.batchGet(toSave.map(item => item.hashkey));
const allHaveUpdatedAtProperty = putResult.every(item => item.hasOwnProperty('updatedAt'));
expect(putResult.length).toBe(50);
expect(allHaveUpdatedAtProperty).toBe(true);
})
})
14 changes: 14 additions & 0 deletions test/factories/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,17 @@ export const hashOnly = async (
const results = await Promise.all(promises);
return results.map((r) => r);
};


// Builds `nbEntries` in-memory test items carrying hash and range keys,
// numeric keys when `numericalKeys` is set, string keys otherwise.
export const generateItems = (nbEntries: number, numericalKeys?: boolean) => {
  type GeneratedItem = CommonFields & { rangekey: string | number } & { hashkey: string | number };
  return Array.from({ length: nbEntries }, (_, i): GeneratedItem => ({
    // Spread order matters: generatePartial's fields sit between the keys,
    // exactly as in the original literal.
    hashkey: numericalKeys ? i : `_hashkey-${i}`,
    ...generatePartial(i, numericalKeys),
    rangekey: numericalKeys ? i : `_rangekey-${i}`,
  }));
}
Loading