Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 3.0.0-rc.7 to 3.0.0-rc.8

lib/commands/extendLock-2.lua

7

CHANGELOG.md

@@ -0,1 +1,8 @@

v.3.0.0-rc.8
============
- Enhanced job fetching #651 (faster and more reliable).
[Changes](https://github.com/OptimalBits/bull/compare/v3.0.0-rc.7...v3.0.0-rc.8)
v.3.0.0-rc.7

@@ -2,0 +9,0 @@ ============

52

lib/queue.js

@@ -172,3 +172,3 @@ /*eslint-env node */

retryProcessDelay: 5000,
drainDelay: 2
drainDelay: 5
});

@@ -203,5 +203,6 @@

'priority',
'unlocked-check',
'stalled-check',
'completed',
'failed'], function(key){
'failed',
'stalled'], function(key){
keys[key] = _this.toKey(key);

@@ -736,4 +737,4 @@ });

return scripts.moveUnlockedJobsToWait(this).then(function(responses){
var handleFailedJobs = responses[0].map(function(jobId){
return scripts.moveUnlockedJobsToWait(this).spread(function(failed, stalled){
var handleFailedJobs = failed.map(function(jobId){
return _this.getJobFromId(jobId).then(function(job){

@@ -744,3 +745,3 @@ _this.emit('failed', job, new Error('job stalled more than allowable limit'), 'active' );

});
var handleStalledJobs = responses[1].map(function(jobId){
var handleStalledJobs = stalled.map(function(jobId){
return _this.getJobFromId(jobId).then(function(job){

@@ -885,13 +886,24 @@ _this.emit('stalled', job);

//
// Waiting for new jobs to arrive
//
return this.bclient.blpop(this.toKey('wait:added'), _this.settings.drainDelay).then(function(result){
if(!result){
_this.emit('drained');
return;
}
var key = result[0], jobId = result[1];
return scripts.moveToActive(_this).spread(function(jobData, jobId){
if(this.drained){
//
// Waiting for new jobs to arrive
//
return this.bclient.brpoplpush(this.toKey('wait'), this.toKey('active'), _this.settings.drainDelay).then(function(jobId){
if(jobId){
return moveToActive(jobId);
}
}, function(err){
// Swallow error
if(err.message !== 'Connection is closed.'){
console.error('BRPOPLPUSH', err);
}
});
}else{
return moveToActive();
}
function moveToActive(jobId){
return scripts.moveToActive(_this, jobId).spread(function(jobData, jobId){
if(jobData){
_this.drained = false;
var job = Job.fromJSON(_this, jobData, jobId);

@@ -905,11 +917,7 @@ if(job.opts.repeat){

}else{
_this.drained = true;
_this.emit('drained');
}
});
}, function(err){
// Swallow error
if(err.message !== 'Connection is closed.'){
console.error('BLPOP', err);
}
});
}
};

@@ -916,0 +924,0 @@

@@ -73,3 +73,3 @@ /**

moveToActive: function(queue){
moveToActive: function(queue, jobId){
var queueKeys = queue.keys;

@@ -79,2 +79,3 @@ var keys = [queueKeys.wait, queueKeys.active, queueKeys.priority];

keys[3] = keys[1] + '@' + queue.token;
keys[4] = queueKeys.stalled;

@@ -85,3 +86,4 @@ var args = [

queue.settings.lockDuration,
Date.now()
Date.now(),
jobId
];

@@ -221,3 +223,8 @@

extendLock: function(queue, jobId){
return queue.client.extendLock([queue.toKey(jobId) + ':lock', queue.token, queue.settings.lockDuration]);
return queue.client.extendLock([
queue.toKey(jobId) + ':lock',
queue.keys.stalled,
queue.token,
queue.settings.lockDuration,
jobId]);
},

@@ -258,11 +265,12 @@

var keys = [
queue.keys.stalled,
queue.keys.wait,
queue.keys.active,
queue.keys.wait,
queue.keys.failed,
queue.keys['stalled-check'],
queue.keys['meta-paused'],
queue.keys.paused,
queue.keys['unlocked-check']
];
var args = [queue.settings.maxStalledCount, queue.toKey(''), Date.now(), queue.settings.stalledInterval];
return queue.client.moveUnlockedJobsToWait(keys.concat(args));
return queue.client.moveStalledJobsToWait(keys.concat(args));
},

@@ -269,0 +277,0 @@

@@ -5,3 +5,8 @@ /*eslint-env node */

module.exports = function(Queue){
// IDEA, How to store metadata associated to a worker.
// create a key from the worker ID associated to the given name.
// We keep a hash table bull:myqueue:workers where every worker is a hash key workername:workerId with json holding
// metadata of the worker. The worker key gets expired every 30 seconds or so, we renew the worker metadata.
//
Queue.prototype.setWorkerName = function(){

@@ -8,0 +13,0 @@ return this.client.client('setname', this.clientName());

{
"name": "bull",
"version": "3.0.0-rc.7",
"version": "3.0.0-rc.8",
"description": "Job manager",

@@ -5,0 +5,0 @@ "main": "./lib/queue",

@@ -22,5 +22,6 @@ /*eslint-env node */

return client.flushdb().then(function(){
queue = utils.buildQueue('repeat', {settings: {
queue = utils.buildQueue('repeat', {settings: {
guardInterval: Number.MAX_VALUE,
stalledInterval: Number.MAX_VALUE
stalledInterval: Number.MAX_VALUE,
drainDelay: 1 // Small delay so that .close is faster.
}});

@@ -111,3 +112,3 @@ });

queue.add('repeat', {foo: 'bar'}, {repeat: {
cron: '0 1 * * *',
cron: '0 1 * * *',
endDate: new Date('2017-05-10 13:12:00')}

@@ -174,2 +175,18 @@ }).then(function(){

});
it('should create two jobs with the same ids', function(){
var options = {
repeat: {
cron: '0 1 * * *',
},
};
var p1 = queue.add({foo: 'bar'}, options);
var p2 = queue.add({foo: 'bar'}, options);
return Promise.all([p1, p2]).then(function(jobs) {
expect(jobs.length).to.be.eql(2);
expect(jobs[0].id).to.be.eql(jobs[1].id);
});
});
});

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc