Socket
Socket
Sign inDemoInstall

sqs-consumer

Package Overview
Dependencies
Maintainers
2
Versions
101
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sqs-consumer - npm Package Compare versions

Comparing version 5.8.0 to 6.0.0-alpha.1

dist/types.d.ts

21

dist/consumer.d.ts
/// <reference types="node" />
import { SQS } from 'aws-sdk';
import { SQSClient, Message } from '@aws-sdk/client-sqs';
import { EventEmitter } from 'events';
export type SQSMessage = SQS.Types.Message;
export interface ConsumerOptions {

@@ -17,8 +16,8 @@ queueUrl?: string;

heartbeatInterval?: number;
sqs?: SQS;
sqs?: SQSClient;
region?: string;
handleMessageTimeout?: number;
shouldDeleteMessages?: boolean;
handleMessage?(message: SQSMessage): Promise<void>;
handleMessageBatch?(messages: SQSMessage[]): Promise<void>;
handleMessage?(message: Message): Promise<void>;
handleMessageBatch?(messages: Message[]): Promise<void>;
}

@@ -28,7 +27,7 @@ interface Events {

empty: [];
message_received: [SQSMessage];
message_processed: [SQSMessage];
error: [Error, void | SQSMessage | SQSMessage[]];
timeout_error: [Error, SQSMessage];
processing_error: [Error, SQSMessage];
message_received: [Message];
message_processed: [Message];
error: [Error, void | Message | Message[]];
timeout_error: [Error, Message];
processing_error: [Error, Message];
stopped: [];

@@ -72,5 +71,5 @@ }

private executeBatchHandler;
private changeVisabilityTimeoutBatch;
private changeVisibilityTimeoutBatch;
private startHeartbeat;
}
export {};
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Consumer = void 0;
const aws_sdk_1 = require("aws-sdk");
const client_sqs_1 = require("@aws-sdk/client-sqs");
const debug_1 = require("debug");

@@ -48,9 +48,10 @@ const events_1 = require("events");

function toSQSError(err, message) {
var _a, _b;
const sqsError = new errors_1.SQSError(message);
sqsError.code = err.code;
sqsError.statusCode = err.statusCode;
sqsError.region = err.region;
sqsError.retryable = err.retryable;
sqsError.hostname = err.hostname;
sqsError.time = err.time;
sqsError.code = err.name;
sqsError.statusCode = (_a = err.$metadata) === null || _a === void 0 ? void 0 : _a.httpStatusCode;
sqsError.retryable = (_b = err.$retryable) === null || _b === void 0 ? void 0 : _b.throttling;
sqsError.service = err.$service;
sqsError.fault = err.$fault;
sqsError.time = new Date();
return sqsError;

@@ -85,3 +86,3 @@ }

