Socket
Socket
Sign inDemoInstall

sqs-consumer

Package Overview
Dependencies
Maintainers
1
Versions
104
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sqs-consumer - npm Package Compare versions

Comparing version 4.0.0 to 4.1.0

lib/errors.js

58

lib/consumer.js
'use strict';
const AWS = require('aws-sdk');
const SQS = require('aws-sdk/clients/sqs');
const { EventEmitter } = require('events');
const { SQSError, TimeoutError } = require('./errors');
const debug = require('debug')('sqs-consumer');

@@ -14,7 +15,8 @@

class SQSError extends Error {
constructor() {
super(Array.from(arguments));
this.name = this.constructor.name;
}
function createTimeout(duration) {
let timeout;
const pending = new Promise((_, reject) => {
timeout = setTimeout(() => reject(new TimeoutError()), duration);
});
return [timeout, pending];
}

@@ -61,2 +63,3 @@

this.handleMessage = options.handleMessage;
this.handleMessageTimeout = options.handleMessageTimeout;
this.attributeNames = options.attributeNames || [];

@@ -71,3 +74,3 @@ this.messageAttributeNames = options.messageAttributeNames || [];

this.sqs = options.sqs || new AWS.SQS({
this.sqs = options.sqs || new SQS({
region: options.region || process.env.AWS_REGION || 'eu-west-1'

@@ -151,2 +154,4 @@ });

this.emit('error', err, message);
} else if (err instanceof TimeoutError) {
this.emit('timeout_error', err, message);
} else {

@@ -158,9 +163,3 @@ this.emit('processing_error', err, message);

try {
await this.sqs
.changeMessageVisibility({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
})
.promise();
await this._terminateVisabilityTimeout(message);
} catch (err) {

@@ -181,2 +180,12 @@ this.emit('error', err, message);

async _terminateVisabilityTimeout(message) {
return this.sqs
.changeMessageVisibility({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
})
.promise();
}
async _deleteMessage(message) {

@@ -198,6 +207,23 @@ debug('Deleting message %s', message.MessageId);

async _executeHandler(message) {
let timeout;
let pending;
try {
await this.handleMessage(message);
if (this.handleMessageTimeout) {
[timeout, pending] = createTimeout(this.handleMessageTimeout);
await Promise.race([
this.handleMessage(message),
pending
]);
} else {
await this.handleMessage(message);
}
} catch (err) {
throw new Error('Unexpected message handler failure: ' + err.message);
if (err instanceof TimeoutError) {
err.message = `Message handler timed out after ${this.handleMessageTimeout}ms: ${err.message}`;
} else {
err.message = `Unexpected message handler failure: ${err.message}`;
}
throw err;
} finally {
clearTimeout(timeout);
}

@@ -204,0 +230,0 @@ }

{
"name": "sqs-consumer",
"version": "4.0.0",
"version": "4.1.0",
"description": "Build SQS-based Node applications without the boilerplate",

@@ -38,3 +38,3 @@ "main": "index.js",

"dependencies": {
"aws-sdk": "^2.100.0",
"aws-sdk": "^2.393.0",
"debug": "^4.1.1"

@@ -41,0 +41,0 @@ },

# sqs-consumer
[![NPM downloads](https://img.shields.io/npm/dm/sqs-consumer.svg?style=flat)](https://npmjs.org/package/sqs-consumer)
[![Build Status](https://travis-ci.org/bbc/sqs-consumer.svg)](https://travis-ci.org/bbc/sqs-consumer)

@@ -81,2 +82,6 @@ [![Code Climate](https://codeclimate.com/github/BBC/sqs-consumer/badges/gpa.svg)](https://codeclimate.com/github/BBC/sqs-consumer)

app.on('timeout_error', (err) => {
console.error(err.message);
});
app.start();

@@ -96,2 +101,3 @@ ```

* `handleMessage` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as it's first argument.
* `handleMessageTimeout` - _String_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue.
* `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`).

@@ -122,2 +128,3 @@ * `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`).

|`processing_error`|`err`, `message`|Fired when an error occurs processing the message.|
|`timeout_error`|`err`, `message`|Fired when `handleMessageTimeout` is supplied as an option and if `handleMessage` times out.|
|`message_received`|`message`|Fired when a message is received.|

@@ -124,0 +131,0 @@ |`message_processed`|`message`|Fired when a message is successfully processed and removed from the queue.|

@@ -151,2 +151,21 @@ 'use strict';

it('fires a timeout event if handler function takes too long', async () => {
const handleMessageTimeout = 500;
consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
handleMessage: () => new Promise((resolve) => setTimeout(() => resolve(), 1000)),
handleMessageTimeout,
sqs,
authenticationErrorTimeout: 20
});
consumer.start();
const err = await pEvent(consumer, 'timeout_error');
consumer.stop();
assert.ok(err);
assert.equal(err.message, `Message handler timed out after ${handleMessageTimeout}ms: Operation timed out.`);
});
it('handles unexpected exceptions thrown by the handler function', async () => {

@@ -153,0 +172,0 @@ consumer = new Consumer({

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc