Socket
Socket
Sign inDemoInstall

sqs-consumer

Package Overview
Dependencies
Maintainers
3
Versions
101
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sqs-consumer - npm Package Compare versions

Comparing version 1.1.1 to 2.0.0

61

index.js
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var _ = require('lodash');
var async = require('async');
var AWS = require('aws-sdk');

@@ -18,2 +19,6 @@ var debug = require('debug')('sqs-consumer');

});
if (options.batchSize > 10 || options.batchSize < 1) {
throw new Error('SQS batchSize option must be between 1 and 10.');
}
}

@@ -27,3 +32,3 @@

* @param {function} options.handleMessage
* @param {number} options.waitTime
* @param {number} options.batchSize
* @param {object} options.sqs

@@ -37,6 +42,6 @@ */

this.stopped = true;
this.batchSize = options.batchSize || 1;
this.sqs = options.sqs || new AWS.SQS({
region: options.region
});
this.poll = _.throttle(this._poll.bind(this), options.waitTime || 100);
}

@@ -53,3 +58,3 @@

this.stopped = false;
this.poll();
this._poll();
}

@@ -69,3 +74,3 @@ };

QueueUrl: this.queueUrl,
MaxNumberOfMessages: 1,
MaxNumberOfMessages: this.batchSize,
WaitTimeSeconds: 20

@@ -83,27 +88,41 @@ };

var consumer = this;
debug('Received SQS response');
debug(response);
if (response && response.Messages && response.Messages.length > 0) {
var message = response.Messages[0];
this.emit('message_received', message);
this._handleSqsMessage(message);
async.each(response.Messages, this._processMessage.bind(this), function () {
// start polling again once all of the messages have been processed
consumer._poll();
});
} else {
// there were no messages, so start polling again
this._poll();
}
// Poll for another message
this.poll();
};
Consumer.prototype._handleSqsMessage = function (message) {
Consumer.prototype._processMessage = function (message, cb) {
var consumer = this;
this.handleMessage(message, function (err) {
if (err) return consumer.emit('error', err);
this.emit('message_received', message);
async.series([
function handleMessage(done) {
consumer.handleMessage(message, done);
},
function deleteMessage(done) {
consumer._deleteMessage(message, done);
}
], function (err) {
if (err) {
consumer.emit('error', err);
} else {
consumer.emit('message_processed', message);
}
consumer._deleteMessage(message);
cb();
});
};
Consumer.prototype._deleteMessage = function (message) {
var consumer = this;
Consumer.prototype._deleteMessage = function (message, cb) {
var deleteParams = {

@@ -115,9 +134,5 @@ QueueUrl: this.queueUrl,

debug('Deleting message %s', message.MessageId);
this.sqs.deleteMessage(deleteParams, function (err) {
if (err) return consumer.emit('error', err);
consumer.emit('message_processed', message);
});
this.sqs.deleteMessage(deleteParams, cb);
};
module.exports = Consumer;
module.exports = Consumer;
{
"name": "sqs-consumer",
"version": "1.1.1",
"version": "2.0.0",
"description": "Build SQS-based Node applications without the boilerplate",

@@ -29,2 +29,3 @@ "main": "index.js",

"dependencies": {
"async": "^0.9.0",
"aws-sdk": "^2.0.23",

@@ -31,0 +32,0 @@ "debug": "^2.1.0",

@@ -38,2 +38,3 @@ # sqs-consumer

* Calling `done(err)` with an error object will cause the message to be left on the queue. An [SQS redrive policy](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/SQSDeadLetterQueue.html) can be used to move messages that cannot be processed to a dead letter queue.
* By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the `batchSize` option [detailed below](#options).

@@ -51,3 +52,3 @@ ## API

* `handleMessage` - _Function_ - A function to be called whenever a message is receieved. Receives an SQS message object as its first argument and a function to call when the message has been handled as its second argument (i.e. `handleMessage(message, done)`).
* `waitTime` - _Number_ - An optional time in milliseconds to wait after recieving a message before requesting another one. This enables you to throttle the rate at which messages will be received. (default `100`);
* `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10.
* `sqs` - _Object_ - An optional [AWS SQS](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) object to use if you need to configure the client manually

@@ -54,0 +55,0 @@

@@ -18,7 +18,7 @@ var Consumer = require('..');

beforeEach(function () {
handleMessage = sinon.stub();
handleMessage = sinon.stub().yieldsAsync(null);
sqs = sinon.mock();
sqs.receiveMessage = sinon.stub().yields(null, response);
sqs.receiveMessage = sinon.stub().yieldsAsync(null, response);
sqs.receiveMessage.onSecondCall().returns();
sqs.deleteMessage = sinon.stub().yields(null);
sqs.deleteMessage = sinon.stub().yieldsAsync(null);
consumer = new Consumer({

@@ -28,3 +28,2 @@ queueUrl: 'some-queue-url',

handleMessage: handleMessage,
waitTime: 10,
sqs: sqs

@@ -61,43 +60,25 @@ });

describe('.start', function () {
it('calls the handleMessage function when a message is received', function () {
consumer.start();
sinon.assert.calledWith(handleMessage, response.Messages[0]);
});
it('deletes the message when the handleMessage callback is called', function () {
handleMessage.yields(null);
consumer.start();
sinon.assert.calledWith(sqs.deleteMessage, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle'
it('requires the batchSize option to be no greater than 10', function () {
assert.throws(function () {
new Consumer({
region: 'some-region',
queueUrl: 'some-queue-url',
handleMessage: handleMessage,
batchSize: 11
});
});
});
it('doesn\'t delete the message when a processing error is reported', function () {
handleMessage.yields(new Error('Processing error'));
consumer.on('error', function () {
// ignore the error
it('requires the batchSize option to be greater than 0', function () {
assert.throws(function () {
new Consumer({
region: 'some-region',
queueUrl: 'some-queue-url',
handleMessage: handleMessage,
batchSize: -1
});
consumer.start();
sinon.assert.notCalled(sqs.deleteMessage);
});
});
it('waits before consuming new messages', function (done) {
sqs.receiveMessage.onSecondCall().yields(null, response);
consumer.start();
setTimeout(function () {
sinon.assert.calledTwice(handleMessage);
done();
}, consumer.waitTime + 1);
});
describe('.start', function () {
it('fires an error event when an error occurs receiving a message', function (done) {

@@ -163,9 +144,100 @@ var receiveErr = new Error('Receive error');

it('doesn\'t consumer more messages when called multiple times', function () {
it('calls the handleMessage function when a message is received', function (done) {
consumer.start();
consumer.on('message_processed', function () {
sinon.assert.calledWith(handleMessage, response.Messages[0]);
done();
});
});
it('deletes the message when the handleMessage callback is called', function (done) {
handleMessage.yields(null);
consumer.start();
consumer.on('message_processed', function () {
sinon.assert.calledWith(sqs.deleteMessage, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle'
});
done();
});
});
it('doesn\'t delete the message when a processing error is reported', function () {
handleMessage.yields(new Error('Processing error'));
consumer.on('error', function () {
// ignore the error
});
consumer.start();
sinon.assert.notCalled(sqs.deleteMessage);
});
it('consumes another message once one is processed', function (done) {
sqs.receiveMessage.onSecondCall().yields(null, response);
sqs.receiveMessage.onThirdCall().returns();
consumer.start();
setTimeout(function () {
sinon.assert.calledTwice(handleMessage);
done();
}, 10);
});
it('doesn\'t consume more messages when called multiple times', function () {
sqs.receiveMessage = sinon.stub().returns();
consumer.start();
consumer.start();
consumer.start();
consumer.start();
consumer.start();
sinon.assert.calledOnce(sqs.receiveMessage);
});
it('consumes multiple messages when the batchSize is greater than 1', function (done) {
sqs.receiveMessage.yieldsAsync(null, {
Messages: [
{
ReceiptHandle: 'receipt-handle-1',
MessageId: '1',
Body: 'body-1'
},
{
ReceiptHandle: 'receipt-handle-2',
MessageId: '2',
Body: 'body-2'
},
{
ReceiptHandle: 'receipt-handle-3',
MessageId: '3',
Body: 'body-3'
}
]
});
consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
handleMessage: handleMessage,
batchSize: 3,
sqs: sqs
});
consumer.start();
setTimeout(function () {
sinon.assert.calledWith(sqs.receiveMessage, {
QueueUrl: 'some-queue-url',
MaxNumberOfMessages: 3,
WaitTimeSeconds: 20
});
sinon.assert.callCount(handleMessage, 3);
done();
}, 10);
});
});

@@ -175,3 +247,4 @@

it('stops the consumer polling for messages', function (done) {
sqs.receiveMessage.onSecondCall().yields(null, response);
sqs.receiveMessage.onSecondCall().yieldsAsync(null, response);
sqs.receiveMessage.onThirdCall().returns();

@@ -184,5 +257,5 @@ consumer.start();

done();
}, consumer.waitTime + 1);
}, 10);
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc