Socket
Socket
Sign inDemoInstall

sqs-consumer

Package Overview
Dependencies
39
Maintainers
1
Versions
90
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 5.2.0 to 5.3.0

.nyc_output/6b82944a-e1d8-43f1-b7e6-4250bba9eb0c.json

8

dist/consumer.d.ts

@@ -17,3 +17,4 @@ import * as SQS from 'aws-sdk/clients/sqs';

handleMessageTimeout?: number;
handleMessage(message: SQSMessage): Promise<void>;
handleMessage?(message: SQSMessage): Promise<void>;
handleMessageBatch?(messages: SQSMessage[]): Promise<void>;
}

@@ -23,2 +24,3 @@ export declare class Consumer extends EventEmitter {

private handleMessage;
private handleMessageBatch;
private handleMessageTimeout;

@@ -47,3 +49,7 @@ private attributeNames;

private poll;
private processMessageBatch;
private deleteMessageBatch;
private executeBatchHandler;
private terminateVisabilityTimeoutBatch;
}
export {};
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const debug = require('debug')('sqs-consumer');
const SQS = require("aws-sdk/clients/sqs");
const Debug = require("debug");
const events_1 = require("events");
const bind_1 = require("./bind");
const errors_1 = require("./errors");
const debug = Debug('sqs-consumer');
const requiredOptions = [
'queueUrl',
'handleMessage'
// only one of handleMessage / handleMessagesBatch is required
'handleMessage|handleMessageBatch'
];

@@ -23,4 +25,5 @@ function createTimeout(duration) {

requiredOptions.forEach((option) => {
if (!options[option]) {
throw new Error(`Missing SQS consumer option ['${option}'].`);
const possibilities = option.split('|');
if (!possibilities.find((p) => options[p])) {
throw new Error(`Missing SQS consumer option [ ${possibilities.join(' or ')} ].`);
}

@@ -57,2 +60,3 @@ });

this.handleMessage = options.handleMessage;
this.handleMessageBatch = options.handleMessageBatch;
this.handleMessageTimeout = options.handleMessageTimeout;

@@ -94,3 +98,9 @@ this.attributeNames = options.attributeNames || [];

if (hasMessages(response)) {
await Promise.all(response.Messages.map(this.processMessage));
if (this.handleMessageBatch) {
// prefer handling messages in batch when available
await this.processMessageBatch(response.Messages);
}
else {
await Promise.all(response.Messages.map(this.processMessage));
}
this.emit('response_processed');

@@ -225,3 +235,66 @@ }

}
async processMessageBatch(messages) {
messages.forEach((message) => {
this.emit('message_received', message);
});
try {
await this.executeBatchHandler(messages);
await this.deleteMessageBatch(messages);
messages.forEach((message) => {
this.emit('message_processed', message);
});
}
catch (err) {
this.emit('error', err, messages);
if (this.terminateVisibilityTimeout) {
try {
await this.terminateVisabilityTimeoutBatch(messages);
}
catch (err) {
this.emit('error', err, messages);
}
}
}
}
async deleteMessageBatch(messages) {
debug('Deleting messages %s', messages.map((msg) => msg.MessageId).join(' ,'));
const deleteParams = {
QueueUrl: this.queueUrl,
Entries: messages.map(message => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle
}))
};
try {
await this.sqs
.deleteMessageBatch(deleteParams)
.promise();
}
catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
}
}
async executeBatchHandler(messages) {
try {
await this.handleMessageBatch(messages);
}
catch (err) {
err.message = `Unexpected message handler failure: ${err.message}`;
throw err;
}
}
async terminateVisabilityTimeoutBatch(messages) {
const params = {
QueueUrl: this.queueUrl,
Entries: messages.map((message) => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
}))
};
return this.sqs
.changeMessageVisibilityBatch(params)
.promise();
}
}
exports.Consumer = Consumer;

18

package.json
{
"name": "sqs-consumer",
"version": "5.2.0",
"version": "5.3.0",
"description": "Build SQS-based Node applications without the boilerplate",

@@ -34,4 +34,4 @@ "main": "dist/index.js",

"devDependencies": {
"@types/aws-sdk": "^2.7.0",
"@types/chai": "^4.1.4",
"@types/debug": "^4.1.3",
"@types/mocha": "^2.2.43",

@@ -42,2 +42,7 @@ "@types/node": "^10.12.18",

"chai": "^4.2.0",
"codeclimate-test-reporter": "^0.5.1",
"mocha": "^5.2.0",
"nyc": "^13.1.0",
"p-event": "^2.1.0",
"sinon": "^7.2.2",
"ts-node": "^3.3.0",

@@ -47,11 +52,6 @@ "tslint": "^5.12.1",

"tslint-microsoft-contrib": "^5.0.3",
"typescript": "^2.6.1",
"codeclimate-test-reporter": "^0.5.1",
"mocha": "^5.2.0",
"nyc": "^13.1.0",
"p-event": "^2.1.0",
"sinon": "^7.2.2"
"typescript": "^2.6.1"
},
"dependencies": {
"aws-sdk": "^2.393.0",
"aws-sdk": "^2.443.0",
"debug": "^4.1.1"

@@ -58,0 +58,0 @@ },

@@ -100,3 +100,4 @@ # sqs-consumer

* `handleMessage` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as it's first argument.
* `handleMessageTimeout` - _Number_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue.
* `handleMessageBatch` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**.
* `handleMessageTimeout` - _String_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue.
* `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`).

@@ -103,0 +104,0 @@ * `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`).

@@ -1,6 +0,5 @@

