Security News
Input Validation Vulnerabilities Dominate MITRE's 2024 CWE Top 25 List
MITRE's 2024 CWE Top 25 highlights critical software vulnerabilities like XSS, SQL Injection, and CSRF, reflecting shifts due to a refined ranking methodology.
@appolo/bus
Advanced tools
bus module for appolo
built with rabbot
npm i @appolo/bus
key | Description | Type | Default |
---|---|---|---|
id | injection id | string | busProvider |
connection | AMQP connection string | string | null |
auto | true to auto initialize busProvider and start listen to events | boolean | true |
listener | true to register queue event handlers | boolean | true |
exchangeName | name of the exchange | string | |
queueName | name of the queue | string | |
appendEnv | append env name to queueName and exchangeName | boolean | true |
exchange | exchange options | object | {} |
queue | queue options | object | {} |
requestQueue | request queue options | object | {} |
replayQueue | request queue options or false to disable | object | {} |
key | Description | Type | Default |
---|---|---|---|
type | request queue options or false to disable | string | topic |
autoDelete | delete when consumer count goes to 0 | boolean | false |
durable | survive broker restarts | boolean | true |
persistent | persistent delivery, messages saved to disk | boolean | true |
alternate | define an alternate exchange | string | |
publishTimeout | timeout in milliseconds for publish calls to this exchange | 2^32 | |
replyTimeout | timeout in milliseconds to wait for a reply | 2^32 | |
limit | the number of unpublished messages to cache while waiting on connection | 2^16 | |
noConfirm | prevents rabbot from creating the exchange in confirm mode | boolean | false |
key | Description | Type | Default |
---|---|---|---|
autoDelete | delete when consumer count goes to 0 | boolean | false |
durable | survive broker restarts | boolean | true |
subscribe | auto-start the subscription | boolean | false |
limit | max number of unacked messages allowed for consumer | 2^16 | 1 |
noAck | the server will remove messages from the queue as soon as they are delivered | boolean | false |
noBatch | causes ack, nack & reject to take place immediately | boolean | false |
noCacheKeys | disable cache of matched routing keys to prevent unbounded memory growth | boolean | false |
queueLimit | max number of ready messages a queue can hold | 2^32 | |
messageTt | time in ms before a message expires on the queue | 2^32 | |
expires | time in ms before a queue with 0 consumers expires | 2^32 | |
in config/modules/all.ts |
import {PubSubModule} from '@appolo/pubsub';
export = async function (app: App) {
await app.module(new BusModule({redis:"amqp://connection-string"}));
}
import {define, singleton} from 'appolo'
import {publisher} from "@appolo/bus";
@define()
@singleton()
export class SomePublisher {
@publisher("test")
async publish(data: any): Promise<any> {
return data
}
}
Or with BusProvider
@define()
@singleton()
export class SomePublisher {
inject() busProvider:BusProvider
publish(data:any): Promise<any> {
return this.busProvider.publish("test",data)
}
}
if you don not call msg ack or nack
it will be called on handler return msg.ack()
or msg.nack()
on error
import {define, singleton} from 'appolo'
import {handler} from "@appolo/bus";
@define()
@singleton()
export class SomeHandler {
@handler("test")
handle(msg: IMessage<data>) {
//do something
}
@handler("someName")
handle(msg: IMessage<data>) {
try{
//do some thing
msg.ack();
}
catch(){
msg.nack();
}
}
}
import {define, singleton} from 'appolo'
import {request} from "@appolo/bus";
@define()
@singleton()
export class SomePublisher {
@request("test")
async getData(data: any): Promise<any> {
return data
}
public async handleData(){
let data = await this.getData({userId:1})
}
}
Or with BusProvider
@define()
@singleton()
export class SomePublisher {
inject() busProvider:busProvider
publish(data:any): Promise<any> {
let data = await this.busProvider.request("test",data)
return data;
}
}
import {define, singleton} from 'appolo'
import {handler} from "@appolo/bus";
@define()
@singleton()
export class SomeHandler {
inject() busProvider:busProvider
@reply("test")
handle(msg: IMessage<data>) {
return {userId:1}
}
// or reply methods
@reply("someName")
handle(msg: IMessage<data>) {
try{
//get some data
msg.replySuccess(msg,{userId:1})
}
catch(){
msg.replyError(msg,e)
}
}
}
each handler and reply handler called with message object
{
// metadata specific to routing & delivery
fields: {
consumerTag: "", // identifies the consumer to rabbit
deliveryTag: #, // identifies the message delivered for rabbit
redelivered: true|false, // indicates if the message was previously nacked or returned to the queue
exchange: "" // name of exchange the message was published to,
routingKey: "" // the routing key (if any) used when published
},
properties:{
contentType: "application/json", // see serialization for how defaults are determined
contentEncoding: "utf8", // rabbot's default
headers: {}, // any user provided headers
correlationId: "", // the correlation id if provided
replyTo: "", // the reply queue would go here
messageId: "", // message id if provided
type: "", // the type of the message published
appId: "" // not used by rabbot
},
content: { "type": "Buffer", "data": [ ... ] }, // raw buffer of message body
body: , // this could be an object, string, etc - whatever was published
type: "" // this also contains the type of the message published
}
message.ack()
Enqueues the message for acknowledgement.
message.nack()
Enqueues the message for rejection. This will re-enqueue the message.
message.reject()
Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.
message.reply( data:any )
Acknowledges the messages and sends the message back to the requestor.
message.replySuccess( data:T )
reply the message with json object {success: true,data}
message.replyError( e: RequestError<T> )
reply the message with json object {success: false,message: e.message, data:e.data}
initialize busProvider and start listen to events if not in in auto
mode
publish(type: string, data: any, expire?: number): Promise<void>
publish event
request<T>(type: string, data: any, expire?: number): Promise<T>
request data by event return promise with event response
close<T>(): Promise<void>
close the connection and clean all handlers
getQueueMessagesCount(): Promise<number>
return number of pending events in the queue
FAQs
appolo bus module
The npm package @appolo/bus receives a total of 80 weekly downloads. As such, @appolo/bus popularity was classified as not popular.
We found that @appolo/bus demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 2 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
MITRE's 2024 CWE Top 25 highlights critical software vulnerabilities like XSS, SQL Injection, and CSRF, reflecting shifts due to a refined ranking methodology.
Security News
In this segment of the Risky Business podcast, Feross Aboukhadijeh and Patrick Gray discuss the challenges of tracking malware discovered in open source softare.
Research
Security News
A threat actor's playbook for exploiting the npm ecosystem was exposed on the dark web, detailing how to build a blockchain-powered botnet.