AMQP Message Bus
Node.js message bus interface for AMQP servers, such as RabbitMQ.
Features
- Message bus API hides the complexity of AMQP connectors;
- Supports symmetric message encryption;
- Works well with async/await.
Installation
$ npm install amqp-message-bus
Requirements
Quick start
Install amqp-message-bus from npm.
$ npm install amqp-message-bus --save
Create a new message bus.
const MessageBus = require('amqp-message-bus');
const bus = new MessageBus({
queue: 'tasks',
url: 'amqp://localhost',
encryptionKey: 'keep-it-safe'
});
Connect to the AMQP server and subscribe to incoming messages.
bus.connect()
.then(() => bus.subscribe((msg, props, done) => {
console.log(`Received message ${props.messageId} with priority ${props.priority}, published on ${props.timestamp}`);
done();
}))
.then((unsubscribe) => unsubscribe())
.catch((err) => console.error(err));
The same flow is more readable with async/await (inside an async function).
await bus.connect();
const unsubscribe = await bus.subscribe((msg, props, done) => {
console.log(`Received message ${props.messageId} with priority ${props.priority}, published on ${props.timestamp}`);
done();
});
await unsubscribe();
Connect to the AMQP server, publish a message, and disconnect immediately.
bus.connect()
.then(() => bus.publish({ foo: 1, bar: 2 }))
.catch((err) => console.error(err))
.finally(() => bus.disconnect());
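The publish-then-disconnect flow can also be written with async/await and try/finally. This is a minimal sketch: `publishOnce` is a hypothetical helper name, not part of the library, and it only assumes that `bus` exposes the `connect()`, `publish()` and `disconnect()` methods documented in this README.

```javascript
// Hypothetical helper (not part of amqp-message-bus): publish one message,
// then disconnect regardless of whether publishing succeeded.
// `bus` is assumed to expose connect(), publish() and disconnect().
async function publishOnce(bus, msg, props) {
  await bus.connect();
  try {
    // Resolves to the boolean value returned by bus.publish().
    return await bus.publish(msg, props);
  } finally {
    // Runs on success and on failure, so the connection is not leaked.
    await bus.disconnect();
  }
}
```

Unlike the promise chain above, this sketch does not swallow errors: a failed `publish()` still rejects after the bus has been disconnected, so the caller decides how to handle it.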
API Docs
#constructor(spec) -> MessageBus
Constructs a new message bus with the supplied properties.
Arguments
spec
(Object) message bus properties (required).
spec.url
(string) AMQP server URL (required).
spec.queue
(string) the name of the queue to subscribe to (required).
spec.encryptionKey
(string) encryption key to use with symmetric encryption (optional). Messages are not encrypted if left unspecified.
Example
const bus = new MessageBus({
queue: 'tasks',
url: 'amqp://localhost',
encryptionKey: 'keep-it-safe'
});
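To illustrate what symmetric encryption with a shared key means, here is a sketch using Node's built-in crypto module. It is an illustration only: the cipher (AES-256-GCM), the scrypt key derivation, the payload layout, and the `encrypt`/`decrypt` names are all assumptions chosen for this example, and this README does not state which algorithm amqp-message-bus actually uses.

```javascript
const crypto = require('crypto');

// Illustration only: the algorithm and payload layout below are assumptions,
// not the library's documented behaviour.

// Derive a 32-byte AES key from a passphrase such as 'keep-it-safe'.
function deriveKey(passphrase, salt) {
  return crypto.scryptSync(passphrase, salt, 32);
}

// Serialize a JSON value and encrypt it.
// Payload layout: salt (16 bytes) | iv (12 bytes) | auth tag (16 bytes) | ciphertext.
function encrypt(value, passphrase) {
  const salt = crypto.randomBytes(16);
  const iv = crypto.randomBytes(12);
  const cipher = crypto.createCipheriv('aes-256-gcm', deriveKey(passphrase, salt), iv);
  const data = Buffer.concat([cipher.update(JSON.stringify(value), 'utf8'), cipher.final()]);
  return Buffer.concat([salt, iv, cipher.getAuthTag(), data]);
}

// Reverse of encrypt(); throws if the passphrase is wrong or the payload was altered.
function decrypt(payload, passphrase) {
  const salt = payload.subarray(0, 16);
  const iv = payload.subarray(16, 28);
  const tag = payload.subarray(28, 44);
  const data = payload.subarray(44);
  const decipher = crypto.createDecipheriv('aes-256-gcm', deriveKey(passphrase, salt), iv);
  decipher.setAuthTag(tag);
  const text = Buffer.concat([decipher.update(data), decipher.final()]).toString('utf8');
  return JSON.parse(text);
}
```

The point of the sketch is that publisher and subscriber must be constructed with the same `encryptionKey`, since the same key both encrypts and decrypts.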
#connect() -> Promise
Connects to AMQP server using the connection properties specified at construction time.
Returns
Returns a native Promise.
Example
bus.connect()
.then(() => {
console.log('Connected to amqp server');
})
.catch((err) => {
console.error(err);
});
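If the AMQP server may not be reachable yet when the application starts, `connect()` can be wrapped in a retry loop. The following is a sketch under assumptions: `connectWithRetry`, `attempts` and `delayMs` are hypothetical names introduced here, and the only thing relied upon is that `bus.connect()` returns a Promise that rejects on failure.

```javascript
// Resolve after the given number of milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper (not part of amqp-message-bus): call bus.connect()
// up to `attempts` times (at least 1), waiting a little longer after each failure.
async function connectWithRetry(bus, { attempts = 5, delayMs = 1000 } = {}) {
  let lastError;
  for (let i = 1; i <= attempts; i += 1) {
    try {
      return await bus.connect();
    } catch (err) {
      lastError = err;
      // Linear backoff: delayMs, 2 * delayMs, ... (no wait after the last attempt).
      if (i < attempts) await sleep(delayMs * i);
    }
  }
  // Every attempt failed: surface the most recent error to the caller.
  throw lastError;
}
```

Usage would look like `await connectWithRetry(bus, { attempts: 3, delayMs: 500 });` in place of `await bus.connect();`.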
#disconnect() -> Promise
Disconnects from AMQP server.
Returns
Returns a native Promise.
Example
bus.disconnect()
.then(() => {
console.log('Disconnected from amqp server');
})
.catch((err) => {
console.error(err);
});
#subscribe(listener) -> Promise<Function>
Subscribes to the message bus for incoming messages.
Arguments
listener
(Function<Object, Object, Function>) listener function (required).
Listener function arguments
msg
(Object) message body (required).
props
(Object) message meta-data (required).
done
(Function) call done to signal that message processing is done (required).
Please visit http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue for further info on props meta-data.
Returns
Returns a native Promise resolving to an unsubscribe() method.
Example
const listener = (msg, props, done) => {
console.log(JSON.stringify(msg, null, 2));
done();
};
bus.subscribe(listener)
.then((unsubscribe) => {
unsubscribe();
})
.catch((err) => {
console.error(err);
});
Example using async/await
const unsubscribe = await bus.subscribe((msg, props, done) => {
console.log(JSON.stringify(msg, null, 2));
done();
});
await unsubscribe();
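When the message handler itself is asynchronous, it is easy to forget `done()` on an error path. One possible way to guard against that is a small adapter, sketched below. `toListener` and `onError` are hypothetical names, not part of the library, and the sketch assumes `done` takes no arguments, since this README does not document any.

```javascript
// Hypothetical adapter (not part of amqp-message-bus): turn an async handler
// (msg, props) => Promise into a (msg, props, done) listener that always calls done().
function toListener(handler, onError = console.error) {
  return async (msg, props, done) => {
    try {
      await handler(msg, props);
    } catch (err) {
      // Report the failure instead of letting the promise rejection go unhandled.
      onError(err);
    } finally {
      // Signal that processing is finished, whether it succeeded or failed.
      done();
    }
  };
}
```

It would be used as `await bus.subscribe(toListener(async (msg, props) => { /* work */ }));`. Whether a failed message should be marked as done at all is an application decision; this sketch simply makes the choice explicit in one place.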
#publish(msg, props) -> Promise<boolean>
Publishes the supplied message to the AMQP server.
Arguments
msg
(*) message body (required); can be any JSON serializable value, e.g. Object, Array.
props
(Object) message props (optional).
props.id
(string) message ID (optional; defaults to UUID v4)
props.priority
(integer) message priority, must be between 1 and 10 (optional; defaults to 1)
props.timestamp
(number) message timestamp (optional; defaults to Date.now())
props.type
(string) message type (optional)
Returns
Returns a native Promise resolving to a boolean value.
Example
bus.publish({ foo: 'bar' }, { type: 'nonsense', priority: 10 })
.catch((err) => {
console.error(err);
});
Example using async/await
await bus.publish({ foo: 'bar' }, { type: 'nonsense', priority: 10 });
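The documented defaults and the 1 to 10 priority range can be checked on the caller's side before publishing. The helper below is a sketch that mirrors the rules listed under Arguments; `buildProps` is a hypothetical name, and the library is expected to apply its own defaults regardless, so this is only useful for failing early with a clear error.

```javascript
const { randomUUID } = require('crypto'); // available in Node.js 14.17 and later

// Hypothetical helper (not part of amqp-message-bus): fill in the documented
// defaults for publish() props and reject an out-of-range priority.
function buildProps(props = {}) {
  const priority = props.priority === undefined ? 1 : props.priority;
  if (!Number.isInteger(priority) || priority < 1 || priority > 10) {
    throw new RangeError(`priority must be an integer between 1 and 10, got ${priority}`);
  }
  const result = {
    id: props.id === undefined ? randomUUID() : props.id, // UUID v4 by default
    priority,
    timestamp: props.timestamp === undefined ? Date.now() : props.timestamp,
  };
  // type has no documented default, so it is only included when supplied.
  if (props.type !== undefined) result.type = props.type;
  return result;
}
```

For example, `await bus.publish({ foo: 'bar' }, buildProps({ type: 'nonsense', priority: 10 }));` would throw a RangeError before reaching the server if the priority were changed to 11.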
Contribute
Source code contributions are most welcome. The following rules apply:
- Follow the Airbnb Style Guide;
- Make sure not to break the tests.
License
MIT