bee-queue
Advanced tools
Comparing version 1.5.0 to 1.6.0
@@ -0,1 +1,7 @@ | ||
## [1.6.0](https://github.com/bee-queue/bee-queue/compare/v1.5.0...v1.6.0) (2023-11-02) | ||
### Features | ||
- **queue:** allow manual connection ([#708](https://github.com/bee-queue/bee-queue/issues/708)) ([425fb89](https://github.com/bee-queue/bee-queue/commit/425fb897ef8515f881e9db7e4462ed4f3625eba8)) | ||
## [1.5.0](https://github.com/bee-queue/bee-queue/compare/v1.4.3...v1.5.0) (2022-11-28) | ||
@@ -2,0 +8,0 @@ |
@@ -34,2 +34,3 @@ /// <reference types="node" /> | ||
connect(): Promise<boolean>; | ||
createJob<U extends T>(data: U): BeeQueue.Job<U>; | ||
@@ -105,2 +106,3 @@ | ||
redisScanCount?: number; | ||
autoConnect?: boolean; | ||
} | ||
@@ -107,0 +109,0 @@ |
@@ -9,2 +9,9 @@ 'use strict'; | ||
function bool(input, defaultValue) { | ||
if (typeof input === 'boolean') { | ||
return input; | ||
} | ||
return defaultValue; | ||
} | ||
/** | ||
@@ -30,2 +37,3 @@ * A variant of the Promise#finally implementation. Instead of rejecting with | ||
asCallback: promiseUtils.asCallback, | ||
bool, | ||
callAsync: promiseUtils.callAsync, | ||
@@ -32,0 +40,0 @@ deferred: promiseUtils.deferred, |
117
lib/queue.js
@@ -15,3 +15,3 @@ 'use strict'; | ||
class Queue extends Emitter { | ||
constructor(name, settings) { | ||
constructor(name, settings = {}) { | ||
super(); | ||
@@ -37,3 +37,2 @@ | ||
settings = settings || {}; | ||
this.settings = { | ||
@@ -43,4 +42,8 @@ redis: settings.redis || {}, | ||
keyPrefix: (settings.prefix || defaults.prefix) + ':' + this.name + ':', | ||
autoConnect: helpers.bool(settings.autoConnect, true), | ||
}; | ||
this._isReady = false; | ||
this._ready = false; | ||
for (const prop in defaults) { | ||
@@ -78,53 +81,69 @@ const def = defaults[prop], | ||
const makeClient = (clientName, createNew) => { | ||
return redis | ||
.createClient(this.settings.redis, createNew) | ||
.then((client) => { | ||
// This event gets cleaned up and removed in Queue#close for the | ||
// primary client if quitCommandClient is disabled. | ||
client.on('error', this._emitError); | ||
return (this[clientName] = client); | ||
}); | ||
}; | ||
if (this.settings.autoConnect) { | ||
this.connect(); | ||
} | ||
} | ||
let eventsPromise = null; | ||
makeClient(clientName, createNew) { | ||
return redis.createClient(this.settings.redis, createNew).then((client) => { | ||
// This event gets cleaned up and removed in Queue#close for the | ||
// primary client if quitCommandClient is disabled. | ||
client.on('error', this._emitError); | ||
return (this[clientName] = client); | ||
}); | ||
} | ||
if (this.settings.getEvents || this.settings.activateDelayedJobs) { | ||
eventsPromise = makeClient('eclient', true).then(() => { | ||
this.eclient.on('message', this._onMessage.bind(this)); | ||
const channels = []; | ||
if (this.settings.getEvents) { | ||
channels.push(this.toKey('events')); | ||
} | ||
if (this.settings.activateDelayedJobs) { | ||
channels.push(this.toKey('earlierDelayed')); | ||
} | ||
return Promise.all( | ||
channels.map((channel) => | ||
helpers.callAsync((done) => this.eclient.subscribe(channel, done)) | ||
) | ||
); | ||
}); | ||
} | ||
connect() { | ||
return new Promise((resolve, reject) => { | ||
try { | ||
if (this._isReady) return resolve(this._isReady); | ||
this._isReady = false; | ||
const getEventPromise = () => { | ||
if (this.settings.getEvents || this.settings.activateDelayedJobs) { | ||
return this.makeClient('eclient', true).then(() => { | ||
this.eclient.on('message', this._onMessage.bind(this)); | ||
const channels = []; | ||
if (this.settings.getEvents) { | ||
channels.push(this.toKey('events')); | ||
} | ||
if (this.settings.activateDelayedJobs) { | ||
channels.push(this.toKey('earlierDelayed')); | ||
} | ||
return Promise.all( | ||
channels.map((channel) => | ||
helpers.callAsync((done) => | ||
this.eclient.subscribe(channel, done) | ||
) | ||
) | ||
); | ||
}); | ||
} | ||
// Wait for Lua scripts and client connections to load. Also wait for | ||
// bclient and eclient/subscribe if they're needed. | ||
this._ready = Promise.all([ | ||
// Make the clients | ||
makeClient('client', false), | ||
this.settings.isWorker ? makeClient('bclient', true) : null, | ||
eventsPromise, | ||
]) | ||
.then(() => { | ||
if (this.settings.ensureScripts) { | ||
return lua.buildCache(this.client); | ||
} | ||
}) | ||
.then(() => { | ||
this._isReady = true; | ||
setImmediate(() => this.emit('ready')); | ||
return this; | ||
}); | ||
return null; | ||
}; | ||
const eventsPromise = getEventPromise(); | ||
// Wait for Lua scripts and client connections to load. Also wait for | ||
// bclient and eclient/subscribe if they're needed. | ||
this._ready = Promise.all([ | ||
// Make the clients | ||
this.makeClient('client', false), | ||
this.settings.isWorker ? this.makeClient('bclient', true) : null, | ||
eventsPromise, | ||
]) | ||
.then(() => { | ||
if (this.settings.ensureScripts) { | ||
return lua.buildCache(this.client); | ||
} | ||
}) | ||
.then(() => { | ||
this._isReady = true; | ||
setImmediate(() => this.emit('ready')); | ||
resolve(this._isReady); | ||
return this; | ||
}); | ||
} catch (err) { | ||
reject(err); | ||
} | ||
}); | ||
} | ||
@@ -131,0 +150,0 @@ |
{ | ||
"name": "bee-queue", | ||
"version": "1.5.0", | ||
"version": "1.6.0", | ||
"description": "A simple, fast, robust job/task queue, backed by Redis.", | ||
"main": "index.js", | ||
"dependencies": { | ||
"p-finally": "^1.0.0", | ||
"p-finally": "^2.0.0", | ||
"promise-callbacks": "^3.8.1", | ||
@@ -12,24 +12,24 @@ "redis": "^3.1.2" | ||
"devDependencies": { | ||
"@commitlint/cli": "^17.2.0", | ||
"@commitlint/config-conventional": "^17.3.0", | ||
"@commitlint/cli": "^18.0.0", | ||
"@commitlint/config-conventional": "^18.1.0", | ||
"@semantic-release/changelog": "^6.0.1", | ||
"@semantic-release/commit-analyzer": "^9.0.2", | ||
"@semantic-release/commit-analyzer": "^11.0.0", | ||
"@semantic-release/exec": "^6.0.3", | ||
"@semantic-release/git": "^10.0.1", | ||
"@semantic-release/github": "^8.0.6", | ||
"@semantic-release/npm": "^9.0.1", | ||
"@semantic-release/release-notes-generator": "^10.0.3", | ||
"ava": "^0.25.0", | ||
"@semantic-release/github": "^9.0.2", | ||
"@semantic-release/npm": "^10.0.3", | ||
"@semantic-release/release-notes-generator": "^12.0.0", | ||
"@sinonjs/fake-timers": "^11.2.2", | ||
"ava": "^5.1.0", | ||
"ava-spec": "^1.1.0", | ||
"conventional-changelog-conventionalcommits": "^5.0.0", | ||
"conventional-changelog-conventionalcommits": "^7.0.1", | ||
"eslint": "^8.28.0", | ||
"eslint-config-prettier": "^8.5.0", | ||
"eslint-plugin-prettier": "^4.2.1", | ||
"lolex": "^6.0.0", | ||
"eslint-config-prettier": "^9.0.0", | ||
"eslint-plugin-prettier": "^5.0.0", | ||
"nyc": "^15.0.1", | ||
"prettier": "^2.0.4", | ||
"prettier": "^3.0.3", | ||
"sandboxed-module": "^2.0.3", | ||
"semantic-release": "^19.0.5", | ||
"semantic-release": "^22.0.5", | ||
"semver": "^7.2.1", | ||
"sinon": "^7.5.0" | ||
"sinon": "^17.0.0" | ||
}, | ||
@@ -36,0 +36,0 @@ "files": [ |
@@ -325,2 +325,3 @@ <a name="top"></a> | ||
redisScanCount: 100, | ||
autoConnect: true, | ||
}); | ||
@@ -353,2 +354,3 @@ ``` | ||
- `redisScanCount`: number. For setting the value of the `SSCAN` Redis command used in `Queue#getJobs` for succeeded and failed job types. | ||
- `autoConnect`: if set to `false`, then `queue.connect()` must be called to connect to the redis host. This is useful when the timing of connection to the redis need to be strictly controlled. | ||
@@ -609,2 +611,17 @@ ### Properties | ||
#### Queue#connect() | ||
Establish the queue's connections to Redis. Will only works if `settings.autoConnect` is set to `false` | ||
```js | ||
const Queue = require('bee-queue'); | ||
const queue = new Queue('example', { | ||
redis: redis: redis.createClient(process.env.REDIS_URL), | ||
autoConnect: false; | ||
}); | ||
await queue.connect(); | ||
queue.createJob({...}) | ||
//.... | ||
``` | ||
#### Queue#isRunning() | ||
@@ -611,0 +628,0 @@ |
105052
1411
940
+ Addedp-finally@2.0.1(transitive)
- Removedp-finally@1.0.0(transitive)
Updatedp-finally@^2.0.0