Comparing version 0.2.1 to 0.3.1
153
lib/sqs.js
@@ -7,4 +7,6 @@ | ||
var Emitter = require('event-emitter'); | ||
var SQS = require('aws-sqs'); | ||
//var SQS = require('aws-sqs'); | ||
var AWS = require('aws-sdk'); | ||
var debug = require('debug')('queues:sqs'); | ||
var Backoff = require('./backoff'); | ||
@@ -28,3 +30,8 @@ /** | ||
this.client = new SQS(id, secret); | ||
this.client = new AWS.SQS({ | ||
accessKeyId: id, | ||
secretAccessKey: secret, | ||
region: 'us-east-1' | ||
}); | ||
this.queues = {}; | ||
@@ -80,4 +87,5 @@ } | ||
this.client.createQueue(this.name, {}, | ||
function(err, res) { | ||
this.client.createQueue({ | ||
QueueName: this.name | ||
}, function(err, res) { | ||
if (err) { | ||
@@ -88,6 +96,8 @@ debug('Failed to initialize queue', err); | ||
var id = res.QueueUrl; | ||
// initialize queue | ||
self.init(res); | ||
self.init(id); | ||
debug('connected, waiting for messages..'); | ||
debug('connected to queue %s', id); | ||
@@ -100,2 +110,23 @@ self.emit('connected'); | ||
/** | ||
* Define backoff strategy | ||
* | ||
* @param {String} type - linear, exponential. | ||
* @param {Integer} max - max seconds backoff. | ||
*/ | ||
Queue.prototype.idleBackoff = function(type, max) { | ||
var backoff; | ||
if (type === 'linear') { | ||
backoff = new Backoff.Linear(max); | ||
} else { | ||
throw new Error('invalid idle backoff strategy'); | ||
} | ||
this._idleBackoff = backoff; | ||
return this; | ||
}; | ||
/** | ||
* Pull messages from the queue | ||
@@ -107,6 +138,9 @@ * | ||
Queue.prototype.pull = function() { | ||
this.client.receiveMessage(this.id, { | ||
maxNumberOfMessages: 5 | ||
debug('polling queue'); | ||
this.client.receiveMessage({ | ||
QueueUrl: this.id, | ||
MaxNumberOfMessages: 10, | ||
WaitTimeSeconds: 20 | ||
}, this.onMessage.bind(this)); | ||
}; | ||
@@ -154,4 +188,10 @@ | ||
if (body) { | ||
body.forEach(function(msg){ | ||
debug('received messages %j', body); | ||
var messages = body.Messages || []; | ||
var count = messages.length; | ||
if (messages) { | ||
debug('received %s messages', count); | ||
messages.forEach(function(msg){ | ||
self.emit('message', msg); | ||
@@ -161,3 +201,15 @@ }); | ||
this.pull(); | ||
if (this._idleBackoff) { | ||
if (!count) { | ||
var secs = this._idleBackoff.next(); | ||
debug('idle, backing-off %s seconds', secs); | ||
setTimeout(this.pull.bind(this), secs*1000); | ||
} else { | ||
this._idleBackoff.reset(); | ||
this.pull(); | ||
} | ||
} else { | ||
this.pull(); | ||
} | ||
}; | ||
@@ -172,7 +224,16 @@ | ||
Queue.prototype.post = function(msg, fn) { | ||
debug('posting message'); | ||
var params = { | ||
QueueUrl: this.id, | ||
MessageBody: JSON.stringify(msg) | ||
}; | ||
this.client.sendMessage(this.id, JSON.stringify(msg), null, | ||
function(err, res) { | ||
debug('message posted'); | ||
debug('posting message %j', params, this.id); | ||
this.client.sendMessage(params, function(err, res) { | ||
if (err) { | ||
debug('error posting message %j', err); | ||
} else { | ||
debug('message posted ok'); | ||
} | ||
fn && fn(err, res); | ||
@@ -192,3 +253,6 @@ } | ||
this.client.deleteMessage(this.id, receipt, function(err, body) { | ||
this.client.deleteMessage({ | ||
QueueUrl: this.id, | ||
ReceiptHandle: receipt | ||
}, function(err, body) { | ||
if (err) | ||
@@ -201,26 +265,33 @@ debug('could not delete "%s"', receipt, err); | ||
/** | ||
// MAIN | ||
//var creds = { | ||
// id: 'XXX', | ||
// secret: 'YYY' | ||
//}; | ||
// | ||
//var provider = new Provider(creds.id, creds.secret); | ||
//var queue = provider.get('dev-notifications'); | ||
// | ||
//queue.on('connected', function() { | ||
// queue.post('hello!', function(err, res) { | ||
// if (err) | ||
// debug('error posting callback %j, %j', err, res); | ||
// else | ||
// debug('posted message with id', res.MessageId); | ||
// }); | ||
//}); | ||
// | ||
//queue.on('message', function(msg) { | ||
// debug('message received', msg.Body); | ||
// queue.remove(msg); | ||
//}); | ||
// | ||
//queue.connect(); | ||
// | ||
var creds = { | ||
"id": "XXX", | ||
"secret": "XXX" | ||
}; | ||
var provider = new Provider(creds.id, creds.secret); | ||
var queue = provider.get('dev-notifications'); | ||
queue.on('connected', function() { | ||
for (var i=0; i<10; i++) { | ||
queue.post('hello!', function(err, res) { | ||
if (err) | ||
debug('error posting callback %j, %j', err, res); | ||
else | ||
debug('posted message with id', res.MessageId); | ||
}); | ||
} | ||
queue.start(); | ||
}); | ||
queue.idleBackoff('linear', 5); | ||
queue.on('message', function(msg) { | ||
debug('message received', msg.Body); | ||
queue.remove(msg); | ||
}); | ||
queue.connect(); | ||
**/ |
{ | ||
"name": "queues", | ||
"version": "0.2.1", | ||
"version": "0.3.1", | ||
"description": "Collection of queue wrappers with a common interface.", | ||
@@ -26,4 +26,5 @@ "main": "index.js", | ||
"event-emitter": "^0.3.1", | ||
"aws-sqs": "0.0.6" | ||
"aws-sqs": "0.0.6", | ||
"aws-sdk": "^2.0.11" | ||
} | ||
} |
@@ -15,2 +15,5 @@ queues | ||
// set a backoff strategy | ||
q.idleBackoff('linear', 5); | ||
q.connect(); | ||
@@ -17,0 +20,0 @@ |
10071
9
365
26
5
+ Addedaws-sdk@^2.0.11
+ Addedavailable-typed-arrays@1.0.7(transitive)
+ Addedaws-sdk@2.1692.0(transitive)
+ Addedbase64-js@1.5.1(transitive)
+ Addedbuffer@4.9.2(transitive)
+ Addedcall-bind@1.0.8(transitive)
+ Addedcall-bind-apply-helpers@1.0.2(transitive)
+ Addedcall-bound@1.0.3(transitive)
+ Addeddefine-data-property@1.1.4(transitive)
+ Addeddunder-proto@1.0.1(transitive)
+ Addedes-define-property@1.0.1(transitive)
+ Addedes-errors@1.3.0(transitive)
+ Addedes-object-atoms@1.1.1(transitive)
+ Addedevents@1.1.1(transitive)
+ Addedfor-each@0.3.5(transitive)
+ Addedfunction-bind@1.1.2(transitive)
+ Addedget-intrinsic@1.3.0(transitive)
+ Addedget-proto@1.0.1(transitive)
+ Addedgopd@1.2.0(transitive)
+ Addedhas-property-descriptors@1.0.2(transitive)
+ Addedhas-symbols@1.1.0(transitive)
+ Addedhas-tostringtag@1.0.2(transitive)
+ Addedhasown@2.0.2(transitive)
+ Addedieee754@1.1.13(transitive)
+ Addedis-arguments@1.2.0(transitive)
+ Addedis-callable@1.2.7(transitive)
+ Addedis-generator-function@1.1.0(transitive)
+ Addedis-regex@1.2.1(transitive)
+ Addedis-typed-array@1.1.15(transitive)
+ Addedisarray@1.0.0(transitive)
+ Addedjmespath@0.16.0(transitive)
+ Addedmath-intrinsics@1.1.0(transitive)
+ Addedpossible-typed-array-names@1.1.0(transitive)
+ Addedpunycode@1.3.2(transitive)
+ Addedquerystring@0.2.0(transitive)
+ Addedsafe-regex-test@1.1.0(transitive)
+ Addedsax@1.2.1(transitive)
+ Addedset-function-length@1.2.2(transitive)
+ Addedurl@0.10.3(transitive)
+ Addedutil@0.12.5(transitive)
+ Addeduuid@8.0.0(transitive)
+ Addedwhich-typed-array@1.1.18(transitive)
+ Addedxml2js@0.6.2(transitive)
+ Addedxmlbuilder@11.0.1(transitive)