@node-ts/bus-core
Advanced tools
Comparing version 0.3.1 to 0.4.0
@@ -1,4 +0,6 @@ | ||
import { ContainerModule } from 'inversify'; | ||
import { ContainerModule, interfaces } from 'inversify'; | ||
export declare type SessionScopeBinder = (bind: interfaces.Bind) => void; | ||
export declare const defaultSessionScopeBinder: SessionScopeBinder; | ||
export declare class BusModule extends ContainerModule { | ||
constructor(); | ||
} |
@@ -11,6 +11,11 @@ "use strict"; | ||
const logger_core_1 = require("@node-ts/logger-core"); | ||
exports.defaultSessionScopeBinder = (bind) => { | ||
bind(bus_symbols_1.BUS_SYMBOLS.Bus).to(service_bus_1.ServiceBus).inSingletonScope(); | ||
}; | ||
class BusModule extends inversify_1.ContainerModule { | ||
constructor() { | ||
super(bind => { | ||
bindService(bind, bus_symbols_1.BUS_SYMBOLS.Bus, service_bus_1.ServiceBus).inSingletonScope(); | ||
bind(bus_symbols_1.BUS_INTERNAL_SYMBOLS.SessionScopeBinder).toConstantValue(exports.defaultSessionScopeBinder); | ||
exports.defaultSessionScopeBinder(bind); | ||
logger_core_1.bindLogger(bind, service_bus_1.ServiceBus); | ||
bindService(bind, bus_symbols_1.BUS_SYMBOLS.Transport, transport_1.MemoryQueue).inSingletonScope(); | ||
@@ -21,2 +26,3 @@ bindService(bind, bus_symbols_1.BUS_SYMBOLS.Serializer, serialization_1.JsonSerializer); | ||
bindService(bind, bus_symbols_1.BUS_SYMBOLS.JsonSerializer, serialization_1.JsonSerializer); | ||
bind(bus_symbols_1.BUS_SYMBOLS.MessageHandlingContext).toConstantValue({}); | ||
}); | ||
@@ -23,0 +29,0 @@ } |
@@ -8,3 +8,6 @@ export declare const BUS_SYMBOLS: { | ||
JsonSerializer: symbol; | ||
MessageHandlingContext: symbol; | ||
}; | ||
export declare const BUS_INTERNAL_SYMBOLS: {}; | ||
export declare const BUS_INTERNAL_SYMBOLS: { | ||
SessionScopeBinder: symbol; | ||
}; |
@@ -9,5 +9,8 @@ "use strict"; | ||
ApplicationBootstrap: Symbol.for('@node-ts/bus-core/application-bootstrap'), | ||
JsonSerializer: Symbol.for('@node-ts/bus-core/json-serializer') | ||
JsonSerializer: Symbol.for('@node-ts/bus-core/json-serializer'), | ||
MessageHandlingContext: Symbol.for('@node-ts/bus-core/message-handling-context') | ||
}; | ||
exports.BUS_INTERNAL_SYMBOLS = {}; | ||
exports.BUS_INTERNAL_SYMBOLS = { | ||
SessionScopeBinder: Symbol.for('@node-ts/bus-core/session-scope-binder') | ||
}; | ||
//# sourceMappingURL=bus-symbols.js.map |
@@ -7,3 +7,3 @@ import { Message } from '@node-ts/bus-messages'; | ||
declare type HandlerType = ClassConstructor<Handler<Message>> | ((context: interfaces.Context) => Handler<Message>); | ||
interface HandlerRegistration<MessageType extends Message> { | ||
export interface HandlerRegistration<MessageType extends Message> { | ||
defaultContainer: Container; | ||
@@ -10,0 +10,0 @@ resolveHandler(handlerContextContainer: Container): Handler<MessageType>; |
import { Message } from '@node-ts/bus-messages'; | ||
import { MessageOptions } from '../service-bus'; | ||
import { MessageAttributes } from '../service-bus'; | ||
import { ClassConstructor } from '@node-ts/logger-core'; | ||
@@ -15,3 +15,3 @@ export interface HandlerPrototype { | ||
export interface Handler<TMessage extends Message> { | ||
handle(message: TMessage, messageOptions?: MessageOptions): Promise<void> | void; | ||
handle(message: TMessage, messageOptions?: MessageAttributes): Promise<void> | void; | ||
} |
export { BUS_SYMBOLS } from './bus-symbols'; | ||
export * from './bus-module'; | ||
export * from './service-bus/bus'; | ||
export * from './service-bus/message-options'; | ||
export * from './service-bus/message-attributes'; | ||
export { Transport, TransportMessage } from './transport'; | ||
@@ -6,0 +6,0 @@ export * from './application-bootstrap'; |
@@ -8,3 +8,3 @@ "use strict"; | ||
tslib_1.__exportStar(require("./service-bus/bus"), exports); | ||
tslib_1.__exportStar(require("./service-bus/message-options"), exports); | ||
tslib_1.__exportStar(require("./service-bus/message-attributes"), exports); | ||
tslib_1.__exportStar(require("./application-bootstrap"), exports); | ||
@@ -11,0 +11,0 @@ tslib_1.__exportStar(require("./handler"), exports); |
import { Event, Command } from '@node-ts/bus-messages'; | ||
import { MessageOptions } from './message-options'; | ||
import { MessageAttributes } from './message-attributes'; | ||
export declare enum BusState { | ||
@@ -10,6 +10,6 @@ Stopped = "stopped", | ||
export interface Bus { | ||
publish<EventType extends Event>(event: EventType, messageOptions?: MessageOptions): Promise<void>; | ||
send<CommandType extends Command>(command: CommandType, messageOptions?: MessageOptions): Promise<void>; | ||
publish<EventType extends Event>(event: EventType, messageOptions?: MessageAttributes): Promise<void>; | ||
send<CommandType extends Command>(command: CommandType, messageOptions?: MessageAttributes): Promise<void>; | ||
start(): Promise<void>; | ||
stop(): Promise<void>; | ||
} |
export * from './bus'; | ||
export * from './service-bus'; | ||
export * from './message-options'; | ||
export * from './message-attributes'; |
@@ -6,3 +6,3 @@ "use strict"; | ||
tslib_1.__exportStar(require("./service-bus"), exports); | ||
tslib_1.__exportStar(require("./message-options"), exports); | ||
tslib_1.__exportStar(require("./message-attributes"), exports); | ||
//# sourceMappingURL=index.js.map |
@@ -6,3 +6,3 @@ import { Bus, BusState } from './bus'; | ||
import { HandlerRegistry } from '../handler'; | ||
import { MessageOptions } from './message-options'; | ||
import { MessageAttributes } from './message-attributes'; | ||
export declare class ServiceBus implements Bus { | ||
@@ -12,7 +12,8 @@ private readonly transport; | ||
private readonly handlerRegistry; | ||
private readonly messageHandlingContext; | ||
private internalState; | ||
private runningWorkerCount; | ||
constructor(transport: Transport<{}>, logger: Logger, handlerRegistry: HandlerRegistry); | ||
publish<TEvent extends Event>(event: TEvent, messageOptions?: MessageOptions): Promise<void>; | ||
send<TCommand extends Command>(command: TCommand, messageOptions?: MessageOptions): Promise<void>; | ||
constructor(transport: Transport<{}>, logger: Logger, handlerRegistry: HandlerRegistry, messageHandlingContext: MessageAttributes); | ||
publish<TEvent extends Event>(event: TEvent, messageOptions?: MessageAttributes): Promise<void>; | ||
send<TCommand extends Command>(command: TCommand, messageOptions?: MessageAttributes): Promise<void>; | ||
start(): Promise<void>; | ||
@@ -24,2 +25,3 @@ stop(): Promise<void>; | ||
private dispatchMessageToHandlers; | ||
private prepareTransportOptions; | ||
} |
@@ -12,19 +12,22 @@ "use strict"; | ||
const serializeError = require("serialize-error"); | ||
const message_options_1 = require("./message-options"); | ||
const message_attributes_1 = require("./message-attributes"); | ||
const EMPTY_QUEUE_SLEEP_MS = 500; | ||
let ServiceBus = class ServiceBus { | ||
constructor(transport, logger, handlerRegistry) { | ||
constructor(transport, logger, handlerRegistry, messageHandlingContext) { | ||
this.transport = transport; | ||
this.logger = logger; | ||
this.handlerRegistry = handlerRegistry; | ||
this.messageHandlingContext = messageHandlingContext; | ||
this.internalState = bus_1.BusState.Stopped; | ||
this.runningWorkerCount = 0; | ||
} | ||
async publish(event, messageOptions = new message_options_1.MessageOptions()) { | ||
async publish(event, messageOptions = new message_attributes_1.MessageAttributes()) { | ||
this.logger.debug('publish', { event }); | ||
return this.transport.publish(event, messageOptions); | ||
const transportOptions = this.prepareTransportOptions(messageOptions); | ||
return this.transport.publish(event, transportOptions); | ||
} | ||
async send(command, messageOptions = new message_options_1.MessageOptions()) { | ||
async send(command, messageOptions = new message_attributes_1.MessageAttributes()) { | ||
this.logger.debug('send', { command }); | ||
return this.transport.send(command, messageOptions); | ||
const transportOptions = this.prepareTransportOptions(messageOptions); | ||
return this.transport.send(command, transportOptions); | ||
} | ||
@@ -69,3 +72,3 @@ async start() { | ||
try { | ||
await this.dispatchMessageToHandlers(message.domainMessage, message.options); | ||
await this.dispatchMessageToHandlers(message.domainMessage, message.attributes); | ||
this.logger.debug('Message dispatched to all handlers', { message }); | ||
@@ -87,3 +90,3 @@ await this.transport.deleteMessage(message); | ||
} | ||
async dispatchMessageToHandlers(message, messageOptions) { | ||
async dispatchMessageToHandlers(message, context) { | ||
const handlers = this.handlerRegistry.get(message.$name); | ||
@@ -94,7 +97,13 @@ if (handlers.length === 0) { | ||
} | ||
const handlersToInvoke = handlers.map(h => h.resolveHandler(h.defaultContainer)); | ||
await Promise.all(handlersToInvoke.map(async (h) => { | ||
await h.handle(message, messageOptions); | ||
})); | ||
const handlersToInvoke = handlers.map(handler => dispatchMessageToHandler(message, context, handler)); | ||
await Promise.all(handlersToInvoke); | ||
} | ||
prepareTransportOptions(clientOptions) { | ||
const result = { | ||
correlationId: clientOptions.correlationId || this.messageHandlingContext.correlationId, | ||
attributes: clientOptions.attributes, | ||
stickyAttributes: Object.assign({}, clientOptions.stickyAttributes, this.messageHandlingContext.stickyAttributes) | ||
}; | ||
return result; | ||
} | ||
}; | ||
@@ -107,5 +116,19 @@ ServiceBus = tslib_1.__decorate([ | ||
tslib_1.__param(2, inversify_1.inject(bus_symbols_1.BUS_SYMBOLS.HandlerRegistry)), | ||
tslib_1.__metadata("design:paramtypes", [Object, Object, handler_1.HandlerRegistry]) | ||
tslib_1.__param(3, inversify_1.inject(bus_symbols_1.BUS_SYMBOLS.MessageHandlingContext)), | ||
tslib_1.__metadata("design:paramtypes", [Object, Object, handler_1.HandlerRegistry, | ||
message_attributes_1.MessageAttributes]) | ||
], ServiceBus); | ||
exports.ServiceBus = ServiceBus; | ||
async function dispatchMessageToHandler(message, context, handlerRegistration) { | ||
const container = handlerRegistration.defaultContainer; | ||
const childContainer = container.createChild(); | ||
childContainer | ||
.bind(bus_symbols_1.BUS_SYMBOLS.MessageHandlingContext) | ||
.toConstantValue(context); | ||
const sessionScopeBinder = container.get(bus_symbols_1.BUS_INTERNAL_SYMBOLS.SessionScopeBinder); | ||
// tslint:disable-next-line:no-unsafe-any | ||
sessionScopeBinder(childContainer.bind.bind(childContainer)); | ||
const handler = handlerRegistration.resolveHandler(childContainer); | ||
handler.handle(message, context); | ||
} | ||
//# sourceMappingURL=service-bus.js.map |
@@ -6,3 +6,3 @@ import { Transport } from './transport'; | ||
import { HandlerRegistry } from '../handler'; | ||
import { MessageOptions } from '../service-bus'; | ||
import { MessageAttributes } from '../service-bus'; | ||
export declare const RETRY_LIMIT = 10; | ||
@@ -38,4 +38,4 @@ export interface InMemoryMessage { | ||
dispose(): Promise<void>; | ||
publish<TEvent extends Event>(event: TEvent, messageOptions: MessageOptions): Promise<void>; | ||
send<TCommand extends Command>(command: TCommand, messageOptions: MessageOptions): Promise<void>; | ||
publish<TEvent extends Event>(event: TEvent, messageOptions: MessageAttributes): Promise<void>; | ||
send<TCommand extends Command>(command: TCommand, messageOptions: MessageAttributes): Promise<void>; | ||
readNextMessage(): Promise<TransportMessage<InMemoryMessage> | undefined>; | ||
@@ -42,0 +42,0 @@ deleteMessage(message: TransportMessage<InMemoryMessage>): Promise<void>; |
@@ -95,3 +95,3 @@ "use strict"; | ||
domainMessage: message, | ||
options: messageOptions, | ||
attributes: messageOptions, | ||
raw: { | ||
@@ -98,0 +98,0 @@ seenCount: 0, |
import { Message } from '@node-ts/bus-messages'; | ||
import { MessageOptions } from '../service-bus'; | ||
import { MessageAttributes } from '../service-bus'; | ||
/** | ||
@@ -23,3 +23,3 @@ * A message from the transport provider that encapsulates the raw message | ||
*/ | ||
options: MessageOptions; | ||
attributes: MessageAttributes; | ||
} |
import { Event, Command } from '@node-ts/bus-messages'; | ||
import { TransportMessage } from './transport-message'; | ||
import { HandlerRegistry } from '../handler'; | ||
import { MessageOptions } from '../service-bus'; | ||
import { MessageAttributes } from '../service-bus'; | ||
/** | ||
@@ -16,3 +16,3 @@ * A transport adapter interface that enables the service bus to use a messaging technology. | ||
*/ | ||
publish<TEvent extends Event>(event: TEvent, messageOptions: MessageOptions): Promise<void>; | ||
publish<TEvent extends Event>(event: TEvent, messageOptions: MessageAttributes): Promise<void>; | ||
/** | ||
@@ -25,3 +25,3 @@ * Sends a command to the underlying transport. This is generally done to a topic or some other | ||
*/ | ||
send<TCommand extends Command>(command: TCommand, messageOptions: MessageOptions): Promise<void>; | ||
send<TCommand extends Command>(command: TCommand, messageOptions: MessageAttributes): Promise<void>; | ||
/** | ||
@@ -28,0 +28,0 @@ * Fetch the next message from the underlying queue. If there are no messages, then `undefined` |
{ | ||
"name": "@node-ts/bus-core", | ||
"version": "0.3.1", | ||
"version": "0.4.0", | ||
"description": "A service bus for message-based, distributed node applications", | ||
@@ -29,2 +29,3 @@ "main": "./dist/index.js", | ||
"@types/faker": "^4.1.5", | ||
"faker": "^4.1.0", | ||
"inversify": "^5.0.1", | ||
@@ -51,3 +52,3 @@ "tslint": "^5.12.1", | ||
], | ||
"gitHead": "18b72030c51a4929214537554ff0707a640af5b3" | ||
"gitHead": "4dd4642852cb2af8e8f0941ce313045a57f1beed" | ||
} |
import { ContainerModule, interfaces } from 'inversify' | ||
import { BUS_SYMBOLS } from './bus-symbols' | ||
import { BUS_SYMBOLS, BUS_INTERNAL_SYMBOLS } from './bus-symbols' | ||
import { MemoryQueue } from './transport' | ||
@@ -11,2 +11,7 @@ import { ServiceBus } from './service-bus' | ||
export type SessionScopeBinder = (bind: interfaces.Bind) => void | ||
export const defaultSessionScopeBinder: SessionScopeBinder = (bind: interfaces.Bind) => { | ||
bind(BUS_SYMBOLS.Bus).to(ServiceBus).inSingletonScope() | ||
} | ||
export class BusModule extends ContainerModule { | ||
@@ -16,3 +21,6 @@ | ||
super(bind => { | ||
bindService(bind, BUS_SYMBOLS.Bus, ServiceBus).inSingletonScope() | ||
bind<SessionScopeBinder>(BUS_INTERNAL_SYMBOLS.SessionScopeBinder).toConstantValue(defaultSessionScopeBinder) | ||
defaultSessionScopeBinder(bind) | ||
bindLogger(bind, ServiceBus) | ||
bindService(bind, BUS_SYMBOLS.Transport, MemoryQueue).inSingletonScope() | ||
@@ -23,2 +31,4 @@ bindService(bind, BUS_SYMBOLS.Serializer, JsonSerializer) | ||
bindService(bind, BUS_SYMBOLS.JsonSerializer, JsonSerializer) | ||
bind(BUS_SYMBOLS.MessageHandlingContext).toConstantValue({}) | ||
}) | ||
@@ -25,0 +35,0 @@ } |
@@ -7,6 +7,8 @@ export const BUS_SYMBOLS = { | ||
ApplicationBootstrap: Symbol.for('@node-ts/bus-core/application-bootstrap'), | ||
JsonSerializer: Symbol.for('@node-ts/bus-core/json-serializer') | ||
JsonSerializer: Symbol.for('@node-ts/bus-core/json-serializer'), | ||
MessageHandlingContext: Symbol.for('@node-ts/bus-core/message-handling-context') | ||
} | ||
export const BUS_INTERNAL_SYMBOLS = { | ||
SessionScopeBinder: Symbol.for('@node-ts/bus-core/session-scope-binder') | ||
} |
@@ -10,3 +10,3 @@ import { Message } from '@node-ts/bus-messages' | ||
interface HandlerRegistration<MessageType extends Message> { | ||
export interface HandlerRegistration<MessageType extends Message> { | ||
defaultContainer: Container | ||
@@ -13,0 +13,0 @@ resolveHandler (handlerContextContainer: Container): Handler<MessageType> |
import { TestEvent } from '../test/test-event' | ||
import { Bus } from '../service-bus' | ||
import { Bus, MessageAttributes } from '../service-bus' | ||
import { TestCommand } from '../test/test-command' | ||
@@ -7,11 +7,21 @@ import { Container } from 'inversify' | ||
import { ApplicationBootstrap } from '../application-bootstrap' | ||
import { IMock, Mock, Times } from 'typemoq' | ||
import { IMock, Mock, Times, It } from 'typemoq' | ||
import { sleep } from '../util' | ||
import { TestContainer } from '../test/test-container' | ||
import { MessageLogger, MESSAGE_LOGGER, TestEventHandler } from '../test' | ||
import * as faker from 'faker' | ||
const event = new TestEvent() | ||
const command = new TestCommand() | ||
const attributes: MessageAttributes = { | ||
correlationId: faker.random.uuid(), | ||
attributes: { | ||
one: 1 | ||
}, | ||
stickyAttributes: { | ||
a: 'a' | ||
} | ||
} | ||
describe('Handler', () => { | ||
@@ -34,2 +44,7 @@ let serviceBus: Bus | ||
await bootstrapper.initialize(container) | ||
await serviceBus.publish(event) | ||
await serviceBus.publish(event, attributes) | ||
await serviceBus.send(command) | ||
await sleep(1) | ||
}) | ||
@@ -42,10 +57,14 @@ | ||
describe('when a handled message is received', () => { | ||
beforeAll(async () => { | ||
await serviceBus.publish(event) | ||
await sleep(1) | ||
it('should dispatch to the registered handler', () => { | ||
messageLogger.verify( | ||
m => m.log(event), | ||
Times.exactly(2) | ||
) | ||
}) | ||
}) | ||
it('should dispatch to the registered handler', () => { | ||
describe('when a handled message is received with attributes', () => { | ||
it('should receive the attributes', () => { | ||
messageLogger.verify( | ||
m => m.log(event), | ||
m => m.log(It.isObjectWith(attributes)), | ||
Times.once() | ||
@@ -57,6 +76,2 @@ ) | ||
describe('when an unhandled message is received', () => { | ||
beforeAll(async () => { | ||
await serviceBus.send(command) | ||
}) | ||
it('should not handle the message', () => { | ||
@@ -63,0 +78,0 @@ messageLogger.verify( |
import { Message } from '@node-ts/bus-messages' | ||
import { MessageOptions } from '../service-bus' | ||
import { MessageAttributes } from '../service-bus' | ||
import { ClassConstructor } from '@node-ts/logger-core' | ||
@@ -17,3 +17,3 @@ | ||
export interface Handler<TMessage extends Message> { | ||
handle (message: TMessage, messageOptions?: MessageOptions): Promise<void> | void | ||
handle (message: TMessage, messageOptions?: MessageAttributes): Promise<void> | void | ||
} |
@@ -70,3 +70,3 @@ # Handlers | ||
_: Command, | ||
messageOptions: MessageOptions<{ userId: string }> | ||
messageOptions: MessageAttributes<{ userId: string }> | ||
): Promise<void> { | ||
@@ -73,0 +73,0 @@ this.logger.info('Example consuming message headers', { userId: messageOptions.attributes.userId }) |
export { BUS_SYMBOLS } from './bus-symbols' | ||
export * from './bus-module' | ||
export * from './service-bus/bus' | ||
export * from './service-bus/message-options' | ||
export * from './service-bus/message-attributes' | ||
export { Transport, TransportMessage } from './transport' | ||
@@ -6,0 +6,0 @@ export * from './application-bootstrap' |
import { Event, Command } from '@node-ts/bus-messages' | ||
import { MessageOptions } from './message-options' | ||
import { MessageAttributes } from './message-attributes' | ||
@@ -12,6 +12,6 @@ export enum BusState { | ||
export interface Bus { | ||
publish<EventType extends Event> (event: EventType, messageOptions?: MessageOptions): Promise<void> | ||
send<CommandType extends Command> (command: CommandType, messageOptions?: MessageOptions): Promise<void> | ||
publish<EventType extends Event> (event: EventType, messageOptions?: MessageAttributes): Promise<void> | ||
send<CommandType extends Command> (command: CommandType, messageOptions?: MessageAttributes): Promise<void> | ||
start (): Promise<void> | ||
stop (): Promise<void> | ||
} |
export * from './bus' | ||
export * from './service-bus' | ||
export * from './message-options' | ||
export * from './message-attributes' |
import { injectable, inject } from 'inversify' | ||
import autobind from 'autobind-decorator' | ||
import { Bus, BusState } from './bus' | ||
import { BUS_SYMBOLS } from '../bus-symbols' | ||
import { BUS_SYMBOLS, BUS_INTERNAL_SYMBOLS } from '../bus-symbols' | ||
import { Transport } from '../transport' | ||
@@ -9,5 +9,6 @@ import { Event, Command, Message } from '@node-ts/bus-messages' | ||
import { sleep } from '../util' | ||
import { HandlerRegistry } from '../handler' | ||
import { HandlerRegistry, HandlerRegistration } from '../handler' | ||
import * as serializeError from 'serialize-error' | ||
import { MessageOptions } from './message-options' | ||
import { MessageAttributes } from './message-attributes' | ||
import { SessionScopeBinder } from '../bus-module' | ||
@@ -26,3 +27,4 @@ const EMPTY_QUEUE_SLEEP_MS = 500 | ||
@inject(LOGGER_SYMBOLS.Logger) private readonly logger: Logger, | ||
@inject(BUS_SYMBOLS.HandlerRegistry) private readonly handlerRegistry: HandlerRegistry | ||
@inject(BUS_SYMBOLS.HandlerRegistry) private readonly handlerRegistry: HandlerRegistry, | ||
@inject(BUS_SYMBOLS.MessageHandlingContext) private readonly messageHandlingContext: MessageAttributes | ||
) { | ||
@@ -33,6 +35,7 @@ } | ||
event: TEvent, | ||
messageOptions: MessageOptions = new MessageOptions() | ||
messageOptions: MessageAttributes = new MessageAttributes() | ||
): Promise<void> { | ||
this.logger.debug('publish', { event }) | ||
return this.transport.publish(event, messageOptions) | ||
const transportOptions = this.prepareTransportOptions(messageOptions) | ||
return this.transport.publish(event, transportOptions) | ||
} | ||
@@ -42,6 +45,7 @@ | ||
command: TCommand, | ||
messageOptions: MessageOptions = new MessageOptions() | ||
messageOptions: MessageAttributes = new MessageAttributes() | ||
): Promise<void> { | ||
this.logger.debug('send', { command }) | ||
return this.transport.send(command, messageOptions) | ||
const transportOptions = this.prepareTransportOptions(messageOptions) | ||
return this.transport.send(command, transportOptions) | ||
} | ||
@@ -96,3 +100,3 @@ | ||
try { | ||
await this.dispatchMessageToHandlers(message.domainMessage, message.options) | ||
await this.dispatchMessageToHandlers(message.domainMessage, message.attributes) | ||
this.logger.debug('Message dispatched to all handlers', { message }) | ||
@@ -116,3 +120,3 @@ await this.transport.deleteMessage(message) | ||
private async dispatchMessageToHandlers (message: Message, messageOptions: MessageOptions): Promise<void> { | ||
private async dispatchMessageToHandlers (message: Message, context: MessageAttributes): Promise<void> { | ||
const handlers = this.handlerRegistry.get(message.$name) | ||
@@ -124,7 +128,43 @@ if (handlers.length === 0) { | ||
const handlersToInvoke = handlers.map(h => h.resolveHandler(h.defaultContainer)) | ||
await Promise.all(handlersToInvoke.map(async h => { | ||
await h.handle(message, messageOptions) | ||
})) | ||
const handlersToInvoke = handlers.map(handler => dispatchMessageToHandler( | ||
message, | ||
context, | ||
handler | ||
)) | ||
await Promise.all(handlersToInvoke) | ||
} | ||
private prepareTransportOptions (clientOptions: MessageAttributes): MessageAttributes { | ||
const result: MessageAttributes = { | ||
correlationId: clientOptions.correlationId || this.messageHandlingContext.correlationId, | ||
attributes: clientOptions.attributes, | ||
stickyAttributes: { | ||
...clientOptions.stickyAttributes, | ||
...this.messageHandlingContext.stickyAttributes | ||
} | ||
} | ||
return result | ||
} | ||
} | ||
async function dispatchMessageToHandler ( | ||
message: Message, | ||
context: MessageAttributes, | ||
handlerRegistration: HandlerRegistration<Message> | ||
): Promise<void> { | ||
const container = handlerRegistration.defaultContainer | ||
const childContainer = container.createChild() | ||
childContainer | ||
.bind<MessageAttributes>(BUS_SYMBOLS.MessageHandlingContext) | ||
.toConstantValue(context) | ||
const sessionScopeBinder = container.get<SessionScopeBinder>(BUS_INTERNAL_SYMBOLS.SessionScopeBinder) | ||
// tslint:disable-next-line:no-unsafe-any | ||
sessionScopeBinder(childContainer.bind.bind(childContainer)) | ||
const handler = handlerRegistration.resolveHandler(childContainer) | ||
handler.handle(message, context) | ||
} |
@@ -1,9 +0,9 @@ | ||
import { Message } from '@node-ts/bus-messages' | ||
import { HandlesMessage } from '../handler/handles-message' | ||
import { TestEvent } from './test-event' | ||
import { inject } from 'inversify' | ||
import { MessageAttributes } from '../service-bus' | ||
export const MESSAGE_LOGGER = Symbol.for('@node-ts/bus-core/message-logger') | ||
export interface MessageLogger { | ||
log (message: Message): void | ||
log (message: unknown): void | ||
} | ||
@@ -19,5 +19,6 @@ | ||
async handle (testEvent: TestEvent): Promise<void> { | ||
async handle (testEvent: TestEvent, attributes: MessageAttributes): Promise<void> { | ||
this.messageLogger.log(testEvent) | ||
this.messageLogger.log(attributes) | ||
} | ||
} |
@@ -7,3 +7,3 @@ import { MemoryQueue, InMemoryMessage, RETRY_LIMIT } from './memory-queue' | ||
import { HandlerRegistry } from '../handler' | ||
import { MessageOptions } from '../service-bus' | ||
import { MessageAttributes } from '../service-bus' | ||
import * as faker from 'faker' | ||
@@ -18,3 +18,3 @@ | ||
const handledMessageNames = [TestCommand.NAME, TestEvent.NAME] | ||
const messageOptions: MessageOptions = { | ||
const messageOptions: MessageAttributes = { | ||
correlationId: faker.random.uuid() | ||
@@ -21,0 +21,0 @@ } |
@@ -7,3 +7,3 @@ import { injectable, inject } from 'inversify' | ||
import { HandlerRegistry } from '../handler' | ||
import { MessageOptions } from '../service-bus' | ||
import { MessageAttributes } from '../service-bus' | ||
@@ -60,7 +60,7 @@ export const RETRY_LIMIT = 10 | ||
async publish<TEvent extends Event> (event: TEvent, messageOptions: MessageOptions): Promise<void> { | ||
async publish<TEvent extends Event> (event: TEvent, messageOptions: MessageAttributes): Promise<void> { | ||
this.addToQueue(event, messageOptions) | ||
} | ||
async send<TCommand extends Command> (command: TCommand, messageOptions: MessageOptions): Promise<void> { | ||
async send<TCommand extends Command> (command: TCommand, messageOptions: MessageAttributes): Promise<void> { | ||
this.addToQueue(command, messageOptions) | ||
@@ -115,3 +115,3 @@ } | ||
private addToQueue (message: Message, messageOptions: MessageOptions): void { | ||
private addToQueue (message: Message, messageOptions: MessageAttributes): void { | ||
if (this.messagesWithHandlers[message.$name]) { | ||
@@ -129,3 +129,3 @@ const transportMessage = toTransportMessage(message, messageOptions, false) | ||
message: Message, | ||
messageOptions: MessageOptions, | ||
messageOptions: MessageAttributes, | ||
isProcessing: boolean | ||
@@ -136,3 +136,3 @@ ): TransportMessage<InMemoryMessage> { | ||
domainMessage: message, | ||
options: messageOptions, | ||
attributes: messageOptions, | ||
raw: { | ||
@@ -139,0 +139,0 @@ seenCount: 0, |
import { Message } from '@node-ts/bus-messages' | ||
import { MessageOptions } from '../service-bus' | ||
import { MessageAttributes } from '../service-bus' | ||
@@ -27,3 +27,3 @@ /** | ||
*/ | ||
options: MessageOptions | ||
attributes: MessageAttributes | ||
} |
import { Event, Command } from '@node-ts/bus-messages' | ||
import { TransportMessage } from './transport-message' | ||
import { HandlerRegistry } from '../handler' | ||
import { MessageOptions } from '../service-bus' | ||
import { MessageAttributes } from '../service-bus' | ||
@@ -17,3 +17,3 @@ /** | ||
*/ | ||
publish<TEvent extends Event> (event: TEvent, messageOptions: MessageOptions): Promise<void> | ||
publish<TEvent extends Event> (event: TEvent, messageOptions: MessageAttributes): Promise<void> | ||
@@ -27,3 +27,3 @@ /** | ||
*/ | ||
send<TCommand extends Command> (command: TCommand, messageOptions: MessageOptions): Promise<void> | ||
send<TCommand extends Command> (command: TCommand, messageOptions: MessageAttributes): Promise<void> | ||
@@ -30,0 +30,0 @@ /** |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
123887
2312
8