sqs-consumer
Advanced tools
Comparing version 1.1.1 to 2.0.0
61
index.js
var EventEmitter = require('events').EventEmitter; | ||
var util = require('util'); | ||
var _ = require('lodash'); | ||
var async = require('async'); | ||
var AWS = require('aws-sdk'); | ||
@@ -18,2 +19,6 @@ var debug = require('debug')('sqs-consumer'); | ||
}); | ||
if (options.batchSize > 10 || options.batchSize < 1) { | ||
throw new Error('SQS batchSize option must be between 1 and 10.'); | ||
} | ||
} | ||
@@ -27,3 +32,3 @@ | ||
* @param {function} options.handleMessage | ||
* @param {number} options.waitTime | ||
* @param {number} options.batchSize | ||
* @param {object} options.sqs | ||
@@ -37,6 +42,6 @@ */ | ||
this.stopped = true; | ||
this.batchSize = options.batchSize || 1; | ||
this.sqs = options.sqs || new AWS.SQS({ | ||
region: options.region | ||
}); | ||
this.poll = _.throttle(this._poll.bind(this), options.waitTime || 100); | ||
} | ||
@@ -53,3 +58,3 @@ | ||
this.stopped = false; | ||
this.poll(); | ||
this._poll(); | ||
} | ||
@@ -69,3 +74,3 @@ }; | ||
QueueUrl: this.queueUrl, | ||
MaxNumberOfMessages: 1, | ||
MaxNumberOfMessages: this.batchSize, | ||
WaitTimeSeconds: 20 | ||
@@ -83,27 +88,41 @@ }; | ||
var consumer = this; | ||
debug('Received SQS response'); | ||
debug(response); | ||
if (response && response.Messages && response.Messages.length > 0) { | ||
var message = response.Messages[0]; | ||
this.emit('message_received', message); | ||
this._handleSqsMessage(message); | ||
async.each(response.Messages, this._processMessage.bind(this), function () { | ||
// start polling again once all of the messages have been processed | ||
consumer._poll(); | ||
}); | ||
} else { | ||
// there were no messages, so start polling again | ||
this._poll(); | ||
} | ||
// Poll for another message | ||
this.poll(); | ||
}; | ||
Consumer.prototype._handleSqsMessage = function (message) { | ||
Consumer.prototype._processMessage = function (message, cb) { | ||
var consumer = this; | ||
this.handleMessage(message, function (err) { | ||
if (err) return consumer.emit('error', err); | ||
this.emit('message_received', message); | ||
async.series([ | ||
function handleMessage(done) { | ||
consumer.handleMessage(message, done); | ||
}, | ||
function deleteMessage(done) { | ||
consumer._deleteMessage(message, done); | ||
} | ||
], function (err) { | ||
if (err) { | ||
consumer.emit('error', err); | ||
} else { | ||
consumer.emit('message_processed', message); | ||
} | ||
consumer._deleteMessage(message); | ||
cb(); | ||
}); | ||
}; | ||
Consumer.prototype._deleteMessage = function (message) { | ||
var consumer = this; | ||
Consumer.prototype._deleteMessage = function (message, cb) { | ||
var deleteParams = { | ||
@@ -115,9 +134,5 @@ QueueUrl: this.queueUrl, | ||
debug('Deleting message %s', message.MessageId); | ||
this.sqs.deleteMessage(deleteParams, function (err) { | ||
if (err) return consumer.emit('error', err); | ||
consumer.emit('message_processed', message); | ||
}); | ||
this.sqs.deleteMessage(deleteParams, cb); | ||
}; | ||
module.exports = Consumer; | ||
module.exports = Consumer; |
{ | ||
"name": "sqs-consumer", | ||
"version": "1.1.1", | ||
"version": "2.0.0", | ||
"description": "Build SQS-based Node applications without the boilerplate", | ||
@@ -29,2 +29,3 @@ "main": "index.js", | ||
"dependencies": { | ||
"async": "^0.9.0", | ||
"aws-sdk": "^2.0.23", | ||
@@ -31,0 +32,0 @@ "debug": "^2.1.0", |
@@ -38,2 +38,3 @@ # sqs-consumer | ||
* Calling `done(err)` with an error object will cause the message to be left on the queue. An [SQS redrive policy](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/SQSDeadLetterQueue.html) can be used to move messages that cannot be processed to a dead letter queue. | ||
* By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the `batchSize` option [detailed below](#options). | ||
@@ -51,3 +52,3 @@ ## API | ||
* `handleMessage` - _Function_ - A function to be called whenever a message is receieved. Receives an SQS message object as its first argument and a function to call when the message has been handled as its second argument (i.e. `handleMessage(message, done)`). | ||
* `waitTime` - _Number_ - An optional time in milliseconds to wait after recieving a message before requesting another one. This enables you to throttle the rate at which messages will be received. (default `100`); | ||
* `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10. | ||
* `sqs` - _Object_ - An optional [AWS SQS](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) object to use if you need to configure the client manually | ||
@@ -54,0 +55,0 @@ |
@@ -18,7 +18,7 @@ var Consumer = require('..'); | ||
beforeEach(function () { | ||
handleMessage = sinon.stub(); | ||
handleMessage = sinon.stub().yieldsAsync(null); | ||
sqs = sinon.mock(); | ||
sqs.receiveMessage = sinon.stub().yields(null, response); | ||
sqs.receiveMessage = sinon.stub().yieldsAsync(null, response); | ||
sqs.receiveMessage.onSecondCall().returns(); | ||
sqs.deleteMessage = sinon.stub().yields(null); | ||
sqs.deleteMessage = sinon.stub().yieldsAsync(null); | ||
consumer = new Consumer({ | ||
@@ -28,3 +28,2 @@ queueUrl: 'some-queue-url', | ||
handleMessage: handleMessage, | ||
waitTime: 10, | ||
sqs: sqs | ||
@@ -61,43 +60,25 @@ }); | ||
describe('.start', function () { | ||
it('calls the handleMessage function when a message is received', function () { | ||
consumer.start(); | ||
sinon.assert.calledWith(handleMessage, response.Messages[0]); | ||
}); | ||
it('deletes the message when the handleMessage callback is called', function () { | ||
handleMessage.yields(null); | ||
consumer.start(); | ||
sinon.assert.calledWith(sqs.deleteMessage, { | ||
QueueUrl: 'some-queue-url', | ||
ReceiptHandle: 'receipt-handle' | ||
it('requires the batchSize option to be no greater than 10', function () { | ||
assert.throws(function () { | ||
new Consumer({ | ||
region: 'some-region', | ||
queueUrl: 'some-queue-url', | ||
handleMessage: handleMessage, | ||
batchSize: 11 | ||
}); | ||
}); | ||
}); | ||
it('doesn\'t delete the message when a processing error is reported', function () { | ||
handleMessage.yields(new Error('Processing error')); | ||
consumer.on('error', function () { | ||
// ignore the error | ||
it('requires the batchSize option to be greater than 0', function () { | ||
assert.throws(function () { | ||
new Consumer({ | ||
region: 'some-region', | ||
queueUrl: 'some-queue-url', | ||
handleMessage: handleMessage, | ||
batchSize: -1 | ||
}); | ||
consumer.start(); | ||
sinon.assert.notCalled(sqs.deleteMessage); | ||
}); | ||
}); | ||
it('waits before consuming new messages', function (done) { | ||
sqs.receiveMessage.onSecondCall().yields(null, response); | ||
consumer.start(); | ||
setTimeout(function () { | ||
sinon.assert.calledTwice(handleMessage); | ||
done(); | ||
}, consumer.waitTime + 1); | ||
}); | ||
describe('.start', function () { | ||
it('fires an error event when an error occurs receiving a message', function (done) { | ||
@@ -163,9 +144,100 @@ var receiveErr = new Error('Receive error'); | ||
it('doesn\'t consumer more messages when called multiple times', function () { | ||
it('calls the handleMessage function when a message is received', function (done) { | ||
consumer.start(); | ||
consumer.on('message_processed', function () { | ||
sinon.assert.calledWith(handleMessage, response.Messages[0]); | ||
done(); | ||
}); | ||
}); | ||
it('deletes the message when the handleMessage callback is called', function (done) { | ||
handleMessage.yields(null); | ||
consumer.start(); | ||
consumer.on('message_processed', function () { | ||
sinon.assert.calledWith(sqs.deleteMessage, { | ||
QueueUrl: 'some-queue-url', | ||
ReceiptHandle: 'receipt-handle' | ||
}); | ||
done(); | ||
}); | ||
}); | ||
it('doesn\'t delete the message when a processing error is reported', function () { | ||
handleMessage.yields(new Error('Processing error')); | ||
consumer.on('error', function () { | ||
// ignore the error | ||
}); | ||
consumer.start(); | ||
sinon.assert.notCalled(sqs.deleteMessage); | ||
}); | ||
it('consumes another message once one is processed', function (done) { | ||
sqs.receiveMessage.onSecondCall().yields(null, response); | ||
sqs.receiveMessage.onThirdCall().returns(); | ||
consumer.start(); | ||
setTimeout(function () { | ||
sinon.assert.calledTwice(handleMessage); | ||
done(); | ||
}, 10); | ||
}); | ||
it('doesn\'t consume more messages when called multiple times', function () { | ||
sqs.receiveMessage = sinon.stub().returns(); | ||
consumer.start(); | ||
consumer.start(); | ||
consumer.start(); | ||
consumer.start(); | ||
consumer.start(); | ||
sinon.assert.calledOnce(sqs.receiveMessage); | ||
}); | ||
it('consumes multiple messages when the batchSize is greater than 1', function (done) { | ||
sqs.receiveMessage.yieldsAsync(null, { | ||
Messages: [ | ||
{ | ||
ReceiptHandle: 'receipt-handle-1', | ||
MessageId: '1', | ||
Body: 'body-1' | ||
}, | ||
{ | ||
ReceiptHandle: 'receipt-handle-2', | ||
MessageId: '2', | ||
Body: 'body-2' | ||
}, | ||
{ | ||
ReceiptHandle: 'receipt-handle-3', | ||
MessageId: '3', | ||
Body: 'body-3' | ||
} | ||
] | ||
}); | ||
consumer = new Consumer({ | ||
queueUrl: 'some-queue-url', | ||
region: 'some-region', | ||
handleMessage: handleMessage, | ||
batchSize: 3, | ||
sqs: sqs | ||
}); | ||
consumer.start(); | ||
setTimeout(function () { | ||
sinon.assert.calledWith(sqs.receiveMessage, { | ||
QueueUrl: 'some-queue-url', | ||
MaxNumberOfMessages: 3, | ||
WaitTimeSeconds: 20 | ||
}); | ||
sinon.assert.callCount(handleMessage, 3); | ||
done(); | ||
}, 10); | ||
}); | ||
}); | ||
@@ -175,3 +247,4 @@ | ||
it('stops the consumer polling for messages', function (done) { | ||
sqs.receiveMessage.onSecondCall().yields(null, response); | ||
sqs.receiveMessage.onSecondCall().yieldsAsync(null, response); | ||
sqs.receiveMessage.onThirdCall().returns(); | ||
@@ -184,5 +257,5 @@ consumer.start(); | ||
done(); | ||
}, consumer.waitTime + 1); | ||
}, 10); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
14386
322
70
4
1
+ Addedasync@^0.9.0
+ Addedasync@0.9.2(transitive)