Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@node-ts/bus-core

Package Overview
Dependencies
Maintainers
1
Versions
96
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@node-ts/bus-core - npm Package Compare versions

Comparing version 0.2.1 to 0.2.2

8

dist/handler/handler-registry.js

@@ -7,2 +7,3 @@ "use strict";

const logger_core_1 = require("@node-ts/logger-core");
const serializeError = require("serialize-error");
/**

@@ -83,3 +84,3 @@ * An internal singleton that contains all registrations of messages to functions that handle

resolveHandler: (container) => {
this.logger.debug(`Resolving ${messageName}`);
this.logger.debug(`Resolving handlers for ${messageName}`);
try {

@@ -89,3 +90,6 @@ return container.get(h.symbol);

catch (error) {
this.logger.error('Could not get handlers for message from the IoC container.', { messageName, error });
this.logger.error('Could not resolve handler from the IoC container.', {
messageName,
error: serializeError(error)
});
throw error;

@@ -92,0 +96,0 @@ }

@@ -11,2 +11,3 @@ "use strict";

const handler_1 = require("../handler");
const serializeError = require("serialize-error");
const EMPTY_QUEUE_SLEEP_MS = 500;

@@ -72,4 +73,5 @@ let ServiceBus = class ServiceBus {

catch (error) {
this.logger.warn('Message was unsuccessfully handled. Returning to queue', { message, error });
this.logger.warn('Message was unsuccessfully handled. Returning to queue', { message, error: serializeError(error) });
await this.transport.returnMessage(message);
return false;
}

@@ -80,3 +82,3 @@ return true;

catch (error) {
this.logger.error('Failed to receive message from transport', { error });
this.logger.error('Failed to receive message from transport', { error: serializeError(error) });
}

@@ -83,0 +85,0 @@ return false;

@@ -6,4 +6,15 @@ import { Transport } from './transport';

import { HandlerRegistry } from '../handler';
export declare const RETRY_LIMIT = 10;
export interface InMemoryMessage {
/**
* If the message is currently being handled and not visible to other consumers
*/
inFlight: boolean;
/**
* The number of times the message has been fetched from the queue
*/
seenCount: number;
/**
* The body of the message that was sent by the consumer
*/
payload: Message;

@@ -21,2 +32,3 @@ }

private queue;
private deadLetterQueue;
private messagesWithHandlers;

@@ -32,3 +44,5 @@ constructor(logger: Logger);

readonly depth: number;
readonly deadLetterQueueDepth: number;
private sendToDeadLetterQueue;
private addToQueue;
}

@@ -6,2 +6,3 @@ "use strict";

