Comparing version
@@ -126,4 +126,4 @@ /*eslint-env node */ | ||
*/ | ||
Job.prototype.takeLock = function(token, renew){ | ||
return scripts.takeLock(this.queue, this, token, renew).then(function(res){ | ||
Job.prototype.takeLock = function(token, renew, ensureActive){ | ||
return scripts.takeLock(this.queue, this, token, renew, ensureActive).then(function(res){ | ||
return res === 1; // Indicates successful lock. | ||
@@ -357,2 +357,3 @@ }); | ||
if(!finished){ | ||
var interval; | ||
function onCompleted(job){ | ||
@@ -363,2 +364,3 @@ if(job.jobId === _this.jobId){ | ||
removeListeners(); | ||
clearInterval(interval); | ||
} | ||
@@ -371,2 +373,3 @@ | ||
removeListeners(); | ||
clearInterval(interval); | ||
} | ||
@@ -385,3 +388,3 @@ | ||
// | ||
var interval = setInterval(function(){ | ||
interval = setInterval(function(){ | ||
status(resolve, reject).then(function(finished){ | ||
@@ -388,0 +391,0 @@ if(finished){ |
@@ -66,3 +66,3 @@ /*eslint-env node */ | ||
if(_.isObject(redisPort)){ | ||
if(_.isObject(redisPort)) { | ||
var opts = redisPort; | ||
@@ -72,4 +72,7 @@ var redisOpts = opts.redis || {}; | ||
redisHost = redisOpts.host; | ||
redisOptions = redisOpts.opts ||Ā {}; | ||
redisOptions = redisOpts.opts || {}; | ||
redisOptions.db = redisOpts.DB; | ||
} else if(arguments.length == 3) { | ||
redisPort = parseInt(redisPort); | ||
redisOptions = redisOptions || {}; | ||
} else if(_.isString(redisPort)) { | ||
@@ -601,3 +604,3 @@ try { | ||
Queue.prototype.processJob = function(job, renew){ | ||
Queue.prototype.processJob = function(job){ | ||
var _this = this; | ||
@@ -619,4 +622,8 @@ var lockRenewId; | ||
// | ||
var renew = false; | ||
var lockRenewer = function(){ | ||
return job.takeLock(_this.token, renew).then(function(locked){ | ||
// The first call to lock the job should ensure that the job is in the 'active' state, | ||
// because it might have gotten picked up already by another processor. We don't need | ||
// to do this on subsequent calls. | ||
return job.takeLock(_this.token, renew, !renew).then(function(locked){ | ||
if(locked){ | ||
@@ -626,2 +633,4 @@ renew = true; | ||
} | ||
// TODO: if we failed to re-acquire the lock while trying to renew, should we let the job | ||
// handler know and cancel the timer? | ||
return locked; | ||
@@ -628,0 +637,0 @@ }, function(err){ |
@@ -202,18 +202,7 @@ /** | ||
}, | ||
moveToCompleted: function(job, token, removeOnComplete){ | ||
return scripts.move(job, token, 'active', removeOnComplete ? void 0 : 'completed'); | ||
/* | ||
var params = {}; | ||
if(isNaN(job.attemptsMade)){ | ||
params.attemptsMade = 1; | ||
}else{ | ||
params.attemptsMade = job.attemptsMade++; | ||
} | ||
}, | ||
if(job.stacktrace){ | ||
params.stacktrace = JSON.stringify(job.stacktrace); | ||
} | ||
return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), params); | ||
*/ | ||
}, | ||
moveToSet: function(queue, set, jobId, context){ | ||
@@ -305,5 +294,37 @@ // | ||
/** | ||
* Takes a lock | ||
* Gets a lock for a job. | ||
* | ||
* @param {Queue} queue The queue for the job | ||
* @param {Job} job The job | ||
* @param {Boolean=false} renew Whether to renew to lock, meaning it will assume the job | ||
* is already locked and just reset the lock expiration. | ||
* @param {Boolean=false} ensureActive Ensures that the job is in the 'active' state. | ||
*/ | ||
takeLock: function(queue, job, token, renew){ | ||
takeLock: function(queue, job, token, renew, ensureActive){ | ||
// Ensures that the lock doesn't exist, or if it does, that we own it. | ||
var ensureOwnershipCall = [ | ||
'local prevLock = redis.call("GET", KEYS[1])', | ||
'if (prevLock and prevLock ~= ARGV[1]) then', | ||
' return 0', | ||
'end' | ||
].join('\n'); | ||
// Ensures that the lock in the 'active' state. | ||
var ensureActiveCall = [ | ||
// Note that while this is inefficient to run a O(n) traversal of the 'active' queue, | ||
// it's highly likely that the job is within the first few elements of the active | ||
// list at the time this call is used. | ||
'local activeJobs = redis.call("LRANGE", KEYS[3], 0, -1)', | ||
'local found = false', | ||
'for _, job in ipairs(activeJobs) do', | ||
' if(job == ARGV[3]) then', | ||
' found = true', | ||
' break', | ||
' end', | ||
'end', | ||
'if (found == false) then', | ||
' return 0', | ||
'end' | ||
].join('\n'); | ||
var lockCall; | ||
@@ -317,2 +338,4 @@ if (renew){ | ||
var script = [ | ||
(renew ? ensureOwnershipCall : ''), | ||
(ensureActive ? ensureActiveCall : ''), | ||
'if(' + lockCall + ') then', | ||
@@ -329,9 +352,11 @@ // Mark the job as having been locked at least once. Used to determine if the job was stalled. | ||
queue.client, | ||
'takeLock' + (renew ? 'Renew' : ''), | ||
'takeLock' + (renew ? 'Renew' : '') + (ensureActive ? 'EnsureActive' : ''), | ||
script, | ||
2, | ||
3, | ||
job.lockKey(), | ||
queue.toKey(job.jobId), | ||
queue.toKey('active'), | ||
token, | ||
queue.LOCK_RENEW_TIME | ||
queue.LOCK_RENEW_TIME, | ||
job.jobId | ||
]; | ||
@@ -338,0 +363,0 @@ |
{ | ||
"name": "bull", | ||
"version": "1.1.1", | ||
"version": "1.1.2", | ||
"description": "Job manager", | ||
@@ -22,5 +22,4 @@ "main": "index.js", | ||
"debuglog": "^1.0.0", | ||
"disturbed": "^1.0.3", | ||
"disturbed": "^1.0.6", | ||
"lodash": "^4.16.6", | ||
"mocha": "^3.1.2", | ||
"node-uuid": "^1.4.7", | ||
@@ -27,0 +26,0 @@ "redis": "^2.6.3", |
@@ -76,6 +76,7 @@ /*eslint-env node */ | ||
queue = buildQueue(); | ||
var numJobs = 100; | ||
workerMessageHandler = function(job) { | ||
jobs.push(job.id); | ||
if(jobs.length === 11) { | ||
if(jobs.length === numJobs) { | ||
var counts = {}; | ||
@@ -92,3 +93,3 @@ var j = 0; | ||
var i = 0; | ||
for(i; i < 11; i++) { | ||
for(i; i < numJobs; i++) { | ||
queue.add({}); | ||
@@ -95,0 +96,0 @@ } |
@@ -172,2 +172,20 @@ /*eslint-env node */ | ||
it('should create a queue with a port number and a hostname', function (done) { | ||
var queue = new Queue('connstring', '6379', '127.0.0.1'); | ||
queue.once('ready', function () { | ||
expect(queue.client.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.client.connection_options.port).to.be(6379); | ||
expect(queue.bclient.connection_options.port).to.be(6379); | ||
expect(queue.client.selected_db).to.be(0); | ||
expect(queue.bclient.selected_db).to.be(0); | ||
queue.close().then(done); | ||
}); | ||
}); | ||
it('creates a queue using the supplied redis DB', function (done) { | ||
@@ -219,2 +237,19 @@ var queue = new Queue('custom', { redis: { DB: 1 } }); | ||
}); | ||
it('creates a queue accepting port as a string', function () { | ||
var queue = new Queue('foobar', '6379', 'localhost'); | ||
return queue.add({ foo: 'bar' }).then(function (job) { | ||
expect(job.jobId).to.be.ok(); | ||
expect(job.data.foo).to.be('bar'); | ||
}).then(function () { | ||
queue.process(function (job, jobDone) { | ||
expect(job.data.foo).to.be.equal('bar'); | ||
jobDone(); | ||
}); | ||
}).then(function () { | ||
return queue.close(); | ||
}); | ||
}); | ||
}); | ||
@@ -221,0 +256,0 @@ |
214288
1.35%7
-12.5%5693
1.08%- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
Updated