sqs-consumer
Advanced tools
Comparing version 6.0.2 to 6.1.0
@@ -37,2 +37,12 @@ # Contributing | ||
## Contributors Licence Agreement | ||
In order to accept contributions, we need all contributors grant Us a licence to the intellectual | ||
property rights in their Contributions. This Agreement (“Agreement”) is intended to protect your | ||
rights as a contributor, and to help ensure that the intellectual property contained | ||
within is available to the whole community, to use and build on. | ||
When you raise a pull request and you haven't previously signed a CLA, the bot will automatically | ||
ask you to do this. You must complete this step in order for your PR to be merged. | ||
## Pull Request Process | ||
@@ -39,0 +49,0 @@ |
/// <reference types="node" /> | ||
import { SQSClient, Message } from '@aws-sdk/client-sqs'; | ||
import { EventEmitter } from 'events'; | ||
export interface ConsumerOptions { | ||
queueUrl?: string; | ||
attributeNames?: string[]; | ||
messageAttributeNames?: string[]; | ||
stopped?: boolean; | ||
batchSize?: number; | ||
visibilityTimeout?: number; | ||
waitTimeSeconds?: number; | ||
authenticationErrorTimeout?: number; | ||
pollingWaitTimeMs?: number; | ||
terminateVisibilityTimeout?: boolean; | ||
heartbeatInterval?: number; | ||
sqs?: SQSClient; | ||
region?: string; | ||
handleMessageTimeout?: number; | ||
shouldDeleteMessages?: boolean; | ||
handleMessage?(message: Message): Promise<void>; | ||
handleMessageBatch?(messages: Message[]): Promise<void>; | ||
} | ||
interface Events { | ||
response_processed: []; | ||
empty: []; | ||
message_received: [Message]; | ||
message_processed: [Message]; | ||
error: [Error, void | Message | Message[]]; | ||
timeout_error: [Error, Message]; | ||
processing_error: [Error, Message]; | ||
stopped: []; | ||
} | ||
import { ConsumerOptions, Events } from './types'; | ||
export declare class Consumer extends EventEmitter { | ||
@@ -72,2 +43,1 @@ private queueUrl; | ||
} | ||
export {}; |
@@ -43,3 +43,4 @@ "use strict"; | ||
err.code === 'CredentialsError' || | ||
err.code === 'UnknownEndpoint'); | ||
err.code === 'UnknownEndpoint' || | ||
err.code === 'AWS.SimpleQueueService.NonExistentQueue'); | ||
} | ||
@@ -276,5 +277,7 @@ return false; | ||
} | ||
await this.executeBatchHandler(messages); | ||
await this.deleteMessageBatch(messages); | ||
messages.forEach((message) => { | ||
const ackedMessages = await this.executeBatchHandler(messages); | ||
if (ackedMessages.length > 0) { | ||
await this.deleteMessageBatch(ackedMessages); | ||
} | ||
ackedMessages.forEach((message) => { | ||
this.emit('message_processed', message); | ||
@@ -315,3 +318,7 @@ }); | ||
try { | ||
await this.handleMessageBatch(messages); | ||
const result = await this.handleMessageBatch(messages); | ||
if (result instanceof Object) { | ||
return result; | ||
} | ||
return messages; | ||
} | ||
@@ -318,0 +325,0 @@ catch (err) { |
@@ -1,1 +0,2 @@ | ||
export { Consumer, ConsumerOptions } from './consumer'; | ||
export { Consumer } from './consumer'; | ||
export * from './types'; |
"use strict"; | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
var desc = Object.getOwnPropertyDescriptor(m, k); | ||
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { | ||
desc = { enumerable: true, get: function() { return m[k]; } }; | ||
} | ||
Object.defineProperty(o, k2, desc); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __exportStar = (this && this.__exportStar) || function(m, exports) { | ||
for (var p in m) if (p !== "default" && !Object.prototype.hasOwnProperty.call(exports, p)) __createBinding(exports, m, p); | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
@@ -6,1 +20,2 @@ exports.Consumer = void 0; | ||
Object.defineProperty(exports, "Consumer", { enumerable: true, get: function () { return consumer_1.Consumer; } }); | ||
__exportStar(require("./types"), exports); |
@@ -0,1 +1,31 @@ | ||
import { SQSClient, Message } from '@aws-sdk/client-sqs'; | ||
export interface ConsumerOptions { | ||
queueUrl: string; | ||
attributeNames?: string[]; | ||
messageAttributeNames?: string[]; | ||
stopped?: boolean; | ||
batchSize?: number; | ||
visibilityTimeout?: number; | ||
waitTimeSeconds?: number; | ||
authenticationErrorTimeout?: number; | ||
pollingWaitTimeMs?: number; | ||
terminateVisibilityTimeout?: boolean; | ||
heartbeatInterval?: number; | ||
sqs?: SQSClient; | ||
region?: string; | ||
handleMessageTimeout?: number; | ||
shouldDeleteMessages?: boolean; | ||
handleMessage?(message: Message): Promise<void>; | ||
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>; | ||
} | ||
export interface Events { | ||
response_processed: []; | ||
empty: []; | ||
message_received: [Message]; | ||
message_processed: [Message]; | ||
error: [Error, void | Message | Message[]]; | ||
timeout_error: [Error, Message]; | ||
processing_error: [Error, Message]; | ||
stopped: []; | ||
} | ||
export type AWSError = { | ||
@@ -2,0 +32,0 @@ /** |
{ | ||
"name": "sqs-consumer", | ||
"version": "6.0.2", | ||
"version": "6.1.0", | ||
"description": "Build SQS-based Node applications without the boilerplate", | ||
@@ -11,3 +11,3 @@ "main": "dist/index.js", | ||
"clean": "rm -fr dist/*", | ||
"prepare": "npm run build", | ||
"prepublishOnly": "npm run build", | ||
"pretest": "npm run build", | ||
@@ -14,0 +14,0 @@ "test": "mocha --recursive --full-trace --exit", |
@@ -51,2 +51,4 @@ # sqs-consumer | ||
You can also find some examples of sqs-consumer implemented in various ways within the [examples directory](./examples/). | ||
### Credentials | ||
@@ -96,2 +98,6 @@ | ||
### AWS IAM Permissions | ||
Consumer will receive and delete messages from the SQS queue. Ensure `sqs:ReceiveMessage`, `sqs:DeleteMessage`, `sqs:DeleteMessageBatch`, `sqs:ChangeMessageVisibility` and `sqs:ChangeMessageVisibilityBatch` access is granted on the queue being consumed. | ||
## API | ||
@@ -105,18 +111,20 @@ | ||
- `queueUrl` - _String_ - The SQS queue URL | ||
- `region` - _String_ - The AWS region (default `eu-west-1`) | ||
- `handleMessage` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as it's first argument. | ||
- `handleMessageBatch` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**. | ||
- `handleMessageTimeout` - _Number_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. | ||
- `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`). | ||
- `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`). | ||
- `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10. | ||
- `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. | ||
- `heartbeatInterval` - _Number_ - The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`. | ||
- `terminateVisibilityTimeout` - _Boolean_ - If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`). | ||
- `waitTimeSeconds` - _Number_ - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning (defaults to `20`). | ||
- `authenticationErrorTimeout` - _Number_ - The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`). | ||
- `pollingWaitTimeMs` - _Number_ - The duration (in milliseconds) to wait before repolling the queue (defaults to `0`). | ||
- `sqs` - _Object_ - An optional [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html) object to use if you need to configure the client manually | ||
- `shouldDeleteMessages` - _Boolean_ - Default to `true`, if you don't want the package to delete messages from sqs set this to `false` | ||
| Option | Type | Description | | ||
| ---------------------------- | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | ||
| `queueUrl` | String | The SQS queue URL | | ||
| `region` | String | The AWS region (default `eu-west-1`) | | ||
| `handleMessage` | Function | An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as its first argument. | | ||
| `handleMessageBatch` | Function | An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**. In the case that you need to ack only some of the messages, return an array with the successful messages only. | | ||
| `handleMessageTimeout` | Number | Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. | | ||
| `attributeNames` | Array | List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`). | | ||
| `messageAttributeNames` | Array | List of message attributes to retrieve (i.e. `['name', 'address']`). | | ||
| `batchSize` | Number | The number of messages to request from SQS when polling (default `1`). This cannot be higher than the [AWS limit of 10](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html). | | ||
| `visibilityTimeout` | Number | The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. | | ||
| `heartbeatInterval` | Number | The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`. | | ||
| `terminateVisibilityTimeout` | Boolean | If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`). | | ||
| `waitTimeSeconds` | Number | The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning (defaults to `20`). | | ||
| `authenticationErrorTimeout` | Number | The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`). | | ||
| `pollingWaitTimeMs` | Number | The duration (in milliseconds) to wait before repolling the queue (defaults to `0`). | | ||
| `sqs` | SQSClient | An optional [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html) object to use if you need to configure the client manually | | ||
| `shouldDeleteMessages` | Boolean | Default to `true`, if you don't want the package to delete messages from sqs set this to `false` | | ||
@@ -150,8 +158,4 @@ ### `consumer.start()` | ||
### AWS IAM Permissions | ||
Consumer will receive and delete messages from the SQS queue. Ensure `sqs:ReceiveMessage`, `sqs:DeleteMessage`, `sqs:DeleteMessageBatch`, `sqs:ChangeMessageVisibility` and `sqs:ChangeMessageVisibilityBatch` access is granted on the queue being consumed. | ||
### Contributing | ||
See contributing [guidelines](https://github.com/bbc/sqs-consumer/blob/main/.github/CONTRIBUTING.md). |
@@ -21,3 +21,3 @@ import { | ||
import { AWSError } from './types'; | ||
import { AWSError, ConsumerOptions, Events } from './types'; | ||
import { autoBind } from './bind'; | ||
@@ -76,3 +76,4 @@ import { SQSError, TimeoutError } from './errors'; | ||
err.code === 'CredentialsError' || | ||
err.code === 'UnknownEndpoint' | ||
err.code === 'UnknownEndpoint' || | ||
err.code === 'AWS.SimpleQueueService.NonExistentQueue' | ||
); | ||
@@ -99,37 +100,6 @@ } | ||
export interface ConsumerOptions { | ||
queueUrl?: string; | ||
attributeNames?: string[]; | ||
messageAttributeNames?: string[]; | ||
stopped?: boolean; | ||
batchSize?: number; | ||
visibilityTimeout?: number; | ||
waitTimeSeconds?: number; | ||
authenticationErrorTimeout?: number; | ||
pollingWaitTimeMs?: number; | ||
terminateVisibilityTimeout?: boolean; | ||
heartbeatInterval?: number; | ||
sqs?: SQSClient; | ||
region?: string; | ||
handleMessageTimeout?: number; | ||
shouldDeleteMessages?: boolean; | ||
handleMessage?(message: Message): Promise<void>; | ||
handleMessageBatch?(messages: Message[]): Promise<void>; | ||
} | ||
interface Events { | ||
response_processed: []; | ||
empty: []; | ||
message_received: [Message]; | ||
message_processed: [Message]; | ||
error: [Error, void | Message | Message[]]; | ||
timeout_error: [Error, Message]; | ||
processing_error: [Error, Message]; | ||
stopped: []; | ||
} | ||
export class Consumer extends EventEmitter { | ||
private queueUrl: string; | ||
private handleMessage: (message: Message) => Promise<void>; | ||
private handleMessageBatch: (message: Message[]) => Promise<void>; | ||
private handleMessageBatch: (message: Message[]) => Promise<Message[] | void>; | ||
private handleMessageTimeout: number; | ||
@@ -396,5 +366,9 @@ private attributeNames: string[]; | ||
} | ||
await this.executeBatchHandler(messages); | ||
await this.deleteMessageBatch(messages); | ||
messages.forEach((message) => { | ||
const ackedMessages = await this.executeBatchHandler(messages); | ||
if (ackedMessages.length > 0) { | ||
await this.deleteMessageBatch(ackedMessages); | ||
} | ||
ackedMessages.forEach((message) => { | ||
this.emit('message_processed', message); | ||
@@ -440,5 +414,11 @@ }); | ||
private async executeBatchHandler(messages: Message[]): Promise<void> { | ||
private async executeBatchHandler(messages: Message[]): Promise<Message[]> { | ||
try { | ||
await this.handleMessageBatch(messages); | ||
const result = await this.handleMessageBatch(messages); | ||
if (result instanceof Object) { | ||
return result; | ||
} | ||
return messages; | ||
} catch (err) { | ||
@@ -445,0 +425,0 @@ err.message = `Unexpected message handler failure: ${err.message}`; |
@@ -1,1 +0,2 @@ | ||
export { Consumer, ConsumerOptions } from './consumer'; | ||
export { Consumer } from './consumer'; | ||
export * from './types'; |
@@ -0,1 +1,34 @@ | ||
import { SQSClient, Message } from '@aws-sdk/client-sqs'; | ||
export interface ConsumerOptions { | ||
queueUrl: string; | ||
attributeNames?: string[]; | ||
messageAttributeNames?: string[]; | ||
stopped?: boolean; | ||
batchSize?: number; | ||
visibilityTimeout?: number; | ||
waitTimeSeconds?: number; | ||
authenticationErrorTimeout?: number; | ||
pollingWaitTimeMs?: number; | ||
terminateVisibilityTimeout?: boolean; | ||
heartbeatInterval?: number; | ||
sqs?: SQSClient; | ||
region?: string; | ||
handleMessageTimeout?: number; | ||
shouldDeleteMessages?: boolean; | ||
handleMessage?(message: Message): Promise<void>; | ||
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>; | ||
} | ||
export interface Events { | ||
response_processed: []; | ||
empty: []; | ||
message_received: [Message]; | ||
message_processed: [Message]; | ||
error: [Error, void | Message | Message[]]; | ||
timeout_error: [Error, Message]; | ||
processing_error: [Error, Message]; | ||
stopped: []; | ||
} | ||
export type AWSError = { | ||
@@ -2,0 +35,0 @@ /** |
72990
34
1098
158