Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@node-ts/bus-core

Package Overview
Dependencies
Maintainers
1
Versions
96
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@node-ts/bus-core - npm Package Compare versions

Comparing version 1.1.10 to 1.1.11

34

dist/service-bus/bus-instance.d.ts

@@ -16,2 +16,10 @@ import { Transport, TransportMessage } from '../transport';

}
export interface AfterSend {
command: Command;
attributes?: MessageAttributes;
}
export interface AfterPublish {
event: Event;
attributes?: MessageAttributes;
}
export interface OnError<TTransportMessage> {

@@ -44,7 +52,33 @@ message: Message;

private readonly sendOnly;
/**
* Emitted before a command is sent to the transport
*/
readonly beforeSend: TypedEmitter<BeforeSend>;
/**
* Emitted before an event is published to the transport
*/
readonly beforePublish: TypedEmitter<BeforePublish>;
/**
* Emitted after a command has been sent to the transport
*/
readonly afterSend: TypedEmitter<AfterSend>;
/**
* Emitted after an event has been published to the transport
*/
readonly afterPublish: TypedEmitter<AfterPublish>;
/**
* Emitted when an error occurs during message handling
*/
readonly onError: TypedEmitter<OnError<TTransportMessage>>;
/**
* Emitted immediately after a message has been received from the transport
*/
readonly afterReceive: TypedEmitter<AfterReceive<TTransportMessage>>;
/**
* Emitted before a message is dispatched to handlers
*/
readonly beforeDispatch: TypedEmitter<BeforeDispatch>;
/**
* Emitted after a message has been dispatched and completed all handler invocations
*/
readonly afterDispatch: TypedEmitter<AfterDispatch>;

@@ -51,0 +85,0 @@ private internalState;

55

dist/service-bus/bus-instance.js

@@ -25,7 +25,33 @@ "use strict";

this.sendOnly = sendOnly;
/**
* Emitted before a command is sent to the transport
*/
this.beforeSend = new util_1.TypedEmitter();
/**
* Emitted before an event is published to the transport
*/
this.beforePublish = new util_1.TypedEmitter();
/**
* Emitted after a command has been sent to the transport
*/
this.afterSend = new util_1.TypedEmitter();
/**
* Emitted after an event has been published to the transport
*/
this.afterPublish = new util_1.TypedEmitter();
/**
* Emitted when an error occurs during message handling
*/
this.onError = new util_1.TypedEmitter();
/**
* Emitted immediately after a message has been received from the transport
*/
this.afterReceive = new util_1.TypedEmitter();
/**
* Emitted before a message is dispatched to handlers
*/
this.beforeDispatch = new util_1.TypedEmitter();
/**
* Emitted after a message has been dispatched and completed all handler invocations
*/
this.afterDispatch = new util_1.TypedEmitter();

@@ -95,3 +121,3 @@ this.internalState = bus_state_1.BusState.Stopped;

const outbox = this.outbox.get('outbox');
outbox.push(() => tslib_1.__awaiter(this, void 0, void 0, function* () { return this.transport.publish(event, attributes); }));
outbox.push({ event, attributes });
this.outbox.set('outbox', outbox);

@@ -117,3 +143,3 @@ }

const outbox = this.outbox.get('outbox');
outbox.push(() => tslib_1.__awaiter(this, void 0, void 0, function* () { return this.transport.send(command, attributes); }));
outbox.push({ command, attributes });
this.outbox.set('outbox', outbox);

@@ -354,4 +380,25 @@ }

const outboxedMessages = this.outbox.get('outbox');
if (outboxedMessages) {
yield Promise.all(outboxedMessages.map(dispatchOutboxedMessage => dispatchOutboxedMessage()));
if (outboxedMessages && outboxedMessages.length > 0) {
// In case of a large number of messages to send, use a worker pool to dispatch so that we don't blow out heap usage
const dispatchWorkerCount = Math.min(outboxedMessages.length, 10);
const workers = new Array(dispatchWorkerCount)
.fill(undefined)
.map(() => tslib_1.__awaiter(this, void 0, void 0, function* () {
while (true) {
const messageToSend = outboxedMessages.shift();
if (messageToSend === undefined) {
break;
}
const { command, event, attributes } = messageToSend;
if (command) {
yield this.transport.send(command, attributes);
this.afterSend.emit({ command, attributes });
}
else if (event) {
yield this.transport.publish(event, attributes);
this.afterPublish.emit({ event, attributes });
}
}
}));
yield Promise.all(workers);
}

@@ -358,0 +405,0 @@ }));

2

