Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mongodb-queue

Package Overview
Dependencies
Maintainers
1
Versions
19
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongodb-queue - npm Package Compare versions

Comparing version 0.3.1 to 0.4.0

test/default.js

60

mongodb-queue.js

@@ -28,18 +28,14 @@ /**

// the Queue object itself
function Queue(mongoDbClient, queueName, opts) {
function Queue(mongoDbClient, name, opts) {
if (!(this instanceof Queue)) {
return new Queue(mongoDbClient, queueName, opts)
return new Queue(mongoDbClient, name, opts)
}
if ( !queueName ) {
throw new Error("mongodb-queue: provide a queueName")
if ( !name ) {
throw new Error("mongodb-queue: provide a queue name")
}
opts = opts || {}
// set some defaults
opts.collectionName = opts.collectionName || 'msgs'
// this.db = mongoDbClient
this.msgs = mongoDbClient.collection(opts.collectionName)
this.queueName = queueName
this.name = name
this.col = mongoDbClient.collection(name)
this.visibility = opts.visibility || 30

@@ -56,13 +52,9 @@ this.delay = opts.delay || 0

}
var thisId = id()
var msg = {
queue : self.queueName,
id : thisId,
inserted : aDate,
visible : delayDate || aDate,
payload : payload,
}
self.msgs.insert(msg, function(err) {
self.col.insert(msg, function(err, results) {
if (err) return callback(err)
callback(null, thisId)
callback(null, '' + results[0]._id)
})

@@ -75,3 +67,2 @@ }

var query = {
queue : self.queueName,
visible : { $lt : date() },

@@ -91,7 +82,8 @@ deleted : { $exists : false },

self.msgs.findAndModify(query, sort, update, { new : true }, function(err, msg) {
self.col.findAndModify(query, sort, update, { new : true }, function(err, msg) {
if (err) return callback(err)
if (!msg) return callback()
callback(null, {
id : msg.id,
// convert '_id' to an 'id' string
id : '' + msg._id,
ack : msg.ack,

@@ -104,7 +96,6 @@ payload : msg.payload,

Queue.prototype.ack = function(id, ack, callback) {
Queue.prototype.ping = function(ack, callback) {
var self = this
var query = {
id : id,
ack : ack,

@@ -115,11 +106,32 @@ visible : { $gt : date() },

$set : {
visible : (new Date(Date.now() + self.visibility * 1000)).toISOString(),
}
}
self.col.findAndModify(query, undefined, update, { new : true }, function(err, msg, blah) {
if (err) return callback(err)
if ( !msg ) {
return callback(new Error("Queue.ping(): Unidentified ack : " + ack))
}
callback()
})
}
Queue.prototype.ack = function(ack, callback) {
var self = this
var query = {
ack : ack,
visible : { $gt : date() },
}
var update = {
$set : {
deleted : true,
}
}
self.msgs.findAndModify(query, undefined, update, { new : true }, function(err, msg, blah) {
self.col.findAndModify(query, undefined, update, { new : true }, function(err, msg, blah) {
if (err) return callback(err)
if ( !msg ) {
return callback(new Error("Unidentified id/ack pair : " + id + '/' + ack))
return callback(new Error("Queue.ack(): Unidentified ack : " + ack))
}
callback(null, msg)
callback()
})

@@ -126,0 +138,0 @@ }

2

package.json
{
"name": "mongodb-queue",
"version": "0.3.1",
"version": "0.4.0",
"description": "Message queues which uses MongoDB.",

@@ -5,0 +5,0 @@ "main": "mongodb-queue.js",

@@ -31,10 +31,19 @@ # mongodb-queue #

console.log('msg.ack=' + msg.ack)
// msg.payload is 'Hello, World!'
console.log('msg.payload=' + msg.payload) // 'Hello, World!'
console.log('msg.tries=' + msg.tries)
})
```
Ping a message to keep it's visibility open for long-running tasks
```js
queue.ping(msg.ack, function(err) {
// visibility window has now been increased
})
```
Ack a message (and remove it from the queue):
```js
queue.ack(msg.id, msg.ack, function(err) {
queue.ack(msg.ack, function(err) {
// msg removed from queue

@@ -44,35 +53,35 @@ })

## Options ##
## Creating a Queue ##
### collectionName ###
To create a queue, just call `new` (and if you forget to do that, we'll do it for you)
and pass it the MongoClient, a name and an optional set of options. These are equivalent:
Default: `'msgs'`
```
var q1 = new Queue(db, 'a-queue')
var q2 = Queue(db, 'a-queue')
```
This is the name of the MongoDB Collection you wish to use to store the messages.
By default we only use this one MongoDB Collection, unless you specify an
alternate one.
Note: but don't use the same queue name twice with different options, otherwise things might get confusing.
e.g. both of these queues use the same `'msgs'` collection by default:
To pass options, try this:
```
var resizeQueue = Queue(db, 'resize-image')
var uploadQueue = Queue(db, 'upload-image')
var imageResizeQueue = Quene(db, 'resize-queue', { visibility : 30, delay : 15 })
```
e.g. both of these queue use the MongoDB Collection named `'app'`:
## Options ##
```
var resizeQueue = Queue(db, 'resize-image', { collectionName : 'app' })
var uploadQueue = Queue(db, 'upload-image', { collectionName : 'app' })
```
### name ###
e.g. these two queue use different MongoDB Collections, `'msgs'` and `'app'` respectively:
This is the name of the MongoDB Collection you wish to use to store the messages.
Each queue you create will be it's own collection.
e.g.
```
var resizeQueue = Queue(db, 'resize-image')
var uploadQueue = Queue(db, 'upload-image', { collectionName : 'app' })
var notifyQueue = Queue(db, 'notify-owner')
```
Using the default MongoDB Collection for all of your queues shouldn't cause a problem
but you may wish to use a different collection per queue if you have a high throughput.
This will create two collections in MongoDB called `resize-image` and `notify-owner`.

@@ -131,2 +140,18 @@ ### Message Visibility Window ###

### 0.4.0 (2014-03-20) ###
* [NEW] Ability to ping retrieved messages a. la. 'still alive' and 'extend visibility'
* [CHANGE] Removed ability to have different queues in the same collection
* [CHANGE] All queues are now stored in their own collection
* [CHANGE] When acking a message, only need ack (no longer need id)
* [TEST] Added test for pinged messages
* [DOC] Update to specify each queue will create it's own MongoDB collection
* [DOC] Added docs for option `delay`
* [DOC] Added synopsis for Queue.ping()
* [DOC] Removed use of msg.id when calling Queue.ack()
### 0.3.1 (2014-03-19) ###
* [DOC] Added documentation for the `delay` option
### 0.3.0 (2014-03-19) ###

@@ -133,0 +158,0 @@

@@ -31,7 +31,7 @@ var async = require('async')

function(next) {
// get something now and it shouldn't be there
// get something now and it SHOULD be there
queue.get(function(err, msg) {
t.ok(!err, 'No error when getting a message')
t.ok(msg.id, 'Got a message id now that the message delay has passed')
queue.ack(msg.id, msg.ack, next)
queue.ack(msg.ack, next)
})

@@ -38,0 +38,0 @@ },

@@ -49,3 +49,3 @@ var async = require('async')

msgs.forEach(function(msg) {
queue.ack(msg.id, msg.ack, function(err) {
queue.ack(msg.ack, function(err) {
if (err) return t.fail('Failed acking a message')

@@ -52,0 +52,0 @@ acked++

@@ -10,3 +10,3 @@ var mongodb = require('mongodb')

// let's empty out some collections to make sure there are no messages
var collections = ['msgs']
var collections = ['default', 'delay', 'multi', 'visibility', 'ping']
collections.forEach(function(col) {

@@ -13,0 +13,0 @@ db.collection(col).remove(function() {

@@ -31,3 +31,3 @@ var async = require('async')

t.ok(msg.id, 'Got a msg.id (sanity check)')
queue.ack(msg.id, msg.ack, function(err) {
queue.ack(msg.ack, function(err) {
t.ok(!err, 'No error when acking the message')

@@ -80,3 +80,3 @@ next()

// now ack the message but too late - it shouldn't be deleted
queue.ack(msg.id, msg.ack, function(err, msg) {
queue.ack(msg.ack, function(err, msg) {
t.ok(err, 'Got an error when acking the message late')

@@ -96,3 +96,3 @@ t.ok(!msg, 'No message was updated')

// now ack this new retrieval
queue.ack(msg.id, msg.ack, next)
queue.ack(msg.ack, next)
})

@@ -99,0 +99,0 @@ },

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc