From c03632a69b2254521a2b3e5dff03a2a7088ab7d1 Mon Sep 17 00:00:00 2001 From: itleigns Date: Mon, 7 Aug 2023 16:27:59 +0900 Subject: [PATCH 1/2] fix exec error handling --- src/core/queue.ts | 50 +++++++++++++++++++++++++++++++++++++------ src/core/scheduler.ts | 8 ++++++- src/core/worker.ts | 26 +++++++++++++++++++--- 3 files changed, 74 insertions(+), 10 deletions(-) diff --git a/src/core/queue.ts b/src/core/queue.ts index 52744f12..8e9b67a8 100644 --- a/src/core/queue.ts +++ b/src/core/queue.ts @@ -86,12 +86,18 @@ export class Queue extends EventEmitter { const toRun = await RunPlugins(this, "beforeEnqueue", func, q, job, args); if (toRun === false) return toRun; - await this.connection.redis + const response = await this.connection.redis .multi() .sadd(this.connection.key("queues"), q) .rpush(this.connection.key("queue", q), this.encode(q, func, args)) .exec(); + response.forEach((res) => { + if (res[0] !== null) { + throw res[0]; + } + }); + await RunPlugins(this, "afterEnqueue", func, q, job, args); return toRun; } @@ -129,7 +135,7 @@ export class Queue extends EventEmitter { } } - await this.connection.redis + const response = await this.connection.redis .multi() .rpush(this.connection.key("delayed:" + rTimestamp), item) .sadd(this.connection.key("timestamps:" + item), "delayed:" + rTimestamp) @@ -139,6 +145,12 @@ export class Queue extends EventEmitter { rTimestamp.toString(), ) .exec(); + + response.forEach((res) => { + if (res[0] !== null) { + throw res[0]; + } + }); } /** * - In ms, the number of ms to delay before this job is able to start being worked on. @@ -168,11 +180,17 @@ export class Queue extends EventEmitter { */ async delQueue(q: string) { const { redis } = this.connection; - await redis + const response = await redis .multi() .del(this.connection.key("queue", q)) .srem(this.connection.key("queues"), q) .exec(); + + response.forEach((res) => { + if (res[0] !== null) { + throw res[0]; + } + }); } /** @@ -230,7 +248,14 @@ export class Queue extends EventEmitter { } } - await pipeline.exec(); + const response = await pipeline.exec(); + + response.forEach((res) => { + if (res[0] !== null) { + throw res[0]; + } + }); + return numJobsDeleted; } @@ -258,7 +283,14 @@ export class Queue extends EventEmitter { } } - await pipeline.exec(); + const response = await pipeline.exec(); + + response.forEach((res) => { + if (res[0] !== null) { + throw res[0]; + } + }); + return timestamps.map((t) => parseInt(t, 10)); } @@ -501,7 +533,13 @@ export class Queue extends EventEmitter { ); } - await pipeline.exec(); + const response = await pipeline.exec(); + + response.forEach((res) => { + if (res[0] !== null) { + throw res[0]; + } + }); return errorPayload; } diff --git a/src/core/scheduler.ts b/src/core/scheduler.ts index 91f0b864..a7e387f4 100644 --- a/src/core/scheduler.ts +++ b/src/core/scheduler.ts @@ -254,11 +254,17 @@ export class Scheduler extends EventEmitter { await this.watchIfPossible(this.connection.key("delayed_queue_schedule")); const length = await this.connection.redis.llen(key); if (length === 0) { - await this.connection.redis + const response = await this.connection.redis .multi() .del(key) .zrem(this.connection.key("delayed_queue_schedule"), timestamp) .exec(); + + response.forEach((res) => { + if (res[0] !== null) { + throw res[0]; + } + }); } await this.unwatchIfPossible(); } diff --git a/src/core/worker.ts b/src/core/worker.ts index ce9bc060..4606a9ba 100644 --- a/src/core/worker.ts +++ b/src/core/worker.ts @@ -404,16 +404,23 @@ export class Worker extends EventEmitter { } private async succeed(job: ParsedJob, duration: number) { - await this.connection.redis + const response = await this.connection.redis .multi() .incr(this.connection.key("stat", "processed")) .incr(this.connection.key("stat", "processed", this.name)) .exec(); + + response.forEach((res) => { + if (res[0] !== null) { + throw res[0]; + } + }); + this.emit("success", this.queue, job, this.result, duration); } private async fail(err: Error, duration: number) { - await this.connection.redis + const response = await this.connection.redis .multi() .incr(this.connection.key("stat", "failed")) .incr(this.connection.key("stat", "failed", this.name)) @@ -422,6 +429,13 @@ export class Worker extends EventEmitter { JSON.stringify(this.failurePayload(err, this.job)), ) .exec(); + + response.forEach((res) => { + if (res[0] !== null) { + throw res[0]; + } + }); + this.emit("failure", this.queue, this.job, err, duration); } @@ -512,7 +526,7 @@ export class Worker extends EventEmitter { return; } - await this.connection.redis + const response = await this.connection.redis .multi() .srem(this.connection.key("workers"), name + ":" + queues) .del(this.connection.key("worker", "ping", name)) @@ -521,6 +535,12 @@ export class Worker extends EventEmitter { .del(this.connection.key("stat", "failed", name)) .del(this.connection.key("stat", "processed", name)) .exec(); + + response.forEach((res) => { + if (res[0] !== null) { + throw res[0]; + } + }); } async checkQueues() { From c4527bfda4af44325c13c65ed58baac1d91b6299 Mon Sep 17 00:00:00 2001 From: itleigns Date: Mon, 7 Aug 2023 17:58:23 +0900 Subject: [PATCH 2/2] lint --- src/core/queue.ts | 8 ++++---- src/core/worker.ts | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/core/queue.ts b/src/core/queue.ts index 8e9b67a8..d038fd7a 100644 --- a/src/core/queue.ts +++ b/src/core/queue.ts @@ -185,7 +185,7 @@ export class Queue extends EventEmitter { .del(this.connection.key("queue", q)) .srem(this.connection.key("queues"), q) .exec(); - + response.forEach((res) => { if (res[0] !== null) { throw res[0]; @@ -249,7 +249,7 @@ export class Queue extends EventEmitter { } const response = await pipeline.exec(); - + response.forEach((res) => { if (res[0] !== null) { throw res[0]; @@ -284,7 +284,7 @@ export class Queue extends EventEmitter { } const response = await pipeline.exec(); - + response.forEach((res) => { if (res[0] !== null) { throw res[0]; @@ -534,7 +534,7 @@ export class Queue extends EventEmitter { } const response = await pipeline.exec(); - + response.forEach((res) => { if (res[0] !== null) { throw res[0]; diff --git a/src/core/worker.ts b/src/core/worker.ts index 4606a9ba..cd6c1cc9 100644 --- a/src/core/worker.ts +++ b/src/core/worker.ts @@ -409,13 +409,13 @@ export class Worker extends EventEmitter { .incr(this.connection.key("stat", "processed")) .incr(this.connection.key("stat", "processed", this.name)) .exec(); - + response.forEach((res) => { if (res[0] !== null) { throw res[0]; } }); - + this.emit("success", this.queue, job, this.result, duration); }