Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bee-queue

Package Overview
Dependencies
Maintainers
1
Versions
19
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bee-queue - npm Package Compare versions

Comparing version 0.3.0 to 1.0.0

HISTORY.md

23

lib/defaults.js

@@ -0,9 +1,30 @@

'use strict';
module.exports = {
stallInterval: 5000,
// Avoid scheduling timers for further out than this period of time. The
// workers will all poll on this interval, at minimum, to find new delayed
// jobs.
nearTermWindow: 20 * 60 * 1000,
// Avoids rapid churn during processing of nearly-concurrent events.
delayedDebounce: 1000,
prefix: 'bq',
isWorker: true,
getEvents: true,
ensureScripts: true,
activateDelayedJobs: false,
sendEvents: true,
storeJobs: true,
removeOnSuccess: false,
catchExceptions: false
removeOnFailure: false,
redisScanCount: 100,
// Method-specific defaults.
'#close': {
timeout: 5000
},
'#process': {
concurrency: 1
}
};

31

lib/helpers.js

@@ -1,20 +0,19 @@

// Effectively _.after
var barrier = function (n, done) {
return function () {
n -= 1;
if (n === 0) {
done();
}
};
};
'use strict';
var defaultCb = function (err) {
if (err) {
throw err;
}
};
const hasOwn = Object.prototype.hasOwnProperty;
function has(object, name) {
return hasOwn.call(object, name);
}
const promiseUtils = require('promise-callbacks');
module.exports = {
barrier: barrier,
defaultCb: defaultCb
asCallback: promiseUtils.asCallback,
deferred: promiseUtils.deferred,
delay: promiseUtils.delay,
waitOn: promiseUtils.waitOn,
withTimeout: promiseUtils.withTimeout,
wrapAsync: promiseUtils.wrapAsync,
has,
};

@@ -1,117 +0,196 @@