const debug = require('debug')('sqs-consumer');
import { AWSError } from 'aws-sdk';
import * as SQS from 'aws-sdk/clients/sqs';
import { PromiseResult } from 'aws-sdk/lib/request';
import * as Debug from 'debug';
import { EventEmitter } from 'events';

@@ -10,2 +9,4 @@ import { autoBind } from './bind';

const debug = Debug('sqs-consumer');
type ReceieveMessageResponse = PromiseResult<SQS.Types.ReceiveMessageResult, AWSError>;

@@ -17,3 +18,4 @@ type SQSMessage = SQS.Types.Message;

'queueUrl',
'handleMessage'
// only one of handleMessage / handleMessagesBatch is required
'handleMessage|handleMessageBatch'
];

@@ -38,4 +40,5 @@

requiredOptions.forEach((option) => {
if (!options[option]) {
throw new Error(`Missing SQS consumer option ['${option}'].`);
const possibilities = option.split('|');
if (!possibilities.find((p) => options[p])) {
throw new Error(`Missing SQS consumer option [ ${possibilities.join(' or ')} ].`);
}

@@ -85,3 +88,4 @@ });

handleMessageTimeout?: number;
handleMessage(message: SQSMessage): Promise<void>;
handleMessage?(message: SQSMessage): Promise<void>;
handleMessageBatch?(messages: SQSMessage[]): Promise<void>;
}

@@ -92,2 +96,3 @@

private handleMessage: (message: SQSMessage) => Promise<void>;
private handleMessageBatch: (message: SQSMessage[]) => Promise<void>;
private handleMessageTimeout: number;

@@ -107,5 +112,5 @@ private attributeNames: string[];

assertOptions(options);
this.queueUrl = options.queueUrl;
this.handleMessage = options.handleMessage;
this.handleMessageBatch = options.handleMessageBatch;
this.handleMessageTimeout = options.handleMessageTimeout;

@@ -129,3 +134,3 @@ this.attributeNames = options.attributeNames || [];

public get isRunning(): boolean {
return !this.stopped;
return !this.stopped;
}

@@ -156,3 +161,8 @@

if (hasMessages(response)) {
await Promise.all(response.Messages.map(this.processMessage));
if (this.handleMessageBatch) {
// prefer handling messages in batch when available
await this.processMessageBatch(response.Messages);
} else {
await Promise.all(response.Messages.map(this.processMessage));
}
this.emit('response_processed');

@@ -289,2 +299,70 @@ } else {

}
private async processMessageBatch(messages: SQSMessage[]): Promise<void> {
messages.forEach((message) => {
this.emit('message_received', message);
});
try {
await this.executeBatchHandler(messages);
await this.deleteMessageBatch(messages);
messages.forEach((message) => {
this.emit('message_processed', message);
});
} catch (err) {
this.emit('error', err, messages);
if (this.terminateVisibilityTimeout) {
try {
await this.terminateVisabilityTimeoutBatch(messages);
} catch (err) {
this.emit('error', err, messages);
}
}
}
}
private async deleteMessageBatch(messages: SQSMessage[]): Promise<void> {
debug('Deleting messages %s', messages.map((msg) => msg.MessageId).join(' ,'));
const deleteParams = {
QueueUrl: this.queueUrl,
Entries: messages.map(message => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle
}))
};
try {
await this.sqs
.deleteMessageBatch(deleteParams)
.promise();
} catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
}
}
private async executeBatchHandler(messages: SQSMessage[]): Promise<void> {
try {
await this.handleMessageBatch(messages);
} catch (err) {
err.message = `Unexpected message handler failure: ${err.message}`;
throw err;
}
}
private async terminateVisabilityTimeoutBatch(messages: SQSMessage[]): Promise<PromiseResult<any, AWSError>> {
const params = {
QueueUrl: this.queueUrl,
Entries: messages.map((message) => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
}))
};
return this.sqs
.changeMessageVisibilityBatch(params)
.promise();
}
}

@@ -40,2 +40,3 @@ import { assert } from 'chai';

let handleMessage;
let handleMessageBatch;
let sqs;

@@ -52,5 +53,7 @@ const response = {

handleMessage = sandbox.stub().resolves(null);
handleMessageBatch = sandbox.stub().resolves(null);
sqs = sandbox.mock();
sqs.receiveMessage = stubResolve(response);
sqs.deleteMessage = stubResolve();
sqs.deleteMessageBatch = stubResolve();
sqs.changeMessageVisibility = stubResolve();

@@ -80,3 +83,3 @@

it('requires a handleMessage function to be set', () => {
it('requires a handleMessage or handleMessagesBatch function to be set', () => {
assert.throws(() => {

@@ -563,2 +566,39 @@ new Consumer({

});
it('calls the handleMessagesBatch function when a batch of messages is received', async () => {
consumer = new Consumer({
queueUrl: 'some-queue-url',
messageAttributeNames: ['attribute-1', 'attribute-2'],
region: 'some-region',
handleMessageBatch,
batchSize: 2,
sqs
});
consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();
sandbox.assert.callCount(handleMessageBatch, 1);
});
it('prefers handleMessagesBatch over handleMessage when both are set', async () => {
consumer = new Consumer({
queueUrl: 'some-queue-url',
messageAttributeNames: ['attribute-1', 'attribute-2'],
region: 'some-region',
handleMessageBatch,
handleMessage,
batchSize: 2,
sqs
});
consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();
sandbox.assert.callCount(handleMessageBatch, 1);
sandbox.assert.callCount(handleMessage, 0);
});
});

@@ -565,0 +605,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc