Socket
Socket
Sign inDemoInstall

bee-queue

Package Overview
Dependencies
Maintainers
2
Versions
19
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bee-queue - npm Package Compare versions

Comparing version 1.0.0 to 1.1.0

lib/.eslintrc.json

6

HISTORY.md

@@ -0,1 +1,7 @@

1.1.0 / 2017-08-12
==================
* Support sharing the redis command client between Queues.
* Add documentation, add tests for expected behavior.
1.0.0 / 2017-06-30

@@ -2,0 +8,0 @@ ==================

3

lib/defaults.js

@@ -22,2 +22,5 @@ 'use strict';

// quitCommandClient is dependent on whether the redis setting was an actual
// redis client, or just configuration options to create such a client.
// Method-specific defaults.

@@ -24,0 +27,0 @@ '#close': {

33

lib/queue.js

@@ -33,2 +33,3 @@ 'use strict';

redis: settings.redis || {},
quitCommandClient: settings.quitCommandClient,
keyPrefix: (settings.prefix || defaults.prefix) + ':' + this.name + ':'

@@ -53,2 +54,8 @@ };

// By default, if we're given a redis client and no additional instructions,
// don't quit the connection on Queue#close.
if (typeof this.settings.quitCommandClient !== 'boolean') {
this.settings.quitCommandClient = !redis.isClient(this.settings.redis);
}
// To avoid changing the hidden class of the Queue.

@@ -62,4 +69,4 @@ this._delayedTimer = this.settings.activateDelayedJobs