options.sqs ||
new aws_sdk_1.SQS({
new client_sqs_1.SQSClient({
region: options.region || process.env.AWS_REGION || 'eu-west-1'

@@ -161,3 +162,3 @@ });

try {
return await this.sqs.receiveMessage(params).promise();
return await this.sqs.send(new client_sqs_1.ReceiveMessageCommand(params));
}

@@ -179,3 +180,3 @@ catch (err) {

try {
await this.sqs.deleteMessage(deleteParams).promise();
await this.sqs.send(new client_sqs_1.DeleteMessageCommand(deleteParams));
}

@@ -213,9 +214,8 @@ catch (err) {

try {
return await this.sqs
.changeMessageVisibility({
const input = {
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
})
.promise();
};
return await this.sqs.send(new client_sqs_1.ChangeMessageVisibilityCommand(input));
}

@@ -277,3 +277,3 @@ catch (err) {

heartbeat = this.startHeartbeat(async () => {
return this.changeVisabilityTimeoutBatch(messages, this.visibilityTimeout);
return this.changeVisibilityTimeoutBatch(messages, this.visibilityTimeout);
});

@@ -290,3 +290,3 @@ }

if (this.terminateVisibilityTimeout) {
await this.changeVisabilityTimeoutBatch(messages, 0);
await this.changeVisibilityTimeoutBatch(messages, 0);
}

@@ -312,3 +312,3 @@ }

try {
await this.sqs.deleteMessageBatch(deleteParams).promise();
await this.sqs.send(new client_sqs_1.DeleteMessageBatchCommand(deleteParams));
}

@@ -328,3 +328,3 @@ catch (err) {

}
async changeVisabilityTimeoutBatch(messages, timeout) {
async changeVisibilityTimeoutBatch(messages, timeout) {
const params = {

@@ -339,3 +339,3 @@ QueueUrl: this.queueUrl,

try {
return await this.sqs.changeMessageVisibilityBatch(params).promise();
return await this.sqs.send(new client_sqs_1.ChangeMessageVisibilityBatchCommand(params));
}

@@ -342,0 +342,0 @@ catch (err) {

declare class SQSError extends Error {
code: string;
statusCode: number;
region: string;
hostname: string;
service: string;
time: Date;
retryable: boolean;
fault: 'client' | 'server';
constructor(message: string);

@@ -9,0 +9,0 @@ }

@@ -1,1 +0,1 @@

export { SQSMessage, Consumer, ConsumerOptions } from './consumer';
export { Consumer, ConsumerOptions } from './consumer';
{
"name": "sqs-consumer",
"version": "5.8.0",
"version": "6.0.0-alpha.1",
"description": "Build SQS-based Node applications without the boilerplate",

@@ -55,7 +55,7 @@ "main": "dist/index.js",

"dependencies": {
"aws-sdk": "^2.1271.0",
"@aws-sdk/client-sqs": "^3.226.0",
"debug": "^4.3.4"
},
"peerDependencies": {
"aws-sdk": "^2.1271.0"
"@aws-sdk/client-sqs": "^3.226.0"
},

@@ -62,0 +62,0 @@ "mocha": {

@@ -16,6 +16,13 @@ # sqs-consumer

> **Note**
> This library assumes you are using [AWS SDK v3](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/index.html). If you are using v2, please install v5.8.0:
>
> ```bash
> npm install sqs-consumer@5.8.0 --save-dev
> ```
## Usage
```js
const { Consumer } = require('sqs-consumer');
import { Consumer } from 'sqs-consumer';

@@ -44,34 +51,3 @@ const app = Consumer.create({

- By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the `batchSize` option [detailed below](#options).
- By default, the default Node.js HTTP/HTTPS SQS agent creates a new TCP connection for every new request ([AWS SQS documentation](https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/node-reusing-connections.html)). To avoid the cost of establishing a new connection, you can reuse an existing connection by passing a new SQS instance with `keepAlive: true`.
```js
const { Consumer } = require('sqs-consumer');
const AWS = require('aws-sdk');
const https = require('https');
const app = Consumer.create({
queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
handleMessage: async (message) => {
// do some work with `message`
},
sqs: new AWS.SQS({
httpOptions: {
agent: new https.Agent({
keepAlive: true
})
}
})
});
app.on('error', (err) => {
console.error(err.message);
});
app.on('processing_error', (err) => {
console.error(err.message);
});
app.start();
```
### Credentials

@@ -86,14 +62,8 @@

If you need to specify your credentials manually, you can use a pre-configured instance of the [AWS SQS](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) client:
If you need to specify your credentials manually, you can use a pre-configured instance of the [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html) client.
```js
const { Consumer } = require('sqs-consumer');
const AWS = require('aws-sdk');
import { Consumer } from 'sqs-consumer';
import { SQSClient } from '@aws-sdk/client-sqs';
AWS.config.update({
region: 'eu-west-1',
accessKeyId: '...',
secretAccessKey: '...'
});
const app = Consumer.create({

@@ -104,3 +74,9 @@ queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',

},
sqs: new AWS.SQS()
sqs: new SQSClient({
region: 'my-region',
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
}
})
});

@@ -145,3 +121,3 @@

- `pollingWaitTimeMs` - _Number_ - The duration (in milliseconds) to wait before repolling the queue (defaults to `0`).
- `sqs` - _Object_ - An optional [AWS SQS](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) object to use if you need to configure the client manually
- `sqs` - _Object_ - An optional [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html) object to use if you need to configure the client manually
- `shouldDeleteMessages` - _Boolean_ - Default to `true`, if you don't want the package to delete messages from sqs set this to `false`

@@ -148,0 +124,0 @@

@@ -1,5 +0,22 @@

import { AWSError, SQS } from 'aws-sdk';
import { PromiseResult } from 'aws-sdk/lib/request';
import {
SQSClient,
Message,
ChangeMessageVisibilityCommand,
ChangeMessageVisibilityCommandInput,
ChangeMessageVisibilityCommandOutput,
ChangeMessageVisibilityBatchCommand,
ChangeMessageVisibilityBatchCommandInput,
ChangeMessageVisibilityBatchCommandOutput,
DeleteMessageCommand,
DeleteMessageCommandInput,
DeleteMessageBatchCommand,
DeleteMessageBatchCommandInput,
ReceiveMessageCommand,
ReceiveMessageCommandInput,
ReceiveMessageCommandOutput
} from '@aws-sdk/client-sqs';
import Debug from 'debug';
import { EventEmitter } from 'events';
import { AWSError } from './types';
import { autoBind } from './bind';

@@ -10,9 +27,2 @@ import { SQSError, TimeoutError } from './errors';

type ReceieveMessageResponse = PromiseResult<
SQS.Types.ReceiveMessageResult,
AWSError
>;
type ReceiveMessageRequest = SQS.Types.ReceiveMessageRequest;
export type SQSMessage = SQS.Types.Message;
const requiredOptions = [

@@ -74,8 +84,8 @@ 'queueUrl',

const sqsError = new SQSError(message);
sqsError.code = err.code;
sqsError.statusCode = err.statusCode;
sqsError.region = err.region;
sqsError.retryable = err.retryable;
sqsError.hostname = err.hostname;
sqsError.time = err.time;
sqsError.code = err.name;
sqsError.statusCode = err.$metadata?.httpStatusCode;
sqsError.retryable = err.$retryable?.throttling;
sqsError.service = err.$service;
sqsError.fault = err.$fault;
sqsError.time = new Date();

@@ -85,3 +95,3 @@ return sqsError;

function hasMessages(response: ReceieveMessageResponse): boolean {
function hasMessages(response: ReceiveMessageCommandOutput): boolean {
return response.Messages && response.Messages.length > 0;

@@ -102,8 +112,8 @@ }

heartbeatInterval?: number;
sqs?: SQS;
sqs?: SQSClient;
region?: string;
handleMessageTimeout?: number;
shouldDeleteMessages?: boolean;
handleMessage?(message: SQSMessage): Promise<void>;
handleMessageBatch?(messages: SQSMessage[]): Promise<void>;
handleMessage?(message: Message): Promise<void>;
handleMessageBatch?(messages: Message[]): Promise<void>;
}

@@ -114,7 +124,7 @@

empty: [];
message_received: [SQSMessage];
message_processed: [SQSMessage];
error: [Error, void | SQSMessage | SQSMessage[]];
timeout_error: [Error, SQSMessage];
processing_error: [Error, SQSMessage];
message_received: [Message];
message_processed: [Message];
error: [Error, void | Message | Message[]];
timeout_error: [Error, Message];
processing_error: [Error, Message];
stopped: [];

@@ -125,4 +135,4 @@ }

private queueUrl: string;
private handleMessage: (message: SQSMessage) => Promise<void>;
private handleMessageBatch: (message: SQSMessage[]) => Promise<void>;
private handleMessage: (message: Message) => Promise<void>;
private handleMessageBatch: (message: Message[]) => Promise<void>;
private handleMessageTimeout: number;

@@ -139,3 +149,3 @@ private attributeNames: string[];

private heartbeatInterval: number;
private sqs: SQS;
private sqs: SQSClient;
private shouldDeleteMessages: boolean;

@@ -166,3 +176,3 @@

options.sqs ||
new SQS({
new SQSClient({
region: options.region || process.env.AWS_REGION || 'eu-west-1'

@@ -214,3 +224,3 @@ });

private async handleSqsResponse(
response: ReceieveMessageResponse
response: ReceiveMessageCommandOutput
): Promise<void> {

@@ -235,3 +245,3 @@ debug('Received SQS response');

private async processMessage(message: SQSMessage): Promise<void> {
private async processMessage(message: Message): Promise<void> {
this.emit('message_received', message);

@@ -261,6 +271,6 @@

private async receiveMessage(
params: ReceiveMessageRequest
): Promise<ReceieveMessageResponse> {
params: ReceiveMessageCommandInput
): Promise<ReceiveMessageCommandOutput> {
try {
return await this.sqs.receiveMessage(params).promise();
return await this.sqs.send(new ReceiveMessageCommand(params));
} catch (err) {

@@ -271,3 +281,3 @@ throw toSQSError(err, `SQS receive message failed: ${err.message}`);

private async deleteMessage(message: SQSMessage): Promise<void> {
private async deleteMessage(message: Message): Promise<void> {
if (!this.shouldDeleteMessages) {

@@ -281,3 +291,3 @@ debug(

const deleteParams = {
const deleteParams: DeleteMessageCommandInput = {
QueueUrl: this.queueUrl,

@@ -288,3 +298,3 @@ ReceiptHandle: message.ReceiptHandle

try {
await this.sqs.deleteMessage(deleteParams).promise();
await this.sqs.send(new DeleteMessageCommand(deleteParams));
} catch (err) {

@@ -295,3 +305,3 @@ throw toSQSError(err, `SQS delete message failed: ${err.message}`);

private async executeHandler(message: SQSMessage): Promise<void> {
private async executeHandler(message: Message): Promise<void> {
let timeout;

@@ -319,13 +329,12 @@ let pending;

private async changeVisibilityTimeout(
message: SQSMessage,
message: Message,
timeout: number
): Promise<PromiseResult<any, AWSError>> {
): Promise<ChangeMessageVisibilityCommandOutput> {
try {
return await this.sqs
.changeMessageVisibility({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
})
.promise();
const input: ChangeMessageVisibilityCommandInput = {
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
};
return await this.sqs.send(new ChangeMessageVisibilityCommand(input));
} catch (err) {

@@ -340,3 +349,3 @@ this.emit(

private emitError(err: Error, message: SQSMessage): void {
private emitError(err: Error, message: Message): void {
if (err.name === SQSError.name) {

@@ -358,3 +367,3 @@ this.emit('error', err, message);

debug('Polling for messages');
const receiveParams = {
const receiveParams: ReceiveMessageCommandInput = {
QueueUrl: this.queueUrl,

@@ -387,3 +396,3 @@ AttributeNames: this.attributeNames,

private async processMessageBatch(messages: SQSMessage[]): Promise<void> {
private async processMessageBatch(messages: Message[]): Promise<void> {
messages.forEach((message) => {

@@ -397,3 +406,3 @@ this.emit('message_received', message);

heartbeat = this.startHeartbeat(async () => {
return this.changeVisabilityTimeoutBatch(
return this.changeVisibilityTimeoutBatch(
messages,

@@ -413,3 +422,3 @@ this.visibilityTimeout

if (this.terminateVisibilityTimeout) {
await this.changeVisabilityTimeoutBatch(messages, 0);
await this.changeVisibilityTimeoutBatch(messages, 0);
}

@@ -421,3 +430,3 @@ } finally {

private async deleteMessageBatch(messages: SQSMessage[]): Promise<void> {
private async deleteMessageBatch(messages: Message[]): Promise<void> {
if (!this.shouldDeleteMessages) {

@@ -434,3 +443,3 @@ debug(

const deleteParams = {
const deleteParams: DeleteMessageBatchCommandInput = {
QueueUrl: this.queueUrl,

@@ -444,3 +453,3 @@ Entries: messages.map((message) => ({

try {
await this.sqs.deleteMessageBatch(deleteParams).promise();
await this.sqs.send(new DeleteMessageBatchCommand(deleteParams));
} catch (err) {

@@ -451,3 +460,3 @@ throw toSQSError(err, `SQS delete message failed: ${err.message}`);

private async executeBatchHandler(messages: SQSMessage[]): Promise<void> {
private async executeBatchHandler(messages: Message[]): Promise<void> {
try {

@@ -461,7 +470,7 @@ await this.handleMessageBatch(messages);

private async changeVisabilityTimeoutBatch(
messages: SQSMessage[],
private async changeVisibilityTimeoutBatch(
messages: Message[],
timeout: number
): Promise<PromiseResult<any, AWSError>> {
const params = {
): Promise<ChangeMessageVisibilityBatchCommandOutput> {
const params: ChangeMessageVisibilityBatchCommandInput = {
QueueUrl: this.queueUrl,

@@ -475,3 +484,5 @@ Entries: messages.map((message) => ({

try {
return await this.sqs.changeMessageVisibilityBatch(params).promise();
return await this.sqs.send(
new ChangeMessageVisibilityBatchCommand(params)
);
} catch (err) {

@@ -478,0 +489,0 @@ this.emit(

class SQSError extends Error {
code: string;
statusCode: number;
region: string;
hostname: string;
service: string;
time: Date;
retryable: boolean;
fault: 'client' | 'server';

@@ -9,0 +9,0 @@ constructor(message: string) {

@@ -1,1 +0,1 @@

export { SQSMessage, Consumer, ConsumerOptions } from './consumer';
export { Consumer, ConsumerOptions } from './consumer';

@@ -0,4 +1,14 @@

import {
ChangeMessageVisibilityBatchCommand,
ChangeMessageVisibilityCommand,
DeleteMessageBatchCommand,
DeleteMessageCommand,
ReceiveMessageCommand,
SQSClient
} from '@aws-sdk/client-sqs';
import { assert } from 'chai';
import * as sinon from 'sinon';
import * as pEvent from 'p-event';
import { AWSError } from '../src/types';
import { Consumer } from '../src/consumer';

@@ -10,18 +20,28 @@

const POLLING_TIMEOUT = 100;
const QUEUE_URL = 'some-queue-url';
const REGION = 'some-region';
function stubResolve(value?: any): any {
return sandbox.stub().returns({ promise: sandbox.stub().resolves(value) });
}
const mockReceiveMessage = sinon.match.instanceOf(ReceiveMessageCommand);
const mockDeleteMessage = sinon.match.instanceOf(DeleteMessageCommand);
const mockDeleteMessageBatch = sinon.match.instanceOf(
DeleteMessageBatchCommand
);
const mockChangeMessageVisibility = sinon.match.instanceOf(
ChangeMessageVisibilityCommand
);
const mockChangeMessageVisibilityBatch = sinon.match.instanceOf(
ChangeMessageVisibilityBatchCommand
);
function stubReject(value?: any): any {
return sandbox.stub().returns({ promise: sandbox.stub().rejects(value) });
}
class MockSQSError extends Error {
code: string;
statusCode: number;
region: string;
hostname: string;
class MockSQSError extends Error implements AWSError {
name: string;
$metadata: {
httpStatusCode: number;
};
$service: string;
$retryable: {
throttling: boolean;
};
$fault: 'client' | 'server';
time: Date;
retryable: boolean;

@@ -54,15 +74,17 @@ constructor(message: string) {

handleMessageBatch = sandbox.stub().resolves(null);
sqs = sandbox.mock();
sqs.receiveMessage = stubResolve(response);
sqs.deleteMessage = stubResolve();
sqs.deleteMessageBatch = stubResolve();
sqs.changeMessageVisibility = stubResolve();
sqs.changeMessageVisibilityBatch = stubResolve();
sqs = sinon.createStubInstance(SQSClient);
sqs.send = sinon.stub();
sqs.send.withArgs(mockReceiveMessage).resolves(response);
sqs.send.withArgs(mockDeleteMessage).resolves();
sqs.send.withArgs(mockDeleteMessageBatch).resolves();
sqs.send.withArgs(mockChangeMessageVisibility).resolves();
sqs.send.withArgs(mockChangeMessageVisibilityBatch).resolves();
consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: 20
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT
});

@@ -79,3 +101,3 @@ });

Consumer.create({
region: 'some-region',
region: REGION,
handleMessage

@@ -90,4 +112,4 @@ });

handleMessage: undefined,
region: 'some-region',
queueUrl: 'some-queue-url'
region: REGION,
queueUrl: QUEUE_URL
});

@@ -100,4 +122,4 @@ });

new Consumer({
region: 'some-region',
queueUrl: 'some-queue-url',
region: REGION,
queueUrl: QUEUE_URL,
handleMessage,

@@ -112,4 +134,4 @@ batchSize: 11

new Consumer({
region: 'some-region',
queueUrl: 'some-queue-url',
region: REGION,
queueUrl: QUEUE_URL,
handleMessage,

@@ -124,4 +146,4 @@ batchSize: -1

new Consumer({
region: 'some-region',
queueUrl: 'some-queue-url',
region: REGION,
queueUrl: QUEUE_URL,
handleMessage,

@@ -136,4 +158,4 @@ heartbeatInterval: 30

new Consumer({
region: 'some-region',
queueUrl: 'some-queue-url',
region: REGION,
queueUrl: QUEUE_URL,
handleMessage,

@@ -149,4 +171,4 @@ heartbeatInterval: 30,

const instance = Consumer.create({
region: 'some-region',
queueUrl: 'some-queue-url',
region: REGION,
queueUrl: QUEUE_URL,
batchSize: 1,

@@ -166,3 +188,3 @@ visibilityTimeout: 10,

sqs.receiveMessage = stubReject(receiveErr);
sqs.send.withArgs(mockReceiveMessage).rejects(receiveErr);

@@ -180,10 +202,13 @@ consumer.start();

const receiveErr = new MockSQSError('Receive error');
receiveErr.code = 'short code';
receiveErr.retryable = false;
receiveErr.statusCode = 403;
receiveErr.name = 'short code';
receiveErr.$retryable = {
throttling: false
};
receiveErr.$metadata = {
httpStatusCode: 403
};
receiveErr.time = new Date();
receiveErr.hostname = 'hostname';
receiveErr.region = 'eu-west-1';
receiveErr.$service = 'service';
sqs.receiveMessage = stubReject(receiveErr);
sqs.send.withArgs(mockReceiveMessage).rejects(receiveErr);

@@ -196,8 +221,8 @@ consumer.start();

assert.equal(err.message, 'SQS receive message failed: Receive error');
assert.equal(err.code, receiveErr.code);
assert.equal(err.retryable, receiveErr.retryable);
assert.equal(err.statusCode, receiveErr.statusCode);
assert.equal(err.time, receiveErr.time);
assert.equal(err.hostname, receiveErr.hostname);
assert.equal(err.region, receiveErr.region);
assert.equal(err.code, receiveErr.name);
assert.equal(err.retryable, receiveErr.$retryable.throttling);
assert.equal(err.statusCode, receiveErr.$metadata.httpStatusCode);
assert.equal(err.time.toString(), receiveErr.time.toString());
assert.equal(err.service, receiveErr.$service);
assert.equal(err.fault, receiveErr.$fault);
});

@@ -208,4 +233,4 @@

consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: () =>

@@ -215,3 +240,3 @@ new Promise((resolve) => setTimeout(resolve, 1000)),

sqs,
authenticationErrorTimeout: 20
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT
});

@@ -235,4 +260,4 @@

consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: () => {

@@ -242,3 +267,3 @@ throw new Error('unexpected parsing error');

sqs,
authenticationErrorTimeout: 20
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT
});

@@ -261,3 +286,3 @@

handleMessage.resolves(null);
sqs.deleteMessage = stubReject(deleteErr);
sqs.send.withArgs(mockDeleteMessage).rejects(deleteErr);

@@ -298,3 +323,3 @@ consumer.start();

handleMessage.resolves(sqsError);
sqs.deleteMessage = stubReject(sqsError);
sqs.send.withArgs(mockDeleteMessage).rejects(sqsError);

@@ -316,6 +341,6 @@ consumer.start();

const credentialsErr = {
code: 'CredentialsError',
name: 'CredentialsError',
message: 'Missing credentials in config'
};
sqs.receiveMessage = stubReject(credentialsErr);
sqs.send.withArgs(mockReceiveMessage).rejects(credentialsErr);
const errorListener = sandbox.stub();

@@ -329,3 +354,4 @@ consumer.on('error', errorListener);

sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.receiveMessage);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);
});

@@ -335,6 +361,8 @@

const invalidSignatureErr = {
statusCode: 403,
$metadata: {
httpStatusCode: 403
},
message: 'The security token included in the request is invalid'
};
sqs.receiveMessage = stubReject(invalidSignatureErr);
sqs.send.withArgs(mockReceiveMessage).rejects(invalidSignatureErr);
const errorListener = sandbox.stub();

@@ -348,3 +376,4 @@ consumer.on('error', errorListener);

sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.receiveMessage);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);
});

@@ -354,7 +383,7 @@

const unknownEndpointErr = {
code: 'UnknownEndpoint',
name: 'UnknownEndpoint',
message:
'Inaccessible host: `sqs.eu-west-1.amazonaws.com`. This service may not be available in the `eu-west-1` region.'
};
sqs.receiveMessage = stubReject(unknownEndpointErr);
sqs.send.withArgs(mockReceiveMessage).rejects(unknownEndpointErr);
const errorListener = sandbox.stub();

@@ -368,3 +397,5 @@ consumer.on('error', errorListener);

sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.receiveMessage);
sandbox.assert.calledTwice(sqs.send);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);
});

@@ -374,8 +405,8 @@

consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: 20,
pollingWaitTimeMs: 100
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
pollingWaitTimeMs: POLLING_TIMEOUT
});

@@ -387,3 +418,7 @@

sandbox.assert.calledTwice(sqs.receiveMessage);
sandbox.assert.callCount(sqs.send, 4);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockDeleteMessage);
sandbox.assert.calledWithMatch(sqs.send.thirdCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.getCall(3), mockDeleteMessage);
});

@@ -424,6 +459,10 @@

sandbox.assert.calledWith(sqs.deleteMessage, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle'
});
sandbox.assert.calledWith(sqs.send.secondCall, mockDeleteMessage);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
ReceiptHandle: 'receipt-handle'
})
);
});

@@ -438,3 +477,3 @@

sandbox.assert.notCalled(sqs.deleteMessage);
sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessage);
});

@@ -453,5 +492,5 @@

it("doesn't consume more messages when called multiple times", () => {
sqs.receiveMessage = stubResolve(
new Promise((res) => setTimeout(res, 100))
);
sqs.send
.withArgs(mockReceiveMessage)
.resolves(new Promise((res) => setTimeout(res, 100)));
consumer.start();

@@ -464,7 +503,7 @@ consumer.start();

sandbox.assert.calledOnce(sqs.receiveMessage);
sqs.send.calledOnceWith(mockReceiveMessage);
});
it('consumes multiple messages when the batchSize is greater than 1', async () => {
sqs.receiveMessage = stubResolve({
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [

@@ -490,5 +529,5 @@ {

consumer = new Consumer({
queueUrl: 'some-queue-url',
queueUrl: QUEUE_URL,
messageAttributeNames: ['attribute-1', 'attribute-2'],
region: 'some-region',
region: REGION,
handleMessage,

@@ -500,18 +539,18 @@ batchSize: 3,

consumer.start();
await pEvent(consumer, 'message_received');
consumer.stop();
return new Promise((resolve) => {
handleMessage.onThirdCall().callsFake(() => {
sandbox.assert.calledWith(sqs.receiveMessage, {
QueueUrl: 'some-queue-url',
AttributeNames: [],
MessageAttributeNames: ['attribute-1', 'attribute-2'],
MaxNumberOfMessages: 3,
WaitTimeSeconds: 20,
VisibilityTimeout: undefined
});
sandbox.assert.callCount(handleMessage, 3);
consumer.stop();
resolve();
});
});
sandbox.assert.callCount(handleMessage, 3);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.match(
sqs.send.firstCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
AttributeNames: [],
MessageAttributeNames: ['attribute-1', 'attribute-2'],
MaxNumberOfMessages: 3,
WaitTimeSeconds: AUTHENTICATION_ERROR_TIMEOUT,
VisibilityTimeout: undefined
})
);
});

@@ -529,3 +568,3 @@

sqs.receiveMessage = stubResolve({
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [messageWithAttr]

@@ -535,5 +574,5 @@ });

consumer = new Consumer({
queueUrl: 'some-queue-url',
queueUrl: QUEUE_URL,
attributeNames: ['ApproximateReceiveCount'],
region: 'some-region',
region: REGION,
handleMessage,

@@ -547,10 +586,14 @@ sqs

sandbox.assert.calledWith(sqs.receiveMessage, {
QueueUrl: 'some-queue-url',
AttributeNames: ['ApproximateReceiveCount'],
MessageAttributeNames: [],
MaxNumberOfMessages: 1,
WaitTimeSeconds: 20,
VisibilityTimeout: undefined
});
sandbox.assert.calledWith(sqs.send, mockReceiveMessage);
sandbox.assert.match(
sqs.send.firstCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
AttributeNames: ['ApproximateReceiveCount'],
MessageAttributeNames: [],
MaxNumberOfMessages: 1,
WaitTimeSeconds: AUTHENTICATION_ERROR_TIMEOUT,
VisibilityTimeout: undefined
})
);

@@ -561,3 +604,3 @@ assert.equal(message, messageWithAttr);

it('fires an emptyQueue event when all messages have been consumed', async () => {
sqs.receiveMessage = stubResolve({});
sqs.send.withArgs(mockReceiveMessage).resolves({});

@@ -578,7 +621,14 @@ consumer.start();

sandbox.assert.calledWith(sqs.changeMessageVisibility, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle',
VisibilityTimeout: 0
});
sandbox.assert.calledWith(
sqs.send.secondCall,
mockChangeMessageVisibility
);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
ReceiptHandle: 'receipt-handle',
VisibilityTimeout: 0
})
);
});

@@ -594,3 +644,3 @@

sandbox.assert.notCalled(sqs.changeMessageVisibility);
sqs.send.neverCalledWith(mockChangeMessageVisibility);
});

@@ -603,3 +653,3 @@

sqsError.name = 'SQSError';
sqs.changeMessageVisibility = stubReject(sqsError);
sqs.send.withArgs(mockChangeMessageVisibility).rejects(sqsError);
consumer.terminateVisibilityTimeout = true;

@@ -611,11 +661,18 @@

sandbox.assert.calledWith(sqs.changeMessageVisibility, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle',
VisibilityTimeout: 0
});
sandbox.assert.calledWith(
sqs.send.secondCall,
mockChangeMessageVisibility
);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
ReceiptHandle: 'receipt-handle',
VisibilityTimeout: 0
})
);
});
it('fires response_processed event for each batch', async () => {
sqs.receiveMessage = stubResolve({
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [

@@ -637,5 +694,5 @@ {

consumer = new Consumer({
queueUrl: 'some-queue-url',
queueUrl: QUEUE_URL,
messageAttributeNames: ['attribute-1', 'attribute-2'],
region: 'some-region',
region: REGION,
handleMessage,

@@ -655,5 +712,5 @@ batchSize: 2,

consumer = new Consumer({
queueUrl: 'some-queue-url',
queueUrl: QUEUE_URL,
messageAttributeNames: ['attribute-1', 'attribute-2'],
region: 'some-region',
region: REGION,
handleMessageBatch,

@@ -673,5 +730,5 @@ batchSize: 2,

consumer = new Consumer({
queueUrl: 'some-queue-url',
queueUrl: QUEUE_URL,
messageAttributeNames: ['attribute-1', 'attribute-2'],
region: 'some-region',
region: REGION,
handleMessageBatch,

@@ -693,4 +750,4 @@ handleMessage,

consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: () =>

@@ -711,12 +768,26 @@ new Promise((resolve) => setTimeout(resolve, 75000)),

sandbox.assert.calledWith(sqs.changeMessageVisibility, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle',
VisibilityTimeout: 40
});
sandbox.assert.calledWith(sqs.changeMessageVisibility, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle',
VisibilityTimeout: 40
});
sandbox.assert.calledWith(
sqs.send.secondCall,
mockChangeMessageVisibility
);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
ReceiptHandle: 'receipt-handle',
VisibilityTimeout: 40
})
);
sandbox.assert.calledWith(
sqs.send.thirdCall,
mockChangeMessageVisibility
);
sandbox.assert.match(
sqs.send.thirdCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
ReceiptHandle: 'receipt-handle',
VisibilityTimeout: 40
})
);
sandbox.assert.calledOnce(clearIntervalSpy);

@@ -726,3 +797,3 @@ });

it('passes in the correct visibility timeout for long running batch handler functions', async () => {
sqs.receiveMessage = stubResolve({
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [

@@ -735,4 +806,4 @@ { MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' },

consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
queueUrl: QUEUE_URL,
region: REGION,
handleMessageBatch: () =>

@@ -754,18 +825,56 @@ new Promise((resolve) => setTimeout(resolve, 75000)),

sandbox.assert.calledWith(sqs.changeMessageVisibilityBatch, {
QueueUrl: 'some-queue-url',
Entries: [
{ Id: '1', ReceiptHandle: 'receipt-handle-1', VisibilityTimeout: 40 },
{ Id: '2', ReceiptHandle: 'receipt-handle-2', VisibilityTimeout: 40 },
{ Id: '3', ReceiptHandle: 'receipt-handle-3', VisibilityTimeout: 40 }
]
});
sandbox.assert.calledWith(sqs.changeMessageVisibilityBatch, {
QueueUrl: 'some-queue-url',
Entries: [
{ Id: '1', ReceiptHandle: 'receipt-handle-1', VisibilityTimeout: 40 },
{ Id: '2', ReceiptHandle: 'receipt-handle-2', VisibilityTimeout: 40 },
{ Id: '3', ReceiptHandle: 'receipt-handle-3', VisibilityTimeout: 40 }
]
});
sandbox.assert.calledWith(
sqs.send.secondCall,
mockChangeMessageVisibilityBatch
);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
Entries: sinon.match.array.deepEquals([
{
Id: '1',
ReceiptHandle: 'receipt-handle-1',
VisibilityTimeout: 40
},
{
Id: '2',
ReceiptHandle: 'receipt-handle-2',
VisibilityTimeout: 40
},
{
Id: '3',
ReceiptHandle: 'receipt-handle-3',
VisibilityTimeout: 40
}
])
})
);
sandbox.assert.calledWith(
sqs.send.thirdCall,
mockChangeMessageVisibilityBatch
);
sandbox.assert.match(
sqs.send.thirdCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
Entries: [
{
Id: '1',
ReceiptHandle: 'receipt-handle-1',
VisibilityTimeout: 40
},
{
Id: '2',
ReceiptHandle: 'receipt-handle-2',
VisibilityTimeout: 40
},
{
Id: '3',
ReceiptHandle: 'receipt-handle-3',
VisibilityTimeout: 40
}
]
})
);
sandbox.assert.calledOnce(clearIntervalSpy);

@@ -775,3 +884,3 @@ });

it('emit error when changing visibility timeout fails', async () => {
sqs.receiveMessage = stubResolve({
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [

@@ -782,4 +891,4 @@ { MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' }

consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: () =>

@@ -793,3 +902,3 @@ new Promise((resolve) => setTimeout(resolve, 75000)),

const receiveErr = new MockSQSError('failed');
sqs.changeMessageVisibility = stubReject(receiveErr);
sqs.send.withArgs(mockChangeMessageVisibility).rejects(receiveErr);

@@ -808,3 +917,3 @@ consumer.start();

it('emit error when changing visibility timeout fails for batch handler functions', async () => {
sqs.receiveMessage = stubResolve({
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [

@@ -816,4 +925,4 @@ { MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' },

consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
queueUrl: QUEUE_URL,
region: REGION,
handleMessageBatch: () =>

@@ -828,3 +937,3 @@ new Promise((resolve) => setTimeout(resolve, 75000)),

const receiveErr = new MockSQSError('failed');
sqs.changeMessageVisibilityBatch = stubReject(receiveErr);
sqs.send.withArgs(mockChangeMessageVisibilityBatch).rejects(receiveErr);

@@ -905,7 +1014,7 @@ consumer.start();

consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: 20,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
shouldDeleteMessages: false

@@ -915,3 +1024,3 @@ });

it('dont deletes the message when the handleMessage function is called', async () => {
it('do not deletes the message when the handleMessage function is called', async () => {
handleMessage.resolves();

@@ -923,5 +1032,5 @@

sandbox.assert.notCalled(sqs.deleteMessage);
sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessage);
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc