Comparing version 2.1.2 to 2.2.0
@@ -0,1 +1,12 @@ | ||
v.2.2.0 | ||
======= | ||
- Much improved priority queues, simpler, faster and more reliable. | ||
- Fixed issue where lua scripts where leaking memory. | ||
- Improvements in local pause, fixing #446 and #447. | ||
- Fix to increase delay time over 24 days #244 | ||
[Changes](https://github.com/OptimalBits/bull/compare/v2.1.2...v2.2.0) | ||
v.2.1.2 | ||
@@ -2,0 +13,0 @@ ======= |
@@ -40,5 +40,10 @@ /*eslint-env node */ | ||
function addJob(queue, job){ | ||
var opts = job.opts; | ||
var jobData = job.toData(); | ||
var toKey = _.bind(queue.toKey, queue); | ||
return scripts.addJob(queue.client, toKey, jobData, { lifo: job.opts.lifo, customJobId: job.opts.jobId }); | ||
return scripts.addJob(queue.client, toKey, jobData, { | ||
lifo: opts.lifo, | ||
customJobId: opts.jobId, | ||
priority: opts.priority | ||
}); | ||
} | ||
@@ -45,0 +50,0 @@ |
@@ -20,2 +20,4 @@ "use strict"; | ||
console.warn("DEPRECATION NOTICE: PriorityQueue has been deprecated and will be removed in bull 3.0.0, please use the priority option instead."); | ||
var _this = this; | ||
@@ -22,0 +24,0 @@ this.paused = false; |
125
lib/queue.js
@@ -17,14 +17,15 @@ /*eslint-env node */ | ||
/** | ||
Gets or creates a new Queue with the given name. | ||
The Queue keeps 5 data structures: | ||
The Queue keeps 6 data structures: | ||
- wait (list) | ||
- active (list) | ||
- delayed (zset) | ||
- priority (zset) | ||
- completed (set) | ||
- failed (set) | ||
-- >completed | ||
/ | ||
--> priorities -- >completed | ||
/ / | ||
job -> wait -> active | ||
@@ -40,3 +41,3 @@ | ^ \ | ||
The mechanism is simple, a delayedTimestamp variable holds the next | ||
known timestamp that is on the delayed set (or MAX_INT if none). | ||
known timestamp that is on the delayed set (or MAX_TIMEOUT_MS if none). | ||
@@ -64,2 +65,4 @@ When the current job has finalized the variable is checked, if | ||
var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed | ||
var Queue = function Queue(name, redisPort, redisHost, redisOptions){ | ||
@@ -101,5 +104,5 @@ if(!(this instanceof Queue)){ | ||
if(_.isFunction(redisOptions.createClient)){ | ||
client = redisOptions.createClient(); | ||
client = new redisOptions(); | ||
}else{ | ||
client = redis.createClient(redisPort, redisHost, redisOptions); | ||
client = new redis(redisPort, redisHost, redisOptions); | ||
} | ||
@@ -156,2 +159,3 @@ return client; | ||
this.processing = 0; | ||
this.retrieving = 0; | ||
@@ -269,2 +273,10 @@ this.LOCK_RENEW_TIME = LOCK_RENEW_TIME; | ||
Queue.prototype.getJobMoveCount = function(){ | ||
return this.bclient.commandQueue.length; | ||
}; | ||
Queue.prototype.whenCurrentMoveFinished = function(){ | ||
var currentMove = this.bclient.commandQueue.peekFront() | ||
return currentMove && currentMove.command.promise || Promise.resolve(); | ||
}; | ||
/** | ||
@@ -324,2 +336,3 @@ * | ||
return this.closing = this._initializing.then(function(){ | ||
@@ -555,3 +568,3 @@ clearTimeout(_this.delayTimer); | ||
if(newDelayedTimestamp < _this.delayedTimestamp){ | ||
if(newDelayedTimestamp < _this.delayedTimestamp && newDelayedTimestamp < (MAX_TIMEOUT_MS + Date.now())){ | ||
clearTimeout(this.delayTimer); | ||
@@ -571,3 +584,3 @@ this.delayedTimestamp = newDelayedTimestamp; | ||
_this.updateDelayTimer(nextTimestamp); | ||
}).catch(function(err){ | ||
}).catch(function(err){ | ||
console.error('Error updating the delay timer', err); | ||
@@ -716,4 +729,2 @@ }); | ||
this.processing++; | ||
return lockRenewer().then(function(locked){ | ||
@@ -739,4 +750,20 @@ if(locked){ | ||
Queue.prototype.getNextJob = function(opts){ | ||
var _this = this; | ||
if(!this.closing){ | ||
return this.moveJob('wait', 'active', opts).then(this.getJobFromId); | ||
this.retrieving++; | ||
return this.moveJob('wait', 'active', opts) | ||
.then(this.getJobFromId) | ||
.tap(function(job) { | ||
_this.retrieving--; | ||
if (job) { | ||
_this.processing++; | ||
} else { | ||
_this.emit('no-job-retrieved'); | ||
} | ||
}) | ||
.catch(function(err) { | ||
_this.retrieving--; | ||
_this.emit('no-job-retrieved'); | ||
throw err; | ||
}); | ||
}else{ | ||
@@ -757,23 +784,40 @@ return Promise.reject(); | ||
Queue.prototype.moveJob = function(src, dst, opts) { | ||
var args = arguments; | ||
var _this = this; | ||
var move; | ||
if(opts && opts.block === false){ | ||
if(!this.closing){ | ||
return this.bclient.rpoplpush(this.toKey(src), this.toKey(dst)); | ||
move = this.bclient.rpoplpush(this.toKey(src), this.toKey(dst)); | ||
}else{ | ||
return Promise.reject(); | ||
move = Promise.reject(); | ||
} | ||
} else if (this.closing || this.paused) { | ||
move = Promise.resolve(); | ||
} else if (this.getJobMoveCount()) { | ||
move = this.whenCurrentMoveFinished().then(function() { | ||
return _this.moveJob.apply(_this, args); | ||
}); | ||
}else{ | ||
return this.bclient.brpoplpush( | ||
move = this.bclient.brpoplpush( | ||
this.toKey(src), | ||
this.toKey(dst), | ||
Math.floor(this.LOCK_RENEW_TIME / 1000)).then(function(jobId) { | ||
// Return undefined instead of Promise.reject if there is no jobId | ||
// Avoid Promise.reject because https://github.com/OptimalBits/bull/issues/144 | ||
Math.floor(this.LOCK_RENEW_TIME / 1000)); | ||
} | ||
return move.then(function(jobId){ | ||
// | ||
// Unfortunatelly this cannot be performed atomically, which will lead to a | ||
// slight hazard for priority queues (will only affect its order). | ||
// | ||
if(jobId){ | ||
return _this.client.zrem(_this.toKey('priority'), jobId).then(function(){ | ||
return jobId; | ||
}, function(err){ | ||
if(!_this.closing){ | ||
return err; | ||
} | ||
}); | ||
} | ||
} | ||
}, function(err){ | ||
if(!_this.closing){ | ||
throw err; | ||
} | ||
}); | ||
}; | ||
@@ -963,22 +1007,23 @@ | ||
var resolver; | ||
return new Promise(function(resolve, reject) { | ||
_this.getActiveCount().then(function(count) { | ||
if(count === 0){ | ||
var count = this.processing + this.retrieving; | ||
return new Promise(function(resolve) { | ||
if(count === 0){ | ||
resolve(); | ||
}else{ | ||
resolver = _.after(count, function(){ | ||
_this.removeListener('stalled', resolver); | ||
_this.removeListener('completed', resolver); | ||
_this.removeListener('failed', resolver); | ||
_this.removeListener('no-job-retrieved', resolver); | ||
clearInterval(_this.moveUnlockedJobsToWaitInterval); | ||
resolve(); | ||
}else{ | ||
resolver = _.after(count, function(){ | ||
_this.removeListener('stalled', resolver); | ||
_this.removeListener('completed', resolver); | ||
_this.removeListener('failed', resolver); | ||
clearInterval(_this.moveUnlockedJobsToWaitInterval); | ||
resolve(); | ||
}); | ||
}); | ||
_this.on('stalled', resolver); | ||
_this.on('completed', resolver); | ||
_this.on('failed', resolver); | ||
_this.on('stalled', resolver); | ||
_this.on('completed', resolver); | ||
_this.on('failed', resolver); | ||
_this.on('no-job-retrieved', resolver); | ||
_this.startMoveUnlockedJobsToWait(); | ||
} | ||
}, reject); | ||
_this.startMoveUnlockedJobsToWait(); | ||
} | ||
}); | ||
@@ -985,0 +1030,0 @@ }; |
@@ -25,18 +25,28 @@ /** | ||
function isCommandDefined(client, hash){ | ||
return !!client[hash]; | ||
} | ||
var scripts = { | ||
_isJobInList: function(keyVar, argVar, operator) { | ||
keyVar = keyVar || 'KEYS[1]'; | ||
argVar = argVar || 'ARGV[1]'; | ||
operator = operator || 'return'; | ||
return [ | ||
'local function item_in_list (list, item)', | ||
' for _, v in pairs(list) do', | ||
' if v == item then', | ||
' return 1', | ||
' end', | ||
' end', | ||
' return nil', | ||
'end', | ||
['local items = redis.call("LRANGE",', keyVar, ' , 0, -1)'].join(''), | ||
[operator, ' item_in_list(items, ', argVar, ')'].join('') | ||
keyVar = keyVar ? 'splitKey[1]..":"..splitKey[2]..":active"' : 'KEYS[1]'; | ||
argVar = argVar || 'ARGV[1]'; | ||
operator = operator || 'return'; | ||
return [ | ||
'local function item_in_list (list, item)', | ||
' for _, v in pairs(list) do', | ||
' if v == item then', | ||
' return 1', | ||
' end', | ||
' end', | ||
' return nil', | ||
'end', | ||
'local splitKey={}', | ||
'local i=1', | ||
'for str in string.gmatch(KEYS[1], "([^:]+)") do', | ||
'splitKey[i] = str', | ||
'i = i + 1', | ||
'end', | ||
[ 'local items = redis.call("LRANGE", ', keyVar, ' , 0, -1)' ].join(''), | ||
[ operator, ' item_in_list(items, ', argVar, ')' ].join('') | ||
].join('\n'); | ||
@@ -50,8 +60,25 @@ }, | ||
addJob: function(client, toKey, job, opts){ | ||
var delayed; | ||
var scriptName; | ||
opts = opts || {}; | ||
opts.lifo = !!(opts.lifo); | ||
var delayTimestamp = job.timestamp + job.delay; | ||
if(job.delay && delayTimestamp > Date.now()){ | ||
delayed = true; | ||
scriptName = 'addJob:delayed'; | ||
} else { | ||
scriptName = 'addJob'+(opts.lifo?':lifo':'') + (opts.priority?':priority':''); | ||
} | ||
/* | ||
if(isCommandDefined(client, scriptName)){ | ||
return client[scriptName].apply(client, args); | ||
}; | ||
*/ | ||
var jobArgs = _.flatten(_.toPairs(job)); | ||
var keys = _.map(['wait', 'paused', 'meta-paused', 'jobs', 'id', 'delayed'], function(name){ | ||
var keys = _.map(['wait', 'paused', 'meta-paused', 'jobs', 'id', 'delayed', 'priority'], function(name){ | ||
return toKey(name); | ||
@@ -62,3 +89,3 @@ }); | ||
var argvs = _.map(jobArgs, function(arg, index){ | ||
return ', ARGV['+(index+3)+']'; | ||
return ', ARGV['+(index+4)+']'; | ||
}) | ||
@@ -75,8 +102,6 @@ | ||
var scriptName; | ||
var delayTimestamp = job.timestamp + job.delay; | ||
if(job.delay && delayTimestamp > Date.now()){ | ||
if(delayed){ | ||
script.push.apply(script, [ | ||
' local timestamp = tonumber(ARGV[' + (argvs.length + 3) + ']) * 0x1000 + bit.band(jobCounter, 0xfff)', | ||
' local timestamp = tonumber(ARGV[' + (argvs.length + 4) + ']) * 0x1000 + bit.band(jobCounter, 0xfff)', | ||
' redis.call("ZADD", KEYS[6], timestamp, jobId)', | ||
@@ -86,6 +111,32 @@ ' redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))', | ||
]); | ||
}else{ | ||
var push, pushPaused; | ||
var add = _.template('redis.call("<%= direction %>", <%= waitQueue %>, jobId)'); | ||
scriptName = 'addJob:delayed'; | ||
}else{ | ||
var push = (opts.lifo ? 'R' : 'L') + 'PUSH'; | ||
if(opts.lifo){ | ||
push = add({direction: 'RPUSH', waitQueue: 'KEYS[1]'}); | ||
pushPaused = add({direction: 'RPUSH', waitQueue: 'KEYS[2]'}); | ||
}else if(opts.priority){ | ||
script.push.apply(script, [ | ||
' redis.call("ZADD", KEYS[7], ARGV[3], jobId)', | ||
' local count = redis.call("ZCOUNT", KEYS[7], 0, ARGV[3])', | ||
]); | ||
var priorityAdd = _.template([ | ||
' local len = redis.call("LLEN", <%= waitQueue %>)', | ||
' local id = redis.call("LINDEX", <%= waitQueue %>, len - (count-1))', | ||
' if id then', | ||
' redis.call("LINSERT", <%= waitQueue %>, "BEFORE", id, jobId)', | ||
' else', | ||
' redis.call("RPUSH", <%= waitQueue %>, jobId)', | ||
' end', | ||
].join('\n')); | ||
push = priorityAdd({waitQueue: 'KEYS[1]'}); | ||
pushPaused = priorityAdd({waitQueue: 'KEYS[2]'}); | ||
}else{ | ||
push = add({direction: 'LPUSH', waitQueue: 'KEYS[1]'}); | ||
pushPaused = add({direction: 'LPUSH', waitQueue: 'KEYS[2]'}); | ||
} | ||
// | ||
@@ -96,5 +147,5 @@ // Whe check for the meta-paused key to decide if we are paused or not | ||
'if redis.call("EXISTS", KEYS[3]) ~= 1 then', | ||
' redis.call("' + push + '", KEYS[1], jobId)', | ||
push, | ||
'else', | ||
' redis.call("' + push + '", KEYS[2], jobId)', | ||
pushPaused, | ||
'end', | ||
@@ -104,4 +155,2 @@ 'redis.call("PUBLISH", KEYS[4], jobId)', | ||
]); | ||
scriptName = 'addJob'+push; | ||
} | ||
@@ -119,2 +168,3 @@ | ||
args.push(opts.customJobId || ''); | ||
args.push(opts.priority); | ||
args.push.apply(args, jobArgs); | ||
@@ -125,2 +175,3 @@ args.push(delayTimestamp); | ||
}, | ||
// TODO: perfect this function so that it can be used instead | ||
@@ -312,3 +363,3 @@ // of all the specialized functions moveToComplete, etc. | ||
var isJobInList = this._isJobInList(keyVar, argVar, 'if'); | ||
var lockAcquired = ['and redis.call("HSET", "', queue.toKey(job.jobId), '", "lockAcquired", "1")'].join(''); | ||
var lockAcquired = 'and redis.call("HSET", splitKey[1]..":"..splitKey[2]..":"..splitKey[3], "lockAcquired", "1")'; | ||
var success = 'then return 1 else return 0 end'; | ||
@@ -315,0 +366,0 @@ var opts = { |
{ | ||
"name": "bull", | ||
"version": "2.1.2", | ||
"version": "2.2.0", | ||
"description": "Job manager", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -349,3 +349,3 @@ Bull Job Manager | ||
redisHost {String} A host specified as IP or domain where redis is running. | ||
redisOptions {Object} Options to pass to the redis client. https://github.com/mranney/node_redis | ||
redisOptions {Object} Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options | ||
``` | ||
@@ -360,3 +360,3 @@ | ||
redisConnectionString {String} A connection string containing the redis server host, port and (optional) authentication. | ||
redisOptions {Object} Options to pass to the redis client. https://github.com/mranney/node_redis | ||
redisOptions {Object} Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options | ||
``` | ||
@@ -436,3 +436,7 @@ | ||
{ | ||
priority {Number} Optional priority value, ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that | ||
using priorities has a slight impact on performance, so do not use if not required. | ||
delay {Number} An amount of miliseconds to wait until this job can be processed. Note that for accurate delays, both server and clients should have their clocks synchronized. [optional] | ||
attempts {Number} The total number of attempts to try the job until it completes. | ||
@@ -442,8 +446,10 @@ | ||
backoff.type {String} Backoff type, which can be either `fixed` or `exponential` | ||
backoff.delay {Number} Backoff delay, in milliseconds | ||
backoff.delay {Number} Backoff delay, in milliseconds. | ||
lifo {Boolean} A boolean which, if true, adds the job to the right of the queue | ||
instead of the left (default false) | ||
timeout {Number} The number of milliseconds after which the job should be fail | ||
with a timeout error [optional] | ||
jobId {Number|String} Override the job ID - by default, the job ID is a unique | ||
@@ -454,2 +460,3 @@ integer, but you can use this setting to override it. | ||
already exists, it will not be added. | ||
removeOnComplete {Boolean} A boolean which, if true, removes the job when it successfully | ||
@@ -646,6 +653,10 @@ completes. Default behavior is to keep the job in the completed queue. | ||
<a name="priorityQueue"/> | ||
###PriorityQueue(queueName, redisPort, redisHost, [redisOpts]) | ||
### DEPRECATION notice | ||
The priority queue has been deprecated since version 2.2.0 in favor of a new option, *priority* in [Queue##add](#add). | ||
The priorityQueue will be removed from the code base in version 3.0.0. | ||
-- | ||
This is the Queue constructor of priority queue. It works same a normal queue, with same function and parameters. | ||
@@ -681,3 +692,3 @@ The only difference is that the Queue#add() allow an options opts.priority that could take | ||
The most important property for the user is Job##data that includes the | ||
object that was passed to Queue##add, and that is normally used to | ||
object that was passed to [Queue##add](#add), and that is normally used to | ||
perform the job. | ||
@@ -684,0 +695,0 @@ |
@@ -21,3 +21,3 @@ /*eslint-env node */ | ||
// we need to purge all keys after each test | ||
var client = redis.createClient(6379, '127.0.0.1', {}); | ||
var client = new redis(6379, '127.0.0.1', {}); | ||
client.select(0); | ||
@@ -24,0 +24,0 @@ |
@@ -15,3 +15,3 @@ /*eslint-env node */ | ||
beforeEach(function(){ | ||
var client = redis.createClient(); | ||
var client = new redis(); | ||
return client.flushdb().then(function(){ | ||
@@ -18,0 +18,0 @@ queue = utils.buildQueue(); |
@@ -17,3 +17,3 @@ /*eslint-env node */ | ||
beforeEach(function(){ | ||
var client = redis.createClient(); | ||
var client = new redis(); | ||
return client.flushdb(); | ||
@@ -388,3 +388,3 @@ }); | ||
var client = redis.createClient(); | ||
var client = new redis(); | ||
return Job.create(queue, {foo: 'baz'}).then(function(job) { | ||
@@ -391,0 +391,0 @@ return job.isStuck().then(function(isStuck) { |
@@ -29,3 +29,3 @@ /// <reference path='../typings/mocha/mocha.d.ts'/> | ||
beforeEach(function(){ | ||
var client = redis.createClient(); | ||
var client = new redis(); | ||
return client.flushdb(); | ||
@@ -50,3 +50,3 @@ }); | ||
clients++; | ||
return redis.createClient(); | ||
return new redis(); | ||
} | ||
@@ -825,3 +825,3 @@ } | ||
it('should clean a job without a timestamp', function (done) { | ||
var client = redis.createClient(6379, '127.0.0.1', {}); | ||
var client = new redis(6379, '127.0.0.1', {}); | ||
@@ -828,0 +828,0 @@ queue.add({some: 'data'}, {priority: 'normal'}); |
@@ -32,3 +32,3 @@ /*eslint-env node */ | ||
beforeEach(function () { | ||
var client = redis.createClient(); | ||
var client = new redis(); | ||
return client.flushdb(); | ||
@@ -270,3 +270,3 @@ }); | ||
beforeEach(function () { | ||
var client = redis.createClient(); | ||
var client = new redis(); | ||
return client.flushdb().then(function () { | ||
@@ -323,2 +323,45 @@ return utils.newQueue(); | ||
it('should processes jobs by priority', function(done){ | ||
var normalPriority = [], | ||
mediumPriority = [], | ||
highPriority = []; | ||
// for the current strategy this number should not exceed 8 (2^2*2) | ||
// this is done to maitain a deterministic output. | ||
var numJobsPerPriority = 6; | ||
for(var i = 0; i < numJobsPerPriority; i++){ | ||
normalPriority.push(queue.add({p: 2}, {priority: 2})); | ||
mediumPriority.push(queue.add({p: 3}, {priority: 3})); | ||
highPriority.push(queue.add({p: 1}, {priority: 1})); | ||
} | ||
// wait for all jobs to enter the queue and then start processing | ||
Promise | ||
.all(normalPriority, mediumPriority, highPriority) | ||
.then(function(){ | ||
var currentPriority = 1; | ||
var counter = 0; | ||
var total = 0; | ||
queue.process(function(job, jobDone){ | ||
expect(job.jobId).to.be.ok(); | ||
expect(job.data.p).to.be(currentPriority); | ||
jobDone(); | ||
total ++; | ||
if(++counter === numJobsPerPriority){ | ||
currentPriority++; | ||
counter = 0; | ||
if(currentPriority === 4 && total === numJobsPerPriority * 3){ | ||
done(); | ||
} | ||
} | ||
}); | ||
}, done); | ||
}); | ||
it('process several jobs serially', function (done) { | ||
@@ -635,3 +678,3 @@ this.timeout(12000); | ||
jobDone(); | ||
var client = redis.createClient(); | ||
var client = new redis(); | ||
client.srem(queue2.toKey('completed'), 1); | ||
@@ -736,3 +779,3 @@ client.lpush(queue2.toKey('active'), 1); | ||
it('does not renew a job lock after the lock has been released [#397]', function (done) { | ||
this.timeout(queue.LOCK_RENEW_TIME * 3); | ||
this.timeout(queue.LOCK_RENEW_TIME * 4); | ||
@@ -768,3 +811,3 @@ queue.process(function (job) { | ||
var retryQueue = utils.buildQueue('retry-test-queue'); | ||
var client = redis.createClient(6379, '127.0.0.1', {}); | ||
var client = new redis(6379, '127.0.0.1', {}); | ||
@@ -846,3 +889,3 @@ client.select(0); | ||
beforeEach(function () { | ||
var client = redis.createClient(); | ||
var client = new redis(); | ||
return client.flushdb(); | ||
@@ -987,6 +1030,81 @@ }); | ||
}); | ||
it('should pause the queue locally when more than one worker is active', function () { | ||
var queue1 = utils.buildQueue('pause-queue'); | ||
var queue1IsProcessing = new Promise(function(resolve) { | ||
queue1.process(function(job, jobDone) { | ||
resolve(); | ||
setTimeout(jobDone, 200); | ||
}); | ||
}); | ||
var queue2 = utils.buildQueue('pause-queue'); | ||
var queue2IsProcessing = new Promise(function(resolve) { | ||
queue2.process(function(job, jobDone) { | ||
resolve(); | ||
setTimeout(jobDone, 200); | ||
}); | ||
}); | ||
queue1.add(1); | ||
queue1.add(2); | ||
queue1.add(3); | ||
queue1.add(4); | ||
return Promise.all([queue1IsProcessing, queue2IsProcessing]).then(function() { | ||
return Promise.all([queue1.pause(true /* local */), queue2.pause(true /* local */)]).then(function() { | ||
var active = queue1.getJobCountByTypes(['active']).then(function(count) { | ||
expect(count).to.be(0); | ||
}); | ||
var pending = queue1.getJobCountByTypes(['wait']).then(function(count) { | ||
expect(count).to.be(2); | ||
}); | ||
var completed = queue1.getJobCountByTypes(['completed']).then(function(count) { | ||
expect(count).to.be(2); | ||
}); | ||
return Promise.all([active, pending, completed]); | ||
}); | ||
}); | ||
}); | ||
it('should wait for blocking job retrieval to complete before pausing locally', function() { | ||
var queue = utils.buildQueue(); | ||
queue.process(function(job, jobDone) { | ||
setTimeout(jobDone, 200); | ||
}); | ||
return new Promise(function(resolve) { | ||
queue.on('ready', resolve); | ||
}).then(function() { | ||
//start the pause process | ||
var queueIsPaused = queue.pause(true); | ||
//add some jobs | ||
return Promise.all([ queue.add(1), queue.add(2) ]).then(function() { | ||
//wait for the queue to finish pausing | ||
return queueIsPaused; | ||
}); | ||
}).then(function() { | ||
var active = queue.getJobCountByTypes(['active']).then(function(count) { | ||
expect(count).to.be(0); | ||
}); | ||
var pending = queue.getJobCountByTypes(['wait']).then(function(count) { | ||
expect(count).to.be(1); | ||
}); | ||
var completed = queue.getJobCountByTypes(['completed']).then(function(count) { | ||
expect(count).to.be(1); | ||
}); | ||
return Promise.all([active, pending, completed]); | ||
}); | ||
}); | ||
}); | ||
it('should publish a message when a new message is added to the queue', function (done) { | ||
var client = redis.createClient(6379, '127.0.0.1', {}); | ||
var client = new redis(6379, '127.0.0.1', {}); | ||
client.select(0); | ||
@@ -1022,3 +1140,3 @@ var queue = new Queue('test pub sub'); | ||
beforeEach(function () { | ||
var client = redis.createClient(); | ||
var client = new redis(); | ||
return client.flushdb(); | ||
@@ -1030,3 +1148,3 @@ }); | ||
queue = new Queue('delayed queue simple'); | ||
var client = redis.createClient(6379, '127.0.0.1', {}); | ||
var client = new redis(6379, '127.0.0.1', {}); | ||
var timestamp = Date.now(); | ||
@@ -1056,3 +1174,3 @@ var publishHappened = false; | ||
expect(publishHappened).to.be(true); | ||
queue.close().then(done, done); | ||
queue.close(true).then(done, done); | ||
}); | ||
@@ -1148,3 +1266,3 @@ }); | ||
it('should process delayed jobs with exact same timestamps in correct order (FIFO)', function (done) { | ||
var client = redis.createClient(6379, '127.0.0.1', {}); | ||
var client = new redis(6379, '127.0.0.1', {}); | ||
client = Promise.promisifyAll(client); | ||
@@ -1187,3 +1305,3 @@ var QUEUE_NAME = 'delayed queue multiple' + uuid(); | ||
beforeEach(function () { | ||
var client = redis.createClient(); | ||
var client = new redis(); | ||
queue = utils.buildQueue(); | ||
@@ -1930,3 +2048,3 @@ return client.flushdb(); | ||
it('should clean a job without a timestamp', function (done) { | ||
var client = redis.createClient(6379, '127.0.0.1', {}); | ||
var client = new redis(6379, '127.0.0.1', {}); | ||
@@ -1933,0 +2051,0 @@ queue.add({ some: 'data' }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
225912
5950
762