Skip to content

Commit

Permalink
Release (#5)
Browse files Browse the repository at this point in the history
* docs: add Acknowledgments section to README.md

* fix: tsconfig.json with importHelpers and baseUrl changes

* fix: concurrency.ts with Pulse import statement and job definitions

* refactor: define function in define.ts to accept optional options parameter

* fix: JobAttributes interface in index.ts to allow any type for the 'data' property
  • Loading branch information
code-xhyun authored Apr 11, 2024
1 parent 5b2a9e1 commit 8495aac
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 74 deletions.
22 changes: 19 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
- [Project Roadmap](#project-roadmap)
- [Contributing](#contributing)
- [License](#license)
- [Acknowledgments](#acknowledgments)
</details>
<hr>

Expand All @@ -41,6 +42,8 @@
Pulse is a new fork of the [Agenda](https://github.com/agenda/agenda) project, created as the original project is no longer actively maintained. Positioned as a vital solution in the Node.js ecosystem for job scheduling, the hiatus of Agenda prompted the creation of Pulse. Utilizing MongoDB, Pulse introduces advanced functionalities, improved scalability, and contemporary features to address today’s complex scheduling challenges.

---
<br/>
<br/>



Expand All @@ -55,6 +58,8 @@ Pulse is a new fork of the [Agenda](https://github.com/agenda/agenda) project, c
- **Extensive Documentation**: Provides detailed guides and examples for a quick and easy start.

---
<br/>
<br/>

## Repository Structure

Expand All @@ -78,6 +83,8 @@ Pulse is a new fork of the [Agenda](https://github.com/agenda/agenda) project, c
```

---
<br/>
<br/>

## Modules

Expand Down Expand Up @@ -154,14 +161,15 @@ Pulse is a new fork of the [Agenda](https://github.com/agenda/agenda) project, c
</details>

---
<br/>
<br/>

## Getting Started



#### Installation

>
```console
$ npm install --save @pulsecron/pulse
```
Expand All @@ -170,8 +178,6 @@ Pulse is a new fork of the [Agenda](https://github.com/agenda/agenda) project, c

#### Example

>
>
```typescript
/**
* @file Illustrate concurrency and locking
Expand Down Expand Up @@ -242,6 +248,8 @@ pulse.define(


---
<br/>
<br/>

## Project Roadmap

Expand All @@ -251,6 +259,8 @@ pulse.define(
- [ ] **Rewrite Test Code**: Revamp our testing suite to increase coverage and ensure tests are up-to-date with modern testing practices. This rewrite aims to enhance test reliability and efficiency, facilitating smoother development and deployment cycles.
- [ ] **Rewrite Documentation**: Completely revise and update the documentation to reflect all new changes and features, ensure clarity of information, and improve navigation and readability for developers. This effort will include new getting started guides, API documentation, and use case examples to facilitate easier adoption and implementation by users.
---
<br/>
<br/>

## Contributing

Expand Down Expand Up @@ -296,11 +306,17 @@ Contributions are welcome! Here are several ways you can contribute:
</details>

---
<br/>
<br/>

## License

This project is protected under the [MIT](https://github.com/pulsecron/pulse?tab=MIT-1-ov-file#readme) License. For more details, refer to the [LICENSE](https://github.com/pulsecron/pulse?tab=MIT-1-ov-file#readme) file.

---
<br/>
<br/>


## Acknowledgments
- Pulse was forked from [Agenda](https://github.com/agenda/agenda) by [@pulsecron](https://github.com/pulsecron).
116 changes: 67 additions & 49 deletions examples/concurrency.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,82 @@
/**
* @file Illustrate concurrency and locking
*/
import Pulse from '../dist';
import Pulse from '@pulsecron/pulse';

function time() {
return new Date().toTimeString().split(' ')[0];
}
const mongoConnectionString = 'mongodb://localhost:27017/pulse';

function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
const pulse = new Pulse({ db: { address: mongoConnectionString } });

const pulse = new Pulse({
db: {
address: 'mongodb://localhost:27017/pulse-concurrency',
collection: `pulseJobs-${Math.random()}`,
},
});
// Or override the default collection name:
// const pulse = new Pulse({db: {address: mongoConnectionString, collection: 'jobCollectionName'}});

let jobRunCount = 1;
pulse.define(
'long-running job',
{
lockLifetime: 5 * 1000, // Max amount of time the job should take
concurrency: 3, // Max number of job instances to run at the same time
},
async (job, done) => {
const thisJob = jobRunCount++;
console.log(`#${thisJob} started`);
// or pass additional connection options:
// const pulse = new Pulse({db: {address: mongoConnectionString, collection: 'jobCollectionName', options: {ssl: true}}});

// 3 job instances will be running at the same time, as specified by `concurrency` above
await sleep(30 * 1000);
// Comment the job processing statement above, and uncomment one of the blocks below
// or pass in an existing mongodb-native MongoClient instance
// const pulse = new Pulse({mongo: myMongoClient});

/**
* Example of defining a job
*/
pulse.define('delete old users', async (job) => {
console.log('Deleting old users...');
return;
});

/**
* Example of repeating a job
*/
(async function () {
// IIFE to give access to async/await
await pulse.start();

await pulse.every('3 minutes', 'delete old users');

// Only one job will run at a time because 3000 < lockLifetime
// await sleep(3 * 1000);
// Alternatively, you could also do:
await pulse.every('*/3 * * * *', 'delete old users');
})();

console.log(`#${thisJob} finished`);
done();
}
/**
* Example of defining a job with options
*/
pulse.define(
'send email report',
async (job) => {
const { to } = job.attrs.data;

console.log(`Sending email report to ${to}`);
},
{ lockLifetime: 5 * 1000, priority: 'high', concurrency: 10 }
);

/**
* Example of scheduling a job
*/
(async function () {
console.log(time(), 'Pulse started');
pulse.processEvery('1 second');
await pulse.start();
await pulse.every('1 second', 'long-running job');
await pulse.schedule('in 20 minutes', 'send email report', { to: 'admin@example.com' });
})();

// Log job start and completion/failure
pulse.on('start', (job) => {
console.log(time(), `Job <${job.attrs.name}> starting`);
});
pulse.on('success', (job) => {
console.log(time(), `Job <${job.attrs.name}> succeeded`);
});
pulse.on('fail', (error, job) => {
console.log(time(), `Job <${job.attrs.name}> failed:`, error);
});
/**
* Example of repeating a job
*/
(async function () {
const weeklyReport = pulse.create('send email report', { to: 'example@example.com' });
await pulse.start();
await weeklyReport.repeatEvery('1 week').save();
})();

/**
* Check job start and completion/failure
*/
pulse.on('start', (job) => {
console.log(time(), `Job <${job.attrs.name}> starting`);
});
pulse.on('success', (job) => {
console.log(time(), `Job <${job.attrs.name}> succeeded`);
});
pulse.on('fail', (error, job) => {
console.log(time(), `Job <${job.attrs.name}> failed:`, error);
});

function time() {
return new Date().toTimeString().split(' ')[0];
}
2 changes: 1 addition & 1 deletion src/job/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export interface JobAttributes<T extends JobAttributesData = JobAttributesData>
/**
* The job details.
*/
data: T;
data: T | any;

unique?: any;
uniqueOpts?: {
Expand Down
27 changes: 10 additions & 17 deletions src/pulse/define.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,47 +33,40 @@ export interface DefineOptions {
* (lowest|low|normal|high|highest|number) specifies the priority of the job. Higher priority jobs will run
* first.
*/
priority?: JobPriority;
priority?: keyof typeof JobPriority;

/**
* Should the return value of the job be persisted
*/
shouldSaveResult?: boolean;
}

export type Processor<T extends JobAttributes> =
| ((job: Job<T>) => Promise<void>)
| ((job: Job<T>, done: () => void) => void);
export type Processor<T extends JobAttributes> = (job: Job<T>, done?: () => void) => void;

/**
* Setup definition for job
* Method is used by consumers of lib to setup their functions
* @name Pulse#define
* @function
* @param name name of job
* @param options options for job to run
* @param [processor] function to be called to run actual job
* @param options options for job to run
*/
export const define = function <T extends JobAttributes>(
this: Pulse,
name: string,
options: DefineOptions | Processor<T>,
processor?: Processor<T>
processor: Processor<T>,
options?: DefineOptions
): void {
if (processor === undefined) {
processor = options as Processor<T>;
options = {};
}

this._definitions[name] = {
fn: processor,
concurrency: (options as DefineOptions).concurrency || this._defaultConcurrency, // `null` is per interface definition of DefineOptions not valid
lockLimit: (options as DefineOptions).lockLimit || this._defaultLockLimit,
priority: (options as DefineOptions).priority || JobPriority.normal,
lockLifetime: (options as DefineOptions).lockLifetime || this._defaultLockLifetime,
concurrency: (options as DefineOptions)?.concurrency || this._defaultConcurrency, // `null` is per interface definition of DefineOptions not valid
lockLimit: (options as DefineOptions)?.lockLimit || this._defaultLockLimit,
priority: (options as DefineOptions)?.priority || JobPriority.normal,
lockLifetime: (options as DefineOptions)?.lockLifetime || this._defaultLockLifetime,
running: 0,
locked: 0,
shouldSaveResult: (options as DefineOptions).shouldSaveResult || false,
shouldSaveResult: (options as DefineOptions)?.shouldSaveResult || false,
};
debug('job [%s] defined with following options: \n%O', name, this._definitions[name]);
};
7 changes: 3 additions & 4 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"removeComments": true /* Do not emit comments to output. */,
// "noEmit": true /* Do not emit outputs. */,
"incremental": true /* Enable incremental compilation */,
// "importHelpers": true, /* Import emit helpers from 'tslib'. */
"importHelpers": true, /* Import emit helpers from 'tslib'. */
// "downlevelIteration": true, /* Provide full support for iterables in 'for-of', spread, and destructuring when targeting 'ES5' or 'ES3'. */
// "isolatedModules": true /* Transpile each file as a separate module (similar to 'ts.transpileModule'). */,

Expand All @@ -36,9 +36,9 @@

/* Module Resolution Options */
"moduleResolution": "node" /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */,
"baseUrl": "./" /* Base directory to resolve non-absolute module names. */,
"baseUrl": "." /* Base directory to resolve non-absolute module names. */,
"paths": {
"@src/*": ["./src/*"]
"@src/*": ["src/*"]
} /* A series of entries which re-map imports to lookup locations relative to the 'baseUrl'. */,
// "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */
// "typeRoots": [], /* List of folders to include type definitions from. */
Expand All @@ -56,7 +56,6 @@
// "mapRoot": "./", /* Specify the location where debugger should locate map files instead of generated locations. */
// "inlineSourceMap": true, /* Emit a single file with source maps instead of having a separate file. */
// "inlineSources": true, /* Emit the source alongside the sourcemaps within a single file; requires '--inlineSourceMap' or '--sourceMap' to be set. */

/* Experimental Options */
"experimentalDecorators": true /* Enables experimental support for ES7 decorators. */,
"emitDecoratorMetadata": true /* Enables experimental support for emitting type metadata for decorators. */
Expand Down

0 comments on commit 8495aac

Please sign in to comment.