Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 1.0.0-rc4 to 1.0.0

57

chains and topics.md
https://github.com/OptimalBits/bull/issues/120

@@ -11,7 +12,57 @@ # Process chains

.chain(workerD)
.bus('topicName', workerE, workerF, workerG)
.chain(workerH)
.bus('topicName', [workerE, workerF, workerG], workerH)
.chain(workerI)
Bull
.chain(workerB)
.chain()
muxer, demuxer
muxer( chainA, chainB, chainC )
demux( chainA ) -> chainB, chainC, chainD
```javascript
// New type of queue that has two "process" handlers: a regular worker
// and a handler that is called when all child jobs have completed
var GroupQueue = require('bull/lib/group-queue');
var Queue = require('bull');
var listUserQueue = Queue('list-user', 6379, '127.0.0.1');
var fetchMultipleUsersQueue = GroupQueue('fetch-multiple-user', 6379, '127.0.0.1');
listUserQueue.add({ url: '/list-users' });
listUserQueue.process(function(job, chain) {
return request(job.data.url).then(function(userUrls) {
var groupJob = fetchMultipleUsersQueue.add(userUrls);
// chain: new concept that makes this job dependant of another job
// I _really_ have no idea if that can be done in a sane way.
chain(groupJob);
});
});
fetchMultipleUsersQueue.process({
// Called to process a single job
unit: function(job) {
return request(job.data.url).then(function(user) {
return user.interestingInfos;
});
},
// Called when all single jobs have completed for this group
group: function(jobs) {
var result = jobs.reduce(function(sum, job) {
return job.data.salary + sum;
}, 0);
// Do something with result, save it to the database or something
return result;
});
}
});
```

@@ -0,1 +1,36 @@

