Socket
Socket
Sign inDemoInstall

sqs-consumer

Package Overview
Dependencies
140
Maintainers
2
Versions
91
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 6.0.2 to 6.1.0

.github/ISSUE_TEMPLATE/bug-report.yml

10

.github/CONTRIBUTING.md

@@ -37,2 +37,12 @@ # Contributing

## Contributors Licence Agreement
In order to accept contributions, we need all contributors grant Us a licence to the intellectual
property rights in their Contributions. This Agreement (“Agreement”) is intended to protect your
rights as a contributor, and to help ensure that the intellectual property contained
within is available to the whole community, to use and build on.
When you raise a pull request and you haven't previously signed a CLA, the bot will automatically
ask you to do this. You must complete this step in order for your PR to be merged.
## Pull Request Process

@@ -39,0 +49,0 @@

32

dist/consumer.d.ts
/// <reference types="node" />
import { SQSClient, Message } from '@aws-sdk/client-sqs';
import { EventEmitter } from 'events';
export interface ConsumerOptions {
queueUrl?: string;
attributeNames?: string[];
messageAttributeNames?: string[];
stopped?: boolean;
batchSize?: number;
visibilityTimeout?: number;
waitTimeSeconds?: number;
authenticationErrorTimeout?: number;
pollingWaitTimeMs?: number;
terminateVisibilityTimeout?: boolean;
heartbeatInterval?: number;
sqs?: SQSClient;
region?: string;
handleMessageTimeout?: number;
shouldDeleteMessages?: boolean;
handleMessage?(message: Message): Promise<void>;
handleMessageBatch?(messages: Message[]): Promise<void>;
}
interface Events {
response_processed: [];
empty: [];
message_received: [Message];
message_processed: [Message];
error: [Error, void | Message | Message[]];
timeout_error: [Error, Message];
processing_error: [Error, Message];
stopped: [];
}
import { ConsumerOptions, Events } from './types';
export declare class Consumer extends EventEmitter {

@@ -72,2 +43,1 @@ private queueUrl;

}
export {};

@@ -43,3 +43,4 @@ "use strict";

err.code === 'CredentialsError' ||
err.code === 'UnknownEndpoint');
err.code === 'UnknownEndpoint' ||
err.code === 'AWS.SimpleQueueService.NonExistentQueue');
}

