Appolo Bus Module
Bus module for appolo, built with rabbot.
Installation
npm i @appolo/bus
Options
Key | Description | Type | Default |
--- | --- | --- | --- |
id | injection id | string | busProvider |
connection | AMQP connection string | string | null |
auto | true to initialize busProvider automatically and start listening to events | boolean | true |
listener | true to register queue event handlers | boolean | true |
exchangeName | name of the exchange | string | |
queueName | name of the queue | string | |
appendEnv | append env name to queueName and exchangeName | boolean | true |
exchange | exchange options | object | {} |
queue | queue options | object | {} |
requestQueue | request queue options | object | {} |
replayQueue | reply queue options, or false to disable | object | {} |
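One way to picture the table is as a defaults object that the caller's options are merged over. The sketch below is illustrative only: `BusOptions`, `defaultOptions`, `resolveOptions` and `withEnv` are hypothetical names, not the module's typings, and the `-` separator used for `appendEnv` is an assumption about the naming rule.

```typescript
// Hypothetical shape mirroring part of the options table (not the module's real typings).
interface BusOptions {
    id: string;
    connection: string | null;
    auto: boolean;
    listener: boolean;
    exchangeName: string; // no default in the table; must be supplied
    queueName: string;    // no default in the table; must be supplied
    appendEnv: boolean;
    exchange: Record<string, unknown>;
    queue: Record<string, unknown>;
}

// Defaults as listed in the table.
const defaultOptions: BusOptions = {
    id: "busProvider",
    connection: null,
    auto: true,
    listener: true,
    exchangeName: "",
    queueName: "",
    appendEnv: true,
    exchange: {},
    queue: {},
};

// Assumed naming rule for appendEnv: "<name>-<env>". The separator is a guess.
function withEnv(name: string, env: string, appendEnv: boolean): string {
    return appendEnv ? `${name}-${env}` : name;
}

// Merge caller options over the defaults, then apply the env suffix.
function resolveOptions(options: Partial<BusOptions>, env: string): BusOptions {
    const merged: BusOptions = { ...defaultOptions, ...options };
    return {
        ...merged,
        exchangeName: withEnv(merged.exchangeName, env, merged.appendEnv),
        queueName: withEnv(merged.queueName, env, merged.appendEnv),
    };
}

const resolved = resolveOptions(
    { connection: "amqp://connection-string", exchangeName: "events", queueName: "orders" },
    "production"
);
console.log(resolved.queueName, resolved.exchangeName);
```

Setting `appendEnv: false` in this sketch leaves both names untouched, which is useful when several environments are meant to share one queue.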
Exchange Options
Key | Description | Type | Default |
--- | --- | --- | --- |
type | exchange type (direct, fanout or topic) | string | topic |
autoDelete | delete when consumer count goes to 0 | boolean | false |
durable | survive broker restarts | boolean | true |
persistent | persistent delivery, messages saved to disk | boolean | true |
alternate | define an alternate exchange | string | |
publishTimeout | timeout in milliseconds for publish calls to this exchange | number (max 2^32) | |
replyTimeout | timeout in milliseconds to wait for a reply | number (max 2^32) | |
limit | the number of unpublished messages to cache while waiting on connection | number (max 2^16) | |
noConfirm | prevents rabbot from creating the exchange in confirm mode | boolean | false |
Queue Options
Key | Description | Type | Default |
--- | --- | --- | --- |
autoDelete | delete when consumer count goes to 0 | boolean | false |
durable | survive broker restarts | boolean | true |
subscribe | auto-start the subscription | boolean | false |
limit | max number of unacked messages allowed for consumer | number (max 2^16) | 1 |
noAck | the server will remove messages from the queue as soon as they are delivered | boolean | false |
noBatch | causes ack, nack & reject to take place immediately | boolean | false |
noCacheKeys | disable cache of matched routing keys to prevent unbounded memory growth | boolean | false |
queueLimit | max number of ready messages a queue can hold | number (max 2^32) | |
messageTtl | time in ms before a message expires on the queue | number (max 2^32) | |
expires | time in ms before a queue with 0 consumers expires | number (max 2^32) | |
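The values 2^16 and 2^32 in the tables are upper bounds on numeric options. A small pre-flight check along these lines can catch out-of-range values before they reach the broker. This is a sketch with hypothetical names (`QueueLimits`, `checkQueueLimits`); the module itself is not known to expose such a helper.

```typescript
// Hypothetical subset of the numeric queue options listed above.
interface QueueLimits {
    limit?: number;      // max unacked messages per consumer
    queueLimit?: number; // max ready messages the queue can hold
    expires?: number;    // ms before a queue with 0 consumers expires
}

// Upper bounds taken from the table.
const MAX: Required<QueueLimits> = {
    limit: Math.pow(2, 16),
    queueLimit: Math.pow(2, 32),
    expires: Math.pow(2, 32),
};

// Returns a list of problems; an empty list means the options pass.
function checkQueueLimits(options: QueueLimits): string[] {
    const problems: string[] = [];
    for (const key of Object.keys(MAX) as (keyof QueueLimits)[]) {
        const value = options[key];
        if (value === undefined) {
            continue; // option not set, nothing to check
        }
        if (!Number.isInteger(value) || value < 0 || value > MAX[key]) {
            problems.push(`${key} must be an integer between 0 and ${MAX[key]}`);
        }
    }
    return problems;
}

const ok = checkQueueLimits({ limit: 10, expires: 60000 });
const bad = checkQueueLimits({ limit: 100000 });
console.log(ok.length, bad);
```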
In config/modules/all.ts:
import {App} from 'appolo';
import {BusModule} from '@appolo/bus';

export = async function (app: App) {
    // `connection` is the AMQP connection string from the options table
    await app.module(new BusModule({connection: "amqp://connection-string"}));
}
Usage
Publisher
import {define, singleton} from 'appolo'
import {publisher} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @publisher("test")
    async publish(data: any): Promise<any> {
        return data
    }
}
Or with BusProvider
import {define, singleton, inject} from 'appolo'
import {BusProvider} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @inject() busProvider: BusProvider;

    publish(data: any): Promise<any> {
        return this.busProvider.publish("test", data)
    }
}
Handler
If you do not call msg.ack() or msg.nack() yourself, msg.ack() is called when the handler returns and msg.nack() is called if the handler throws an error.
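import {define, singleton} from 'appolo'
import {handler, IMessage} from "@appolo/bus";

@define()
@singleton()
export class SomeHandler {

    // No explicit ack: the message is acked when the handler returns.
    @handler("test")
    handle(msg: IMessage<any>) {
    }

    // Explicit ack/nack inside the handler.
    @handler("someName")
    handleSomeName(msg: IMessage<any>) {
        try {
            // process msg.body here
            msg.ack();
        } catch (e) {
            msg.nack();
        }
    }
}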
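The automatic ack/nack rule described above can be sketched as a thin wrapper around the handler. This is a simplified, synchronous illustration with a stand-in message class (`FakeMessage` and `runHandler` are hypothetical names); the module's real dispatch code may differ, for example by awaiting async handlers.

```typescript
// Minimal stand-in for a bus message; only what is needed to show ack handling.
class FakeMessage {
    public state: "pending" | "acked" | "nacked" = "pending";

    ack(): void {
        if (this.state === "pending") this.state = "acked";
    }

    nack(): void {
        if (this.state === "pending") this.state = "nacked";
    }
}

// Hypothetical wrapper: run the handler, then settle the message if the handler did not.
function runHandler(msg: FakeMessage, handler: (msg: FakeMessage) => void): void {
    try {
        handler(msg);
        if (msg.state === "pending") msg.ack();  // handler returned without settling
    } catch (e) {
        if (msg.state === "pending") msg.nack(); // handler threw before settling
    }
}

// Handler that never touches ack/nack: acked on return.
const silent = new FakeMessage();
runHandler(silent, () => { /* no explicit ack */ });

// Handler that throws: nacked on error.
const failing = new FakeMessage();
runHandler(failing, () => { throw new Error("boom"); });

// Handler that settles the message itself: the wrapper leaves it alone.
const manual = new FakeMessage();
runHandler(manual, (m) => m.nack());

console.log(silent.state, failing.state, manual.state);
```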
Request
import {define, singleton} from 'appolo'
import {request} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @request("test")
    async getData(data: any): Promise<any> {
        return data
    }

    public async handleData() {
        let data = await this.getData({userId: 1})
    }
}
Or with BusProvider
@define()
@singleton()
export class SomePublisher {

    @inject() busProvider: BusProvider;

    // async is required because the method awaits the response
    async getData(params: any): Promise<any> {
        let data = await this.busProvider.request("test", params)
        return data;
    }
}
Reply
import {define, singleton, inject} from 'appolo'
import {reply, BusProvider, IMessage} from "@appolo/bus";

@define()
@singleton()
export class SomeHandler {

    @inject() busProvider: BusProvider;

    // The returned value is sent back as the reply.
    @reply("test")
    handle(msg: IMessage<any>) {
        return {userId: 1}
    }

    // Or reply explicitly through the message object.
    @reply("someName")
    handleSomeName(msg: IMessage<any>) {
        try {
            msg.replySuccess({userId: 1})
        } catch (e) {
            msg.replyError(e)
        }
    }
}
IMessage
Each handler and reply handler is called with a message object:
{
    fields: {
        consumerTag: "",
        deliveryTag: #,
        redelivered: true|false,
        exchange: "",
        routingKey: ""
    },
    properties: {
        contentType: "application/json",
        contentEncoding: "utf8",
        headers: {},
        correlationId: "",
        replyTo: "",
        messageId: "",
        type: "",
        appId: ""
    },
    content: { "type": "Buffer", "data": [ ... ] },
    body: , // whatever was published (object, string, etc.)
    type: ""
}
message.ack()
Enqueues the message for acknowledgement.
message.nack()
Enqueues the message for rejection. This will re-enqueue the message.
message.reject()
Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.
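The difference between the three outcomes can be shown with a toy in-memory queue. This is only an illustration of the semantics described above, not how the broker or the module works internally. `ToyQueue` is a hypothetical class, and the `deadLetters` array stands in for a dead-letter exchange; without one, a rejected message would simply be dropped.

```typescript
// Toy in-memory queue used only to illustrate ack, nack and reject.
class ToyQueue {
    public ready: string[] = [];
    public deadLetters: string[] = [];

    // Take the next message off the queue, or undefined when it is empty.
    take(): string | undefined {
        return this.ready.shift();
    }

    // ack: the message is done, nothing is put back.
    ack(_body: string): void {}

    // nack: the message goes back on the queue for redelivery.
    nack(body: string): void {
        this.ready.push(body);
    }

    // reject: the message is not re-queued; here it is kept in a dead-letter list.
    reject(body: string): void {
        this.deadLetters.push(body);
    }
}

const queue = new ToyQueue();
queue.ready.push("a", "b", "c");

queue.ack(queue.take()!);    // "a" is finished
queue.nack(queue.take()!);   // "b" returns to the queue
queue.reject(queue.take()!); // "c" leaves the queue for good

console.log(queue.ready, queue.deadLetters);
```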
message.reply( data:any )
Acknowledges the message and sends the reply data back to the requester.
message.replySuccess( data:T )
Replies to the message with the JSON object {success: true, data}.
message.replyError( e: RequestError<T> )
Replies to the message with the JSON object {success: false, message: e.message, data: e.data}.
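The two reply envelopes can be written down as a small discriminated union. The sketch below uses hypothetical helpers (`successReply`, `errorReply`, `unwrap`) and a local `RequestError` class modeled on the type named above. It shows the shapes only and does not call the module.

```typescript
// Local stand-in for RequestError<T>: an error that carries extra data.
class RequestError<T> extends Error {
    constructor(message: string, public data?: T) {
        super(message);
    }
}

// Envelope shapes as described for replySuccess and replyError.
type SuccessReply<T> = { success: true; data: T };
type ErrorReply<E> = { success: false; message: string; data?: E };
type Reply<T, E = unknown> = SuccessReply<T> | ErrorReply<E>;

function successReply<T>(data: T): SuccessReply<T> {
    return { success: true, data };
}

function errorReply<E>(e: RequestError<E>): ErrorReply<E> {
    return { success: false, message: e.message, data: e.data };
}

// Requester side: return the data on success, throw on failure.
function unwrap<T, E>(reply: Reply<T, E>): T {
    if (reply.success === true) {
        return reply.data;
    } else {
        throw new RequestError<E>(reply.message, reply.data);
    }
}

const okReply = successReply({ userId: 1 });
const failReply = errorReply(new RequestError("not found", { userId: 2 }));

console.log(JSON.stringify(okReply));
console.log(JSON.stringify(failReply));
```

Keeping `success` as a literal `true`/`false` lets the requester narrow the type with a single check before touching `data` or `message`.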
BusProvider
initialize()
Initializes busProvider and starts listening to events. Call it yourself only when auto is set to false.
publish(type: string, data: any, expire?: number): Promise<void>
Publishes an event.
- type - event name
- data - any data
- expire - timeout until the message is expired in the queue
request<T>(type: string, data: any, expire?: number): Promise<T>
Sends a request event and returns a promise that resolves with the response.
- type - event name
- data - any data
- expire - timeout until the request is rejected
close<T>(): Promise<void>
Closes the connection and removes all handlers.
getQueueMessagesCount(): Promise<number>
Returns the number of pending messages in the queue.