var events = require('events');
var util = require('util');
'use strict';
var helpers = require('./helpers');
var lua = require('./lua');
const Emitter = require('events').EventEmitter;
function Job(queue, jobId, data, options) {
this.queue = queue;
this.id = jobId;
this.progress = 0;
this.data = data || {};
this.options = options || {};
this.status = 'created';
}
const helpers = require('./helpers');
const strategies = require('./backoff');
util.inherits(Job, events.EventEmitter);
class Job extends Emitter {
constructor(queue, jobId, data, options) {
super();
Job.fromId = function (queue, jobId, cb) {
queue.client.hget(queue.toKey('jobs'), jobId, function (err, data) {
/* istanbul ignore if */
if (err) return cb(err);
return cb(null, Job.fromData(queue, jobId, data));
});
};
this.queue = queue;
this.id = jobId;
this.progress = 0;
this.data = data || {};
this.options = options || {};
this.options.timestamp = this.options.timestamp || Date.now();
this.options.stacktraces = this.options.stacktraces || [];
this.status = 'created';
}
Job.fromData = function (queue, jobId, data) {
// no need for try-catch here since we made the JSON ourselves in job#toData
data = JSON.parse(data);
var job = new Job(queue, jobId, data.data, data.options);
job.status = data.status;
return job;
};
static fromId(queue, jobId, cb) {
const promise = queue._commandable().then((client) => {
const jobPromise = helpers.deferred();
client.hget(queue.toKey('jobs'), jobId, jobPromise.defer());
return jobPromise;
}).then((data) => data ? Job.fromData(queue, jobId, data) : null);
Job.prototype.toData = function () {
return JSON.stringify({
data: this.data,
options: this.options,
status: this.status
});
};
if (cb) helpers.asCallback(promise, cb);
return promise;
}
Job.prototype.save = function (cb) {
cb = cb || helpers.defaultCb;
var self = this;
this.queue.client.evalsha(lua.shas.addJob, 3,
this.queue.toKey('id'), this.queue.toKey('jobs'), this.queue.toKey('waiting'),
this.toData(),
function (err, jobId) {
/* istanbul ignore if */
if (err) return cb(err);
self.id = jobId;
self.queue.jobs[jobId] = self;
return cb(null, self);
static fromData(queue, jobId, data) {
// no need for try-catch here since we made the JSON ourselves in Job#toData
data = JSON.parse(data);
const job = new Job(queue, jobId, data.data, data.options);
job.status = data.status;
return job;
}
toData() {
return JSON.stringify({
data: this.data,
options: this.options,
status: this.status
});
}
save(cb) {
const toKey = this.queue.toKey.bind(this.queue);
let promise;
if (this.options.delay) {
promise = this.queue._evalScript('addDelayedJob', 4,
toKey('id'), toKey('jobs'), toKey('delayed'), toKey('earlierDelayed'),
this.id || '', this.toData(), this.options.delay);
if (this.queue.settings.activateDelayedJobs) {
promise = promise.then((jobId) => {
// Only reschedule if the job was actually created.
if (jobId) {
this.queue._delayedTimer.schedule(this.options.delay);
}
return jobId;
});
}
} else {
promise = this.queue._evalScript('addJob', 3,
toKey('id'), toKey('jobs'), toKey('waiting'),
this.id || '', this.toData());
}
);
return this;
};
Job.prototype.retries = function (n) {
if (n < 0) {
throw Error('Retries cannot be negative');
promise = promise.then((jobId) => {
this.id = jobId;
// If the jobId is not null, then store the job in the job map.
if (jobId && this.queue.settings.storeJobs) {
this.queue.jobs.set(jobId, this);
}
return this;
});
if (cb) helpers.asCallback(promise, cb);
return promise;
}
this.options.retries = n;
return this;
};
Job.prototype.timeout = function (ms) {
if (ms < 0) {
throw Error('Timeout cannot be negative');
setId(id) {
this.id = id;
return this;
}
this.options.timeout = ms;
return this;
};
Job.prototype.reportProgress = function (progress, cb) {
// right now we just send the pubsub event
// might consider also updating the job hash for persistence
cb = cb || helpers.defaultCb;
progress = Number(progress);
if (progress < 0 || progress > 100) {
return process.nextTick(cb.bind(null, Error('Progress must be between 0 and 100')));
retries(n) {
if (n < 0) {
throw new Error('Retries cannot be negative');
}
this.options.retries = n;
return this;
}
this.progress = progress;
this.queue.client.publish(this.queue.toKey('events'), JSON.stringify({
id: this.id,
event: 'progress',
data: progress
}), cb);
};
Job.prototype.remove = function (cb) {
cb = cb || helpers.defaultCb;
this.queue.client.evalsha(lua.shas.removeJob, 6,
this.queue.toKey('succeeded'), this.queue.toKey('failed'), this.queue.toKey('waiting'),
this.queue.toKey('active'), this.queue.toKey('stalling'), this.queue.toKey('jobs'),
this.id,
cb
);
};
delayUntil(timestamp) {
// Get the timestamp from Date objects.
if (timestamp && typeof timestamp.getTime === 'function') {
timestamp = timestamp.getTime();
} else {
timestamp = parseInt(timestamp, 10);
}
if (isNaN(timestamp) || timestamp < 0) {
throw new Error('invalid delay timestamp');
}
if (timestamp > Date.now()) {
this.options.delay = timestamp;
}
return this;
}
Job.prototype.retry = function (cb) {
cb = cb || helpers.defaultCb;
this.queue.client.multi()
.srem(this.queue.toKey('failed'), this.id)
.lpush(this.queue.toKey('waiting'), this.id)
.exec(cb);
};
timeout(ms) {
if (ms < 0) {
throw new Error('Timeout cannot be negative');
}
this.options.timeout = ms;
return this;
}
Job.prototype.isInSet = function (set, cb) {
this.queue.client.sismember(this.queue.toKey(set), this.id, function (err, result) {
/* istanbul ignore if */
if (err) return cb(err);
return cb(null, result === 1);
});
};
backoff(strategy, delay) {
if (!strategies.has(strategy)) {
throw new Error('unknown strategy');
}
if (!Number.isSafeInteger(delay) || delay <= 0) {
throw new Error('delay must be a positive integer');
}
this.options.backoff = {
strategy,
delay
};
return this;
}
reportProgress(progress, cb) {
// right now we just send the pubsub event
// might consider also updating the job hash for persistence
progress = parseInt(progress, 10);
let promise;
if (progress >= 0 && progress <= 100) {
this.progress = progress;
promise = this.queue._commandable().then((client) => {
const publishPromise = helpers.deferred();
const payload = JSON.stringify({
id: this.id,
event: 'progress',
data: progress
});
client.publish(this.queue.toKey('events'), payload,
publishPromise.defer());
return publishPromise;
});
} else {
promise = Promise.reject(new Error('Progress must be between 0 and 100'));
}
if (cb) helpers.asCallback(promise, cb);
return promise;
}
remove(cb) {
const promise = this.queue.removeJob(this.id).then(() => this);
if (cb) helpers.asCallback(promise, cb);
return promise;
}
retry(cb) {
const promise = this.queue._commandable().then((client) => {
const retryPromise = helpers.deferred();
client.multi()
.srem(this.queue.toKey('failed'), this.id)
.lpush(this.queue.toKey('waiting'), this.id)
.exec(retryPromise.defer());
return retryPromise;
});
if (cb) helpers.asCallback(promise, cb);
return promise;
}
isInSet(set, cb) {
const promise = this.queue._commandable().then((client) => {
const memberPromise = helpers.deferred();
client.sismember(this.queue.toKey(set), this.id, memberPromise.defer());
return memberPromise;
}).then((result) => result === 1);
if (cb) helpers.asCallback(promise, cb);
return promise;
}
}
module.exports = Job;

@@ -1,58 +0,65 @@

var fs = require('fs');
var crypto = require('crypto');
var path = require('path');
var barrier = require('../helpers').barrier;
'use strict';
var scripts = {};
var shas = {};
var scriptsRead = false;
var cachedServers = {};
const fs = require('fs');
const crypto = require('crypto');
const path = require('path');
var readScripts = function () {
fs.readdirSync(__dirname)
.filter(function (file) {
return file.slice(-4) === '.lua';
}).forEach(function (file) {
var hash = crypto.createHash('sha1');
var key = file.slice(0, -4);
scripts[key] = fs.readFileSync(path.join(__dirname, file)).toString();
hash.update(scripts[key]);
shas[key] = hash.digest('hex');
});
scriptsRead = true;
};
const promisify = require('promise-callbacks').promisify;
var buildCache = function (serverKey, client, cb) {
if (cachedServers[serverKey]) {
return cb();
}
const scripts = {};
const shas = {};
let scriptsRead = false;
let scriptsPromise = null;
if (!scriptsRead) {
readScripts();
}
const readFile = promisify.methods(fs, ['readFile']).readFile;
const readDir = promisify.methods(fs, ['readdir']).readdir;
var reportLoaded = barrier(Object.keys(shas).length, function () {
cachedServers[serverKey] = true;
return cb();
});
function readScript(file) {
return readFile(path.join(__dirname, file), 'utf8')
.then((script) => {
const name = file.slice(0, -4);
scripts[name] = script;
const hash = crypto.createHash('sha1');
hash.update(script);
shas[name] = hash.digest('hex');
});
}
// todo: could theoretically pipeline this, but it's pretty insignificant
Object.keys(shas).forEach(function (key) {
client.script('exists', shas[key], function (err, exists) {
/* istanbul ignore if */
if (err) {
client.emit('error', 'Could not build Lua script cache');
} else if (exists[0] === 0) {
client.script('load', scripts[key], reportLoaded);
} else {
reportLoaded();
}
function readScripts() {
if (scriptsRead) return scriptsPromise;
scriptsRead = true;
return scriptsPromise = readDir(__dirname).then((files) => {
return Promise.all(files.filter((file) => file.endsWith('.lua'))
.map(readScript));
}).then(() => scripts);
}
function buildCache(client) {
// We could theoretically pipeline this, but it's pretty insignificant.
return readScripts().then(() => Promise.all(Object.keys(shas).map((key) => {
return new Promise((resolve, reject) => {
client.script('exists', shas[key], (err, exists) => {
/* istanbul ignore if */
if (err) {
reject(err);
} else if (exists[0] === 0) {
client.script('load', scripts[key], (loadErr) => {
/* istanbul ignore if */
if (loadErr) {
return reject(loadErr);
}
resolve();
});
} else {
resolve();
}
});
});
});
};
})));
}
module.exports = {
scripts: scripts,
shas: shas,
buildCache: buildCache
scripts,
shas,
buildCache
};

@@ -1,355 +0,765 @@

var redis = require('redis');
var events = require('events');
var util = require('util');
'use strict';
var Job = require('./job');
var defaults = require('./defaults');
var lua = require('./lua');
var helpers = require('./helpers');
var barrier = helpers.barrier;
const redis = require('./redis');
const Emitter = require('events').EventEmitter;
function Queue(name, settings) {
if (!(this instanceof Queue)) {
return new Queue(name, settings);
}
const Job = require('./job');
const defaults = require('./defaults');
const lua = require('./lua');
const helpers = require('./helpers');
const backoff = require('./backoff');
const EagerTimer = require('./eager-timer');
this.name = name;
this.paused = false;
this.jobs = {};
class Queue extends Emitter {
constructor(name, settings) {
super();
settings = settings || {};
this.settings = {
redis: settings.redis || {},
stallInterval: typeof settings.stallInterval === 'number' ?
settings.stallInterval :
defaults.stallInterval,
keyPrefix: (settings.prefix || defaults.prefix) + ':' + this.name + ':'
};
this.name = name;
this.paused = false;
this.jobs = new Map();
this.activeJobs = new Set();
this.checkTimer = null;
var boolProps = ['isWorker', 'getEvents', 'sendEvents', 'removeOnSuccess', 'catchExceptions'];
boolProps.forEach(function (prop) {
this.settings[prop] = typeof settings[prop] === 'boolean' ? settings[prop] : defaults[prop];
}.bind(this));
this._closed = null;
this._isClosed = false;
/* istanbul ignore if */
if (this.settings.redis.socket) {
this.settings.redis.params = [this.settings.redis.socket, this.settings.redis.options];
} else {
this.settings.redis.port = this.settings.redis.port || 6379;
this.settings.redis.host = this.settings.redis.host || '127.0.0.1';
this.settings.redis.params = [
this.settings.redis.port, this.settings.redis.host, this.settings.redis.options
];
this.client = null;
this.bclient = null;
this.eclient = null;
settings = settings || {};
this.settings = {
redis: settings.redis || {},
keyPrefix: (settings.prefix || defaults.prefix) + ':' + this.name + ':'
};
for (let prop in defaults) {
const def = defaults[prop], setting = settings[prop], type = typeof def;
if (type === 'boolean') {
this.settings[prop] = typeof setting === 'boolean' ? setting : def;
} else if (type === 'number') {
this.settings[prop] = Number.isSafeInteger(setting) ? setting : def;
}
}
/* istanbul ignore if */
if (this.settings.redis.socket) {
this.settings.redis = Object.assign({}, this.settings.redis, {
path: this.settings.redis.socket
});
}
// To avoid changing the hidden class of the Queue.
this._delayedTimer = this.settings.activateDelayedJobs
? new EagerTimer(this.settings.nearTermWindow)
: null;
if (this._delayedTimer) {
this._delayedTimer.on('trigger', this._activateDelayed.bind(this));
}
const makeClient = (clientName) => {
return redis.createClient(this.settings.redis)
.then((client) => {
client.on('error', this.emit.bind(this, 'error'));
return this[clientName] = client;
});
};
let eventsPromise = null;
if (this.settings.getEvents || this.settings.activateDelayedJobs) {
eventsPromise = makeClient('eclient').then(() => {
this.eclient.on('message', this._onMessage.bind(this));
const channels = [];
if (this.settings.getEvents) {
channels.push(this.toKey('events'));
}
if (this.settings.activateDelayedJobs) {
channels.push(this.toKey('earlierDelayed'));
}
return Promise.all(channels.map((channel) => {
const promise = helpers.deferred();
this.eclient.subscribe(channel, promise.defer());
return promise;
}));
});
}
this._isReady = false;
// Wait for Lua scripts and client connections to load. Also wait for
// bclient and eclient/subscribe if they're needed.
this._ready = Promise.all([
// Make the clients
makeClient('client'),
this.settings.isWorker ? makeClient('bclient') : null,
eventsPromise
]).then(() => {
if (this.settings.ensureScripts) {
return lua.buildCache(this.client);
}
}).then(() => this);
this._ready.then(() => {
this._isReady = true;
setImmediate(() => this.emit('ready'));
});
}
this.settings.redis.db = this.settings.redis.db || 0;
// Wait for Lua loading and client connection; bclient and eclient/subscribe if needed
var reportReady = barrier(
2 + this.settings.isWorker + this.settings.getEvents * 2,
this.emit.bind(this, 'ready')
);
_onMessage(channel, message) {
if (channel === this.toKey('earlierDelayed')) {
// We should only receive these messages if activateDelayedJobs is
// enabled.
this._delayedTimer.schedule(parseInt(message, 10));
return;
}
var makeClient = function (clientName) {
this[clientName] = redis.createClient.apply(redis, this.settings.redis.params);
this[clientName].on('error', this.emit.bind(this, 'error'));
this[clientName].select(this.settings.redis.db, reportReady);
}.bind(this);
message = JSON.parse(message);
if (message.event === 'failed' || message.event === 'retrying') {
message.data = new Error(message.data);
}
makeClient('client');
this.emit('job ' + message.event, message.id, message.data);
if (this.settings.isWorker) {
makeClient('bclient');
const job = this.jobs.get(message.id);
if (job) {
if (message.event === 'progress') {
job.progress = message.data;
} else if (message.event === 'retrying') {
job.options.retries -= 1;
}
job.emit(message.event, message.data);
if (message.event === 'succeeded' || message.event === 'failed') {
this.jobs.delete(message.id);
}
}
}
if (this.settings.getEvents) {
makeClient('eclient');
this.eclient.subscribe(this.toKey('events'));
this.eclient.on('message', this.onMessage.bind(this));
this.eclient.on('subscribe', reportReady);
isRunning() {
return !this.paused;
}
this.settings.serverKey = this.settings.redis.socket || this.settings.redis.host + ':' + this.settings.redis.port;
lua.buildCache(this.settings.serverKey, this.client, reportReady);
}
ready(cb) {
if (cb) this._ready.then(() => cb(null), cb);
util.inherits(Queue, events.EventEmitter);
return this._ready;
}
Queue.prototype.onMessage = function (channel, message) {
message = JSON.parse(message);
if (message.event === 'failed' || message.event === 'retrying') {
message.data = Error(message.data);
_commandable(requireBlocking) {
if (requireBlocking ? this.paused : this._isClosed) {
return Promise.reject(new Error('closed'));
}
if (this._isReady) {
return Promise.resolve(requireBlocking ? this.bclient : this.client);
}
return this._ready.then(() => this._commandable(requireBlocking));
}
this.emit('job ' + message.event, message.id, message.data);
close(timeout, cb) {
if (typeof timeout === 'function') {
cb = timeout;
timeout = defaults['#close'].timeout;
} else if (!Number.isSafeInteger(timeout) || timeout <= 0) {
timeout = defaults['#close'].timeout;
}
if (this.jobs[message.id]) {
if (message.event === 'progress') {
this.jobs[message.id].progress = message.data;
} else if (message.event === 'retrying') {
this.jobs[message.id].options.retries -= 1;
if (this.paused) {
return this._closed;
}
this.jobs[message.id].emit(message.event, message.data);
this.paused = true;
if (message.event === 'succeeded' || message.event === 'failed') {
delete this.jobs[message.id];
if (this.checkTimer) {
clearTimeout(this.checkTimer);
this.checkTimer = null;
}
}
};
Queue.prototype.close = function (cb) {
cb = cb || helpers.defaultCb;
this.paused = true;
if (this._delayedTimer) {
this._delayedTimer.stop();
}
/* istanbul ignore next */
var closeTimeout = setTimeout(function () {
return cb(Error('Timed out closing redis connections'));
}, 5000);
const cleanup = () => {
this._isClosed = true;
var clients = [this.client];
if (this.settings.isWorker) {
clients.push(this.bclient);
const clients = [this.client];
if (this.settings.getEvents) {
clients.push(this.eclient);
}
return Promise.all(clients.map((client) => {
const promise = helpers.deferred();
client.quit(promise.defer());
return promise;
}));
};
const closed = helpers.withTimeout(this._ready.then(() => {
// Stop the blocking connection, ensures that we don't accept additional
// jobs while waiting for the ongoing jobs to terminate.
if (this.settings.isWorker) {
this.bclient.end(true);
}
// Wait for all the jobs to complete. Ignore job errors during shutdown.
const waitJobs = Array.from(this.activeJobs);
return Promise.all(waitJobs.map((promise) => promise.catch(() => {})));
}), timeout).then(() => {
return cleanup().then(() => undefined);
}, (err) => {
return cleanup().then(() => Promise.reject(err));
});
this._closed = closed;
if (cb) helpers.asCallback(closed, cb);
return closed;
}
if (this.settings.getEvents) {
clients.push(this.eclient);
destroy(cb) {
const promise = this._commandable().then((client) => {
const deleted = helpers.deferred();
const args = ['id', 'jobs', 'stallBlock', 'stalling', 'waiting', 'active',
'succeeded', 'failed', 'delayed']
.map((key) => this.toKey(key));
args.push(deleted.defer());
client.del.apply(client, args);
return deleted;
});
if (cb) helpers.asCallback(promise, cb);
return promise;
}
var handleEnd = barrier(clients.length, function () {
clearTimeout(closeTimeout);
return cb(null);
});
checkHealth(cb) {
const promise = this._commandable().then((client) => {
const multi = helpers.deferred();
client.multi()
.llen(this.toKey('waiting'))
.llen(this.toKey('active'))
.scard(this.toKey('succeeded'))
.scard(this.toKey('failed'))
.zcard(this.toKey('delayed'))
.get(this.toKey('id'))
.exec(multi.defer());
return multi;
}).then((results) => ({
waiting: results[0],
active: results[1],
succeeded: results[2],
failed: results[3],
delayed: results[4],
newestJob: results[5] ? parseInt(results[5], 10) : 0
}));
clients.forEach(function (client) {
client.end();
client.stream.on('close', handleEnd);
});
};
if (cb) helpers.asCallback(promise, cb);
return promise;
}
Queue.prototype.destroy = function (cb) {
cb = cb || helpers.defaultCb;
var keys = ['id', 'jobs', 'stallTime', 'stalling', 'waiting', 'active', 'succeeded', 'failed']
.map(this.toKey.bind(this));
this.client.del.apply(this.client, keys.concat(cb));
};
_scanForJobs(key, cursor, size, set, cb) {
const batchCount = Math.min(size, this.settings.redisScanCount);
this.client.sscan(key, cursor, 'COUNT', batchCount, (err, results) => {
/* istanbul ignore if */
if (err) {
return cb(err);
}
Queue.prototype.checkHealth = function (cb) {
this.client.multi()
.llen(this.toKey('waiting'))
.llen(this.toKey('active'))
.scard(this.toKey('succeeded'))
.scard(this.toKey('failed'))
.exec(function (err, results) {
/* istanbul ignore if */
if (err) return cb(err);
return cb(null, {
waiting: results[0],
active: results[1],
succeeded: results[2],
failed: results[3]
});
const nextCursor = results[0];
const ids = results[1];
// A given element may be returned multiple times in SSCAN.
// So, we use a set to remove duplicates.
// https://redis.io/commands/scan#scan-guarantees
for (let id of ids) {
// For small sets, encoded as intsets, SSCAN will ignore COUNT.
// https://redis.io/commands/scan#the-count-option
if (set.size === size) break;
set.add(id);
}
if (nextCursor === '0' || set.size >= size) {
return cb(null, set);
}
this._scanForJobs(key, nextCursor, size, set, cb);
});
};
}
Queue.prototype.createJob = function (data) {
return new Job(this, null, data);
};
_addJobsByIds(jobs, ids) {
// We need to re-ensure the queue is commandable, as we might be shutting
// down during this operation.
return this._commandable().then((client) => {
const got = helpers.deferred();
const commandArgs = [this.toKey('jobs')].concat(ids, got.defer());
client.hmget.apply(client, commandArgs);
return got;
}).then((dataArray) => {
const count = ids.length;
// Some jobs returned by the scan may have already been removed, so filter
// them out.
for (let i = 0; i < count; ++i) {
const jobData = dataArray[i];
/* istanbul ignore else: not worth unit-testing this edge case */
if (jobData) {
jobs.push(Job.fromData(this, ids[i], jobData));
}
}
return jobs;
});
}
Queue.prototype.getJob = function (jobId, cb) {
var self = this;
if (jobId in this.jobs) {
return process.nextTick(cb.bind(null, null, this.jobs[jobId]));
} else {
Job.fromId(this, jobId, function (err, job) {
/* istanbul ignore if */
if (err) return cb(err);
self.jobs[jobId] = job;
return cb(err, job);
/**
* Get jobs from queue type.
*
* @param {String} type The queue type (failed, succeeded, waiting, etc.)
* @param {?Object=} page An object containing some of the following fields.
* @param {Number=} page.start Start of query range for waiting/active/delayed
* queue types. Defaults to 0.
* @param {Number=} page.end End of query range for waiting/active/delayed
* queue types. Defaults to 100.
* @param {Number=} page.size Number jobs to return for failed/succeeded (SET)
* types. Defaults to 100.
* @param {Function=} callback Called with the equivalent of the returned
* promise.
* @return {Promise<Job[]>} Resolves to the jobs the function found.
*/
getJobs(type, page, cb) {
if (typeof page === 'function') {
cb = page;
page = null;
}
// Set defaults.
page = Object.assign({
size: 100,
start: 0,
end: 100
}, page);
const promise = this._commandable().then((client) => {
const idsPromise = helpers.deferred(), next = idsPromise.defer();
const key = this.toKey(type);
switch (type) {
case 'failed':
case 'succeeded':
this._scanForJobs(key, '0', page.size, new Set(), next);
break;
case 'waiting':
case 'active':
client.lrange(key, page.start, page.end, next);
break;
case 'delayed':
client.zrange(key, page.start, page.end, next);
break;
default:
throw new Error('Improper queue type');
}
return idsPromise;
}).then((ids) => {
const jobs = [], idsToFetch = [];
// ids might be a Set or an Array, but this will iterate just the same.
for (let jobId of ids) {
const job = this.jobs.get(jobId);
if (job) {
jobs.push(job);
} else {
idsToFetch.push(jobId);
}
}
if (!idsToFetch.length) {
return jobs;
}
return this._addJobsByIds(jobs, idsToFetch);
});
if (cb) helpers.asCallback(promise, cb);
return promise;
}
};
Queue.prototype.getNextJob = function (cb) {
var self = this;
this.bclient.brpoplpush(this.toKey('waiting'), this.toKey('active'), 0, function (err, jobId) {
/* istanbul ignore if */
if (err) return cb(err);
return Job.fromId(self, Number(jobId), cb);
});
};
createJob(data) {
return new Job(this, null, data);
}
Queue.prototype.runJob = function (job, cb) {
var self = this;
var psTimeout;
var handled = false;
getJob(jobId, cb) {
const promise = this._commandable().then(() => {
if (this.jobs.has(jobId)) {
return this.jobs.get(jobId);
}
return Job.fromId(this, jobId);
}).then((job) => {
if (job && this.settings.storeJobs) {
this.jobs.set(jobId, job);
}
return job;
});
var preventStalling = function () {
self.client.srem(self.toKey('stalling'), job.id, function () {
if (!handled) {
psTimeout = setTimeout(preventStalling, self.settings.stallInterval / 2);
if (cb) helpers.asCallback(promise, cb);
return promise;
}
removeJob(jobId, cb) {
const promise = this._evalScript('removeJob', 7,
this.toKey('succeeded'), this.toKey('failed'), this.toKey('waiting'),
this.toKey('active'), this.toKey('stalling'), this.toKey('jobs'),
this.toKey('delayed'), jobId)
.then(() => this);
if (cb) helpers.asCallback(promise, cb);
return promise;
}
_waitForJob() {
const idPromise = helpers.deferred();
this.bclient.brpoplpush(this.toKey('waiting'), this.toKey('active'), 0,
idPromise.defer());
return idPromise.then((jobId) => {
// Note that the job may be null in the case that the client has removed
// the job before processing can take place, but after the brpoplpush has
// returned the job id.
return Job.fromId(this, jobId);
}, (err) => {
if (err.name === 'AbortError' && this.paused) {
return null;
}
throw err;
});
};
preventStalling();
}
var handleOutcome = function (err, data) {
// silently ignore any multiple calls
if (handled) {
return;
_getNextJob() {
// Under normal calling conditions, commandable will not reject because we
// will have just checked paused in Queue#process.
return this._commandable(true).then(() => this._waitForJob());
}
_runJob(job) {
let psTimeout = null, completed = false;
const preventStalling = () => {
psTimeout = null;
if (this._isClosed) return;
this._preventStall(job.id).then(() => {
if (completed || this._isClosed) return;
const interval = this.settings.stallInterval / 2;
psTimeout = setTimeout(preventStalling, interval);
});
};
preventStalling();
const handleOutcome = (err, data) => {
completed = true;
if (psTimeout) {
clearTimeout(psTimeout);
psTimeout = null;
}
return this._finishJob(err, data, job);
};
let promise = this.handler(job);
if (job.options.timeout) {
const message = `Job ${job.id} timed out (${job.options.timeout} ms)`;
promise = helpers.withTimeout(promise, job.options.timeout, message);
}
handled = true;
clearTimeout(psTimeout);
const jobPromise = promise
.then((data) => handleOutcome(null, data), handleOutcome)
.then((data) => {
this.activeJobs.delete(jobPromise);
return data;
}, (err) => {
// The only error that can happen here is either network- or
// Redis-related, or if Queue#close times out while a job is processing,
// and the job later finishes.
this.activeJobs.delete(jobPromise);
throw err;
});
self.finishJob(err, data, job, cb);
};
// We specifically use the value produced by then to avoid cases where the
// process handler returns the same Promise object each invocation.
this.activeJobs.add(jobPromise);
return jobPromise;
}
if (job.options.timeout) {
var msg = 'Job ' + job.id + ' timed out (' + job.options.timeout + ' ms)';
setTimeout(handleOutcome.bind(null, Error(msg)), job.options.timeout);
_preventStall(jobId) {
const promise = helpers.deferred(), cb = promise.defer();
this.client.srem(this.toKey('stalling'), jobId, cb);
/* istanbul ignore next: these errors are only redis or network errors */
return promise.catch((err) => this.emit('error', err));
}
if (this.settings.catchExceptions) {
try {
this.handler(job, handleOutcome);
} catch (err) {
handleOutcome(err);
_finishJob(err, data, job) {
const status = err ? 'failed' : 'succeeded';
if (this._isClosed) {
throw new Error(`unable to update the status of ${status} job ${job.id}`);
}
} else {
this.handler(job, handleOutcome);
}
};
Queue.prototype.finishJob = function (err, data, job, cb) {
var status = err ? 'failed' : 'succeeded';
const multi = this.client.multi()
.lrem(this.toKey('active'), 0, job.id)
.srem(this.toKey('stalling'), job.id);
var multi = this.client.multi()
.lrem(this.toKey('active'), 0, job.id)
.srem(this.toKey('stalling'), job.id);
const jobEvent = {
id: job.id,
event: status,
data: err ? err.message : data
};
var jobEvent = {
id: job.id,
event: status,
data: err ? err.message : data
};
if (err) {
const errInfo = err.stack || err.message || err;
job.options.stacktraces.unshift(errInfo);
if (status === 'failed') {
if (job.options.retries > 0) {
job.options.retries -= 1;
jobEvent.event = 'retrying';
multi.hset(this.toKey('jobs'), job.id, job.toData())
.lpush(this.toKey('waiting'), job.id);
const strategyName = job.options.backoff
? job.options.backoff.strategy
: 'immediate';
const strategy = job.options.retries > 0
? backoff.get(strategyName)
: null;
const delay = strategy ? strategy(job) : -1;
if (delay < 0) {
job.status = 'failed';
if (this.settings.removeOnFailure) {
multi.hdel(this.toKey('jobs'), job.id);
} else {
multi.hset(this.toKey('jobs'), job.id, job.toData());
multi.sadd(this.toKey('failed'), job.id);
}
} else {
job.options.retries -= 1;
job.status = 'retrying';
jobEvent.event = 'retrying';
multi.hset(this.toKey('jobs'), job.id, job.toData());
if (delay === 0) {
multi.lpush(this.toKey('waiting'), job.id);
} else {
const time = Date.now() + delay;
multi.zadd(this.toKey('delayed'), time, job.id)
.publish(this.toKey('earlierDelayed'), time);
}
}
} else {
multi.sadd(this.toKey('failed'), job.id);
job.status = 'succeeded';
if (this.settings.removeOnSuccess) {
multi.hdel(this.toKey('jobs'), job.id);
} else {
multi.hset(this.toKey('jobs'), job.id, job.toData());
multi.sadd(this.toKey('succeeded'), job.id);
}
}
} else {
if (this.settings.removeOnSuccess) {
multi.hdel(this.toKey('jobs'), job.id);
} else {
multi.sadd(this.toKey('succeeded'), job.id);
if (this.settings.sendEvents) {
multi.publish(this.toKey('events'), JSON.stringify(jobEvent));
}
}
if (this.settings.sendEvents) {
multi.publish(this.toKey('events'), JSON.stringify(jobEvent));
}
const result = err || data;
multi.exec(function (errMulti) {
/* istanbul ignore if */
if (errMulti) return cb(errMulti);
return cb(null, status, err ? err : data);
});
};
const promise = helpers.deferred();
multi.exec(promise.defer());
Queue.prototype.process = function (concurrency, handler) {
if (!this.settings.isWorker) {
throw Error('Cannot call Queue.prototype.process on a non-worker');
return promise.then(() => [status, result]);
}
if (this.handler) {
throw Error('Cannot call Queue.prototype.process twice');
}
process(concurrency, handler) {
if (!this.settings.isWorker) {
throw new Error('Cannot call Queue#process on a non-worker');
}
if (typeof concurrency === 'function') {
handler = concurrency;
concurrency = 1;
}
if (this.handler) {
throw new Error('Cannot call Queue#process twice');
}
var self = this;
this.handler = handler;
this.running = 0;
this.queued = 1;
this.concurrency = concurrency;
if (this.paused) {
throw new Error('closed');
}
var jobTick = function () {
if (self.paused) {
self.queued -= 1;
return;
if (typeof concurrency === 'function') {
handler = concurrency;
concurrency = defaults['#process'].concurrency;
}
// invariant: in this code path, self.running < self.concurrency, always
// after spoolup, self.running + self.queued === self.concurrency
self.getNextJob(function (getErr, job) {
/* istanbul ignore if */
if (getErr) {
self.emit('error', getErr);
return setImmediate(jobTick);
}
// If the handler throws a synchronous exception (only applicable to
// non-`async` functions), catch it, and fail the job.
const catchExceptions = true;
this.handler = helpers.wrapAsync(handler, catchExceptions);
this.running = 0;
this.queued = 1;
this.concurrency = concurrency;
self.running += 1;
self.queued -= 1;
if (self.running + self.queued < self.concurrency) {
self.queued += 1;
setImmediate(jobTick);
const jobTick = () => {
if (this.paused) {
this.queued -= 1;
return;
}
self.runJob(job, function (err, status, result) {
self.running -= 1;
self.queued += 1;
// invariant: in this code path, this.running < this.concurrency, always
// after spoolup, this.running + this.queued === this.concurrency
this._getNextJob().then((job) => {
// We're shutting down.
if (this.paused) {
// This job will get picked up later as a stalled job if we happen to
// get here. We can't easily process this job because at this point
// Queue#close has already captured the activeJobs set in a
// Promise.all call, and we'd prefer to delay a job than half-process
// it.
this.queued -= 1;
return;
}
/* istanbul ignore if */
if (err) {
self.emit('error', err);
} else {
self.emit(status, job, result);
this.running += 1;
this.queued -= 1;
if (this.running + this.queued < this.concurrency) {
this.queued += 1;
setImmediate(jobTick);
}
if (!job) {
// Per comment in Queue#_waitForJob, this branch is possible when the
// job is removed before processing can take place, but after being
// initially acquired.
setImmediate(jobTick);
return;
}
return this._runJob(job).catch((err) => {
this.emit('error', err);
}).then((results) => {
this.running -= 1;
this.queued += 1;
setImmediate(jobTick);
/* istanbul ignore else */
if (results) {
const status = results[0], result = results[1];
this.emit(status, job, result);
}
});
}, (err) => {
setImmediate(jobTick);
throw err;
}).catch((err) => this.emit('error', err));
};
this.checkStalledJobs(jobTick);
this._activateDelayed();
return this;
}
_doStalledJobCheck() {
return this._evalScript('checkStalledJobs', 4, this.toKey('stallBlock'),
this.toKey('stalling'), this.toKey('waiting'), this.toKey('active'),
this.settings.stallInterval)
.then((stalled) => {
for (let jobId of stalled) {
this.emit('stalled', jobId);
}
return stalled.length;
});
});
};
}
var restartProcessing = function () {
// maybe need to increment queued here?
self.bclient.once('ready', jobTick);
};
this.bclient.on('error', restartProcessing);
this.bclient.on('end', restartProcessing);
_checkStalledJobs(interval, cb) {
const promise = this._doStalledJobCheck();
if (cb) helpers.asCallback(promise, cb);
this.checkStalledJobs(setImmediate.bind(null, jobTick));
};
if (interval && !this.checkTimer) {
promise.then(() => {
if (this.checkTimer || this.paused) return;
this.checkTimer = setTimeout(() => {
// The checkTimer is removed when Queue#close is called, so we don't
// need to check for it here.
this.checkTimer = null;
const postStalled = this._checkStalledJobs(interval, cb);
// If it's not the first call, and a callback is not defined, then we
// must emit errors to avoid unnecessary unhandled rejections.
/* istanbul ignore next: these are only redis and connection errors */
postStalled.catch(cb ? (err) => this.emit('error', err) : null);
}, interval);
});
}
Queue.prototype.checkStalledJobs = function (interval, cb) {
var self = this;
cb = typeof interval === 'function' ? interval : cb || helpers.defaultCb;
return promise;
}
this.client.evalsha(lua.shas.checkStalledJobs, 4,
this.toKey('stallTime'), this.toKey('stalling'), this.toKey('waiting'), this.toKey('active'),
Date.now(), this.settings.stallInterval, function (err) {
/* istanbul ignore if */
if (err) return cb(err);
/**
* Check for stalled jobs.
*
* @param {Number=} interval The interval on which to check for stalled jobs.
* This should be set to half the stallInterval setting, to avoid
* unnecessary work.
* @param {Function=} callback Called with the equivalent of the returned
* promise. If interval is provided, the callback will be invoked after each
* invocation of checkStalledJobs.
* @return {Promise<Number>} Resolves to the number of stalled jobs the
* function found.
*/
checkStalledJobs(interval, cb) {
if (typeof interval === 'function') {
cb = interval;
interval = null;
} else if (!Number.isSafeInteger(interval)) {
interval = null;
}
return this._checkStalledJobs(interval, cb);
}
if (typeof interval === 'number') {
setTimeout(self.checkStalledJobs.bind(self, interval, cb), interval);
}
_activateDelayed() {
if (!this.settings.activateDelayedJobs) return;
this._evalScript('raiseDelayedJobs', 2,
this.toKey('delayed'),
this.toKey('waiting'),
Date.now(), this.settings.delayedDebounce)
.then((results) => {
const numRaised = results[0], nextOpportunity = results[1];
if (numRaised) {
this.emit('raised jobs', numRaised);
}
this._delayedTimer.schedule(parseInt(nextOpportunity, 10));
}, /* istanbul ignore next */ (err) => {
// Handle aborted redis connections.
if (err.name === 'AbortError') {
if (this.paused) return;
// Retry.
return this._activateDelayed();
}
this.emit('error', err);
});
}
return cb();
toKey(str) {
return this.settings.keyPrefix + str;
}
/**
* Evaluate the named script, return a promise with its results.
*
* Same parameter list/syntax as evalsha, except for the name.
*/
_evalScript(name) {
// Avoid deoptimization by leaking arguments: store them directly in an
// array instead of passing them to a helper.
const args = new Array(arguments.length);
// Skip the first because it's just the name, and it'll get filled in within
// the promise.
for (let i = 1; i < arguments.length; ++i) {
args[i] = arguments[i];
}
);
};
Queue.prototype.toKey = function (str) {
return this.settings.keyPrefix + str;
};
return this._commandable().then((client) => {
// Get the sha for the script after we're ready to avoid a race condition
// with the lua script loader.
args[0] = lua.shas[name];
const promise = helpers.deferred();
args.push(promise.defer());
client.evalsha.apply(client, args);
return promise;
});
}
}
module.exports = Queue;
{
"name": "bee-queue",
"version": "0.3.0",
"version": "1.0.0",
"description": "A simple, fast, robust job/task queue, backed by Redis.",
"main": "index.js",
"dependencies": {
"redis": "1.0.0"
"promise-callbacks": "^3.0.0",
"redis": "^2.7.1"
},
"devDependencies": {
"chai": "^1.10.0",
"grunt": "~0.4.5",
"grunt-eslint": "^14.0.0",
"grunt-githooks": "^0.3.1",
"grunt-mocha-test": "^0.12.6",
"sinon": "^1.12.2",
"coveralls": "^2.11.2"
"ava": "^0.21.0",
"ava-spec": "^1.1.0",
"coveralls": "^2.11.2",
"eslint": ">= 3",
"lolex": "^2.0.0",
"nyc": "^11.0.3",
"sandboxed-module": "^2.0.3",
"sinon": "^2.3.8"
},
"scripts": {
"test": "grunt test"
"test": "npm run eslint && ava",
"eslint": "eslint lib/ test/",
"coverage": "nyc ava",
"report": "npm run coverage && nyc report --reporter=html",
"coverage-and-publish": "npm run coverage && nyc report --reporter=text-lcov | coveralls",
"ci": "npm run coverage-and-publish"
},

@@ -31,2 +38,5 @@ "keywords": [

],
"engines": {
"node": ">= 4"
},
"author": "Lewis J Ellis <me@lewisjellis.com>",

@@ -36,8 +46,8 @@ "license": "MIT",

"type": "git",
"url": "https://github.com/LewisJEllis/bee-queue.git"
"url": "https://github.com/bee-queue/bee-queue.git"
},
"bugs": {
"url": "https://github.com/LewisJEllis/bee-queue/issues"
"url": "https://github.com/bee-queue/bee-queue/issues"
},
"homepage": "https://github.com/LewisJEllis/bee-queue"
"homepage": "https://github.com/bee-queue/bee-queue"
}
<a name="top"></a>
![bee-queue logo](https://raw.githubusercontent.com/LewisJEllis/bee-queue/master/bee-queue.png)
![bee-queue logo](https://raw.githubusercontent.com/bee-queue/bee-queue/master/bee-queue.png)
[![NPM Version][npm-image]][npm-url] [![Build Status][travis-image]][travis-url] [![Coverage Status][coveralls-image]][coveralls-url]
A simple, fast, robust job/task queue for Node.js, backed by Redis.
- Simple: ~500 LOC, and the only dependency is [node_redis](https://github.com/mranney/node_redis).
- Simple: ~1000 LOC, and minimal dependencies.
- Fast: maximizes throughput by minimizing Redis and network overhead. [Benchmarks](#benchmarks) well.
- Robust: designed with concurrency, atomicity, and failure in mind; 100% code coverage.
- Robust: designed with concurrency, atomicity, and failure in mind; close to full code coverage.
```javascript
var Queue = require('bee-queue');
var queue = new Queue('example');
```js
const Queue = require('bee-queue');
const queue = new Queue('example');
var job = queue.createJob({x: 2, y: 3}).save();
job.on('succeeded', function (result) {
console.log('Received result for job ' + job.id + ': ' + result);
const job = queue.createJob({x: 2, y: 3}).save();
job.on('succeeded', (result) => {
console.log(`Received result for job ${job.id}: result`);
});

@@ -21,3 +22,3 @@

queue.process(function (job, done) {
console.log('Processing job ' + job.id);
console.log(`Processing job ${job.id}`);
return done(null, job.data.x + job.data.y);

@@ -28,28 +29,38 @@ });

## Introduction
Bee-Queue is meant to power a distributed worker pool and was built with short, real-time jobs in mind. A web server can enqueue a job, wait for a worker process to complete it, and return its results within an HTTP request. Scaling is as simple as running more workers.
[Celery](http://www.celeryproject.org/), [Resque](https://github.com/resque/resque), [Kue](https://github.com/LearnBoost/kue), and [Bull](https://github.com/OptimalBits/bull) operate similarly, but are generally designed for longer background jobs, supporting things like job scheduling and prioritization, which Bee-Queue [currently does not](#contributing). Bee-Queue can handle longer background jobs just fine, but they aren't [the primary focus](#motivation).
Thanks to the folks at [Mixmax](https://mixmax.com), Bee-Queue is once again being regularly maintained!
[Celery](http://www.celeryproject.org/), [Resque](https://github.com/resque/resque), [Kue](https://github.com/Automattic/kue), and [Bull](https://github.com/OptimalBits/bull) operate similarly, but are generally designed for longer background jobs, supporting things like job prioritization and repeatable jobs, which Bee-Queue [currently does not](#contributing). Bee-Queue can handle longer background jobs just fine, but they aren't [the primary focus](#motivation).
- Create, save, and process jobs
- Concurrent processing
- Job timeouts and retries
- Job timeouts, retries, and retry strategies
- Scheduled jobs
- Pass events via Pub/Sub
- Progress reporting
- Send job results back to creators
- Send job results back to producers
- Robust design
- Strives for all atomic operations
- Retries [stuck jobs](#under-the-hood)
- 100% code coverage
- Retries [stuck jobs](#under-the-hood) for at-least-once delivery
- High code coverage
- Performance-focused
- Keeps [Redis usage](#under-the-hood) to the bare minimum
- Minimizes [Redis usage](#under-the-hood)
- Uses [Lua scripting](http://redis.io/commands/EVAL) and [pipelining](http://redis.io/topics/pipelining) to minimize network overhead
- [Benchmarks](#benchmarks) favorably against similar libraries
- Fully callback- and Promise-compatible API
## Installation
```sh
$ npm install bee-queue
```
npm install bee-queue
```
You'll also need [Redis 2.8+](http://redis.io/topics/quickstart) running somewhere.
You'll also need [Redis 2.8+](http://redis.io/topics/quickstart)* running somewhere.
\* We've been noticing that some jobs get delayed by virtue of an issue with Redis < 3.2, and therefore recommend the use of Redis 3.2+.
# Table of Contents
- [Motivation](#motivation)

@@ -63,14 +74,15 @@ - [Benchmarks](#benchmarks)

- [Job & Queue Events](#job-and-queue-events)
- [Stalled Jobs](#stalled-jobs)
- [Stalling Jobs](#stalling-jobs)
- [API Reference](#api-reference)
- [Under The Hood](#under-the-hood)
- [Contributing](#contributing)
- [License](https://github.com/LewisJEllis/bee-queue/blob/master/LICENSE)
- [License](https://github.com/bee-queue/bee-queue/blob/master/LICENSE)
# Motivation
Celery is for Python, and Resque is for Ruby, but [Kue](https://github.com/LearnBoost/kue) and [Bull](https://github.com/OptimalBits/bull) already exist for Node, and they're good at what they do, so why does Bee-Queue also need to exist?
In short: we needed to mix and match things that Kue does well with things that Bull does well, and we needed to squeeze out more performance. There's also a [long version](https://github.com/LewisJEllis/bee-queue/wiki/Origin) with more details.
In short: we needed to mix and match things that Kue does well with things that Bull does well, and we needed to squeeze out more performance. There's also a [long version](https://github.com/bee-queue/bee-queue/wiki/Origin) with more details.
Bee-Queue starts by combining Bull's simplicity and robustness with Kue's ability to send events back to job creators, then focuses heavily on minimizing overhead, and finishes by being strict about [code quality](https://github.com/LewisJEllis/bee-queue/blob/master/.eslintrc) and [testing](https://coveralls.io/r/LewisJEllis/bee-queue?branch=master). It compromises on breadth of features, so there are certainly cases where Kue or Bull might be preferable (see [Contributing](#contributing)).
Bee-Queue starts by combining Bull's simplicity and robustness with Kue's ability to send events back to job creators, then focuses heavily on minimizing overhead, and finishes by being strict about [code quality](https://github.com/bee-queue/bee-queue/blob/master/.eslintrc.json) and [testing](https://coveralls.io/r/bee-queue/bee-queue?branch=master). It compromises on breadth of features, so there are certainly cases where Kue or Bull might be preferable (see [Contributing](#contributing)).

@@ -80,3 +92,5 @@ Bull and Kue do things really well and deserve a lot of credit. Bee-Queue borrows ideas from both, and Bull was an especially invaluable reference during initial development.

#### Why Bees?
Bee-Queue is like a bee because it:
- is small and simple

@@ -88,8 +102,11 @@ - is fast (bees can fly 20mph!)

# Benchmarks
![benchmark chart](https://raw.githubusercontent.com/LewisJEllis/bee-queue/master/benchmark/benchmark-chart.png)
These basic benchmarks ran 10,000 jobs through each library, at varying levels of concurrency, with io.js 2.2.1 and Redis 3.0.2 running directly on a 13" MBPr. The numbers shown are averages of 3 runs; the raw data collected and code used are available in the benchmark folder.
![benchmark chart](https://raw.githubusercontent.com/bee-queue/bee-queue/master/benchmark/benchmark-chart.png)
For a quick idea of space efficiency, the following table contains Redis memory usage as reported by [INFO](http://redis.io/commands/INFO) after doing a [FLUSHALL](http://redis.io/commands/FLUSHALL), restarting Redis, and running a single basic 10k job benchmark:
See the raw data [here](https://raw.githubusercontent.com/bee-queue/bee-queue/master/summary-2017-07-20.txt).
These basic benchmarks ran 10,000 jobs through each library, at varying levels of concurrency, with Node.js (v6.9.1, v7.6.0, and v8.2.0) and Redis 3.2.9 running directly on an AWS m4.xlarge. The numbers shown are averages of 3 runs; the raw data collected and code used are available in the benchmark folder.
For a rough idea of space efficiency, the following table contains Redis memory usage as reported by [INFO](http://redis.io/commands/INFO) after doing a [FLUSHALL](http://redis.io/commands/FLUSHALL), restarting Redis, and running a single basic 10k job benchmark:
| Library | Memory After | Memory Peak | Memory After Δ | Memory Peak Δ |

@@ -101,15 +118,23 @@ | --------- | ------------- | ----------- | -------------- | ------------- |

The Δ columns factor out the ~986KB of memory usage reported by Redis on a fresh startup.
The Δ columns factor out the ~986KB of memory usage reported by Redis on a fresh startup. Note that this data was collected in 2015, but should still accurately reflect the memory usage of Bee-Queue.
# Web Interface
Check out the [Arena](https://github.com/bee-queue/arena) web interface to manage jobs and inspect queue health.
# Overview
## Creating Queues
[Queue](#queue) objects are the starting point to everything this library does. To make one, we just need to give it a name, typically indicating the sort of job it will process:
```javascript
var Queue = require('bee-queue');
var addQueue = new Queue('addition');
```js
const Queue = require('bee-queue');
const addQueue = new Queue('addition');
```
Queues are very lightweight — the only significant overhead is connecting to Redis — so if you need to handle different types of jobs, just instantiate a queue for each:
```javascript
var subQueue = new Queue('subtraction', {
```js
const subQueue = new Queue('subtraction', {
redis: {

@@ -121,35 +146,58 @@ host: 'somewhereElse'

```
Here, we pass a `settings` object to specify an alternate Redis host and to indicate that this queue will only add jobs (not process them). See [Queue Settings](#settings) for more options.
## Creating Jobs
Jobs are created using `Queue.createJob(data)`, which returns a [Job](#job) object storing arbitrary `data`.
Jobs have a chaining API with commands `.retries(n)` and `.timeout(ms)` for setting options, and `.save([cb])` to save the job into Redis and enqueue it for processing:
Jobs have a chaining API for configuring the Job, and `.save([cb])` method to save the job into Redis and enqueue it for processing:
```javascript
var job = addQueue.createJob({x: 2, y: 3});
job.timeout(3000).retries(2).save(function (err, job) {
// job enqueued, job.id populated
});
```js
const job = addQueue.createJob({x: 2, y: 3});
job
.timeout(3000)
.retries(2)
.save()
.then((job) => {
// job enqueued, job.id populated
});
```
Jobs can later be retrieved from Redis using [Queue.getJob](#queueprototypegetjobjobid-cberr-job), but most use cases won't need this, and can instead use [Job and Queue Events](#job-and-queue-events).
The Job's `save` method returns a Promise in addition to calling the optional callback.
Each Job can be configured with the commands `.setId(id)`, `.retries(n)`, `.backoff(strategy, delayFactor)`, `.delayUntil(date|timestamp)`, and `.timeout(ms)` for setting options.
Jobs can later be retrieved from Redis using [Queue#getJob](#queuegetjobjobid-cb), but most use cases won't need this, and can instead use [Job and Queue Events](#job-and-queue-events).
## Processing Jobs
To start processing jobs, call `Queue.process` and provide a handler function:
```javascript
```js
addQueue.process(function (job, done) {
console.log('Processing job ' + job.id);
console.log(`Processing job ${job.id}`);
return done(null, job.data.x + job.data.y);
});
```
The handler function is given the job it needs to process, including `job.data` from when the job was created. It should then pass results to the `done` callback. For more on handlers, see [Queue.process](#queueprototypeprocessconcurrency-handlerjob-done).
`.process` can only be called once per Queue instance, but we can process on as many instances as we like, spanning multiple processes or servers, as long as they all connect to the same Redis instance. From this, we can easily make a worker pool of machines who all run the same code and spend their lives processing our jobs, no matter where those jobs are created.
Instead of calling the provided callback, the handler function can return a `Promise`. This enables the intuitive use of `async`/`await`:
`.process` can also take a concurrency parameter. If your jobs spend most of their time just waiting on external resources, you might want each processor instance to handle 10 at a time:
```javascript
var baseUrl = 'http://www.google.com/search?q=';
```js
addQueue.process(async (job) => {
console.log(`Processing job ${job.id}`);
return job.data.x + job.data.y;
});
```
The handler function is given the job it needs to process, including `job.data` from when the job was created. It should then pass results either by returning a `Promise` or by calling the `done` callback. For more on handlers, see [Queue#process](#queueprocessconcurrency-handlerjob-done).
`.process` can only be called once per `Queue` instance, but we can process on as many instances as we like, spanning multiple processes or servers, as long as they all connect to the same Redis instance. From this, we can easily make a worker pool of machines who all run the same code and spend their lives processing our jobs, no matter where those jobs are created.
`.process` can also take a concurrency parameter. If your jobs spend most of their time just waiting on external resources, you might want each processor instance to handle at most 10 at a time:
```js
const baseUrl = 'http://www.google.com/search?q=';
subQueue.process(10, function (job, done) {
http.get(baseUrl + job.data.x + '-' + job.data.y, function (res) {
http.get(`${baseUrl}${job.data.x}-${job.data.y}`, function (res) {
// parse the difference out of the response...

@@ -162,10 +210,12 @@ return done(null, difference);

## Progress Reporting
Handlers can send progress reports, which will be received as events on the original job instance:
```javascript
var job = addQueue.createJob({x: 2, y: 3}).save();
job.on('progress', function (progress) {
console.log('Job ' + job.id + ' reported progress: ' + progress + '%');
```js
const job = addQueue.createJob({x: 2, y: 3}).save();
job.on('progress', (progress) => {
console.log(`Job ${job.id} reported progress: ${progress}%`);
});
addQueue.process(function (job, done) {
addQueue.process(async (job) => {
// do some work

@@ -176,7 +226,7 @@ job.reportProgress(30);

// do the rest
done();
});
```
Just like `.process`, these `progress` events work across multiple processes or servers; the job instance will receive the progress event no matter where processing happens.
Just like `.process`, these `progress` events work across multiple processes or servers; the job instance will receive the progress event no matter where processing happens. Note that this mechanism depends on Pub/Sub, and thus will incur additional overhead for each additional worker node.
## Job and Queue Events

@@ -192,9 +242,9 @@

Note that Job events become unreliable across process restarts, since the queue's reference to the associated job object will be lost. Queue PubSub events are thus potentially more reliable, but Job events are more convenient in places like HTTP requests where a process restart loses state anyway.
Note that Job events become unreliable across process restarts, since the queue's reference to the associated job object will be lost. Queue-level events are thus potentially more reliable, but Job events are more convenient in places like HTTP requests where a process restart loses state anyway.
## Stalling Jobs
Bee-Queue attempts to provide ["at least once delivery"](http://www.cloudcomputingpatterns.org/At-least-once_Delivery), in the sense that any job enqueued should be processed at least once, even if a worker crashes, gets disconnected, or otherwise fails to confirm completion of the job.
Bee-Queue attempts to provide ["at least once delivery"](http://www.cloudcomputingpatterns.org/At-least-once_Delivery). Any job enqueued should be processed at least once - and if a worker crashes, gets disconnected, or otherwise fails to confirm completion of the job, the job will be dispatched to another worker for processing.
To make this happen, workers periodically phone home to Redis about each job they're working on, just to say "I'm still working on this and I haven't stalled, so you don't need to retry it." The [`checkStalledJobs`](#queueprototypecheckstalledjobsinterval-cb) method finds any active jobs whose workers have gone silent (not phoned home for at least [`stallInterval`](#settings) ms), assumes they have stalled, and re-enqueues them.
To make this happen, workers periodically phone home to Redis about each job they're working on, just to say "I'm still working on this and I haven't stalled, so you don't need to retry it." The [`checkStalledJobs`](#queuecheckstalledjobsinterval-cb) method finds any active jobs whose workers have gone silent (not phoned home for at least [`stallInterval`](#settings) ms), assumes they have stalled, and re-enqueues them.

@@ -206,7 +256,11 @@ # API Reference

### Settings
The default Queue settings are:
```javascript
var queue = Queue('test', {
```js
const queue = new Queue('test', {
prefix: 'bq',
stallInterval: 5000,
nearTermWindow: 1200000,
delayedDebounce: 1000,
redis: {

@@ -218,27 +272,39 @@ host: '127.0.0.1',

},
isWorker: true,
getEvents: true,
isWorker: true,
sendEvents: true,
storeJobs: true,
ensureScripts: true,
activateDelayedJobs: false,
removeOnSuccess: false,
catchExceptions: false
removeOnFailure: false,
redisScanCount: 100
});
```
The `settings` fields are:
- `prefix`: string, default `bq`. Useful if the `bq:` namespace is, for whatever reason, unavailable on your redis database.
- `stallInterval`: number, ms; the length of the window in which workers must report that they aren't stalling. Higher values will reduce Redis/network overhead, but if a worker stalls, it will take longer before its stalled job(s) will be retried.
- `redis`: object, specifies how to connect to Redis.
- `prefix`: string, default `bq`. Useful if the `bq:` namespace is, for whatever reason, unavailable or problematic on your redis instance.
- `stallInterval`: number, ms; the length of the window in which workers must report that they aren't stalling. Higher values will reduce Redis/network overhead, but if a worker stalls, it will take longer before its stalled job(s) will be retried. A higher value will also result in a lower probability of false-positives during stall detection.
- `nearTermWindow`: number, ms; the window during which delayed jobs will be specifically scheduled using `setTimeout` - if all delayed jobs are further out that this window, the Queue will double-check that it hasn't missed any jobs after the window elapses.
- `delayedDebounce`: number, ms; to avoid unnecessary churn for several jobs in short succession, the Queue may delay individual jobs by up to this amount.
- `redis`: object or string, specifies how to connect to Redis. See [`redis.createClient()`](https://github.com/NodeRedis/node_redis#rediscreateclient) for the full set of options.
- `host`: string, Redis host.
- `port`: number, Redis port.
- `socket`: string, Redis socket to be used instead of a host and port.
- `db`: number, Redis [DB index](http://redis.io/commands/SELECT).
- `options`: options object, passed to [node_redis](https://github.com/mranney/node_redis#rediscreateclient).
- `isWorker`: boolean, default true. Disable if this queue will not process jobs.
- `getEvents`: boolean, default true. Disable if this queue does not need to receive job events.
- `sendEvents`: boolean, default true. Disable if this worker does not need to send job events back to other queues.
- `removeOnSuccess`: boolean, default false. Enable to keep memory usage down by automatically removing jobs from Redis when they succeed.
- `catchExceptions`: boolean, default false. Only enable if you want exceptions thrown by the [handler](#queueprototypeprocessconcurrency-handlerjob-done) to be caught by Bee-Queue and interpreted as job failures. Communicating failures via `done(err)` is preferred.
- `isWorker`: boolean. Disable if this queue will not process jobs.
- `getEvents`: boolean. Disable if this queue does not need to receive job events.
- `sendEvents`: boolean. Disable if this worker does not need to send job events back to other queues.
- `storeJobs`: boolean. Disable if this worker does not need to associate events with specific `Job` instances. This normally improves [memory usage](https://github.com/bee-queue/bee-queue/issues/54), as the storage of jobs is unnecessary for many use-cases.
- `ensureScripts`: boolean. Ensure that the Lua scripts exist in redis before running any commands against redis.
- `activateDelayedJobs`: boolean. Activate delayed jobs once they've passed their `delayUntil` timestamp. Note that this must be enabled on at least one `Queue` instance for the delayed retry strategies (`fixed` and `exponential`) - this will reactivate them after their computed delay.
- `removeOnSuccess`: boolean. Enable to have this worker automatically remove its successfully completed jobs from Redis, so as to keep memory usage down.
- `removeOnFailure`: boolean. Enable to have this worker automatically remove its failed jobs from Redis, so as to keep memory usage down. This will not remove jobs that are set to retry unless they fail all their retries.
- `redisScanCount`: number. For setting the value of the `SSCAN` Redis command used in `Queue#getJobs` for succeeded and failed job types.
### Properties
- `name`: string, the name passed to the constructor.
- `keyPrefix`: string, the prefix used for all Redis keys associated with this queue.
- `jobs`: a `Map` associating the currently tracked jobs (when `storeJobs` and `getEvents` are enabled).
- `paused`: boolean, whether the queue instance is paused. Only true if the queue is in the process of closing.

@@ -250,42 +316,55 @@ - `settings`: object, the settings determined between those passed and the defaults

#### ready
```javascript
queue.on('ready', function () {
Instead of listening to this event, consider calling `Queue#ready([cb])`, which returns a Promise that resolves once the Queue is ready. If the Queue is already ready, then the Promise will be already resolved.
```js
queue.on('ready', () => {
console.log('queue now ready to start doing things');
});
```
The queue has connected to Redis and ensured that the [Lua scripts are cached](http://redis.io/commands/script-load). You can often get away without checking for this event, but it's a good idea to wait for it in case the Redis host didn't have the scripts cached beforehand; if you try to enqueue jobs when the scripts are not yet cached, you may run into a Redis error.
#### error
```javascript
queue.on('error', function (err) {
console.log('A queue error happened: ' + err.message);
```js
queue.on('error', (err) => {
console.log(`A queue error happened: ${err.message}`);
});
```
Any Redis errors are re-emitted from the Queue.
Any Redis errors are re-emitted from the Queue. Note that this event will not be emitted for failed jobs.
#### succeeded
```javascript
queue.on('succeeded', function (job, result) {
console.log('Job ' + job.id + ' succeeded with result: ' + result);
```js
queue.on('succeeded', (job, result) => {
console.log(`Job ${job.id} succeeded with result: ${result}`);
});
```
This queue has successfully processed `job`. If `result` is defined, the handler called `done(null, result)`.
#### retrying
```javascript
queue.on('retrying', function (job, err) {
console.log('Job ' + job.id + ' failed with error ' + err.message ' but is being retried!');
```js
queue.on('retrying', (job, err) => {
console.log(`Job ${job.id} failed with error ${err.message} but is being retried!`);
});
```
This queue has processed `job`, but it reported a failure and has been re-enqueued for another attempt. `job.options.retries` has been decremented accordingly.
This queue has processed `job`, but it reported a failure and has been re-enqueued for another attempt. `job.options.retries` has been decremented, and the stack trace (or error message) has been added to its `job.options.stacktraces` array.
#### failed
```javascript
queue.on('failed', function (job, err) {
console.log('Job ' + job.id + ' failed with error ' + err.message);
```js
queue.on('failed', (job, err) => {
console.log(`Job ${job.id} failed with error ${err.message}`);
});
```
This queue has processed `job`, but its handler reported a failure with `done(err)`.
This queue has processed `job`, but its handler reported a failure either by rejecting its returned Promise, or by calling `done(err)`.
### Queue PubSub Events
These events are all reported by some worker queue (with `sendEvents` enabled) and sent as Redis Pub/Sub messages back to any queues listening for them (with `getEvents` enabled). This means that listening for these events is effectively a monitor for all activity by all workers on the queue.

@@ -298,33 +377,49 @@

#### job succeeded
```javascript
queue.on('job succeeded', function (jobId, result) {
console.log('Job ' + job.id + ' succeeded with result: ' + result);
```js
queue.on('job succeeded', (jobId, result) => {
console.log(`Job ${jobId} succeeded with result: ${result}`);
});
```
Some worker has successfully processed job `jobId`. If `result` is defined, the handler called `done(null, result)`.
#### job retrying
```javascript
queue.on('job retrying', function (jobId, err) {
console.log('Job ' + jobId + ' failed with error ' + err.message ' but is being retried!');
```js
queue.on('job retrying', (jobId, err) => {
console.log(`Job ${jobId} failed with error ${err.message} but is being retried!`);
});
```
Some worker has processed job `jobId`, but it reported a failure and has been re-enqueued for another attempt.
#### job failed
```javascript
queue.on('job failed', function (jobId, err) {
console.log('Job ' + jobId + ' failed with error ' + err.message);
```js
queue.on('job failed', (jobId, err) => {
console.log(`Job ${jobId} failed with error ${err.message}`);
});
```
Some worker has processed `job`, but its handler reported a failure with `done(err)`.
#### job progress
```javascript
queue.on('job progress', function (jobId, progress) {
console.log('Job ' + jobId + ' reported progress: ' + progress + '%');
```js
queue.on('job progress', (jobId, progress) => {
console.log(`Job ${jobId} reported progress: ${progress}%`);
});
```
Some worker is processing job `jobId`, and it sent a [progress report](#jobprototypereportprogressn) of `progress` percent.
Some worker is processing job `jobId`, and it sent a [progress report](#jobreportprogressn) of `progress` percent.
### Queue Delayed Job activation
The `Queue` will activate no delayed jobs unless `activateDelayedJobs` is set to `true`.
The promptness of the job activation is controlled with the `delayedDebounce` setting on the `Queue`. This setting defines a window across which to group delayed jobs. If three jobs are enqueued for 10s, 10.5s, and 12s in the future, then a `delayedDebounce` of `1000` will cause the first two jobs to activate when the timestamp of the second job passes.
The `nearTermWindow` setting on the `Queue` determines the maximum duration the `Queue` should wait before attempting to activate any of the elapsed delayed jobs in Redis. This setting is to control for network failures in the delivery of the `earlierDelayed` event in conjunction with the death of the publishing `Queue`.
### Methods

@@ -336,27 +431,56 @@

#### Queue.prototype.createJob(data)
```javascript
var job = queue.createJob({...});
#### Queue#createJob(data)
```js
const job = queue.createJob({...});
```
Returns a new [Job object](#job) with the associated user data.
#### Queue.prototype.getJob(jobId, cb(err, job))
```javascript
#### Queue#getJob(jobId, [cb])
```js
queue.getJob(3, function (err, job) {
console.log('Job 3 has status ' + job.status);
console.log(`Job 3 has status ${job.status}`);
});
queue.getJob(3)
.then((job) => console.log(`Job 3 has status ${job.status}`));
```
Looks up a job by its `jobId`. The returned job will emit events if `getEvents` is true.
Be careful with this method; most potential uses would be better served by job events on already-existing job instances. Using this method indiscriminately can lead to increasing memory usage, as each queue maintains a table of all associated jobs in order to dispatch events.
Looks up a job by its `jobId`. The returned job will emit events if `getEvents` and `storeJobs` is true.
#### Queue.prototype.process([concurrency], handler(job, done))
Be careful with this method; most potential uses would be better served by job events on already-existing job instances. Using this method indiscriminately can lead to increasing memory usage when the `storeJobs` setting is `true`, as each queue maintains a table of all associated jobs in order to dispatch events.
#### Queue#getJobs(type, page, [cb])
```js
queue.getJobs('waiting', {start: 0, end: 25})
.then((jobs) => {
const jobIds = jobs.map((job) => job.id);
console.log(`Job ids: ${jobIds.join(' ')}`);
});
queue.getJobs('failed', {size: 100})
.then((jobs) => {
const jobIds = jobs.map((job) => job.id);
console.log(`Job ids: ${jobIds.join(' ')}`);
});
```
Looks up jobs by their queue type. When looking up jobs of type `waiting`, `active`, or `delayed`, `page` should be configured with `start` and `end` attributes to specify a range of job indices to return. Jobs of type `failed` and `succeeded` will return an arbitrary subset of the queue of size `page['size']`. Note: This is because failed and succeeded job types are represented by a Redis SET, which does not maintain a job ordering.
Note that large values of the attributes of `page` may cause excess load on the Redis server.
#### Queue#process([concurrency], handler(job, done))
Begins processing jobs with the provided handler function.
This function should only be called once, and should never be called on a queue where `isWorker` is false.
The `process` method should only be called once, and should never be called on a queue where `isWorker` is false.
The optional `concurrency` parameter sets the maximum number of simultaneously active jobs for this processor. It defaults to 1.
The handler function should:
The handler function should either:
- Return a `Promise` that eventually resolves or rejects, or
- Call `done` exactly once

@@ -366,11 +490,14 @@ - Use `done(err)` to indicate job failure

- `result` must be JSON-serializable (for `JSON.stringify`)
- Never throw an exception, unless `catchExceptions` has been enabled.
- Never ever [block](http://www.slideshare.net/async_io/practical-use-of-mongodb-for-nodejs/47) [the](http://blog.mixu.net/2011/02/01/understanding-the-node-js-event-loop/) [event](http://strongloop.com/strongblog/node-js-performance-event-loop-monitoring/) [loop](http://zef.me/blog/4561/node-js-and-the-case-of-the-blocked-event-loop) (for very long). If you do, the stall detection might think the job stalled, when it was really just blocking the event loop.
- Never ever [block](http://www.slideshare.net/async_io/practical-use-of-mongodb-for-nodejs/47) [the](http://blog.mixu.net/2011/02/01/understanding-the-node-js-event-loop/) [event](https://strongloop.com/strongblog/node-js-performance-event-loop-monitoring/) [loop](http://zef.me/blog/4561/node-js-and-the-case-of-the-blocked-event-loop) (for very long). If you do, the stall detection might think the job stalled, when it was really just blocking the event loop.
#### Queue.prototype.checkStalledJobs([interval], [cb])
_N.B. If the handler returns a `Promise`, calls to the `done` callback will be ignored._
#### Queue#checkStalledJobs([interval], [cb])
Checks for jobs that appear to be stalling and thus need to be retried, then re-enqueues them.
```javascript
queue.checkStalledJobs(5000, function (err) {
console.log('Checked stalled jobs'); // prints every 5000 ms
```js
queue.checkStalledJobs(5000, (err, numStalled) => {
// prints the number of stalled jobs detected every 5000 ms
console.log('Checked stalled jobs', numStalled);
});

@@ -380,2 +507,3 @@ ```

What happens after the check is determined by the parameters provided:
- `cb` only: `cb` is called

@@ -387,11 +515,15 @@ - `interval` only: a timeout is set to call the method again in `interval` ms

The maximum delay from when a job stalls until it will be retried is roughly `stallInterval` + `interval`, so to minimize that delay without calling `checkStalledJobs` unnecessarily often, set `interval` to be the same or a bit shorter than `stallInterval`. A good system-wide average frequency for the check is every 0.5-10 seconds, depending on how time-sensitive your jobs are in case of failure.
The maximum delay from when a job stalls until it will be retried is roughly `stallInterval + interval`, so to minimize that delay without calling `checkStalledJobs` unnecessarily often, set `interval` to be the same or a bit shorter than `stallInterval`. A good system-wide average frequency for the check is every 0.5-10 seconds, depending on how time-sensitive your jobs are in case of failure. Larger deployments, or deployments where processing has higher CPU variance, may need even higher intervals.
#### Queue.prototype.close([cb])
Closes the queue's connections to Redis.
Note that for calls that specify an interval, you must provide a callback if you want results from each subsequent check - the returned `Promise` can and will only resolve for the first check. If and only if you specify an `interval` and no `cb`, then errors encountered after the first check will be emitted as `error` events.
#### Queue#close([cb])
Closes the queue's connections to Redis. Idempotent.
## Job
### Properties
- `id`: number, Job ID unique to each job. Not populated until `.save` calls back.
- `id`: string, Job ID unique to each job. Not populated until `.save` calls back. Can be overridden with `Job#setId`.
- `data`: object; user data associated with the job. It should:

@@ -401,6 +533,7 @@ - Be JSON-serializable (for `JSON.stringify`)

- Ideally be as small as possible (1kB or less)
- `options`: object used by Bee-Queue to store timeout, retries, etc.
- `options`: object used by Bee-Queue to store timeout, retries, stack traces, etc.
- Do not modify directly; use job methods instead.
- `queue`: the Queue responsible for this instance of the job. This is either:
- the queue that called `createJob` to make the job
- the queue that called `createJob` to make the job,
- the queue that ran `getJob` to fetch the job from redis, or
- the queue that called `process` to process it

@@ -410,43 +543,70 @@ - `progress`: number; progress between 0 and 100, as reported by `reportProgress`.

### Job Events
These are all Pub/Sub events like [Queue PubSub events](#queue-pubsub-events) and are disabled when `getEvents` is false.
#### succeeded
```javascript
var job = queue.createJob({...}).save();
job.on('succeeded', function (result) {
console.log('Job ' + job.id + ' succeeded with result: ' + result);
```js
const job = await queue.createJob({...}).save();
job.on('succeeded', (result) => {
console.log(`Job ${job.id} succeeded with result: ${result}`);
});
```
The job has succeeded. If `result` is defined, the handler called `done(null, result)`.
#### retrying
```javascript
job.on('retrying', function (err) {
console.log('Job ' + job.id + ' failed with error ' + err.message ' but is being retried!');
```js
job.on('retrying', (err) => {
console.log(`Job ${job.id} failed with error ${err.message} but is being retried!`);
});
```
The job has failed, but it is being automatically re-enqueued for another attempt. `job.options.retries` has been decremented accordingly.
#### failed
```javascript
job.on('failed', function (err) {
console.log('Job ' + job.id + ' failed with error ' + err.message);
```js
job.on('failed', (err) => {
console.log(`Job ${job.id} failed with error ${err.message}`);
});
```
The job has failed, and is not being retried.
#### progress
```javascript
job.on('progress', function (progress) {
console.log('Job ' + job.id + ' reported progress: ' + progress + '%');
```js
job.on('progress', (progress) => {
console.log(`Job ${job.id} reported progress: ${progress}%`);
});
```
The job has sent a [progress report](#jobprototypereportprogressn) of `progress` percent.
The job has sent a [progress report](#jobreportprogressn) of `progress` percent.
### Methods
#### Job.prototype.retries(n)
```javascript
var job = queue.createJob({...}).retries(3).save();
Each Job can be configured with the commands `.setId(id)`, `.retries(n)`, `.backoff(strategy, delayFactor)`, `.delayUntil(date|timestamp)`, and `.timeout(ms)`.
#### Job#setId(id)
```js
const job = await queue.createJob({...})
.setId('bulk')
.save();
```
Explicitly sets the ID of the job. If a job with the given ID already exists, the Job will not be created, and `job.id` will be set to `null`. This method can be used to run a once for each of an external resource by passing that resource's ID. For instance, you might run the setup job for a user only once by setting the job ID to the ID of the user.
Avoid passing a numeric job ID, as it may conflict with an auto-generated ID.
#### Job#retries(n)
```js
const job = await queue.createJob({...})
.retries(3)
.save();
```
Sets how many times the job should be automatically retried in case of failure.

@@ -458,39 +618,80 @@

#### Job.prototype.timeout(ms)
```javascript
var job = queue.createJob({...}).timeout(10000).save();
#### Job#backoff(strategy, delayFactor)
```js
// When the job fails, retry it immediately.
const job = queue.createJob({...})
.backoff('immediate');
// When the job fails, wait the given number of milliseconds before retrying.
job.backoff('fixed', 1000);
// When the job fails, retry using an exponential backoff policy.
// In this example, the first retry will be after one second after completion
// of the first attempt, and the second retry will be two seconds after completion
// of the first retry.
job.backoff('exponential', 1000);
```
Sets a job runtime timeout; if the job's handler function takes longer than the timeout to call `done`, the worker assumes the job has failed and reports it as such.
Sets the backoff policy when handling retries.
This setting is stored in `job.options.backoff` as `{strategy, delay}`.
Defaults to `'immediate'`.
#### Job#delayUntil(date|timestamp)
```js
const job = await queue.createJob({...})
.delayUntil(Date.parse('2038-01-19T03:14:08.000Z'))
.save();
```
Delay the job until the given Date/timestamp passes. See the `Queue` settings section for information on controlling the activation of delayed jobs.
Defaults to enqueueing the job for immediate processing.
#### Job#timeout(ms)
```js
const job = await queue.createJob({...})
.timeout(10000)
.save();
```
Sets a job runtime timeout in milliseconds; if the job's handler function takes longer than the timeout to call `done`, the worker assumes the job has failed and reports it as such (causing the job to retry if applicable).
Defaults to no timeout.
#### Job.prototype.save([cb])
```javascript
var job = queue.createJob({...}).save(function (err, job) {
console.log('Saved job ' + job.id);
#### Job#save([cb])
```js
const job = queue.createJob({...});
job.save((err, job) => {
console.log(`Saved job ${job.id}`);
});
job.save().then((job) => console.log(`Saved job ${job.id}`));
```
Saves a job, queueing it up for processing. After the callback fires, `job.id` will be populated.
#### Job.prototype.reportProgress(n)
```javascript
queue.process(function (job, done) {
...
Saves a job, queueing it up for processing. After the callback fires (and associated Promise resolves), `job.id` will be populated.
#### Job#reportProgress(n)
```js
queue.process(async (job, done) => {
await doSomethingQuick();
job.reportProgress(10);
...
await doSomethingBigger();
job.reportProgress(50);
...
await doFinalizeStep();
});
```
Reports job progress when called within a handler function. Causes a `progress` event to be emitted.
Reports job progress when called within a handler function. Causes a `progress` event to be emitted. Does not persist the progress to Redis, but will store it on `job.progress`, and if other `Queue`s have `storeJobs` and `getEvents` enabled, then the `progress` will end up on all corresponding job instances.
### Defaults
All methods with an optional callback field will use the following default:
```javascript
var defaultCb = function (err) {
if (err) throw err;
};
```
Defaults for Queue `settings` live in `lib/defaults.js`. Changing that file will change Bee-Queue's default behavior.

@@ -501,4 +702,5 @@

Each Queue uses the following Redis keys:
- `bq:name:id`: Integer, incremented to determine the next Job ID.
- `bq:name:jobs`: Hash from Job ID to a JSON string of its data and options.
- `bq:name:jobs`: Hash from Job ID to a JSON string containing its data and options.
- `bq:name:waiting`: List of IDs of jobs waiting to be processed.

@@ -508,8 +710,11 @@ - `bq:name:active`: List of IDs jobs currently being processed.

- `bq:name:failed`: Set of IDs of jobs which failed.
- `bq:name:delayed`: Ordered Set of IDs corresponding to delayed jobs - this set maps delayed timestamp to IDs.
- `bq:name:stalling`: Set of IDs of jobs which haven't 'checked in' during this interval.
- `bq:name:stallBlock`: Set of IDs of jobs which haven't 'checked in' during this interval.
- `bq:name:events`: Pub/Sub channel for workers to send out job results.
- `bq:name:earlierDelayed`: When a new delayed job is added prior to all other jobs, the script creating the job will publish the job's timestamp over this Pub/Sub channel.
Bee-Queue is non-polling, so idle workers are listening to receive jobs as soon as they're enqueued to Redis. This is powered by [brpoplpush](http://redis.io/commands/BRPOPLPUSH), which is used to move jobs from the waiting list to the active list. Bee-Queue generally follows the "Reliable Queue" pattern described [here](http://redis.io/commands/rpoplpush).
The `isWorker` [setting](#settings) creates an extra Redis connection dedicated to `brpoplpush`, while `getEvents` creates one dedicated to receiving Pub/Sub events. As such, these settings should be disabled if you don't need them; in most cases, only one of them needs to be enabled.
The `isWorker` [setting](#settings) creates an extra Redis connection dedicated to `brpoplpush`. If either `getEvents` or `activateDelayedJobs` are enabled, another connection is dedicated to receiving Pub/Sub events. As such, these settings should be disabled if you don't need them.

@@ -522,9 +727,8 @@ The stalling set is a snapshot of the active list from the beginning of the latest stall interval. During each stalling interval, workers remove their job IDs from the stalling set, so at the end of an interval, any jobs whose IDs are left in the stalling set have missed their window (stalled) and need to be rerun. When `checkStalledJobs` runs, it re-enqueues any jobs left in the stalling set (to the waiting list), then takes a snapshot of the active list and stores it in the stalling set.

Pull requests are welcome; just make sure `grunt test` passes. For significant changes, open an issue for discussion first.
Pull requests are welcome; just make sure `npm test` passes. For significant changes, open an issue for discussion first.
Some significant non-features include:
- Job scheduling: Kue and Bull do this.
- Worker tracking: Kue does this.
- All-workers pause-resume: Bull does this.
- Web interface: Kue has a nice one built in, and someone made [one for Bull](https://github.com/ShaneK/Matador).
- Job priority: multiple queues get the job done in simple cases, but Kue has first-class support. Bull provides a wrapper around multiple queues.

@@ -534,9 +738,9 @@

You'll need a local redis server to run the tests. Note that running them will delete any keys that start with `bq:test:`.
You'll need a local redis server to run the tests. Note that running the tests may delete some keys in the form of `bq:test-*-*:*`.
[npm-image]: https://img.shields.io/npm/v/bee-queue.svg?style=flat
[npm-url]: https://www.npmjs.com/package/bee-queue
[travis-image]: https://img.shields.io/travis/LewisJEllis/bee-queue.svg?style=flat
[travis-url]: https://travis-ci.org/LewisJEllis/bee-queue
[coveralls-image]: https://coveralls.io/repos/LewisJEllis/bee-queue/badge.svg?branch=master
[coveralls-url]: https://coveralls.io/r/LewisJEllis/bee-queue?branch=master
[travis-image]: https://img.shields.io/travis/bee-queue/bee-queue.svg?style=flat
[travis-url]: https://travis-ci.org/bee-queue/bee-queue
[coveralls-image]: https://coveralls.io/repos/bee-queue/bee-queue/badge.svg?branch=master
[coveralls-url]: https://coveralls.io/r/bee-queue/bee-queue?branch=master

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc