Skip to content

Commit

Permalink
Merge pull request #28 from streamich/concurrency-decorator
Browse files Browse the repository at this point in the history
Concurrency decorator
  • Loading branch information
streamich authored Mar 11, 2024
2 parents 225ede3 + 24bc03b commit 0d1c495
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 1 deletion.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ Useful TypeScript utilities. [__Documentation__](https://streamich.github.io/thi
---

- `concurrency` — limits the number of concurrent executions of asynchronous
code.
code. `concurrencyDecorator` limits the number of concurrent executions of a
class method.

---

Expand Down
117 changes: 117 additions & 0 deletions src/__tests__/concurrencyDecorator.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import {concurrency} from '../concurrencyDecorator';
import {tick} from '../tick';

test('can execute one function with limit 1', async () => {
const res: number[] = [];
class A {
@concurrency(1)
async create(value: number) {
res.push(value);
}
}
const a = new A();
await a.create(123);
expect(res).toStrictEqual([123]);
await a.create(456);
expect(res).toStrictEqual([123, 456]);
await Promise.all([a.create(1), a.create(2)]);
expect(res).toStrictEqual([123, 456, 1, 2]);
});

test('can execute one function with limit 10', async () => {
const res: number[] = [];
class A {
@concurrency(10)
async create(value: number) {
res.push(value);
}
}
const a = new A();
await a.create(123);
expect(res).toStrictEqual([123]);
await a.create(456);
expect(res).toStrictEqual([123, 456]);
await Promise.all([a.create(1)]);
expect(res).toStrictEqual([123, 456, 1]);
});

describe('limits concurrency to 1', () => {
for (let i = 0; i < 10; i++) {
test(`${i + 1}`, async () => {
const res: number[] = [];
class A {
@concurrency(1)
async create(value: number) {
await tick(Math.round(Math.random() * 10) + 1);
res.push(value);
}
}
const a = new A();
await Promise.all([a.create(1), a.create(2), a.create(3), a.create(4), a.create(5)]);
expect(res).toStrictEqual([1, 2, 3, 4, 5]);
});
}
});

describe('check concurrency in-flight', () => {
for (let limit = 1; limit <= 6; limit++) {
describe(`limits concurrency to ${limit}`, () => {
for (let i = 0; i < 10; i++) {
test(`${i + 1}`, async () => {
const running: boolean[] = [];
const assert = async () => {
const count = running.filter(Boolean).length;
if (count > limit) throw new Error('Too many running');
};
class A {
@concurrency(limit)
async create(index: number) {
running[index] = false;
return async () => {
running[index] = true;
await assert();
await tick(Math.round(Math.random() * 10) + 1);
await assert();
running[index] = false;
};
}
}
const a = new A();
const promises: Promise<any>[] = [];
for (let i = 0; i < limit * 2; i++) {
promises.push(a.create(i));
}
await Promise.all(promises);
});
}
});
}
});

describe('check execution order', () => {
for (let limit = 1; limit <= 6; limit++) {
describe(`limits concurrency to ${limit}`, () => {
for (let i = 0; i < 10; i++) {
test(`${i + 1}`, async () => {
let expectedIndex: number = 0;
class A {
@concurrency(limit)
async create(index: number) {
return async () => {
if (index !== expectedIndex) throw new Error('Wrong order');
expectedIndex++;
await tick(Math.round(Math.random() * 10) + 1);
};
}
}
const a = new A();
const promises: Promise<any>[] = [];
for (let i = 0; i < limit * 2; i++) {
promises.push(a.create(i));
}
await Promise.all(promises);
});
}
});
}
});
18 changes: 18 additions & 0 deletions src/concurrencyDecorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import {concurrency as _concurrency} from './concurrency';

/**
* A class method decorator that limits the concurrency of the method to the
* given number of parallel executions. All invocations are queued and executed
* in the order they were called.
*/
export function concurrency<This, Args extends any[], Return>(limit: number) {
return (
target: (this: This, ...args: Args) => Promise<Return>,
context?: ClassMethodDecoratorContext<This, (this: This, ...args: Args) => Promise<Return>>,
) => {
const limiter = _concurrency(limit);
return async function (this: This, ...args: Args): Promise<Return> {
return limiter(async () => await target.call(this, ...args));
};
};
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export * from './base64';
export * from './Cache';
export * from './codeMutex';
export * from './concurrency';
export {concurrency as concurrencyDecorator} from './concurrencyDecorator';
export * from './dataUri';
export * from './Defer';
export * from './fanout';
Expand Down

0 comments on commit 0d1c495

Please sign in to comment.