Comparing version 3.0.0-rc.7 to 3.0.0-rc.8
@@ -0,1 +1,8 @@ | ||
v.3.0.0-rc.8 | ||
============ | ||
- Enhanced job fetching #651 (faster and more reliable). | ||
[Changes](https://github.com/OptimalBits/bull/compare/v3.0.0-rc.7...v3.0.0-rc.8) | ||
v.3.0.0-rc.7 | ||
@@ -2,0 +9,0 @@ ============ |
@@ -172,3 +172,3 @@ /*eslint-env node */ | ||
retryProcessDelay: 5000, | ||
drainDelay: 2 | ||
drainDelay: 5 | ||
}); | ||
@@ -203,5 +203,6 @@ | ||
'priority', | ||
'unlocked-check', | ||
'stalled-check', | ||
'completed', | ||
'failed'], function(key){ | ||
'failed', | ||
'stalled'], function(key){ | ||
keys[key] = _this.toKey(key); | ||
@@ -736,4 +737,4 @@ }); | ||
return scripts.moveUnlockedJobsToWait(this).then(function(responses){ | ||
var handleFailedJobs = responses[0].map(function(jobId){ | ||
return scripts.moveUnlockedJobsToWait(this).spread(function(failed, stalled){ | ||
var handleFailedJobs = failed.map(function(jobId){ | ||
return _this.getJobFromId(jobId).then(function(job){ | ||
@@ -744,3 +745,3 @@ _this.emit('failed', job, new Error('job stalled more than allowable limit'), 'active' ); | ||
}); | ||
var handleStalledJobs = responses[1].map(function(jobId){ | ||
var handleStalledJobs = stalled.map(function(jobId){ | ||
return _this.getJobFromId(jobId).then(function(job){ | ||
@@ -885,13 +886,24 @@ _this.emit('stalled', job); | ||
// | ||
// Waiting for new jobs to arrive | ||
// | ||
return this.bclient.blpop(this.toKey('wait:added'), _this.settings.drainDelay).then(function(result){ | ||
if(!result){ | ||
_this.emit('drained'); | ||
return; | ||
} | ||
var key = result[0], jobId = result[1]; | ||
return scripts.moveToActive(_this).spread(function(jobData, jobId){ | ||
if(this.drained){ | ||
// | ||
// Waiting for new jobs to arrive | ||
// | ||
return this.bclient.brpoplpush(this.toKey('wait'), this.toKey('active'), _this.settings.drainDelay).then(function(jobId){ | ||
if(jobId){ | ||
return moveToActive(jobId); | ||
} | ||
}, function(err){ | ||
// Swallow error | ||
if(err.message !== 'Connection is closed.'){ | ||
console.error('BRPOPLPUSH', err); | ||
} | ||
}); | ||
}else{ | ||
return moveToActive(); | ||
} | ||
function moveToActive(jobId){ | ||
return scripts.moveToActive(_this, jobId).spread(function(jobData, jobId){ | ||
if(jobData){ | ||
_this.drained = false; | ||
var job = Job.fromJSON(_this, jobData, jobId); | ||
@@ -905,11 +917,7 @@ if(job.opts.repeat){ | ||
}else{ | ||
_this.drained = true; | ||
_this.emit('drained'); | ||
} | ||
}); | ||
}, function(err){ | ||
// Swallow error | ||
if(err.message !== 'Connection is closed.'){ | ||
console.error('BLPOP', err); | ||
} | ||
}); | ||
} | ||
}; | ||
@@ -916,0 +924,0 @@ |
@@ -73,3 +73,3 @@ /** | ||
moveToActive: function(queue){ | ||
moveToActive: function(queue, jobId){ | ||
var queueKeys = queue.keys; | ||
@@ -79,2 +79,3 @@ var keys = [queueKeys.wait, queueKeys.active, queueKeys.priority]; | ||
keys[3] = keys[1] + '@' + queue.token; | ||
keys[4] = queueKeys.stalled; | ||
@@ -85,3 +86,4 @@ var args = [ | ||
queue.settings.lockDuration, | ||
Date.now() | ||
Date.now(), | ||
jobId | ||
]; | ||
@@ -221,3 +223,8 @@ | ||
extendLock: function(queue, jobId){ | ||
return queue.client.extendLock([queue.toKey(jobId) + ':lock', queue.token, queue.settings.lockDuration]); | ||
return queue.client.extendLock([ | ||
queue.toKey(jobId) + ':lock', | ||
queue.keys.stalled, | ||
queue.token, | ||
queue.settings.lockDuration, | ||
jobId]); | ||
}, | ||
@@ -258,11 +265,12 @@ | ||
var keys = [ | ||
queue.keys.stalled, | ||
queue.keys.wait, | ||
queue.keys.active, | ||
queue.keys.wait, | ||
queue.keys.failed, | ||
queue.keys['stalled-check'], | ||
queue.keys['meta-paused'], | ||
queue.keys.paused, | ||
queue.keys['unlocked-check'] | ||
]; | ||
var args = [queue.settings.maxStalledCount, queue.toKey(''), Date.now(), queue.settings.stalledInterval]; | ||
return queue.client.moveUnlockedJobsToWait(keys.concat(args)); | ||
return queue.client.moveStalledJobsToWait(keys.concat(args)); | ||
}, | ||
@@ -269,0 +277,0 @@ |
@@ -5,3 +5,8 @@ /*eslint-env node */ | ||
module.exports = function(Queue){ | ||
// IDEA, How to store metadata associated to a worker. | ||
// create a key from the worker ID associated to the given name. | ||
// We keep a hash table bull:myqueue:workers where every worker is a hash key workername:workerId with json holding | ||
// metadata of the worker. The worker key gets expired every 30 seconds or so, we renew the worker metadata. | ||
// | ||
Queue.prototype.setWorkerName = function(){ | ||
@@ -8,0 +13,0 @@ return this.client.client('setname', this.clientName()); |
{ | ||
"name": "bull", | ||
"version": "3.0.0-rc.7", | ||
"version": "3.0.0-rc.8", | ||
"description": "Job manager", | ||
@@ -5,0 +5,0 @@ "main": "./lib/queue", |
@@ -22,5 +22,6 @@ /*eslint-env node */ | ||
return client.flushdb().then(function(){ | ||
queue = utils.buildQueue('repeat', {settings: { | ||
queue = utils.buildQueue('repeat', {settings: { | ||
guardInterval: Number.MAX_VALUE, | ||
stalledInterval: Number.MAX_VALUE | ||
stalledInterval: Number.MAX_VALUE, | ||
drainDelay: 1 // Small delay so that .close is faster. | ||
}}); | ||
@@ -111,3 +112,3 @@ }); | ||
queue.add('repeat', {foo: 'bar'}, {repeat: { | ||
cron: '0 1 * * *', | ||
cron: '0 1 * * *', | ||
endDate: new Date('2017-05-10 13:12:00')} | ||
@@ -174,2 +175,18 @@ }).then(function(){ | ||
}); | ||
it('should create two jobs with the same ids', function(){ | ||
var options = { | ||
repeat: { | ||
cron: '0 1 * * *', | ||
}, | ||
}; | ||
var p1 = queue.add({foo: 'bar'}, options); | ||
var p2 = queue.add({foo: 'bar'}, options); | ||
return Promise.all([p1, p2]).then(function(jobs) { | ||
expect(jobs.length).to.be.eql(2); | ||
expect(jobs[0].id).to.be.eql(jobs[1].id); | ||
}); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
485574
65
5498