@node-ts/bus-core
Advanced tools
Comparing version 1.1.10 to 1.1.11
@@ -16,2 +16,10 @@ import { Transport, TransportMessage } from '../transport'; | ||
} | ||
export interface AfterSend { | ||
command: Command; | ||
attributes?: MessageAttributes; | ||
} | ||
export interface AfterPublish { | ||
event: Event; | ||
attributes?: MessageAttributes; | ||
} | ||
export interface OnError<TTransportMessage> { | ||
@@ -44,7 +52,33 @@ message: Message; | ||
private readonly sendOnly; | ||
/** | ||
* Emitted before a command is sent to the transport | ||
*/ | ||
readonly beforeSend: TypedEmitter<BeforeSend>; | ||
/** | ||
* Emitted before an event is published to the transport | ||
*/ | ||
readonly beforePublish: TypedEmitter<BeforePublish>; | ||
/** | ||
* Emitted after a command has been sent to the transport | ||
*/ | ||
readonly afterSend: TypedEmitter<AfterSend>; | ||
/** | ||
* Emitted after an event has been published to the transport | ||
*/ | ||
readonly afterPublish: TypedEmitter<AfterPublish>; | ||
/** | ||
* Emitted when an error occurs during message handling | ||
*/ | ||
readonly onError: TypedEmitter<OnError<TTransportMessage>>; | ||
/** | ||
* Emitted immediately after a message has been received from the transport | ||
*/ | ||
readonly afterReceive: TypedEmitter<AfterReceive<TTransportMessage>>; | ||
/** | ||
* Emitted before a message is dispatched to handlers | ||
*/ | ||
readonly beforeDispatch: TypedEmitter<BeforeDispatch>; | ||
/** | ||
* Emitted after a message has been dispatched and completed all handler invocations | ||
*/ | ||
readonly afterDispatch: TypedEmitter<AfterDispatch>; | ||
@@ -51,0 +85,0 @@ private internalState; |
@@ -25,7 +25,33 @@ "use strict"; | ||
this.sendOnly = sendOnly; | ||
/** | ||
* Emitted before a command is sent to the transport | ||
*/ | ||
this.beforeSend = new util_1.TypedEmitter(); | ||
/** | ||
* Emitted before an event is published to the transport | ||
*/ | ||
this.beforePublish = new util_1.TypedEmitter(); | ||
/** | ||
* Emitted after a command has been sent to the transport | ||
*/ | ||
this.afterSend = new util_1.TypedEmitter(); | ||
/** | ||
* Emitted after an event has been published to the transport | ||
*/ | ||
this.afterPublish = new util_1.TypedEmitter(); | ||
/** | ||
* Emitted when an error occurs during message handling | ||
*/ | ||
this.onError = new util_1.TypedEmitter(); | ||
/** | ||
* Emitted immediately after a message has been received from the transport | ||
*/ | ||
this.afterReceive = new util_1.TypedEmitter(); | ||
/** | ||
* Emitted before a message is dispatched to handlers | ||
*/ | ||
this.beforeDispatch = new util_1.TypedEmitter(); | ||
/** | ||
* Emitted after a message has been dispatched and completed all handler invocations | ||
*/ | ||
this.afterDispatch = new util_1.TypedEmitter(); | ||
@@ -95,3 +121,3 @@ this.internalState = bus_state_1.BusState.Stopped; | ||
const outbox = this.outbox.get('outbox'); | ||
outbox.push(() => tslib_1.__awaiter(this, void 0, void 0, function* () { return this.transport.publish(event, attributes); })); | ||
outbox.push({ event, attributes }); | ||
this.outbox.set('outbox', outbox); | ||
@@ -117,3 +143,3 @@ } | ||
const outbox = this.outbox.get('outbox'); | ||
outbox.push(() => tslib_1.__awaiter(this, void 0, void 0, function* () { return this.transport.send(command, attributes); })); | ||
outbox.push({ command, attributes }); | ||
this.outbox.set('outbox', outbox); | ||
@@ -354,4 +380,25 @@ } | ||
const outboxedMessages = this.outbox.get('outbox'); | ||
if (outboxedMessages) { | ||
yield Promise.all(outboxedMessages.map(dispatchOutboxedMessage => dispatchOutboxedMessage())); | ||
if (outboxedMessages && outboxedMessages.length > 0) { | ||
// In case of a large number of messages to send, use a worker pool to dispatch so that we don't blow out heap usage | ||
const dispatchWorkerCount = Math.min(outboxedMessages.length, 10); | ||
const workers = new Array(dispatchWorkerCount) | ||
.fill(undefined) | ||
.map(() => tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
while (true) { | ||
const messageToSend = outboxedMessages.shift(); | ||
if (messageToSend === undefined) { | ||
break; | ||
} | ||
const { command, event, attributes } = messageToSend; | ||
if (command) { | ||
yield this.transport.send(command, attributes); | ||
this.afterSend.emit({ command, attributes }); | ||
} | ||
else if (event) { | ||
yield this.transport.publish(event, attributes); | ||
this.afterPublish.emit({ event, attributes }); | ||
} | ||
} | ||
})); | ||
yield Promise.all(workers); | ||
} | ||
@@ -358,0 +405,0 @@ })); |
{ | ||
"name": "@node-ts/bus-core", | ||
"version": "1.1.10", | ||
"version": "1.1.11", | ||
"description": "A service bus for message-based, distributed node applications", | ||
@@ -5,0 +5,0 @@ "main": "./dist/index.js", |
@@ -12,2 +12,4 @@ import { Mock, Times } from 'typemoq' | ||
jest.setTimeout(20_000) | ||
describe('BusInstance Outboxing', () => { | ||
@@ -39,2 +41,43 @@ describe('when a message is sent from outside of a handler', () => { | ||
}) | ||
describe('when a large number of messages are sent from inside a handler', () => { | ||
let bus: BusInstance | ||
beforeAll(async () => { | ||
const numberOfMessages = 20_000 | ||
bus = Bus.configure() | ||
.withHandler( | ||
handlerFor(TestCommand, async () => { | ||
const publishMessages = new Array(numberOfMessages) | ||
.fill(undefined) | ||
.map(async () => bus.publish(new TestEvent())) | ||
await Promise.all(publishMessages) | ||
}) | ||
) | ||
.build() | ||
let messagesPublishedCount = 0 | ||
const messagesPublished = new Promise<void>(resolve => { | ||
bus.afterPublish.on(() => { | ||
if (++messagesPublishedCount === numberOfMessages) { | ||
resolve() | ||
} | ||
}) | ||
}) | ||
await bus.initialize() | ||
await bus.start() | ||
await bus.send(new TestCommand()) | ||
await messagesPublished | ||
}) | ||
afterAll(async () => { | ||
await bus.dispose() | ||
}) | ||
it('should dispatch all messages without exhausting the heap', () => { | ||
// If code execution reaches this point, the test has passed and not run out of memory | ||
}) | ||
}) | ||
describe('when a message is sent from two handlers, and one fails', () => { | ||
@@ -88,3 +131,2 @@ let bus: BusInstance | ||
}) | ||
describe('when a message is sent in a workflow handler, that fails to persist', () => { | ||
@@ -91,0 +133,0 @@ let bus: BusInstance |
@@ -51,2 +51,12 @@ import { Transport, TransportMessage } from '../transport' | ||
export interface AfterSend { | ||
command: Command | ||
attributes?: MessageAttributes | ||
} | ||
export interface AfterPublish { | ||
event: Event | ||
attributes?: MessageAttributes | ||
} | ||
export interface OnError<TTransportMessage> { | ||
@@ -74,7 +84,33 @@ message: Message | ||
export class BusInstance<TTransportMessage = {}> { | ||
/** | ||
* Emitted before a command is sent to the transport | ||
*/ | ||
readonly beforeSend = new TypedEmitter<BeforeSend>() | ||
/** | ||
* Emitted before an event is published to the transport | ||
*/ | ||
readonly beforePublish = new TypedEmitter<BeforePublish>() | ||
/** | ||
* Emitted after a command has been sent to the transport | ||
*/ | ||
readonly afterSend = new TypedEmitter<AfterSend>() | ||
/** | ||
* Emitted after an event has been published to the transport | ||
*/ | ||
readonly afterPublish = new TypedEmitter<AfterPublish>() | ||
/** | ||
* Emitted when an error occurs during message handling | ||
*/ | ||
readonly onError = new TypedEmitter<OnError<TTransportMessage>>() | ||
/** | ||
* Emitted immediately after a message has been received from the transport | ||
*/ | ||
readonly afterReceive = new TypedEmitter<AfterReceive<TTransportMessage>>() | ||
/** | ||
* Emitted before a message is dispatched to handlers | ||
*/ | ||
readonly beforeDispatch = new TypedEmitter<BeforeDispatch>() | ||
/** | ||
* Emitted after a message has been dispatched and completed all handler invocations | ||
*/ | ||
readonly afterDispatch = new TypedEmitter<AfterDispatch>() | ||
@@ -86,3 +122,5 @@ | ||
private isInitialized = false | ||
private outbox: ALS<(() => Promise<void>)[]> = new ALS() | ||
private outbox: ALS< | ||
{ command?: Command; event?: Event; attributes?: MessageAttributes }[] | ||
> = new ALS() | ||
@@ -162,3 +200,3 @@ constructor( | ||
const outbox = this.outbox.get('outbox')! | ||
outbox.push(async () => this.transport.publish(event, attributes)) | ||
outbox.push({ event, attributes }) | ||
this.outbox.set('outbox', outbox) | ||
@@ -186,3 +224,3 @@ } else { | ||
const outbox = this.outbox.get('outbox')! | ||
outbox.push(async () => this.transport.send(command, attributes)) | ||
outbox.push({ command, attributes }) | ||
this.outbox.set('outbox', outbox) | ||
@@ -475,8 +513,26 @@ } else { | ||
const outboxedMessages = this.outbox.get('outbox') | ||
if (outboxedMessages) { | ||
await Promise.all( | ||
outboxedMessages.map(dispatchOutboxedMessage => | ||
dispatchOutboxedMessage() | ||
) | ||
) | ||
if (outboxedMessages && outboxedMessages.length > 0) { | ||
// In case of a large number of messages to send, use a worker pool to dispatch so that we don't blow out heap usage | ||
const dispatchWorkerCount = Math.min(outboxedMessages.length, 10) | ||
const workers = new Array(dispatchWorkerCount) | ||
.fill(undefined) | ||
.map(async () => { | ||
while (true) { | ||
const messageToSend = outboxedMessages.shift() | ||
if (messageToSend === undefined) { | ||
break | ||
} | ||
const { command, event, attributes } = messageToSend | ||
if (command) { | ||
await this.transport.send(command, attributes) | ||
this.afterSend.emit({ command, attributes }) | ||
} else if (event) { | ||
await this.transport.publish(event, attributes) | ||
this.afterPublish.emit({ event, attributes }) | ||
} | ||
} | ||
}) | ||
await Promise.all(workers) | ||
} | ||
@@ -483,0 +539,0 @@ }) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
366091
8949