Socket
Socket
Sign inDemoInstall

sqs-consumer

Package Overview
Dependencies
Maintainers
1
Versions
101
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sqs-consumer - npm Package Compare versions

Comparing version 5.4.0 to 5.5.0

.eslintignore

3

dist/bind.js

@@ -1,3 +0,4 @@

'use strict';
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.autoBind = void 0;
function isMethod(propertyName, value) {

@@ -4,0 +5,0 @@ return propertyName !== 'constructor' && typeof value === 'function';

@@ -0,4 +1,5 @@

/// <reference types="node" />
import * as SQS from 'aws-sdk/clients/sqs';
import { EventEmitter } from 'events';
declare type SQSMessage = SQS.Types.Message;
export declare type SQSMessage = SQS.Types.Message;
export interface ConsumerOptions {

@@ -15,2 +16,3 @@ queueUrl?: string;

terminateVisibilityTimeout?: boolean;
heartbeatInterval?: number;
sqs?: SQS;

@@ -22,2 +24,12 @@ region?: string;

}
interface Events {
'response_processed': [];
'empty': [];
'message_received': [SQSMessage];
'message_processed': [SQSMessage];
'error': [Error, void | SQSMessage | SQSMessage[]];
'timeout_error': [Error, SQSMessage];
'processing_error': [Error, SQSMessage];
'stopped': [];
}
export declare class Consumer extends EventEmitter {

@@ -37,5 +49,9 @@ private queueUrl;

private terminateVisibilityTimeout;
private heartbeatInterval;
private sqs;
constructor(options: ConsumerOptions);
readonly isRunning: boolean;
emit<T extends keyof Events>(event: T, ...args: Events[T]): boolean;
on<T extends keyof Events>(event: T, listener: (...args: Events[T]) => void): this;
once<T extends keyof Events>(event: T, listener: (...args: Events[T]) => void): this;
get isRunning(): boolean;
static create(options: ConsumerOptions): Consumer;

@@ -49,3 +65,3 @@ start(): void;

private executeHandler;
private terminateVisabilityTimeout;
private changeVisabilityTimeout;
private emitError;

@@ -56,4 +72,5 @@ private poll;

private executeBatchHandler;
private terminateVisabilityTimeoutBatch;
private changeVisabilityTimeoutBatch;
private startHeartbeat;
}
export {};
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Consumer = void 0;
const SQS = require("aws-sdk/clients/sqs");

@@ -33,2 +34,5 @@ const Debug = require("debug");

}
if (options.heartbeatInterval && !(options.heartbeatInterval < options.visibilityTimeout)) {
throw new Error('heartbeatInterval must be less than visibilityTimeout.');
}
}

@@ -68,2 +72,3 @@ function isConnectionError(err) {

this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false;
this.heartbeatInterval = options.heartbeatInterval;
this.waitTimeSeconds = options.waitTimeSeconds || 20;

@@ -77,2 +82,11 @@ this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000;

}
emit(event, ...args) {
return super.emit(event, ...args);
}
on(event, listener) {
return super.on(event, listener);
}
once(event, listener) {
return super.once(event, listener);
}
get isRunning() {

@@ -116,3 +130,9 @@ return !this.stopped;

this.emit('message_received', message);
let heartbeat;
try {
if (this.heartbeatInterval) {
heartbeat = this.startHeartbeat(async (elapsedSeconds) => {
return this.changeVisabilityTimeout(message, elapsedSeconds + this.visibilityTimeout);
});
}
await this.executeHandler(message);

@@ -125,10 +145,8 @@ await this.deleteMessage(message);

if (this.terminateVisibilityTimeout) {
try {
await this.terminateVisabilityTimeout(message);
}
catch (err) {
this.emit('error', err, message);
}
await this.changeVisabilityTimeout(message, 0);
}
}
finally {
clearInterval(heartbeat);
}
}

@@ -188,10 +206,15 @@ async receiveMessage(params) {

}
async terminateVisabilityTimeout(message) {
return this.sqs
.changeMessageVisibility({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
})
.promise();
async changeVisabilityTimeout(message, timeout) {
try {
return this.sqs
.changeMessageVisibility({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
})
.promise();
}
catch (err) {
this.emit('error', err, message);
}
}

@@ -243,3 +266,9 @@ emitError(err, message) {

});
let heartbeat;
try {
if (this.heartbeatInterval) {
heartbeat = this.startHeartbeat(async (elapsedSeconds) => {
return this.changeVisabilityTimeoutBatch(messages, elapsedSeconds + this.visibilityTimeout);
});
}
await this.executeBatchHandler(messages);

@@ -254,10 +283,8 @@ await this.deleteMessageBatch(messages);

if (this.terminateVisibilityTimeout) {
try {
await this.terminateVisabilityTimeoutBatch(messages);
}
catch (err) {
this.emit('error', err, messages);
}
await this.changeVisabilityTimeoutBatch(messages, 0);
}
}
finally {
clearInterval(heartbeat);
}
}

@@ -268,3 +295,3 @@ async deleteMessageBatch(messages) {

QueueUrl: this.queueUrl,
Entries: messages.map(message => ({
Entries: messages.map((message) => ({
Id: message.MessageId,

@@ -292,3 +319,3 @@ ReceiptHandle: message.ReceiptHandle

}
async terminateVisabilityTimeoutBatch(messages) {
async changeVisabilityTimeoutBatch(messages, timeout) {
const params = {

@@ -299,10 +326,22 @@ QueueUrl: this.queueUrl,

ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
VisibilityTimeout: timeout
}))
};
return this.sqs
.changeMessageVisibilityBatch(params)
.promise();
try {
return this.sqs
.changeMessageVisibilityBatch(params)
.promise();
}
catch (err) {
this.emit('error', err, messages);
}
}
startHeartbeat(heartbeatFn) {
const startTime = Date.now();
return setInterval(() => {
const elapsedSeconds = Math.ceil((Date.now() - startTime) / 1000);
heartbeatFn(elapsedSeconds);
}, this.heartbeatInterval * 1000);
}
}
exports.Consumer = Consumer;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.TimeoutError = exports.SQSError = void 0;
class SQSError extends Error {

@@ -4,0 +5,0 @@ constructor(message) {

@@ -1,1 +0,1 @@

export { Consumer, ConsumerOptions } from './consumer';
export { SQSMessage, Consumer, ConsumerOptions } from './consumer';
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var consumer_1 = require("./consumer");
exports.Consumer = consumer_1.Consumer;
Object.defineProperty(exports, "Consumer", { enumerable: true, get: function () { return consumer_1.Consumer; } });
{
"name": "sqs-consumer",
"version": "5.4.0",
"version": "5.5.0",
"description": "Build SQS-based Node applications without the boilerplate",

@@ -13,4 +13,5 @@ "main": "dist/index.js",

"pretest": "npm run build",
"test": "mocha",
"lint": "tslint --project tsconfig.json",
"test": "mocha --recursive --full-trace --exit",
"lint": "eslint . --ext .ts",
"lint:fix": "eslint . --fix",
"coverage": "nyc mocha && nyc report --reporter=html && nyc report --reporter=json-summary",

@@ -35,24 +36,28 @@ "lcov": "nyc mocha && nyc report --reporter=lcov",

"devDependencies": {
"@types/chai": "^4.1.4",
"@types/debug": "^4.1.3",
"@types/mocha": "^2.2.43",
"@types/node": "^10.12.18",
"@types/sinon": "^4.0.0",
"@types/chai": "^4.2.11",
"@types/debug": "^4.1.5",
"@types/mocha": "^7.0.2",
"@types/node": "^14.0.13",
"@types/sinon": "^9.0.4",
"@types/typescript": "^2.0.0",
"chai": "^4.2.0",
"codeclimate-test-reporter": "^0.5.1",
"mocha": "^5.2.0",
"nyc": "^14.1.1",
"p-event": "^2.1.0",
"sinon": "^7.2.2",
"ts-node": "^3.3.0",
"tslint": "^5.17.0",
"tslint-config-airbnb": "^5.3.1",
"tslint-microsoft-contrib": "^5.0.3",
"typescript": "^2.6.1"
"eslint": "^7.2.0",
"eslint-config-iplayer-ts": "^2.0.0",
"mocha": "^8.0.1",
"nyc": "^15.1.0",
"p-event": "^4.2.0",
"sinon": "^9.0.2",
"ts-node": "^8.10.2",
"tslint-config-airbnb": "^5.11.2",
"tslint-microsoft-contrib": "^6.2.0",
"typescript": "^3.9.5",
"aws-sdk": "^2.699.0"
},
"dependencies": {
"aws-sdk": "^2.443.0",
"debug": "^4.1.1"
},
"peerDependencies": {
"aws-sdk": "^2.699.0"
},
"nyc": {

@@ -70,3 +75,14 @@ "include": [

"instrument": true
},
"eslintConfig": {
"extends": "iplayer-ts",
"parserOptions": {
"ecmaVersion": 2017,
"sourceType": "module"
}
},
"mocha": {
"spec": "test/**/**/*.ts",
"require": "ts-node/register"
}
}

@@ -43,3 +43,32 @@ # sqs-consumer

* By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the `batchSize` option [detailed below](#options).
* By default, the default Node.js HTTP/HTTPS SQS agent creates a new TCP connection for every new request ([AWS SQS documentation](https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/node-reusing-connections.html)). To avoid the cost of establishing a new connection, you can reuse an existing connection by passing a new SQS instance with `keepAlive: true`.
```js
const { Consumer } = require('sqs-consumer');
const AWS = require('aws-sdk');
const app = Consumer.create({
queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
handleMessage: async (message) => {
// do some work with `message`
},
sqs: new AWS.SQS({
httpOptions: {
agent: new https.Agent({
keepAlive: true
})
}
})
});
app.on('error', (err) => {
console.error(err.message);
});
app.on('processing_error', (err) => {
console.error(err.message);
});
app.start();
```
### Credentials

@@ -102,3 +131,3 @@

* `handleMessageBatch` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**.
* `handleMessageTimeout` - _String_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue.
* `handleMessageTimeout` - _Number_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue.
* `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`).

@@ -108,2 +137,3 @@ * `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`).

* `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
* `heartbeatInterval` - _Number_ - The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`.
* `terminateVisibilityTimeout` - _Boolean_ - If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`).

@@ -148,2 +178,2 @@ * `waitTimeSeconds` - _Number_ - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning.

### Contributing
See contributing [guildlines](https://github.com/bbc/sqs-consumer/blob/master/CONTRIBUTING.md)
See contributing [guidelines](https://github.com/bbc/sqs-consumer/blob/master/CONTRIBUTING.md)

@@ -1,3 +0,1 @@

'use strict';
function isMethod(propertyName: string, value: any): boolean {

@@ -4,0 +2,0 @@ return propertyName !== 'constructor' && typeof value === 'function';

@@ -12,4 +12,4 @@ import { AWSError } from 'aws-sdk';

type ReceieveMessageResponse = PromiseResult<SQS.Types.ReceiveMessageResult, AWSError>;
type SQSMessage = SQS.Types.Message;
type ReceiveMessageRequest = SQS.Types.ReceiveMessageRequest;
export type SQSMessage = SQS.Types.Message;

@@ -48,5 +48,9 @@ const requiredOptions = [

}
if (options.heartbeatInterval && !(options.heartbeatInterval < options.visibilityTimeout)) {
throw new Error('heartbeatInterval must be less than visibilityTimeout.');
}
}
function isConnectionError(err: Error): Boolean {
function isConnectionError(err: Error): boolean {
if (err instanceof SQSError) {

@@ -85,2 +89,3 @@ return (err.statusCode === 403 || err.code === 'CredentialsError' || err.code === 'UnknownEndpoint');

terminateVisibilityTimeout?: boolean;
heartbeatInterval?: number;
sqs?: SQS;

@@ -93,2 +98,13 @@ region?: string;

interface Events {
'response_processed': [];
'empty': [];
'message_received': [SQSMessage];
'message_processed': [SQSMessage];
'error': [Error, void | SQSMessage | SQSMessage[]];
'timeout_error': [Error, SQSMessage];
'processing_error': [Error, SQSMessage];
'stopped': [];
}
export class Consumer extends EventEmitter {

@@ -108,2 +124,3 @@ private queueUrl: string;

private terminateVisibilityTimeout: boolean;
private heartbeatInterval: number;
private sqs: SQS;

@@ -124,2 +141,3 @@

this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false;
this.heartbeatInterval = options.heartbeatInterval;
this.waitTimeSeconds = options.waitTimeSeconds || 20;

@@ -136,2 +154,14 @@ this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000;

emit<T extends keyof Events>(event: T, ...args: Events[T]) {
return super.emit(event, ...args);
}
on<T extends keyof Events>(event: T, listener: (...args: Events[T]) => void): this {
return super.on(event, listener);
}
once<T extends keyof Events>(event: T, listener: (...args: Events[T]) => void): this {
return super.once(event, listener);
}
public get isRunning(): boolean {

@@ -180,3 +210,9 @@ return !this.stopped;

let heartbeat;
try {
if (this.heartbeatInterval) {
heartbeat = this.startHeartbeat(async (elapsedSeconds) => {
return this.changeVisabilityTimeout(message, elapsedSeconds + this.visibilityTimeout);
});
}
await this.executeHandler(message);

@@ -189,8 +225,6 @@ await this.deleteMessage(message);

if (this.terminateVisibilityTimeout) {
try {
await this.terminateVisabilityTimeout(message);
} catch (err) {
this.emit('error', err, message);
}
await this.changeVisabilityTimeout(message, 0);
}
} finally {
clearInterval(heartbeat);
}

@@ -251,10 +285,14 @@ }

private async terminateVisabilityTimeout(message: SQSMessage): Promise<PromiseResult<any, AWSError>> {
return this.sqs
.changeMessageVisibility({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
})
.promise();
private async changeVisabilityTimeout(message: SQSMessage, timeout: number): Promise<PromiseResult<any, AWSError>> {
try {
return this.sqs
.changeMessageVisibility({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
})
.promise();
} catch (err) {
this.emit('error', err, message);
}
}

@@ -299,3 +337,3 @@

}).then(() => {
setTimeout(this.poll, currentPollingTimeout);
setTimeout(this.poll, currentPollingTimeout);
}).catch((err) => {

@@ -311,3 +349,9 @@ this.emit('error', err);

let heartbeat;
try {
if (this.heartbeatInterval) {
heartbeat = this.startHeartbeat(async (elapsedSeconds) => {
return this.changeVisabilityTimeoutBatch(messages, elapsedSeconds + this.visibilityTimeout);
});
}
await this.executeBatchHandler(messages);

@@ -322,8 +366,6 @@ await this.deleteMessageBatch(messages);

if (this.terminateVisibilityTimeout) {
try {
await this.terminateVisabilityTimeoutBatch(messages);
} catch (err) {
this.emit('error', err, messages);
}
await this.changeVisabilityTimeoutBatch(messages, 0);
}
} finally {
clearInterval(heartbeat);
}

@@ -337,3 +379,3 @@ }

QueueUrl: this.queueUrl,
Entries: messages.map(message => ({
Entries: messages.map((message) => ({
Id: message.MessageId,

@@ -362,3 +404,3 @@ ReceiptHandle: message.ReceiptHandle

private async terminateVisabilityTimeoutBatch(messages: SQSMessage[]): Promise<PromiseResult<any, AWSError>> {
private async changeVisabilityTimeoutBatch(messages: SQSMessage[], timeout: number): Promise<PromiseResult<any, AWSError>> {
const params = {

@@ -369,10 +411,21 @@ QueueUrl: this.queueUrl,

ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
VisibilityTimeout: timeout
}))
};
return this.sqs
.changeMessageVisibilityBatch(params)
.promise();
try {
return this.sqs
.changeMessageVisibilityBatch(params)
.promise();
} catch (err) {
this.emit('error', err, messages);
}
}
private startHeartbeat(heartbeatFn: (elapsedSeconds: number) => void): NodeJS.Timeout {
const startTime = Date.now();
return setInterval(() => {
const elapsedSeconds = Math.ceil((Date.now() - startTime) / 1000);
heartbeatFn(elapsedSeconds);
}, this.heartbeatInterval * 1000);
}
}

@@ -16,3 +16,3 @@ class SQSError extends Error {

class TimeoutError extends Error {
constructor(message: string = 'Operation timed out.') {
constructor(message = 'Operation timed out.') {
super(message);

@@ -19,0 +19,0 @@ this.message = message;

@@ -1,1 +0,1 @@

export { Consumer, ConsumerOptions } from './consumer';
export { SQSMessage, Consumer, ConsumerOptions } from './consumer';

@@ -34,2 +34,3 @@ import { assert } from 'chai';

super(message);
this.message = message;
}

@@ -41,2 +42,3 @@ }

let consumer;
let clock;
let handleMessage;

@@ -54,2 +56,3 @@ let handleMessageBatch;

beforeEach(() => {
clock = sinon.useFakeTimers();
handleMessage = sandbox.stub().resolves(null);

@@ -62,2 +65,3 @@ handleMessageBatch = sandbox.stub().resolves(null);

sqs.changeMessageVisibility = stubResolve();
sqs.changeMessageVisibilityBatch = stubResolve();

@@ -118,2 +122,25 @@ consumer = new Consumer({

it('requires visibilityTimeout to be set with heartbeatInterval', () => {
assert.throws(() => {
new Consumer({
region: 'some-region',
queueUrl: 'some-queue-url',
handleMessage,
heartbeatInterval: 30
});
});
});
it('requires heartbeatInterval to be less than visibilityTimeout', () => {
assert.throws(() => {
new Consumer({
region: 'some-region',
queueUrl: 'some-queue-url',
handleMessage,
heartbeatInterval: 30,
visibilityTimeout: 30
});
});
});
describe('.create', () => {

@@ -142,3 +169,3 @@ it('creates a new instance of a Consumer object', () => {

const err = await pEvent(consumer, 'error');
const err: any = await pEvent(consumer, 'error');

@@ -162,3 +189,3 @@ consumer.stop();

consumer.start();
const err = await pEvent(consumer, 'error');
const err: any = await pEvent(consumer, 'error');
consumer.stop();

@@ -188,3 +215,3 @@

consumer.start();
const err = await pEvent(consumer, 'timeout_error');
const [err]: any = await Promise.all([pEvent(consumer, 'timeout_error'), clock.tickAsync(handleMessageTimeout)]);
consumer.stop();

@@ -208,3 +235,3 @@

consumer.start();
const err = await pEvent(consumer, 'processing_error');
const err: any = await pEvent(consumer, 'processing_error');
consumer.stop();

@@ -223,3 +250,3 @@

consumer.start();
const err = await pEvent(consumer, 'error');
const err: any = await pEvent(consumer, 'error');
consumer.stop();

@@ -265,17 +292,11 @@

sqs.receiveMessage = stubReject(credentialsErr);
const errorListener = sandbox.stub();
consumer.on('error', errorListener);
return new Promise((resolve) => {
const timings = [];
const errorListener = sandbox.stub().callsFake(() => timings.push(new Date()));
consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
consumer.stop();
errorListener.onThirdCall().callsFake(() => {
consumer.stop();
sandbox.assert.calledThrice(sqs.receiveMessage);
assert.isAtLeast(timings[1] - timings[0], AUTHENTICATION_ERROR_TIMEOUT);
resolve();
});
consumer.on('error', errorListener);
consumer.start();
});
sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.receiveMessage);
});

@@ -289,17 +310,11 @@

sqs.receiveMessage = stubReject(invalidSignatureErr);
const errorListener = sandbox.stub();
consumer.on('error', errorListener);
return new Promise((resolve) => {
const timings = [];
const errorListener = sandbox.stub().callsFake(() => timings.push(new Date()));
consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
consumer.stop();
errorListener.onThirdCall().callsFake(() => {
consumer.stop();
sandbox.assert.calledThrice(sqs.receiveMessage);
assert.isAtLeast(timings[1] - timings[0], AUTHENTICATION_ERROR_TIMEOUT);
resolve();
});
consumer.on('error', errorListener);
consumer.start();
});
sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.receiveMessage);
});

@@ -313,17 +328,11 @@

sqs.receiveMessage = stubReject(unknownEndpointErr);
const errorListener = sandbox.stub();
consumer.on('error', errorListener);
return new Promise((resolve) => {
const timings = [];
const errorListener = sandbox.stub().callsFake(() => timings.push(new Date()));
consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
consumer.stop();
errorListener.onThirdCall().callsFake(() => {
consumer.stop();
sandbox.assert.calledThrice(sqs.receiveMessage);
assert.isAtLeast(timings[1] - timings[0], AUTHENTICATION_ERROR_TIMEOUT);
resolve();
});
consumer.on('error', errorListener);
consumer.start();
});
sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.receiveMessage);
});

@@ -340,16 +349,8 @@

});
return new Promise((resolve) => {
const timings = [];
const timeListener = sandbox.stub().callsFake(() => timings.push(new Date()));
timeListener.onThirdCall().callsFake(() => {
consumer.stop();
sandbox.assert.calledThrice(sqs.receiveMessage);
assert.isAtLeast(timings[1] - timings[0], POLLING_TIMEOUT);
resolve();
});
consumer.start();
await clock.tickAsync(POLLING_TIMEOUT);
consumer.stop();
consumer.on('message_received', timeListener);
consumer.start();
});
sandbox.assert.calledTwice(sqs.receiveMessage);
});

@@ -409,9 +410,7 @@

return new Promise((resolve) => {
handleMessage.onSecondCall().callsFake(() => {
consumer.stop();
resolve();
});
consumer.start();
});
consumer.start();
await clock.runToLastAsync();
consumer.stop();
sandbox.assert.calledTwice(handleMessage);
});

@@ -641,2 +640,72 @@

});
it('extends visibility timeout for long running handler functions', async () => {
consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
handleMessage: () => new Promise((resolve) => setTimeout(resolve, 75000)),
sqs,
visibilityTimeout: 40,
heartbeatInterval: 30
});
const clearIntervalSpy = sinon.spy(global, 'clearInterval');
consumer.start();
await Promise.all([pEvent(consumer, 'response_processed'), clock.tickAsync(75000)]);
consumer.stop();
sandbox.assert.calledWith(sqs.changeMessageVisibility, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle',
VisibilityTimeout: 70
});
sandbox.assert.calledWith(sqs.changeMessageVisibility, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle',
VisibilityTimeout: 100
});
sandbox.assert.calledOnce(clearIntervalSpy);
});
it('extends visibility timeout for long running batch handler functions', async () => {
sqs.receiveMessage = stubResolve({
Messages: [
{ MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' },
{ MessageId: '2', ReceiptHandle: 'receipt-handle-2', Body: 'body-2' },
{ MessageId: '3', ReceiptHandle: 'receipt-handle-3', Body: 'body-3' }
]
});
consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
handleMessageBatch: () => new Promise((resolve) => setTimeout(resolve, 75000)),
batchSize: 3,
sqs,
visibilityTimeout: 40,
heartbeatInterval: 30
});
const clearIntervalSpy = sinon.spy(global, 'clearInterval');
consumer.start();
await Promise.all([pEvent(consumer, 'response_processed'), clock.tickAsync(75000)]);
consumer.stop();
sandbox.assert.calledWith(sqs.changeMessageVisibilityBatch, {
QueueUrl: 'some-queue-url',
Entries: [
{ Id: '1', ReceiptHandle: 'receipt-handle-1', VisibilityTimeout: 70 },
{ Id: '2', ReceiptHandle: 'receipt-handle-2', VisibilityTimeout: 70 },
{ Id: '3', ReceiptHandle: 'receipt-handle-3', VisibilityTimeout: 70 }
]
});
sandbox.assert.calledWith(sqs.changeMessageVisibilityBatch, {
QueueUrl: 'some-queue-url',
Entries: [
{ Id: '1', ReceiptHandle: 'receipt-handle-1', VisibilityTimeout: 100 },
{ Id: '2', ReceiptHandle: 'receipt-handle-2', VisibilityTimeout: 100 },
{ Id: '3', ReceiptHandle: 'receipt-handle-3', VisibilityTimeout: 100 }
]
});
sandbox.assert.calledOnce(clearIntervalSpy);
});
});

@@ -649,3 +718,3 @@

await pEvent(consumer, 'stopped');
await Promise.all([pEvent(consumer, 'stopped'), clock.runAllAsync()]);

@@ -658,3 +727,3 @@ sandbox.assert.calledOnce(handleMessage);

consumer.stop();
await pEvent(consumer, 'stopped');
await Promise.all([pEvent(consumer, 'stopped'), clock.runAllAsync()]);
});

@@ -667,29 +736,23 @@

return new Promise((resolve) => {
consumer.start();
consumer.stop();
consumer.stop();
consumer.stop();
consumer.start();
consumer.stop();
consumer.stop();
consumer.stop();
await clock.runAllAsync();
setTimeout(() => {
sandbox.assert.calledOnce(handleStop);
resolve();
}, 10);
});
sandbox.assert.calledOnce(handleStop);
});
it('fires a stopped event a second time if started and stopped twice', async () => {
return new Promise((resolve) => {
const handleStop = sandbox.stub().returns(null).onSecondCall().callsFake(() => {
sandbox.assert.calledTwice(handleStop);
resolve();
});
const handleStop = sandbox.stub().returns(null);
consumer.on('stopped', handleStop);
consumer.on('stopped', handleStop);
consumer.start();
consumer.stop();
consumer.start();
consumer.stop();
});
consumer.start();
consumer.stop();
consumer.start();
consumer.stop();
await clock.runAllAsync();
sandbox.assert.calledTwice(handleStop);
});

@@ -696,0 +759,0 @@ });

@@ -19,2 +19,2 @@ {

]
}
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc