sqs-consumer
Advanced tools
Comparing version 4.0.0 to 4.1.0
'use strict'; | ||
const AWS = require('aws-sdk'); | ||
const SQS = require('aws-sdk/clients/sqs'); | ||
const { EventEmitter } = require('events'); | ||
const { SQSError, TimeoutError } = require('./errors'); | ||
const debug = require('debug')('sqs-consumer'); | ||
@@ -14,7 +15,8 @@ | ||
class SQSError extends Error { | ||
constructor() { | ||
super(Array.from(arguments)); | ||
this.name = this.constructor.name; | ||
} | ||
function createTimeout(duration) { | ||
let timeout; | ||
const pending = new Promise((_, reject) => { | ||
timeout = setTimeout(() => reject(new TimeoutError()), duration); | ||
}); | ||
return [timeout, pending]; | ||
} | ||
@@ -61,2 +63,3 @@ | ||
this.handleMessage = options.handleMessage; | ||
this.handleMessageTimeout = options.handleMessageTimeout; | ||
this.attributeNames = options.attributeNames || []; | ||
@@ -71,3 +74,3 @@ this.messageAttributeNames = options.messageAttributeNames || []; | ||
this.sqs = options.sqs || new AWS.SQS({ | ||
this.sqs = options.sqs || new SQS({ | ||
region: options.region || process.env.AWS_REGION || 'eu-west-1' | ||
@@ -151,2 +154,4 @@ }); | ||
this.emit('error', err, message); | ||
} else if (err instanceof TimeoutError) { | ||
this.emit('timeout_error', err, message); | ||
} else { | ||
@@ -158,9 +163,3 @@ this.emit('processing_error', err, message); | ||
try { | ||
await this.sqs | ||
.changeMessageVisibility({ | ||
QueueUrl: this.queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: 0 | ||
}) | ||
.promise(); | ||
await this._terminateVisabilityTimeout(message); | ||
} catch (err) { | ||
@@ -181,2 +180,12 @@ this.emit('error', err, message); | ||
async _terminateVisabilityTimeout(message) { | ||
return this.sqs | ||
.changeMessageVisibility({ | ||
QueueUrl: this.queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: 0 | ||
}) | ||
.promise(); | ||
} | ||
async _deleteMessage(message) { | ||
@@ -198,6 +207,23 @@ debug('Deleting message %s', message.MessageId); | ||
async _executeHandler(message) { | ||
let timeout; | ||
let pending; | ||
try { | ||
await this.handleMessage(message); | ||
if (this.handleMessageTimeout) { | ||
[timeout, pending] = createTimeout(this.handleMessageTimeout); | ||
await Promise.race([ | ||
this.handleMessage(message), | ||
pending | ||
]); | ||
} else { | ||
await this.handleMessage(message); | ||
} | ||
} catch (err) { | ||
throw new Error('Unexpected message handler failure: ' + err.message); | ||
if (err instanceof TimeoutError) { | ||
err.message = `Message handler timed out after ${this.handleMessageTimeout}ms: ${err.message}`; | ||
} else { | ||
err.message = `Unexpected message handler failure: ${err.message}`; | ||
} | ||
throw err; | ||
} finally { | ||
clearTimeout(timeout); | ||
} | ||
@@ -204,0 +230,0 @@ } |
{ | ||
"name": "sqs-consumer", | ||
"version": "4.0.0", | ||
"version": "4.1.0", | ||
"description": "Build SQS-based Node applications without the boilerplate", | ||
@@ -38,3 +38,3 @@ "main": "index.js", | ||
"dependencies": { | ||
"aws-sdk": "^2.100.0", | ||
"aws-sdk": "^2.393.0", | ||
"debug": "^4.1.1" | ||
@@ -41,0 +41,0 @@ }, |
# sqs-consumer | ||
[![NPM downloads](https://img.shields.io/npm/dm/sqs-consumer.svg?style=flat)](https://npmjs.org/package/sqs-consumer) | ||
[![Build Status](https://travis-ci.org/bbc/sqs-consumer.svg)](https://travis-ci.org/bbc/sqs-consumer) | ||
@@ -81,2 +82,6 @@ [![Code Climate](https://codeclimate.com/github/BBC/sqs-consumer/badges/gpa.svg)](https://codeclimate.com/github/BBC/sqs-consumer) | ||
app.on('timeout_error', (err) => { | ||
console.error(err.message); | ||
}); | ||
app.start(); | ||
@@ -96,2 +101,3 @@ ``` | ||
* `handleMessage` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as it's first argument. | ||
* `handleMessageTimeout` - _String_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. | ||
* `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`). | ||
@@ -122,2 +128,3 @@ * `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`). | ||
|`processing_error`|`err`, `message`|Fired when an error occurs processing the message.| | ||
|`timeout_error`|`err`, `message`|Fired when `handleMessageTimeout` is supplied as an option and if `handleMessage` times out.| | ||
|`message_received`|`message`|Fired when a message is received.| | ||
@@ -124,0 +131,0 @@ |`message_processed`|`message`|Fired when a message is successfully processed and removed from the queue.| |
@@ -151,2 +151,21 @@ 'use strict'; | ||
it('fires a timeout event if handler function takes too long', async () => { | ||
const handleMessageTimeout = 500; | ||
consumer = new Consumer({ | ||
queueUrl: 'some-queue-url', | ||
region: 'some-region', | ||
handleMessage: () => new Promise((resolve) => setTimeout(() => resolve(), 1000)), | ||
handleMessageTimeout, | ||
sqs, | ||
authenticationErrorTimeout: 20 | ||
}); | ||
consumer.start(); | ||
const err = await pEvent(consumer, 'timeout_error'); | ||
consumer.stop(); | ||
assert.ok(err); | ||
assert.equal(err.message, `Message handler timed out after ${handleMessageTimeout}ms: Operation timed out.`); | ||
}); | ||
it('handles unexpected exceptions thrown by the handler function', async () => { | ||
@@ -153,0 +172,0 @@ consumer = new Consumer({ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
32241
696
136
0
Updatedaws-sdk@^2.393.0