sqs-consumer
Advanced tools
Comparing version 6.2.1 to 7.0.0
@@ -1,2 +0,2 @@ | ||
import { ConsumerOptions, TypedEventEmitter } from './types'; | ||
import { ConsumerOptions, TypedEventEmitter, StopOptions } from './types'; | ||
/** | ||
@@ -37,3 +37,3 @@ * [Usage](https://bbc.github.io/sqs-consumer/index.html#usage) | ||
*/ | ||
stop(): void; | ||
stop(options: StopOptions): void; | ||
/** | ||
@@ -50,2 +50,6 @@ * Returns the current polling state of the consumer: `true` if it is actively polling, `false` if it is not. | ||
/** | ||
* A reusable options object for sqs.send that's used to avoid duplication. | ||
*/ | ||
private sqsSendOptions; | ||
/** | ||
* Poll for new messages from SQS | ||
@@ -52,0 +56,0 @@ */ |
@@ -10,2 +10,3 @@ "use strict"; | ||
const validation_1 = require("./validation"); | ||
const controllers_1 = require("./controllers"); | ||
const debug = (0, debug_1.default)('sqs-consumer'); | ||
@@ -23,2 +24,8 @@ /** | ||
this.stopped = true; | ||
/** | ||
* A reusable options object for sqs.send that's used to avoid duplication. | ||
*/ | ||
this.sqsSendOptions = { | ||
abortSignal: controllers_1.abortController.signal | ||
}; | ||
(0, validation_1.assertOptions)(options); | ||
@@ -67,3 +74,3 @@ this.queueUrl = options.queueUrl; | ||
*/ | ||
stop() { | ||
stop(options) { | ||
if (this.stopped) { | ||
@@ -79,2 +86,7 @@ debug('Consumer was already stopped'); | ||
} | ||
if (options === null || options === void 0 ? void 0 : options.abort) { | ||
debug('Aborting SQS requests'); | ||
controllers_1.abortController.abort(); | ||
this.emit('aborted'); | ||
} | ||
this.emit('stopped'); | ||
@@ -150,3 +162,3 @@ } | ||
try { | ||
return await this.sqs.send(new client_sqs_1.ReceiveMessageCommand(params)); | ||
return await this.sqs.send(new client_sqs_1.ReceiveMessageCommand(params), this.sqsSendOptions); | ||
} | ||
@@ -261,3 +273,3 @@ catch (err) { | ||
}; | ||
return await this.sqs.send(new client_sqs_1.ChangeMessageVisibilityCommand(input)); | ||
return await this.sqs.send(new client_sqs_1.ChangeMessageVisibilityCommand(input), this.sqsSendOptions); | ||
} | ||
@@ -283,3 +295,3 @@ catch (err) { | ||
try { | ||
return await this.sqs.send(new client_sqs_1.ChangeMessageVisibilityBatchCommand(params)); | ||
return await this.sqs.send(new client_sqs_1.ChangeMessageVisibilityBatchCommand(params), this.sqsSendOptions); | ||
} | ||
@@ -352,3 +364,3 @@ catch (err) { | ||
try { | ||
await this.sqs.send(new client_sqs_1.DeleteMessageCommand(deleteParams)); | ||
await this.sqs.send(new client_sqs_1.DeleteMessageCommand(deleteParams), this.sqsSendOptions); | ||
} | ||
@@ -377,3 +389,3 @@ catch (err) { | ||
try { | ||
await this.sqs.send(new client_sqs_1.DeleteMessageBatchCommand(deleteParams)); | ||
await this.sqs.send(new client_sqs_1.DeleteMessageBatchCommand(deleteParams), this.sqsSendOptions); | ||
} | ||
@@ -380,0 +392,0 @@ catch (err) { |
@@ -108,2 +108,10 @@ /// <reference types="node" /> | ||
} | ||
export interface StopOptions { | ||
/** | ||
* Default to `false`, if you want the stop action to also abort requests to SQS | ||
* set this to `true`. | ||
* @defaultvalue `false` | ||
*/ | ||
abort?: boolean; | ||
} | ||
export interface Events { | ||
@@ -142,2 +150,6 @@ /** | ||
/** | ||
* Fired when requests to SQS were aborted. | ||
*/ | ||
aborted: []; | ||
/** | ||
* Fired when the consumer finally stops its work. | ||
@@ -144,0 +156,0 @@ */ |
{ | ||
"name": "sqs-consumer", | ||
"version": "6.2.1", | ||
"version": "7.0.0", | ||
"description": "Build SQS-based Node applications without the boilerplate", | ||
"main": "dist/index.js", | ||
"types": "dist/index.d.ts", | ||
"engines": { | ||
"node": ">=16.19.0" | ||
}, | ||
"scripts": { | ||
@@ -8,0 +11,0 @@ "build": "npm run clean && tsc", |
@@ -25,2 +25,8 @@ # sqs-consumer | ||
### Node version | ||
From v7 and above, this library will only support Node v16 or above. If you are still using Node 14, please use a previous version of the library. | ||
This decision was made due to the removal of security support from the Node.JS team from April 30th, 2023. | ||
## Usage | ||
@@ -53,2 +59,3 @@ | ||
- By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the `batchSize` option [detailed below](#options). | ||
- By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error. To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`. It's also important to await any processing that you are doing to ensure that messages are processed one at a time. | ||
@@ -113,6 +120,10 @@ ### Credentials | ||
### `consumer.stop()` | ||
### `consumer.stop(options)` | ||
Stop polling the queue for messages (pre existing requests will still be made until concluded). | ||
Stop polling the queue for messages. [You can find the options definition here](https://bbc.github.io/sqs-consumer/interfaces/StopOptions.html). | ||
By default, the value of `abort` is set to `false` which means pre existing requests to AWS SQS will still be made until they have concluded. If you would like to abort these requests instead, pass the abort value as `true`, like so: | ||
`consumer.stop({ abort: true })` | ||
### `consumer.isRunning` | ||
@@ -119,0 +130,0 @@ |
@@ -20,3 +20,3 @@ import { | ||
import { ConsumerOptions, TypedEventEmitter } from './types'; | ||
import { ConsumerOptions, TypedEventEmitter, StopOptions } from './types'; | ||
import { autoBind } from './bind'; | ||
@@ -30,2 +30,3 @@ import { | ||
import { assertOptions, hasMessages } from './validation'; | ||
import { abortController } from './controllers'; | ||
@@ -106,3 +107,3 @@ const debug = Debug('sqs-consumer'); | ||
*/ | ||
public stop(): void { | ||
public stop(options: StopOptions): void { | ||
if (this.stopped) { | ||
@@ -121,2 +122,10 @@ debug('Consumer was already stopped'); | ||
if (options?.abort) { | ||
debug('Aborting SQS requests'); | ||
abortController.abort(); | ||
this.emit('aborted'); | ||
} | ||
this.emit('stopped'); | ||
@@ -150,2 +159,9 @@ } | ||
/** | ||
* A reusable options object for sqs.send that's used to avoid duplication. | ||
*/ | ||
private sqsSendOptions = { | ||
abortSignal: abortController.signal | ||
}; | ||
/** | ||
* Poll for new messages from SQS | ||
@@ -198,3 +214,6 @@ */ | ||
try { | ||
return await this.sqs.send(new ReceiveMessageCommand(params)); | ||
return await this.sqs.send( | ||
new ReceiveMessageCommand(params), | ||
this.sqsSendOptions | ||
); | ||
} catch (err) { | ||
@@ -328,3 +347,6 @@ throw toSQSError(err, `SQS receive message failed: ${err.message}`); | ||
}; | ||
return await this.sqs.send(new ChangeMessageVisibilityCommand(input)); | ||
return await this.sqs.send( | ||
new ChangeMessageVisibilityCommand(input), | ||
this.sqsSendOptions | ||
); | ||
} catch (err) { | ||
@@ -358,3 +380,4 @@ this.emit( | ||
return await this.sqs.send( | ||
new ChangeMessageVisibilityBatchCommand(params) | ||
new ChangeMessageVisibilityBatchCommand(params), | ||
this.sqsSendOptions | ||
); | ||
@@ -437,3 +460,6 @@ } catch (err) { | ||
try { | ||
await this.sqs.send(new DeleteMessageCommand(deleteParams)); | ||
await this.sqs.send( | ||
new DeleteMessageCommand(deleteParams), | ||
this.sqsSendOptions | ||
); | ||
} catch (err) { | ||
@@ -469,3 +495,6 @@ throw toSQSError(err, `SQS delete message failed: ${err.message}`); | ||
try { | ||
await this.sqs.send(new DeleteMessageBatchCommand(deleteParams)); | ||
await this.sqs.send( | ||
new DeleteMessageBatchCommand(deleteParams), | ||
this.sqsSendOptions | ||
); | ||
} catch (err) { | ||
@@ -472,0 +501,0 @@ throw toSQSError(err, `SQS delete message failed: ${err.message}`); |
@@ -109,2 +109,11 @@ import { SQSClient, Message } from '@aws-sdk/client-sqs'; | ||
export interface StopOptions { | ||
/** | ||
* Default to `false`, if you want the stop action to also abort requests to SQS | ||
* set this to `true`. | ||
* @defaultvalue `false` | ||
*/ | ||
abort?: boolean; | ||
} | ||
export interface Events { | ||
@@ -143,2 +152,6 @@ /** | ||
/** | ||
* Fired when requests to SQS were aborted. | ||
*/ | ||
aborted: []; | ||
/** | ||
* Fired when the consumer finally stops its work. | ||
@@ -145,0 +158,0 @@ */ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
91240
43
1783
143