const logger_core_1 = require("@node-ts/logger-core");
exports.RETRY_LIMIT = 10;
/**

@@ -18,2 +19,3 @@ * An in-memory message queue. This isn't intended for production use as all messages

this.queue = [];
this.deadLetterQueue = [];
}

@@ -54,3 +56,11 @@ async initialize(handlerRegistry) {

async returnMessage(message) {
message.raw.inFlight = false;
message.raw.seenCount++;
if (message.raw.seenCount >= exports.RETRY_LIMIT) {
// Message retries exhausted, send to DLQ
this.logger.info('Message retry limit exceeded, sending to dead letter queue', { message });
await this.sendToDeadLetterQueue(message);
}
else {
message.raw.inFlight = false;
}
}

@@ -60,2 +70,9 @@ get depth() {

}
get deadLetterQueueDepth() {
return this.deadLetterQueue.length;
}
async sendToDeadLetterQueue(message) {
this.deadLetterQueue.push(message);
await this.deleteMessage(message);
}
addToQueue(message) {

@@ -83,2 +100,3 @@ if (this.messagesWithHandlers[message.$name]) {

raw: {
seenCount: 0,
payload: message,

@@ -85,0 +103,0 @@ inFlight: isProcessing

{
"name": "@node-ts/bus-core",
"version": "0.2.1",
"version": "0.2.2",
"description": "A service bus for message-based, distributed node applications",

@@ -22,2 +22,3 @@ "main": "./dist/index.js",

"reflect-metadata": "^0.1.13",
"serialize-error": "^4.1.0",
"tslib": "^1.9.3"

@@ -49,3 +50,3 @@ },

],
"gitHead": "d89967a0a88deeccd51784e18aadf879685540f9"
"gitHead": "bacefae4e4f09c32a6337f6072910121a282ed21"
}

@@ -6,2 +6,3 @@ import { Message } from '@node-ts/bus-messages'

import { LOGGER_SYMBOLS, Logger } from '@node-ts/logger-core'
import * as serializeError from 'serialize-error'

@@ -127,7 +128,13 @@ type HandlerType = ClassConstructor<Handler<Message>> | ((context: interfaces.Context) => Handler<Message>)

resolveHandler: (container: Container) => {
this.logger.debug(`Resolving ${messageName}`)
this.logger.debug(`Resolving handlers for ${messageName}`)
try {
return container.get<Handler<MessageType>>(h.symbol)
} catch (error) {
this.logger.error('Could not get handlers for message from the IoC container.', { messageName, error })
this.logger.error(
'Could not resolve handler from the IoC container.',
{
messageName,
error: serializeError(error)
}
)
throw error

@@ -134,0 +141,0 @@ }

@@ -10,2 +10,3 @@ import { injectable, inject } from 'inversify'

import { HandlerRegistry } from '../handler'
import * as serializeError from 'serialize-error'

@@ -89,4 +90,8 @@ const EMPTY_QUEUE_SLEEP_MS = 500

} catch (error) {
this.logger.warn('Message was unsuccessfully handled. Returning to queue', { message, error })
this.logger.warn(
'Message was unsuccessfully handled. Returning to queue',
{ message, error: serializeError(error) }
)
await this.transport.returnMessage(message)
return false
}

@@ -96,3 +101,3 @@ return true

} catch (error) {
this.logger.error('Failed to receive message from transport', { error })
this.logger.error('Failed to receive message from transport', { error: serializeError(error) })
}

@@ -99,0 +104,0 @@ return false

@@ -1,2 +0,2 @@

import { MemoryQueue, InMemoryMessage } from './memory-queue'
import { MemoryQueue, InMemoryMessage, RETRY_LIMIT } from './memory-queue'
import { TestCommand, TestEvent, TestCommand2 } from '../test'

@@ -62,2 +62,8 @@ import { TransportMessage } from '../transport'

it('should read new messages with seenCount equal to 1', async () => {
await sut.publish(event)
const message = await sut.readNextMessage()
expect(message!.raw.seenCount).toEqual(0)
})
it('should return the oldest message when there are many', async () => {

@@ -102,3 +108,27 @@ await sut.publish(event)

})
it('should increment the seenCount', async () => {
await sut.returnMessage(message!)
expect(message!.raw.seenCount).toEqual(1)
})
})
describe('when retrying a message has been retried beyond the retry limit', () => {
let message: TransportMessage<InMemoryMessage> | undefined
beforeEach(async () => {
await sut.publish(event)
let attempt = 0
while (attempt < RETRY_LIMIT) {
// Retry to the limit
message = await sut.readNextMessage()
await sut.returnMessage(message!)
attempt++
}
})
it('should send the message to the dead letter queue', () => {
expect(sut.deadLetterQueueDepth).toEqual(1)
})
})
})

@@ -8,4 +8,18 @@ import { injectable, inject } from 'inversify'

export const RETRY_LIMIT = 10
export interface InMemoryMessage {
/**
* If the message is currently being handled and not visible to other consumers
*/
inFlight: boolean
/**
* The number of times the message has been fetched from the queue
*/
seenCount: number
/**
* The body of the message that was sent by the consumer
*/
payload: Message

@@ -25,2 +39,3 @@ }

private queue: TransportMessage<InMemoryMessage>[] = []
private deadLetterQueue: TransportMessage<InMemoryMessage>[] = []
private messagesWithHandlers: { [key: string]: {} }

@@ -75,3 +90,11 @@

async returnMessage (message: TransportMessage<InMemoryMessage>): Promise<void> {
message.raw.inFlight = false
message.raw.seenCount++
if (message.raw.seenCount >= RETRY_LIMIT) {
// Message retries exhausted, send to DLQ
this.logger.info('Message retry limit exceeded, sending to dead letter queue', { message })
await this.sendToDeadLetterQueue(message)
} else {
message.raw.inFlight = false
}
}

@@ -83,2 +106,11 @@

get deadLetterQueueDepth (): number {
return this.deadLetterQueue.length
}
private async sendToDeadLetterQueue (message: TransportMessage<InMemoryMessage>): Promise<void> {
this.deadLetterQueue.push(message)
await this.deleteMessage(message)
}
private addToQueue (message: Message): void {

@@ -100,2 +132,3 @@ if (this.messagesWithHandlers[message.$name]) {

raw: {
seenCount: 0,
payload: message,

@@ -102,0 +135,0 @@ inFlight: isProcessing

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc