@bbc/cloudflare-queue-consumer
Advanced tools
Comparing version
@@ -8,2 +8,4 @@ "use strict"; | ||
const logger_js_1 = require("./logger.js"); | ||
const errors_js_1 = require("./errors.js"); | ||
// TODO: Document how to use this in the README | ||
/** | ||
@@ -28,2 +30,5 @@ * [Usage](https://bbc.github.io/cloudflare-queue-consumer/index.html#usage) | ||
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000; | ||
this.handleMessageTimeout = options.handleMessageTimeout; | ||
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false; | ||
this.retryMessageDelay = options.retryMessageDelay ?? 10; | ||
} | ||
@@ -37,12 +42,2 @@ /** | ||
/** | ||
* Returns the current status of the consumer. | ||
* This includes whether it is running or currently polling. | ||
*/ | ||
get status() { | ||
return { | ||
isRunning: !this.stopped, | ||
isPolling: this.isPolling, | ||
}; | ||
} | ||
/** | ||
* Start polling the queue. | ||
@@ -54,12 +49,51 @@ */ | ||
} | ||
// Create a new abort controller each time the consumer is started | ||
this.abortController = new AbortController(); | ||
logger_js_1.logger.debug("starting"); | ||
this.stopped = false; | ||
this.emit("started"); | ||
this.poll(); | ||
} | ||
/** | ||
* A reusable options object for sqs.send that's used to avoid duplication. | ||
*/ | ||
get fetchOptions() { | ||
return { | ||
// return the current abortController signal or a fresh signal that has not been aborted. | ||
// This effectively defaults the signal sent to not aborted | ||
signal: this.abortController?.signal || new AbortController().signal, | ||
}; | ||
} | ||
/** | ||
* Stop polling the queue. | ||
*/ | ||
stop() { | ||
stop(options) { | ||
if (this.stopped) { | ||
logger_js_1.logger.debug("already_stopped"); | ||
return; | ||
} | ||
logger_js_1.logger.debug("stopping"); | ||
this.stopped = true; | ||
if (this.pollingTimeoutId) { | ||
clearTimeout(this.pollingTimeoutId); | ||
this.pollingTimeoutId = undefined; | ||
} | ||
if (options?.abort) { | ||
logger_js_1.logger.debug("aborting"); | ||
this.abortController.abort(); | ||
this.emit("aborted"); | ||
} | ||
this.emit("stopped"); | ||
} | ||
/** | ||
* Returns the current status of the consumer. | ||
* This includes whether it is running or currently polling. | ||
*/ | ||
get status() { | ||
return { | ||
isRunning: !this.stopped, | ||
isPolling: this.isPolling, | ||
}; | ||
} | ||
/** | ||
* Poll the queue for messages. | ||
@@ -76,4 +110,29 @@ */ | ||
this.isPolling = true; | ||
const currentPollingTimeout = this.pollingWaitTimeMs; | ||
this.receiveMessage() | ||
.then((output) => this.handleQueueResponse(output)) | ||
.then(() => { | ||
if (this.pollingTimeoutId) { | ||
clearTimeout(this.pollingTimeoutId); | ||
} | ||
this.pollingTimeoutId = setTimeout(() => this.poll(), currentPollingTimeout); | ||
}) | ||
.catch((err) => { | ||
// TODO: Adjust the error handling here | ||
// TODO: Add an extension to the polling timeout if auth error | ||
this.emit("error", err); | ||
setTimeout(() => this.poll(), this.pollingWaitTimeMs); | ||
}) | ||
.finally(() => { | ||
this.isPolling = false; | ||
}); | ||
} | ||
/** | ||
* Send a request to CloudFlare Queues to retrieve messages | ||
* @param params The required params to receive messages from CloudFlare Queues | ||
*/ | ||
async receiveMessage() { | ||
try { | ||
const response = await (0, cloudflare_js_1.queuesClient)({ | ||
const result = (0, cloudflare_js_1.queuesClient)({ | ||
...this.fetchOptions, | ||
path: "messages/pull", | ||
@@ -88,50 +147,148 @@ method: "POST", | ||
}); | ||
if (!response.success) { | ||
this.emit("error", new Error("Failed to pull messages")); | ||
this.isPolling = false; | ||
return; | ||
return result; | ||
} | ||
catch (err) { | ||
throw (0, errors_js_1.toProviderError)(err, `Receive message failed: ${err.message}`); | ||
} | ||
} | ||
/** | ||
* Handles the response from AWS SQS, determining if we should proceed to | ||
* the message handler. | ||
* @param response The output from AWS SQS | ||
*/ | ||
async handleQueueResponse(response) { | ||
if (!response.success) { | ||
this.emit("error", new Error("Failed to pull messages")); | ||
this.isPolling = false; | ||
return; | ||
} | ||
if ((0, validation_js_1.hasMessages)(response)) { | ||
await Promise.all(response.result.messages.map((message) => this.processMessage(message))); | ||
this.emit("response_processed"); | ||
} | ||
else if (response) { | ||
this.emit("empty"); | ||
} | ||
} | ||
/** | ||
* Process a message that has been received from CloudFlare Queues. This will execute the message | ||
* handler and delete the message once complete. | ||
* @param message The message that was delivered from CloudFlare | ||
*/ | ||
async processMessage(message) { | ||
try { | ||
this.emit("message_received", message); | ||
// TODO: Invesitgate if we can do heartbear checks here like SQS Consumer | ||
// https://github.com/bbc/sqs-consumer/blob/main/src/consumer.ts#L339 | ||
const ackedMessage = await this.executeHandler(message); | ||
if (ackedMessage?.id === message.id) { | ||
// TODO: In order to conserve API reate limits, it would be better to do this | ||
// in a batch, rather than one at a time. | ||
await this.acknowledgeMessage([ | ||
{ | ||
lease_id: message.lease_id, | ||
}, | ||
], []); | ||
this.emit("message_processed", message); | ||
} | ||
const { messages } = response.result; | ||
if (!messages || messages.length === 0) { | ||
this.emit("empty"); | ||
this.isPolling = false; | ||
return; | ||
} | ||
catch (err) { | ||
this.emitError(err, message); | ||
// TODO: In order to conserve API reate limits, it would be better to do this | ||
// in a batch, rather than one at a time. | ||
await this.acknowledgeMessage([], [ | ||
{ | ||
lease_id: message.lease_id, | ||
delay_seconds: this.retryMessageDelay, | ||
}, | ||
]); | ||
} | ||
} | ||
/** | ||
* Trigger the applications handleMessage function | ||
* @param message The message that was received from CloudFlare | ||
*/ | ||
async executeHandler(message) { | ||
let handleMessageTimeoutId = undefined; | ||
try { | ||
let result; | ||
if (this.handleMessageTimeout) { | ||
const pending = new Promise((_, reject) => { | ||
handleMessageTimeoutId = setTimeout(() => { | ||
reject(new errors_js_1.TimeoutError()); | ||
}, this.handleMessageTimeout); | ||
}); | ||
result = await Promise.race([this.handleMessage(message), pending]); | ||
} | ||
const successfulMessages = []; | ||
const failedMessages = []; | ||
for (const message of messages) { | ||
this.emit("message_received", message); | ||
try { | ||
const result = await this.handleMessage(message); | ||
logger_js_1.logger.debug("message_processed", { result }); | ||
if (result) { | ||
successfulMessages.push(message.lease_id); | ||
this.emit("message_processed", message); | ||
} | ||
} | ||
catch (e) { | ||
failedMessages.push(message.lease_id); | ||
this.emit("processing_error", e, message); | ||
} | ||
else { | ||
result = await this.handleMessage(message); | ||
} | ||
logger_js_1.logger.debug("acknowledging_messages", { | ||
successfulMessages, | ||
failedMessages, | ||
}); | ||
await (0, cloudflare_js_1.queuesClient)({ | ||
return !this.alwaysAcknowledge && result instanceof Object | ||
? result | ||
: message; | ||
} | ||
catch (err) { | ||
if (err instanceof errors_js_1.TimeoutError) { | ||
throw (0, errors_js_1.toTimeoutError)(err, `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`); | ||
} | ||
else if (err instanceof Error) { | ||
throw (0, errors_js_1.toStandardError)(err, `Unexpected message handler failure: ${err.message}`); | ||
} | ||
throw err; | ||
} | ||
finally { | ||
if (handleMessageTimeoutId) { | ||
clearTimeout(handleMessageTimeoutId); | ||
} | ||
} | ||
} | ||
/** | ||
* Change the visibility timeout on a message | ||
* @param message The message to change the value of | ||
* @param timeout The new timeout that should be set | ||
*/ | ||
async acknowledgeMessage(acks, retries) { | ||
try { | ||
// TODO: this is pretty hacky | ||
// TODO: This doesn't appear to be acknowledging correctly.... | ||
const input = { acks, retries }; | ||
this.emit("acknowledging_messages", acks, retries); | ||
const result = await (0, cloudflare_js_1.queuesClient)({ | ||
...this.fetchOptions, | ||
path: "messages/ack", | ||
method: "POST", | ||
body: { acks: successfulMessages, retries: failedMessages }, | ||
body: input, | ||
accountId: this.accountId, | ||
queueId: this.queueId, | ||
}); | ||
this.emit("response_processed"); | ||
if (!result.success) { | ||
throw new Error("Message Acknowledgement did not succeed."); | ||
} | ||
this.emit("acknowledged_messages", result.result); | ||
return result; | ||
} | ||
catch (e) { | ||
this.emit("error", e); | ||
catch (err) { | ||
this.emit("error", (0, errors_js_1.toProviderError)(err, `Error acknowledging messages: ${err.message}`)); | ||
} | ||
this.isPolling = false; | ||
setTimeout(() => this.poll(), this.pollingWaitTimeMs); | ||
} | ||
/** | ||
* Emit one of the consumer's error events depending on the error received. | ||
* @param err The error object to forward on | ||
* @param message The message that the error occurred on | ||
*/ | ||
emitError(err, message) { | ||
if (!message) { | ||
this.emit("error", err); | ||
} | ||
else if (err.name === errors_js_1.ProviderError.name) { | ||
this.emit("error", err, message); | ||
} | ||
else if (err instanceof errors_js_1.TimeoutError) { | ||
this.emit("timeout_error", err, message); | ||
} | ||
else { | ||
this.emit("processing_error", err, message); | ||
} | ||
} | ||
} | ||
exports.Consumer = Consumer; |
@@ -9,3 +9,3 @@ "use strict"; | ||
if (!QUEUES_API_TOKEN) { | ||
throw new Error("Missing Cloudflare credentials"); | ||
throw new Error("Missing Cloudflare credentials, please set a QUEUES_API_TOKEN in the environment variables."); | ||
} | ||
@@ -17,3 +17,3 @@ return { | ||
exports.getCredentials = getCredentials; | ||
async function queuesClient({ path, method, body, accountId, queueId, }) { | ||
async function queuesClient({ path, method, body, accountId, queueId, signal, }) { | ||
const { QUEUES_API_TOKEN } = getCredentials(); | ||
@@ -27,2 +27,3 @@ const response = await fetch(`${CLOUDFLARE_HOST}/accounts/${accountId}/queues/${queueId}/${path}`, { | ||
body: JSON.stringify(body), | ||
signal, | ||
}); | ||
@@ -29,0 +30,0 @@ (0, fetch_js_1.throwErrorIfResponseNotOk)(response); |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.validateOption = exports.assertOptions = void 0; | ||
exports.hasMessages = exports.validateOption = exports.assertOptions = void 0; | ||
const requiredOptions = ["accountId", "queueId", "handleMessage"]; | ||
@@ -38,1 +38,9 @@ function validateOption(option, value, strict) { | ||
exports.assertOptions = assertOptions; | ||
/** | ||
* Determine if the response has messages in it. | ||
* @param response The response from CloudFlare. | ||
*/ | ||
function hasMessages(response) { | ||
return response?.result?.messages && response.result.messages.length > 0; | ||
} | ||
exports.hasMessages = hasMessages; |
import { TypedEventEmitter } from "./emitter.js"; | ||
import { assertOptions } from "./validation.js"; | ||
import { assertOptions, hasMessages } from "./validation.js"; | ||
import { queuesClient } from "./lib/cloudflare.js"; | ||
import { logger } from "./logger.js"; | ||
import { toProviderError, ProviderError, toStandardError, toTimeoutError, TimeoutError, } from "./errors.js"; | ||
// TODO: Document how to use this in the README | ||
/** | ||
@@ -24,2 +26,5 @@ * [Usage](https://bbc.github.io/cloudflare-queue-consumer/index.html#usage) | ||
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000; | ||
this.handleMessageTimeout = options.handleMessageTimeout; | ||
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false; | ||
this.retryMessageDelay = options.retryMessageDelay ?? 10; | ||
} | ||
@@ -33,12 +38,2 @@ /** | ||
/** | ||
* Returns the current status of the consumer. | ||
* This includes whether it is running or currently polling. | ||
*/ | ||
get status() { | ||
return { | ||
isRunning: !this.stopped, | ||
isPolling: this.isPolling, | ||
}; | ||
} | ||
/** | ||
* Start polling the queue. | ||
@@ -50,12 +45,51 @@ */ | ||
} | ||
// Create a new abort controller each time the consumer is started | ||
this.abortController = new AbortController(); | ||
logger.debug("starting"); | ||
this.stopped = false; | ||
this.emit("started"); | ||
this.poll(); | ||
} | ||
/** | ||
* A reusable options object for sqs.send that's used to avoid duplication. | ||
*/ | ||
get fetchOptions() { | ||
return { | ||
// return the current abortController signal or a fresh signal that has not been aborted. | ||
// This effectively defaults the signal sent to not aborted | ||
signal: this.abortController?.signal || new AbortController().signal, | ||
}; | ||
} | ||
/** | ||
* Stop polling the queue. | ||
*/ | ||
stop() { | ||
stop(options) { | ||
if (this.stopped) { | ||
logger.debug("already_stopped"); | ||
return; | ||
} | ||
logger.debug("stopping"); | ||
this.stopped = true; | ||
if (this.pollingTimeoutId) { | ||
clearTimeout(this.pollingTimeoutId); | ||
this.pollingTimeoutId = undefined; | ||
} | ||
if (options?.abort) { | ||
logger.debug("aborting"); | ||
this.abortController.abort(); | ||
this.emit("aborted"); | ||
} | ||
this.emit("stopped"); | ||
} | ||
/** | ||
* Returns the current status of the consumer. | ||
* This includes whether it is running or currently polling. | ||
*/ | ||
get status() { | ||
return { | ||
isRunning: !this.stopped, | ||
isPolling: this.isPolling, | ||
}; | ||
} | ||
/** | ||
* Poll the queue for messages. | ||
@@ -72,4 +106,29 @@ */ | ||
this.isPolling = true; | ||
const currentPollingTimeout = this.pollingWaitTimeMs; | ||
this.receiveMessage() | ||
.then((output) => this.handleQueueResponse(output)) | ||
.then(() => { | ||
if (this.pollingTimeoutId) { | ||
clearTimeout(this.pollingTimeoutId); | ||
} | ||
this.pollingTimeoutId = setTimeout(() => this.poll(), currentPollingTimeout); | ||
}) | ||
.catch((err) => { | ||
// TODO: Adjust the error handling here | ||
// TODO: Add an extension to the polling timeout if auth error | ||
this.emit("error", err); | ||
setTimeout(() => this.poll(), this.pollingWaitTimeMs); | ||
}) | ||
.finally(() => { | ||
this.isPolling = false; | ||
}); | ||
} | ||
/** | ||
* Send a request to CloudFlare Queues to retrieve messages | ||
* @param params The required params to receive messages from CloudFlare Queues | ||
*/ | ||
async receiveMessage() { | ||
try { | ||
const response = await queuesClient({ | ||
const result = queuesClient({ | ||
...this.fetchOptions, | ||
path: "messages/pull", | ||
@@ -84,49 +143,147 @@ method: "POST", | ||
}); | ||
if (!response.success) { | ||
this.emit("error", new Error("Failed to pull messages")); | ||
this.isPolling = false; | ||
return; | ||
return result; | ||
} | ||
catch (err) { | ||
throw toProviderError(err, `Receive message failed: ${err.message}`); | ||
} | ||
} | ||
/** | ||
* Handles the response from AWS SQS, determining if we should proceed to | ||
* the message handler. | ||
* @param response The output from AWS SQS | ||
*/ | ||
async handleQueueResponse(response) { | ||
if (!response.success) { | ||
this.emit("error", new Error("Failed to pull messages")); | ||
this.isPolling = false; | ||
return; | ||
} | ||
if (hasMessages(response)) { | ||
await Promise.all(response.result.messages.map((message) => this.processMessage(message))); | ||
this.emit("response_processed"); | ||
} | ||
else if (response) { | ||
this.emit("empty"); | ||
} | ||
} | ||
/** | ||
* Process a message that has been received from CloudFlare Queues. This will execute the message | ||
* handler and delete the message once complete. | ||
* @param message The message that was delivered from CloudFlare | ||
*/ | ||
async processMessage(message) { | ||
try { | ||
this.emit("message_received", message); | ||
// TODO: Invesitgate if we can do heartbear checks here like SQS Consumer | ||
// https://github.com/bbc/sqs-consumer/blob/main/src/consumer.ts#L339 | ||
const ackedMessage = await this.executeHandler(message); | ||
if (ackedMessage?.id === message.id) { | ||
// TODO: In order to conserve API reate limits, it would be better to do this | ||
// in a batch, rather than one at a time. | ||
await this.acknowledgeMessage([ | ||
{ | ||
lease_id: message.lease_id, | ||
}, | ||
], []); | ||
this.emit("message_processed", message); | ||
} | ||
const { messages } = response.result; | ||
if (!messages || messages.length === 0) { | ||
this.emit("empty"); | ||
this.isPolling = false; | ||
return; | ||
} | ||
catch (err) { | ||
this.emitError(err, message); | ||
// TODO: In order to conserve API reate limits, it would be better to do this | ||
// in a batch, rather than one at a time. | ||
await this.acknowledgeMessage([], [ | ||
{ | ||
lease_id: message.lease_id, | ||
delay_seconds: this.retryMessageDelay, | ||
}, | ||
]); | ||
} | ||
} | ||
/** | ||
* Trigger the applications handleMessage function | ||
* @param message The message that was received from CloudFlare | ||
*/ | ||
async executeHandler(message) { | ||
let handleMessageTimeoutId = undefined; | ||
try { | ||
let result; | ||
if (this.handleMessageTimeout) { | ||
const pending = new Promise((_, reject) => { | ||
handleMessageTimeoutId = setTimeout(() => { | ||
reject(new TimeoutError()); | ||
}, this.handleMessageTimeout); | ||
}); | ||
result = await Promise.race([this.handleMessage(message), pending]); | ||
} | ||
const successfulMessages = []; | ||
const failedMessages = []; | ||
for (const message of messages) { | ||
this.emit("message_received", message); | ||
try { | ||
const result = await this.handleMessage(message); | ||
logger.debug("message_processed", { result }); | ||
if (result) { | ||
successfulMessages.push(message.lease_id); | ||
this.emit("message_processed", message); | ||
} | ||
} | ||
catch (e) { | ||
failedMessages.push(message.lease_id); | ||
this.emit("processing_error", e, message); | ||
} | ||
else { | ||
result = await this.handleMessage(message); | ||
} | ||
logger.debug("acknowledging_messages", { | ||
successfulMessages, | ||
failedMessages, | ||
}); | ||
await queuesClient({ | ||
return !this.alwaysAcknowledge && result instanceof Object | ||
? result | ||
: message; | ||
} | ||
catch (err) { | ||
if (err instanceof TimeoutError) { | ||
throw toTimeoutError(err, `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`); | ||
} | ||
else if (err instanceof Error) { | ||
throw toStandardError(err, `Unexpected message handler failure: ${err.message}`); | ||
} | ||
throw err; | ||
} | ||
finally { | ||
if (handleMessageTimeoutId) { | ||
clearTimeout(handleMessageTimeoutId); | ||
} | ||
} | ||
} | ||
/** | ||
* Change the visibility timeout on a message | ||
* @param message The message to change the value of | ||
* @param timeout The new timeout that should be set | ||
*/ | ||
async acknowledgeMessage(acks, retries) { | ||
try { | ||
// TODO: this is pretty hacky | ||
// TODO: This doesn't appear to be acknowledging correctly.... | ||
const input = { acks, retries }; | ||
this.emit("acknowledging_messages", acks, retries); | ||
const result = await queuesClient({ | ||
...this.fetchOptions, | ||
path: "messages/ack", | ||
method: "POST", | ||
body: { acks: successfulMessages, retries: failedMessages }, | ||
body: input, | ||
accountId: this.accountId, | ||
queueId: this.queueId, | ||
}); | ||
this.emit("response_processed"); | ||
if (!result.success) { | ||
throw new Error("Message Acknowledgement did not succeed."); | ||
} | ||
this.emit("acknowledged_messages", result.result); | ||
return result; | ||
} | ||
catch (e) { | ||
this.emit("error", e); | ||
catch (err) { | ||
this.emit("error", toProviderError(err, `Error acknowledging messages: ${err.message}`)); | ||
} | ||
this.isPolling = false; | ||
setTimeout(() => this.poll(), this.pollingWaitTimeMs); | ||
} | ||
/** | ||
* Emit one of the consumer's error events depending on the error received. | ||
* @param err The error object to forward on | ||
* @param message The message that the error occurred on | ||
*/ | ||
emitError(err, message) { | ||
if (!message) { | ||
this.emit("error", err); | ||
} | ||
else if (err.name === ProviderError.name) { | ||
this.emit("error", err, message); | ||
} | ||
else if (err instanceof TimeoutError) { | ||
this.emit("timeout_error", err, message); | ||
} | ||
else { | ||
this.emit("processing_error", err, message); | ||
} | ||
} | ||
} |
@@ -6,3 +6,3 @@ import { throwErrorIfResponseNotOk } from "./fetch.js"; | ||
if (!QUEUES_API_TOKEN) { | ||
throw new Error("Missing Cloudflare credentials"); | ||
throw new Error("Missing Cloudflare credentials, please set a QUEUES_API_TOKEN in the environment variables."); | ||
} | ||
@@ -13,3 +13,3 @@ return { | ||
} | ||
export async function queuesClient({ path, method, body, accountId, queueId, }) { | ||
export async function queuesClient({ path, method, body, accountId, queueId, signal, }) { | ||
const { QUEUES_API_TOKEN } = getCredentials(); | ||
@@ -23,2 +23,3 @@ const response = await fetch(`${CLOUDFLARE_HOST}/accounts/${accountId}/queues/${queueId}/${path}`, { | ||
body: JSON.stringify(body), | ||
signal, | ||
}); | ||
@@ -25,0 +26,0 @@ throwErrorIfResponseNotOk(response); |
@@ -33,2 +33,9 @@ const requiredOptions = ["accountId", "queueId", "handleMessage"]; | ||
} | ||
export { assertOptions, validateOption }; | ||
/** | ||
* Determine if the response has messages in it. | ||
* @param response The response from CloudFlare. | ||
*/ | ||
function hasMessages(response) { | ||
return response?.result?.messages && response.result.messages.length > 0; | ||
} | ||
export { assertOptions, validateOption, hasMessages }; |
@@ -0,1 +1,2 @@ | ||
/// <reference types="node" resolution-mode="require"/> | ||
import { TypedEventEmitter } from "./emitter.js"; | ||
@@ -13,4 +14,9 @@ import type { ConsumerOptions } from "./types.js"; | ||
private pollingWaitTimeMs; | ||
private pollingTimeoutId; | ||
private stopped; | ||
private isPolling; | ||
private handleMessageTimeout; | ||
private alwaysAcknowledge; | ||
private retryMessageDelay; | ||
abortController: AbortController; | ||
/** | ||
@@ -26,2 +32,16 @@ * Create a new consumer | ||
/** | ||
* Start polling the queue. | ||
*/ | ||
start(): void; | ||
/** | ||
* A reusable options object for sqs.send that's used to avoid duplication. | ||
*/ | ||
private get fetchOptions(); | ||
/** | ||
* Stop polling the queue. | ||
*/ | ||
stop(options?: { | ||
abort?: boolean; | ||
}): void; | ||
/** | ||
* Returns the current status of the consumer. | ||
@@ -35,13 +55,39 @@ * This includes whether it is running or currently polling. | ||
/** | ||
* Start polling the queue. | ||
* Poll the queue for messages. | ||
*/ | ||
start(): void; | ||
private poll; | ||
/** | ||
* Stop polling the queue. | ||
* Send a request to CloudFlare Queues to retrieve messages | ||
* @param params The required params to receive messages from CloudFlare Queues | ||
*/ | ||
stop(): void; | ||
private receiveMessage; | ||
/** | ||
* Poll the queue for messages. | ||
* Handles the response from AWS SQS, determining if we should proceed to | ||
* the message handler. | ||
* @param response The output from AWS SQS | ||
*/ | ||
private poll; | ||
private handleQueueResponse; | ||
/** | ||
* Process a message that has been received from CloudFlare Queues. This will execute the message | ||
* handler and delete the message once complete. | ||
* @param message The message that was delivered from CloudFlare | ||
*/ | ||
private processMessage; | ||
/** | ||
* Trigger the applications handleMessage function | ||
* @param message The message that was received from CloudFlare | ||
*/ | ||
private executeHandler; | ||
/** | ||
* Change the visibility timeout on a message | ||
* @param message The message to change the value of | ||
* @param timeout The new timeout that should be set | ||
*/ | ||
private acknowledgeMessage; | ||
/** | ||
* Emit one of the consumer's error events depending on the error received. | ||
* @param err The error object to forward on | ||
* @param message The message that the error occurred on | ||
*/ | ||
private emitError; | ||
} |
export declare function getCredentials(): { | ||
QUEUES_API_TOKEN: string; | ||
}; | ||
export declare function queuesClient<T = unknown>({ path, method, body, accountId, queueId, }: { | ||
export declare function queuesClient<T = unknown>({ path, method, body, accountId, queueId, signal, }: { | ||
path: any; | ||
@@ -10,2 +10,3 @@ method: any; | ||
queueId: any; | ||
signal: any; | ||
}): Promise<T>; |
@@ -5,6 +5,41 @@ /** | ||
export interface ConsumerOptions { | ||
batchSize: number; | ||
visibilityTimeoutMs: number; | ||
/** | ||
* The number of messages to request from CloudFlare when polling (default `10`). | ||
* @defaultvalue `10` | ||
*/ | ||
batchSize?: number; | ||
/** | ||
* The duration (in milliseconds) that the received messages are hidden from subsequent | ||
* retrieve requests after being retrieved by a ReceiveMessage request. | ||
* @defaultvalue 1000 | ||
*/ | ||
visibilityTimeoutMs?: number; | ||
/** | ||
* You CloudFlare account id | ||
*/ | ||
accountId: string; | ||
/** | ||
* The ID of the queue you want to receive messages from. | ||
*/ | ||
queueId: string; | ||
/** | ||
* Time in ms to wait for `handleMessage` to process a message before timing out. | ||
* | ||
* Emits `timeout_error` on timeout. By default, if `handleMessage` times out, | ||
* the unprocessed message returns to the end of the queue. | ||
*/ | ||
handleMessageTimeout?: number; | ||
/** | ||
* By default, the consumer will treat an empty object or array from either of the | ||
* handlers as a acknowledgement of no messages and will not delete those messages as | ||
* a result. Set this to `true` to always acknowledge all messages no matter the returned | ||
* value. | ||
* @defaultvalue `false` | ||
*/ | ||
alwaysAcknowledge?: boolean; | ||
/** | ||
* The amount of time to delay a message for before retrying (in seconds) | ||
* @defaultvalue 10 | ||
*/ | ||
retryMessageDelay?: number; | ||
} | ||
@@ -17,2 +52,6 @@ export type Message = { | ||
lease_id: string; | ||
metadata: { | ||
"CF-sourceMessageSource": string; | ||
"CF-Content-Type": "json" | "text"; | ||
}; | ||
}; | ||
@@ -97,2 +136,24 @@ export type CloudFlareError = { | ||
stopped: []; | ||
/** | ||
* Fired when messages are acknowledging | ||
*/ | ||
acknowledging_messages: [ | ||
{ | ||
lease_id: string; | ||
}[], | ||
{ | ||
lease_id: string; | ||
delay_seconds: number; | ||
}[] | ||
]; | ||
/** | ||
* Fired when messages have been acknowledged | ||
*/ | ||
acknowledged_messages: [ | ||
{ | ||
ackCount: number; | ||
retryCount: number; | ||
warnings: string[]; | ||
} | ||
]; | ||
} |
@@ -1,2 +0,2 @@ | ||
import type { ConsumerOptions } from "./types.js"; | ||
import type { ConsumerOptions, PullMessagesResponse } from "./types.js"; | ||
declare function validateOption(option: string, value: number, strict?: boolean): void; | ||
@@ -8,2 +8,7 @@ /** | ||
declare function assertOptions(options: ConsumerOptions): void; | ||
export { assertOptions, validateOption }; | ||
/** | ||
* Determine if the response has messages in it. | ||
* @param response The response from CloudFlare. | ||
*/ | ||
declare function hasMessages(response: PullMessagesResponse): boolean; | ||
export { assertOptions, validateOption, hasMessages }; |
{ | ||
"name": "@bbc/cloudflare-queue-consumer", | ||
"version": "0.0.2", | ||
"version": "0.0.3", | ||
"description": "Build CloudFlare Queue applications without the boilerplate", | ||
@@ -46,3 +46,4 @@ "repository": { | ||
"posttest": "npm run lint && npm run format:check", | ||
"generate-docs": "typedoc" | ||
"generate-docs": "typedoc", | ||
"dev": "DEBUG=cloudflare-queue-consumer node ./example/index.js" | ||
}, | ||
@@ -49,0 +50,0 @@ "devDependencies": { |
@@ -31,3 +31,11 @@ # cloudflare-queue-consumer | ||
// TODO: Add example | ||
const consumer = new Consumer({ | ||
accountId: process.env.ACCOUNT_ID, // Your CloudFlare account ID | ||
queueId: process.env.QUEUE_ID, // The Queue ID that you want to use. | ||
handleMessage: async (message) => { | ||
// Your message handling code... | ||
}, | ||
}); | ||
consumer.start(); | ||
``` | ||
@@ -39,4 +47,10 @@ | ||
TODO: Add more information | ||
In order to authenticate with the CloudFlare API, you will need to create an API token with read and write access to CloudFlare Queues, more information can be found [here](https://developers.cloudflare.com/queues/reference/pull-consumers/#create-api-tokens). | ||
Copy that token and set it as the value for an environment variable named `QUEUES_API_TOKEN`. | ||
### Example project | ||
You'll aldo find an example project in the folder `./example`, set the variables `ACCOUNT_ID` and `QUEUE_ID` and then run this with the command `pnpm dev`. | ||
## Contributing | ||
@@ -43,0 +57,0 @@ |
@@ -8,6 +8,14 @@ import { TypedEventEmitter } from "./emitter.js"; | ||
} from "./types.js"; | ||
import { assertOptions } from "./validation.js"; | ||
import { assertOptions, hasMessages } from "./validation.js"; | ||
import { queuesClient } from "./lib/cloudflare.js"; | ||
import { logger } from "./logger.js"; | ||
import { | ||
toProviderError, | ||
ProviderError, | ||
toStandardError, | ||
toTimeoutError, | ||
TimeoutError, | ||
} from "./errors.js"; | ||
// TODO: Document how to use this in the README | ||
/** | ||
@@ -23,4 +31,9 @@ * [Usage](https://bbc.github.io/cloudflare-queue-consumer/index.html#usage) | ||
private pollingWaitTimeMs: number; | ||
private pollingTimeoutId: NodeJS.Timeout; | ||
private stopped = true; | ||
private isPolling = false; | ||
private handleMessageTimeout: number; | ||
private alwaysAcknowledge: number; | ||
private retryMessageDelay: number; | ||
public abortController: AbortController; | ||
@@ -40,2 +53,5 @@ /** | ||
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000; | ||
this.handleMessageTimeout = options.handleMessageTimeout; | ||
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false; | ||
this.retryMessageDelay = options.retryMessageDelay ?? 10; | ||
} | ||
@@ -51,16 +67,2 @@ | ||
/** | ||
* Returns the current status of the consumer. | ||
* This includes whether it is running or currently polling. | ||
*/ | ||
public get status(): { | ||
isRunning: boolean; | ||
isPolling: boolean; | ||
} { | ||
return { | ||
isRunning: !this.stopped, | ||
isPolling: this.isPolling, | ||
}; | ||
} | ||
/** | ||
* Start polling the queue. | ||
@@ -72,3 +74,7 @@ */ | ||
} | ||
// Create a new abort controller each time the consumer is started | ||
this.abortController = new AbortController(); | ||
logger.debug("starting"); | ||
this.stopped = false; | ||
this.emit("started"); | ||
this.poll(); | ||
@@ -78,9 +84,53 @@ } | ||
/** | ||
* A reusable options object for sqs.send that's used to avoid duplication. | ||
*/ | ||
private get fetchOptions(): { signal: AbortSignal } { | ||
return { | ||
// return the current abortController signal or a fresh signal that has not been aborted. | ||
// This effectively defaults the signal sent to not aborted | ||
signal: this.abortController?.signal || new AbortController().signal, | ||
}; | ||
} | ||
/** | ||
* Stop polling the queue. | ||
*/ | ||
public stop(): void { | ||
public stop(options?: { abort?: boolean }): void { | ||
if (this.stopped) { | ||
logger.debug("already_stopped"); | ||
return; | ||
} | ||
logger.debug("stopping"); | ||
this.stopped = true; | ||
if (this.pollingTimeoutId) { | ||
clearTimeout(this.pollingTimeoutId); | ||
this.pollingTimeoutId = undefined; | ||
} | ||
if (options?.abort) { | ||
logger.debug("aborting"); | ||
this.abortController.abort(); | ||
this.emit("aborted"); | ||
} | ||
this.emit("stopped"); | ||
} | ||
/** | ||
* Returns the current status of the consumer. | ||
* This includes whether it is running or currently polling. | ||
*/ | ||
public get status(): { | ||
isRunning: boolean; | ||
isPolling: boolean; | ||
} { | ||
return { | ||
isRunning: !this.stopped, | ||
isPolling: this.isPolling, | ||
}; | ||
} | ||
/** | ||
* Poll the queue for messages. | ||
@@ -101,4 +151,34 @@ */ | ||
const currentPollingTimeout: number = this.pollingWaitTimeMs; | ||
this.receiveMessage() | ||
.then((output: PullMessagesResponse) => this.handleQueueResponse(output)) | ||
.then((): void => { | ||
if (this.pollingTimeoutId) { | ||
clearTimeout(this.pollingTimeoutId); | ||
} | ||
this.pollingTimeoutId = setTimeout( | ||
() => this.poll(), | ||
currentPollingTimeout, | ||
); | ||
}) | ||
.catch((err): void => { | ||
// TODO: Adjust the error handling here | ||
// TODO: Add an extension to the polling timeout if auth error | ||
this.emit("error", err); | ||
setTimeout(() => this.poll(), this.pollingWaitTimeMs); | ||
}) | ||
.finally((): void => { | ||
this.isPolling = false; | ||
}); | ||
} | ||
/** | ||
* Send a request to CloudFlare Queues to retrieve messages | ||
* @param params The required params to receive messages from CloudFlare Queues | ||
*/ | ||
private async receiveMessage(): Promise<PullMessagesResponse> { | ||
try { | ||
const response = await queuesClient<PullMessagesResponse>({ | ||
const result = queuesClient<PullMessagesResponse>({ | ||
...this.fetchOptions, | ||
path: "messages/pull", | ||
@@ -114,43 +194,149 @@ method: "POST", | ||
if (!response.success) { | ||
this.emit("error", new Error("Failed to pull messages")); | ||
this.isPolling = false; | ||
return; | ||
} | ||
return result; | ||
} catch (err) { | ||
throw toProviderError(err, `Receive message failed: ${err.message}`); | ||
} | ||
} | ||
const { messages } = response.result; | ||
/** | ||
* Handles the response from AWS SQS, determining if we should proceed to | ||
* the message handler. | ||
* @param response The output from AWS SQS | ||
*/ | ||
private async handleQueueResponse( | ||
response: PullMessagesResponse, | ||
): Promise<void> { | ||
if (!response.success) { | ||
this.emit("error", new Error("Failed to pull messages")); | ||
this.isPolling = false; | ||
return; | ||
} | ||
if (!messages || messages.length === 0) { | ||
this.emit("empty"); | ||
this.isPolling = false; | ||
return; | ||
if (hasMessages(response)) { | ||
await Promise.all( | ||
response.result.messages.map((message: Message) => | ||
this.processMessage(message), | ||
), | ||
); | ||
this.emit("response_processed"); | ||
} else if (response) { | ||
this.emit("empty"); | ||
} | ||
} | ||
/** | ||
* Process a message that has been received from CloudFlare Queues. This will execute the message | ||
* handler and delete the message once complete. | ||
* @param message The message that was delivered from CloudFlare | ||
*/ | ||
private async processMessage(message: Message): Promise<void> { | ||
try { | ||
this.emit("message_received", message); | ||
// TODO: Invesitgate if we can do heartbear checks here like SQS Consumer | ||
// https://github.com/bbc/sqs-consumer/blob/main/src/consumer.ts#L339 | ||
const ackedMessage: Message = await this.executeHandler(message); | ||
if (ackedMessage?.id === message.id) { | ||
// TODO: In order to conserve API reate limits, it would be better to do this | ||
// in a batch, rather than one at a time. | ||
await this.acknowledgeMessage( | ||
[ | ||
{ | ||
lease_id: message.lease_id, | ||
}, | ||
], | ||
[], | ||
); | ||
this.emit("message_processed", message); | ||
} | ||
} catch (err) { | ||
this.emitError(err, message); | ||
const successfulMessages: string[] = []; | ||
const failedMessages: string[] = []; | ||
// TODO: In order to conserve API reate limits, it would be better to do this | ||
// in a batch, rather than one at a time. | ||
await this.acknowledgeMessage( | ||
[], | ||
[ | ||
{ | ||
lease_id: message.lease_id, | ||
delay_seconds: this.retryMessageDelay, | ||
}, | ||
], | ||
); | ||
} | ||
} | ||
for (const message of messages) { | ||
this.emit("message_received", message); | ||
try { | ||
const result = await this.handleMessage(message); | ||
logger.debug("message_processed", { result }); | ||
if (result) { | ||
successfulMessages.push(message.lease_id); | ||
this.emit("message_processed", message); | ||
} | ||
} catch (e) { | ||
failedMessages.push(message.lease_id); | ||
this.emit("processing_error", e, message); | ||
} | ||
/** | ||
* Trigger the applications handleMessage function | ||
* @param message The message that was received from CloudFlare | ||
*/ | ||
private async executeHandler(message: Message): Promise<Message> { | ||
let handleMessageTimeoutId: NodeJS.Timeout | undefined = undefined; | ||
try { | ||
let result; | ||
if (this.handleMessageTimeout) { | ||
const pending: Promise<void> = new Promise((_, reject): void => { | ||
handleMessageTimeoutId = setTimeout((): void => { | ||
reject(new TimeoutError()); | ||
}, this.handleMessageTimeout); | ||
}); | ||
result = await Promise.race([this.handleMessage(message), pending]); | ||
} else { | ||
result = await this.handleMessage(message); | ||
} | ||
logger.debug("acknowledging_messages", { | ||
successfulMessages, | ||
failedMessages, | ||
}); | ||
return !this.alwaysAcknowledge && result instanceof Object | ||
? result | ||
: message; | ||
} catch (err) { | ||
if (err instanceof TimeoutError) { | ||
throw toTimeoutError( | ||
err, | ||
`Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`, | ||
); | ||
} else if (err instanceof Error) { | ||
throw toStandardError( | ||
err, | ||
`Unexpected message handler failure: ${err.message}`, | ||
); | ||
} | ||
throw err; | ||
} finally { | ||
if (handleMessageTimeoutId) { | ||
clearTimeout(handleMessageTimeoutId); | ||
} | ||
} | ||
} | ||
await queuesClient<AckMessageResponse>({ | ||
/** | ||
* Change the visibility timeout on a message | ||
* @param message The message to change the value of | ||
* @param timeout The new timeout that should be set | ||
*/ | ||
private async acknowledgeMessage( | ||
acks: { | ||
lease_id: string; | ||
}[], | ||
retries: { | ||
lease_id: string; | ||
delay_seconds: number; | ||
}[], | ||
): Promise<AckMessageResponse> { | ||
try { | ||
// TODO: this is pretty hacky | ||
// TODO: This doesn't appear to be acknowledging correctly.... | ||
const input = { acks, retries }; | ||
this.emit("acknowledging_messages", acks, retries); | ||
const result = await queuesClient<AckMessageResponse>({ | ||
...this.fetchOptions, | ||
path: "messages/ack", | ||
method: "POST", | ||
body: { acks: successfulMessages, retries: failedMessages }, | ||
body: input, | ||
accountId: this.accountId, | ||
@@ -160,10 +346,33 @@ queueId: this.queueId, | ||
this.emit("response_processed"); | ||
} catch (e) { | ||
this.emit("error", e); | ||
if (!result.success) { | ||
throw new Error("Message Acknowledgement did not succeed."); | ||
} | ||
this.emit("acknowledged_messages", result.result); | ||
return result; | ||
} catch (err) { | ||
this.emit( | ||
"error", | ||
toProviderError(err, `Error acknowledging messages: ${err.message}`), | ||
); | ||
} | ||
} | ||
this.isPolling = false; | ||
setTimeout(() => this.poll(), this.pollingWaitTimeMs); | ||
/** | ||
* Emit one of the consumer's error events depending on the error received. | ||
* @param err The error object to forward on | ||
* @param message The message that the error occurred on | ||
*/ | ||
private emitError(err: Error, message?: Message): void { | ||
if (!message) { | ||
this.emit("error", err); | ||
} else if (err.name === ProviderError.name) { | ||
this.emit("error", err, message); | ||
} else if (err instanceof TimeoutError) { | ||
this.emit("timeout_error", err, message); | ||
} else { | ||
this.emit("processing_error", err, message); | ||
} | ||
} | ||
} |
@@ -9,3 +9,5 @@ import { throwErrorIfResponseNotOk } from "./fetch.js"; | ||
if (!QUEUES_API_TOKEN) { | ||
throw new Error("Missing Cloudflare credentials"); | ||
throw new Error( | ||
"Missing Cloudflare credentials, please set a QUEUES_API_TOKEN in the environment variables.", | ||
); | ||
} | ||
@@ -24,2 +26,3 @@ | ||
queueId, | ||
signal, | ||
}): Promise<T> { | ||
@@ -37,2 +40,3 @@ const { QUEUES_API_TOKEN } = getCredentials(); | ||
body: JSON.stringify(body), | ||
signal, | ||
}, | ||
@@ -39,0 +43,0 @@ ); |
@@ -5,6 +5,41 @@ /** | ||
export interface ConsumerOptions { | ||
batchSize: number; | ||
visibilityTimeoutMs: number; | ||
/** | ||
* The number of messages to request from CloudFlare when polling (default `10`). | ||
* @defaultvalue `10` | ||
*/ | ||
batchSize?: number; | ||
/** | ||
* The duration (in milliseconds) that the received messages are hidden from subsequent | ||
* retrieve requests after being retrieved by a ReceiveMessage request. | ||
* @defaultvalue 1000 | ||
*/ | ||
visibilityTimeoutMs?: number; | ||
/** | ||
* You CloudFlare account id | ||
*/ | ||
accountId: string; | ||
/** | ||
* The ID of the queue you want to receive messages from. | ||
*/ | ||
queueId: string; | ||
/** | ||
* Time in ms to wait for `handleMessage` to process a message before timing out. | ||
* | ||
* Emits `timeout_error` on timeout. By default, if `handleMessage` times out, | ||
* the unprocessed message returns to the end of the queue. | ||
*/ | ||
handleMessageTimeout?: number; | ||
/** | ||
* By default, the consumer will treat an empty object or array from either of the | ||
* handlers as a acknowledgement of no messages and will not delete those messages as | ||
* a result. Set this to `true` to always acknowledge all messages no matter the returned | ||
* value. | ||
* @defaultvalue `false` | ||
*/ | ||
alwaysAcknowledge?: boolean; | ||
/** | ||
* The amount of time to delay a message for before retrying (in seconds) | ||
* @defaultvalue 10 | ||
*/ | ||
retryMessageDelay?: number; | ||
} | ||
@@ -18,2 +53,6 @@ | ||
lease_id: string; | ||
metadata: { | ||
"CF-sourceMessageSource": string; | ||
"CF-Content-Type": "json" | "text"; | ||
}; | ||
}; | ||
@@ -103,2 +142,24 @@ | ||
stopped: []; | ||
/** | ||
* Fired when messages are acknowledging | ||
*/ | ||
acknowledging_messages: [ | ||
{ | ||
lease_id: string; | ||
}[], | ||
{ | ||
lease_id: string; | ||
delay_seconds: number; | ||
}[], | ||
]; | ||
/** | ||
* Fired when messages have been acknowledged | ||
*/ | ||
acknowledged_messages: [ | ||
{ | ||
ackCount: number; | ||
retryCount: number; | ||
warnings: string[]; | ||
}, | ||
]; | ||
} |
@@ -1,2 +0,2 @@ | ||
import type { ConsumerOptions } from "./types.js"; | ||
import type { ConsumerOptions, PullMessagesResponse } from "./types.js"; | ||
@@ -41,2 +41,10 @@ const requiredOptions = ["accountId", "queueId", "handleMessage"]; | ||
export { assertOptions, validateOption }; | ||
/** | ||
* Determine if the response has messages in it. | ||
* @param response The response from CloudFlare. | ||
*/ | ||
function hasMessages(response: PullMessagesResponse): boolean { | ||
return response?.result?.messages && response.result.messages.length > 0; | ||
} | ||
export { assertOptions, validateOption, hasMessages }; |
69975
83.47%42
7.69%2010
82.23%63
28.57%