const makeClient = (clientName) => {
return redis.createClient(this.settings.redis)
const makeClient = (clientName, createNew) => {
return redis.createClient(this.settings.redis, createNew)
.then((client) => {

@@ -74,3 +81,3 @@ client.on('error', this.emit.bind(this, 'error'));

if (this.settings.getEvents || this.settings.activateDelayedJobs) {
eventsPromise = makeClient('eclient').then(() => {
eventsPromise = makeClient('eclient', true).then(() => {
this.eclient.on('message', this._onMessage.bind(this));

@@ -98,4 +105,4 @@ const channels = [];

// Make the clients
makeClient('client'),
this.settings.isWorker ? makeClient('bclient') : null,
makeClient('client', false),
this.settings.isWorker ? makeClient('bclient', true) : null,
eventsPromise

@@ -106,7 +113,6 @@ ]).then(() => {

}
}).then(() => this);
this._ready.then(() => {
}).then(() => {
this._isReady = true;
setImmediate(() => this.emit('ready'));
return this;
});

@@ -194,3 +200,6 @@ }

const clients = [this.client];
const clients = [];
if (this.settings.quitCommandClient) {
clients.push(this.client);
}
if (this.settings.getEvents) {

@@ -211,3 +220,3 @@ clients.push(this.eclient);

if (this.settings.isWorker) {
this.bclient.end(true);
redis.disconnect(this.bclient);
}

@@ -433,3 +442,3 @@

}, (err) => {
if (err.name === 'AbortError' && this.paused) {
if (redis.isAbortError(err) && this.paused) {
return null;

@@ -733,3 +742,3 @@ }

// Handle aborted redis connections.
if (err.name === 'AbortError') {
if (redis.isAbortError(err)) {
if (this.paused) return;

@@ -736,0 +745,0 @@ // Retry.

@@ -6,10 +6,28 @@ 'use strict';

function createClient(settings) {
// node_redis mutates the options object we provide it.
if (settings && typeof settings === 'object') {
settings = Object.assign({}, settings);
function createClient(settings, createNew) {
let client;
if (isClient(settings)) {
// We assume it's a redis client from either node_redis or ioredis.
client = settings;
if (createNew) {
// Both node_redis and ioredis support the duplicate method, which creates
// a new redis client with the same configuration.
client = client.duplicate();
} else if (isReady(client)) {
// If we were given a redis client, and we don't want to clone it (to
// enable connection sharing between Queue instances), and it's already
// ready, then just return it.
return Promise.resolve(client);
} // otherwise, we wait for the client to be in the ready state.
} else {
// node_redis mutates the options object we provide it, so we clone the
// settings first.
if (typeof settings === 'object') {
settings = Object.assign({}, settings);
}
client = redis.createClient(settings);
}
const client = redis.createClient(settings);
// Wait for the client to be ready, then resolve with the client itself.

@@ -20,2 +38,37 @@ return helpers.waitOn(client, 'ready', true)

function disconnect(client) {
// Redis#end is deprecated for ioredis.
/* istanbul ignore if: this is only for ioredis */
if (client.disconnect) {
client.disconnect();
} else {
// true indicates that it should invoke all pending callbacks with an
// AbortError; we need this behavior.
client.end(true);
}
}
function isAbortError(err) {
// node_redis has a designated class for abort errors, but ioredis just has
// a constant message defined in a utils file.
return err.name === 'AbortError' ||
/* istanbul ignore next: this is only for ioredis */
err.message === 'Connection is closed.';
}
function isClient(object) {
if (!object || typeof object !== 'object') return false;
const name = object.constructor.name;
return name === 'Redis' || name === 'RedisClient';
}
function isReady(client) {
// node_redis has a ready property, ioredis has a status property.
return client.ready || client.status === 'ready';
}
exports.createClient = createClient;
exports.disconnect = disconnect;
exports.isAbortError = isAbortError;
exports.isClient = isClient;
exports.isReady = isReady;
{
"name": "bee-queue",
"version": "1.0.0",
"version": "1.1.0",
"description": "A simple, fast, robust job/task queue, backed by Redis.",

@@ -20,2 +20,10 @@ "main": "index.js",

},
"files": [
"HISTORY.md",
"README.md",
"LICENSE",
"package.json",
"index.js",
"lib"
],
"scripts": {

@@ -27,3 +35,3 @@ "test": "npm run eslint && ava",

"coverage-and-publish": "npm run coverage && nyc report --reporter=text-lcov | coveralls",
"ci": "npm run coverage-and-publish"
"ci": "npm run eslint && npm run coverage-and-publish"
},

@@ -30,0 +38,0 @@ "keywords": [

@@ -283,2 +283,4 @@ <a name="top"></a>

- `socket`: string, Redis socket to be used instead of a host and port.
Note that this can also be a node_redis `RedisClient` instance, in which case Bee-Queue will issue normal commands over it. It will `duplicate()` the client for blocking commands and PubSub subscriptions, if enabled. This is advanced usage,
- `isWorker`: boolean. Disable if this queue will not process jobs.

@@ -292,2 +294,3 @@ - `getEvents`: boolean. Disable if this queue does not need to receive job events.

- `removeOnFailure`: boolean. Enable to have this worker automatically remove its failed jobs from Redis, so as to keep memory usage down. This will not remove jobs that are set to retry unless they fail all their retries.
- `quitCommandClient`: boolean. Whether to `QUIT` the redis command client (the client it sends normal operations over) when `Queue#close` is called. This defaults to `true` for normal usage, and `false` if an existing `RedisClient` object was provided to the `redis` option.
- `redisScanCount`: number. For setting the value of the `SSCAN` Redis command used in `Queue#getJobs` for succeeded and failed job types.

@@ -504,2 +507,12 @@

#### Queue#checkHealth([cb])
Check the "health" of the queue. Returns a promise that resolves to the number of jobs in each state (`waiting`, `active`, `succeeded`, `failed`, `delayed`), and the newest job ID (if using the default ID behavior) in `newestJob`. You can periodically query the `newestJob` ID to estimate the job creation throughput, and can infer the job processing throughput by incorporating the `waiting` and `active` counts.
```js
const counts = await queue.checkHealth();
// print all the job counts
console.log('job state counts:', counts);
```
#### Queue#close([cb])

@@ -509,2 +522,22 @@

The recommended pattern for gracefully shutting down your worker is:
```js
// Some reasonable period of time for all your concurrent jobs to finish
// processing. If a job does not finish processing in this time, it will stall
// and be retried. As such, do attempt to make your jobs idempotent, as you
// generally should with any queue that provides at-least-once delivery.
const TIMEOUT = 30 * 1000;
process.on('uncaughtException', async () => {
// Queue#close is idempotent - no need to guard against duplicate calls.
try {
await queue.close(TIMEOUT);
} catch (err) {
console.error('bee-queue failed to shut down gracefully', err);
}
process.exit(1);
});
```
## Job

@@ -511,0 +544,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc