mongodb-queue
Advanced tools
Comparing version 0.2.1 to 0.3.0
@@ -22,2 +22,6 @@ /** | ||
} | ||
function datePlus(date, s) { | ||
var delayDate = (new Date(date)).getTime() + s * 1000 | ||
return (new Date(delayDate)).toISOString() | ||
} | ||
@@ -40,4 +44,5 @@ // the Queue object itself | ||
this.msgs = mongoDbClient.collection(opts.collectionName) | ||
this.queueName = opts.queueName | ||
this.queueName = queueName | ||
this.visibility = opts.visibility || 30 | ||
this.delay = opts.delay || 0 | ||
} | ||
@@ -48,11 +53,17 @@ | ||
var aDate = date() | ||
var delayDate | ||
if ( self.delay ) { | ||
delayDate = datePlus(aDate, self.delay) | ||
} | ||
var thisId = id() | ||
var msg = { | ||
queue : self.queueName, | ||
id : id(), | ||
id : thisId, | ||
inserted : aDate, | ||
visible : aDate, | ||
visible : delayDate || aDate, | ||
payload : payload, | ||
} | ||
self.msgs.insert(msg, function(err, item) { | ||
callback(err) | ||
self.msgs.insert(msg, function(err) { | ||
if (err) return callback(err) | ||
callback(null, thisId) | ||
}) | ||
@@ -98,4 +109,3 @@ } | ||
ack : ack, | ||
// ToDo: check you can't ack an old message which has expired | ||
// visible : { $lt : date() }, | ||
visible : { $gt : date() }, | ||
} | ||
@@ -107,5 +117,11 @@ var update = { | ||
} | ||
self.msgs.findAndModify(query, undefined, update, { new : true }, callback) | ||
self.msgs.findAndModify(query, undefined, update, { new : true }, function(err, msg, blah) { | ||
if (err) return callback(err) | ||
if ( !msg ) { | ||
return callback(new Error("Unidentified id/ack pair : " + id + '/' + ack)) | ||
} | ||
callback(null, msg) | ||
}) | ||
} | ||
module.exports = Queue |
{ | ||
"name": "mongodb-queue", | ||
"version": "0.2.1", | ||
"version": "0.3.0", | ||
"description": "Message queues which uses MongoDB.", | ||
@@ -5,0 +5,0 @@ "main": "mongodb-queue.js", |
@@ -112,2 +112,11 @@ # mongodb-queue # | ||
### 0.3.0 (2014-03-19) ### | ||
* [NEW] Return the message id when added to a queue | ||
* [NEW] Ability to set a default delay on all messages in a queue | ||
* [FIX] Make sure old messages (outside of visibility window) aren't deleted when acked | ||
* [FIX] Internal: Fix `queueName` | ||
* [TEST] Added test for multiple messages | ||
* [TEST] Added test for delayed messages | ||
### 0.2.1 (2014-03-19) ### | ||
@@ -114,0 +123,0 @@ |
@@ -26,4 +26,5 @@ var mongodb = require('mongodb') | ||
function(next) { | ||
queue.add('Hello, World!', function(err) { | ||
queue.add('Hello, World!', function(err, id) { | ||
t.ok(!err, 'There is no error when adding a message.') | ||
t.ok(id, 'Received an id for this message') | ||
next() | ||
@@ -30,0 +31,0 @@ }) |
@@ -50,7 +50,67 @@ var async = require('async') | ||
t.end() | ||
} | ||
) | ||
}) | ||
test("visibility: check that a late ack doesn't remove the msg", function(t) { | ||
var queue = new Queue(db, 'visibility', { visibility : 3 }) | ||
var originalAck | ||
async.series( | ||
[ | ||
function(next) { | ||
queue.add('Hello, World!', function(err) { | ||
t.ok(!err, 'There is no error when adding a message.') | ||
next() | ||
}) | ||
}, | ||
function(next) { | ||
queue.get(function(err, msg) { | ||
t.ok(msg.id, 'Got a msg.id (sanity check)') | ||
// remember this original ack | ||
originalAck = msg.ack | ||
// wait over 3s so the msg returns to the queue | ||
setTimeout(function() { | ||
t.pass('Back from timeout, now acking the message') | ||
// now ack the message but too late - it shouldn't be deleted | ||
queue.ack(msg.id, msg.ack, function(err, msg) { | ||
t.ok(err, 'Got an error when acking the message late') | ||
t.ok(!msg, 'No message was updated') | ||
next() | ||
}) | ||
}, 4 * 1000) | ||
}) | ||
}, | ||
function(next) { | ||
queue.get(function(err, msg) { | ||
// the message should now be able to be retrieved, with a new 'ack' id | ||
t.ok(msg.id, 'Got a msg.id (sanity check)') | ||
t.notEqual(msg.ack, originalAck, 'Original ack and new ack are different') | ||
// now ack this new retrieval | ||
queue.ack(msg.id, msg.ack, next) | ||
}) | ||
}, | ||
function(next) { | ||
queue.get(function(err, msg) { | ||
// no more messages | ||
t.ok(!err, 'No error when getting no messages') | ||
t.ok(!msg, 'No msg received') | ||
next() | ||
}) | ||
}, | ||
], | ||
function(err) { | ||
if (err) t.fail(err) | ||
t.pass('Finished test ok') | ||
t.end() | ||
db.close() | ||
} | ||
) | ||
}) | ||
}) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
19504
10
397
149