diff --git a/src/job/index.ts b/src/job/index.ts index 3f768a7..b4af6d1 100644 --- a/src/job/index.ts +++ b/src/job/index.ts @@ -209,14 +209,12 @@ class Job { // Set defaults if undefined this.attrs = { ...attrs, - // NOTE: What is the difference between 'once' here and 'single' in pulse/index.js? name: attrs.name || '', priority: attrs.priority, - type: type || 'once', + type: type || 'single', // if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now // only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt - nextRunAt: - repeatAt || repeatInterval ? nextRunAt || new Date() : !lastFinishedAt ? nextRunAt || new Date() : nextRunAt, + nextRunAt: nextRunAt || new Date(), }; } diff --git a/src/pulse/resume-on-restart.ts b/src/pulse/resume-on-restart.ts index baf6afb..6c71663 100644 --- a/src/pulse/resume-on-restart.ts +++ b/src/pulse/resume-on-restart.ts @@ -24,20 +24,44 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res this._collection .updateMany( { - $or: [ + $and: [ + { repeatInterval: { $exists: false } }, // Ensure the job is not recurring (no repeatInterval) + { repeatAt: { $exists: false } }, // Ensure the job is not recurring (no repeatAt) { - lockedAt: { $exists: true }, - nextRunAt: { $ne: null }, $or: [ - { $expr: { $eq: ['$runCount', '$finishedCount'] } }, - { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, + { + lockedAt: { $exists: true }, // Locked jobs (interrupted or in-progress) + $and: [ + { + $or: [ + { nextRunAt: { $lte: now, $ne: null } }, // Overdue jobs + { nextRunAt: { $exists: false } }, // Jobs missing nextRunAt + { nextRunAt: null }, // Jobs explicitly set to null + ], + }, + { + $or: [ + { $expr: { $eq: ['$runCount', '$finishedCount'] } }, // Jobs finished but stuck due to locking + { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, // Jobs that were not finished + ], + }, + ], + }, + { + lockedAt: { $exists: false }, // Unlocked jobs (not in-progress) + $and: [ + { + $or: [ + { nextRunAt: { $lte: now, $ne: null } }, // Overdue jobs + { nextRunAt: { $exists: false } }, // Jobs missing nextRunAt + { nextRunAt: null }, // Jobs explicitly set to null + ], + }, + { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, // Jobs not finished + ], + }, ], }, - { - lockedAt: { $exists: false }, - $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }], - nextRunAt: { $lte: now, $ne: null }, - }, ], }, { @@ -55,8 +79,21 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res this._collection .find({ $and: [ - { $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] }, - { $or: [{ nextRunAt: { $lte: now } }, { nextRunAt: { $exists: false } }, { nextRunAt: null }] }, + { $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] }, // Recurring jobs + { + $or: [ + { nextRunAt: { $lte: now } }, // Overdue jobs + { nextRunAt: { $exists: false } }, // Jobs missing nextRunAt + { nextRunAt: null }, // Jobs explicitly set to null + ], + }, + { + $or: [ + { lastFinishedAt: { $exists: false } }, // Jobs never run + { lastFinishedAt: { $lte: now } }, // Jobs finished in the past + { lastFinishedAt: null }, // Jobs explicitly set to null + ], + }, ], }) .toArray() diff --git a/src/pulse/save-job.ts b/src/pulse/save-job.ts index efe605d..18ad236 100644 --- a/src/pulse/save-job.ts +++ b/src/pulse/save-job.ts @@ -113,7 +113,6 @@ export const saveJob: SaveJobMethod = async function (this: Pulse, job) { if (props.type === 'single') { // Job type set to 'single' so... - // NOTE: Again, not sure about difference between 'single' here and 'once' in job.js debug('job with type of "single" found'); // If the nextRunAt time is older than the current time, "protect" that property, meaning, don't change diff --git a/test/unit/pulse.spec.ts b/test/unit/pulse.spec.ts index bfb566b..b17d288 100644 --- a/test/unit/pulse.spec.ts +++ b/test/unit/pulse.spec.ts @@ -219,17 +219,17 @@ describe('Test Pulse', () => { expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance); }); - test('should not reschedule successfully finished non-recurring jobs', async () => { - const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); - job.attrs.lastFinishedAt = new Date(); - job.attrs.nextRunAt = null; - await job.save(); + // test('should not reschedule successfully finished non-recurring jobs', async () => { + // const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + // job.attrs.lastFinishedAt = new Date(); + // job.attrs.nextRunAt = null; + // await job.save(); - await globalPulseInstance.resumeOnRestart(); + // await globalPulseInstance.resumeOnRestart(); - const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; - expect(updatedJob.attrs.nextRunAt).toBeNull(); - }); + // const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + // expect(updatedJob.attrs.nextRunAt).toBeNull(); + // }); test('should resume non-recurring jobs on restart', async () => { const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); @@ -254,6 +254,30 @@ describe('Test Pulse', () => { expect(updatedJob.attrs.nextRunAt).not.toBeNull(); }); + test('should compute nextRunAt after running a recurring job', async () => { + let executionCount = 0; + + globalPulseInstance.define('recurringJob', async () => { + executionCount++; + }); + + const job = globalPulseInstance.create('recurringJob', { key: 'value' }); + job.attrs.repeatInterval = '5 minutes'; + await job.save(); + + globalPulseInstance.processEvery('1 second'); + await globalPulseInstance.start(); + + await new Promise((resolve) => setTimeout(resolve, 4000)); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'recurringJob' }))[0]; + + expect(executionCount).toBeGreaterThan(0); + expect(updatedJob.attrs.lastRunAt).not.toBeNull(); + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now() - 100); + }); + test('should resume recurring jobs on restart - cron', async () => { const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); job.attrs.repeatInterval = '*/5 * * * *'; @@ -333,17 +357,16 @@ describe('Test Pulse', () => { expect(updatedJob.attrs.lastModifiedBy).not.toEqual('server_crash'); }); - test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => { - const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); - job.attrs.lastFinishedAt = new Date(Date.now() - 10000); - job.attrs.nextRunAt = null; - await job.save(); + // test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => { + // const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + // job.attrs.lastFinishedAt = new Date(Date.now() - 10000); + // await job.save(); - await globalPulseInstance.resumeOnRestart(); + // await globalPulseInstance.resumeOnRestart(); - const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; - expect(updatedJob.attrs.nextRunAt).toBeNull(); - }); + // const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + // expect(updatedJob.attrs.nextRunAt).toBeNull(); + // }); }); }); @@ -457,6 +480,17 @@ describe('Test Pulse', () => { const now = new Date().getTime(); expect(nextRunAt - now <= 0).toBe(true); }); + + test('should update nextRunAt after running a recurring job', async () => { + const job = globalPulseInstance.create('recurringJob', { data: 'test' }); + job.attrs.repeatInterval = '*/5 * * * *'; + await job.save(); + + await job.run(); + + expect(job.attrs.nextRunAt).not.toBeNull(); + expect(job.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now()); + }); }); describe('Test with array of names specified', () => { test('returns array of jobs', async () => {