@node-ts/bus-core
Advanced tools
Comparing version 0.2.1 to 0.2.2
@@ -7,2 +7,3 @@ "use strict"; | ||
const logger_core_1 = require("@node-ts/logger-core"); | ||
const serializeError = require("serialize-error"); | ||
/** | ||
@@ -83,3 +84,3 @@ * An internal singleton that contains all registrations of messages to functions that handle | ||
resolveHandler: (container) => { | ||
this.logger.debug(`Resolving ${messageName}`); | ||
this.logger.debug(`Resolving handlers for ${messageName}`); | ||
try { | ||
@@ -89,3 +90,6 @@ return container.get(h.symbol); | ||
catch (error) { | ||
this.logger.error('Could not get handlers for message from the IoC container.', { messageName, error }); | ||
this.logger.error('Could not resolve handler from the IoC container.', { | ||
messageName, | ||
error: serializeError(error) | ||
}); | ||
throw error; | ||
@@ -92,0 +96,0 @@ } |
@@ -11,2 +11,3 @@ "use strict"; | ||
const handler_1 = require("../handler"); | ||
const serializeError = require("serialize-error"); | ||
const EMPTY_QUEUE_SLEEP_MS = 500; | ||
@@ -72,4 +73,5 @@ let ServiceBus = class ServiceBus { | ||
catch (error) { | ||
this.logger.warn('Message was unsuccessfully handled. Returning to queue', { message, error }); | ||
this.logger.warn('Message was unsuccessfully handled. Returning to queue', { message, error: serializeError(error) }); | ||
await this.transport.returnMessage(message); | ||
return false; | ||
} | ||
@@ -80,3 +82,3 @@ return true; | ||
catch (error) { | ||
this.logger.error('Failed to receive message from transport', { error }); | ||
this.logger.error('Failed to receive message from transport', { error: serializeError(error) }); | ||
} | ||
@@ -83,0 +85,0 @@ return false; |
@@ -6,4 +6,15 @@ import { Transport } from './transport'; | ||
import { HandlerRegistry } from '../handler'; | ||
export declare const RETRY_LIMIT = 10; | ||
export interface InMemoryMessage { | ||
/** | ||
* If the message is currently being handled and not visible to other consumers | ||
*/ | ||
inFlight: boolean; | ||
/** | ||
* The number of times the message has been fetched from the queue | ||
*/ | ||
seenCount: number; | ||
/** | ||
* The body of the message that was sent by the consumer | ||
*/ | ||
payload: Message; | ||
@@ -21,2 +32,3 @@ } | ||
private queue; | ||
private deadLetterQueue; | ||
private messagesWithHandlers; | ||
@@ -32,3 +44,5 @@ constructor(logger: Logger); | ||
readonly depth: number; | ||
readonly deadLetterQueueDepth: number; | ||
private sendToDeadLetterQueue; | ||
private addToQueue; | ||
} |
@@ -6,2 +6,3 @@ "use strict"; | ||
const logger_core_1 = require("@node-ts/logger-core"); | ||
exports.RETRY_LIMIT = 10; | ||
/** | ||
@@ -18,2 +19,3 @@ * An in-memory message queue. This isn't intended for production use as all messages | ||
this.queue = []; | ||
this.deadLetterQueue = []; | ||
} | ||
@@ -54,3 +56,11 @@ async initialize(handlerRegistry) { | ||
async returnMessage(message) { | ||
message.raw.inFlight = false; | ||
message.raw.seenCount++; | ||
if (message.raw.seenCount >= exports.RETRY_LIMIT) { | ||
// Message retries exhausted, send to DLQ | ||
this.logger.info('Message retry limit exceeded, sending to dead letter queue', { message }); | ||
await this.sendToDeadLetterQueue(message); | ||
} | ||
else { | ||
message.raw.inFlight = false; | ||
} | ||
} | ||
@@ -60,2 +70,9 @@ get depth() { | ||
} | ||
get deadLetterQueueDepth() { | ||
return this.deadLetterQueue.length; | ||
} | ||
async sendToDeadLetterQueue(message) { | ||
this.deadLetterQueue.push(message); | ||
await this.deleteMessage(message); | ||
} | ||
addToQueue(message) { | ||
@@ -83,2 +100,3 @@ if (this.messagesWithHandlers[message.$name]) { | ||
raw: { | ||
seenCount: 0, | ||
payload: message, | ||
@@ -85,0 +103,0 @@ inFlight: isProcessing |
{ | ||
"name": "@node-ts/bus-core", | ||
"version": "0.2.1", | ||
"version": "0.2.2", | ||
"description": "A service bus for message-based, distributed node applications", | ||
@@ -22,2 +22,3 @@ "main": "./dist/index.js", | ||
"reflect-metadata": "^0.1.13", | ||
"serialize-error": "^4.1.0", | ||
"tslib": "^1.9.3" | ||
@@ -49,3 +50,3 @@ }, | ||
], | ||
"gitHead": "d89967a0a88deeccd51784e18aadf879685540f9" | ||
"gitHead": "bacefae4e4f09c32a6337f6072910121a282ed21" | ||
} |
@@ -6,2 +6,3 @@ import { Message } from '@node-ts/bus-messages' | ||
import { LOGGER_SYMBOLS, Logger } from '@node-ts/logger-core' | ||
import * as serializeError from 'serialize-error' | ||
@@ -127,7 +128,13 @@ type HandlerType = ClassConstructor<Handler<Message>> | ((context: interfaces.Context) => Handler<Message>) | ||
resolveHandler: (container: Container) => { | ||
this.logger.debug(`Resolving ${messageName}`) | ||
this.logger.debug(`Resolving handlers for ${messageName}`) | ||
try { | ||
return container.get<Handler<MessageType>>(h.symbol) | ||
} catch (error) { | ||
this.logger.error('Could not get handlers for message from the IoC container.', { messageName, error }) | ||
this.logger.error( | ||
'Could not resolve handler from the IoC container.', | ||
{ | ||
messageName, | ||
error: serializeError(error) | ||
} | ||
) | ||
throw error | ||
@@ -134,0 +141,0 @@ } |
@@ -10,2 +10,3 @@ import { injectable, inject } from 'inversify' | ||
import { HandlerRegistry } from '../handler' | ||
import * as serializeError from 'serialize-error' | ||
@@ -89,4 +90,8 @@ const EMPTY_QUEUE_SLEEP_MS = 500 | ||
} catch (error) { | ||
this.logger.warn('Message was unsuccessfully handled. Returning to queue', { message, error }) | ||
this.logger.warn( | ||
'Message was unsuccessfully handled. Returning to queue', | ||
{ message, error: serializeError(error) } | ||
) | ||
await this.transport.returnMessage(message) | ||
return false | ||
} | ||
@@ -96,3 +101,3 @@ return true | ||
} catch (error) { | ||
this.logger.error('Failed to receive message from transport', { error }) | ||
this.logger.error('Failed to receive message from transport', { error: serializeError(error) }) | ||
} | ||
@@ -99,0 +104,0 @@ return false |
@@ -1,2 +0,2 @@ | ||
import { MemoryQueue, InMemoryMessage } from './memory-queue' | ||
import { MemoryQueue, InMemoryMessage, RETRY_LIMIT } from './memory-queue' | ||
import { TestCommand, TestEvent, TestCommand2 } from '../test' | ||
@@ -62,2 +62,8 @@ import { TransportMessage } from '../transport' | ||
it('should read new messages with seenCount equal to 1', async () => { | ||
await sut.publish(event) | ||
const message = await sut.readNextMessage() | ||
expect(message!.raw.seenCount).toEqual(0) | ||
}) | ||
it('should return the oldest message when there are many', async () => { | ||
@@ -102,3 +108,27 @@ await sut.publish(event) | ||
}) | ||
it('should increment the seenCount', async () => { | ||
await sut.returnMessage(message!) | ||
expect(message!.raw.seenCount).toEqual(1) | ||
}) | ||
}) | ||
describe('when retrying a message has been retried beyond the retry limit', () => { | ||
let message: TransportMessage<InMemoryMessage> | undefined | ||
beforeEach(async () => { | ||
await sut.publish(event) | ||
let attempt = 0 | ||
while (attempt < RETRY_LIMIT) { | ||
// Retry to the limit | ||
message = await sut.readNextMessage() | ||
await sut.returnMessage(message!) | ||
attempt++ | ||
} | ||
}) | ||
it('should send the message to the dead letter queue', () => { | ||
expect(sut.deadLetterQueueDepth).toEqual(1) | ||
}) | ||
}) | ||
}) |
@@ -8,4 +8,18 @@ import { injectable, inject } from 'inversify' | ||
export const RETRY_LIMIT = 10 | ||
export interface InMemoryMessage { | ||
/** | ||
* If the message is currently being handled and not visible to other consumers | ||
*/ | ||
inFlight: boolean | ||
/** | ||
* The number of times the message has been fetched from the queue | ||
*/ | ||
seenCount: number | ||
/** | ||
* The body of the message that was sent by the consumer | ||
*/ | ||
payload: Message | ||
@@ -25,2 +39,3 @@ } | ||
private queue: TransportMessage<InMemoryMessage>[] = [] | ||
private deadLetterQueue: TransportMessage<InMemoryMessage>[] = [] | ||
private messagesWithHandlers: { [key: string]: {} } | ||
@@ -75,3 +90,11 @@ | ||
async returnMessage (message: TransportMessage<InMemoryMessage>): Promise<void> { | ||
message.raw.inFlight = false | ||
message.raw.seenCount++ | ||
if (message.raw.seenCount >= RETRY_LIMIT) { | ||
// Message retries exhausted, send to DLQ | ||
this.logger.info('Message retry limit exceeded, sending to dead letter queue', { message }) | ||
await this.sendToDeadLetterQueue(message) | ||
} else { | ||
message.raw.inFlight = false | ||
} | ||
} | ||
@@ -83,2 +106,11 @@ | ||
get deadLetterQueueDepth (): number { | ||
return this.deadLetterQueue.length | ||
} | ||
private async sendToDeadLetterQueue (message: TransportMessage<InMemoryMessage>): Promise<void> { | ||
this.deadLetterQueue.push(message) | ||
await this.deleteMessage(message) | ||
} | ||
private addToQueue (message: Message): void { | ||
@@ -100,2 +132,3 @@ if (this.messagesWithHandlers[message.$name]) { | ||
raw: { | ||
seenCount: 0, | ||
payload: message, | ||
@@ -102,0 +135,0 @@ inFlight: isProcessing |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
109310
2074
10
+ Addedserialize-error@^4.1.0
+ Addedserialize-error@4.1.0(transitive)
+ Addedtype-fest@0.3.1(transitive)