Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp-message-bus

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-message-bus

Node.js message bus interface for AMQP servers, such as RabbitMQ.

  • 1.1.1
  • Source
  • npm
  • Socket score

Version published
Maintainers
1
Created
Source

AMQP Message Bus

Node.js message bus interface for AMQP servers, such as RabbitMQ.

Build Status npm version

Features
  • Message bus API hides the complexity of AMQP connectors;
  • Supports symmetric message encryption;
  • Works pefectly fine with async/await.

Installation

$ npm install amqp-message-bus
Requirements
  • Node.js v.6+

Quick start

Install amqp-message-bus from npm.

$ npm install amqp-message-bus --save

Create new message bus.

const MessageBus = require('amqp-message-bus');

const bus = new MessageBus({
  queue: 'tasks',
  url: 'amqp://localhost',
  encryptionKey: 'keep-it-safe'
});

Connect to AMQP server and subscribe for messages.

bus.connect()
  .then(() => bus.subscribe((msg, props, done) => {
    // process msg + props
    console.log(`Received message ${props.messageId} with priority ${props.priority}, published on ${props.timestamp}`);
    // call done when message is done processing to remove from rabbitmq
    done();
  }))
  // call this when you want to unsubscribe...
  .then((unsubscribe) => unsubscribe())
  // always catch errors with promises :-)
  .catch((err) => console.error(err));

The same looks much better using async/await.

await bus.connect();

const unsubscribe = await bus.subscribe((msg, props, done) => {
  // process msg + props
  console.log(`Received message ${props.messageId} with priority ${props.priority}, published on ${props.timestamp}`);
  // call done when ready to remove message from rabbitmq
  done();
});

// call this when you want to unsubscribe...
await unsubscribe();

Connect to AMQP server, publish message and immediately disconnect.

bus.connect()
  .then(() => bus.publish({ foo: 1, bar: 2 }))
  .catch((err) => console.error(err))
  .finally(() => bus.disconnect);

API Docs

#constructor(spec) -> MessageBus

Constructs new message bus with the supplied properties.

Arguments
  1. spec (Object) message bus properties (required).
    • spec.url (string) AMQP server URL (required).
    • spec.queue (string) the name of the queue to subscribe to (required).
    • spec.encryptionKey (string) encryption key to use with assymetric encryption (optional). Signifies no encryption if left unspecified.
Example
const bus = new MessageBus({
  queue: 'tasks',
  url: 'amqp://localhost',
  encryptionKey: 'keep-it-safe'
});

#connect() -> Promise

Connects to AMQP server using the connection properties specified at construction time.

Returns

Returns a native Promise.

Example
bus.connect()
  .then(() => {
    console.log('Connected to amqp server');
  })
  .catch((err) => {
    console.error(err);
  });

#disconnect() -> Promise

Disconnects from AMQP server.

Returns

Returns a native Promise.

Example
bus.disconnect()
  .then(() => {
    console.log('Disconnected from amqp server');
  })
  .catch((err) => {
    console.error(err);
  });

#subscribe(listener) -> Promise<Function>

Subscribes to the message bus for incoming messages.

Arguments
  1. listener (Function<Object, Object, Function>) listener function (required).
Listener function arguments
  1. msg (Object) message body (required).
  2. props (Object) message meta-data (required).
  3. done (Function) call done to signal message proccessing is done (required).

Please visit http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue for further info on props meta-data.

Returns

Returns a native Promise resolving to an unsubscribe() method.

Example
const listener = (msg, props, done) => {
  // do something with msg and props
  console.log(JSON.stringify(msg, null, 2));
  // call done when you are done with msg to remove from queue
  done();
};

bus.subscribe(listener)
  .then((unsubscribe) => {
    // unsubscribe when ready
    unsubscribe();
  })
  .catch((err) => {
    console.error(err);
  });
Example using async/await
const unsubscribe = await bus.subscribe((msg, props, done) => {
  // do something with msg and props
  console.log(JSON.stringify(msg, null, 2));
  // call done when you are done with msg to remove from queue
  done();
});

// unsubscribe when ready
await unsubscribe();

#publish(msg, props) -> Promise<boolean>

Publishes the supplied message to the AMQP server.

Arguments
  1. content (*) message body (required); can be any JSON serializable value, e.g. Object, Array.
  2. props (Object) message props (optional).
    • props.id (string) message ID (optional; defaults to UUID v4)
    • props.priority (integer) message priority, must be between 1 and 10 (optional; defaults to 1)
    • props.timestamp (number) message timestamp (optional; defaults to Date.now())
    • props.type (string) message type (optional)
Returns

Returns a native Promise resolving to a boolean value.

Example
bus.publish({ foo: 'bar' }, { type: 'nonsense', priority: 10 })
  .catch((err) => {
    console.error(err);
  });
Example using async/await
await bus.publish({ foo: 'bar' }, { type: 'nonsense', priority: 10 });

Contribute

Source code contributions are most welcome. The following rules apply:

  1. Follow the Airbnb Style Guide;
  2. Make sure not to break the tests.

License

MIT

Keywords

FAQs

Package last updated on 11 May 2017

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc