mongodb-queue
Advanced tools
Comparing version 0.9.1 to 1.0.0
@@ -82,5 +82,10 @@ /** | ||
Queue.prototype.get = function(callback) { | ||
Queue.prototype.get = function(opts, callback) { | ||
var self = this | ||
if ( !callback ) { | ||
callback = opts | ||
opts = {} | ||
} | ||
var visibility = opts.visibility || self.visibility | ||
var query = { | ||
@@ -97,3 +102,3 @@ visible : { $lt : now() }, | ||
ack : id(), | ||
visible : nowPlusSecs(self.visibility), | ||
visible : nowPlusSecs(visibility), | ||
} | ||
@@ -138,5 +143,10 @@ } | ||
Queue.prototype.ping = function(ack, callback) { | ||
Queue.prototype.ping = function(ack, opts, callback) { | ||
var self = this | ||
if ( !callback ) { | ||
callback = opts | ||
opts = {} | ||
} | ||
var visibility = opts.visibility || self.visibility | ||
var query = { | ||
@@ -149,3 +159,3 @@ ack : ack, | ||
$set : { | ||
visible : nowPlusSecs(self.visibility) | ||
visible : nowPlusSecs(visibility) | ||
} | ||
@@ -152,0 +162,0 @@ } |
{ | ||
"name": "mongodb-queue", | ||
"version": "0.9.1", | ||
"version": "1.0.0", | ||
"description": "Message queues which uses MongoDB.", | ||
@@ -5,0 +5,0 @@ "main": "mongodb-queue.js", |
@@ -253,2 +253,31 @@ # mongodb-queue # | ||
### .get() ### | ||
Retrieve a message from the queue: | ||
```js | ||
queue.get(function(err, msg) { | ||
// You can now process the message | ||
}) | ||
``` | ||
You can choose the visibility of an individual retrieved message by passing the `visibility` option: | ||
```js | ||
queue.get({ visibility: 10 }, function(err, msg) { | ||
// You can now process the message for 10s before it goes back into the queue if not ack'd instead of the duration that is set on the queue in general | ||
}) | ||
``` | ||
Message will have the following structure: | ||
```js | ||
{ | ||
id: '533b1eb64ee78a57664cc76c', // ID of the message | ||
ack: 'c8a3cc585cbaaacf549d746d7db72f69', // ID for ack and ping operations | ||
payload: 'Hello, World!', // Payload passed when the message was addded | ||
tries: 1 // Number of times this message has been retrieved from queue without being ack'd | ||
} | ||
``` | ||
### .ack() ### | ||
@@ -281,2 +310,12 @@ | ||
You can also choose the visibility time that gets added by the ping operation by passing the `visibility` option: | ||
```js | ||
queue.get(function(err, msg) { | ||
queue.ping(msg.ack, { visibility: 10 }, function(err, id) { | ||
// this message has had it's visibility window extended by 10s instead of the visibilty set on the queue in general | ||
}) | ||
}) | ||
``` | ||
### .total() ### | ||
@@ -368,4 +407,12 @@ | ||
### 0.8.0 (2014-08-28) ### | ||
Yay! We made it to v1.0. This means that development may slow down but to be honest, I have pretty | ||
much all of the functionality I want in this thing done. Thanks to everyone for feedback, reports | ||
and pull requests. | ||
### 1.0.0 (2014-10-30) ### | ||
* [NEW] Ability to specify a visibility window when getting a message (thanks https://github.com/Gertt) | ||
### 0.9.1 (2014-08-28) ### | ||
* [NEW] Added .clean() method to remove old (processed) messages | ||
@@ -372,0 +419,0 @@ * [NEW] Add 'delay' option to queue.add() so individual messages can be delayed separately |
@@ -111,2 +111,68 @@ var async = require('async') | ||
test("ping: check visibility option overrides the queue visibility", function(t) { | ||
var queue = mongoDbQueue(db, 'ping', { visibility : 3 }) | ||
var msg | ||
async.series( | ||
[ | ||
function(next) { | ||
queue.add('Hello, World!', function(err, id) { | ||
t.ok(!err, 'There is no error when adding a message.') | ||
t.ok(id, 'There is an id returned when adding a message.') | ||
next() | ||
}) | ||
}, | ||
function(next) { | ||
queue.get(function(err, thisMsg) { | ||
msg = thisMsg | ||
// message should reset in three seconds | ||
t.ok(msg.id, 'Got a msg.id (sanity check)') | ||
setTimeout(next, 2 * 1000) | ||
}) | ||
}, | ||
function(next) { | ||
// ping this message so it will be kept alive longer, another 5s instead of 3s | ||
queue.ping(msg.ack, { visibility: 5 }, function(err, id) { | ||
t.ok(!err, 'No error when pinging a message') | ||
t.ok(id, 'Received an id when acking this message') | ||
// wait 4s so the msg would normally have returns to the queue | ||
setTimeout(next, 4 * 1000) | ||
}) | ||
}, | ||
function(next) { | ||
queue.get(function(err, msg) { | ||
// messages should not be back yet | ||
t.ok(!err, 'No error when getting no messages') | ||
t.ok(!msg, 'No msg received') | ||
// wait 2s so the msg should have returns to the queue | ||
setTimeout(next, 2 * 1000) | ||
}) | ||
}, | ||
function(next) { | ||
queue.get(function(err, msg) { | ||
// yes, there should be a message on the queue again | ||
t.ok(msg.id, 'Got a msg.id (sanity check)') | ||
queue.ack(msg.ack, function(err) { | ||
t.ok(!err, 'No error when acking the message') | ||
next() | ||
}) | ||
}) | ||
}, | ||
function(next) { | ||
queue.get(function(err, msg) { | ||
// no more messages | ||
t.ok(!err, 'No error when getting no messages') | ||
t.ok(!msg, 'No msg received') | ||
next() | ||
}) | ||
} | ||
], | ||
function(err) { | ||
if (err) t.fail(err) | ||
t.pass('Finished test ok') | ||
t.end() | ||
} | ||
) | ||
}) | ||
test('db.close()', function(t) { | ||
@@ -113,0 +179,0 @@ t.pass('db.close()') |
@@ -113,2 +113,57 @@ var async = require('async') | ||
test("visibility: check visibility option overrides the queue visibility", function(t) { | ||
var queue = mongoDbQueue(db, 'visibility', { visibility : 2 }) | ||
var originalAck | ||
async.series( | ||
[ | ||
function(next) { | ||
queue.add('Hello, World!', function(err) { | ||
t.ok(!err, 'There is no error when adding a message.') | ||
next() | ||
}) | ||
}, | ||
function(next) { | ||
queue.get({ visibility: 4 }, function(err, msg) { | ||
// wait over 2s so the msg would normally have returns to the queue | ||
t.ok(msg.id, 'Got a msg.id (sanity check)') | ||
setTimeout(next, 3 * 1000) | ||
}) | ||
}, | ||
function(next) { | ||
queue.get(function(err, msg) { | ||
// messages should not be back yet | ||
t.ok(!err, 'No error when getting no messages') | ||
t.ok(!msg, 'No msg received') | ||
// wait 2s so the msg should have returns to the queue | ||
setTimeout(next, 2 * 1000) | ||
}) | ||
}, | ||
function(next) { | ||
queue.get(function(err, msg) { | ||
// yes, there should be a message on the queue again | ||
t.ok(msg.id, 'Got a msg.id (sanity check)') | ||
queue.ack(msg.ack, function(err) { | ||
t.ok(!err, 'No error when acking the message') | ||
next() | ||
}) | ||
}) | ||
}, | ||
function(next) { | ||
queue.get(function(err, msg) { | ||
// no more messages | ||
t.ok(!err, 'No error when getting no messages') | ||
t.ok(!msg, 'No msg received') | ||
next() | ||
}) | ||
} | ||
], | ||
function(err) { | ||
if (err) t.fail(err) | ||
t.pass('Finished test ok') | ||
t.end() | ||
} | ||
) | ||
}) | ||
test('db.close()', function(t) { | ||
@@ -115,0 +170,0 @@ t.pass('db.close()') |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
68980
1392
0
493