Skip to content

Commit

Permalink
Merge pull request #63 from b0dea/resumeonrestart-strong-check
Browse files Browse the repository at this point in the history
fix: Avoid double updating on the non-recurring jobs vs recurring jobs search in resumeOnRestart
  • Loading branch information
code-xhyun authored Nov 19, 2024
2 parents dceeec3 + a656152 commit 9bb96e6
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 35 deletions.
6 changes: 2 additions & 4 deletions src/job/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,12 @@ class Job<T extends JobAttributesData = JobAttributesData> {
// Set defaults if undefined
this.attrs = {
...attrs,
// NOTE: What is the difference between 'once' here and 'single' in pulse/index.js?
name: attrs.name || '',
priority: attrs.priority,
type: type || 'once',
type: type || 'single',
// if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now
// only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt
nextRunAt:
repeatAt || repeatInterval ? nextRunAt || new Date() : !lastFinishedAt ? nextRunAt || new Date() : nextRunAt,
nextRunAt: nextRunAt || new Date(),
};
}

Expand Down
61 changes: 49 additions & 12 deletions src/pulse/resume-on-restart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,44 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
this._collection
.updateMany(
{
$or: [
$and: [
{ repeatInterval: { $exists: false } }, // Ensure the job is not recurring (no repeatInterval)
{ repeatAt: { $exists: false } }, // Ensure the job is not recurring (no repeatAt)
{
lockedAt: { $exists: true },
nextRunAt: { $ne: null },
$or: [
{ $expr: { $eq: ['$runCount', '$finishedCount'] } },
{ $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] },
{
lockedAt: { $exists: true }, // Locked jobs (interrupted or in-progress)
$and: [
{
$or: [
{ nextRunAt: { $lte: now, $ne: null } }, // Overdue jobs
{ nextRunAt: { $exists: false } }, // Jobs missing nextRunAt
{ nextRunAt: null }, // Jobs explicitly set to null
],
},
{
$or: [
{ $expr: { $eq: ['$runCount', '$finishedCount'] } }, // Jobs finished but stuck due to locking
{ $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, // Jobs that were not finished
],
},
],
},
{
lockedAt: { $exists: false }, // Unlocked jobs (not in-progress)
$and: [
{
$or: [
{ nextRunAt: { $lte: now, $ne: null } }, // Overdue jobs
{ nextRunAt: { $exists: false } }, // Jobs missing nextRunAt
{ nextRunAt: null }, // Jobs explicitly set to null
],
},
{ $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, // Jobs not finished
],
},
],
},
{
lockedAt: { $exists: false },
$or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }],
nextRunAt: { $lte: now, $ne: null },
},
],
},
{
Expand All @@ -55,8 +79,21 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
this._collection
.find({
$and: [
{ $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] },
{ $or: [{ nextRunAt: { $lte: now } }, { nextRunAt: { $exists: false } }, { nextRunAt: null }] },
{ $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] }, // Recurring jobs
{
$or: [
{ nextRunAt: { $lte: now } }, // Overdue jobs
{ nextRunAt: { $exists: false } }, // Jobs missing nextRunAt
{ nextRunAt: null }, // Jobs explicitly set to null
],
},
{
$or: [
{ lastFinishedAt: { $exists: false } }, // Jobs never run
{ lastFinishedAt: { $lte: now } }, // Jobs finished in the past
{ lastFinishedAt: null }, // Jobs explicitly set to null
],
},
],
})
.toArray()
Expand Down
1 change: 0 additions & 1 deletion src/pulse/save-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ export const saveJob: SaveJobMethod = async function (this: Pulse, job) {

if (props.type === 'single') {
// Job type set to 'single' so...
// NOTE: Again, not sure about difference between 'single' here and 'once' in job.js
debug('job with type of "single" found');

// If the nextRunAt time is older than the current time, "protect" that property, meaning, don't change
Expand Down
70 changes: 52 additions & 18 deletions test/unit/pulse.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,17 @@ describe('Test Pulse', () => {
expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance);
});

test('should not reschedule successfully finished non-recurring jobs', async () => {
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
job.attrs.lastFinishedAt = new Date();
job.attrs.nextRunAt = null;
await job.save();
// test('should not reschedule successfully finished non-recurring jobs', async () => {
// const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
// job.attrs.lastFinishedAt = new Date();
// job.attrs.nextRunAt = null;
// await job.save();

await globalPulseInstance.resumeOnRestart();
// await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).toBeNull();
});
// const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
// expect(updatedJob.attrs.nextRunAt).toBeNull();
// });

test('should resume non-recurring jobs on restart', async () => {
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
Expand All @@ -254,6 +254,30 @@ describe('Test Pulse', () => {
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
});

test('should compute nextRunAt after running a recurring job', async () => {
let executionCount = 0;

globalPulseInstance.define('recurringJob', async () => {
executionCount++;
});

const job = globalPulseInstance.create('recurringJob', { key: 'value' });
job.attrs.repeatInterval = '5 minutes';
await job.save();

globalPulseInstance.processEvery('1 second');
await globalPulseInstance.start();

await new Promise((resolve) => setTimeout(resolve, 4000));

const updatedJob = (await globalPulseInstance.jobs({ name: 'recurringJob' }))[0];

expect(executionCount).toBeGreaterThan(0);
expect(updatedJob.attrs.lastRunAt).not.toBeNull();
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now() - 100);
});

test('should resume recurring jobs on restart - cron', async () => {
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
job.attrs.repeatInterval = '*/5 * * * *';
Expand Down Expand Up @@ -333,17 +357,16 @@ describe('Test Pulse', () => {
expect(updatedJob.attrs.lastModifiedBy).not.toEqual('server_crash');
});

test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => {
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
job.attrs.lastFinishedAt = new Date(Date.now() - 10000);
job.attrs.nextRunAt = null;
await job.save();
// test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => {
// const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
// job.attrs.lastFinishedAt = new Date(Date.now() - 10000);
// await job.save();

await globalPulseInstance.resumeOnRestart();
// await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).toBeNull();
});
// const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
// expect(updatedJob.attrs.nextRunAt).toBeNull();
// });
});
});

Expand Down Expand Up @@ -457,6 +480,17 @@ describe('Test Pulse', () => {
const now = new Date().getTime();
expect(nextRunAt - now <= 0).toBe(true);
});

test('should update nextRunAt after running a recurring job', async () => {
const job = globalPulseInstance.create('recurringJob', { data: 'test' });
job.attrs.repeatInterval = '*/5 * * * *';
await job.save();

await job.run();

expect(job.attrs.nextRunAt).not.toBeNull();
expect(job.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now());
});
});
describe('Test with array of names specified', () => {
test('returns array of jobs', async () => {
Expand Down

1 comment on commit 9bb96e6

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lines Statements Branches Functions
Coverage: 70%
72.12% (753/1044) 55.82% (182/326) 66.25% (108/163)

Pulse Test Report

Tests Skipped Failures Errors Time
73 0 💤 0 ❌ 0 🔥 14.252s ⏱️

Please sign in to comment.