sqs-consumer
Advanced tools
Comparing version 1.0.6 to 1.1.0
25
index.js
@@ -35,2 +35,3 @@ var EventEmitter = require('events').EventEmitter; | ||
this.waitTime = options.waitTime || 100; | ||
this.stopped = true; | ||
this.sqs = options.sqs || new AWS.SQS(); | ||
@@ -45,2 +46,18 @@ } | ||
Consumer.prototype.start = function () { | ||
if (this.stopped) { | ||
debug('Starting consumer'); | ||
this.stopped = false; | ||
this._poll(); | ||
} | ||
}; | ||
/** | ||
* Stop polling for messages. | ||
*/ | ||
Consumer.prototype.stop = function () { | ||
debug('Stopping consumer'); | ||
this.stopped = true; | ||
}; | ||
Consumer.prototype._poll = function () { | ||
var receiveParams = { | ||
@@ -52,4 +69,6 @@ QueueUrl: this.queueUrl, | ||
debug('Polling for messages'); | ||
this.sqs.receiveMessage(receiveParams, this._handleSqsResponse.bind(this)); | ||
if (!this.stopped) { | ||
debug('Polling for messages'); | ||
this.sqs.receiveMessage(receiveParams, this._handleSqsResponse.bind(this)); | ||
} | ||
}; | ||
@@ -70,3 +89,3 @@ | ||
// Start polling for a new message after the wait time. | ||
setTimeout(this.start.bind(this), this.waitTime); | ||
setTimeout(this._poll.bind(this), this.waitTime); | ||
}; | ||
@@ -73,0 +92,0 @@ |
{ | ||
"name": "sqs-consumer", | ||
"version": "1.0.6", | ||
"version": "1.1.0", | ||
"description": "Build SQS-based Node applications without the boilerplate", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -57,2 +57,6 @@ # sqs-consumer | ||
### `consumer.stop()` | ||
Stop polling the queue for messages. | ||
### Events | ||
@@ -59,0 +63,0 @@ |
@@ -59,101 +59,125 @@ var Consumer = require('..'); | ||
it('calls the handleMessage function when a message is received', function () { | ||
consumer.start(); | ||
describe('.start', function () { | ||
it('calls the handleMessage function when a message is received', function () { | ||
consumer.start(); | ||
sinon.assert.calledWith(handleMessage, response.Messages[0]); | ||
}); | ||
sinon.assert.calledWith(handleMessage, response.Messages[0]); | ||
}); | ||
it('deletes the message when the handleMessage callback is called', function () { | ||
handleMessage.yields(null); | ||
it('deletes the message when the handleMessage callback is called', function () { | ||
handleMessage.yields(null); | ||
consumer.start(); | ||
consumer.start(); | ||
sinon.assert.calledWith(sqs.deleteMessage, { | ||
QueueUrl: 'some-queue-url', | ||
ReceiptHandle: 'receipt-handle' | ||
sinon.assert.calledWith(sqs.deleteMessage, { | ||
QueueUrl: 'some-queue-url', | ||
ReceiptHandle: 'receipt-handle' | ||
}); | ||
}); | ||
}); | ||
it('doesn\'t delete the message when a processing error is reported', function () { | ||
handleMessage.yields(new Error('Processing error')); | ||
it('doesn\'t delete the message when a processing error is reported', function () { | ||
handleMessage.yields(new Error('Processing error')); | ||
consumer.on('error', function () { | ||
// ignore the error | ||
consumer.on('error', function () { | ||
// ignore the error | ||
}); | ||
consumer.start(); | ||
sinon.assert.notCalled(sqs.deleteMessage); | ||
}); | ||
consumer.start(); | ||
it('waits before consuming new messages', function (done) { | ||
sqs.receiveMessage.onSecondCall().yields(null, response); | ||
sinon.assert.notCalled(sqs.deleteMessage); | ||
}); | ||
consumer.start(); | ||
it('waits before consuming new messages', function (done) { | ||
sqs.receiveMessage.onSecondCall().yields(null, response); | ||
setTimeout(function () { | ||
sinon.assert.calledTwice(handleMessage); | ||
done(); | ||
}, consumer.waitTime + 1); | ||
}); | ||
consumer.start(); | ||
it('fires an error event when an error occurs receiving a message', function (done) { | ||
var receiveErr = new Error('Receive error'); | ||
setTimeout(function () { | ||
sinon.assert.calledTwice(handleMessage); | ||
done(); | ||
}, consumer.waitTime + 1); | ||
}); | ||
sqs.receiveMessage.yields(receiveErr); | ||
it('fires an error event when an error occurs receiving a message', function (done) { | ||
var receiveErr = new Error('Receive error'); | ||
consumer.on('error', function (err) { | ||
assert.equal(err, receiveErr); | ||
done(); | ||
}); | ||
sqs.receiveMessage.yields(receiveErr); | ||
consumer.on('error', function (err) { | ||
assert.equal(err, receiveErr); | ||
done(); | ||
consumer.start(); | ||
}); | ||
consumer.start(); | ||
}); | ||
it('fires an error event when an error occurs deleting a message', function (done) { | ||
var deleteErr = new Error('Delete error'); | ||
it('fires an error event when an error occurs deleting a message', function (done) { | ||
var deleteErr = new Error('Delete error'); | ||
handleMessage.yields(null); | ||
sqs.deleteMessage.yields(deleteErr); | ||
handleMessage.yields(null); | ||
sqs.deleteMessage.yields(deleteErr); | ||
consumer.on('error', function (err) { | ||
assert.equal(err, deleteErr); | ||
done(); | ||
}); | ||
consumer.on('error', function (err) { | ||
assert.equal(err, deleteErr); | ||
done(); | ||
consumer.start(); | ||
}); | ||
consumer.start(); | ||
}); | ||
it('fires an error event when an error occurs processing a message', function (done) { | ||
var processingErr = new Error('Processing error'); | ||
it('fires an error event when an error occurs processing a message', function (done) { | ||
var processingErr = new Error('Processing error'); | ||
handleMessage.yields(processingErr); | ||
handleMessage.yields(processingErr); | ||
consumer.on('error', function (err) { | ||
assert.equal(err, processingErr); | ||
done(); | ||
}); | ||
consumer.on('error', function (err) { | ||
assert.equal(err, processingErr); | ||
done(); | ||
consumer.start(); | ||
}); | ||
consumer.start(); | ||
}); | ||
it('fires a message_received event when a message is received', function (done) { | ||
consumer.on('message_received', function (message) { | ||
assert.equal(message, response.Messages[0]); | ||
done(); | ||
}); | ||
it('fires a message_received event when a message is received', function (done) { | ||
consumer.on('message_received', function (message) { | ||
assert.equal(message, response.Messages[0]); | ||
done(); | ||
consumer.start(); | ||
}); | ||
consumer.start(); | ||
}); | ||
it('fires a message_processed event when a message is successfully deleted', function (done) { | ||
handleMessage.yields(null); | ||
it('fires a message_processed event when a message is successfully deleted', function (done) { | ||
handleMessage.yields(null); | ||
consumer.on('message_processed', function (message) { | ||
assert.equal(message, response.Messages[0]); | ||
done(); | ||
}); | ||
consumer.on('message_processed', function (message) { | ||
assert.equal(message, response.Messages[0]); | ||
done(); | ||
consumer.start(); | ||
}); | ||
consumer.start(); | ||
it('doesn\'t consumer more messages when called multiple times', function () { | ||
consumer.start(); | ||
consumer.start(); | ||
consumer.start(); | ||
sinon.assert.calledOnce(sqs.receiveMessage); | ||
}); | ||
}); | ||
describe('.stop', function () { | ||
it('stops the consumer polling for messages', function (done) { | ||
sqs.receiveMessage.onSecondCall().yields(null, response); | ||
consumer.start(); | ||
consumer.stop(); | ||
setTimeout(function () { | ||
sinon.assert.calledOnce(handleMessage); | ||
done(); | ||
}, consumer.waitTime + 1); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
11786
236
69