You're Invited: Meet the Socket team at BSidesSF and RSAC - April 27 - May 1.RSVP →

@bbc/cloudflare-queue-consumer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@bbc/cloudflare-queue-consumer - npm Package Compare versions

Comparing version

to
0.0.3

@@ -8,2 +8,4 @@ "use strict";

const logger_js_1 = require("./logger.js");
const errors_js_1 = require("./errors.js");
// TODO: Document how to use this in the README
/**

@@ -28,2 +30,5 @@ * [Usage](https://bbc.github.io/cloudflare-queue-consumer/index.html#usage)

this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000;
this.handleMessageTimeout = options.handleMessageTimeout;
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
this.retryMessageDelay = options.retryMessageDelay ?? 10;
}

@@ -37,12 +42,2 @@ /**

/**
* Returns the current status of the consumer.
* This includes whether it is running or currently polling.
*/
get status() {
return {
isRunning: !this.stopped,
isPolling: this.isPolling,
};
}
/**
* Start polling the queue.

@@ -54,12 +49,51 @@ */

}
// Create a new abort controller each time the consumer is started
this.abortController = new AbortController();
logger_js_1.logger.debug("starting");
this.stopped = false;
this.emit("started");
this.poll();
}
/**
* A reusable options object for sqs.send that's used to avoid duplication.
*/
get fetchOptions() {
return {
// return the current abortController signal or a fresh signal that has not been aborted.
// This effectively defaults the signal sent to not aborted
signal: this.abortController?.signal || new AbortController().signal,
};
}
/**
* Stop polling the queue.
*/
stop() {
stop(options) {
if (this.stopped) {
logger_js_1.logger.debug("already_stopped");
return;
}
logger_js_1.logger.debug("stopping");
this.stopped = true;
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
this.pollingTimeoutId = undefined;
}
if (options?.abort) {
logger_js_1.logger.debug("aborting");
this.abortController.abort();
this.emit("aborted");
}
this.emit("stopped");
}
/**
* Returns the current status of the consumer.
* This includes whether it is running or currently polling.
*/
get status() {
return {
isRunning: !this.stopped,
isPolling: this.isPolling,
};
}
/**
* Poll the queue for messages.

@@ -76,4 +110,29 @@ */

