@appolo/bus

appolo bus module

Version: 8.1.5 (latest)
Maintainers: 2

Appolo Bus Module

bus module for appolo built with rabbot

Installation

npm i @appolo/bus

Options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| id | injection id | string | busProvider |
| connection | AMQP connection string | string | null |
| auto | true to auto-initialize busProvider and start listening to events | boolean | true |
| listener | true to register queue event handlers | boolean | true |
| exchangeName | name of the exchange | string | |
| queueName | name of the queue | string | |
| appendEnv | append the env name to queueName and exchangeName | boolean | true |
| exchange | exchange options | object | {} |
| queue | queue options | object | {} |
| requestQueue | request queue options | object | {} |
| replayQueue | reply queue options, or false to disable | object | {} |
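
As a rough illustration of how the defaults in the table combine with user-supplied values, the sketch below models the options as a plain TypeScript interface. `BusOptionsSketch`, `DEFAULT_OPTIONS`, and `resolveOptions` are hypothetical names invented for this example and are not part of `@appolo/bus`; only the keys and default values are taken from the table, and the shallow merge is an assumption about how overrides might be applied.

```typescript
// Hypothetical mirror of the options table; not the library's real types.
interface BusOptionsSketch {
  id: string;
  connection: string | null;
  auto: boolean;
  listener: boolean;
  exchangeName?: string;
  queueName?: string;
  appendEnv: boolean;
  exchange: Record<string, unknown>;
  queue: Record<string, unknown>;
  requestQueue: Record<string, unknown>;
  replayQueue: Record<string, unknown> | false;
}

// Defaults copied from the table above.
const DEFAULT_OPTIONS: BusOptionsSketch = {
  id: "busProvider",
  connection: null,
  auto: true,
  listener: true,
  appendEnv: true,
  exchange: {},
  queue: {},
  requestQueue: {},
  replayQueue: {},
};

// Shallow merge: user-supplied keys win, nested objects are replaced whole.
function resolveOptions(user: Partial<BusOptionsSketch>): BusOptionsSketch {
  return { ...DEFAULT_OPTIONS, ...user };
}

const resolved = resolveOptions({
  connection: "amqp://connection-string",
  queueName: "orders",
  auto: false,
});
```

Here `resolved` keeps the documented defaults for every key that was not overridden, while `auto` is switched off so that the provider would have to be initialized manually.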

Exchange Options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| type | exchange type | string | topic |
| autoDelete | delete when consumer count goes to 0 | boolean | false |
| durable | survive broker restarts | boolean | true |
| persistent | persistent delivery, messages saved to disk | boolean | true |
| alternate | define an alternate exchange | string | |
| publishTimeout | timeout in milliseconds for publish calls to this exchange | 2^32 | |
| replyTimeout | timeout in milliseconds to wait for a reply | 2^32 | |
| limit | the number of unpublished messages to cache while waiting on connection | 2^16 | |
| noConfirm | prevents rabbot from creating the exchange in confirm mode | boolean | false |

Queue Options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| autoDelete | delete when consumer count goes to 0 | boolean | false |
| durable | survive broker restarts | boolean | true |
| subscribe | auto-start the subscription | boolean | false |
| limit | max number of unacked messages allowed for consumer | 2^16 | 1 |
| noAck | the server will remove messages from the queue as soon as they are delivered | boolean | false |
| noBatch | causes ack, nack & reject to take place immediately | boolean | false |
| noCacheKeys | disable cache of matched routing keys to prevent unbounded memory growth | boolean | false |
| queueLimit | max number of ready messages a queue can hold | 2^32 | |
| messageTtl | time in ms before a message expires on the queue | 2^32 | |
| expires | time in ms before a queue with 0 consumers expires | 2^32 | |

In config/modules/all.ts:

import {App} from 'appolo';
import {BusModule} from '@appolo/bus';

export = async function (app: App) {
   await app.module(new BusModule({connection: "amqp://connection-string"}));
}

Usage

Publisher

import {define, singleton} from 'appolo'
import {publisher} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @publisher("test")
    async publish(data: any): Promise<any> {
        return data
    }
}

Or with BusProvider

import {define, singleton, inject} from 'appolo'
import {BusProvider} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @inject() busProvider: BusProvider

    publish(data: any): Promise<any> {
        return this.busProvider.publish("test", data)
    }
}

Handler

If you do not call msg.ack() or msg.nack() yourself, msg.ack() is called when the handler returns and msg.nack() is called on error.

import {define, singleton} from 'appolo'
import {handler, IMessage} from "@appolo/bus";

@define()
@singleton()
export class SomeHandler {

    // No explicit ack/nack: the message is acked when the handler returns
    @handler("test")
    handle(msg: IMessage<any>) {
       //do something
    }

    // Explicit ack/nack inside the handler
    @handler("someName")
    handleSomeName(msg: IMessage<any>) {

        try {
           //do something

           msg.ack();
        }
        catch (e) {
            msg.nack();
        }
    }
}
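
The automatic ack/nack rule described at the top of this section can be pictured with a small, library-independent sketch. Everything here (`AckableMessage`, `trackSettlement`, `runHandler`, `fakeMessage`) is a hypothetical stand-in written for illustration; it is not how `@appolo/bus` is implemented, and it only covers synchronous handlers.

```typescript
// Hypothetical stand-in for the message object; only ack/nack are modelled.
interface AckableMessage {
  ack(): void;
  nack(): void;
}

// Wraps a message so we can tell whether the handler already settled it.
function trackSettlement(msg: AckableMessage) {
  let settled = false;
  const tracked: AckableMessage = {
    ack() { settled = true; msg.ack(); },
    nack() { settled = true; msg.nack(); },
  };
  return { tracked, isSettled: () => settled };
}

// Runs a handler, then acks on normal return or nacks on a thrown error,
// unless the handler settled the message itself.
function runHandler(msg: AckableMessage, handler: (m: AckableMessage) => void): void {
  const { tracked, isSettled } = trackSettlement(msg);
  try {
    handler(tracked);
    if (!isSettled()) msg.ack();
  } catch {
    if (!isSettled()) msg.nack();
  }
}

// Minimal fake message that records which methods were called.
function fakeMessage() {
  const calls: string[] = [];
  const msg: AckableMessage = {
    ack: () => { calls.push("ack"); },
    nack: () => { calls.push("nack"); },
  };
  return { msg, calls };
}

const ok = fakeMessage();
runHandler(ok.msg, () => { /* returns normally */ });

const failed = fakeMessage();
runHandler(failed.msg, () => { throw new Error("boom"); });

const manual = fakeMessage();
runHandler(manual.msg, (m) => m.nack());
```

In this sketch the first message ends up acked, the second nacked, and the third is nacked exactly once because the handler settled it before returning.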

Request

import {define, singleton} from 'appolo'
import {request} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @request("test")
    async getData(data: any): Promise<any> {
        return data
    }

    public async handleData(){
        let data = await this.getData({userId:1})
    }


}

Or with BusProvider

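import {define, singleton, inject} from 'appolo'
import {BusProvider} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @inject() busProvider: BusProvider

    async publish(params: any): Promise<any> {
        let data = await this.busProvider.request("test", params)

        return data;
    }
}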

Reply

import {define, singleton, inject} from 'appolo'
import {reply, BusProvider, IMessage} from "@appolo/bus";

@define()
@singleton()
export class SomeHandler {

    @inject() busProvider: BusProvider

    @reply("test")
    handle(msg: IMessage<any>) {
        return {userId:1}
    }

    // or use the reply methods
    @reply("someName")
    handleSomeName(msg: IMessage<any>) {

        try {
            //get some data
            msg.replySuccess({userId:1})
        }
        catch (e) {
            msg.replyError(e)
        }
    }
}

IMessage

Each handler and reply handler is called with a message object:

{
  // metadata specific to routing & delivery
  fields: {
    consumerTag: "", // identifies the consumer to rabbit
    deliveryTag: #, // identifies the message delivered for rabbit
    redelivered: true|false, // indicates if the message was previously nacked or returned to the queue
    exchange: "", // name of exchange the message was published to
    routingKey: "" // the routing key (if any) used when published
  },
  properties:{
    contentType: "application/json", // see serialization for how defaults are determined
    contentEncoding: "utf8", // rabbot's default
    headers: {}, // any user provided headers
    correlationId: "", // the correlation id if provided
    replyTo: "", // the reply queue would go here
    messageId: "", // message id if provided
    type: "", // the type of the message published
    appId: "" // not used by rabbot
  },
  content: { "type": "Buffer", "data": [ ... ] }, // raw buffer of message body
  body: , // this could be an object, string, etc - whatever was published
  type: "" // this also contains the type of the message published
}
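
For readers who prefer types over an annotated listing, the shape above could be written down roughly as follows. The interface names (`MessageFieldsSketch`, `MessagePropertiesSketch`, `MessageSketch`) and the `isRetry` helper are hypothetical and exist only for this example; the real `IMessage` type exported by the package may differ. The raw `content` buffer is typed as `Uint8Array` here purely to keep the sketch free of Node-specific types.

```typescript
// Hypothetical typing of the documented message shape; not the library's IMessage.
interface MessageFieldsSketch {
  consumerTag: string;
  deliveryTag: number;
  redelivered: boolean;
  exchange: string;
  routingKey: string;
}

interface MessagePropertiesSketch {
  contentType: string;
  contentEncoding: string;
  headers: Record<string, unknown>;
  correlationId: string;
  replyTo: string;
  messageId: string;
  type: string;
  appId: string;
}

interface MessageSketch<T> {
  fields: MessageFieldsSketch;
  properties: MessagePropertiesSketch;
  content: Uint8Array; // raw bytes of the message body
  body: T;             // deserialized payload, whatever was published
  type: string;
}

// Example helper: true when the broker delivered this message before.
function isRetry<T>(msg: MessageSketch<T>): boolean {
  return msg.fields.redelivered;
}

const sample: MessageSketch<{ userId: number }> = {
  fields: { consumerTag: "c1", deliveryTag: 1, redelivered: false, exchange: "ex", routingKey: "test" },
  properties: {
    contentType: "application/json",
    contentEncoding: "utf8",
    headers: {},
    correlationId: "",
    replyTo: "",
    messageId: "",
    type: "test",
    appId: "",
  },
  content: new Uint8Array(),
  body: { userId: 1 },
  type: "test",
};
```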

message.ack()

Enqueues the message for acknowledgement.

message.nack()

Enqueues the message for rejection. This will re-enqueue the message.

message.reject()

Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.

message.reply( data:any )

Acknowledges the message and sends the reply back to the requester.

message.replySuccess( data:T )

Replies to the message with the JSON object {success: true, data}.

message.replyError( e: RequestError<T> )

Replies to the message with the JSON object {success: false, message: e.message, data: e.data}.
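
The two reply envelopes described for replySuccess and replyError can be modelled as a discriminated union. The sketch below is an assumption-laden illustration: `SuccessReply`, `ErrorReply`, `RequestErrorSketch`, `toSuccess`, `toError`, and `unwrap` are hypothetical names, and whether the requester receives the envelope itself or already-unwrapped data is not stated in this document.

```typescript
// Envelope shapes as described for replySuccess and replyError.
interface SuccessReply<T> { success: true; data: T; }
interface ErrorReply<E> { success: false; message: string; data: E; }
type ReplyEnvelope<T, E = unknown> = SuccessReply<T> | ErrorReply<E>;

// Hypothetical error type carrying extra data, loosely modelled on RequestError<T>.
class RequestErrorSketch<E> extends Error {
  data: E;
  constructor(message: string, data: E) {
    super(message);
    this.data = data;
  }
}

// Builds the success envelope: {success: true, data}.
function toSuccess<T>(data: T): SuccessReply<T> {
  return { success: true, data };
}

// Builds the error envelope: {success: false, message, data}.
function toError<E>(e: RequestErrorSketch<E>): ErrorReply<E> {
  return { success: false, message: e.message, data: e.data };
}

// Hypothetical requester-side helper: return the data or throw the error.
function unwrap<T, E>(reply: ReplyEnvelope<T, E>): T {
  if (reply.success) return reply.data;
  throw new RequestErrorSketch(reply.message, reply.data);
}

const okReply = toSuccess({ userId: 1 });
const errReply = toError(new RequestErrorSketch("not found", { code: 404 }));
```

Checking the `success` flag first lets TypeScript narrow the union, so `data` has the right type on each branch.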

BusProvider

initialize()

Initializes busProvider and starts listening to events. Call this when not in auto mode.

publish(type: string, data: any, expire?: number): Promise<void>

Publishes an event.

  • type - event name
  • data - any data
  • expire - timeout until the message is expired in the queue

request<T>(type: string, data: any, expire?: number): Promise<T>

Requests data by event and returns a promise that resolves with the event response.

  • type - event name
  • data - any data
  • expire - timeout until the request is rejected

close<T>(): Promise<void>

Closes the connection and cleans up all handlers.

getQueueMessagesCount(): Promise<number>

Returns the number of pending events in the queue.

Package last updated on 20 Jun 2023
