Comparing version 0.0.0-20230913120345 to 0.0.0-20231106083335
@@ -1,3 +0,7 @@ | ||
import type { VhostConfig } from 'rascal'; | ||
import type { VhostConfig, QueueConfig } from 'rascal'; | ||
export declare const vhost = "rabbit"; | ||
export declare const createBaseName: (queueName: string) => string; | ||
export type RetryConfig = ReadonlyArray<{ | ||
delayInSeconds: number; | ||
}>; | ||
export type DefaultVostConfig = { | ||
@@ -18,2 +22,4 @@ queueName: string; | ||
bindingKeys?: string[]; | ||
publicationRoutingKey?: string; | ||
retry?: RetryConfig; | ||
}; | ||
@@ -27,1 +33,41 @@ }; | ||
export declare const createBrokerConfig: (vhostConfigs: ReadonlyArray<VhostConfig>) => import("rascal").BrokerConfig; | ||
export declare const hasSubscriptions: (config: VhostConfig | DefaultVostConfig) => boolean; | ||
export declare const getSubscriptionConfig: (config: VhostConfig | DefaultVostConfig) => { | ||
name: string; | ||
vhost?: string | undefined; | ||
queue?: string | undefined; | ||
contentType?: string | undefined; | ||
options?: import("amqplib").Options.Consume | undefined; | ||
prefetch?: number | undefined; | ||
retry?: boolean | import("rascal").RetryConfig | undefined; | ||
handler?: string | undefined; | ||
handlers?: string[] | undefined; | ||
recovery?: any; | ||
deferCloseChannel?: number | undefined; | ||
encryption?: string | undefined; | ||
autoCreated?: boolean | undefined; | ||
redeliveries?: { | ||
counter: string; | ||
limit: number; | ||
timeout?: number | undefined; | ||
} | undefined; | ||
}; | ||
export declare const getSubscriptionName: (config: VhostConfig | DefaultVostConfig) => string; | ||
export declare const getSubscriptionQueueName: (config: VhostConfig | DefaultVostConfig) => string | undefined; | ||
export declare const getSubscriptionMaxMessageProcessingLimit: (config: VhostConfig | DefaultVostConfig) => any; | ||
export declare const getQueueConfigs: (vhostConfig: VhostConfig) => (string | QueueConfig)[]; | ||
export declare const getQueueName: (queueConfig: string | QueueConfig) => string | undefined; | ||
export declare const getRetryConfig: (config: VhostConfig | DefaultVostConfig) => RetryConfig; | ||
export declare const getRetryPublicationName: (config: VhostConfig | DefaultVostConfig, retryCount: number) => string; | ||
export declare const getPublicationConfig: (vHostConfig: VhostConfig | DefaultVostConfig) => { | ||
name: string; | ||
vhost?: string | undefined; | ||
exchange?: string | undefined; | ||
queue?: string | undefined; | ||
routingKey?: string | undefined; | ||
confirm?: boolean | undefined; | ||
options?: import("amqplib").Options.Publish | undefined; | ||
autoCreated?: boolean | undefined; | ||
deprecated?: boolean | undefined; | ||
encryption?: string | undefined; | ||
}; |
@@ -6,6 +6,7 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.createBrokerConfig = exports.ensureVhostConfig = exports.createDefaultVhostConfig = exports.vhost = void 0; | ||
exports.getPublicationConfig = exports.getRetryPublicationName = exports.getRetryConfig = exports.getQueueName = exports.getQueueConfigs = exports.getSubscriptionMaxMessageProcessingLimit = exports.getSubscriptionQueueName = exports.getSubscriptionName = exports.getSubscriptionConfig = exports.hasSubscriptions = exports.createBrokerConfig = exports.ensureVhostConfig = exports.createDefaultVhostConfig = exports.createBaseName = exports.vhost = void 0; | ||
const rascal_1 = require("rascal"); | ||
const lodash_mergewith_1 = __importDefault(require("lodash.mergewith")); | ||
const connection_1 = require("./connection"); | ||
const utils_1 = require("./utils"); | ||
exports.vhost = 'rabbit'; | ||
@@ -18,4 +19,5 @@ const createBaseName = (queueName) => { | ||
}; | ||
exports.createBaseName = createBaseName; | ||
const createDefaultVhostConfig = ({ queueName, options, }) => { | ||
const { withSubscription, withPublication, subscriptionPrefetch, subscriptionMessageProcessLimit, subscriptionDisabled, subscriptionRecovery, } = options; | ||
const { withSubscription, withPublication, subscriptionPrefetch, subscriptionMessageProcessLimit, subscriptionDisabled, subscriptionRecovery, retry, publicationRoutingKey, } = options; | ||
if (!withSubscription && !withPublication) { | ||
@@ -31,3 +33,3 @@ throw new Error("at least one of the options 'withSubscription' or 'withPublication' must be set to true"); | ||
const baseName = queueName.split('.')[0]; | ||
const queueExchange = options.exchange ?? `${createBaseName(queueName)}.exchange`; | ||
const queueExchange = options.exchange ?? `${(0, exports.createBaseName)(queueName)}.exchange`; | ||
const deadLetterQueueExchange = `${baseName}.exchange`; | ||
@@ -51,2 +53,14 @@ const deadLetterQueueName = `${baseName}.backout`; | ||
}, | ||
...(retry ?? []).map(({ delayInSeconds }) => ({ | ||
name: `${baseName}.${delayInSeconds}`, | ||
options: { | ||
durable: true, | ||
arguments: { | ||
'x-dead-letter-exchange': deadLetterQueueExchange, | ||
'x-dead-letter-routing-key': queueName, | ||
'x-message-ttl': delayInSeconds * 1000, | ||
}, | ||
}, | ||
assert: true, | ||
})), | ||
{ | ||
@@ -64,2 +78,8 @@ name: deadLetterQueueName, | ||
}, | ||
...(retry ?? []).map(({ delayInSeconds }) => ({ | ||
source: queueExchange, | ||
destination: `${baseName}.${delayInSeconds}`, | ||
destinationType: 'queue', | ||
bindingKey: `${baseName}.${delayInSeconds}`, | ||
})), | ||
{ | ||
@@ -75,7 +95,7 @@ source: deadLetterQueueExchange, | ||
? { | ||
[`${createBaseName(queueName)}-subscription`]: { | ||
[`${(0, exports.createBaseName)(queueName)}-subscription`]: { | ||
queue: queueName, | ||
contentType: 'application/json', | ||
vhost: exports.vhost, | ||
prefetch: subscriptionPrefetch ?? 10, | ||
prefetch: subscriptionPrefetch ?? 1, | ||
options: { | ||
@@ -97,10 +117,24 @@ arguments: { | ||
publications: { | ||
...(withPublication | ||
...(withPublication || retry | ||
? { | ||
[`${createBaseName(queueName)}-publication`]: { | ||
// TODO only if with publication | ||
[`${(0, exports.createBaseName)(queueName)}-publication`]: { | ||
exchange: `${baseName}.exchange`, | ||
vhost: exports.vhost, | ||
routingKey: queueName, | ||
routingKey: publicationRoutingKey ?? queueName, | ||
confirm: true, | ||
}, | ||
...(retry ?? []) | ||
.map(({ delayInSeconds }) => ({ | ||
[`${(0, exports.createBaseName)(queueName)}-${delayInSeconds}-publication`]: { | ||
exchange: `${baseName}.exchange`, | ||
vhost: exports.vhost, | ||
routingKey: `${baseName}.${delayInSeconds}`, | ||
confirm: true, | ||
}, | ||
})) | ||
.reduce((a, v) => ({ | ||
...a, | ||
[Object.keys(v)[0]]: v[Object.keys(v)[0]], | ||
}), {}), | ||
} | ||
@@ -136,2 +170,87 @@ : undefined), | ||
exports.createBrokerConfig = createBrokerConfig; | ||
const hasSubscriptions = (config) => { | ||
const vhostConfig = (0, exports.ensureVhostConfig)(config); | ||
return Object.keys(vhostConfig.subscriptions ?? {}).length > 0; | ||
}; | ||
exports.hasSubscriptions = hasSubscriptions; | ||
const getSubscriptionConfig = (config) => { | ||
const vhostConfig = (0, exports.ensureVhostConfig)(config); | ||
const subscriptionNames = Object.keys(vhostConfig.subscriptions ?? {}); | ||
const subscriptions = Object.values(vhostConfig.subscriptions ?? {}); | ||
const configStringify = JSON.stringify(config); | ||
if (subscriptions.length === 0) { | ||
throw new Error(`No subscription found in config ${configStringify} please set the option withSubscription to true`); | ||
} | ||
if (subscriptions.length > 1) { | ||
throw new Error(`Multiple subscriptions found for config ${configStringify} ensure only one subscription is configured per vhost config`); | ||
} | ||
return { | ||
...subscriptions[0], | ||
name: subscriptionNames[0], | ||
}; | ||
}; | ||
exports.getSubscriptionConfig = getSubscriptionConfig; | ||
const getSubscriptionName = (config) => (0, exports.getSubscriptionConfig)(config).name; | ||
exports.getSubscriptionName = getSubscriptionName; | ||
const getSubscriptionQueueName = (config) => (0, exports.getSubscriptionConfig)(config).queue; | ||
exports.getSubscriptionQueueName = getSubscriptionQueueName; | ||
const getSubscriptionMaxMessageProcessingLimit = (config) => (0, exports.getSubscriptionConfig)(config).options?.arguments?.messageProcessLimit; | ||
exports.getSubscriptionMaxMessageProcessingLimit = getSubscriptionMaxMessageProcessingLimit; | ||
const getQueueConfigs = (vhostConfig) => Array.isArray(vhostConfig.queues) ? vhostConfig.queues : Object.values(vhostConfig.queues ?? {}); | ||
exports.getQueueConfigs = getQueueConfigs; | ||
const getQueueName = (queueConfig) => { | ||
return typeof queueConfig === 'string' ? queueConfig : queueConfig.name; | ||
}; | ||
exports.getQueueName = getQueueName; | ||
const getRetryConfig = (config) => { | ||
if (!(0, exports.hasSubscriptions)(config)) { | ||
return []; | ||
} | ||
const vhostConfig = (0, exports.ensureVhostConfig)(config); | ||
const subscriptionQueueName = (0, exports.getSubscriptionQueueName)(config); | ||
const queueConfigs = (0, exports.getQueueConfigs)(vhostConfig); | ||
return queueConfigs | ||
.map((queueConfig) => { | ||
const baseName = subscriptionQueueName?.split('.')[0]; | ||
const queueName = (0, exports.getQueueName)(queueConfig); | ||
if (!queueName || queueName.split('.').length !== 2) { | ||
return undefined; | ||
} | ||
const queueConfigBaseName = queueName.split('.')[0]; | ||
const queueConfigRetryDelayInSeconds = queueName.split('.')[1] ?? ''; | ||
if (baseName === queueConfigBaseName && (0, utils_1.isPositivNumeric)(queueConfigRetryDelayInSeconds)) { | ||
return { delayInSeconds: parseInt(queueConfigRetryDelayInSeconds, 10) }; | ||
} | ||
return undefined; | ||
}) | ||
.filter(utils_1.notEmpty); | ||
}; | ||
exports.getRetryConfig = getRetryConfig; | ||
const getRetryPublicationName = (config, retryCount) => { | ||
const subscriptionQueueName = (0, exports.getSubscriptionQueueName)(config); | ||
if (!subscriptionQueueName) { | ||
throw new Error(`subscriptionQueueName expected for vHostConfig ${JSON.stringify(config)}`); | ||
} | ||
return `${(0, exports.createBaseName)(subscriptionQueueName)}-${(0, exports.getRetryConfig)(config)[retryCount].delayInSeconds}-publication`; | ||
}; | ||
exports.getRetryPublicationName = getRetryPublicationName; | ||
const getPublicationConfig = (vHostConfig) => { | ||
const vhostConfig = (0, exports.ensureVhostConfig)(vHostConfig); | ||
const retryPublicationNames = (0, exports.getRetryConfig)(vHostConfig).map((_, index) => (0, exports.getRetryPublicationName)(vHostConfig, index)); | ||
const publications = Object.entries(vhostConfig.publications ?? {}) | ||
.flatMap(([key, value]) => ({ | ||
...value, | ||
name: key, | ||
})) | ||
.filter((publication) => !retryPublicationNames.includes(publication.name)); | ||
const vHostConfigStringify = JSON.stringify(vHostConfig); | ||
if (publications.length === 0) { | ||
throw new Error(`No publications found in config ${vHostConfigStringify} please set the option withPublication to true`); | ||
} | ||
if (publications.length > 1) { | ||
throw new Error(`Multiple publications found for config ${vHostConfigStringify} ensure only one publications is configured per vhost config`); | ||
} | ||
return publications[0]; | ||
}; | ||
exports.getPublicationConfig = getPublicationConfig; | ||
//# sourceMappingURL=config.js.map |
@@ -0,4 +1,6 @@ | ||
import type { RetryConfig } from './config'; | ||
export { initBroker } from './broker'; | ||
export { publish } from './publication'; | ||
export { subscribe, Message } from './subscription'; | ||
export { subscribe, Message, OnFinalError, OnValidationError, RetryErrors, FinalErrorStrategy, } from './subscription'; | ||
export { RetryConfig, DefaultVostConfig } from './config'; | ||
export declare const mq: { | ||
@@ -11,1 +13,3 @@ initBroker: (vhostConfig: import("./config").VhostConfigs, log: import("pino").default.Logger, cleanup: import("./broker").Cleanup) => Promise<void>; | ||
}; | ||
export declare const RETRY_CONFIG_3_TIMES_IN_10_MINUTES: RetryConfig; | ||
export declare const RETRY_CONFIG_3_TIMES_IN_12_HOURS: RetryConfig; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.mq = exports.subscribe = exports.publish = exports.initBroker = void 0; | ||
exports.RETRY_CONFIG_3_TIMES_IN_12_HOURS = exports.RETRY_CONFIG_3_TIMES_IN_10_MINUTES = exports.mq = exports.subscribe = exports.publish = exports.initBroker = void 0; | ||
const broker_1 = require("./broker"); | ||
@@ -18,2 +18,14 @@ const publication_1 = require("./publication"); | ||
}; | ||
// E.g.: External service is overloaded. | ||
exports.RETRY_CONFIG_3_TIMES_IN_10_MINUTES = [ | ||
{ delayInSeconds: 10 }, | ||
{ delayInSeconds: 90 }, | ||
{ delayInSeconds: 500 }, | ||
]; | ||
// E.g.: External service is down. Give service provider time to recover. | ||
exports.RETRY_CONFIG_3_TIMES_IN_12_HOURS = [ | ||
{ delayInSeconds: 60 }, | ||
{ delayInSeconds: 600 }, | ||
{ delayInSeconds: 12 * 3600 - 660 }, | ||
]; | ||
//# sourceMappingURL=index.js.map |
@@ -12,21 +12,5 @@ "use strict"; | ||
const broker_1 = require("./broker"); | ||
const getPublicationConfig = (vHostConfig) => { | ||
const vhostConfig = (0, config_1.ensureVhostConfig)(vHostConfig); | ||
const publicationNames = Object.keys(vhostConfig.publications ?? {}); | ||
const publications = Object.values(vhostConfig.publications ?? {}); | ||
const vHostConfigJson = JSON.stringify(vHostConfig); | ||
if (publications.length === 0) { | ||
throw new Error(`No publications found in config ${vHostConfigJson} please set the option withPublication to true`); | ||
} | ||
if (publications.length > 1) { | ||
throw new Error(`Multiple publications found for config ${vHostConfigJson} ensure only one publications is configured per vhost config`); | ||
} | ||
return { | ||
...publications[0], | ||
name: publicationNames[0], | ||
}; | ||
}; | ||
const getRoutingKey = (publication) => { | ||
const routingKeyOverrides = publication.routingKey; | ||
const publicationConfig = getPublicationConfig(publication.vHostConfig); | ||
const publicationConfig = (0, config_1.getPublicationConfig)(publication.vHostConfig); | ||
const routingKey = routingKeyOverrides ?? publicationConfig.routingKey; | ||
@@ -57,3 +41,3 @@ if (!routingKey) { | ||
validatePublishingContent(contentValidator, content); | ||
const publicationConfig = getPublicationConfig(vHostConfig); | ||
const publicationConfig = (0, config_1.getPublicationConfig)(vHostConfig); | ||
const publicationName = publicationConfig.name; | ||
@@ -60,0 +44,0 @@ const overrides = createOverrides(publication); |
import type { Logger } from 'pino'; | ||
import type { z } from 'zod'; | ||
import type { MessageFields, MessageProperties } from 'amqplib'; | ||
import type { SubscriptionConfig, VhostConfig } from 'rascal'; | ||
import type { Message as AmqpMessage, MessageFields, MessageProperties } from 'amqplib'; | ||
import type { Recovery, SubscriberSessionAsPromised, SubscriptionConfig, VhostConfig } from 'rascal'; | ||
import type { ZodTypeDef } from 'zod/lib/types'; | ||
@@ -11,4 +11,13 @@ import type { DefaultVostConfig } from './config'; | ||
properties: MessageProperties; | ||
retryCount: number; | ||
maxRetries: number; | ||
}; | ||
export type MessageHandler<T> = (message: Message<T>) => Promise<void>; | ||
export type FinalErrorStrategy = 'ack' | 'nack'; | ||
export type OnFinalError<ZodOutput> = (error: Error, message: Message<ZodOutput>, log: Logger) => FinalErrorStrategy | Promise<FinalErrorStrategy>; | ||
export type Subset<K> = { | ||
[attr in keyof K]?: K[attr] extends object ? Subset<K[attr]> : K[attr]; | ||
}; | ||
export type OnValidationError<ZodInput> = (message: Message<Subset<ZodInput>>, error: z.ZodError<ZodInput>) => Promise<void>; | ||
export type RetryErrors = ReadonlyArray<any>; | ||
export type Subscription<ZodOutput, ZodInput> = SubscriptionConfig & { | ||
@@ -18,4 +27,8 @@ vHostConfig: VhostConfig | DefaultVostConfig; | ||
messageHandler: MessageHandler<ZodOutput>; | ||
retryOnlyOnTheseErrors?: RetryErrors; | ||
onFinalError?: OnFinalError<ZodOutput>; | ||
onValidationError?: OnValidationError<ZodInput>; | ||
log: Logger; | ||
}; | ||
export declare const handleMessage: <ZodOutput, ZodInput>(subscription: Subscription<ZodOutput, ZodInput>, subscriberSession: SubscriberSessionAsPromised, amqpMessage: AmqpMessage, rawContent: any, ackOrNack: (err?: Error, recovery?: Recovery | Recovery[]) => void) => Promise<void>; | ||
export declare const subscribe: <ZodOutput, ZodInput>(subscription: Subscription<ZodOutput, ZodInput>) => Promise<void>; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.subscribe = void 0; | ||
exports.subscribe = exports.handleMessage = void 0; | ||
const js_utils_1 = require("@axah/js-utils"); | ||
const config_1 = require("./config"); | ||
const broker_1 = require("./broker"); | ||
const getSubscriptionConfig = (vHostConfig) => { | ||
const vhostConfig = (0, config_1.ensureVhostConfig)(vHostConfig); | ||
const subscriptionNames = Object.keys(vhostConfig.subscriptions ?? {}); | ||
const subscriptions = Object.values(vhostConfig.subscriptions ?? {}); | ||
const vHostConfigJson = JSON.stringify(vHostConfig); | ||
if (subscriptions.length === 0) { | ||
throw new Error(`No subscription found in config ${vHostConfigJson} please set the option withSubscription to true`); | ||
const subscription_max_processing_1 = require("./subscription-max-processing"); | ||
const subscription_retry_1 = require("./subscription-retry"); | ||
const getJsonContent = (rawContent, log) => { | ||
let jsonContent = rawContent; | ||
if (rawContent instanceof Buffer) { | ||
log.warn('Received unparsed rawContent from rascal. Will convert Buffer to JSON'); | ||
jsonContent = rawContent.toJSON(); | ||
} | ||
if (subscriptions.length > 1) { | ||
throw new Error(`Multiple subscriptions found for config ${vHostConfigJson} ensure only one subscription is configured per vhost config`); | ||
} | ||
return { | ||
...subscriptions[0], | ||
name: subscriptionNames[0], | ||
}; | ||
return jsonContent; | ||
}; | ||
const messageProcessCountMap = new Map(); | ||
const subscribe = async (subscription) => { | ||
const { vHostConfig, contentValidator, messageHandler, log } = subscription; | ||
const subscriptionConfig = getSubscriptionConfig(vHostConfig); | ||
if (subscriptionConfig.options?.arguments?.disabled) { | ||
log.warn(`Subscription on queue: ${subscriptionConfig.queue} disabled via config.`); | ||
const handleMessage = async (subscription, subscriberSession, amqpMessage, rawContent, ackOrNack) => { | ||
const { vHostConfig, contentValidator, messageHandler, log, onValidationError, onFinalError } = subscription; | ||
const { fields, properties } = amqpMessage; | ||
const { messageId, headers } = amqpMessage.properties; | ||
const retryCount = parseInt(headers.retry || '0', 10); | ||
const maxRetries = (0, config_1.getRetryConfig)(vHostConfig).length; | ||
const subscriptionConfig = (0, config_1.getSubscriptionConfig)(vHostConfig); | ||
const subscriptionName = subscriptionConfig.name; | ||
if ((0, subscription_max_processing_1.isMaxMessageProcessingReached)(vHostConfig)) { | ||
const processLimitReached = `The messageProcessLimit (${(0, config_1.getSubscriptionMaxMessageProcessingLimit)(vHostConfig)}) is reached. Will cancel subscription and stop consuming messages. The message with messageId: ${messageId} will be rolled backed to the top of the queue `; | ||
log.warn(processLimitReached); | ||
ackOrNack(new Error(processLimitReached), { strategy: 'nack', defer: 3000, requeue: true }); | ||
await subscriberSession.cancel(); | ||
return; | ||
} | ||
const subscriptionName = subscriptionConfig.name; | ||
const broker = (0, broker_1.getBroker)(); | ||
const subscriptionResult = await broker.subscribe(subscriptionName, subscription); | ||
subscriptionResult.on('message', async (message, rawContent, ackOrNack) => { | ||
const messageProcessCount = messageProcessCountMap.get(subscriptionName) ?? 1; | ||
messageProcessCountMap.set(subscriptionName, messageProcessCount + 1); | ||
const messageProcessLimit = subscriptionConfig.options?.arguments?.messageProcessLimit; | ||
if (messageProcessLimit && messageProcessCount > messageProcessLimit) { | ||
const { messageId, headers } = message.properties; | ||
log.warn(`The messageProcessLimit (${messageProcessLimit}) is reached. Will cancel subscription and stop consuming messages. The message with messageId: ${messageId} and headers: ${JSON.stringify(headers)} will be rolled backed to the top of the queue `); | ||
ackOrNack(new Error(`Messaged with messageId: ${messageId} and headers: ${JSON.stringify(headers)} won't be processed. Processor is disabled via messageProcessLimit. Current messageProcessCount is ${JSON.stringify(messageProcessCountMap)}`), { strategy: 'nack', defer: 3000, requeue: true }); | ||
await subscriptionResult.cancel(); | ||
const jsonContent = getJsonContent(rawContent, log); | ||
const parseResult = contentValidator.safeParse(jsonContent); | ||
if (!parseResult.success) { | ||
const logContext = { jsonContent, err: parseResult.error }; | ||
const validationError = `Validation error while reading message from ${subscriptionName}. Error "${parseResult.error.message}" occurred. Will nack message and send to dead letter queue`; | ||
log.error(logContext, validationError); | ||
if (onValidationError) { | ||
const validationMessage = { content: rawContent, fields, properties, retryCount, maxRetries }; | ||
try { | ||
await onValidationError(validationMessage, parseResult.error); | ||
} | ||
catch (onValidationErr) { | ||
log.error({ err: (0, js_utils_1.throwableToError)(onValidationErr), originalError: validationError, jsonContent }, 'Error happened during onValidationError messageHandler. Will still nack message'); | ||
} | ||
} | ||
ackOrNack(new Error(validationError), { strategy: 'nack' }); | ||
return; | ||
} | ||
const content = parseResult.data; | ||
const message = { content, fields, properties, retryCount, maxRetries }; | ||
try { | ||
await messageHandler(message); | ||
ackOrNack(); | ||
} | ||
catch (e) { | ||
const err = (0, js_utils_1.throwableToError)(e); | ||
const subscriptionQueueName = (0, config_1.getSubscriptionQueueName)(vHostConfig); | ||
const errorMessage = `Error while processing message from ${subscriptionName}. Error "${err.message}" occurred while processing message.`; | ||
const logContext = { err, content }; | ||
if ((0, broker_1.isShuttingDown)()) { | ||
const shutDownErrorMessage = `${errorMessage} This happened during shutdown of broker and is ok. The message will be requeued`; | ||
log.info(logContext, shutDownErrorMessage); | ||
ackOrNack(new Error(err.message), { strategy: 'nack', defer: 0, requeue: true }); | ||
return; | ||
} | ||
let jsonContent = rawContent; | ||
if (rawContent instanceof Buffer) { | ||
jsonContent = rawContent.toJSON(); | ||
} | ||
const parseResult = contentValidator.safeParse(jsonContent); | ||
if (!parseResult.success) { | ||
log.error({ jsonContent, err: parseResult.error }, `Validation error while reading message from ${subscriptionName}. Error "${parseResult.error.message}" occurred. Will nack message and send to dead letter queue`); | ||
ackOrNack(new Error(`Bad message: ${JSON.stringify(message)} with content: ${jsonContent}`), { | ||
strategy: 'nack', | ||
}); | ||
if ((0, subscription_retry_1.shouldRetry)(subscription, err, retryCount) && subscriptionQueueName) { | ||
const publication = (0, config_1.getRetryPublicationName)(vHostConfig, retryCount); | ||
const { delayInSeconds } = (0, config_1.getRetryConfig)(vHostConfig)[retryCount]; | ||
const messageForwardedMessage = `${errorMessage} Will forward message via publication: ${publication} which will retry with a delay: ${delayInSeconds} seconds`; | ||
log.warn(logContext, messageForwardedMessage); | ||
ackOrNack(err, [ | ||
{ | ||
strategy: 'forward', | ||
publication, | ||
options: { options: { headers: { retry: retryCount + 1 } } }, | ||
attempts: retryCount + 1, | ||
}, | ||
{ strategy: 'nack' }, | ||
]); | ||
return; | ||
} | ||
const content = parseResult.data; | ||
try { | ||
const { fields, properties } = message; | ||
await messageHandler({ content, fields, properties }); | ||
ackOrNack(); | ||
} | ||
catch (e) { | ||
const err = (0, js_utils_1.throwableToError)(e); | ||
if ((0, broker_1.isShuttingDown)()) { | ||
const errorWhileShutDown = `Error: ${err.message} while shutting down subscription. This is ok. We will requeue message`; | ||
log.info(err, errorWhileShutDown); | ||
ackOrNack(new Error(err.message), { strategy: 'nack', defer: 0, requeue: true }); | ||
if (onFinalError && (0, subscription_retry_1.isFinalRetry)(subscription, retryCount)) { | ||
try { | ||
const errorStrategy = await onFinalError(err, message, log); | ||
const onFinalErrorHandledMessage = `Message ${amqpMessage.properties.messageId} has been successfully handled with "onFinalError" handler. Will ${errorStrategy} the message`; | ||
log.info(logContext, onFinalErrorHandledMessage); | ||
ackOrNack(err, { strategy: errorStrategy }); | ||
} | ||
else { | ||
const errorMessage = `Error while reading message from ${subscriptionName}. Error "${err.message}" occurred while processing message.`; | ||
if (subscription.recovery) { | ||
const currentAttempts = message.properties.headers.rascal?.recovery | ||
? message.properties.headers.rascal?.recovery?.[subscription.queue ?? ''].republished ?? | ||
subscription.recovery.attempts | ||
: 0; | ||
if (currentAttempts === subscription.recovery.attempts) { | ||
log.error({ err, content }, `${errorMessage} Will nack message and send to dead letter queue`); | ||
} | ||
else { | ||
log.warn({ err, content }, `${errorMessage} Will try to recover message with attempt: ${currentAttempts}`); | ||
} | ||
ackOrNack(err, [subscription.recovery, { strategy: 'nack' }]); | ||
} | ||
else { | ||
log.error({ err, content }, `${errorMessage} Will nack message and send to dead letter queue`); | ||
ackOrNack(err, { strategy: 'nack' }); | ||
} | ||
catch (finalErr) { | ||
const finalError = (0, js_utils_1.throwableToError)(finalErr); | ||
const failedOnFinalErrorMessage = `${errorMessage} Then final error handler got "${finalError.message}" in the last retry and can't handle the error. Will nack message and send to dead letter queue`; | ||
log.error({ err: (0, js_utils_1.throwableToError)(finalError), originalError: err, content }, failedOnFinalErrorMessage); | ||
ackOrNack(err, { strategy: 'nack' }); | ||
} | ||
return; | ||
} | ||
}); | ||
subscriptionResult.on('error', (err) => { | ||
if (subscription.recovery) { | ||
const messageRecoveredMessage = `${errorMessage} Will recover message and retry to same queue ${JSON.stringify(subscription.recovery)}`; | ||
log.warn(logContext, messageRecoveredMessage); | ||
ackOrNack(err, [...subscription.recovery, { strategy: 'nack' }]); | ||
return; | ||
} | ||
log.error(logContext, `${errorMessage}. Will nack message and send to dead letter queue`); | ||
ackOrNack(err, { strategy: 'nack' }); | ||
} | ||
}; | ||
exports.handleMessage = handleMessage; | ||
const subscribe = async (subscription) => { | ||
const { vHostConfig, log } = subscription; | ||
const subscriptionConfig = (0, config_1.getSubscriptionConfig)(vHostConfig); | ||
if (subscriptionConfig.options?.arguments?.disabled) { | ||
log.warn(`Subscription on queue: ${subscriptionConfig.queue} disabled via config.`); | ||
return; | ||
} | ||
const subscriptionName = subscriptionConfig.name; | ||
const broker = (0, broker_1.getBroker)(); | ||
const subscriberSession = await broker.subscribe(subscriptionName, subscription); | ||
subscriberSession.on('message', async (message, rawContent, ackOrNack) => (0, exports.handleMessage)(subscription, subscriberSession, message, rawContent, ackOrNack)); | ||
subscriberSession.on('error', (err) => { | ||
log.error({ err }, `Subscription error ${(0, js_utils_1.throwableToError)(err).message}`); | ||
}); | ||
subscriptionResult.on('invalid_content', (err, message, ackOrNack) => { | ||
subscriberSession.on('invalid_content', (err, message, ackOrNack) => { | ||
const errorMessage = `${(0, js_utils_1.throwableToError)(err).message}`; | ||
@@ -101,3 +129,3 @@ const content = { err, message }; | ||
}); | ||
subscriptionResult.on('redeliveries_exceeded', (err, message, ackOrNack) => { | ||
subscriberSession.on('redeliveries_exceeded', (err, message, ackOrNack) => { | ||
const errorMessage = `${(0, js_utils_1.throwableToError)(err).message}`; | ||
@@ -104,0 +132,0 @@ const content = { err, message }; |
{ | ||
"name": "@axah/mq", | ||
"version": "0.0.0-20230913120345", | ||
"version": "0.0.0-20231106083335", | ||
"description": "", | ||
@@ -15,3 +15,3 @@ "main": "lib/index.js", | ||
"@axah/js-utils": "^1.1.0", | ||
"@types/async-retry": "^1.4.5", | ||
"@types/async-retry": "^1.4.6", | ||
"amqplib": "^0.10.3", | ||
@@ -24,3 +24,3 @@ "async-retry": "^1.3.3", | ||
"uuid": "^9.0.1", | ||
"zod": "^3.22.2" | ||
"zod": "^3.22.4" | ||
}, | ||
@@ -31,14 +31,14 @@ "devDependencies": { | ||
"@changesets/cli": "^2.26.2", | ||
"@types/amqplib": "^0.10.1", | ||
"@types/jest": "^29.5.4", | ||
"@types/amqplib": "^0.10.2", | ||
"@types/jest": "^29.5.5", | ||
"@types/lodash.merge": "^4.6.7", | ||
"@types/lodash.mergewith": "^4.6.7", | ||
"@types/node": "^20.6.0", | ||
"@types/rascal": "^10.0.6", | ||
"@types/uuid": "^9.0.3", | ||
"eslint": "^8.49.0", | ||
"@types/node": "^20.8.2", | ||
"@types/rascal": "^10.0.7", | ||
"@types/uuid": "^9.0.4", | ||
"eslint": "^8.50.0", | ||
"jest": "^29.7.0", | ||
"pino": "^8.15.1", | ||
"pino": "^8.15.6", | ||
"prettier": "^3.0.3", | ||
"rimraf": "^5.0.1", | ||
"rimraf": "^5.0.5", | ||
"ts-jest": "^29.1.1", | ||
@@ -45,0 +45,0 @@ "ts-node": "^10.9.1", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
64092
29
727
Updated@types/async-retry@^1.4.6
Updatedzod@^3.22.4