package.json
{
"name": "@node-ts/bus-core",
"version": "1.1.10",
"version": "1.1.11",
"description": "A service bus for message-based, distributed node applications",

@@ -5,0 +5,0 @@ "main": "./dist/index.js",

@@ -12,2 +12,4 @@ import { Mock, Times } from 'typemoq'

jest.setTimeout(20_000)
describe('BusInstance Outboxing', () => {

@@ -39,2 +41,43 @@ describe('when a message is sent from outside of a handler', () => {

})
describe('when a large number of messages are sent from inside a handler', () => {
let bus: BusInstance
beforeAll(async () => {
const numberOfMessages = 20_000
bus = Bus.configure()
.withHandler(
handlerFor(TestCommand, async () => {
const publishMessages = new Array(numberOfMessages)
.fill(undefined)
.map(async () => bus.publish(new TestEvent()))
await Promise.all(publishMessages)
})
)
.build()
let messagesPublishedCount = 0
const messagesPublished = new Promise<void>(resolve => {
bus.afterPublish.on(() => {
if (++messagesPublishedCount === numberOfMessages) {
resolve()
}
})
})
await bus.initialize()
await bus.start()
await bus.send(new TestCommand())
await messagesPublished
})
afterAll(async () => {
await bus.dispose()
})
it('should dispatch all messages without exhausting the heap', () => {
// If code execution reaches this point, the test has passed and not run out of memory
})
})
describe('when a message is sent from two handlers, and one fails', () => {

@@ -88,3 +131,2 @@ let bus: BusInstance

})
describe('when a message is sent in a workflow handler, that fails to persist', () => {

@@ -91,0 +133,0 @@ let bus: BusInstance

@@ -51,2 +51,12 @@ import { Transport, TransportMessage } from '../transport'

export interface AfterSend {
command: Command
attributes?: MessageAttributes
}
export interface AfterPublish {
event: Event
attributes?: MessageAttributes
}
export interface OnError<TTransportMessage> {

@@ -74,7 +84,33 @@ message: Message

export class BusInstance<TTransportMessage = {}> {
/**
* Emitted before a command is sent to the transport
*/
readonly beforeSend = new TypedEmitter<BeforeSend>()
/**
* Emitted before an event is published to the transport
*/
readonly beforePublish = new TypedEmitter<BeforePublish>()
/**
* Emitted after a command has been sent to the transport
*/
readonly afterSend = new TypedEmitter<AfterSend>()
/**
* Emitted after an event has been published to the transport
*/
readonly afterPublish = new TypedEmitter<AfterPublish>()
/**
* Emitted when an error occurs during message handling
*/
readonly onError = new TypedEmitter<OnError<TTransportMessage>>()
/**
* Emitted immediately after a message has been received from the transport
*/
readonly afterReceive = new TypedEmitter<AfterReceive<TTransportMessage>>()
/**
* Emitted before a message is dispatched to handlers
*/
readonly beforeDispatch = new TypedEmitter<BeforeDispatch>()
/**
* Emitted after a message has been dispatched and completed all handler invocations
*/
readonly afterDispatch = new TypedEmitter<AfterDispatch>()

@@ -86,3 +122,5 @@

private isInitialized = false
private outbox: ALS<(() => Promise<void>)[]> = new ALS()
private outbox: ALS<
{ command?: Command; event?: Event; attributes?: MessageAttributes }[]
> = new ALS()

@@ -162,3 +200,3 @@ constructor(

const outbox = this.outbox.get('outbox')!
outbox.push(async () => this.transport.publish(event, attributes))
outbox.push({ event, attributes })
this.outbox.set('outbox', outbox)

@@ -186,3 +224,3 @@ } else {

const outbox = this.outbox.get('outbox')!
outbox.push(async () => this.transport.send(command, attributes))
outbox.push({ command, attributes })
this.outbox.set('outbox', outbox)

@@ -475,8 +513,26 @@ } else {

const outboxedMessages = this.outbox.get('outbox')
if (outboxedMessages) {
await Promise.all(
outboxedMessages.map(dispatchOutboxedMessage =>
dispatchOutboxedMessage()
)
)
if (outboxedMessages && outboxedMessages.length > 0) {
// In case of a large number of messages to send, use a worker pool to dispatch so that we don't blow out heap usage
const dispatchWorkerCount = Math.min(outboxedMessages.length, 10)
const workers = new Array(dispatchWorkerCount)
.fill(undefined)
.map(async () => {
while (true) {
const messageToSend = outboxedMessages.shift()
if (messageToSend === undefined) {
break
}
const { command, event, attributes } = messageToSend
if (command) {
await this.transport.send(command, attributes)
this.afterSend.emit({ command, attributes })
} else if (event) {
await this.transport.publish(event, attributes)
this.afterPublish.emit({ event, attributes })
}
}
})
await Promise.all(workers)
}

@@ -483,0 +539,0 @@ })

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc