Socket
Socket
Sign inDemoInstall

amqp-cacoon

Package Overview
Dependencies
29
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

    amqp-cacoon

AmqpCacoon is an abstraction around amqplib that provides a simple interface with flow control included out of the box


Version published
Weekly downloads
98
decreased by-24.03%
Maintainers
1
Created
Weekly downloads
 

Readme

Source

AmqpCacoon

CircleCI License: MIT TypeScript semantic-release

Overview

This is a basic library to provide amqp support. Originally, this library was a wrapper around amqplib. It has since been updated to work with node-amqp-connection-manager, which provides support for behind-the-scenes retries on network failure. Node-amqp-connection-manager guarantees receipt of published messages and provides wrappers around potentially non-persistent channels.

Features

  • Simple interace around node-amqp-manager
  • Publish flow control included out of the box (Wait for drain event if we can't publish)
  • timeout if drain event does not occurs after some amount of time when channel is not ready to receive a publish~. As of 9/26, the publish on drain functionality has been removed, as node-amqp-manager does not support it at this time (pending a bugfix).
  • Consume single or batch of messages
  • Automatically handles reconnect if AMQP connection is lost and re-established
  • Caches published messages if they are published while AMQP is disconnected

Requirements to tests

  1. docker
  2. docker-compose

If using macos brew cask install docker should install both docker and docker-compose

Running Tests

  1. Start docker-compose - This starts RabbitMq
    docker-compose up -d
    
  2. Install node modules (This also loads local modules from our own repositories)
    npm install
    
  3. Run tests
    npm run test
    

Simple Usage

Connect

This allows you to connect to an amqp server

const config = {
  messageBus: {
    // Protocol should be "amqp" or "amqps"
    protocol: 'amqp',
    // Username + Password on the RabbitMQ host
    username: 'valtech',
    password: 'iscool',
    // Host
    host: 'localhost',
    // Port
    port: 5672,
    // Queue setup
    testQueue: 'test-queue',
  },
};

let amqpCacoon = new AmqpCacoon({
  protocol: config.messageBus.protocol,
  username: config.messageBus.username,
  password: config.messageBus.password,
  host: config.messageBus.host,
  port: config.messageBus.port,
  amqp_opts: {},
  providers: {
    logger: logger,
  },
});

Publish

This allows you to publish a message

let channel = await amqpCacoon.getPublishChannel(); // Connects and sets up a publish channel

// Create queue and setup publish channel
channel.assertQueue(config.messageBus.testQueue);

// Publish
await amqpCacoon.publish(
  '', // Publish directly to queue
  config.messageBus.testQueue,
  Buffer.from('TestMessage')
);

Consume Single Message

This will allow use to consume a single message.

let channel = await amqpCacoon.getConsumerChannel(); // Connects and sets up a subscription channel

// Create queue
channel.assertQueue(config.messageBus.testQueue);

// Consume single message at a time
amqpCacoon.registerConsumer(
  config.messageBus.testQueue,
  async (channel: Channel, msg: ConsumeMessage) => {
    try {
      console.log('Messsage content', msg.content.toString());
      // ... Do other processing here
      channel.ack(msg) // To ack a messages
    } catch (e) {
      channel.nack(msg) // To ack a messages
    }
  }
);

Consume Message Batch

This allows you to wait until either some time has elapsed or a specified message size has been exceeded before the messages are consumed

// Consume batch of message at a time. Configuration for time based or size based batching is provided
amqpCacoon.registerConsumerBatch(
  config.messageBus.testQueue,
  async (channel: Channel, msg: ConsumeBatchMessages) => {
    try {
      console.log('Messsage content', msg.content.toString());
      // ... Do other processing here
      msg.ackAll() // To ack all messages
    } catch (e) {
      msg.nackAll() // To nack all messages
    }
  },
  {
    batching: {
      maxTimeMs: 60000, // Don't provide messages to the callback until at least 60000 ms have passed
      maxSizeBytes: 1024 * 1024 * 2, // 1024 * 1024 * 2 = 2 MB -don't provide message to the callback until 2 MB of data have been received
    }
  }
);

Dealing With Channels via ChannelWrapper

This library exposes node-amqp-connection-manager's ChannelWrapper when you call either getConsumerChannel or getPublishChannel. Instead of exposing the Amqp Channel directly (which may or may not be valid depending on the network status), AmqpConnectionManager provides a ChannelWrapper class as an interface to interacting with the underlying channel. Most functions that can be performed on an AmqpLib Channel can be performed on the ChannelWrapper, including ackAll, nackAll, etc. though they are Promise-based. See AMQPConnectionManager's documentation for more info, as well as the underlying amqplib docs.

Just a couple thing that you should remember to do.

  1. Remember to ack or nack on all messages.
  2. An alternative is to pass an option into the registerConsumer to not require an ack (noAck). The problem with this is that if your application is reset or errors out, you may loose the message or messages.

Amqp-connection-manager Setup function

AmqpConnectionManager allows a setup function to be passed in its configuration, or added to a ChannelWrapper at any point. This function can be used with callbacks or Promises and direclty exposes the underlying AMQP channel as a ConfirmChannel (an extension of Channel) (since we know it is valid at that point). The setup function is useful for asserting queues and performing other necessary tasks that must be completed once a valid connection to amqp is made. Again, see AMQPConnectionManager's documentation for more details.

FAQs

Last updated on 21 Oct 2020

Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc