sqs-consumer
Advanced tools
Comparing version 3.5.0 to 3.6.0
13
index.js
@@ -62,2 +62,3 @@ 'use strict'; | ||
this.visibilityTimeout = options.visibilityTimeout; | ||
this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false; | ||
this.waitTimeSeconds = options.waitTimeSeconds || 20; | ||
@@ -166,2 +167,14 @@ this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000; | ||
} | ||
if (consumer.terminateVisibilityTimeout) { | ||
consumer.sqs.changeMessageVisibility({ | ||
QueueUrl: consumer.queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: 0 | ||
}, function(err) { | ||
if (err) consumer.emit('error', err, message); | ||
cb(); | ||
}); | ||
return; | ||
} | ||
} else { | ||
@@ -168,0 +181,0 @@ consumer.emit('message_processed', message); |
{ | ||
"name": "sqs-consumer", | ||
"version": "3.5.0", | ||
"version": "3.6.0", | ||
"description": "Build SQS-based Node applications without the boilerplate", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -91,2 +91,3 @@ # sqs-consumer | ||
* `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. | ||
* `terminateVisibilityTimeout` - _Boolean_ - If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`). | ||
* `waitTimeSeconds` - _Number_ - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning. | ||
@@ -116,1 +117,5 @@ * `authenticationErrorTimeout` - _Number_ - The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`). | ||
|`empty`|None|Fired when the queue is empty (All messages have been consumed).| | ||
### AWS IAM Permissions | ||
Consumer will receive and delete messages from the SQS queue. Ensure `sqs:ReceiveMessage` and `sqs:DeleteMessage` access is granted on the queue being consumed. |
@@ -26,2 +26,4 @@ 'use strict'; | ||
sqs._deleteMessage = sinon.stub().yieldsAsync(null); | ||
sqs.changeMessageVisibility = sinon.stub().yieldsAsync(null); | ||
consumer = new Consumer({ | ||
@@ -363,2 +365,56 @@ queueUrl: 'some-queue-url', | ||
}); | ||
it('terminate message visibility timeout on processing error', function (done) { | ||
handleMessage.yields(new Error('Processing error')); | ||
consumer.terminateVisibilityTimeout = true; | ||
consumer.on('processing_error', function () { | ||
setImmediate(function () { | ||
sinon.assert.calledWith(sqs.changeMessageVisibility, { | ||
QueueUrl: 'some-queue-url', | ||
ReceiptHandle: 'receipt-handle', | ||
VisibilityTimeout: 0 | ||
}); | ||
done(); | ||
}); | ||
}); | ||
consumer.start(); | ||
}); | ||
it('does not terminate visibility timeout when `terminateVisibilityTimeout` option is false', function (done) { | ||
handleMessage.yields(new Error('Processing error')); | ||
consumer.terminateVisibilityTimeout = false; | ||
consumer.on('processing_error', function () { | ||
setImmediate(function () { | ||
sinon.assert.notCalled(sqs.changeMessageVisibility); | ||
done(); | ||
}); | ||
}); | ||
consumer.start(); | ||
}); | ||
it('fires error event when failed to terminate visibility timeout on processing error', function (done) { | ||
handleMessage.yields(new Error('Processing error')); | ||
var sqsError = new Error('Processing error'); | ||
sqsError.name = 'SQSError'; | ||
sqs.changeMessageVisibility.yields(sqsError); | ||
consumer.terminateVisibilityTimeout = true; | ||
consumer.on('error', function () { | ||
setImmediate(function () { | ||
sinon.assert.calledWith(sqs.changeMessageVisibility, { | ||
QueueUrl: 'some-queue-url', | ||
ReceiptHandle: 'receipt-handle', | ||
VisibilityTimeout: 0 | ||
}); | ||
done(); | ||
}); | ||
}); | ||
consumer.start(); | ||
}); | ||
}); | ||
@@ -365,0 +421,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
27418
10
565
120