v1.0.0
======
- improvements in clean (fixes and performance).
[Changes](https://github.com/OptimalBits/bull/compare/v1.0.0-rc4...v1.0.0)
v1.0.0-rc4
==========
- fixed lock renew logic.
- atomized code for getting stalled jobs.
[Changes](https://github.com/OptimalBits/bull/compare/v1.0.0-rc3...v1.0.0-rc4)
v1.0.0-rc3
==========
- smaller fixes.
[Changes](https://github.com/OptimalBits/bull/compare/v1.0.0-rc2...v1.0.0-rc3)
v1.0.0-rc2
==========
- Improved locking when removing and processing stalled jobs.
- Fixed #302 EVALSHA failure.
- Fixed #295 error with redis 3.2.
- Correctly allows the specification of the db
- Honor start/end range for complete/failed jobs.
- Fixed #277 Memory Leaks With Large Queue.
- Support for custom key prefix for redis keys.
[Changes](https://github.com/OptimalBits/bull/compare/v1.0.0-rc1...v1.0.0-rc2)
v1.0.0-rc1

@@ -2,0 +37,0 @@ ==========

2

lib/job.js

@@ -163,3 +163,3 @@ /*eslint-env node */

// If so, move to delayed
return _this._moveToSet('delayed', Date.now() + backoff);
return _this.moveToDelayed(Date.now() + backoff);
}else{

@@ -166,0 +166,0 @@ // If not, retry immediately

@@ -448,3 +448,3 @@ /*eslint-env node */

/**
This function updates the delay timer, which is a timer that timeout
This function updates the delay timer, which is a timer that timeouts
at the next known delayed job.

@@ -795,10 +795,9 @@ */

* @param {string} [type=completed] - The type of job to clean. Possible values
* @param {int} The max number of jobs to clean
* are completed, waiting, active, delayed, failed. Defaults to completed.
*/
Queue.prototype.clean = function (grace, type) {
Queue.prototype.clean = function (grace, type, limit) {
var _this = this;
return new Promise(function (resolve, reject) {
var getter;
if(grace === undefined || grace === null) {

@@ -814,3 +813,3 @@ return reject(new Error('You must define a grace period.'));

'completed',
'waiting',
'wait',
'active',

@@ -822,15 +821,3 @@ 'delayed',

getter = 'get' + type.charAt(0).toUpperCase() + type.slice(1);
_this[getter]().then(function (jobs) {
// take all jobs outside of the grace period
return Promise.filter(jobs, function (job) {
return job && ((!job.timestamp) || (job.timestamp < Date.now() - grace));
});
}).then(function (jobs) {
// remove those old jobs
return Promise.each(jobs, function (job) {
return job.remove(_this.token);
});
}).then(function (jobs) {
return scripts.cleanJobsInSet(_this, type, Date.now() - grace, limit).then(function (jobs) {
_this.emit('cleaned', jobs, type);

@@ -837,0 +824,0 @@ resolve(jobs);

@@ -374,2 +374,76 @@ /**

},
cleanJobsInSet: function(queue, set, ts, limit) {
var command;
var removeCommand;
var breakEarlyCommand = '';
var hash;
limit = limit || 0;
switch(set) {
case 'completed':
case 'failed':
command = 'local jobs = redis.call("SMEMBERS", KEYS[1])';
removeCommand = 'redis.call("SREM", KEYS[1], job)';
hash = 'cleanSet';
break;
case 'wait':
case 'active':
case 'paused':
command = 'local jobs = redis.call("LRANGE", KEYS[1], 0, -1)';
removeCommand = 'redis.call("LREM", KEYS[1], 0, job)';
hash = 'cleanList';
break;
case 'delayed':
command = 'local jobs = redis.call("ZRANGE", KEYS[1], 0, -1)';
removeCommand = 'redis.call("ZREM", KEYS[1], job)';
hash = 'cleanOSet';
break;
}
if(limit > 0) {
breakEarlyCommand = [
'if deletedCount >= limit then',
' break',
'end',
].join('\n');
hash = hash + 'WithLimit';
}
var script = [
command,
'local deleted = {}',
'local deletedCount = 0',
'local limit = tonumber(ARGV[3])',
'local jobTS',
'for _, job in ipairs(jobs) do',
breakEarlyCommand,
' local jobKey = ARGV[1] .. job',
' if (redis.call("EXISTS", jobKey .. ":lock") == 0) then',
' jobTS = redis.call("HGET", jobKey, "timestamp")',
' if(not jobTS or jobTS < ARGV[2]) then',
removeCommand,
' redis.call("DEL", jobKey)',
' deletedCount = deletedCount + 1',
' table.insert(deleted, job)',
' end',
' end',
'end',
'return deleted'
].join('\n');
var args = [
queue.client,
hash,
script,
1,
queue.toKey(set),
queue.toKey(''),
ts,
limit
];
return execScript.apply(scripts, args);
}
};

@@ -376,0 +450,0 @@

{
"name": "bull",
"version": "1.0.0-rc4",
"version": "1.0.0",
"description": "Job manager",

@@ -20,8 +20,8 @@ "main": "index.js",

"dependencies": {
"bluebird": "^3.4.0",
"bluebird": "^3.4.1",
"debuglog": "^1.0.0",
"lodash": "^4.13.1",
"node-uuid": "^1.4.7",
"redis": "^2.6.1",
"semver": "^4.2.0"
"redis": "^2.6.2",
"semver": "^5.1.0"
},

@@ -32,4 +32,4 @@ "devDependencies": {

"gulp-eslint": "^2.0.0",
"mocha": "^2.4.5",
"sinon": "^1.17.2"
"mocha": "^2.5.3",
"sinon": "^1.17.4"
},

@@ -36,0 +36,0 @@ "scripts": {

@@ -114,2 +114,7 @@ Bull Job Manager

pdfQueue.process(function(job){
// Processors can also return promises instead of using the done callback
return pdfAsyncProcessor();
}
videoQueue.add({video: 'http://example.com/video1.mov'});

@@ -405,3 +410,3 @@ audioQueue.add({audio: 'http://example.com/audio1.mp3'});

should have their clocks synchronized. [optional]
opts.attempts {Number} A number of attempts to retry if the job fails [optional]
opts.attempts {Number} The total number of attempts to try the job until it completes.
opts.backoff {Number|Object} Backoff setting for automatic retries if the job fails

@@ -408,0 +413,0 @@ opts.backoff.type {String} Backoff type, which can be either `fixed` or `exponential`

@@ -1624,2 +1624,45 @@ /*eslint-env node */

it('should clean all waiting jobs', function (done) {
queue.add({some: 'data'});
queue.add({some: 'data'});
Promise.delay(100).then(function () {
return queue.clean(0, 'wait');
}).then(function (jobs) {
expect(jobs.length).to.be(2);
return queue.count();
}).then(function(len) {
expect(len).to.be(0);
done();
});
});
it('should clean all delayed jobs', function (done) {
queue.add({some: 'data'}, { delay: 5000 });
queue.add({some: 'data'}, { delay: 5000 });
Promise.delay(100).then(function () {
return queue.clean(0, 'delayed');
}).then(function (jobs) {
expect(jobs.length).to.be(2);
return queue.count();
}).then(function(len) {
expect(len).to.be(0);
done();
});
});
it('should clean the number of jobs requested', function (done) {
queue.add({some: 'data'});
queue.add({some: 'data'});
queue.add({some: 'data'});
Promise.delay(100).then(function () {
return queue.clean(0, 'wait', 1);
}).then(function (jobs) {
expect(jobs.length).to.be(1);
return queue.count();
}).then(function(len) {
expect(len).to.be(2);
done();
});
});
it('should clean a job without a timestamp', function (done) {

@@ -1649,2 +1692,2 @@ var client = redis.createClient(6379, '127.0.0.1', {});

});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc