Security News
Opengrep Emerges as Open Source Alternative Amid Semgrep Licensing Controversy
Opengrep forks Semgrep to preserve open source SAST in response to controversial licensing changes.
amqp-connection-manager
Advanced tools
The amqp-connection-manager package is a Node.js library that provides a high-level API for managing AMQP (Advanced Message Queuing Protocol) connections. It simplifies the process of connecting to AMQP brokers like RabbitMQ, handling reconnections, and managing channels.
Creating a Connection
This feature allows you to create a connection to an AMQP broker. The connection manager handles reconnections automatically.
const amqp = require('amqp-connection-manager');
const connection = amqp.connect(['amqp://localhost']);
connection.on('connect', () => console.log('Connected!'));
connection.on('disconnect', params => console.log('Disconnected.', params.err.stack));
Creating a Channel
This feature allows you to create a channel and set up queues, exchanges, and bindings. The channel wrapper also supports JSON message encoding.
const amqp = require('amqp-connection-manager');
const connection = amqp.connect(['amqp://localhost']);
const channelWrapper = connection.createChannel({
json: true,
setup: channel => channel.assertQueue('my-queue', { durable: true })
});
channelWrapper.sendToQueue('my-queue', { hello: 'world' });
Handling Messages
This feature allows you to consume messages from a queue. The channel wrapper handles message acknowledgments and other channel-level operations.
const amqp = require('amqp-connection-manager');
const connection = amqp.connect(['amqp://localhost']);
const channelWrapper = connection.createChannel({
setup: channel => channel.consume('my-queue', msg => {
console.log(msg.content.toString());
channel.ack(msg);
})
});
amqplib is a lower-level library for interacting with AMQP brokers. It provides more granular control over AMQP operations but requires more boilerplate code compared to amqp-connection-manager.
rhea is a flexible AMQP 1.0 client for Node.js. It supports a wide range of AMQP features and is suitable for advanced use cases. However, it is more complex to use than amqp-connection-manager.
rascal is a configuration-driven library for working with RabbitMQ. It provides a higher-level API similar to amqp-connection-manager but focuses more on configuration and policy management.
Connection management for amqplib. This is a wrapper around amqplib which provides automatic reconnects.
npm install --save amqplib amqp-connection-manager
The basic idea here is that, usually, when you create a new channel, you do some setup work at the beginning (like asserting that various queues or exchanges exist, or binding to queues), and then you send and receive messages and you never touch that stuff again.
amqp-connection-manager will reconnect to a new broker whenever the broker it is
currently connected to dies. When you ask amqp-connection-manager for a
channel, you specify one or more setup
functions to run; the setup functions
will be run every time amqp-connection-manager reconnects, to make sure your
channel and broker are in a sane state.
Before we get into an example, note this example is written using Promises, however much like amqplib, any function which returns a Promise will also accept a callback as an optional parameter.
Here's the example:
var amqp = require('amqp-connection-manager');
// Create a new connection manager
var connection = amqp.connect(['amqp://localhost']);
// Ask the connection manager for a ChannelWrapper. Specify a setup function to
// run every time we reconnect to the broker.
var channelWrapper = connection.createChannel({
json: true,
setup: function (channel) {
// `channel` here is a regular amqplib `ConfirmChannel`.
// Note that `this` here is the channelWrapper instance.
return channel.assertQueue('rxQueueName', { durable: true });
},
});
// Send some messages to the queue. If we're not currently connected, these will be queued up in memory
// until we connect. Note that `sendToQueue()` and `publish()` return a Promise which is fulfilled or rejected
// when the message is actually sent (or not sent.)
channelWrapper
.sendToQueue('rxQueueName', { hello: 'world' })
.then(function () {
return console.log('Message was sent! Hooray!');
})
.catch(function (err) {
return console.log('Message was rejected... Boo!');
});
Sometimes it's handy to modify a channel at run time. For example, suppose you have a channel that's listening to one kind of message, and you decide you now also want to listen to some other kind of message. This can be done by adding a new setup function to an existing ChannelWrapper:
channelWrapper.addSetup(function (channel) {
return Promise.all([
channel.assertQueue('my-queue', { exclusive: true, autoDelete: true }),
channel.bindQueue('my-queue', 'my-exchange', 'create'),
channel.consume('my-queue', handleMessage),
]);
});
addSetup()
returns a Promise which resolves when the setup function is
finished (or immediately, if the underlying connection is not currently
connected to a broker.) There is also a removeSetup(setup, teardown)
which
will run teardown(channel)
if the channel is currently connected to a broker
(and will not run teardown
at all otherwise.) Note that setup
and teardown
must either accept a callback or return a Promise.
See a complete example in the examples folder.
Creates a new AmqpConnectionManager, which will connect to one of the URLs provided in urls
. If a broker is
unreachable or dies, then AmqpConnectionManager will try the next available broker, round-robin.
Options:
options.heartbeatIntervalInSeconds
- Interval to send heartbeats to broker. Defaults to 5 seconds.options.reconnectTimeInSeconds
- The time to wait before trying to reconnect. If not specified,
defaults to heartbeatIntervalInSeconds
.options.findServers(callback)
is a function which returns one or more servers to connect to. This should
return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism.
such as Consul or etcd. Instead of taking a callback
, this can also return a Promise. Note that if this
is supplied, then urls
is ignored.options.connectionOptions
is passed as options to the amqplib connect method.connect({connection, url})
- Emitted whenever we successfully connect to a broker.connectFailed({err, url})
- Emitted whenever we attempt to connect to a broker, but fail.disconnect({err})
- Emitted whenever we disconnect from a broker.blocked({reason})
- Emitted whenever a connection is blocked by a brokerunblocked
- Emitted whenever a connection is unblocked by a brokerCreate a new ChannelWrapper. This is a proxy for the actual channel (which may or may not exist at any moment, depending on whether or not we are currently connected.)
Options:
options.name
- Name for this channel. Used for debugging.options.setup(channel, [cb])
- A function to call whenever we reconnect to the
broker (and therefore create a new underlying channel.) This function should
either accept a callback, or return a Promise. See addSetup
below.
Note that this
inside the setup function will the returned ChannelWrapper.
The ChannelWrapper has a special context
member you can use to store
arbitrary data in.options.json
- if true, then ChannelWrapper assumes all messages passed to publish()
and sendToQueue()
are plain JSON objects. These will be encoded automatically before being sent.options.confirm
- if true (default), the created channel will be a ConfirmChanneloptions.publishTimeout
- a default timeout for messages published to this channel.Returns true if the AmqpConnectionManager is connected to a broker, false otherwise.
Close this AmqpConnectionManager and free all associated resources.
connect
- emitted every time this channel connects or reconnects.error(err, {name})
- emitted if an error occurs setting up the channel.close
- emitted when this channel closes via a call to close()
Adds a new 'setup handler'.
setup(channel, [cb])
is a function to call when a new underlying channel is created - handy for asserting
exchanges and queues exists, and whatnot. The channel
object here is a ConfirmChannel from amqplib.
The setup
function should return a Promise (or optionally take a callback) - no messages will be sent until
this Promise resolves.
If there is a connection, setup()
will be run immediately, and the addSetup Promise/callback won't resolve
until setup
is complete. Note that in this case, if the setup throws an error, no 'error' event will
be emitted, since you can just handle the error here (although the setup
will still be added for future
reconnects, even if it throws an error.)
Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error' event.
Removes a setup handler. If the channel is currently connected, will call teardown(channel)
, passing in the
underlying amqplib ConfirmChannel. teardown
should either take a callback or return a Promise.
These work exactly like their counterparts in amqplib's Channel, except that they return a Promise (or accept a
callback) which resolves when the message is confirmed to have been delivered to the broker. The promise rejects if
either the broker refuses the message, or if close()
is called on the ChannelWrapper before the message can be
delivered.
Both of these functions take an additional option when passing options:
timeout
- If specified, if a messages is not acked by the amqp broker within the specified number of milliseconds,
the message will be rejected. Note that the message may still end up getting delivered after the timeout, as we
have no way to cancel the in-flight request.These are just aliases for calling ack()
and nack()
on the underlying channel. They do nothing if the underlying
channel is not connected.
Returns a count of messages currently waiting to be sent to the underlying channel.
Close a channel, clean up resources associated with it.
FAQs
Auto-reconnect and round robin support for amqplib.
The npm package amqp-connection-manager receives a total of 320,315 weekly downloads. As such, amqp-connection-manager popularity was classified as popular.
We found that amqp-connection-manager demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Opengrep forks Semgrep to preserve open source SAST in response to controversial licensing changes.
Security News
Critics call the Node.js EOL CVE a misuse of the system, sparking debate over CVE standards and the growing noise in vulnerability databases.
Security News
cURL and Go security teams are publicly rejecting CVSS as flawed for assessing vulnerabilities and are calling for more accurate, context-aware approaches.