Comparing version 3.0.3 to 3.1.0
@@ -173,4 +173,5 @@ 'use strict'; | ||
DBQueue.prototype.listen = function(queue_name, options, consumer) { | ||
var interval = 1000; | ||
var interval = options.interval || 1000; | ||
var max_outstanding = options.max_outstanding || 1; | ||
var max_at_a_time = options.max_jobs_per_interval || 0; | ||
var outstanding = 0; | ||
@@ -180,5 +181,11 @@ | ||
var num_to_consume = max_outstanding - outstanding; | ||
if (!num_to_consume) { | ||
return; | ||
} | ||
if (max_at_a_time) { | ||
num_to_consume = Math.min(num_to_consume, max_at_a_time); | ||
} | ||
var consume_options = { | ||
@@ -185,0 +192,0 @@ lock_time: options.lock_time, |
{ | ||
"name": "dbqueue", | ||
"version": "3.0.3", | ||
"version": "3.1.0", | ||
"description": "A minimal, durable DB-based message queue system", | ||
@@ -12,3 +12,3 @@ "homepage": "https://github.com/elliotf/node-dbqueue", | ||
"scripts": { | ||
"test": "NODE_ENV=test mocha --check-leaks --recursive test" | ||
"test": "NODE_ENV=test mocha -r mocha-clean --check-leaks --recursive test" | ||
}, | ||
@@ -36,2 +36,3 @@ "keywords": [ | ||
"mocha": "^2.4.5", | ||
"mocha-clean": "^1.0.0", | ||
"mocha-sinon": "^1.1.5", | ||
@@ -38,0 +39,0 @@ "nodemon": "^1.9.1", |
@@ -175,6 +175,9 @@ # DBQueue | ||
```javascript | ||
var queue_name = 'example queue'; | ||
var queue_name = 'default queue configuration'; | ||
var options = { | ||
interval: 1000, // milliseconds to wait between polling the queue, defaults to 100 | ||
max_outstanding: 10, // maximum un-ack'ed outstanding messages to have, defaults to 1 | ||
interval: 1000, // milliseconds to wait between polling the queue, defaults to 1000 | ||
max_outstanding: 1, // maximum un-ack'ed outstanding messages to have, defaults to 1 | ||
max_jobs_per_interval: 0, // maximum number of messages to consume per interval, defaults to 0 | ||
// if set to 0, there is no limit per-interval, but max_outstanding | ||
// is still enforced | ||
}; | ||
@@ -189,2 +192,21 @@ | ||
## Example rate-limited consumer for slow jobs | ||
Consume at a steady rate of ~4 messages/sec, up to 10,000 jobs in flight. | ||
```javascript | ||
var queue_name = 'slow job queue with high concurrency'; | ||
var options = { | ||
interval: 500, // check for jobs twice a second | ||
max_jobs_per_interval: 2, | ||
max_outstanding: 10000, | ||
lock_time: 10*60, // jobs take a while, so lock for longer | ||
}; | ||
function consumer(err, message_data, ackMessageCallback) { | ||
// the same signature as the `consume` handler above | ||
} | ||
queue.listen(queue_name, options, consumer); | ||
``` | ||
## Custom serialization | ||
@@ -191,0 +213,0 @@ |
@@ -283,3 +283,3 @@ 'use strict'; | ||
expect(rows).to.have.length(0); | ||
expect(rows).to.deep.equal([]); | ||
@@ -521,2 +521,3 @@ return done(); | ||
var listen_options; | ||
var consumer; | ||
@@ -531,2 +532,4 @@ beforeEach(function(done) { | ||
consumer = this.sinon.spy(); | ||
var todo = ['a', 'b', 'c', 'd', 'e']; | ||
@@ -550,3 +553,2 @@ async.map( | ||
var clock = this.sinon.useFakeTimers(); | ||
var consumer = this.sinon.spy(); | ||
@@ -572,3 +574,2 @@ queue.listen('a queue', listen_options, consumer); | ||
var clock = this.sinon.useFakeTimers(); | ||
var consumer = this.sinon.spy(); | ||
@@ -591,3 +592,2 @@ var stop = queue.listen('a queue', listen_options, consumer); | ||
var clock = this.sinon.useFakeTimers(); | ||
var consumer = this.sinon.spy(); | ||
@@ -668,2 +668,66 @@ var stop = queue.listen('a queue', listen_options, consumer); | ||
context('when provided a max_jobs_per_interval', function() { | ||
var listen_options; | ||
var interval; | ||
beforeEach(function() { | ||
interval = 500; | ||
listen_options = { | ||
max_outstanding: 4, | ||
max_jobs_per_interval: 3, | ||
lock_time: 60000, | ||
interval: interval, | ||
}; | ||
}); | ||
it('will fetch only up to that many jobs at a time', function(done) { | ||
this.sinon.stub(queue, 'consume'); | ||
var consumer = this.sinon.spy(); | ||
var clock = this.sinon.useFakeTimers(); | ||
queue.listen('a queue', listen_options, consumer); | ||
expect(queue.consume).to.have.callCount(0); | ||
clock.tick(interval + 10); | ||
expect(queue.consume).to.have.callCount(1); | ||
expect(queue.consume.args[0][1]).to.deep.equal({ | ||
count: 3, | ||
lock_time: 60000, | ||
}); | ||
done(); | ||
}); | ||
}); | ||
context('when options.interval is provided', function() { | ||
it('checks for new messages every <interval> milliseconds', function(done) { | ||
var clock = this.sinon.useFakeTimers(); | ||
var consumer = this.sinon.spy(); | ||
var interval = 100; | ||
this.sinon.spy(queue, 'consume'); | ||
listen_options = { | ||
max_outstanding: 100, | ||
lock_time: 3000, | ||
interval: interval, | ||
}; | ||
queue.listen('a queue', listen_options, consumer); | ||
expect(queue.consume).to.have.callCount(0); | ||
clock.tick(12*interval + 10); | ||
expect(queue.consume).to.have.callCount(12); | ||
return done(); | ||
}); | ||
}); | ||
context('when there are no messages', function() { | ||
@@ -683,9 +747,10 @@ it('does not call the consumer', function(done) { | ||
}; | ||
queue.listen('an empty queue', listen_options, consumer); | ||
clock.tick(30000); | ||
queue.listen('an empty queue', listen_options, consumer); | ||
expect(consumer).to.have.callCount(0); | ||
clock.tick(30000); | ||
return done(); | ||
expect(consumer).to.have.callCount(0); | ||
return done(); | ||
}); | ||
@@ -692,0 +757,0 @@ }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
46793
11
1020
251
12