Comparing version 1.0.0-rc4 to 1.0.0
https://github.com/OptimalBits/bull/issues/120 | ||
@@ -11,7 +12,57 @@ # Process chains | ||
.chain(workerD) | ||
.bus('topicName', workerE, workerF, workerG) | ||
.chain(workerH) | ||
.bus('topicName', [workerE, workerF, workerG], workerH) | ||
.chain(workerI) | ||
Bull | ||
.chain(workerB) | ||
.chain() | ||
muxer, demuxer | ||
muxer( chainA, chainB, chainC ) | ||
demux( chainA ) -> chainB, chainC, chainD | ||
```javascript | ||
// New type of queue that has two "process" handlers: a regular worker | ||
// and a handler that is called when all child jobs have completed | ||
var GroupQueue = require('bull/lib/group-queue'); | ||
var Queue = require('bull'); | ||
var listUserQueue = Queue('list-user', 6379, '127.0.0.1'); | ||
var fetchMultipleUsersQueue = GroupQueue('fetch-multiple-user', 6379, '127.0.0.1'); | ||
listUserQueue.add({ url: '/list-users' }); | ||
listUserQueue.process(function(job, chain) { | ||
return request(job.data.url).then(function(userUrls) { | ||
var groupJob = fetchMultipleUsersQueue.add(userUrls); | ||
// chain: new concept that makes this job dependant of another job | ||
// I _really_ have no idea if that can be done in a sane way. | ||
chain(groupJob); | ||
}); | ||
}); | ||
fetchMultipleUsersQueue.process({ | ||
// Called to process a single job | ||
unit: function(job) { | ||
return request(job.data.url).then(function(user) { | ||
return user.interestingInfos; | ||
}); | ||
}, | ||
// Called when all single jobs have completed for this group | ||
group: function(jobs) { | ||
var result = jobs.reduce(function(sum, job) { | ||
return job.data.salary + sum; | ||
}, 0); | ||
// Do something with result, save it to the database or something | ||
return result; | ||
}); | ||
} | ||
}); | ||
``` |
@@ -0,1 +1,36 @@ | ||
v1.0.0 | ||
====== | ||
- improvements in clean (fixes and performance). | ||
[Changes](https://github.com/OptimalBits/bull/compare/v1.0.0-rc4...v1.0.0) | ||
v1.0.0-rc4 | ||
========== | ||
- fixed lock renew logic. | ||
- atomized code for getting stalled jobs. | ||
[Changes](https://github.com/OptimalBits/bull/compare/v1.0.0-rc3...v1.0.0-rc4) | ||
v1.0.0-rc3 | ||
========== | ||
- smaller fixes. | ||
[Changes](https://github.com/OptimalBits/bull/compare/v1.0.0-rc2...v1.0.0-rc3) | ||
v1.0.0-rc2 | ||
========== | ||
- Improved locking when removing and processing stalled jobs. | ||
- Fixed #302 EVALSHA failure. | ||
- Fixed #295 error with redis 3.2. | ||
- Correctly allows the specification of the db | ||
- Honor start/end range for complete/failed jobs. | ||
- Fixed #277 Memory Leaks With Large Queue. | ||
- Support for custom key prefix for redis keys. | ||
[Changes](https://github.com/OptimalBits/bull/compare/v1.0.0-rc1...v1.0.0-rc2) | ||
v1.0.0-rc1 | ||
@@ -2,0 +37,0 @@ ========== |
@@ -163,3 +163,3 @@ /*eslint-env node */ | ||
// If so, move to delayed | ||
return _this._moveToSet('delayed', Date.now() + backoff); | ||
return _this.moveToDelayed(Date.now() + backoff); | ||
}else{ | ||
@@ -166,0 +166,0 @@ // If not, retry immediately |
@@ -448,3 +448,3 @@ /*eslint-env node */ | ||
/** | ||
This function updates the delay timer, which is a timer that timeout | ||
This function updates the delay timer, which is a timer that timeouts | ||
at the next known delayed job. | ||
@@ -795,10 +795,9 @@ */ | ||
* @param {string} [type=completed] - The type of job to clean. Possible values | ||
* @param {int} The max number of jobs to clean | ||
* are completed, waiting, active, delayed, failed. Defaults to completed. | ||
*/ | ||
Queue.prototype.clean = function (grace, type) { | ||
Queue.prototype.clean = function (grace, type, limit) { | ||
var _this = this; | ||
return new Promise(function (resolve, reject) { | ||
var getter; | ||
if(grace === undefined || grace === null) { | ||
@@ -814,3 +813,3 @@ return reject(new Error('You must define a grace period.')); | ||
'completed', | ||
'waiting', | ||
'wait', | ||
'active', | ||
@@ -822,15 +821,3 @@ 'delayed', | ||
getter = 'get' + type.charAt(0).toUpperCase() + type.slice(1); | ||
_this[getter]().then(function (jobs) { | ||
// take all jobs outside of the grace period | ||
return Promise.filter(jobs, function (job) { | ||
return job && ((!job.timestamp) || (job.timestamp < Date.now() - grace)); | ||
}); | ||
}).then(function (jobs) { | ||
// remove those old jobs | ||
return Promise.each(jobs, function (job) { | ||
return job.remove(_this.token); | ||
}); | ||
}).then(function (jobs) { | ||
return scripts.cleanJobsInSet(_this, type, Date.now() - grace, limit).then(function (jobs) { | ||
_this.emit('cleaned', jobs, type); | ||
@@ -837,0 +824,0 @@ resolve(jobs); |
@@ -374,2 +374,76 @@ /** | ||
}, | ||
cleanJobsInSet: function(queue, set, ts, limit) { | ||
var command; | ||
var removeCommand; | ||
var breakEarlyCommand = ''; | ||
var hash; | ||
limit = limit || 0; | ||
switch(set) { | ||
case 'completed': | ||
case 'failed': | ||
command = 'local jobs = redis.call("SMEMBERS", KEYS[1])'; | ||
removeCommand = 'redis.call("SREM", KEYS[1], job)'; | ||
hash = 'cleanSet'; | ||
break; | ||
case 'wait': | ||
case 'active': | ||
case 'paused': | ||
command = 'local jobs = redis.call("LRANGE", KEYS[1], 0, -1)'; | ||
removeCommand = 'redis.call("LREM", KEYS[1], 0, job)'; | ||
hash = 'cleanList'; | ||
break; | ||
case 'delayed': | ||
command = 'local jobs = redis.call("ZRANGE", KEYS[1], 0, -1)'; | ||
removeCommand = 'redis.call("ZREM", KEYS[1], job)'; | ||
hash = 'cleanOSet'; | ||
break; | ||
} | ||
if(limit > 0) { | ||
breakEarlyCommand = [ | ||
'if deletedCount >= limit then', | ||
' break', | ||
'end', | ||
].join('\n'); | ||
hash = hash + 'WithLimit'; | ||
} | ||
var script = [ | ||
command, | ||
'local deleted = {}', | ||
'local deletedCount = 0', | ||
'local limit = tonumber(ARGV[3])', | ||
'local jobTS', | ||
'for _, job in ipairs(jobs) do', | ||
breakEarlyCommand, | ||
' local jobKey = ARGV[1] .. job', | ||
' if (redis.call("EXISTS", jobKey .. ":lock") == 0) then', | ||
' jobTS = redis.call("HGET", jobKey, "timestamp")', | ||
' if(not jobTS or jobTS < ARGV[2]) then', | ||
removeCommand, | ||
' redis.call("DEL", jobKey)', | ||
' deletedCount = deletedCount + 1', | ||
' table.insert(deleted, job)', | ||
' end', | ||
' end', | ||
'end', | ||
'return deleted' | ||
].join('\n'); | ||
var args = [ | ||
queue.client, | ||
hash, | ||
script, | ||
1, | ||
queue.toKey(set), | ||
queue.toKey(''), | ||
ts, | ||
limit | ||
]; | ||
return execScript.apply(scripts, args); | ||
} | ||
}; | ||
@@ -376,0 +450,0 @@ |
{ | ||
"name": "bull", | ||
"version": "1.0.0-rc4", | ||
"version": "1.0.0", | ||
"description": "Job manager", | ||
@@ -20,8 +20,8 @@ "main": "index.js", | ||
"dependencies": { | ||
"bluebird": "^3.4.0", | ||
"bluebird": "^3.4.1", | ||
"debuglog": "^1.0.0", | ||
"lodash": "^4.13.1", | ||
"node-uuid": "^1.4.7", | ||
"redis": "^2.6.1", | ||
"semver": "^4.2.0" | ||
"redis": "^2.6.2", | ||
"semver": "^5.1.0" | ||
}, | ||
@@ -32,4 +32,4 @@ "devDependencies": { | ||
"gulp-eslint": "^2.0.0", | ||
"mocha": "^2.4.5", | ||
"sinon": "^1.17.2" | ||
"mocha": "^2.5.3", | ||
"sinon": "^1.17.4" | ||
}, | ||
@@ -36,0 +36,0 @@ "scripts": { |
@@ -114,2 +114,7 @@ Bull Job Manager | ||
pdfQueue.process(function(job){ | ||
// Processors can also return promises instead of using the done callback | ||
return pdfAsyncProcessor(); | ||
} | ||
videoQueue.add({video: 'http://example.com/video1.mov'}); | ||
@@ -405,3 +410,3 @@ audioQueue.add({audio: 'http://example.com/audio1.mp3'}); | ||
should have their clocks synchronized. [optional] | ||
opts.attempts {Number} A number of attempts to retry if the job fails [optional] | ||
opts.attempts {Number} The total number of attempts to try the job until it completes. | ||
opts.backoff {Number|Object} Backoff setting for automatic retries if the job fails | ||
@@ -408,0 +413,0 @@ opts.backoff.type {String} Backoff type, which can be either `fixed` or `exponential` |
@@ -1624,2 +1624,45 @@ /*eslint-env node */ | ||
it('should clean all waiting jobs', function (done) { | ||
queue.add({some: 'data'}); | ||
queue.add({some: 'data'}); | ||
Promise.delay(100).then(function () { | ||
return queue.clean(0, 'wait'); | ||
}).then(function (jobs) { | ||
expect(jobs.length).to.be(2); | ||
return queue.count(); | ||
}).then(function(len) { | ||
expect(len).to.be(0); | ||
done(); | ||
}); | ||
}); | ||
it('should clean all delayed jobs', function (done) { | ||
queue.add({some: 'data'}, { delay: 5000 }); | ||
queue.add({some: 'data'}, { delay: 5000 }); | ||
Promise.delay(100).then(function () { | ||
return queue.clean(0, 'delayed'); | ||
}).then(function (jobs) { | ||
expect(jobs.length).to.be(2); | ||
return queue.count(); | ||
}).then(function(len) { | ||
expect(len).to.be(0); | ||
done(); | ||
}); | ||
}); | ||
it('should clean the number of jobs requested', function (done) { | ||
queue.add({some: 'data'}); | ||
queue.add({some: 'data'}); | ||
queue.add({some: 'data'}); | ||
Promise.delay(100).then(function () { | ||
return queue.clean(0, 'wait', 1); | ||
}).then(function (jobs) { | ||
expect(jobs.length).to.be(1); | ||
return queue.count(); | ||
}).then(function(len) { | ||
expect(len).to.be(2); | ||
done(); | ||
}); | ||
}); | ||
it('should clean a job without a timestamp', function (done) { | ||
@@ -1649,2 +1692,2 @@ var client = redis.createClient(6379, '127.0.0.1', {}); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
13318727
5228
1
703
11
+ Addedsemver@5.7.2(transitive)
- Removedsemver@4.3.6(transitive)
Updatedbluebird@^3.4.1
Updatedredis@^2.6.2
Updatedsemver@^5.1.0