this.isPolling = true;
const currentPollingTimeout = this.pollingWaitTimeMs;
this.receiveMessage()
.then((output) => this.handleQueueResponse(output))
.then(() => {
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
}
this.pollingTimeoutId = setTimeout(() => this.poll(), currentPollingTimeout);
})
.catch((err) => {
// TODO: Adjust the error handling here
// TODO: Add an extension to the polling timeout if auth error
this.emit("error", err);
setTimeout(() => this.poll(), this.pollingWaitTimeMs);
})
.finally(() => {
this.isPolling = false;
});
}
/**
* Send a request to CloudFlare Queues to retrieve messages
* @param params The required params to receive messages from CloudFlare Queues
*/
async receiveMessage() {
try {
const response = await (0, cloudflare_js_1.queuesClient)({
const result = (0, cloudflare_js_1.queuesClient)({
...this.fetchOptions,
path: "messages/pull",

@@ -88,50 +147,148 @@ method: "POST",

});
if (!response.success) {
this.emit("error", new Error("Failed to pull messages"));
this.isPolling = false;
return;
return result;
}
catch (err) {
throw (0, errors_js_1.toProviderError)(err, `Receive message failed: ${err.message}`);
}
}
/**
* Handles the response from AWS SQS, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
*/
async handleQueueResponse(response) {
if (!response.success) {
this.emit("error", new Error("Failed to pull messages"));
this.isPolling = false;
return;
}
if ((0, validation_js_1.hasMessages)(response)) {
await Promise.all(response.result.messages.map((message) => this.processMessage(message)));
this.emit("response_processed");
}
else if (response) {
this.emit("empty");
}
}
/**
* Process a message that has been received from CloudFlare Queues. This will execute the message
* handler and delete the message once complete.
* @param message The message that was delivered from CloudFlare
*/
async processMessage(message) {
try {
this.emit("message_received", message);
// TODO: Invesitgate if we can do heartbear checks here like SQS Consumer
// https://github.com/bbc/sqs-consumer/blob/main/src/consumer.ts#L339
const ackedMessage = await this.executeHandler(message);
if (ackedMessage?.id === message.id) {
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage([
{
lease_id: message.lease_id,
},
], []);
this.emit("message_processed", message);
}
const { messages } = response.result;
if (!messages || messages.length === 0) {
this.emit("empty");
this.isPolling = false;
return;
}
catch (err) {
this.emitError(err, message);
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage([], [
{
lease_id: message.lease_id,
delay_seconds: this.retryMessageDelay,
},
]);
}
}
/**
* Trigger the applications handleMessage function
* @param message The message that was received from CloudFlare
*/
async executeHandler(message) {
let handleMessageTimeoutId = undefined;
try {
let result;
if (this.handleMessageTimeout) {
const pending = new Promise((_, reject) => {
handleMessageTimeoutId = setTimeout(() => {
reject(new errors_js_1.TimeoutError());
}, this.handleMessageTimeout);
});
result = await Promise.race([this.handleMessage(message), pending]);
}
const successfulMessages = [];
const failedMessages = [];
for (const message of messages) {
this.emit("message_received", message);
try {
const result = await this.handleMessage(message);
logger_js_1.logger.debug("message_processed", { result });
if (result) {
successfulMessages.push(message.lease_id);
this.emit("message_processed", message);
}
}
catch (e) {
failedMessages.push(message.lease_id);
this.emit("processing_error", e, message);
}
else {
result = await this.handleMessage(message);
}
logger_js_1.logger.debug("acknowledging_messages", {
successfulMessages,
failedMessages,
});
await (0, cloudflare_js_1.queuesClient)({
return !this.alwaysAcknowledge && result instanceof Object
? result
: message;
}
catch (err) {
if (err instanceof errors_js_1.TimeoutError) {
throw (0, errors_js_1.toTimeoutError)(err, `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`);
}
else if (err instanceof Error) {
throw (0, errors_js_1.toStandardError)(err, `Unexpected message handler failure: ${err.message}`);
}
throw err;
}
finally {
if (handleMessageTimeoutId) {
clearTimeout(handleMessageTimeoutId);
}
}
}
/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
*/
async acknowledgeMessage(acks, retries) {
try {
// TODO: this is pretty hacky
// TODO: This doesn't appear to be acknowledging correctly....
const input = { acks, retries };
this.emit("acknowledging_messages", acks, retries);
const result = await (0, cloudflare_js_1.queuesClient)({
...this.fetchOptions,
path: "messages/ack",
method: "POST",
body: { acks: successfulMessages, retries: failedMessages },
body: input,
accountId: this.accountId,
queueId: this.queueId,
});
this.emit("response_processed");
if (!result.success) {
throw new Error("Message Acknowledgement did not succeed.");
}
this.emit("acknowledged_messages", result.result);
return result;
}
catch (e) {
this.emit("error", e);
catch (err) {
this.emit("error", (0, errors_js_1.toProviderError)(err, `Error acknowledging messages: ${err.message}`));
}
this.isPolling = false;
setTimeout(() => this.poll(), this.pollingWaitTimeMs);
}
/**
* Emit one of the consumer's error events depending on the error received.
* @param err The error object to forward on
* @param message The message that the error occurred on
*/
emitError(err, message) {
if (!message) {
this.emit("error", err);
}
else if (err.name === errors_js_1.ProviderError.name) {
this.emit("error", err, message);
}
else if (err instanceof errors_js_1.TimeoutError) {
this.emit("timeout_error", err, message);
}
else {
this.emit("processing_error", err, message);
}
}
}
exports.Consumer = Consumer;

@@ -9,3 +9,3 @@ "use strict";

if (!QUEUES_API_TOKEN) {
throw new Error("Missing Cloudflare credentials");
throw new Error("Missing Cloudflare credentials, please set a QUEUES_API_TOKEN in the environment variables.");
}

@@ -17,3 +17,3 @@ return {

exports.getCredentials = getCredentials;
async function queuesClient({ path, method, body, accountId, queueId, }) {
async function queuesClient({ path, method, body, accountId, queueId, signal, }) {
const { QUEUES_API_TOKEN } = getCredentials();

@@ -27,2 +27,3 @@ const response = await fetch(`${CLOUDFLARE_HOST}/accounts/${accountId}/queues/${queueId}/${path}`, {

body: JSON.stringify(body),
signal,
});

@@ -29,0 +30,0 @@ (0, fetch_js_1.throwErrorIfResponseNotOk)(response);

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.validateOption = exports.assertOptions = void 0;
exports.hasMessages = exports.validateOption = exports.assertOptions = void 0;
const requiredOptions = ["accountId", "queueId", "handleMessage"];

@@ -38,1 +38,9 @@ function validateOption(option, value, strict) {

exports.assertOptions = assertOptions;
/**
* Determine if the response has messages in it.
* @param response The response from CloudFlare.
*/
function hasMessages(response) {
return response?.result?.messages && response.result.messages.length > 0;
}
exports.hasMessages = hasMessages;
import { TypedEventEmitter } from "./emitter.js";
import { assertOptions } from "./validation.js";
import { assertOptions, hasMessages } from "./validation.js";
import { queuesClient } from "./lib/cloudflare.js";
import { logger } from "./logger.js";
import { toProviderError, ProviderError, toStandardError, toTimeoutError, TimeoutError, } from "./errors.js";
// TODO: Document how to use this in the README
/**

@@ -24,2 +26,5 @@ * [Usage](https://bbc.github.io/cloudflare-queue-consumer/index.html#usage)

this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000;
this.handleMessageTimeout = options.handleMessageTimeout;
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
this.retryMessageDelay = options.retryMessageDelay ?? 10;
}

@@ -33,12 +38,2 @@ /**

/**
* Returns the current status of the consumer.
* This includes whether it is running or currently polling.
*/
get status() {
return {
isRunning: !this.stopped,
isPolling: this.isPolling,
};
}
/**
* Start polling the queue.

@@ -50,12 +45,51 @@ */

}
// Create a new abort controller each time the consumer is started
this.abortController = new AbortController();
logger.debug("starting");
this.stopped = false;
this.emit("started");
this.poll();
}
/**
* A reusable options object for sqs.send that's used to avoid duplication.
*/
get fetchOptions() {
return {
// return the current abortController signal or a fresh signal that has not been aborted.
// This effectively defaults the signal sent to not aborted
signal: this.abortController?.signal || new AbortController().signal,
};
}
/**
* Stop polling the queue.
*/
stop() {
stop(options) {
if (this.stopped) {
logger.debug("already_stopped");
return;
}
logger.debug("stopping");
this.stopped = true;
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
this.pollingTimeoutId = undefined;
}
if (options?.abort) {
logger.debug("aborting");
this.abortController.abort();
this.emit("aborted");
}
this.emit("stopped");
}
/**
* Returns the current status of the consumer.
* This includes whether it is running or currently polling.
*/
get status() {
return {
isRunning: !this.stopped,
isPolling: this.isPolling,
};
}
/**
* Poll the queue for messages.

@@ -72,4 +106,29 @@ */

this.isPolling = true;
const currentPollingTimeout = this.pollingWaitTimeMs;
this.receiveMessage()
.then((output) => this.handleQueueResponse(output))
.then(() => {
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
}
this.pollingTimeoutId = setTimeout(() => this.poll(), currentPollingTimeout);
})
.catch((err) => {
// TODO: Adjust the error handling here
// TODO: Add an extension to the polling timeout if auth error
this.emit("error", err);
setTimeout(() => this.poll(), this.pollingWaitTimeMs);
})
.finally(() => {
this.isPolling = false;
});
}
/**
* Send a request to CloudFlare Queues to retrieve messages
* @param params The required params to receive messages from CloudFlare Queues
*/
async receiveMessage() {
try {
const response = await queuesClient({
const result = queuesClient({
...this.fetchOptions,
path: "messages/pull",

@@ -84,49 +143,147 @@ method: "POST",

});
if (!response.success) {
this.emit("error", new Error("Failed to pull messages"));
this.isPolling = false;
return;
return result;
}
catch (err) {
throw toProviderError(err, `Receive message failed: ${err.message}`);
}
}
/**
* Handles the response from AWS SQS, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
*/
async handleQueueResponse(response) {
if (!response.success) {
this.emit("error", new Error("Failed to pull messages"));
this.isPolling = false;
return;
}
if (hasMessages(response)) {
await Promise.all(response.result.messages.map((message) => this.processMessage(message)));
this.emit("response_processed");
}
else if (response) {
this.emit("empty");
}
}
/**
* Process a message that has been received from CloudFlare Queues. This will execute the message
* handler and delete the message once complete.
* @param message The message that was delivered from CloudFlare
*/
async processMessage(message) {
try {
this.emit("message_received", message);
// TODO: Invesitgate if we can do heartbear checks here like SQS Consumer
// https://github.com/bbc/sqs-consumer/blob/main/src/consumer.ts#L339
const ackedMessage = await this.executeHandler(message);
if (ackedMessage?.id === message.id) {
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage([
{
lease_id: message.lease_id,
},
], []);
this.emit("message_processed", message);
}
const { messages } = response.result;
if (!messages || messages.length === 0) {
this.emit("empty");
this.isPolling = false;
return;
}
catch (err) {
this.emitError(err, message);
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage([], [
{
lease_id: message.lease_id,
delay_seconds: this.retryMessageDelay,
},
]);
}
}
/**
* Trigger the applications handleMessage function
* @param message The message that was received from CloudFlare
*/
async executeHandler(message) {
let handleMessageTimeoutId = undefined;
try {
let result;
if (this.handleMessageTimeout) {
const pending = new Promise((_, reject) => {
handleMessageTimeoutId = setTimeout(() => {
reject(new TimeoutError());
}, this.handleMessageTimeout);
});
result = await Promise.race([this.handleMessage(message), pending]);
}
const successfulMessages = [];
const failedMessages = [];
for (const message of messages) {
this.emit("message_received", message);
try {
const result = await this.handleMessage(message);
logger.debug("message_processed", { result });
if (result) {
successfulMessages.push(message.lease_id);
this.emit("message_processed", message);
}
}
catch (e) {
failedMessages.push(message.lease_id);
this.emit("processing_error", e, message);
}
else {
result = await this.handleMessage(message);
}
logger.debug("acknowledging_messages", {
successfulMessages,
failedMessages,
});
await queuesClient({
return !this.alwaysAcknowledge && result instanceof Object
? result
: message;
}
catch (err) {
if (err instanceof TimeoutError) {
throw toTimeoutError(err, `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`);
}
else if (err instanceof Error) {
throw toStandardError(err, `Unexpected message handler failure: ${err.message}`);
}
throw err;
}
finally {
if (handleMessageTimeoutId) {
clearTimeout(handleMessageTimeoutId);
}
}
}
/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
*/
async acknowledgeMessage(acks, retries) {
try {
// TODO: this is pretty hacky
// TODO: This doesn't appear to be acknowledging correctly....
const input = { acks, retries };
this.emit("acknowledging_messages", acks, retries);
const result = await queuesClient({
...this.fetchOptions,
path: "messages/ack",
method: "POST",
body: { acks: successfulMessages, retries: failedMessages },
body: input,
accountId: this.accountId,
queueId: this.queueId,
});
this.emit("response_processed");
if (!result.success) {
throw new Error("Message Acknowledgement did not succeed.");
}
this.emit("acknowledged_messages", result.result);
return result;
}
catch (e) {
this.emit("error", e);
catch (err) {
this.emit("error", toProviderError(err, `Error acknowledging messages: ${err.message}`));
}
this.isPolling = false;
setTimeout(() => this.poll(), this.pollingWaitTimeMs);
}
/**
* Emit one of the consumer's error events depending on the error received.
* @param err The error object to forward on
* @param message The message that the error occurred on
*/
emitError(err, message) {
if (!message) {
this.emit("error", err);
}
else if (err.name === ProviderError.name) {
this.emit("error", err, message);
}
else if (err instanceof TimeoutError) {
this.emit("timeout_error", err, message);
}
else {
this.emit("processing_error", err, message);
}
}
}

@@ -6,3 +6,3 @@ import { throwErrorIfResponseNotOk } from "./fetch.js";

if (!QUEUES_API_TOKEN) {
throw new Error("Missing Cloudflare credentials");
throw new Error("Missing Cloudflare credentials, please set a QUEUES_API_TOKEN in the environment variables.");
}

@@ -13,3 +13,3 @@ return {

}
export async function queuesClient({ path, method, body, accountId, queueId, }) {
export async function queuesClient({ path, method, body, accountId, queueId, signal, }) {
const { QUEUES_API_TOKEN } = getCredentials();

@@ -23,2 +23,3 @@ const response = await fetch(`${CLOUDFLARE_HOST}/accounts/${accountId}/queues/${queueId}/${path}`, {

body: JSON.stringify(body),
signal,
});

@@ -25,0 +26,0 @@ throwErrorIfResponseNotOk(response);

@@ -33,2 +33,9 @@ const requiredOptions = ["accountId", "queueId", "handleMessage"];

}
export { assertOptions, validateOption };
/**
* Determine if the response has messages in it.
* @param response The response from CloudFlare.
*/
function hasMessages(response) {
return response?.result?.messages && response.result.messages.length > 0;
}
export { assertOptions, validateOption, hasMessages };

@@ -0,1 +1,2 @@

/// <reference types="node" resolution-mode="require"/>
import { TypedEventEmitter } from "./emitter.js";

@@ -13,4 +14,9 @@ import type { ConsumerOptions } from "./types.js";

private pollingWaitTimeMs;
private pollingTimeoutId;
private stopped;
private isPolling;
private handleMessageTimeout;
private alwaysAcknowledge;
private retryMessageDelay;
abortController: AbortController;
/**

@@ -26,2 +32,16 @@ * Create a new consumer

/**
* Start polling the queue.
*/
start(): void;
/**
* A reusable options object for sqs.send that's used to avoid duplication.
*/
private get fetchOptions();
/**
* Stop polling the queue.
*/
stop(options?: {
abort?: boolean;
}): void;
/**
* Returns the current status of the consumer.

@@ -35,13 +55,39 @@ * This includes whether it is running or currently polling.

/**
* Start polling the queue.
* Poll the queue for messages.
*/
start(): void;
private poll;
/**
* Stop polling the queue.
* Send a request to CloudFlare Queues to retrieve messages
* @param params The required params to receive messages from CloudFlare Queues
*/
stop(): void;
private receiveMessage;
/**
* Poll the queue for messages.
* Handles the response from AWS SQS, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
*/
private poll;
private handleQueueResponse;
/**
* Process a message that has been received from CloudFlare Queues. This will execute the message
* handler and delete the message once complete.
* @param message The message that was delivered from CloudFlare
*/
private processMessage;
/**
* Trigger the applications handleMessage function
* @param message The message that was received from CloudFlare
*/
private executeHandler;
/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
*/
private acknowledgeMessage;
/**
* Emit one of the consumer's error events depending on the error received.
* @param err The error object to forward on
* @param message The message that the error occurred on
*/
private emitError;
}
export declare function getCredentials(): {
QUEUES_API_TOKEN: string;
};
export declare function queuesClient<T = unknown>({ path, method, body, accountId, queueId, }: {
export declare function queuesClient<T = unknown>({ path, method, body, accountId, queueId, signal, }: {
path: any;

@@ -10,2 +10,3 @@ method: any;

queueId: any;
signal: any;
}): Promise<T>;

@@ -5,6 +5,41 @@ /**

export interface ConsumerOptions {
batchSize: number;
visibilityTimeoutMs: number;
/**
* The number of messages to request from CloudFlare when polling (default `10`).
* @defaultvalue `10`
*/
batchSize?: number;
/**
* The duration (in milliseconds) that the received messages are hidden from subsequent
* retrieve requests after being retrieved by a ReceiveMessage request.
* @defaultvalue 1000
*/
visibilityTimeoutMs?: number;
/**
* You CloudFlare account id
*/
accountId: string;
/**
* The ID of the queue you want to receive messages from.
*/
queueId: string;
/**
* Time in ms to wait for `handleMessage` to process a message before timing out.
*
* Emits `timeout_error` on timeout. By default, if `handleMessage` times out,
* the unprocessed message returns to the end of the queue.
*/
handleMessageTimeout?: number;
/**
* By default, the consumer will treat an empty object or array from either of the
* handlers as a acknowledgement of no messages and will not delete those messages as
* a result. Set this to `true` to always acknowledge all messages no matter the returned
* value.
* @defaultvalue `false`
*/
alwaysAcknowledge?: boolean;
/**
* The amount of time to delay a message for before retrying (in seconds)
* @defaultvalue 10
*/
retryMessageDelay?: number;
}

@@ -17,2 +52,6 @@ export type Message = {

lease_id: string;
metadata: {
"CF-sourceMessageSource": string;
"CF-Content-Type": "json" | "text";
};
};

@@ -97,2 +136,24 @@ export type CloudFlareError = {

stopped: [];
/**
* Fired when messages are acknowledging
*/
acknowledging_messages: [
{
lease_id: string;
}[],
{
lease_id: string;
delay_seconds: number;
}[]
];
/**
* Fired when messages have been acknowledged
*/
acknowledged_messages: [
{
ackCount: number;
retryCount: number;
warnings: string[];
}
];
}

@@ -1,2 +0,2 @@

import type { ConsumerOptions } from "./types.js";
import type { ConsumerOptions, PullMessagesResponse } from "./types.js";
declare function validateOption(option: string, value: number, strict?: boolean): void;

@@ -8,2 +8,7 @@ /**

declare function assertOptions(options: ConsumerOptions): void;
export { assertOptions, validateOption };
/**
* Determine if the response has messages in it.
* @param response The response from CloudFlare.
*/
declare function hasMessages(response: PullMessagesResponse): boolean;
export { assertOptions, validateOption, hasMessages };
{
"name": "@bbc/cloudflare-queue-consumer",
"version": "0.0.2",
"version": "0.0.3",
"description": "Build CloudFlare Queue applications without the boilerplate",

@@ -46,3 +46,4 @@ "repository": {

"posttest": "npm run lint && npm run format:check",
"generate-docs": "typedoc"
"generate-docs": "typedoc",
"dev": "DEBUG=cloudflare-queue-consumer node ./example/index.js"
},

@@ -49,0 +50,0 @@ "devDependencies": {

@@ -31,3 +31,11 @@ # cloudflare-queue-consumer

// TODO: Add example
const consumer = new Consumer({
accountId: process.env.ACCOUNT_ID, // Your CloudFlare account ID
queueId: process.env.QUEUE_ID, // The Queue ID that you want to use.
handleMessage: async (message) => {
// Your message handling code...
},
});
consumer.start();
```

@@ -39,4 +47,10 @@

TODO: Add more information
In order to authenticate with the CloudFlare API, you will need to create an API token with read and write access to CloudFlare Queues, more information can be found [here](https://developers.cloudflare.com/queues/reference/pull-consumers/#create-api-tokens).
Copy that token and set it as the value for an environment variable named `QUEUES_API_TOKEN`.
### Example project
You'll aldo find an example project in the folder `./example`, set the variables `ACCOUNT_ID` and `QUEUE_ID` and then run this with the command `pnpm dev`.
## Contributing

@@ -43,0 +57,0 @@

@@ -8,6 +8,14 @@ import { TypedEventEmitter } from "./emitter.js";

} from "./types.js";
import { assertOptions } from "./validation.js";
import { assertOptions, hasMessages } from "./validation.js";
import { queuesClient } from "./lib/cloudflare.js";
import { logger } from "./logger.js";
import {
toProviderError,
ProviderError,
toStandardError,
toTimeoutError,
TimeoutError,
} from "./errors.js";
// TODO: Document how to use this in the README
/**

@@ -23,4 +31,9 @@ * [Usage](https://bbc.github.io/cloudflare-queue-consumer/index.html#usage)

private pollingWaitTimeMs: number;
private pollingTimeoutId: NodeJS.Timeout;
private stopped = true;
private isPolling = false;
private handleMessageTimeout: number;
private alwaysAcknowledge: number;
private retryMessageDelay: number;
public abortController: AbortController;

@@ -40,2 +53,5 @@ /**

this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000;
this.handleMessageTimeout = options.handleMessageTimeout;
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
this.retryMessageDelay = options.retryMessageDelay ?? 10;
}

@@ -51,16 +67,2 @@

/**
* Returns the current status of the consumer.
* This includes whether it is running or currently polling.
*/
public get status(): {
isRunning: boolean;
isPolling: boolean;
} {
return {
isRunning: !this.stopped,
isPolling: this.isPolling,
};
}
/**
* Start polling the queue.

@@ -72,3 +74,7 @@ */

}
// Create a new abort controller each time the consumer is started
this.abortController = new AbortController();
logger.debug("starting");
this.stopped = false;
this.emit("started");
this.poll();

@@ -78,9 +84,53 @@ }

/**
* A reusable options object for sqs.send that's used to avoid duplication.
*/
private get fetchOptions(): { signal: AbortSignal } {
return {
// return the current abortController signal or a fresh signal that has not been aborted.
// This effectively defaults the signal sent to not aborted
signal: this.abortController?.signal || new AbortController().signal,
};
}
/**
* Stop polling the queue.
*/
public stop(): void {
public stop(options?: { abort?: boolean }): void {
if (this.stopped) {
logger.debug("already_stopped");
return;
}
logger.debug("stopping");
this.stopped = true;
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
this.pollingTimeoutId = undefined;
}
if (options?.abort) {
logger.debug("aborting");
this.abortController.abort();
this.emit("aborted");
}
this.emit("stopped");
}
/**
* Returns the current status of the consumer.
* This includes whether it is running or currently polling.
*/
public get status(): {
isRunning: boolean;
isPolling: boolean;
} {
return {
isRunning: !this.stopped,
isPolling: this.isPolling,
};
}
/**
* Poll the queue for messages.

@@ -101,4 +151,34 @@ */

const currentPollingTimeout: number = this.pollingWaitTimeMs;
this.receiveMessage()
.then((output: PullMessagesResponse) => this.handleQueueResponse(output))
.then((): void => {
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
}
this.pollingTimeoutId = setTimeout(
() => this.poll(),
currentPollingTimeout,
);
})
.catch((err): void => {
// TODO: Adjust the error handling here
// TODO: Add an extension to the polling timeout if auth error
this.emit("error", err);
setTimeout(() => this.poll(), this.pollingWaitTimeMs);
})
.finally((): void => {
this.isPolling = false;
});
}
/**
* Send a request to CloudFlare Queues to retrieve messages
* @param params The required params to receive messages from CloudFlare Queues
*/
private async receiveMessage(): Promise<PullMessagesResponse> {
try {
const response = await queuesClient<PullMessagesResponse>({
const result = queuesClient<PullMessagesResponse>({
...this.fetchOptions,
path: "messages/pull",

@@ -114,43 +194,149 @@ method: "POST",

if (!response.success) {
this.emit("error", new Error("Failed to pull messages"));
this.isPolling = false;
return;
}
return result;
} catch (err) {
throw toProviderError(err, `Receive message failed: ${err.message}`);
}
}
const { messages } = response.result;
/**
* Handles the response from AWS SQS, determining if we should proceed to
* the message handler.
* @param response The output from AWS SQS
*/
private async handleQueueResponse(
response: PullMessagesResponse,
): Promise<void> {
if (!response.success) {
this.emit("error", new Error("Failed to pull messages"));
this.isPolling = false;
return;
}
if (!messages || messages.length === 0) {
this.emit("empty");
this.isPolling = false;
return;
if (hasMessages(response)) {
await Promise.all(
response.result.messages.map((message: Message) =>
this.processMessage(message),
),
);
this.emit("response_processed");
} else if (response) {
this.emit("empty");
}
}
/**
* Process a message that has been received from CloudFlare Queues. This will execute the message
* handler and delete the message once complete.
* @param message The message that was delivered from CloudFlare
*/
private async processMessage(message: Message): Promise<void> {
try {
this.emit("message_received", message);
// TODO: Invesitgate if we can do heartbear checks here like SQS Consumer
// https://github.com/bbc/sqs-consumer/blob/main/src/consumer.ts#L339
const ackedMessage: Message = await this.executeHandler(message);
if (ackedMessage?.id === message.id) {
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage(
[
{
lease_id: message.lease_id,
},
],
[],
);
this.emit("message_processed", message);
}
} catch (err) {
this.emitError(err, message);
const successfulMessages: string[] = [];
const failedMessages: string[] = [];
// TODO: In order to conserve API reate limits, it would be better to do this
// in a batch, rather than one at a time.
await this.acknowledgeMessage(
[],
[
{
lease_id: message.lease_id,
delay_seconds: this.retryMessageDelay,
},
],
);
}
}
for (const message of messages) {
this.emit("message_received", message);
try {
const result = await this.handleMessage(message);
logger.debug("message_processed", { result });
if (result) {
successfulMessages.push(message.lease_id);
this.emit("message_processed", message);
}
} catch (e) {
failedMessages.push(message.lease_id);
this.emit("processing_error", e, message);
}
/**
* Trigger the applications handleMessage function
* @param message The message that was received from CloudFlare
*/
private async executeHandler(message: Message): Promise<Message> {
let handleMessageTimeoutId: NodeJS.Timeout | undefined = undefined;
try {
let result;
if (this.handleMessageTimeout) {
const pending: Promise<void> = new Promise((_, reject): void => {
handleMessageTimeoutId = setTimeout((): void => {
reject(new TimeoutError());
}, this.handleMessageTimeout);
});
result = await Promise.race([this.handleMessage(message), pending]);
} else {
result = await this.handleMessage(message);
}
logger.debug("acknowledging_messages", {
successfulMessages,
failedMessages,
});
return !this.alwaysAcknowledge && result instanceof Object
? result
: message;
} catch (err) {
if (err instanceof TimeoutError) {
throw toTimeoutError(
err,
`Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`,
);
} else if (err instanceof Error) {
throw toStandardError(
err,
`Unexpected message handler failure: ${err.message}`,
);
}
throw err;
} finally {
if (handleMessageTimeoutId) {
clearTimeout(handleMessageTimeoutId);
}
}
}
await queuesClient<AckMessageResponse>({
/**
* Change the visibility timeout on a message
* @param message The message to change the value of
* @param timeout The new timeout that should be set
*/
private async acknowledgeMessage(
acks: {
lease_id: string;
}[],
retries: {
lease_id: string;
delay_seconds: number;
}[],
): Promise<AckMessageResponse> {
try {
// TODO: this is pretty hacky
// TODO: This doesn't appear to be acknowledging correctly....
const input = { acks, retries };
this.emit("acknowledging_messages", acks, retries);
const result = await queuesClient<AckMessageResponse>({
...this.fetchOptions,
path: "messages/ack",
method: "POST",
body: { acks: successfulMessages, retries: failedMessages },
body: input,
accountId: this.accountId,

@@ -160,10 +346,33 @@ queueId: this.queueId,

this.emit("response_processed");
} catch (e) {
this.emit("error", e);
if (!result.success) {
throw new Error("Message Acknowledgement did not succeed.");
}
this.emit("acknowledged_messages", result.result);
return result;
} catch (err) {
this.emit(
"error",
toProviderError(err, `Error acknowledging messages: ${err.message}`),
);
}
}
this.isPolling = false;
setTimeout(() => this.poll(), this.pollingWaitTimeMs);
/**
* Emit one of the consumer's error events depending on the error received.
* @param err The error object to forward on
* @param message The message that the error occurred on
*/
private emitError(err: Error, message?: Message): void {
if (!message) {
this.emit("error", err);
} else if (err.name === ProviderError.name) {
this.emit("error", err, message);
} else if (err instanceof TimeoutError) {
this.emit("timeout_error", err, message);
} else {
this.emit("processing_error", err, message);
}
}
}

@@ -9,3 +9,5 @@ import { throwErrorIfResponseNotOk } from "./fetch.js";

if (!QUEUES_API_TOKEN) {
throw new Error("Missing Cloudflare credentials");
throw new Error(
"Missing Cloudflare credentials, please set a QUEUES_API_TOKEN in the environment variables.",
);
}

@@ -24,2 +26,3 @@

queueId,
signal,
}): Promise<T> {

@@ -37,2 +40,3 @@ const { QUEUES_API_TOKEN } = getCredentials();

body: JSON.stringify(body),
signal,
},

@@ -39,0 +43,0 @@ );

@@ -5,6 +5,41 @@ /**

export interface ConsumerOptions {
batchSize: number;
visibilityTimeoutMs: number;
/**
* The number of messages to request from CloudFlare when polling (default `10`).
* @defaultvalue `10`
*/
batchSize?: number;
/**
* The duration (in milliseconds) that the received messages are hidden from subsequent
* retrieve requests after being retrieved by a ReceiveMessage request.
* @defaultvalue 1000
*/
visibilityTimeoutMs?: number;
/**
* You CloudFlare account id
*/
accountId: string;
/**
* The ID of the queue you want to receive messages from.
*/
queueId: string;
/**
* Time in ms to wait for `handleMessage` to process a message before timing out.
*
* Emits `timeout_error` on timeout. By default, if `handleMessage` times out,
* the unprocessed message returns to the end of the queue.
*/
handleMessageTimeout?: number;
/**
* By default, the consumer will treat an empty object or array from either of the
* handlers as a acknowledgement of no messages and will not delete those messages as
* a result. Set this to `true` to always acknowledge all messages no matter the returned
* value.
* @defaultvalue `false`
*/
alwaysAcknowledge?: boolean;
/**
* The amount of time to delay a message for before retrying (in seconds)
* @defaultvalue 10
*/
retryMessageDelay?: number;
}

@@ -18,2 +53,6 @@

lease_id: string;
metadata: {
"CF-sourceMessageSource": string;
"CF-Content-Type": "json" | "text";
};
};

@@ -103,2 +142,24 @@

stopped: [];
/**
* Fired when messages are acknowledging
*/
acknowledging_messages: [
{
lease_id: string;
}[],
{
lease_id: string;
delay_seconds: number;
}[],
];
/**
* Fired when messages have been acknowledged
*/
acknowledged_messages: [
{
ackCount: number;
retryCount: number;
warnings: string[];
},
];
}

@@ -1,2 +0,2 @@

import type { ConsumerOptions } from "./types.js";
import type { ConsumerOptions, PullMessagesResponse } from "./types.js";

@@ -41,2 +41,10 @@ const requiredOptions = ["accountId", "queueId", "handleMessage"];

export { assertOptions, validateOption };
/**
* Determine if the response has messages in it.
* @param response The response from CloudFlare.
*/
function hasMessages(response: PullMessagesResponse): boolean {
return response?.result?.messages && response.result.messages.length > 0;
}
export { assertOptions, validateOption, hasMessages };