Socket
Socket
Sign inDemoInstall

sqs-consumer

Package Overview
Dependencies
39
Maintainers
2
Versions
90
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.0.6 to 1.1.0

25

index.js

@@ -35,2 +35,3 @@ var EventEmitter = require('events').EventEmitter;

this.waitTime = options.waitTime || 100;
this.stopped = true;
this.sqs = options.sqs || new AWS.SQS();

@@ -45,2 +46,18 @@ }

Consumer.prototype.start = function () {
if (this.stopped) {
debug('Starting consumer');
this.stopped = false;
this._poll();
}
};
/**
* Stop polling for messages.
*/
Consumer.prototype.stop = function () {
debug('Stopping consumer');
this.stopped = true;
};
Consumer.prototype._poll = function () {
var receiveParams = {

@@ -52,4 +69,6 @@ QueueUrl: this.queueUrl,

debug('Polling for messages');
this.sqs.receiveMessage(receiveParams, this._handleSqsResponse.bind(this));
if (!this.stopped) {
debug('Polling for messages');
this.sqs.receiveMessage(receiveParams, this._handleSqsResponse.bind(this));
}
};

@@ -70,3 +89,3 @@

// Start polling for a new message after the wait time.
setTimeout(this.start.bind(this), this.waitTime);
setTimeout(this._poll.bind(this), this.waitTime);
};

@@ -73,0 +92,0 @@

2

package.json
{
"name": "sqs-consumer",
"version": "1.0.6",
"version": "1.1.0",
"description": "Build SQS-based Node applications without the boilerplate",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -57,2 +57,6 @@ # sqs-consumer

### `consumer.stop()`
Stop polling the queue for messages.
### Events

@@ -59,0 +63,0 @@

@@ -59,101 +59,125 @@ var Consumer = require('..');

it('calls the handleMessage function when a message is received', function () {
consumer.start();
describe('.start', function () {
it('calls the handleMessage function when a message is received', function () {
consumer.start();
sinon.assert.calledWith(handleMessage, response.Messages[0]);
});
sinon.assert.calledWith(handleMessage, response.Messages[0]);
});
it('deletes the message when the handleMessage callback is called', function () {
handleMessage.yields(null);
it('deletes the message when the handleMessage callback is called', function () {
handleMessage.yields(null);
consumer.start();
consumer.start();
sinon.assert.calledWith(sqs.deleteMessage, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle'
sinon.assert.calledWith(sqs.deleteMessage, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle'
});
});
});
it('doesn\'t delete the message when a processing error is reported', function () {
handleMessage.yields(new Error('Processing error'));
it('doesn\'t delete the message when a processing error is reported', function () {
handleMessage.yields(new Error('Processing error'));
consumer.on('error', function () {
// ignore the error
consumer.on('error', function () {
// ignore the error
});
consumer.start();
sinon.assert.notCalled(sqs.deleteMessage);
});
consumer.start();
it('waits before consuming new messages', function (done) {
sqs.receiveMessage.onSecondCall().yields(null, response);
sinon.assert.notCalled(sqs.deleteMessage);
});
consumer.start();
it('waits before consuming new messages', function (done) {
sqs.receiveMessage.onSecondCall().yields(null, response);
setTimeout(function () {
sinon.assert.calledTwice(handleMessage);
done();
}, consumer.waitTime + 1);
});
consumer.start();
it('fires an error event when an error occurs receiving a message', function (done) {
var receiveErr = new Error('Receive error');
setTimeout(function () {
sinon.assert.calledTwice(handleMessage);
done();
}, consumer.waitTime + 1);
});
sqs.receiveMessage.yields(receiveErr);
it('fires an error event when an error occurs receiving a message', function (done) {
var receiveErr = new Error('Receive error');
consumer.on('error', function (err) {
assert.equal(err, receiveErr);
done();
});
sqs.receiveMessage.yields(receiveErr);
consumer.on('error', function (err) {
assert.equal(err, receiveErr);
done();
consumer.start();
});
consumer.start();
});
it('fires an error event when an error occurs deleting a message', function (done) {
var deleteErr = new Error('Delete error');
it('fires an error event when an error occurs deleting a message', function (done) {
var deleteErr = new Error('Delete error');
handleMessage.yields(null);
sqs.deleteMessage.yields(deleteErr);
handleMessage.yields(null);
sqs.deleteMessage.yields(deleteErr);
consumer.on('error', function (err) {
assert.equal(err, deleteErr);
done();
});
consumer.on('error', function (err) {
assert.equal(err, deleteErr);
done();
consumer.start();
});
consumer.start();
});
it('fires an error event when an error occurs processing a message', function (done) {
var processingErr = new Error('Processing error');
it('fires an error event when an error occurs processing a message', function (done) {
var processingErr = new Error('Processing error');
handleMessage.yields(processingErr);
handleMessage.yields(processingErr);
consumer.on('error', function (err) {
assert.equal(err, processingErr);
done();
});
consumer.on('error', function (err) {
assert.equal(err, processingErr);
done();
consumer.start();
});
consumer.start();
});
it('fires a message_received event when a message is received', function (done) {
consumer.on('message_received', function (message) {
assert.equal(message, response.Messages[0]);
done();
});
it('fires a message_received event when a message is received', function (done) {
consumer.on('message_received', function (message) {
assert.equal(message, response.Messages[0]);
done();
consumer.start();
});
consumer.start();
});
it('fires a message_processed event when a message is successfully deleted', function (done) {
handleMessage.yields(null);
it('fires a message_processed event when a message is successfully deleted', function (done) {
handleMessage.yields(null);
consumer.on('message_processed', function (message) {
assert.equal(message, response.Messages[0]);
done();
});
consumer.on('message_processed', function (message) {
assert.equal(message, response.Messages[0]);
done();
consumer.start();
});
consumer.start();
it('doesn\'t consumer more messages when called multiple times', function () {
consumer.start();
consumer.start();
consumer.start();
sinon.assert.calledOnce(sqs.receiveMessage);
});
});
describe('.stop', function () {
it('stops the consumer polling for messages', function (done) {
sqs.receiveMessage.onSecondCall().yields(null, response);
consumer.start();
consumer.stop();
setTimeout(function () {
sinon.assert.calledOnce(handleMessage);
done();
}, consumer.waitTime + 1);
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc