Socket
Socket
Sign inDemoInstall

sqs-consumer

Package Overview
Dependencies
Maintainers
2
Versions
101
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sqs-consumer - npm Package Compare versions

Comparing version 6.1.0 to 6.2.0

.github/workflows/docs.yml

4

dist/bind.d.ts

@@ -0,1 +1,5 @@

/**
* Auto binds the provided properties
* @param obj an object containing the available properties
*/
export declare function autoBind(obj: object): void;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.autoBind = void 0;
/**
* Determines if the property is a method
* @param propertyName the name of the property
* @param value the value of the property
*/
function isMethod(propertyName, value) {
return propertyName !== 'constructor' && typeof value === 'function';
}
/**
* Auto binds the provided properties
* @param obj an object containing the available properties
*/
function autoBind(obj) {

@@ -8,0 +17,0 @@ const propertyNames = Object.getOwnPropertyNames(obj.constructor.prototype);

@@ -1,42 +0,111 @@

/// <reference types="node" />
import { EventEmitter } from 'events';
import { ConsumerOptions, Events } from './types';
export declare class Consumer extends EventEmitter {
import { ConsumerOptions, TypedEventEmitter } from './types';
/**
* [Usage](https://bbc.github.io/sqs-consumer/index.html#usage)
*/
export declare class Consumer extends TypedEventEmitter {
private pollingTimeoutId;
private heartbeatTimeoutId;
private handleMessageTimeoutId;
private stopped;
private queueUrl;
private handleMessage;
private handleMessageBatch;
private sqs;
private handleMessageTimeout;
private attributeNames;
private messageAttributeNames;
private stopped;
private shouldDeleteMessages;
private batchSize;
private visibilityTimeout;
private terminateVisibilityTimeout;
private waitTimeSeconds;
private authenticationErrorTimeout;
private pollingWaitTimeMs;
private terminateVisibilityTimeout;
private heartbeatInterval;
private sqs;
private shouldDeleteMessages;
constructor(options: ConsumerOptions);
emit<T extends keyof Events>(event: T, ...args: Events[T]): boolean;
on<T extends keyof Events>(event: T, listener: (...args: Events[T]) => void): this;
once<T extends keyof Events>(event: T, listener: (...args: Events[T]) => void): this;
get isRunning(): boolean;
/**
* Creates a new SQS consumer.
*/
static create(options: ConsumerOptions): Consumer;
/**
* Start polling the queue for messages.
*/
start(): void;
/**
* Stop polling the queue for messages (pre existing requests will still be made until concluded).
*/
stop(): void;
/**
* Returns the current polling state of the consumer: `true` if it is actively polling, `false` if it is not.
*/
get isRunning(): boolean;
/**
* Emit one of the consumer's error events depending on the error received.
* @param err The error object to forward on
* @param message The message that the error occurred on
*/
private emitError;
/**
* Poll for new messages from SQS
*/
private poll;
/**
* Send a request to SQS to retrieve messages
* @param params The required params to receive messages from SQS
*/
private receiveMessage;
/**
* Handles the response from AWS SQS, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
*/
private handleSqsResponse;
/**
* Process a message that has been received from SQS. This will execute the message
* handler and delete the message once complete.
* @param message The message that was delivered from SQS
*/
private processMessage;
private receiveMessage;
/**
* Process a batch of messages from the SQS queue.
* @param messages The messages that were delivered from SQS
*/
private processMessageBatch;
/**
* Trigger a function on a set interval
* @param heartbeatFn The function that should be triggered
*/
private startHeartbeat;
/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
*/
private changeVisibilityTimeout;
/**
* Change the visibility timeout on a batch of messages
* @param messages The messages to change the value of
* @param timeout The new timeout that should be set
*/
private changeVisibilityTimeoutBatch;
/**
* Trigger the applications handleMessage function
* @param message The message that was received from SQS
*/
private executeHandler;
/**
* Execute the application's message batch handler
* @param messages The messages that should be forwarded from the SQS queue
*/
private executeBatchHandler;
/**
* Delete a single message from SQS
* @param message The message to delete from the SQS queue
*/
private deleteMessage;
private executeHandler;
private changeVisibilityTimeout;
private emitError;
private poll;
private processMessageBatch;
/**
* Delete a batch of messages from the SQS queue.
* @param messages The messages that should be deleted from SQS
*/
private deleteMessageBatch;
private executeBatchHandler;
private changeVisibilityTimeoutBatch;
private startHeartbeat;
}

461

dist/consumer.js

@@ -6,63 +6,19 @@ "use strict";

const debug_1 = require("debug");
const events_1 = require("events");
const types_1 = require("./types");
const bind_1 = require("./bind");
const errors_1 = require("./errors");
const validation_1 = require("./validation");
const debug = (0, debug_1.default)('sqs-consumer');
const requiredOptions = [
'queueUrl',
// only one of handleMessage / handleMessagesBatch is required
'handleMessage|handleMessageBatch'
];
function createTimeout(duration) {
let timeout;
const pending = new Promise((_, reject) => {
timeout = setTimeout(() => {
reject(new errors_1.TimeoutError());
}, duration);
});
return [timeout, pending];
}
function assertOptions(options) {
requiredOptions.forEach((option) => {
const possibilities = option.split('|');
if (!possibilities.find((p) => options[p])) {
throw new Error(`Missing SQS consumer option [ ${possibilities.join(' or ')} ].`);
}
});
if (options.batchSize > 10 || options.batchSize < 1) {
throw new Error('SQS batchSize option must be between 1 and 10.');
}
if (options.heartbeatInterval &&
!(options.heartbeatInterval < options.visibilityTimeout)) {
throw new Error('heartbeatInterval must be less than visibilityTimeout.');
}
}
function isConnectionError(err) {
if (err instanceof errors_1.SQSError) {
return (err.statusCode === 403 ||
err.code === 'CredentialsError' ||
err.code === 'UnknownEndpoint' ||
err.code === 'AWS.SimpleQueueService.NonExistentQueue');
}
return false;
}
function toSQSError(err, message) {
var _a, _b;
const sqsError = new errors_1.SQSError(message);
sqsError.code = err.name;
sqsError.statusCode = (_a = err.$metadata) === null || _a === void 0 ? void 0 : _a.httpStatusCode;
sqsError.retryable = (_b = err.$retryable) === null || _b === void 0 ? void 0 : _b.throttling;
sqsError.service = err.$service;
sqsError.fault = err.$fault;
sqsError.time = new Date();
return sqsError;
}
function hasMessages(response) {
return response.Messages && response.Messages.length > 0;
}
class Consumer extends events_1.EventEmitter {
/**
* [Usage](https://bbc.github.io/sqs-consumer/index.html#usage)
*/
class Consumer extends types_1.TypedEventEmitter {
constructor(options) {
var _a, _b, _c, _d;
super();
assertOptions(options);
this.pollingTimeoutId = undefined;
this.heartbeatTimeoutId = undefined;
this.handleMessageTimeoutId = undefined;
this.stopped = true;
(0, validation_1.assertOptions)(options);
this.queueUrl = options.queueUrl;

@@ -74,3 +30,2 @@ this.handleMessage = options.handleMessage;

this.messageAttributeNames = options.messageAttributeNames || [];
this.stopped = true;
this.batchSize = options.batchSize || 1;

@@ -93,17 +48,11 @@ this.visibilityTimeout = options.visibilityTimeout;

}
emit(event, ...args) {
return super.emit(event, ...args);
}
on(event, listener) {
return super.on(event, listener);
}
once(event, listener) {
return super.once(event, listener);
}
get isRunning() {
return !this.stopped;
}
/**
* Creates a new SQS consumer.
*/
static create(options) {
return new Consumer(options);
}
/**
* Start polling the queue for messages.
*/
start() {

@@ -116,113 +65,34 @@ if (this.stopped) {

}
/**
* Stop polling the queue for messages (pre existing requests will still be made until concluded).
*/
stop() {
if (this.stopped) {
debug('Consumer was already stopped');
return;
}
debug('Stopping consumer');
this.stopped = true;
}
async handleSqsResponse(response) {
debug('Received SQS response');
debug(response);
if (response) {
if (hasMessages(response)) {
if (this.handleMessageBatch) {
// prefer handling messages in batch when available
await this.processMessageBatch(response.Messages);
}
else {
await Promise.all(response.Messages.map(this.processMessage));
}
this.emit('response_processed');
}
else {
this.emit('empty');
}
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
this.pollingTimeoutId = undefined;
}
this.emit('stopped');
}
async processMessage(message) {
this.emit('message_received', message);
let heartbeat;
try {
if (this.heartbeatInterval) {
heartbeat = this.startHeartbeat(async () => {
return this.changeVisibilityTimeout(message, this.visibilityTimeout);
});
}
await this.executeHandler(message);
await this.deleteMessage(message);
this.emit('message_processed', message);
}
catch (err) {
this.emitError(err, message);
if (this.terminateVisibilityTimeout) {
await this.changeVisibilityTimeout(message, 0);
}
}
finally {
clearInterval(heartbeat);
}
/**
* Returns the current polling state of the consumer: `true` if it is actively polling, `false` if it is not.
*/
get isRunning() {
return !this.stopped;
}
async receiveMessage(params) {
try {
return await this.sqs.send(new client_sqs_1.ReceiveMessageCommand(params));
/**
* Emit one of the consumer's error events depending on the error received.
* @param err The error object to forward on
* @param message The message that the error occurred on
*/
emitError(err, message) {
if (!message) {
this.emit('error', err);
}
catch (err) {
throw toSQSError(err, `SQS receive message failed: ${err.message}`);
}
}
async deleteMessage(message) {
if (!this.shouldDeleteMessages) {
debug('Skipping message delete since shouldDeleteMessages is set to false');
return;
}
debug('Deleting message %s', message.MessageId);
const deleteParams = {
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle
};
try {
await this.sqs.send(new client_sqs_1.DeleteMessageCommand(deleteParams));
}
catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
}
}
async executeHandler(message) {
let timeout;
let pending;
try {
if (this.handleMessageTimeout) {
[timeout, pending] = createTimeout(this.handleMessageTimeout);
await Promise.race([this.handleMessage(message), pending]);
}
else {
await this.handleMessage(message);
}
}
catch (err) {
if (err instanceof errors_1.TimeoutError) {
err.message = `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`;
}
else if (err instanceof Error) {
err.message = `Unexpected message handler failure: ${err.message}`;
}
throw err;
}
finally {
clearTimeout(timeout);
}
}
async changeVisibilityTimeout(message, timeout) {
try {
const input = {
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
};
return await this.sqs.send(new client_sqs_1.ChangeMessageVisibilityCommand(input));
}
catch (err) {
this.emit('error', toSQSError(err, `Error changing visibility timeout: ${err.message}`), message);
}
}
emitError(err, message) {
if (err.name === errors_1.SQSError.name) {
else if (err.name === errors_1.SQSError.name) {
this.emit('error', err, message);

@@ -237,9 +107,13 @@ }

}
/**
* Poll for new messages from SQS
*/
poll() {
if (this.stopped) {
this.emit('stopped');
debug('Poll was called while consumer was stopped, cancelling poll...');
return;
}
debug('Polling for messages');
const receiveParams = {
let currentPollingTimeout = this.pollingWaitTimeMs;
this.receiveMessage({
QueueUrl: this.queueUrl,

@@ -251,9 +125,7 @@ AttributeNames: this.attributeNames,

VisibilityTimeout: this.visibilityTimeout
};
let currentPollingTimeout = this.pollingWaitTimeMs;
this.receiveMessage(receiveParams)
})
.then(this.handleSqsResponse)
.catch((err) => {
this.emit('error', err);
if (isConnectionError(err)) {
this.emitError(err);
if ((0, errors_1.isConnectionError)(err)) {
debug('There was an authentication error. Pausing before retrying.');

@@ -265,26 +137,89 @@ currentPollingTimeout = this.authenticationErrorTimeout;

.then(() => {
setTimeout(this.poll, currentPollingTimeout);
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
}
this.pollingTimeoutId = setTimeout(this.poll, currentPollingTimeout);
})
.catch((err) => {
this.emit('error', err);
this.emitError(err);
});
}
/**
* Send a request to SQS to retrieve messages
* @param params The required params to receive messages from SQS
*/
async receiveMessage(params) {
try {
return await this.sqs.send(new client_sqs_1.ReceiveMessageCommand(params));
}
catch (err) {
throw (0, errors_1.toSQSError)(err, `SQS receive message failed: ${err.message}`);
}
}
/**
* Handles the response from AWS SQS, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
*/
async handleSqsResponse(response) {
if ((0, validation_1.hasMessages)(response)) {
if (this.handleMessageBatch) {
await this.processMessageBatch(response.Messages);
}
else {
await Promise.all(response.Messages.map(this.processMessage));
}
this.emit('response_processed');
}
else if (response) {
this.emit('empty');
}
}
/**
* Process a message that has been received from SQS. This will execute the message
* handler and delete the message once complete.
* @param message The message that was delivered from SQS
*/
async processMessage(message) {
try {
this.emit('message_received', message);
if (this.heartbeatInterval) {
this.heartbeatTimeoutId = this.startHeartbeat(message);
}
const ackedMessage = await this.executeHandler(message);
if ((ackedMessage === null || ackedMessage === void 0 ? void 0 : ackedMessage.MessageId) === message.MessageId) {
await this.deleteMessage(message);
this.emit('message_processed', message);
}
}
catch (err) {
this.emitError(err, message);
if (this.terminateVisibilityTimeout) {
await this.changeVisibilityTimeout(message, 0);
}
}
finally {
clearInterval(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = undefined;
}
}
/**
* Process a batch of messages from the SQS queue.
* @param messages The messages that were delivered from SQS
*/
async processMessageBatch(messages) {
messages.forEach((message) => {
this.emit('message_received', message);
});
let heartbeat;
try {
messages.forEach((message) => {
this.emit('message_received', message);
});
if (this.heartbeatInterval) {
heartbeat = this.startHeartbeat(async () => {
return this.changeVisibilityTimeoutBatch(messages, this.visibilityTimeout);
});
this.heartbeatTimeoutId = this.startHeartbeat(null, messages);
}
const ackedMessages = await this.executeBatchHandler(messages);
if (ackedMessages.length > 0) {
if ((ackedMessages === null || ackedMessages === void 0 ? void 0 : ackedMessages.length) > 0) {
await this.deleteMessageBatch(ackedMessages);
ackedMessages.forEach((message) => {
this.emit('message_processed', message);
});
}
ackedMessages.forEach((message) => {
this.emit('message_processed', message);
});
}

@@ -298,32 +233,100 @@ catch (err) {

finally {
clearInterval(heartbeat);
clearInterval(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = undefined;
}
}
async deleteMessageBatch(messages) {
if (!this.shouldDeleteMessages) {
debug('Skipping message delete since shouldDeleteMessages is set to false');
return;
/**
* Trigger a function on a set interval
* @param heartbeatFn The function that should be triggered
*/
startHeartbeat(message, messages) {
return setInterval(() => {
if (this.handleMessageBatch) {
return this.changeVisibilityTimeoutBatch(messages, this.visibilityTimeout);
}
else {
return this.changeVisibilityTimeout(message, this.visibilityTimeout);
}
}, this.heartbeatInterval * 1000);
}
/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
*/
async changeVisibilityTimeout(message, timeout) {
try {
const input = {
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
};
return await this.sqs.send(new client_sqs_1.ChangeMessageVisibilityCommand(input));
}
debug('Deleting messages %s', messages.map((msg) => msg.MessageId).join(' ,'));
const deleteParams = {
catch (err) {
this.emit('error', (0, errors_1.toSQSError)(err, `Error changing visibility timeout: ${err.message}`), message);
}
}
/**
* Change the visibility timeout on a batch of messages
* @param messages The messages to change the value of
* @param timeout The new timeout that should be set
*/
async changeVisibilityTimeoutBatch(messages, timeout) {
const params = {
QueueUrl: this.queueUrl,
Entries: messages.map((message) => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
}))
};
try {
await this.sqs.send(new client_sqs_1.DeleteMessageBatchCommand(deleteParams));
return await this.sqs.send(new client_sqs_1.ChangeMessageVisibilityBatchCommand(params));
}
catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
this.emit('error', (0, errors_1.toSQSError)(err, `Error changing visibility timeout: ${err.message}`), messages);
}
}
/**
* Trigger the applications handleMessage function
* @param message The message that was received from SQS
*/
async executeHandler(message) {
try {
let result;
if (this.handleMessageTimeout) {
const pending = new Promise((_, reject) => {
this.handleMessageTimeoutId = setTimeout(() => {
reject(new errors_1.TimeoutError());
}, this.handleMessageTimeout);
});
result = await Promise.race([this.handleMessage(message), pending]);
}
else {
result = await this.handleMessage(message);
}
return result instanceof Object ? result : message;
}
catch (err) {
err.message =
err instanceof errors_1.TimeoutError
? `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`
: `Unexpected message handler failure: ${err.message}`;
throw err;
}
finally {
if (this.handleMessageTimeoutId) {
clearTimeout(this.handleMessageTimeoutId);
}
}
}
/**
* Execute the application's message batch handler
* @param messages The messages that should be forwarded from the SQS queue
*/
async executeBatchHandler(messages) {
try {
const result = await this.handleMessageBatch(messages);
if (result instanceof Object) {
return result;
}
return messages;
return result instanceof Object ? result : messages;
}

@@ -335,24 +338,48 @@ catch (err) {

}
async changeVisibilityTimeoutBatch(messages, timeout) {
const params = {
/**
* Delete a single message from SQS
* @param message The message to delete from the SQS queue
*/
async deleteMessage(message) {
if (!this.shouldDeleteMessages) {
debug('Skipping message delete since shouldDeleteMessages is set to false');
return;
}
debug('Deleting message %s', message.MessageId);
const deleteParams = {
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle
};
try {
await this.sqs.send(new client_sqs_1.DeleteMessageCommand(deleteParams));
}
catch (err) {
throw (0, errors_1.toSQSError)(err, `SQS delete message failed: ${err.message}`);
}
}
/**
* Delete a batch of messages from the SQS queue.
* @param messages The messages that should be deleted from SQS
*/
async deleteMessageBatch(messages) {
if (!this.shouldDeleteMessages) {
debug('Skipping message delete since shouldDeleteMessages is set to false');
return;
}
debug('Deleting messages %s', messages.map((msg) => msg.MessageId).join(' ,'));
const deleteParams = {
QueueUrl: this.queueUrl,
Entries: messages.map((message) => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
ReceiptHandle: message.ReceiptHandle
}))
};
try {
return await this.sqs.send(new client_sqs_1.ChangeMessageVisibilityBatchCommand(params));
await this.sqs.send(new client_sqs_1.DeleteMessageBatchCommand(deleteParams));
}
catch (err) {
this.emit('error', toSQSError(err, `Error changing visibility timeout: ${err.message}`), messages);
throw (0, errors_1.toSQSError)(err, `SQS delete message failed: ${err.message}`);
}
}
startHeartbeat(heartbeatFn) {
return setInterval(() => {
heartbeatFn();
}, this.heartbeatInterval * 1000);
}
}
exports.Consumer = Consumer;

@@ -0,1 +1,2 @@

import { AWSError } from './types';
declare class SQSError extends Error {

@@ -13,2 +14,13 @@ code: string;

}
export { SQSError, TimeoutError };
/**
* Checks if the error provided should be treated as a connection error.
* @param err The error that was received.
*/
declare function isConnectionError(err: Error): boolean;
/**
* Formats an AWSError the the SQSError type.
* @param err The error object that was received.
* @param message The message that the error occurred on.
*/
declare function toSQSError(err: AWSError, message: string): SQSError;
export { SQSError, TimeoutError, isConnectionError, toSQSError };
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.TimeoutError = exports.SQSError = void 0;
exports.toSQSError = exports.isConnectionError = exports.TimeoutError = exports.SQSError = void 0;
class SQSError extends Error {

@@ -19,1 +19,32 @@ constructor(message) {

exports.TimeoutError = TimeoutError;
/**
* Checks if the error provided should be treated as a connection error.
* @param err The error that was received.
*/
function isConnectionError(err) {
if (err instanceof SQSError) {
return (err.statusCode === 403 ||
err.code === 'CredentialsError' ||
err.code === 'UnknownEndpoint' ||
err.code === 'AWS.SimpleQueueService.NonExistentQueue');
}
return false;
}
exports.isConnectionError = isConnectionError;
/**
* Formats an AWSError the the SQSError type.
* @param err The error object that was received.
* @param message The message that the error occurred on.
*/
function toSQSError(err, message) {
var _a, _b;
const sqsError = new SQSError(message);
sqsError.code = err.name;
sqsError.statusCode = (_a = err.$metadata) === null || _a === void 0 ? void 0 : _a.httpStatusCode;
sqsError.retryable = (_b = err.$retryable) === null || _b === void 0 ? void 0 : _b.throttling;
sqsError.service = err.$service;
sqsError.fault = err.$fault;
sqsError.time = new Date();
return sqsError;
}
exports.toSQSError = toSQSError;

@@ -0,31 +1,164 @@

/// <reference types="node" />
import { SQSClient, Message } from '@aws-sdk/client-sqs';
import { EventEmitter } from 'events';
export interface ConsumerOptions {
/**
* The SQS queue URL.
*/
queueUrl: string;
/**
* List of queue attributes to retrieve (i.e.
* `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`).
* @defaultvalue `[]`
*/
attributeNames?: string[];
/**
* List of message attributes to retrieve (i.e. `['name', 'address']`).
* @defaultvalue `[]`
*/
messageAttributeNames?: string[];
/** @hidden */
stopped?: boolean;
/**
* The number of messages to request from SQS when polling (default `1`).
*
* This cannot be higher than the
* [AWS limit of 10](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html).
* @defaultvalue `1`
*/
batchSize?: number;
/**
* The duration (in seconds) that the received messages are hidden from subsequent
* retrieve requests after being retrieved by a ReceiveMessage request.
*/
visibilityTimeout?: number;
/**
* The duration (in seconds) for which the call will wait for a message to arrive in
* the queue before returning.
* @defaultvalue `20`
*/
waitTimeSeconds?: number;
/**
* The duration (in milliseconds) to wait before retrying after an authentication error.
* @defaultvalue `10000`
*/
authenticationErrorTimeout?: number;
/**
* The duration (in milliseconds) to wait before repolling the queue.
* @defaultvalue `0`
*/
pollingWaitTimeMs?: number;
/**
* If true, sets the message visibility timeout to 0 after a `processing_error`.
* @defaultvalue `false`
*/
terminateVisibilityTimeout?: boolean;
/**
* The interval (in seconds) between requests to extend the message visibility timeout.
*
* On each heartbeat the visibility is extended by adding `visibilityTimeout` to
* the number of seconds since the start of the handler function.
*
* This value must less than `visibilityTimeout`.
*/
heartbeatInterval?: number;
/**
* An optional [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html)
* object to use if you need to configure the client manually.
*/
sqs?: SQSClient;
/**
* The AWS region.
* @defaultValue `eu-west-1`
*/
region?: string;
/**
* Time in ms to wait for `handleMessage` to process a message before timing out.
*
* Emits `timeout_error` on timeout. By default, if `handleMessage` times out,
* the unprocessed message returns to the end of the queue.
*/
handleMessageTimeout?: number;
/**
* Default to `true`, if you don't want the package to delete messages from sqs
* set this to `false`.
* @defaultvalue `true`
*/
shouldDeleteMessages?: boolean;
handleMessage?(message: Message): Promise<void>;
/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a message is received.
*
* In the case that you need to acknowledge the message, return an object containing
* the MessageId that you'd like to acknowledge.
*/
handleMessage?(message: Message): Promise<Message | void>;
/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a batch of messages is received. Similar to `handleMessage` but will receive the
* list of messages, not each message individually.
*
* **If both are set, `handleMessageBatch` overrides `handleMessage`**.
*
* In the case that you need to ack only some of the messages, return an array with
* the successful messages only.
*/
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;
}
export interface Events {
/**
* Fired after one batch of items (up to `batchSize`) has been successfully processed.
*/
response_processed: [];
/**
* Fired when the queue is empty (All messages have been consumed).
*/
empty: [];
/**
* Fired when a message is received.
*/
message_received: [Message];
/**
* Fired when a message is successfully processed and removed from the queue.
*/
message_processed: [Message];
/**
* Fired when an error occurs interacting with the queue.
*
* If the error correlates to a message, that message is included in Params
*/
error: [Error, void | Message | Message[]];
/**
* Fired when `handleMessageTimeout` is supplied as an option and if
* `handleMessage` times out.
*/
timeout_error: [Error, Message];
/**
* Fired when an error occurs processing the message.
*/
processing_error: [Error, Message];
/**
* Fired when the consumer finally stops its work.
*/
stopped: [];
}
export declare class TypedEventEmitter extends EventEmitter {
/**
* Trigger a listener on all emitted events
* @param event The name of the event to listen to
* @param listener A function to trigger when the event is emitted
*/
on<E extends keyof Events>(event: E, listener: (...args: Events[E]) => void): this;
/**
* Trigger a listener only once for an emitted event
* @param event The name of the event to listen to
* @param listener A function to trigger when the event is emitted
*/
once<E extends keyof Events>(event: E, listener: (...args: Events[E]) => void): this;
/**
* Emits an event with the provided arguments
* @param event The name of the event to emit
*/
emit<E extends keyof Events>(event: E, ...args: Events[E]): boolean;
}
export type AWSError = {

@@ -32,0 +165,0 @@ /**

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.TypedEventEmitter = void 0;
const events_1 = require("events");
class TypedEventEmitter extends events_1.EventEmitter {
/**
* Trigger a listener on all emitted events
* @param event The name of the event to listen to
* @param listener A function to trigger when the event is emitted
*/
on(event, listener) {
return super.on(event, listener);
}
/**
* Trigger a listener only once for an emitted event
* @param event The name of the event to listen to
* @param listener A function to trigger when the event is emitted
*/
once(event, listener) {
return super.on(event, listener);
}
/**
* Emits an event with the provided arguments
* @param event The name of the event to emit
*/
emit(event, ...args) {
return super.emit(event, ...args);
}
}
exports.TypedEventEmitter = TypedEventEmitter;
{
"name": "sqs-consumer",
"version": "6.1.0",
"version": "6.2.0",
"description": "Build SQS-based Node applications without the boilerplate",

@@ -20,3 +20,4 @@ "main": "dist/index.js",

"format:check": "prettier --check \"**/*.{js,json,jsx,md,ts,tsx,html}\"",
"posttest": "npm run lint && npm run format:check"
"posttest": "npm run lint && npm run format:check",
"generate-docs": "typedoc"
},

@@ -30,3 +31,3 @@ "repository": {

},
"homepage": "https://github.com/BBC/sqs-consumer",
"homepage": "https://bbc.github.io/sqs-consumer/",
"keywords": [

@@ -42,22 +43,23 @@ "sqs",

"@types/mocha": "^10.0.1",
"@types/node": "^16.18.7",
"@types/node": "^18.11.18",
"@types/sinon": "^10.0.13",
"chai": "^4.3.7",
"eslint": "^8.29.0",
"eslint": "^8.31.0",
"eslint-config-iplayer-ts": "^4.1.0",
"eslint-config-prettier": "^4.3.0",
"mocha": "^10.1.0",
"eslint-config-prettier": "^8.5.0",
"mocha": "^10.2.0",
"nyc": "^15.1.0",
"p-event": "^4.2.0",
"prettier": "^2.8.1",
"sinon": "^15.0.0",
"sinon": "^15.0.1",
"ts-node": "^10.9.1",
"typedoc": "^0.23.23",
"typescript": "^4.9.4"
},
"dependencies": {
"@aws-sdk/client-sqs": "^3.226.0",
"@aws-sdk/client-sqs": "^3.241.0",
"debug": "^4.3.4"
},
"peerDependencies": {
"@aws-sdk/client-sqs": "^3.226.0"
"@aws-sdk/client-sqs": "^3.241.0"
},

@@ -84,5 +86,3 @@ "mocha": {

"iplayer-ts",
"prettier",
"prettier/react",
"prettier/@typescript-eslint"
"prettier"
],

@@ -89,0 +89,0 @@ "parserOptions": {

@@ -5,4 +5,4 @@ # sqs-consumer

[![Build Status](https://github.com/bbc/sqs-consumer/actions/workflows/test.yml/badge.svg?branch=main)](https://github.com/bbc/sqs-consumer/actions/workflows/test.yml)
[![Code Climate](https://codeclimate.com/github/BBC/sqs-consumer/badges/gpa.svg)](https://codeclimate.com/github/BBC/sqs-consumer)
[![Test Coverage](https://codeclimate.com/github/BBC/sqs-consumer/badges/coverage.svg)](https://codeclimate.com/github/BBC/sqs-consumer)
[![Maintainability](https://api.codeclimate.com/v1/badges/16ec3f59e73bc898b7ff/maintainability)](https://codeclimate.com/github/bbc/sqs-consumer/maintainability)
[![Test Coverage](https://api.codeclimate.com/v1/badges/16ec3f59e73bc898b7ff/test_coverage)](https://codeclimate.com/github/bbc/sqs-consumer/test_coverage)

@@ -13,4 +13,6 @@ Build SQS-based applications without the boilerplate. Just define an async function that handles the SQS message processing.

To install this package, simply enter the following command into your terminal (or the variant of whatever package manager you are using):
```bash
npm install sqs-consumer --save-dev
npm install --save-dev sqs-consumer
```

@@ -48,5 +50,5 @@

- The queue is polled continuously for messages using [long polling](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html).
- The queue is polled continuously for messages using [long polling](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html).
- Messages are deleted from the queue once the handler function has completed successfully.
- Throwing an error (or returning a rejected promise) from the handler function will cause the message to be left on the queue. An [SQS redrive policy](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/SQSDeadLetterQueue.html) can be used to move messages that cannot be processed to a dead letter queue.
- Throwing an error (or returning a rejected promise) from the handler function will cause the message to be left on the queue. An [SQS redrive policy](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/SQSDeadLetterQueue.html) can be used to move messages that cannot be processed to a dead letter queue.
- By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the `batchSize` option [detailed below](#options).

@@ -58,3 +60,3 @@

By default the consumer will look for AWS credentials in the places [specified by the AWS SDK](http://docs.aws.amazon.com/AWSJavaScriptSDK/guide/node-configuring.html#Setting_AWS_Credentials). The simplest option is to export your credentials as environment variables:
By default the consumer will look for AWS credentials in the places [specified by the AWS SDK](https://docs.aws.amazon.com/AWSJavaScriptSDK/guide/node-configuring.html#Setting_AWS_Credentials). The simplest option is to export your credentials as environment variables:

@@ -109,25 +111,4 @@ ```bash

Creates a new SQS consumer.
Creates a new SQS consumer using the [defined options](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html).
#### Options
| Option | Type | Description |
| ---------------------------- | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `queueUrl` | String | The SQS queue URL |
| `region` | String | The AWS region (default `eu-west-1`) |
| `handleMessage` | Function | An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as its first argument. |
| `handleMessageBatch` | Function | An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**. In the case that you need to ack only some of the messages, return an array with the successful messages only. |
| `handleMessageTimeout` | Number | Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. |
| `attributeNames` | Array | List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`). |
| `messageAttributeNames` | Array | List of message attributes to retrieve (i.e. `['name', 'address']`). |
| `batchSize` | Number | The number of messages to request from SQS when polling (default `1`). This cannot be higher than the [AWS limit of 10](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html). |
| `visibilityTimeout` | Number | The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. |
| `heartbeatInterval` | Number | The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`. |
| `terminateVisibilityTimeout` | Boolean | If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`). |
| `waitTimeSeconds` | Number | The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning (defaults to `20`). |
| `authenticationErrorTimeout` | Number | The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`). |
| `pollingWaitTimeMs` | Number | The duration (in milliseconds) to wait before repolling the queue (defaults to `0`). |
| `sqs` | SQSClient | An optional [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html) object to use if you need to configure the client manually |
| `shouldDeleteMessages` | Boolean | Default to `true`, if you don't want the package to delete messages from sqs set this to `false` |
### `consumer.start()`

@@ -139,3 +120,3 @@

Stop polling the queue for messages.
Stop polling the queue for messages (pre existing requests will still be made until concluded).

@@ -148,17 +129,12 @@ ### `consumer.isRunning`

Each consumer is an [`EventEmitter`](http://nodejs.org/api/events.html) and emits the following events:
Each consumer is an [`EventEmitter`](https://nodejs.org/api/events.html) and [emits these events](https://bbc.github.io/sqs-consumer/interfaces/Events.html).
| Event | Params | Description |
| -------------------- | ------------------ | ------------------------------------------------------------------------------------------------------------------------------- |
| `error` | `err`, `[message]` | Fired when an error occurs interacting with the queue. If the error correlates to a message, that message is included in Params |
| `processing_error` | `err`, `message` | Fired when an error occurs processing the message. |
| `timeout_error` | `err`, `message` | Fired when `handleMessageTimeout` is supplied as an option and if `handleMessage` times out. |
| `message_received` | `message` | Fired when a message is received. |
| `message_processed` | `message` | Fired when a message is successfully processed and removed from the queue. |
| `response_processed` | None | Fired after one batch of items (up to `batchSize`) has been successfully processed. |
| `stopped` | None | Fired when the consumer finally stops its work. |
| `empty` | None | Fired when the queue is empty (All messages have been consumed). |
## Contributing
### Contributing
We welcome and appreciate contributions for anyone who would like to take the time to fix a bug or implement a new feature.
See contributing [guidelines](https://github.com/bbc/sqs-consumer/blob/main/.github/CONTRIBUTING.md).
But before you get started, [please read the contributing guidelines](https://github.com/bbc/sqs-consumer/blob/main/.github/CONTRIBUTING.md) and [code of conduct](https://github.com/bbc/sqs-consumer/blob/main/.github/CODE_OF_CONDUCT.md).
## License
SQS Consumer is distributed under the Apache License, Version 2.0, see [LICENSE](https://github.com/bbc/sqs-consumer/blob/main/LICENSE) for more information.

@@ -0,1 +1,6 @@

/**
* Determines if the property is a method
* @param propertyName the name of the property
* @param value the value of the property
*/
function isMethod(propertyName: string, value: any): boolean {

@@ -5,2 +10,6 @@ return propertyName !== 'constructor' && typeof value === 'function';

/**
* Auto binds the provided properties
* @param obj an object containing the available properties
*/
export function autoBind(obj: object): void {

@@ -7,0 +16,0 @@ const propertyNames = Object.getOwnPropertyNames(obj.constructor.prototype);

@@ -19,98 +19,38 @@ import {

import Debug from 'debug';
import { EventEmitter } from 'events';
import { AWSError, ConsumerOptions, Events } from './types';
import { ConsumerOptions, TypedEventEmitter } from './types';
import { autoBind } from './bind';
import { SQSError, TimeoutError } from './errors';
import {
SQSError,
TimeoutError,
toSQSError,
isConnectionError
} from './errors';
import { assertOptions, hasMessages } from './validation';
const debug = Debug('sqs-consumer');
const requiredOptions = [
'queueUrl',
// only one of handleMessage / handleMessagesBatch is required
'handleMessage|handleMessageBatch'
];
interface TimeoutResponse {
timeout: NodeJS.Timeout;
pending: Promise<void>;
}
function createTimeout(duration: number): TimeoutResponse[] {
let timeout;
const pending = new Promise((_, reject) => {
timeout = setTimeout((): void => {
reject(new TimeoutError());
}, duration);
});
return [timeout, pending];
}
function assertOptions(options: ConsumerOptions): void {
requiredOptions.forEach((option) => {
const possibilities = option.split('|');
if (!possibilities.find((p) => options[p])) {
throw new Error(
`Missing SQS consumer option [ ${possibilities.join(' or ')} ].`
);
}
});
if (options.batchSize > 10 || options.batchSize < 1) {
throw new Error('SQS batchSize option must be between 1 and 10.');
}
if (
options.heartbeatInterval &&
!(options.heartbeatInterval < options.visibilityTimeout)
) {
throw new Error('heartbeatInterval must be less than visibilityTimeout.');
}
}
function isConnectionError(err: Error): boolean {
if (err instanceof SQSError) {
return (
err.statusCode === 403 ||
err.code === 'CredentialsError' ||
err.code === 'UnknownEndpoint' ||
err.code === 'AWS.SimpleQueueService.NonExistentQueue'
);
}
return false;
}
function toSQSError(err: AWSError, message: string): SQSError {
const sqsError = new SQSError(message);
sqsError.code = err.name;
sqsError.statusCode = err.$metadata?.httpStatusCode;
sqsError.retryable = err.$retryable?.throttling;
sqsError.service = err.$service;
sqsError.fault = err.$fault;
sqsError.time = new Date();
return sqsError;
}
function hasMessages(response: ReceiveMessageCommandOutput): boolean {
return response.Messages && response.Messages.length > 0;
}
export class Consumer extends EventEmitter {
/**
* [Usage](https://bbc.github.io/sqs-consumer/index.html#usage)
*/
export class Consumer extends TypedEventEmitter {
private pollingTimeoutId: NodeJS.Timeout | undefined = undefined;
private heartbeatTimeoutId: NodeJS.Timeout | undefined = undefined;
private handleMessageTimeoutId: NodeJS.Timeout | undefined = undefined;
private stopped = true;
private queueUrl: string;
private handleMessage: (message: Message) => Promise<void>;
private handleMessage: (message: Message) => Promise<Message | void>;
private handleMessageBatch: (message: Message[]) => Promise<Message[] | void>;
private sqs: SQSClient;
private handleMessageTimeout: number;
private attributeNames: string[];
private messageAttributeNames: string[];
private stopped: boolean;
private shouldDeleteMessages: boolean;
private batchSize: number;
private visibilityTimeout: number;
private terminateVisibilityTimeout: boolean;
private waitTimeSeconds: number;
private authenticationErrorTimeout: number;
private pollingWaitTimeMs: number;
private terminateVisibilityTimeout: boolean;
private heartbeatInterval: number;
private sqs: SQSClient;
private shouldDeleteMessages: boolean;

@@ -126,3 +66,2 @@ constructor(options: ConsumerOptions) {

this.messageAttributeNames = options.messageAttributeNames || [];
this.stopped = true;
this.batchSize = options.batchSize || 1;

@@ -138,3 +77,2 @@ this.visibilityTimeout = options.visibilityTimeout;

this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
this.sqs =

@@ -145,24 +83,46 @@ options.sqs ||

});
autoBind(this);
}
emit<T extends keyof Events>(event: T, ...args: Events[T]) {
return super.emit(event, ...args);
/**
* Creates a new SQS consumer.
*/
public static create(options: ConsumerOptions): Consumer {
return new Consumer(options);
}
on<T extends keyof Events>(
event: T,
listener: (...args: Events[T]) => void
): this {
return super.on(event, listener);
/**
* Start polling the queue for messages.
*/
public start(): void {
if (this.stopped) {
debug('Starting consumer');
this.stopped = false;
this.poll();
}
}
once<T extends keyof Events>(
event: T,
listener: (...args: Events[T]) => void
): this {
return super.once(event, listener);
/**
* Stop polling the queue for messages (pre existing requests will still be made until concluded).
*/
public stop(): void {
if (this.stopped) {
debug('Consumer was already stopped');
return;
}
debug('Stopping consumer');
this.stopped = true;
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
this.pollingTimeoutId = undefined;
}
this.emit('stopped');
}
/**
* Returns the current polling state of the consumer: `true` if it is actively polling, `false` if it is not.
*/
public get isRunning(): boolean {

@@ -172,53 +132,114 @@ return !this.stopped;

public static create(options: ConsumerOptions): Consumer {
return new Consumer(options);
/**
* Emit one of the consumer's error events depending on the error received.
* @param err The error object to forward on
* @param message The message that the error occurred on
*/
private emitError(err: Error, message?: Message): void {
if (!message) {
this.emit('error', err);
} else if (err.name === SQSError.name) {
this.emit('error', err, message);
} else if (err instanceof TimeoutError) {
this.emit('timeout_error', err, message);
} else {
this.emit('processing_error', err, message);
}
}
public start(): void {
/**
* Poll for new messages from SQS
*/
private poll(): void {
if (this.stopped) {
debug('Starting consumer');
this.stopped = false;
this.poll();
debug('Poll was called while consumer was stopped, cancelling poll...');
return;
}
debug('Polling for messages');
let currentPollingTimeout = this.pollingWaitTimeMs;
this.receiveMessage({
QueueUrl: this.queueUrl,
AttributeNames: this.attributeNames,
MessageAttributeNames: this.messageAttributeNames,
MaxNumberOfMessages: this.batchSize,
WaitTimeSeconds: this.waitTimeSeconds,
VisibilityTimeout: this.visibilityTimeout
})
.then(this.handleSqsResponse)
.catch((err) => {
this.emitError(err);
if (isConnectionError(err)) {
debug('There was an authentication error. Pausing before retrying.');
currentPollingTimeout = this.authenticationErrorTimeout;
}
return;
})
.then(() => {
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
}
this.pollingTimeoutId = setTimeout(this.poll, currentPollingTimeout);
})
.catch((err) => {
this.emitError(err);
});
}
public stop(): void {
debug('Stopping consumer');
this.stopped = true;
/**
* Send a request to SQS to retrieve messages
* @param params The required params to receive messages from SQS
*/
private async receiveMessage(
params: ReceiveMessageCommandInput
): Promise<ReceiveMessageCommandOutput> {
try {
return await this.sqs.send(new ReceiveMessageCommand(params));
} catch (err) {
throw toSQSError(err, `SQS receive message failed: ${err.message}`);
}
}
/**
* Handles the response from AWS SQS, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
*/
private async handleSqsResponse(
response: ReceiveMessageCommandOutput
): Promise<void> {
debug('Received SQS response');
debug(response);
if (response) {
if (hasMessages(response)) {
if (this.handleMessageBatch) {
// prefer handling messages in batch when available
await this.processMessageBatch(response.Messages);
} else {
await Promise.all(response.Messages.map(this.processMessage));
}
this.emit('response_processed');
if (hasMessages(response)) {
if (this.handleMessageBatch) {
await this.processMessageBatch(response.Messages);
} else {
this.emit('empty');
await Promise.all(response.Messages.map(this.processMessage));
}
this.emit('response_processed');
} else if (response) {
this.emit('empty');
}
}
/**
* Process a message that has been received from SQS. This will execute the message
* handler and delete the message once complete.
* @param message The message that was delivered from SQS
*/
private async processMessage(message: Message): Promise<void> {
this.emit('message_received', message);
try {
this.emit('message_received', message);
let heartbeat;
try {
if (this.heartbeatInterval) {
heartbeat = this.startHeartbeat(async () => {
return this.changeVisibilityTimeout(message, this.visibilityTimeout);
});
this.heartbeatTimeoutId = this.startHeartbeat(message);
}
await this.executeHandler(message);
await this.deleteMessage(message);
this.emit('message_processed', message);
const ackedMessage = await this.executeHandler(message);
if (ackedMessage?.MessageId === message.MessageId) {
await this.deleteMessage(message);
this.emit('message_processed', message);
}
} catch (err) {

@@ -231,59 +252,67 @@ this.emitError(err, message);

} finally {
clearInterval(heartbeat);
clearInterval(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = undefined;
}
}
private async receiveMessage(
params: ReceiveMessageCommandInput
): Promise<ReceiveMessageCommandOutput> {
/**
* Process a batch of messages from the SQS queue.
* @param messages The messages that were delivered from SQS
*/
private async processMessageBatch(messages: Message[]): Promise<void> {
try {
return await this.sqs.send(new ReceiveMessageCommand(params));
} catch (err) {
throw toSQSError(err, `SQS receive message failed: ${err.message}`);
}
}
messages.forEach((message) => {
this.emit('message_received', message);
});
private async deleteMessage(message: Message): Promise<void> {
if (!this.shouldDeleteMessages) {
debug(
'Skipping message delete since shouldDeleteMessages is set to false'
);
return;
}
debug('Deleting message %s', message.MessageId);
if (this.heartbeatInterval) {
this.heartbeatTimeoutId = this.startHeartbeat(null, messages);
}
const deleteParams: DeleteMessageCommandInput = {
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle
};
const ackedMessages = await this.executeBatchHandler(messages);
try {
await this.sqs.send(new DeleteMessageCommand(deleteParams));
} catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
}
}
if (ackedMessages?.length > 0) {
await this.deleteMessageBatch(ackedMessages);
private async executeHandler(message: Message): Promise<void> {
let timeout;
let pending;
try {
if (this.handleMessageTimeout) {
[timeout, pending] = createTimeout(this.handleMessageTimeout);
await Promise.race([this.handleMessage(message), pending]);
} else {
await this.handleMessage(message);
ackedMessages.forEach((message) => {
this.emit('message_processed', message);
});
}
} catch (err) {
if (err instanceof TimeoutError) {
err.message = `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`;
} else if (err instanceof Error) {
err.message = `Unexpected message handler failure: ${err.message}`;
this.emit('error', err, messages);
if (this.terminateVisibilityTimeout) {
await this.changeVisibilityTimeoutBatch(messages, 0);
}
throw err;
} finally {
clearTimeout(timeout);
clearInterval(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = undefined;
}
}
/**
* Trigger a function on a set interval
* @param heartbeatFn The function that should be triggered
*/
private startHeartbeat(
message?: Message,
messages?: Message[]
): NodeJS.Timeout {
return setInterval(() => {
if (this.handleMessageBatch) {
return this.changeVisibilityTimeoutBatch(
messages,
this.visibilityTimeout
);
} else {
return this.changeVisibilityTimeout(message, this.visibilityTimeout);
}
}, this.heartbeatInterval * 1000);
}
/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
*/
private async changeVisibilityTimeout(

@@ -309,82 +338,109 @@ message: Message,

private emitError(err: Error, message: Message): void {
if (err.name === SQSError.name) {
this.emit('error', err, message);
} else if (err instanceof TimeoutError) {
this.emit('timeout_error', err, message);
} else {
this.emit('processing_error', err, message);
}
}
private poll(): void {
if (this.stopped) {
this.emit('stopped');
return;
}
debug('Polling for messages');
const receiveParams: ReceiveMessageCommandInput = {
/**
* Change the visibility timeout on a batch of messages
* @param messages The messages to change the value of
* @param timeout The new timeout that should be set
*/
private async changeVisibilityTimeoutBatch(
messages: Message[],
timeout: number
): Promise<ChangeMessageVisibilityBatchCommandOutput> {
const params: ChangeMessageVisibilityBatchCommandInput = {
QueueUrl: this.queueUrl,
AttributeNames: this.attributeNames,
MessageAttributeNames: this.messageAttributeNames,
MaxNumberOfMessages: this.batchSize,
WaitTimeSeconds: this.waitTimeSeconds,
VisibilityTimeout: this.visibilityTimeout
Entries: messages.map((message) => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
}))
};
let currentPollingTimeout = this.pollingWaitTimeMs;
this.receiveMessage(receiveParams)
.then(this.handleSqsResponse)
.catch((err) => {
this.emit('error', err);
if (isConnectionError(err)) {
debug('There was an authentication error. Pausing before retrying.');
currentPollingTimeout = this.authenticationErrorTimeout;
}
return;
})
.then(() => {
setTimeout(this.poll, currentPollingTimeout);
})
.catch((err) => {
this.emit('error', err);
});
try {
return await this.sqs.send(
new ChangeMessageVisibilityBatchCommand(params)
);
} catch (err) {
this.emit(
'error',
toSQSError(err, `Error changing visibility timeout: ${err.message}`),
messages
);
}
}
private async processMessageBatch(messages: Message[]): Promise<void> {
messages.forEach((message) => {
this.emit('message_received', message);
});
/**
* Trigger the applications handleMessage function
* @param message The message that was received from SQS
*/
private async executeHandler(message: Message): Promise<Message> {
try {
let result;
let heartbeat;
try {
if (this.heartbeatInterval) {
heartbeat = this.startHeartbeat(async () => {
return this.changeVisibilityTimeoutBatch(
messages,
this.visibilityTimeout
);
if (this.handleMessageTimeout) {
const pending = new Promise((_, reject) => {
this.handleMessageTimeoutId = setTimeout((): void => {
reject(new TimeoutError());
}, this.handleMessageTimeout);
});
result = await Promise.race([this.handleMessage(message), pending]);
} else {
result = await this.handleMessage(message);
}
const ackedMessages = await this.executeBatchHandler(messages);
if (ackedMessages.length > 0) {
await this.deleteMessageBatch(ackedMessages);
return result instanceof Object ? result : message;
} catch (err) {
err.message =
err instanceof TimeoutError
? `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`
: `Unexpected message handler failure: ${err.message}`;
throw err;
} finally {
if (this.handleMessageTimeoutId) {
clearTimeout(this.handleMessageTimeoutId);
}
}
}
ackedMessages.forEach((message) => {
this.emit('message_processed', message);
});
/**
* Execute the application's message batch handler
* @param messages The messages that should be forwarded from the SQS queue
*/
private async executeBatchHandler(messages: Message[]): Promise<Message[]> {
try {
const result = await this.handleMessageBatch(messages);
return result instanceof Object ? result : messages;
} catch (err) {
this.emit('error', err, messages);
err.message = `Unexpected message handler failure: ${err.message}`;
throw err;
}
}
if (this.terminateVisibilityTimeout) {
await this.changeVisibilityTimeoutBatch(messages, 0);
}
} finally {
clearInterval(heartbeat);
/**
* Delete a single message from SQS
* @param message The message to delete from the SQS queue
*/
private async deleteMessage(message: Message): Promise<void> {
if (!this.shouldDeleteMessages) {
debug(
'Skipping message delete since shouldDeleteMessages is set to false'
);
return;
}
debug('Deleting message %s', message.MessageId);
const deleteParams: DeleteMessageCommandInput = {
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle
};
try {
await this.sqs.send(new DeleteMessageCommand(deleteParams));
} catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
}
}
/**
* Delete a batch of messages from the SQS queue.
* @param messages The messages that should be deleted from SQS
*/
private async deleteMessageBatch(messages: Message[]): Promise<void> {

@@ -416,48 +472,2 @@ if (!this.shouldDeleteMessages) {

}
private async executeBatchHandler(messages: Message[]): Promise<Message[]> {
try {
const result = await this.handleMessageBatch(messages);
if (result instanceof Object) {
return result;
}
return messages;
} catch (err) {
err.message = `Unexpected message handler failure: ${err.message}`;
throw err;
}
}
private async changeVisibilityTimeoutBatch(
messages: Message[],
timeout: number
): Promise<ChangeMessageVisibilityBatchCommandOutput> {
const params: ChangeMessageVisibilityBatchCommandInput = {
QueueUrl: this.queueUrl,
Entries: messages.map((message) => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
}))
};
try {
return await this.sqs.send(
new ChangeMessageVisibilityBatchCommand(params)
);
} catch (err) {
this.emit(
'error',
toSQSError(err, `Error changing visibility timeout: ${err.message}`),
messages
);
}
}
private startHeartbeat(heartbeatFn: () => void): NodeJS.Timeout {
return setInterval(() => {
heartbeatFn();
}, this.heartbeatInterval * 1000);
}
}

@@ -0,1 +1,3 @@

import { AWSError } from './types';
class SQSError extends Error {

@@ -23,2 +25,35 @@ code: string;

export { SQSError, TimeoutError };
/**
* Checks if the error provided should be treated as a connection error.
* @param err The error that was received.
*/
function isConnectionError(err: Error): boolean {
if (err instanceof SQSError) {
return (
err.statusCode === 403 ||
err.code === 'CredentialsError' ||
err.code === 'UnknownEndpoint' ||
err.code === 'AWS.SimpleQueueService.NonExistentQueue'
);
}
return false;
}
/**
* Formats an AWSError the the SQSError type.
* @param err The error object that was received.
* @param message The message that the error occurred on.
*/
function toSQSError(err: AWSError, message: string): SQSError {
const sqsError = new SQSError(message);
sqsError.code = err.name;
sqsError.statusCode = err.$metadata?.httpStatusCode;
sqsError.retryable = err.$retryable?.throttling;
sqsError.service = err.$service;
sqsError.fault = err.$fault;
sqsError.time = new Date();
return sqsError;
}
export { SQSError, TimeoutError, isConnectionError, toSQSError };
import { SQSClient, Message } from '@aws-sdk/client-sqs';
import { EventEmitter } from 'events';
export interface ConsumerOptions {
/**
* The SQS queue URL.
*/
queueUrl: string;
/**
* List of queue attributes to retrieve (i.e.
* `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`).
* @defaultvalue `[]`
*/
attributeNames?: string[];
/**
* List of message attributes to retrieve (i.e. `['name', 'address']`).
* @defaultvalue `[]`
*/
messageAttributeNames?: string[];
/** @hidden */
stopped?: boolean;
/**
* The number of messages to request from SQS when polling (default `1`).
*
* This cannot be higher than the
* [AWS limit of 10](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html).
* @defaultvalue `1`
*/
batchSize?: number;
/**
* The duration (in seconds) that the received messages are hidden from subsequent
* retrieve requests after being retrieved by a ReceiveMessage request.
*/
visibilityTimeout?: number;
/**
* The duration (in seconds) for which the call will wait for a message to arrive in
* the queue before returning.
* @defaultvalue `20`
*/
waitTimeSeconds?: number;
/**
* The duration (in milliseconds) to wait before retrying after an authentication error.
* @defaultvalue `10000`
*/
authenticationErrorTimeout?: number;
/**
* The duration (in milliseconds) to wait before repolling the queue.
* @defaultvalue `0`
*/
pollingWaitTimeMs?: number;
/**
* If true, sets the message visibility timeout to 0 after a `processing_error`.
* @defaultvalue `false`
*/
terminateVisibilityTimeout?: boolean;
/**
* The interval (in seconds) between requests to extend the message visibility timeout.
*
* On each heartbeat the visibility is extended by adding `visibilityTimeout` to
* the number of seconds since the start of the handler function.
*
* This value must less than `visibilityTimeout`.
*/
heartbeatInterval?: number;
/**
* An optional [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html)
* object to use if you need to configure the client manually.
*/
sqs?: SQSClient;
/**
* The AWS region.
* @defaultValue `eu-west-1`
*/
region?: string;
/**
* Time in ms to wait for `handleMessage` to process a message before timing out.
*
* Emits `timeout_error` on timeout. By default, if `handleMessage` times out,
* the unprocessed message returns to the end of the queue.
*/
handleMessageTimeout?: number;
/**
* Default to `true`, if you don't want the package to delete messages from sqs
* set this to `false`.
* @defaultvalue `true`
*/
shouldDeleteMessages?: boolean;
handleMessage?(message: Message): Promise<void>;
/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a message is received.
*
* In the case that you need to acknowledge the message, return an object containing
* the MessageId that you'd like to acknowledge.
*/
handleMessage?(message: Message): Promise<Message | void>;
/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a batch of messages is received. Similar to `handleMessage` but will receive the
* list of messages, not each message individually.
*
* **If both are set, `handleMessageBatch` overrides `handleMessage`**.
*
* In the case that you need to ack only some of the messages, return an array with
* the successful messages only.
*/
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;

@@ -24,12 +110,71 @@ }

export interface Events {
/**
* Fired after one batch of items (up to `batchSize`) has been successfully processed.
*/
response_processed: [];
/**
* Fired when the queue is empty (All messages have been consumed).
*/
empty: [];
/**
* Fired when a message is received.
*/
message_received: [Message];
/**
* Fired when a message is successfully processed and removed from the queue.
*/
message_processed: [Message];
/**
* Fired when an error occurs interacting with the queue.
*
* If the error correlates to a message, that message is included in Params
*/
error: [Error, void | Message | Message[]];
/**
* Fired when `handleMessageTimeout` is supplied as an option and if
* `handleMessage` times out.
*/
timeout_error: [Error, Message];
/**
* Fired when an error occurs processing the message.
*/
processing_error: [Error, Message];
/**
* Fired when the consumer finally stops its work.
*/
stopped: [];
}
export class TypedEventEmitter extends EventEmitter {
/**
* Trigger a listener on all emitted events
* @param event The name of the event to listen to
* @param listener A function to trigger when the event is emitted
*/
on<E extends keyof Events>(
event: E,
listener: (...args: Events[E]) => void
): this {
return super.on(event, listener);
}
/**
* Trigger a listener only once for an emitted event
* @param event The name of the event to listen to
* @param listener A function to trigger when the event is emitted
*/
once<E extends keyof Events>(
event: E,
listener: (...args: Events[E]) => void
): this {
return super.on(event, listener);
}
/**
* Emits an event with the provided arguments
* @param event The name of the event to emit
*/
emit<E extends keyof Events>(event: E, ...args: Events[E]): boolean {
return super.emit(event, ...args);
}
}
export type AWSError = {

@@ -36,0 +181,0 @@ /**

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc