New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@bbc/cloudflare-queue-consumer

Package Overview
Dependencies
Maintainers
8
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@bbc/cloudflare-queue-consumer - npm Package Compare versions

Comparing version 0.0.3 to 0.0.4

104

dist/cjs/consumer.js

@@ -26,4 +26,6 @@ "use strict";

this.handleMessage = options.handleMessage;
this.handleMessageBatch = options.handleMessageBatch;
this.batchSize = options.batchSize ?? 10;
this.visibilityTimeoutMs = options.visibilityTimeoutMs ?? 1000;
this.retryMessagesOnError = options.retryMessagesOnError || false;
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000;

@@ -35,3 +37,3 @@ this.handleMessageTimeout = options.handleMessageTimeout;

/**
* Creates a new SQS consumer.
* Creates a new consumer.
*/

@@ -56,3 +58,3 @@ static create(options) {

/**
* A reusable options object for sqs.send that's used to avoid duplication.
* A reusable options object for queue.sending that's used to avoid duplication.
*/

@@ -152,5 +154,5 @@ get fetchOptions() {

/**
* Handles the response from AWS SQS, determining if we should proceed to
* Handles the response from CloudFlare, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
* @param response The output from CloudFlare
*/

@@ -164,3 +166,8 @@ async handleQueueResponse(response) {

if ((0, validation_js_1.hasMessages)(response)) {
await Promise.all(response.result.messages.map((message) => this.processMessage(message)));
if (this.handleMessageBatch) {
await this.processMessageBatch(response.result.messages);
}
else {
await Promise.all(response.result.messages.map((message) => this.processMessage(message)));
}
this.emit("response_processed");

@@ -186,7 +193,3 @@ }

// in a batch, rather than one at a time.
await this.acknowledgeMessage([
{
lease_id: message.lease_id,
},
], []);
await this.acknowledgeMessage([message], []);
this.emit("message_processed", message);

@@ -197,13 +200,36 @@ }

this.emitError(err, message);
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage([], [
{
lease_id: message.lease_id,
delay_seconds: this.retryMessageDelay,
},
]);
if (this.retryMessagesOnError) {
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage([], [message]);
}
}
}
/**
* Process a batch of messages from the SQS queue.
* @param messages The messages that were delivered from SQS
*/
async processMessageBatch(messages) {
try {
messages.forEach((message) => {
this.emit("message_received", message);
});
// TODO: Invesitgate if we can do heartbear checks here like SQS Consumer
// https://github.com/bbc/sqs-consumer/blob/main/src/consumer.ts#L375
const ackedMessages = await this.executeBatchHandler(messages);
if (ackedMessages?.length > 0) {
await this.acknowledgeMessage(ackedMessages, []);
ackedMessages.forEach((message) => {
this.emit("message_processed", message);
});
}
}
catch (err) {
this.emit("error", err, messages);
if (this.retryMessagesOnError) {
await this.acknowledgeMessage([], messages);
}
}
}
/**
* Trigger the applications handleMessage function

@@ -247,6 +273,24 @@ * @param message The message that was received from CloudFlare

/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
* Execute the application's message batch handler
* @param messages The messages that should be forwarded from the SQS queue
*/
async executeBatchHandler(messages) {
try {
const result = await this.handleMessageBatch(messages);
return !this.alwaysAcknowledge && result instanceof Object
? result
: messages;
}
catch (err) {
if (err instanceof Error) {
throw (0, errors_js_1.toStandardError)(err, `Unexpected message handler failure: ${err.message}`);
}
throw err;
}
}
/**
* Acknowledge a message that has been processed by the consumer
* @param acks The message(s) to acknowledge
* @param retries The message(s) to retry
*/
async acknowledgeMessage(acks, retries) {

@@ -256,4 +300,8 @@ try {

// TODO: This doesn't appear to be acknowledging correctly....
const input = { acks, retries };
this.emit("acknowledging_messages", acks, retries);
const retriesWithDelay = retries.map((message) => ({
...message,
delay_seconds: this.retryMessageDelay,
}));
const input = { acks, retries: retriesWithDelay };
this.emit("acknowledging_messages", acks, retriesWithDelay);
const result = await (0, cloudflare_js_1.queuesClient)({

@@ -278,2 +326,12 @@ ...this.fetchOptions,

/**
* Validates and then updates the provided option to the provided value.
* @param option The option to validate and then update
* @param value The value to set the provided option to
*/
updateOption(option, value) {
(0, validation_js_1.validateOption)(option, value, true);
this[option] = value;
this.emit("option_updated", option, value);
}
/**
* Emit one of the consumer's error events depending on the error received.

@@ -280,0 +338,0 @@ * @param err The error object to forward on

@@ -12,3 +12,3 @@ "use strict";

/**
* Formats an AWSError the the SQSError type.
* Formats a provider's error the the ProviderError type.
* @param err The error object that was received.

@@ -19,4 +19,10 @@ * @param message The message to send with the error.

const error = new ProviderError(message);
error.code = err.code;
error.stack = err?.stack;
error.time = new Date();
if (err.response) {
error.status = err.response.status;
error.statusText = err.response.statusText;
error.url = err.response.url;
}
return error;

@@ -23,0 +29,0 @@ }

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.throwErrorIfResponseNotOk = void 0;
const errors_js_1 = require("../errors.js");
function throwErrorIfResponseNotOk(response) {
const { ok, status, statusText, url } = response;
if (!ok) {
let error;
if (status) {
error = new Error(`[${status} - ${statusText}] ${url}`);
}
else {
error = new Error(`[${statusText}] ${url}`);
}
throw error;
const message = status
? `[${status} - ${statusText}] ${url}`
: `[${statusText}] ${url}`;
const error = new errors_js_1.ProviderError(message);
error.code = "ResponseNotOk";
Object.defineProperty(error, "response", { value: response });
throw (0, errors_js_1.toProviderError)(error, message);
}
}
exports.throwErrorIfResponseNotOk = throwErrorIfResponseNotOk;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.hasMessages = exports.validateOption = exports.assertOptions = void 0;
const requiredOptions = ["accountId", "queueId", "handleMessage"];
const requiredOptions = [
"accountId",
"queueId",
// only one of handleMessage / handleMessagesBatch is required
"handleMessage|handleMessageBatch",
];
function validateOption(option, value, strict) {
switch (option) {
case "batchSize":
if (value < 1) {
throw new Error("batchSize must be at least 1.");
if (value > 100 || value < 1) {
throw new Error("batchSize must be between 1 and 100");
}
break;
case "visibilityTimeoutMs":
if (value > 43200000) {
throw new Error("visibilityTimeoutMs must be less than 43200000");
}
break;
case "retryMessageDelay":
if (value > 42300) {
throw new Error("retryMessageDelay must be less than 42300");
}
break;
default:

@@ -15,0 +28,0 @@ if (strict) {

import { TypedEventEmitter } from "./emitter.js";
import { assertOptions, hasMessages } from "./validation.js";
import { assertOptions, hasMessages, validateOption } from "./validation.js";
import { queuesClient } from "./lib/cloudflare.js";

@@ -23,4 +23,6 @@ import { logger } from "./logger.js";

this.handleMessage = options.handleMessage;
this.handleMessageBatch = options.handleMessageBatch;
this.batchSize = options.batchSize ?? 10;
this.visibilityTimeoutMs = options.visibilityTimeoutMs ?? 1000;
this.retryMessagesOnError = options.retryMessagesOnError || false;
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000;

@@ -32,3 +34,3 @@ this.handleMessageTimeout = options.handleMessageTimeout;

/**
* Creates a new SQS consumer.
* Creates a new consumer.
*/

@@ -53,3 +55,3 @@ static create(options) {

/**
* A reusable options object for sqs.send that's used to avoid duplication.
* A reusable options object for queue.sending that's used to avoid duplication.
*/

@@ -149,5 +151,5 @@ get fetchOptions() {

/**
* Handles the response from AWS SQS, determining if we should proceed to
* Handles the response from CloudFlare, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
* @param response The output from CloudFlare
*/

@@ -161,3 +163,8 @@ async handleQueueResponse(response) {

if (hasMessages(response)) {
await Promise.all(response.result.messages.map((message) => this.processMessage(message)));
if (this.handleMessageBatch) {
await this.processMessageBatch(response.result.messages);
}
else {
await Promise.all(response.result.messages.map((message) => this.processMessage(message)));
}
this.emit("response_processed");

@@ -183,7 +190,3 @@ }

// in a batch, rather than one at a time.
await this.acknowledgeMessage([
{
lease_id: message.lease_id,
},
], []);
await this.acknowledgeMessage([message], []);
this.emit("message_processed", message);

@@ -194,13 +197,36 @@ }

this.emitError(err, message);
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage([], [
{
lease_id: message.lease_id,
delay_seconds: this.retryMessageDelay,
},
]);
if (this.retryMessagesOnError) {
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage([], [message]);
}
}
}
/**
* Process a batch of messages from the SQS queue.
* @param messages The messages that were delivered from SQS
*/
async processMessageBatch(messages) {
try {
messages.forEach((message) => {
this.emit("message_received", message);
});
// TODO: Invesitgate if we can do heartbear checks here like SQS Consumer
// https://github.com/bbc/sqs-consumer/blob/main/src/consumer.ts#L375
const ackedMessages = await this.executeBatchHandler(messages);
if (ackedMessages?.length > 0) {
await this.acknowledgeMessage(ackedMessages, []);
ackedMessages.forEach((message) => {
this.emit("message_processed", message);
});
}
}
catch (err) {
this.emit("error", err, messages);
if (this.retryMessagesOnError) {
await this.acknowledgeMessage([], messages);
}
}
}
/**
* Trigger the applications handleMessage function

@@ -244,6 +270,24 @@ * @param message The message that was received from CloudFlare

/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
* Execute the application's message batch handler
* @param messages The messages that should be forwarded from the SQS queue
*/
async executeBatchHandler(messages) {
try {
const result = await this.handleMessageBatch(messages);
return !this.alwaysAcknowledge && result instanceof Object
? result
: messages;
}
catch (err) {
if (err instanceof Error) {
throw toStandardError(err, `Unexpected message handler failure: ${err.message}`);
}
throw err;
}
}
/**
* Acknowledge a message that has been processed by the consumer
* @param acks The message(s) to acknowledge
* @param retries The message(s) to retry
*/
async acknowledgeMessage(acks, retries) {

@@ -253,4 +297,8 @@ try {

// TODO: This doesn't appear to be acknowledging correctly....
const input = { acks, retries };
this.emit("acknowledging_messages", acks, retries);
const retriesWithDelay = retries.map((message) => ({
...message,
delay_seconds: this.retryMessageDelay,
}));
const input = { acks, retries: retriesWithDelay };
this.emit("acknowledging_messages", acks, retriesWithDelay);
const result = await queuesClient({

@@ -275,2 +323,12 @@ ...this.fetchOptions,

/**
* Validates and then updates the provided option to the provided value.
* @param option The option to validate and then update
* @param value The value to set the provided option to
*/
updateOption(option, value) {
validateOption(option, value, true);
this[option] = value;
this.emit("option_updated", option, value);
}
/**
* Emit one of the consumer's error events depending on the error received.

@@ -277,0 +335,0 @@ * @param err The error object to forward on

@@ -8,3 +8,3 @@ export class ProviderError extends Error {

/**
* Formats an AWSError the the SQSError type.
* Formats a provider's error the the ProviderError type.
* @param err The error object that was received.

@@ -15,4 +15,10 @@ * @param message The message to send with the error.

const error = new ProviderError(message);
error.code = err.code;
error.stack = err?.stack;
error.time = new Date();
if (err.response) {
error.status = err.response.status;
error.statusText = err.response.statusText;
error.url = err.response.url;
}
return error;

@@ -19,0 +25,0 @@ }

@@ -0,13 +1,13 @@

import { toProviderError, ProviderError } from "../errors.js";
export function throwErrorIfResponseNotOk(response) {
const { ok, status, statusText, url } = response;
if (!ok) {
let error;
if (status) {
error = new Error(`[${status} - ${statusText}] ${url}`);
}
else {
error = new Error(`[${statusText}] ${url}`);
}
throw error;
const message = status
? `[${status} - ${statusText}] ${url}`
: `[${statusText}] ${url}`;
const error = new ProviderError(message);
error.code = "ResponseNotOk";
Object.defineProperty(error, "response", { value: response });
throw toProviderError(error, message);
}
}

@@ -1,11 +0,24 @@

const requiredOptions = ["accountId", "queueId", "handleMessage"];
const requiredOptions = [
"accountId",
"queueId",
// only one of handleMessage / handleMessagesBatch is required
"handleMessage|handleMessageBatch",
];
function validateOption(option, value, strict) {
switch (option) {
case "batchSize":
if (value < 1) {
throw new Error("batchSize must be at least 1.");
if (value > 100 || value < 1) {
throw new Error("batchSize must be between 1 and 100");
}
break;
case "visibilityTimeoutMs":
if (value > 43200000) {
throw new Error("visibilityTimeoutMs must be less than 43200000");
}
break;
case "retryMessageDelay":
if (value > 42300) {
throw new Error("retryMessageDelay must be less than 42300");
}
break;
default:

@@ -12,0 +25,0 @@ if (strict) {

/// <reference types="node" resolution-mode="require"/>
import { TypedEventEmitter } from "./emitter.js";
import type { ConsumerOptions } from "./types.js";
import type { ConsumerOptions, UpdatableOptions, StopOptions } from "./types.js";
/**

@@ -11,4 +11,6 @@ * [Usage](https://bbc.github.io/cloudflare-queue-consumer/index.html#usage)

private handleMessage;
private handleMessageBatch;
private batchSize;
private visibilityTimeoutMs;
private retryMessagesOnError;
private pollingWaitTimeMs;

@@ -28,3 +30,3 @@ private pollingTimeoutId;

/**
* Creates a new SQS consumer.
* Creates a new consumer.
*/

@@ -37,3 +39,3 @@ static create(options: ConsumerOptions): Consumer;

/**
* A reusable options object for sqs.send that's used to avoid duplication.
* A reusable options object for queue.sending that's used to avoid duplication.
*/

@@ -44,5 +46,3 @@ private get fetchOptions();

*/
stop(options?: {
abort?: boolean;
}): void;
stop(options?: StopOptions): void;
/**

@@ -66,5 +66,5 @@ * Returns the current status of the consumer.

/**
* Handles the response from AWS SQS, determining if we should proceed to
* Handles the response from CloudFlare, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
* @param response The output from CloudFlare
*/

@@ -79,2 +79,7 @@ private handleQueueResponse;

/**
* Process a batch of messages from the SQS queue.
* @param messages The messages that were delivered from SQS
*/
private processMessageBatch;
/**
* Trigger the applications handleMessage function

@@ -85,8 +90,19 @@ * @param message The message that was received from CloudFlare

/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
* Execute the application's message batch handler
* @param messages The messages that should be forwarded from the SQS queue
*/
private executeBatchHandler;
/**
* Acknowledge a message that has been processed by the consumer
* @param acks The message(s) to acknowledge
* @param retries The message(s) to retry
*/
private acknowledgeMessage;
/**
* Validates and then updates the provided option to the provided value.
* @param option The option to validate and then update
* @param value The value to set the provided option to
*/
updateOption(option: UpdatableOptions, value: ConsumerOptions[UpdatableOptions]): void;
/**
* Emit one of the consumer's error events depending on the error received.

@@ -93,0 +109,0 @@ * @param err The error object to forward on

@@ -0,13 +1,21 @@

/// <reference types="node" resolution-mode="require"/>
export declare class ProviderError extends Error {
stack: string;
code: string;
time: Date;
status: number;
statusText?: string;
url?: string;
constructor(message: string);
}
interface ErrorWithResponse extends Error {
code: string;
response?: Response;
}
/**
* Formats an AWSError the the SQSError type.
* Formats a provider's error the the ProviderError type.
* @param err The error object that was received.
* @param message The message to send with the error.
*/
export declare function toProviderError(err: Error, message: string): ProviderError;
export declare function toProviderError(err: ErrorWithResponse, message: string): ProviderError;
export declare class StandardError extends Error {

@@ -35,1 +43,2 @@ cause: Error;

export declare function toTimeoutError(err: TimeoutError, message: string): TimeoutError;
export {};

@@ -17,2 +17,7 @@ /**

/**
* If the Consumer should trigger the message(s) to be retired on
* @defaultvalue false
*/
retryMessagesOnError?: boolean;
/**
* You CloudFlare account id

@@ -26,2 +31,22 @@ */

/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a message is received.
*
* In the case that you need to acknowledge the message, return an object containing
* the MessageId that you'd like to acknowledge.
*/
handleMessage?(message: Message): Promise<Message | void>;
/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a batch of messages is received. Similar to `handleMessage` but will receive the
* list of messages, not each message individually, this is preferred to reduce API
* rate limits.
*
* **If both are set, `handleMessageBatch` overrides `handleMessage`**.
*
* In the case that you need to ack only some of the messages, return an array with
* the successful messages only.
*/
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;
/**
* Time in ms to wait for `handleMessage` to process a message before timing out.

@@ -46,3 +71,24 @@ *

retryMessageDelay?: number;
/**
* The duration (in milliseconds) to wait before repolling the queue.
* (Note: As CloudFlare uses short polling, you probably shouldn't set this too low)
* @defaultvalue `1000`
*/
pollingWaitTimeMs?: number;
}
/**
* A subset of the ConsumerOptions that can be updated at runtime.
*/
export type UpdatableOptions = "visibilityTimeoutMs" | "batchSize" | "pollingWaitTimeMs";
/**
* The options for the stop method.
*/
export interface StopOptions {
/**
* Default to `false`, if you want the stop action to also abort requests to SQS
* set this to `true`.
* @defaultvalue `false`
*/
abort?: boolean;
}
export type Message = {

@@ -126,3 +172,3 @@ body: string;

/**
* Fired when requests to SQS were aborted.
* Fired when requests to CloudFlare were aborted.
*/

@@ -160,2 +206,6 @@ aborted: [];

];
/**
* Fired when an option is updated
*/
option_updated: [UpdatableOptions, ConsumerOptions[UpdatableOptions]];
}
{
"name": "@bbc/cloudflare-queue-consumer",
"version": "0.0.3",
"version": "0.0.4",
"description": "Build CloudFlare Queue applications without the boilerplate",

@@ -14,3 +14,3 @@ "repository": {

"keywords": [
"sqs",
"cloudflare",
"queue",

@@ -17,0 +17,0 @@ "consumer"

@@ -39,7 +39,25 @@ # cloudflare-queue-consumer

app.on("error", (err) => {
console.error(err.message);
});
app.on("processing_error", (err) => {
console.error(err.message);
});
consumer.start();
```
TODO: Add more information
Some implementation notes:
- Pull consumers are designed to use a "short polling" approach, this means that the API from CloudFlare will respond immediately with any messages that are available, or an empty response if there are no messages available, this is different from SQS which will wait an amount of time before responding with an empty response.
- `handleMessage` will send one message to the handler at a time, if you would prefer to receive multiple messages at once, use the `handleMessageBatch` method instead.
- It is important to await any processing that you are doing to ensure that the next action only happens after your processing has completed.
- By default, messages that are sent to the functions will be considered as processed if they return without an error.
- To acknowledge, you can return a promise that resolves the message or messages that you want to acknowledge.
- Returning an empty object or an empty array will be considered an acknowledgment of no message(s). If you would like to change this behaviour, you can set the `alwaysAcknowledge` option to `true`.
- By default, if an object or an array is not returned, all messages will be acknowledged.
- Any message that errors will not be retried until the end of the visibility timeout, if you would like to trigger an immediate retry, you can set the `retryMessagesOnError` option to `true`.
- You can set a delay for this retry with the `retryMessageDelay` option.
### Credentials

@@ -55,2 +73,40 @@

## API
### `Consumer.create(options)`
Creates a new SQS consumer using the [defined options](https://bbc.github.io/cloudflare-queue-consumer/interfaces/ConsumerOptions.html).
### `consumer.start()`
Start polling the queue for messages.
### `consumer.stop(options)`
Stop polling the queue for messages. [You can find the options definition here](https://bbc.github.io/cloudflare-queue-consumerinterfaces/StopOptions.html).
By default, the value of `abort` is set to `false` which means pre existing requests to CloudFlare will still be made until they have concluded. If you would like to abort these requests instead, pass the abort value as `true`, like so:
`consumer.stop({ abort: true })`
### `consumer.status`
Returns the current status of the consumer.
- `isRunning` - `true` if the consumer has been started and not stopped, `false` if was not started or if it was stopped.
- `isPolling` - `true` if the consumer is actively polling, `false` if it is not.
### `consumer.updateOption(option, value)`
Updates the provided option with the provided value.
Please note that any update of the option `pollingWaitTimeMs` will take effect only on next polling cycle.
You can [find out more about this here](https://bbc.github.io/cloudflare-queue-consumer/classes/Consumer.html#updateOption).
### Events
Each consumer is an [`EventEmitter`](https://nodejs.org/api/events.html) and [emits these events](https://bbc.github.io/cloudflare-queue-consumer/interfaces/Events.html).
## Contributing

@@ -57,0 +113,0 @@

@@ -7,4 +7,6 @@ import { TypedEventEmitter } from "./emitter.js";

AckMessageResponse,
UpdatableOptions,
StopOptions,
} from "./types.js";
import { assertOptions, hasMessages } from "./validation.js";
import { assertOptions, hasMessages, validateOption } from "./validation.js";
import { queuesClient } from "./lib/cloudflare.js";

@@ -28,4 +30,6 @@ import { logger } from "./logger.js";

private handleMessage: (message: Message) => Promise<Message | void>;
private handleMessageBatch: (message: Message[]) => Promise<Message[] | void>;
private batchSize: number;
private visibilityTimeoutMs: number;
private retryMessagesOnError: boolean;
private pollingWaitTimeMs: number;

@@ -50,4 +54,6 @@ private pollingTimeoutId: NodeJS.Timeout;

this.handleMessage = options.handleMessage;
this.handleMessageBatch = options.handleMessageBatch;
this.batchSize = options.batchSize ?? 10;
this.visibilityTimeoutMs = options.visibilityTimeoutMs ?? 1000;
this.retryMessagesOnError = options.retryMessagesOnError || false;
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000;

@@ -60,3 +66,3 @@ this.handleMessageTimeout = options.handleMessageTimeout;

/**
* Creates a new SQS consumer.
* Creates a new consumer.
*/

@@ -83,3 +89,3 @@ public static create(options: ConsumerOptions): Consumer {

/**
* A reusable options object for sqs.send that's used to avoid duplication.
* A reusable options object for queue.sending that's used to avoid duplication.
*/

@@ -97,3 +103,3 @@ private get fetchOptions(): { signal: AbortSignal } {

*/
public stop(options?: { abort?: boolean }): void {
public stop(options?: StopOptions): void {
if (this.stopped) {

@@ -200,5 +206,5 @@ logger.debug("already_stopped");

/**
* Handles the response from AWS SQS, determining if we should proceed to
* Handles the response from CloudFlare, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
* @param response The output from CloudFlare
*/

@@ -215,7 +221,11 @@ private async handleQueueResponse(

if (hasMessages(response)) {
await Promise.all(
response.result.messages.map((message: Message) =>
this.processMessage(message),
),
);
if (this.handleMessageBatch) {
await this.processMessageBatch(response.result.messages);
} else {
await Promise.all(
response.result.messages.map((message: Message) =>
this.processMessage(message),
),
);
}

@@ -245,10 +255,3 @@ this.emit("response_processed");

// in a batch, rather than one at a time.
await this.acknowledgeMessage(
[
{
lease_id: message.lease_id,
},
],
[],
);
await this.acknowledgeMessage([message], []);

@@ -260,13 +263,7 @@ this.emit("message_processed", message);

// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage(
[],
[
{
lease_id: message.lease_id,
delay_seconds: this.retryMessageDelay,
},
],
);
if (this.retryMessagesOnError) {
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage([], [message]);
}
}

@@ -276,2 +273,33 @@ }

/**
* Process a batch of messages from the SQS queue.
* @param messages The messages that were delivered from SQS
*/
private async processMessageBatch(messages: Message[]): Promise<void> {
try {
messages.forEach((message: Message): void => {
this.emit("message_received", message);
});
// TODO: Invesitgate if we can do heartbear checks here like SQS Consumer
// https://github.com/bbc/sqs-consumer/blob/main/src/consumer.ts#L375
const ackedMessages: Message[] = await this.executeBatchHandler(messages);
if (ackedMessages?.length > 0) {
await this.acknowledgeMessage(ackedMessages, []);
ackedMessages.forEach((message: Message): void => {
this.emit("message_processed", message);
});
}
} catch (err) {
this.emit("error", err, messages);
if (this.retryMessagesOnError) {
await this.acknowledgeMessage([], messages);
}
}
}
/**
* Trigger the applications handleMessage function

@@ -321,14 +349,31 @@ * @param message The message that was received from CloudFlare

/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
* Execute the application's message batch handler
* @param messages The messages that should be forwarded from the SQS queue
*/
private async executeBatchHandler(messages: Message[]): Promise<Message[]> {
try {
const result: void | Message[] = await this.handleMessageBatch(messages);
return !this.alwaysAcknowledge && result instanceof Object
? result
: messages;
} catch (err) {
if (err instanceof Error) {
throw toStandardError(
err,
`Unexpected message handler failure: ${err.message}`,
);
}
throw err;
}
}
/**
* Acknowledge a message that has been processed by the consumer
* @param acks The message(s) to acknowledge
* @param retries The message(s) to retry
*/
private async acknowledgeMessage(
acks: {
lease_id: string;
}[],
retries: {
lease_id: string;
delay_seconds: number;
}[],
acks: Message[],
retries: Message[],
): Promise<AckMessageResponse> {

@@ -338,4 +383,8 @@ try {

// TODO: This doesn't appear to be acknowledging correctly....
const input = { acks, retries };
this.emit("acknowledging_messages", acks, retries);
const retriesWithDelay = retries.map((message) => ({
...message,
delay_seconds: this.retryMessageDelay,
}));
const input = { acks, retries: retriesWithDelay };
this.emit("acknowledging_messages", acks, retriesWithDelay);

@@ -367,2 +416,18 @@ const result = await queuesClient<AckMessageResponse>({

/**
* Validates and then updates the provided option to the provided value.
* @param option The option to validate and then update
* @param value The value to set the provided option to
*/
public updateOption(
option: UpdatableOptions,
value: ConsumerOptions[UpdatableOptions],
): void {
validateOption(option, value, true);
this[option] = value;
this.emit("option_updated", option, value);
}
/**
* Emit one of the consumer's error events depending on the error received.

@@ -369,0 +434,0 @@ * @param err The error object to forward on

export class ProviderError extends Error {
stack: string;
code: string;
time: Date;
status: number;
statusText?: string;
url?: string;

@@ -12,12 +15,27 @@ constructor(message: string) {

interface ErrorWithResponse extends Error {
code: string;
response?: Response;
}
/**
* Formats an AWSError the the SQSError type.
* Formats a provider's error the the ProviderError type.
* @param err The error object that was received.
* @param message The message to send with the error.
*/
export function toProviderError(err: Error, message: string): ProviderError {
export function toProviderError(
err: ErrorWithResponse,
message: string,
): ProviderError {
const error = new ProviderError(message);
error.code = err.code;
error.stack = err?.stack;
error.time = new Date();
if (err.response) {
error.status = err.response.status;
error.statusText = err.response.statusText;
error.url = err.response.url;
}
return error;

@@ -24,0 +42,0 @@ }

@@ -0,1 +1,3 @@

import { toProviderError, ProviderError } from "../errors.js";
export function throwErrorIfResponseNotOk(response: Response): void {

@@ -5,12 +7,12 @@ const { ok, status, statusText, url } = response;

if (!ok) {
let error: Error;
const message = status
? `[${status} - ${statusText}] ${url}`
: `[${statusText}] ${url}`;
if (status) {
error = new Error(`[${status} - ${statusText}] ${url}`);
} else {
error = new Error(`[${statusText}] ${url}`);
}
const error = new ProviderError(message);
error.code = "ResponseNotOk";
Object.defineProperty(error, "response", { value: response });
throw error;
throw toProviderError(error, message);
}
}

@@ -17,2 +17,7 @@ /**

/**
* If the Consumer should trigger the message(s) to be retired on
* @defaultvalue false
*/
retryMessagesOnError?: boolean;
/**
* You CloudFlare account id

@@ -26,2 +31,22 @@ */

/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a message is received.
*
* In the case that you need to acknowledge the message, return an object containing
* the MessageId that you'd like to acknowledge.
*/
handleMessage?(message: Message): Promise<Message | void>;
/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a batch of messages is received. Similar to `handleMessage` but will receive the
* list of messages, not each message individually, this is preferred to reduce API
* rate limits.
*
* **If both are set, `handleMessageBatch` overrides `handleMessage`**.
*
* In the case that you need to ack only some of the messages, return an array with
* the successful messages only.
*/
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;
/**
* Time in ms to wait for `handleMessage` to process a message before timing out.

@@ -46,4 +71,31 @@ *

retryMessageDelay?: number;
/**
* The duration (in milliseconds) to wait before repolling the queue.
* (Note: As CloudFlare uses short polling, you probably shouldn't set this too low)
* @defaultvalue `1000`
*/
pollingWaitTimeMs?: number;
}
/**
* A subset of the ConsumerOptions that can be updated at runtime.
*/
export type UpdatableOptions =
| "visibilityTimeoutMs"
| "batchSize"
| "pollingWaitTimeMs";
/**
* The options for the stop method.
*/
export interface StopOptions {
/**
* Default to `false`, if you want the stop action to also abort requests to SQS
* set this to `true`.
* @defaultvalue `false`
*/
abort?: boolean;
}
export type Message = {

@@ -132,3 +184,3 @@ body: string;

/**
* Fired when requests to SQS were aborted.
* Fired when requests to CloudFlare were aborted.
*/

@@ -166,2 +218,6 @@ aborted: [];

];
/**
* Fired when an option is updated
*/
option_updated: [UpdatableOptions, ConsumerOptions[UpdatableOptions]];
}
import type { ConsumerOptions, PullMessagesResponse } from "./types.js";
const requiredOptions = ["accountId", "queueId", "handleMessage"];
const requiredOptions = [
"accountId",
"queueId",
// only one of handleMessage / handleMessagesBatch is required
"handleMessage|handleMessageBatch",
];

@@ -8,8 +13,16 @@ function validateOption(option: string, value: number, strict?: boolean): void {

case "batchSize":
if (value < 1) {
throw new Error("batchSize must be at least 1.");
if (value > 100 || value < 1) {
throw new Error("batchSize must be between 1 and 100");
}
break;
case "visibilityTimeoutMs":
if (value > 43200000) {
throw new Error("visibilityTimeoutMs must be less than 43200000");
}
break;
case "retryMessageDelay":
if (value > 42300) {
throw new Error("retryMessageDelay must be less than 42300");
}
break;
default:

@@ -16,0 +29,0 @@ if (strict) {

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc