@node-ts/bus-core
![CircleCI](https://circleci.com/gh/node-ts/bus/tree/master.svg?style=svg)
![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)
The core messaging framework. This package provides an in-memory queue and persistence by default, but is designed to be used with other @node-ts/bus-* packages that provide compatibility with other transports (SQS, RabbitMQ, Azure Queues) and persistence technologies (PostgreSQL, SQL Server, Oracle).
Installation
Download and install the packages:
npm i inversify @node-ts/bus-core --save
Load BusModule into your application's inversify container:
import { Container } from 'inversify'
import { BusModule } from '@node-ts/bus-core'
export class ApplicationContainer extends Container {
  constructor () {
    super() // A derived class must call super() before using `this`
    this.load(new BusModule())
  }
}
Register a message handler
Messages are handled by defining and registering a handler class. Each time a message is received by the application, it will be dispatched to each of the registered handlers.
Define the handler:
import { inject } from 'inversify'
import { HandlesMessage, Handler } from '@node-ts/bus-core'
import { SendEmail } from 'my-corporation/commands'
import { SERVICE_SYMBOLS, EmailService } from '../services'
@HandlesMessage(SendEmail)
export class SendEmailHandler implements Handler<SendEmail> {
  constructor (
    @inject(SERVICE_SYMBOLS.EmailService) private readonly emailService: EmailService
  ) {
  }
  // Invoked each time a SendEmail command is received
  async handle (sendEmailCommand: SendEmail): Promise<void> {
    await this.emailService.send(
      sendEmailCommand.to,
      sendEmailCommand.title,
      sendEmailCommand.body
    )
  }
}
Register the handler:
import { inject, injectable } from 'inversify'
import { BUS_SYMBOLS, ApplicationBootstrap, Bus } from '@node-ts/bus-core'
// Illustrative path: import the handler from wherever it is defined in your project
import { SendEmailHandler } from './send-email-handler'
@injectable()
export class Application {
  constructor (
    @inject(BUS_SYMBOLS.ApplicationBootstrap) private readonly applicationBootstrap: ApplicationBootstrap,
    @inject(BUS_SYMBOLS.Bus) private readonly bus: Bus
  ) {
  }
  async initialize (): Promise<void> {
    // Register handlers before initializing the bus
    this.applicationBootstrap.registerHandler(SendEmailHandler)
    await this.applicationBootstrap.initialize()
  }
  async stop (): Promise<void> {
    await this.bus.stop()
  }
}
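The dispatch rule described in this section (every handler registered for a message type receives each message of that type) can be illustrated with a toy registry. This is a minimal sketch and not how @node-ts/bus-core is implemented: `ToyHandlerRegistry`, `register` and `dispatch` are hypothetical names, and plain functions stand in for handler classes so that the snippet runs without inversify or decorators.

```typescript
// Toy model only. None of these names are part of @node-ts/bus-core.
type MessageConstructor<T> = new (...args: any[]) => T
type HandlerFn<T> = (message: T) => Promise<void>

class ToyHandlerRegistry {
  private readonly handlers = new Map<Function, HandlerFn<any>[]>()

  // Several handlers may be registered for the same message type
  register<T> (messageType: MessageConstructor<T>, handler: HandlerFn<T>): void {
    const existing = this.handlers.get(messageType) ?? []
    this.handlers.set(messageType, [...existing, handler])
  }

  // Dispatch to every handler registered for the message's class
  // and report how many handlers were invoked
  async dispatch (message: object): Promise<number> {
    const handlers = this.handlers.get(message.constructor) ?? []
    await Promise.all(handlers.map(handler => handler(message)))
    return handlers.length
  }
}

class SendEmail {
  constructor (
    readonly to: string,
    readonly title: string,
    readonly body: string
  ) {}
}

const registry = new ToyHandlerRegistry()
const sent: string[] = []
registry.register(SendEmail, async command => { sent.push(`email:${command.to}`) })
registry.register(SendEmail, async command => { sent.push(`audit:${command.title}`) })
```

Calling `registry.dispatch(new SendEmail(...))` invokes both registered functions, which mirrors the statement that a received message is dispatched to each registered handler.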
Hooks
Hooks are callback functions that are invoked each time an action occurs. They are commonly used to add testing, logging or health probes to the application in one central place.
Hooks can be added by calling .on() on the bus. For example, to trigger a callback each time the bus attempts to send a message, use:
addHook (): void {
  const bus = this.container.get<Bus>(BUS_SYMBOLS.Bus)
  const callback = (message: unknown) => console.log('Sending', JSON.stringify(message))
  // Add the hook
  bus.on('send', callback)
  // Remove the hook once it is no longer needed
  bus.off('send', callback)
}
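The on/off pairing can be sketched with a small stand-alone model. This is an illustration under stated assumptions, not the library's code: `ToyHooks` and `emit` are hypothetical names, and the model assumes that a hook is removed by passing the same function reference that was registered.

```typescript
// Toy model only. ToyHooks is not part of @node-ts/bus-core.
type HookCallback<T> = (message: T) => void

class ToyHooks<T> {
  private readonly callbacks = new Map<string, Set<HookCallback<T>>>()

  on (action: string, callback: HookCallback<T>): void {
    const registered = this.callbacks.get(action) ?? new Set<HookCallback<T>>()
    registered.add(callback)
    this.callbacks.set(action, registered)
  }

  // Removal relies on receiving the same function reference passed to on()
  off (action: string, callback: HookCallback<T>): void {
    this.callbacks.get(action)?.delete(callback)
  }

  // Invoke every callback currently registered for the action
  emit (action: string, message: T): void {
    this.callbacks.get(action)?.forEach(callback => callback(message))
  }
}

const hooks = new ToyHooks<{ to: string }>()
const log: string[] = []
const onSend: HookCallback<{ to: string }> = message => {
  log.push(`Sending ${JSON.stringify(message)}`)
}

hooks.on('send', onSend)
hooks.emit('send', { to: 'a@example.com' }) // callback runs
hooks.off('send', onSend)
hooks.emit('send', { to: 'b@example.com' }) // callback has been removed
```

After this runs, `log` holds a single entry for the first message. In this model, keeping the callback in a variable is what makes it possible to remove it later.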
Failing a Message
When an error is thrown whilst handling a message, the message is typically sent back to the queue so that it can be retried.
There are times when we know that a message will never succeed even if it were to be retried. In these situations we may not want to wait for our message to be retried before sending it to the dead letter queue, but instead bypass the retries and send it to the dead letter queue immediately.
This can be done by calling bus.fail() from within the scope of a message handling context. This will instruct @node-ts/bus to forward the currently handled message to the dead letter queue and remove it from the service queue.
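The difference between retrying and failing immediately can be sketched with a toy processing loop. This is a simplified model rather than the library's implementation: `processMessage`, `ToyContext` and `maxAttempts` are hypothetical, and the retry limit stands in for whatever redelivery policy the underlying transport applies.

```typescript
// Toy model only. These names are not part of @node-ts/bus-core.
interface ToyContext {
  fail (): void
}
type ToyHandler<T> = (message: T, context: ToyContext) => void

interface ToyResult {
  attempts: number
  destination: 'handled' | 'dead-letter'
}

function processMessage<T> (message: T, handler: ToyHandler<T>, maxAttempts: number): ToyResult {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const state = { failRequested: false }
    try {
      handler(message, { fail: () => { state.failRequested = true } })
    } catch {
      continue // Thrown error: the message goes back to the queue for another attempt
    }
    // The handler returned normally: either it succeeded or it asked to fail fast
    return {
      attempts: attempt,
      destination: state.failRequested ? 'dead-letter' : 'handled'
    }
  }
  // Retries exhausted
  return { attempts: maxAttempts, destination: 'dead-letter' }
}

// A handler that always throws is retried until the attempts run out
const retried = processMessage({ to: '' }, () => { throw new Error('SMTP timeout') }, 3)

// A handler that knows the message can never succeed skips the retries
const failedFast = processMessage({ to: '' }, (_message, context) => { context.fail() }, 3)
```

Both messages end up dead-lettered, but `retried` uses all three attempts while `failedFast` uses one. In a real handler the equivalent step would be the bus.fail() call described above.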
Message handling concurrency
By default, @node-ts/bus will run with a message handling concurrency of 1. This means that only a single message will be read off the queue and processed at a time.
To increase the message handling concurrency, provide your configuration like so:
import { Container } from 'inversify'
import { BusModule, BUS_SYMBOLS, BusConfiguration } from '@node-ts/bus-core'
const concurrency = 3
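export class ApplicationContainer extends Container {
  constructor () {
    super() // A derived class must call super() before using `this`
    this.load(new BusModule())
    // Replace the BusConfiguration binding with one that sets the concurrency
    this
      .rebind<BusConfiguration>(BUS_SYMBOLS.BusConfiguration)
      .toConstantValue({ concurrency })
  }
}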
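What a concurrency setting means in practice can be shown with a small worker-pool model. This is a sketch under stated assumptions, not how @node-ts/bus reads from a transport: `consume` and `maxInFlight` are hypothetical helpers, and an in-memory array stands in for the queue.

```typescript
// Toy model only. These helpers are not part of @node-ts/bus-core.
async function consume<T> (
  queue: T[],
  concurrency: number,
  handle: (message: T) => Promise<void>
): Promise<void> {
  const pending = [...queue]
  // Each worker takes the next message only after finishing its current one
  const worker = async (): Promise<void> => {
    let message = pending.shift()
    while (message !== undefined) {
      await handle(message)
      message = pending.shift()
    }
  }
  // Start `concurrency` workers that drain the queue side by side
  await Promise.all(Array.from({ length: concurrency }, () => worker()))
}

// Returns the highest number of messages that were being handled at once
async function maxInFlight (concurrency: number): Promise<number> {
  let inFlight = 0
  let peak = 0
  await consume([1, 2, 3, 4, 5, 6], concurrency, async () => {
    inFlight++
    peak = Math.max(peak, inFlight)
    await Promise.resolve() // Yield so that other workers can pick up messages
    inFlight--
  })
  return peak
}
```

In this model, `maxInFlight(1)` resolves to 1 and `maxInFlight(3)` resolves to 3, so the setting acts as an upper bound on how many messages are handled at the same time. Handlers that run concurrently should therefore not depend on messages being processed in order.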