@node-ts/bus-core
Advanced tools
Comparing version 0.2.2 to 0.3.0
import { Message } from '@node-ts/bus-messages'; | ||
import { MessageOptions } from '../service-bus'; | ||
import { ClassConstructor } from '@node-ts/logger-core'; | ||
@@ -10,5 +11,7 @@ export interface HandlerPrototype { | ||
* An interface used by `HandlesMessages` used to dispatch messages to | ||
* @param message A message that has been received from the bus and passed to the handler for processing | ||
* @param options (optional) Additional message options and metadata that were sent along with the message | ||
*/ | ||
export interface Handler<TMessage extends Message> { | ||
handle(message: TMessage): Promise<void> | void; | ||
handle(message: TMessage, messageOptions?: MessageOptions): Promise<void> | void; | ||
} |
export { BUS_SYMBOLS } from './bus-symbols'; | ||
export * from './bus-module'; | ||
export * from './service-bus/bus'; | ||
export * from './service-bus/message-options'; | ||
export { Transport, TransportMessage } from './transport'; | ||
@@ -5,0 +6,0 @@ export * from './application-bootstrap'; |
@@ -8,2 +8,3 @@ "use strict"; | ||
tslib_1.__exportStar(require("./service-bus/bus"), exports); | ||
tslib_1.__exportStar(require("./service-bus/message-options"), exports); | ||
tslib_1.__exportStar(require("./application-bootstrap"), exports); | ||
@@ -10,0 +11,0 @@ tslib_1.__exportStar(require("./handler"), exports); |
import { Event, Command } from '@node-ts/bus-messages'; | ||
import { MessageOptions } from './message-options'; | ||
export declare enum BusState { | ||
@@ -9,6 +10,6 @@ Stopped = "stopped", | ||
export interface Bus { | ||
publish<EventType extends Event>(event: EventType): Promise<void>; | ||
send<CommandType extends Command>(command: CommandType): Promise<void>; | ||
publish<EventType extends Event>(event: EventType, messageOptions?: MessageOptions): Promise<void>; | ||
send<CommandType extends Command>(command: CommandType, messageOptions?: MessageOptions): Promise<void>; | ||
start(): Promise<void>; | ||
stop(): Promise<void>; | ||
} |
export * from './bus'; | ||
export * from './service-bus'; | ||
export * from './message-options'; |
@@ -6,2 +6,3 @@ "use strict"; | ||
tslib_1.__exportStar(require("./service-bus"), exports); | ||
tslib_1.__exportStar(require("./message-options"), exports); | ||
//# sourceMappingURL=index.js.map |
@@ -6,2 +6,3 @@ import { Bus, BusState } from './bus'; | ||
import { HandlerRegistry } from '../handler'; | ||
import { MessageOptions } from './message-options'; | ||
export declare class ServiceBus implements Bus { | ||
@@ -14,4 +15,4 @@ private readonly transport; | ||
constructor(transport: Transport<{}>, logger: Logger, handlerRegistry: HandlerRegistry); | ||
publish<TEvent extends Event>(event: TEvent): Promise<void>; | ||
send<TCommand extends Command>(command: TCommand): Promise<void>; | ||
publish<TEvent extends Event>(event: TEvent, messageOptions?: MessageOptions): Promise<void>; | ||
send<TCommand extends Command>(command: TCommand, messageOptions?: MessageOptions): Promise<void>; | ||
start(): Promise<void>; | ||
@@ -18,0 +19,0 @@ stop(): Promise<void>; |
@@ -12,2 +12,3 @@ "use strict"; | ||
const serializeError = require("serialize-error"); | ||
const message_options_1 = require("./message-options"); | ||
const EMPTY_QUEUE_SLEEP_MS = 500; | ||
@@ -22,9 +23,9 @@ let ServiceBus = class ServiceBus { | ||
} | ||
async publish(event) { | ||
async publish(event, messageOptions = new message_options_1.MessageOptions()) { | ||
this.logger.debug('publish', { event }); | ||
return this.transport.publish(event); | ||
return this.transport.publish(event, messageOptions); | ||
} | ||
async send(command) { | ||
async send(command, messageOptions = new message_options_1.MessageOptions()) { | ||
this.logger.debug('send', { command }); | ||
return this.transport.send(command); | ||
return this.transport.send(command, messageOptions); | ||
} | ||
@@ -69,3 +70,3 @@ async start() { | ||
try { | ||
await this.dispatchMessageToHandlers(message.domainMessage); | ||
await this.dispatchMessageToHandlers(message.domainMessage, message.options); | ||
this.logger.debug('Message dispatched to all handlers', { message }); | ||
@@ -87,3 +88,3 @@ await this.transport.deleteMessage(message); | ||
} | ||
async dispatchMessageToHandlers(message) { | ||
async dispatchMessageToHandlers(message, messageOptions) { | ||
const handlers = this.handlerRegistry.get(message.$name); | ||
@@ -96,3 +97,3 @@ if (handlers.length === 0) { | ||
await Promise.all(handlersToInvoke.map(async (h) => { | ||
await h.handle(message); | ||
await h.handle(message, messageOptions); | ||
})); | ||
@@ -99,0 +100,0 @@ } |
@@ -6,2 +6,3 @@ import { Transport } from './transport'; | ||
import { HandlerRegistry } from '../handler'; | ||
import { MessageOptions } from '../service-bus'; | ||
export declare const RETRY_LIMIT = 10; | ||
@@ -37,4 +38,4 @@ export interface InMemoryMessage { | ||
dispose(): Promise<void>; | ||
publish<TEvent extends Event>(event: TEvent): Promise<void>; | ||
send<TCommand extends Command>(command: TCommand): Promise<void>; | ||
publish<TEvent extends Event>(event: TEvent, messageOptions: MessageOptions): Promise<void>; | ||
send<TCommand extends Command>(command: TCommand, messageOptions: MessageOptions): Promise<void>; | ||
readNextMessage(): Promise<TransportMessage<InMemoryMessage> | undefined>; | ||
@@ -41,0 +42,0 @@ deleteMessage(message: TransportMessage<InMemoryMessage>): Promise<void>; |
@@ -30,7 +30,7 @@ "use strict"; | ||
} | ||
async publish(event) { | ||
this.addToQueue(event); | ||
async publish(event, messageOptions) { | ||
this.addToQueue(event, messageOptions); | ||
} | ||
async send(command) { | ||
this.addToQueue(command); | ||
async send(command, messageOptions) { | ||
this.addToQueue(command, messageOptions); | ||
} | ||
@@ -75,5 +75,5 @@ async readNextMessage() { | ||
} | ||
addToQueue(message) { | ||
addToQueue(message, messageOptions) { | ||
if (this.messagesWithHandlers[message.$name]) { | ||
const transportMessage = toTransportMessage(message, false); | ||
const transportMessage = toTransportMessage(message, messageOptions, false); | ||
this.queue.push(transportMessage); | ||
@@ -93,6 +93,7 @@ this.logger.debug('Added message to queue', { message, queueSize: this.queue.length }); | ||
exports.MemoryQueue = MemoryQueue; | ||
function toTransportMessage(message, isProcessing) { | ||
function toTransportMessage(message, messageOptions, isProcessing) { | ||
return { | ||
id: undefined, | ||
domainMessage: message, | ||
options: messageOptions, | ||
raw: { | ||
@@ -99,0 +100,0 @@ seenCount: 0, |
import { Message } from '@node-ts/bus-messages'; | ||
import { MessageOptions } from '../service-bus'; | ||
/** | ||
@@ -19,2 +20,6 @@ * A message from the transport provider that encapsulates the raw message | ||
raw: TransportMessageType; | ||
/** | ||
* Additional attributes and metadata that was sent along with the message | ||
*/ | ||
options: MessageOptions; | ||
} |
import { Event, Command } from '@node-ts/bus-messages'; | ||
import { TransportMessage } from './transport-message'; | ||
import { HandlerRegistry } from '../handler'; | ||
import { MessageOptions } from '../service-bus'; | ||
/** | ||
@@ -12,4 +13,6 @@ * A transport adapter interface that enables the service bus to use a messaging technology. | ||
* @param event A domain event to be published | ||
* @param messageOptions Options that control the behaviour around how the message is sent and | ||
* additional information that travels with it. | ||
*/ | ||
publish<TEvent extends Event>(event: TEvent): Promise<void>; | ||
publish<TEvent extends Event>(event: TEvent, messageOptions: MessageOptions): Promise<void>; | ||
/** | ||
@@ -19,4 +22,6 @@ * Sends a command to the underlying transport. This is generally done to a topic or some other | ||
* @param command A domain command to be sent | ||
* @param messageOptions Options that control the behaviour around how the message is sent and | ||
* additional information that travels with it. | ||
*/ | ||
send<TCommand extends Command>(command: TCommand): Promise<void>; | ||
send<TCommand extends Command>(command: TCommand, messageOptions: MessageOptions): Promise<void>; | ||
/** | ||
@@ -23,0 +28,0 @@ * Fetch the next message from the underlying queue. If there are no messages, then `undefined` |
{ | ||
"name": "@node-ts/bus-core", | ||
"version": "0.2.2", | ||
"version": "0.3.0", | ||
"description": "A service bus for message-based, distributed node applications", | ||
@@ -28,2 +28,3 @@ "main": "./dist/index.js", | ||
"@node-ts/logger-core": "^0.0.17", | ||
"@types/faker": "^4.1.5", | ||
"inversify": "^5.0.1", | ||
@@ -50,3 +51,3 @@ "tslint": "^5.12.1", | ||
], | ||
"gitHead": "bacefae4e4f09c32a6337f6072910121a282ed21" | ||
"gitHead": "5894011a1c2150aa25735267c9ef78dcaeb0cd70" | ||
} |
import { Message } from '@node-ts/bus-messages' | ||
import { MessageOptions } from '../service-bus' | ||
import { ClassConstructor } from '@node-ts/logger-core' | ||
@@ -12,5 +13,7 @@ | ||
* An interface used by `HandlesMessages` used to dispatch messages to | ||
* @param message A message that has been received from the bus and passed to the handler for processing | ||
* @param options (optional) Additional message options and metadata that were sent along with the message | ||
*/ | ||
export interface Handler<TMessage extends Message> { | ||
handle (message: TMessage): Promise<void> | void | ||
handle (message: TMessage, messageOptions?: MessageOptions): Promise<void> | void | ||
} |
export { BUS_SYMBOLS } from './bus-symbols' | ||
export * from './bus-module' | ||
export * from './service-bus/bus' | ||
export * from './service-bus/message-options' | ||
export { Transport, TransportMessage } from './transport' | ||
@@ -5,0 +6,0 @@ export * from './application-bootstrap' |
import { Event, Command } from '@node-ts/bus-messages' | ||
import { MessageOptions } from './message-options' | ||
@@ -11,6 +12,6 @@ export enum BusState { | ||
export interface Bus { | ||
publish<EventType extends Event> (event: EventType): Promise<void> | ||
send<CommandType extends Command> (command: CommandType): Promise<void> | ||
publish<EventType extends Event> (event: EventType, messageOptions?: MessageOptions): Promise<void> | ||
send<CommandType extends Command> (command: CommandType, messageOptions?: MessageOptions): Promise<void> | ||
start (): Promise<void> | ||
stop (): Promise<void> | ||
} |
export * from './bus' | ||
export * from './service-bus' | ||
export * from './message-options' |
@@ -11,2 +11,3 @@ import { injectable, inject } from 'inversify' | ||
import * as serializeError from 'serialize-error' | ||
import { MessageOptions } from './message-options' | ||
@@ -29,10 +30,16 @@ const EMPTY_QUEUE_SLEEP_MS = 500 | ||
async publish<TEvent extends Event> (event: TEvent): Promise<void> { | ||
async publish<TEvent extends Event> ( | ||
event: TEvent, | ||
messageOptions: MessageOptions = new MessageOptions() | ||
): Promise<void> { | ||
this.logger.debug('publish', { event }) | ||
return this.transport.publish(event) | ||
return this.transport.publish(event, messageOptions) | ||
} | ||
async send<TCommand extends Command> (command: TCommand): Promise<void> { | ||
async send<TCommand extends Command> ( | ||
command: TCommand, | ||
messageOptions: MessageOptions = new MessageOptions() | ||
): Promise<void> { | ||
this.logger.debug('send', { command }) | ||
return this.transport.send(command) | ||
return this.transport.send(command, messageOptions) | ||
} | ||
@@ -87,3 +94,3 @@ | ||
try { | ||
await this.dispatchMessageToHandlers(message.domainMessage) | ||
await this.dispatchMessageToHandlers(message.domainMessage, message.options) | ||
this.logger.debug('Message dispatched to all handlers', { message }) | ||
@@ -107,3 +114,3 @@ await this.transport.deleteMessage(message) | ||
private async dispatchMessageToHandlers (message: Message): Promise<void> { | ||
private async dispatchMessageToHandlers (message: Message, messageOptions: MessageOptions): Promise<void> { | ||
const handlers = this.handlerRegistry.get(message.$name) | ||
@@ -117,5 +124,5 @@ if (handlers.length === 0) { | ||
await Promise.all(handlersToInvoke.map(async h => { | ||
await h.handle(message) | ||
await h.handle(message, messageOptions) | ||
})) | ||
} | ||
} |
@@ -7,2 +7,4 @@ import { MemoryQueue, InMemoryMessage, RETRY_LIMIT } from './memory-queue' | ||
import { HandlerRegistry } from '../handler' | ||
import { MessageOptions } from '../service-bus' | ||
import * as faker from 'faker' | ||
@@ -16,2 +18,5 @@ const event = new TestEvent() | ||
const handledMessageNames = [TestCommand.NAME, TestEvent.NAME] | ||
const messageOptions: MessageOptions = { | ||
correlationId: faker.random.uuid() | ||
} | ||
@@ -33,3 +38,3 @@ beforeEach(async () => { | ||
it('should push the event onto the memory queue', async () => { | ||
await sut.publish(event) | ||
await sut.publish(event, messageOptions) | ||
expect(sut.depth).toEqual(1) | ||
@@ -41,3 +46,3 @@ }) | ||
it('should push the command onto the memory queue', async () => { | ||
await sut.send(command) | ||
await sut.send(command, messageOptions) | ||
expect(sut.depth).toEqual(1) | ||
@@ -49,3 +54,3 @@ }) | ||
it('should not push the message onto the queue', async () => { | ||
await sut.send(command2) | ||
await sut.send(command2, messageOptions) | ||
expect(sut.depth).toEqual(0) | ||
@@ -62,3 +67,3 @@ }) | ||
it('should return the message when the queue has one', async () => { | ||
await sut.publish(event) | ||
await sut.publish(event, messageOptions) | ||
const message = await sut.readNextMessage() | ||
@@ -69,3 +74,3 @@ expect(message!.domainMessage).toEqual(event) | ||
it('should read new messages with seenCount equal to 1', async () => { | ||
await sut.publish(event) | ||
await sut.publish(event, messageOptions) | ||
const message = await sut.readNextMessage() | ||
@@ -76,4 +81,4 @@ expect(message!.raw.seenCount).toEqual(0) | ||
it('should return the oldest message when there are many', async () => { | ||
await sut.publish(event) | ||
await sut.send(command) | ||
await sut.publish(event, messageOptions) | ||
await sut.send(command, {}) | ||
@@ -88,3 +93,3 @@ const firstMessage = await sut.readNextMessage() | ||
it('should retain the queue depth while the message is unacknowledged', async () => { | ||
await sut.publish(event) | ||
await sut.publish(event, messageOptions) | ||
expect(sut.depth).toEqual(1) | ||
@@ -103,3 +108,3 @@ | ||
beforeEach(async () => { | ||
await sut.publish(event) | ||
await sut.publish(event, messageOptions) | ||
message = await sut.readNextMessage() | ||
@@ -127,3 +132,3 @@ }) | ||
beforeEach(async () => { | ||
await sut.publish(event) | ||
await sut.publish(event, messageOptions) | ||
@@ -130,0 +135,0 @@ let attempt = 0 |
@@ -7,2 +7,3 @@ import { injectable, inject } from 'inversify' | ||
import { HandlerRegistry } from '../handler' | ||
import { MessageOptions } from '../service-bus' | ||
@@ -59,8 +60,8 @@ export const RETRY_LIMIT = 10 | ||
async publish<TEvent extends Event> (event: TEvent): Promise<void> { | ||
this.addToQueue(event) | ||
async publish<TEvent extends Event> (event: TEvent, messageOptions: MessageOptions): Promise<void> { | ||
this.addToQueue(event, messageOptions) | ||
} | ||
async send<TCommand extends Command> (command: TCommand): Promise<void> { | ||
this.addToQueue(command) | ||
async send<TCommand extends Command> (command: TCommand, messageOptions: MessageOptions): Promise<void> { | ||
this.addToQueue(command, messageOptions) | ||
} | ||
@@ -114,5 +115,5 @@ | ||
private addToQueue (message: Message): void { | ||
private addToQueue (message: Message, messageOptions: MessageOptions): void { | ||
if (this.messagesWithHandlers[message.$name]) { | ||
const transportMessage = toTransportMessage(message, false) | ||
const transportMessage = toTransportMessage(message, messageOptions, false) | ||
this.queue.push(transportMessage) | ||
@@ -126,6 +127,11 @@ this.logger.debug('Added message to queue', { message, queueSize: this.queue.length }) | ||
function toTransportMessage (message: Message, isProcessing: boolean): TransportMessage<InMemoryMessage> { | ||
function toTransportMessage ( | ||
message: Message, | ||
messageOptions: MessageOptions, | ||
isProcessing: boolean | ||
): TransportMessage<InMemoryMessage> { | ||
return { | ||
id: undefined, | ||
domainMessage: message, | ||
options: messageOptions, | ||
raw: { | ||
@@ -132,0 +138,0 @@ seenCount: 0, |
import { Message } from '@node-ts/bus-messages' | ||
import { MessageOptions } from '../service-bus' | ||
@@ -22,2 +23,7 @@ /** | ||
raw: TransportMessageType | ||
/** | ||
* Additional attributes and metadata that was sent along with the message | ||
*/ | ||
options: MessageOptions | ||
} |
import { Event, Command } from '@node-ts/bus-messages' | ||
import { TransportMessage } from './transport-message' | ||
import { HandlerRegistry } from '../handler' | ||
import { MessageOptions } from '../service-bus' | ||
@@ -13,4 +14,6 @@ /** | ||
* @param event A domain event to be published | ||
* @param messageOptions Options that control the behaviour around how the message is sent and | ||
* additional information that travels with it. | ||
*/ | ||
publish<TEvent extends Event> (event: TEvent): Promise<void> | ||
publish<TEvent extends Event> (event: TEvent, messageOptions: MessageOptions): Promise<void> | ||
@@ -21,4 +24,6 @@ /** | ||
* @param command A domain command to be sent | ||
* @param messageOptions Options that control the behaviour around how the message is sent and | ||
* additional information that travels with it. | ||
*/ | ||
send<TCommand extends Command> (command: TCommand): Promise<void> | ||
send<TCommand extends Command> (command: TCommand, messageOptions: MessageOptions): Promise<void> | ||
@@ -25,0 +30,0 @@ /** |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
116978
112
2215
7