Skip to content

Commit

Permalink
fix: Add redis config option for Bull & BullMQ (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamlogic authored Jun 4, 2024
1 parent 9f4b591 commit bb57dbd
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 21 deletions.
5 changes: 5 additions & 0 deletions packages/bull/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ const { Judoscale } = require('judoscale-bull')
const judoscale = new Judoscale({
redis_url: process.env.REDISCLOUD_URL, // defaults to process.env.REDIS_URL
})

// You can optionally pass a Redis config object or an ioredis instance
const judoscale = new Judoscale({
redis: redisConfig,
})
```

3. If you want to scale your workers down to zero instances, you also need to install judoscale-bull in your web process so Judoscale knows when to scale them back up:
Expand Down
14 changes: 10 additions & 4 deletions packages/bull/src/bull-metrics-collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ process.on('exit', async () => {
})

class BullMetricsCollector extends WorkerMetricsCollector {
constructor() {
super('Bull')
constructor(config = {}) {
super('Bull', config)
collectors.push(this)
this.queues = new Map()
}
Expand Down Expand Up @@ -61,8 +61,14 @@ class BullMetricsCollector extends WorkerMetricsCollector {

get redis() {
if (!this._redis) {
const redisUrl = this.config.redis_url || process.env.REDIS_URL || 'redis://127.0.0.1:6379'
this._redis = new Redis(redisUrl)
if (this.config.redis instanceof Redis) {
this._redis = this.config.redis
} else if (this.config.redis) {
this._redis = new Redis(this.config.redis)
} else {
const redisUrl = this.config.redis_url || process.env.REDIS_URL || 'redis://127.0.0.1:6379'
this._redis = new Redis(redisUrl)
}
}

return this._redis
Expand Down
5 changes: 5 additions & 0 deletions packages/bullmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ require('judoscale-bullmq')
const judoscale = new Judoscale({
redis_url: process.env.REDISCLOUD_URL, // defaults to process.env.REDIS_URL
})

// You can optionally pass a Redis config object or an ioredis instance
const judoscale = new Judoscale({
redis: redisConfig,
})
```

## Troubleshooting
Expand Down
14 changes: 10 additions & 4 deletions packages/bullmq/src/bull-mq-metrics-collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ const { Queue } = require('bullmq')
const { Metric, WorkerMetricsCollector } = require('judoscale-node-core')

class BullMQMetricsCollector extends WorkerMetricsCollector {
constructor() {
super('BullMQ')
constructor(config = {}) {
super('BullMQ', config)
this.queueNames = new Set()
}

Expand Down Expand Up @@ -43,8 +43,14 @@ class BullMQMetricsCollector extends WorkerMetricsCollector {

get redis() {
if (!this._redis) {
const redisUrl = this.config.redis_url || process.env.REDIS_URL || 'redis://127.0.0.1:6379'
this._redis = new Redis(redisUrl)
if (this.config.redis instanceof Redis) {
this._redis = this.config.redis
} else if (this.config.redis) {
this._redis = new Redis(this.config.redis)
} else {
const redisUrl = this.config.redis_url || process.env.REDIS_URL || 'redis://127.0.0.1:6379'
this._redis = new Redis(redisUrl)
}
}

return this._redis
Expand Down
58 changes: 58 additions & 0 deletions packages/bullmq/test/bull-mq-metrics-collector.test.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const Redis = require('ioredis')
const { Queue } = require('bullmq')
const BullMQMetricsCollector = require('../src/bull-mq-metrics-collector')

Expand Down Expand Up @@ -31,4 +32,61 @@ describe('BullMQMetricsCollector', () => {
expect(metrics[1].queueName).toEqual('foo')
expect(metrics[1].value).toEqual(0)
})

test('uses the redis_url config if provided', async () => {
const collector1 = new BullMQMetricsCollector({ redis_url: 'redis://localhost:6379/1' })
const collector2 = new BullMQMetricsCollector({ redis_url: 'redis://localhost:6379/2' })

const queue = new Queue('foo', { connection: collector1.redis })
await queue.add('test-job')

const metrics1 = await collector1.collect()
const metrics2 = await collector2.collect()

expect(metrics1.length).toEqual(2)
expect(metrics2.length).toEqual(0)

collector1.redis.quit()
collector2.redis.quit()
})

test('uses the redis connection if provided', async () => {
const redis1 = new Redis('redis://localhost:6379/1')
const redis2 = new Redis('redis://localhost:6379/2')

const collector1 = new BullMQMetricsCollector({ redis: redis1 })
const collector2 = new BullMQMetricsCollector({ redis: redis2 })

const queue = new Queue('foo', { connection: redis1 })
await queue.add('test-job')

const metrics1 = await collector1.collect()
const metrics2 = await collector2.collect()

expect(metrics1.length).toEqual(2)
expect(metrics2.length).toEqual(0)

redis1.quit()
redis2.quit()
})

test('uses the redis connection options if provided', async () => {
const redis1 = { host: 'localhost', port: 6379, db: 1 }
const redis2 = { host: 'localhost', port: 6379, db: 2 }

const collector1 = new BullMQMetricsCollector({ redis: redis1 })
const collector2 = new BullMQMetricsCollector({ redis: redis2 })

const queue = new Queue('foo', { connection: collector1.redis })
await queue.add('test-job')

const metrics1 = await collector1.collect()
const metrics2 = await collector2.collect()

expect(metrics1.length).toEqual(2)
expect(metrics2.length).toEqual(0)

collector1.redis.quit()
collector2.redis.quit()
})
})
2 changes: 1 addition & 1 deletion packages/node-core/src/judoscale.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Judoscale {

// Expose config to the collectors
for (const adapter of Judoscale.adapters) {
adapter.collector.config = this.config
adapter.collector.config = { ...this.config, ...adapter.collector.config }
}

new Reporter().start(this.config, Judoscale.adapters)
Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/web-metrics-collector.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
class WebMetricsCollector {
constructor(store, collectorName = 'Web') {
constructor(store, collectorName = 'Web', config = {}) {
this.collectorName = collectorName
this.store = store
this.config = {}
this.config = config
}

collect() {
Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/worker-metrics-collector.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
class WorkerMetricsCollector {
constructor(collectorName) {
constructor(collectorName, config = {}) {
this.collectorName = collectorName
this.config = {}
this.config = config
}

collect() {
Expand Down
7 changes: 3 additions & 4 deletions sample_apps/express_web/src/web.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@ import 'judoscale-bullmq'

const app = express()
const port = process.env.PORT || 5000
const redisUrl = process.env.REDIS_URL || 'redis://127.0.0.1:6379'
const redisOpts = {
const redis = new Redis('redis://127.0.0.1:6379', {
maxRetriesPerRequest: null, // Since bull v4
enableReadyCheck: false, // Since bull v4
}
const redis = new Redis(redisUrl, redisOpts)
})

app.set('views', './views')
app.set('view engine', 'ejs')

const judoscale = new Judoscale({
api_base_url: process.env.JUDOSCALE_URL || 'https://judoscale-node-sample.requestcatcher.com',
redis: redis,
})

app.use(judoscaleMiddleware(judoscale))
Expand Down
7 changes: 3 additions & 4 deletions sample_apps/express_web/src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ import Redis from 'ioredis'
import { Worker } from 'bullmq'
import { Judoscale } from 'judoscale-bullmq'

const redisUrl = process.env.REDIS_URL || 'redis://127.0.0.1:6379'
const redisOpts = {
const redis = new Redis('redis://127.0.0.1:6379', {
maxRetriesPerRequest: null, // Since bull v4
enableReadyCheck: false, // Since bull v4
}
const redis = new Redis(redisUrl, redisOpts)
})
const queueNames = ['default', 'urgent']

new Judoscale({
api_base_url: process.env.JUDOSCALE_URL || 'https://judoscale-node-sample.requestcatcher.com',
redis: redis,
})

const workers = queueNames.map((queueName) => {
Expand Down

0 comments on commit bb57dbd

Please sign in to comment.