diff --git a/README.md b/README.md index 207a7eb..24d16de 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,8 @@ Useful TypeScript utilities. [__Documentation__](https://streamich.github.io/thi --- - `concurrency` — limits the number of concurrent executions of asynchronous - code. + code. `concurrencyDecorator` limits the number of concurrent executions of a + class method. --- diff --git a/src/__tests__/concurrencyDecorator.spec.ts b/src/__tests__/concurrencyDecorator.spec.ts new file mode 100644 index 0000000..0e10fcf --- /dev/null +++ b/src/__tests__/concurrencyDecorator.spec.ts @@ -0,0 +1,117 @@ +import {concurrency} from '../concurrencyDecorator'; +import {tick} from '../tick'; + +test('can execute one function with limit 1', async () => { + const res: number[] = []; + class A { + @concurrency(1) + async create(value: number) { + res.push(value); + } + } + const a = new A(); + await a.create(123); + expect(res).toStrictEqual([123]); + await a.create(456); + expect(res).toStrictEqual([123, 456]); + await Promise.all([a.create(1), a.create(2)]); + expect(res).toStrictEqual([123, 456, 1, 2]); +}); + +test('can execute one function with limit 10', async () => { + const res: number[] = []; + class A { + @concurrency(10) + async create(value: number) { + res.push(value); + } + } + const a = new A(); + await a.create(123); + expect(res).toStrictEqual([123]); + await a.create(456); + expect(res).toStrictEqual([123, 456]); + await Promise.all([a.create(1)]); + expect(res).toStrictEqual([123, 456, 1]); +}); + +describe('limits concurrency to 1', () => { + for (let i = 0; i < 10; i++) { + test(`${i + 1}`, async () => { + const res: number[] = []; + class A { + @concurrency(1) + async create(value: number) { + await tick(Math.round(Math.random() * 10) + 1); + res.push(value); + } + } + const a = new A(); + await Promise.all([a.create(1), a.create(2), a.create(3), a.create(4), a.create(5)]); + expect(res).toStrictEqual([1, 2, 3, 4, 5]); + }); + } +}); + +describe('check concurrency in-flight', () => { + for (let limit = 1; limit <= 6; limit++) { + describe(`limits concurrency to ${limit}`, () => { + for (let i = 0; i < 10; i++) { + test(`${i + 1}`, async () => { + const running: boolean[] = []; + const assert = async () => { + const count = running.filter(Boolean).length; + if (count > limit) throw new Error('Too many running'); + }; + class A { + @concurrency(limit) + async create(index: number) { + running[index] = false; + return async () => { + running[index] = true; + await assert(); + await tick(Math.round(Math.random() * 10) + 1); + await assert(); + running[index] = false; + }; + } + } + const a = new A(); + const promises: Promise[] = []; + for (let i = 0; i < limit * 2; i++) { + promises.push(a.create(i)); + } + await Promise.all(promises); + }); + } + }); + } +}); + +describe('check execution order', () => { + for (let limit = 1; limit <= 6; limit++) { + describe(`limits concurrency to ${limit}`, () => { + for (let i = 0; i < 10; i++) { + test(`${i + 1}`, async () => { + let expectedIndex: number = 0; + class A { + @concurrency(limit) + async create(index: number) { + return async () => { + if (index !== expectedIndex) throw new Error('Wrong order'); + expectedIndex++; + await tick(Math.round(Math.random() * 10) + 1); + }; + } + } + const a = new A(); + const promises: Promise[] = []; + for (let i = 0; i < limit * 2; i++) { + promises.push(a.create(i)); + } + await Promise.all(promises); + }); + } + }); + } +}); diff --git a/src/concurrencyDecorator.ts b/src/concurrencyDecorator.ts new file mode 100644 index 0000000..e068ec6 --- /dev/null +++ b/src/concurrencyDecorator.ts @@ -0,0 +1,18 @@ +import {concurrency as _concurrency} from './concurrency'; + +/** + * A class method decorator that limits the concurrency of the method to the + * given number of parallel executions. All invocations are queued and executed + * in the order they were called. + */ +export function concurrency(limit: number) { + return ( + target: (this: This, ...args: Args) => Promise, + context?: ClassMethodDecoratorContext Promise>, + ) => { + const limiter = _concurrency(limit); + return async function (this: This, ...args: Args): Promise { + return limiter(async () => await target.call(this, ...args)); + }; + }; +} diff --git a/src/index.ts b/src/index.ts index 44e4d97..3370799 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,6 +2,7 @@ export * from './base64'; export * from './Cache'; export * from './codeMutex'; export * from './concurrency'; +export {concurrency as concurrencyDecorator} from './concurrencyDecorator'; export * from './dataUri'; export * from './Defer'; export * from './fanout';