mongodb-queue
Advanced tools
Comparing version 0.3.1 to 0.4.0
@@ -28,18 +28,14 @@ /** | ||
// the Queue object itself | ||
function Queue(mongoDbClient, queueName, opts) { | ||
function Queue(mongoDbClient, name, opts) { | ||
if (!(this instanceof Queue)) { | ||
return new Queue(mongoDbClient, queueName, opts) | ||
return new Queue(mongoDbClient, name, opts) | ||
} | ||
if ( !queueName ) { | ||
throw new Error("mongodb-queue: provide a queueName") | ||
if ( !name ) { | ||
throw new Error("mongodb-queue: provide a queue name") | ||
} | ||
opts = opts || {} | ||
// set some defaults | ||
opts.collectionName = opts.collectionName || 'msgs' | ||
// this.db = mongoDbClient | ||
this.msgs = mongoDbClient.collection(opts.collectionName) | ||
this.queueName = queueName | ||
this.name = name | ||
this.col = mongoDbClient.collection(name) | ||
this.visibility = opts.visibility || 30 | ||
@@ -56,13 +52,9 @@ this.delay = opts.delay || 0 | ||
} | ||
var thisId = id() | ||
var msg = { | ||
queue : self.queueName, | ||
id : thisId, | ||
inserted : aDate, | ||
visible : delayDate || aDate, | ||
payload : payload, | ||
} | ||
self.msgs.insert(msg, function(err) { | ||
self.col.insert(msg, function(err, results) { | ||
if (err) return callback(err) | ||
callback(null, thisId) | ||
callback(null, '' + results[0]._id) | ||
}) | ||
@@ -75,3 +67,2 @@ } | ||
var query = { | ||
queue : self.queueName, | ||
visible : { $lt : date() }, | ||
@@ -91,7 +82,8 @@ deleted : { $exists : false }, | ||
self.msgs.findAndModify(query, sort, update, { new : true }, function(err, msg) { | ||
self.col.findAndModify(query, sort, update, { new : true }, function(err, msg) { | ||
if (err) return callback(err) | ||
if (!msg) return callback() | ||
callback(null, { | ||
id : msg.id, | ||
// convert '_id' to an 'id' string | ||
id : '' + msg._id, | ||
ack : msg.ack, | ||
@@ -104,7 +96,6 @@ payload : msg.payload, | ||
Queue.prototype.ack = function(id, ack, callback) { | ||
Queue.prototype.ping = function(ack, callback) { | ||
var self = this | ||
var query = { | ||
id : id, | ||
ack : ack, | ||
@@ -115,11 +106,32 @@ visible : { $gt : date() }, | ||
$set : { | ||
visible : (new Date(Date.now() + self.visibility * 1000)).toISOString(), | ||
} | ||
} | ||
self.col.findAndModify(query, undefined, update, { new : true }, function(err, msg, blah) { | ||
if (err) return callback(err) | ||
if ( !msg ) { | ||
return callback(new Error("Queue.ping(): Unidentified ack : " + ack)) | ||
} | ||
callback() | ||
}) | ||
} | ||
Queue.prototype.ack = function(ack, callback) { | ||
var self = this | ||
var query = { | ||
ack : ack, | ||
visible : { $gt : date() }, | ||
} | ||
var update = { | ||
$set : { | ||
deleted : true, | ||
} | ||
} | ||
self.msgs.findAndModify(query, undefined, update, { new : true }, function(err, msg, blah) { | ||
self.col.findAndModify(query, undefined, update, { new : true }, function(err, msg, blah) { | ||
if (err) return callback(err) | ||
if ( !msg ) { | ||
return callback(new Error("Unidentified id/ack pair : " + id + '/' + ack)) | ||
return callback(new Error("Queue.ack(): Unidentified ack : " + ack)) | ||
} | ||
callback(null, msg) | ||
callback() | ||
}) | ||
@@ -126,0 +138,0 @@ } |
{ | ||
"name": "mongodb-queue", | ||
"version": "0.3.1", | ||
"version": "0.4.0", | ||
"description": "Message queues which uses MongoDB.", | ||
@@ -5,0 +5,0 @@ "main": "mongodb-queue.js", |
@@ -31,10 +31,19 @@ # mongodb-queue # | ||
console.log('msg.ack=' + msg.ack) | ||
// msg.payload is 'Hello, World!' | ||
console.log('msg.payload=' + msg.payload) // 'Hello, World!' | ||
console.log('msg.tries=' + msg.tries) | ||
}) | ||
``` | ||
Ping a message to keep it's visibility open for long-running tasks | ||
```js | ||
queue.ping(msg.ack, function(err) { | ||
// visibility window has now been increased | ||
}) | ||
``` | ||
Ack a message (and remove it from the queue): | ||
```js | ||
queue.ack(msg.id, msg.ack, function(err) { | ||
queue.ack(msg.ack, function(err) { | ||
// msg removed from queue | ||
@@ -44,35 +53,35 @@ }) | ||
## Options ## | ||
## Creating a Queue ## | ||
### collectionName ### | ||
To create a queue, just call `new` (and if you forget to do that, we'll do it for you) | ||
and pass it the MongoClient, a name and an optional set of options. These are equivalent: | ||
Default: `'msgs'` | ||
``` | ||
var q1 = new Queue(db, 'a-queue') | ||
var q2 = Queue(db, 'a-queue') | ||
``` | ||
This is the name of the MongoDB Collection you wish to use to store the messages. | ||
By default we only use this one MongoDB Collection, unless you specify an | ||
alternate one. | ||
Note: but don't use the same queue name twice with different options, otherwise things might get confusing. | ||
e.g. both of these queues use the same `'msgs'` collection by default: | ||
To pass options, try this: | ||
``` | ||
var resizeQueue = Queue(db, 'resize-image') | ||
var uploadQueue = Queue(db, 'upload-image') | ||
var imageResizeQueue = Quene(db, 'resize-queue', { visibility : 30, delay : 15 }) | ||
``` | ||
e.g. both of these queue use the MongoDB Collection named `'app'`: | ||
## Options ## | ||
``` | ||
var resizeQueue = Queue(db, 'resize-image', { collectionName : 'app' }) | ||
var uploadQueue = Queue(db, 'upload-image', { collectionName : 'app' }) | ||
``` | ||
### name ### | ||
e.g. these two queue use different MongoDB Collections, `'msgs'` and `'app'` respectively: | ||
This is the name of the MongoDB Collection you wish to use to store the messages. | ||
Each queue you create will be it's own collection. | ||
e.g. | ||
``` | ||
var resizeQueue = Queue(db, 'resize-image') | ||
var uploadQueue = Queue(db, 'upload-image', { collectionName : 'app' }) | ||
var notifyQueue = Queue(db, 'notify-owner') | ||
``` | ||
Using the default MongoDB Collection for all of your queues shouldn't cause a problem | ||
but you may wish to use a different collection per queue if you have a high throughput. | ||
This will create two collections in MongoDB called `resize-image` and `notify-owner`. | ||
@@ -131,2 +140,18 @@ ### Message Visibility Window ### | ||
### 0.4.0 (2014-03-20) ### | ||
* [NEW] Ability to ping retrieved messages a. la. 'still alive' and 'extend visibility' | ||
* [CHANGE] Removed ability to have different queues in the same collection | ||
* [CHANGE] All queues are now stored in their own collection | ||
* [CHANGE] When acking a message, only need ack (no longer need id) | ||
* [TEST] Added test for pinged messages | ||
* [DOC] Update to specify each queue will create it's own MongoDB collection | ||
* [DOC] Added docs for option `delay` | ||
* [DOC] Added synopsis for Queue.ping() | ||
* [DOC] Removed use of msg.id when calling Queue.ack() | ||
### 0.3.1 (2014-03-19) ### | ||
* [DOC] Added documentation for the `delay` option | ||
### 0.3.0 (2014-03-19) ### | ||
@@ -133,0 +158,0 @@ |
@@ -31,7 +31,7 @@ var async = require('async') | ||
function(next) { | ||
// get something now and it shouldn't be there | ||
// get something now and it SHOULD be there | ||
queue.get(function(err, msg) { | ||
t.ok(!err, 'No error when getting a message') | ||
t.ok(msg.id, 'Got a message id now that the message delay has passed') | ||
queue.ack(msg.id, msg.ack, next) | ||
queue.ack(msg.ack, next) | ||
}) | ||
@@ -38,0 +38,0 @@ }, |
@@ -49,3 +49,3 @@ var async = require('async') | ||
msgs.forEach(function(msg) { | ||
queue.ack(msg.id, msg.ack, function(err) { | ||
queue.ack(msg.ack, function(err) { | ||
if (err) return t.fail('Failed acking a message') | ||
@@ -52,0 +52,0 @@ acked++ |
@@ -10,3 +10,3 @@ var mongodb = require('mongodb') | ||
// let's empty out some collections to make sure there are no messages | ||
var collections = ['msgs'] | ||
var collections = ['default', 'delay', 'multi', 'visibility', 'ping'] | ||
collections.forEach(function(col) { | ||
@@ -13,0 +13,0 @@ db.collection(col).remove(function() { |
@@ -31,3 +31,3 @@ var async = require('async') | ||
t.ok(msg.id, 'Got a msg.id (sanity check)') | ||
queue.ack(msg.id, msg.ack, function(err) { | ||
queue.ack(msg.ack, function(err) { | ||
t.ok(!err, 'No error when acking the message') | ||
@@ -80,3 +80,3 @@ next() | ||
// now ack the message but too late - it shouldn't be deleted | ||
queue.ack(msg.id, msg.ack, function(err, msg) { | ||
queue.ack(msg.ack, function(err, msg) { | ||
t.ok(err, 'Got an error when acking the message late') | ||
@@ -96,3 +96,3 @@ t.ok(!msg, 'No message was updated') | ||
// now ack this new retrieval | ||
queue.ack(msg.id, msg.ack, next) | ||
queue.ack(msg.ack, next) | ||
}) | ||
@@ -99,0 +99,0 @@ }, |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
23328
11
467
191