@@ -276,5 +277,7 @@ return false;

}
await this.executeBatchHandler(messages);
await this.deleteMessageBatch(messages);
messages.forEach((message) => {
const ackedMessages = await this.executeBatchHandler(messages);
if (ackedMessages.length > 0) {
await this.deleteMessageBatch(ackedMessages);
}
ackedMessages.forEach((message) => {
this.emit('message_processed', message);

@@ -315,3 +318,7 @@ });

try {
await this.handleMessageBatch(messages);
const result = await this.handleMessageBatch(messages);
if (result instanceof Object) {
return result;
}
return messages;
}

@@ -318,0 +325,0 @@ catch (err) {

@@ -1,1 +0,2 @@

export { Consumer, ConsumerOptions } from './consumer';
export { Consumer } from './consumer';
export * from './types';
"use strict";
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
var desc = Object.getOwnPropertyDescriptor(m, k);
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
desc = { enumerable: true, get: function() { return m[k]; } };
}
Object.defineProperty(o, k2, desc);
}) : (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
var __exportStar = (this && this.__exportStar) || function(m, exports) {
for (var p in m) if (p !== "default" && !Object.prototype.hasOwnProperty.call(exports, p)) __createBinding(exports, m, p);
};
Object.defineProperty(exports, "__esModule", { value: true });

@@ -6,1 +20,2 @@ exports.Consumer = void 0;

Object.defineProperty(exports, "Consumer", { enumerable: true, get: function () { return consumer_1.Consumer; } });
__exportStar(require("./types"), exports);

@@ -0,1 +1,31 @@

import { SQSClient, Message } from '@aws-sdk/client-sqs';
export interface ConsumerOptions {
queueUrl: string;
attributeNames?: string[];
messageAttributeNames?: string[];
stopped?: boolean;
batchSize?: number;
visibilityTimeout?: number;
waitTimeSeconds?: number;
authenticationErrorTimeout?: number;
pollingWaitTimeMs?: number;
terminateVisibilityTimeout?: boolean;
heartbeatInterval?: number;
sqs?: SQSClient;
region?: string;
handleMessageTimeout?: number;
shouldDeleteMessages?: boolean;
handleMessage?(message: Message): Promise<void>;
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;
}
export interface Events {
response_processed: [];
empty: [];
message_received: [Message];
message_processed: [Message];
error: [Error, void | Message | Message[]];
timeout_error: [Error, Message];
processing_error: [Error, Message];
stopped: [];
}
export type AWSError = {

@@ -2,0 +32,0 @@ /**

{
"name": "sqs-consumer",
"version": "6.0.2",
"version": "6.1.0",
"description": "Build SQS-based Node applications without the boilerplate",

@@ -11,3 +11,3 @@ "main": "dist/index.js",

"clean": "rm -fr dist/*",
"prepare": "npm run build",
"prepublishOnly": "npm run build",
"pretest": "npm run build",

@@ -14,0 +14,0 @@ "test": "mocha --recursive --full-trace --exit",

@@ -51,2 +51,4 @@ # sqs-consumer

You can also find some examples of sqs-consumer implemented in various ways within the [examples directory](./examples/).
### Credentials

@@ -96,2 +98,6 @@

### AWS IAM Permissions
Consumer will receive and delete messages from the SQS queue. Ensure `sqs:ReceiveMessage`, `sqs:DeleteMessage`, `sqs:DeleteMessageBatch`, `sqs:ChangeMessageVisibility` and `sqs:ChangeMessageVisibilityBatch` access is granted on the queue being consumed.
## API

@@ -105,18 +111,20 @@

- `queueUrl` - _String_ - The SQS queue URL
- `region` - _String_ - The AWS region (default `eu-west-1`)
- `handleMessage` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as it's first argument.
- `handleMessageBatch` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**.
- `handleMessageTimeout` - _Number_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue.
- `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`).
- `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`).
- `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10.
- `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
- `heartbeatInterval` - _Number_ - The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`.
- `terminateVisibilityTimeout` - _Boolean_ - If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`).
- `waitTimeSeconds` - _Number_ - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning (defaults to `20`).
- `authenticationErrorTimeout` - _Number_ - The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`).
- `pollingWaitTimeMs` - _Number_ - The duration (in milliseconds) to wait before repolling the queue (defaults to `0`).
- `sqs` - _Object_ - An optional [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html) object to use if you need to configure the client manually
- `shouldDeleteMessages` - _Boolean_ - Default to `true`, if you don't want the package to delete messages from sqs set this to `false`
| Option | Type | Description |
| ---------------------------- | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `queueUrl` | String | The SQS queue URL |
| `region` | String | The AWS region (default `eu-west-1`) |
| `handleMessage` | Function | An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as its first argument. |
| `handleMessageBatch` | Function | An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**. In the case that you need to ack only some of the messages, return an array with the successful messages only. |
| `handleMessageTimeout` | Number | Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. |
| `attributeNames` | Array | List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`). |
| `messageAttributeNames` | Array | List of message attributes to retrieve (i.e. `['name', 'address']`). |
| `batchSize` | Number | The number of messages to request from SQS when polling (default `1`). This cannot be higher than the [AWS limit of 10](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html). |
| `visibilityTimeout` | Number | The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. |
| `heartbeatInterval` | Number | The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`. |
| `terminateVisibilityTimeout` | Boolean | If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`). |
| `waitTimeSeconds` | Number | The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning (defaults to `20`). |
| `authenticationErrorTimeout` | Number | The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`). |
| `pollingWaitTimeMs` | Number | The duration (in milliseconds) to wait before repolling the queue (defaults to `0`). |
| `sqs` | SQSClient | An optional [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html) object to use if you need to configure the client manually |
| `shouldDeleteMessages` | Boolean | Default to `true`, if you don't want the package to delete messages from sqs set this to `false` |

@@ -150,8 +158,4 @@ ### `consumer.start()`

### AWS IAM Permissions
Consumer will receive and delete messages from the SQS queue. Ensure `sqs:ReceiveMessage`, `sqs:DeleteMessage`, `sqs:DeleteMessageBatch`, `sqs:ChangeMessageVisibility` and `sqs:ChangeMessageVisibilityBatch` access is granted on the queue being consumed.
### Contributing
See contributing [guidelines](https://github.com/bbc/sqs-consumer/blob/main/.github/CONTRIBUTING.md).

@@ -21,3 +21,3 @@ import {

import { AWSError } from './types';
import { AWSError, ConsumerOptions, Events } from './types';
import { autoBind } from './bind';

@@ -76,3 +76,4 @@ import { SQSError, TimeoutError } from './errors';

err.code === 'CredentialsError' ||
err.code === 'UnknownEndpoint'
err.code === 'UnknownEndpoint' ||
err.code === 'AWS.SimpleQueueService.NonExistentQueue'
);

@@ -99,37 +100,6 @@ }

export interface ConsumerOptions {
queueUrl?: string;
attributeNames?: string[];
messageAttributeNames?: string[];
stopped?: boolean;
batchSize?: number;
visibilityTimeout?: number;
waitTimeSeconds?: number;
authenticationErrorTimeout?: number;
pollingWaitTimeMs?: number;
terminateVisibilityTimeout?: boolean;
heartbeatInterval?: number;
sqs?: SQSClient;
region?: string;
handleMessageTimeout?: number;
shouldDeleteMessages?: boolean;
handleMessage?(message: Message): Promise<void>;
handleMessageBatch?(messages: Message[]): Promise<void>;
}
interface Events {
response_processed: [];
empty: [];
message_received: [Message];
message_processed: [Message];
error: [Error, void | Message | Message[]];
timeout_error: [Error, Message];
processing_error: [Error, Message];
stopped: [];
}
export class Consumer extends EventEmitter {
private queueUrl: string;
private handleMessage: (message: Message) => Promise<void>;
private handleMessageBatch: (message: Message[]) => Promise<void>;
private handleMessageBatch: (message: Message[]) => Promise<Message[] | void>;
private handleMessageTimeout: number;

@@ -396,5 +366,9 @@ private attributeNames: string[];

}
await this.executeBatchHandler(messages);
await this.deleteMessageBatch(messages);
messages.forEach((message) => {
const ackedMessages = await this.executeBatchHandler(messages);
if (ackedMessages.length > 0) {
await this.deleteMessageBatch(ackedMessages);
}
ackedMessages.forEach((message) => {
this.emit('message_processed', message);

@@ -440,5 +414,11 @@ });

private async executeBatchHandler(messages: Message[]): Promise<void> {
private async executeBatchHandler(messages: Message[]): Promise<Message[]> {
try {
await this.handleMessageBatch(messages);
const result = await this.handleMessageBatch(messages);
if (result instanceof Object) {
return result;
}
return messages;
} catch (err) {

@@ -445,0 +425,0 @@ err.message = `Unexpected message handler failure: ${err.message}`;

@@ -1,1 +0,2 @@

export { Consumer, ConsumerOptions } from './consumer';
export { Consumer } from './consumer';
export * from './types';

@@ -0,1 +1,34 @@

import { SQSClient, Message } from '@aws-sdk/client-sqs';
export interface ConsumerOptions {
queueUrl: string;
attributeNames?: string[];
messageAttributeNames?: string[];
stopped?: boolean;
batchSize?: number;
visibilityTimeout?: number;
waitTimeSeconds?: number;
authenticationErrorTimeout?: number;
pollingWaitTimeMs?: number;
terminateVisibilityTimeout?: boolean;
heartbeatInterval?: number;
sqs?: SQSClient;
region?: string;
handleMessageTimeout?: number;
shouldDeleteMessages?: boolean;
handleMessage?(message: Message): Promise<void>;
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;
}
export interface Events {
response_processed: [];
empty: [];
message_received: [Message];
message_processed: [Message];
error: [Error, void | Message | Message[]];
timeout_error: [Error, Message];
processing_error: [Error, Message];
stopped: [];
}
export type AWSError = {

@@ -2,0 +35,0 @@ /**

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc