Skip to content

Commit

Permalink
feat: add attempts and backoff options for failling job (#27)
Browse files Browse the repository at this point in the history
* feat: add attempts and backoff options for failling job

* chore: update readme.md
  • Loading branch information
code-xhyun authored May 5, 2024
1 parent b17a449 commit 120adf5
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 15 deletions.
10 changes: 4 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

- [Overview](#overview)
- [Related Projects](#related-projects)
- [Features](#features)
- [Unique Features in Pulse](#unique-features-in-pulse)
- [Repository Structure](#repository-structure)
- [Modules](#modules)
- [Getting Started](#getting-started)
Expand Down Expand Up @@ -56,12 +56,10 @@ Pulse is a new fork of the [Agenda](https://github.com/agenda/agenda) project, c
<br/>


## Features
## Unique Features in Pulse

- **High Scalability**: Designed to efficiently manage large-scale job processing.
- **Modern Architecture**: Employs the latest Node.js features and best practices for superior performance.
- **Flexible Scheduling**: Offers support for cron, one-time, and recurring jobs with fine-tuned control.
- **Seamless Integration**: Easily integrates with existing Node.js applications and MongoDB setups.
- **Resume Incomplete Tasks After System Restart**: hen the system restarts, Pulse resumes incomplete tasks that were in progress or queued for execution, providing seamless continuation without manual intervention.
- **Retry Failed Tasks**: Pulse uses an intelligent retry mechanism with configurable attempts and backoff strategies, ensuring failed tasks are retried efficiently.
- **Extensive Documentation**: Provides detailed guides and examples for a quick and easy start.

---
Expand Down
31 changes: 23 additions & 8 deletions src/job/fail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,37 @@ import { Job } from '.';
const debug = createDebugger('pulse:job');

export type FailMethod = (reason: string | Error) => Job;

/**
* Fails the job with a reason (error) specified
* @name Job#fail
* @function
* @param reason reason job failed
*/
export const fail: FailMethod = function (this: Job, reason) {
if (reason instanceof Error) {
reason = reason.message;
}
const failReason = reason instanceof Error ? reason.message : reason;

this.attrs.failReason = reason;
this.attrs.failCount = (this.attrs.failCount || 0) + 1;
const attrs = this.attrs;
attrs.failReason = failReason;
attrs.failCount = (attrs.failCount || 0) + 1;
attrs.runCount = attrs.runCount || 1;
const now = new Date();
this.attrs.failedAt = now;
this.attrs.lastFinishedAt = now;
debug('[%s:%s] fail() called [%d] times so far', this.attrs.name, this.attrs._id, this.attrs.failCount);
attrs.failedAt = attrs.lastFinishedAt = now;

debug('[%s:%s] fail() called [%d] times so far', attrs.name, attrs._id, attrs.failCount);

const attempts = attrs.attempts || 0;
const step = attrs.runCount <= attempts * (attempts + 1) ? Math.ceil(attrs.runCount / (attempts + 1)) : attempts;
const retryCount = attrs.failCount - step;

const backoff = attrs.backoff;
if (attempts && backoff && retryCount < step * attempts) {
const delayMultiplier = backoff.type === 'fixed' ? 1 : Math.pow(2, retryCount);
attrs.nextRunAt = new Date(now.getTime() + delayMultiplier * backoff.delay);

// Log next retry time
debug('[%s:%s] setting retry at time [%s]', attrs.name, attrs._id, attrs.nextRunAt);
}

return this;
};
30 changes: 30 additions & 0 deletions src/job/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ export interface JobAttributes<T extends JobAttributesData = JobAttributesData>
*/
lastRunAt?: Date;

/*
* Count of the number of times the job has run.
*/
runCount?: number;

/**
* Date/time the job last finished running.
*/
Expand All @@ -100,6 +105,11 @@ export interface JobAttributes<T extends JobAttributesData = JobAttributesData>
endDate?: Date | number | null;
skipDays?: string | null;

/**
* The number of times the job has finished running.
*/
finishedCount?: number;

/**
* The reason the job failed.
*/
Expand All @@ -125,6 +135,26 @@ export interface JobAttributes<T extends JobAttributesData = JobAttributesData>
*/
shouldSaveResult?: boolean;

/**
* Number of attempts to run the job.
*/
attempts?: number;

/**
* Backoff options.
*/
backoff?: {
/**
* Type of backoff to use.
*/
type: 'exponential' | 'fixed';

/**
* Delay in ms.
*/
delay: number;
};

/**
* Result of the finished job.
*/
Expand Down
2 changes: 2 additions & 0 deletions src/job/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export const run: RunMethod = async function (this: Job) {
// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
this.attrs.lastRunAt = new Date();
this.attrs.runCount = (this.attrs.runCount || 0) + 1;
debug('[%s:%s] setting lastRunAt to: %s', this.attrs.name, this.attrs._id, this.attrs.lastRunAt.toISOString());
this.computeNextRunAt();
await this.save();
Expand All @@ -35,6 +36,7 @@ export const run: RunMethod = async function (this: Job) {
this.fail(error);
} else {
this.attrs.lastFinishedAt = new Date();
this.attrs.finishedCount = (this.attrs.finishedCount || 0) + 1;

if (this.attrs.shouldSaveResult && result) {
this.attrs.result = result;
Expand Down
18 changes: 17 additions & 1 deletion src/pulse/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,23 @@ export const create: CreateMethod = function (this: Pulse, name, data) {
debug('Pulse.create(%s, [Object])', name);
const priority = this._definitions[name] ? this._definitions[name].priority : 0;
const shouldSaveResult = this._definitions[name] ? this._definitions[name].shouldSaveResult || false : false;
const job = new Job({ name, data, type: 'normal', priority, shouldSaveResult, pulse: this });
const attempts = this._definitions[name] ? this._definitions[name].attempts || 0 : 0;
const backoff = attempts
? this._definitions[name]
? this._definitions[name].backoff || { type: 'exponential', delay: 1000 }
: { type: 'exponential', delay: 1000 }
: undefined;

const job = new Job({
name,
data,
type: 'normal',
priority,
shouldSaveResult,
attempts,
backoff,
pulse: this,
});

return job;
};
29 changes: 29 additions & 0 deletions src/pulse/define.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,30 @@ export interface DefineOptions {
* Should the return value of the job be persisted
*/
shouldSaveResult?: boolean;

/**
* Number of attempts to run the job
* @default 0
*/
attempts?: number;

/**
* Backoff options
*/
backoff?: {
/**
* Type of backoff to use
* @default exponential
*/
type: 'exponential' | 'fixed';

/**
* Delay in ms
* @default 1000
* Math.pow(2, attempts - 1) * delay
*/
delay: number;
};
}

export type Processor<T extends JobAttributesData> = (
Expand Down Expand Up @@ -71,6 +95,11 @@ export const define: DefineMethod = function (this: Pulse, name, processor, opti
running: 0,
locked: 0,
shouldSaveResult: (options as DefineOptions)?.shouldSaveResult || false,
attempts: (options as DefineOptions)?.attempts || 0,
backoff: (options as DefineOptions)?.attempts && {
type: (options as DefineOptions)?.backoff?.type || 'exponential',
delay: (options as DefineOptions)?.backoff?.delay || 1000,
},
};

debug('job [%s] defined with following options: \n%O', name, this._definitions[name]);
Expand Down

0 comments on commit 120adf5

Please sign in to comment.