
pxlt-rabbit-handler
A generic class that handles RabbitMQ connection, consume and produce functionality.
npm install pxlt-rabbit-handler
A library built on amqplib that provides out-of-the-box RabbitMQ functionality: connect, consume, publish, and disconnection recovery. It decouples business logic from RabbitMQ-related code by emitting events that notify clients of any RabbitMQ-related occurrence.
The 'connect' logic creates a single connection based on parameters provided in the object initialization.
On a connection failure or a connection-closed event, an exponential backoff retry mechanism is applied to re-establish the connection. The maximum number of retry attempts and other relevant parameters can be set in the constructor. If they are not provided, the defaults described in the Class parameters section below are used.
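To make the retry behaviour concrete, here is a minimal sketch of a capped exponential backoff schedule. It is an illustration only and not the library's actual implementation: the helper names and `baseIntervalMs` are hypothetical, while `maxBackoffIntervalMs` and `maxBackoffCount` are borrowed from the usage example and are assumed to mean "cap per delay" and "maximum number of attempts".

```javascript
// Hypothetical helpers, not part of pxlt-rabbit-handler's API.
// Assumption: the delay doubles on every attempt and is capped at maxBackoffIntervalMs.
function backoffDelayMs(attempt, baseIntervalMs, maxBackoffIntervalMs) {
  const delay = baseIntervalMs * 2 ** attempt; // attempt is 0-based
  return Math.min(delay, maxBackoffIntervalMs);
}

// Builds the full list of delays for maxBackoffCount reconnect attempts.
function backoffSchedule(maxBackoffCount, baseIntervalMs, maxBackoffIntervalMs) {
  return Array.from({ length: maxBackoffCount }, (_, attempt) =>
    backoffDelayMs(attempt, baseIntervalMs, maxBackoffIntervalMs));
}
```

With a base interval of 1000 ms and a cap of 5000 ms, four attempts would wait 1000, 2000, 4000 and 5000 ms. Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future.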
The 'consume' logic creates a channel and consumes on it. It accepts a callback function that is executed on every message arrival.
Consumption uses a user-defined prefetch value (defaults to 1). The channel created during consume defaults to confirmation mode (using createConfirmChannel). To create the channel with createChannel instead, set the 'confirm' parameter to false when calling consume.
One channel will be created per consumed queue.
Upon a message arrival, the provided processing callback will be invoked.
Upon a connection closed event, the class automatically restores all non-cancelled consumed queues if the connection is successfully re-established.
If consuming fails after a reconnection, the connection is closed and a 'connectionClosed' event is emitted. It is therefore important to listen to this event, and exiting the app when it fires is recommended.
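The processing callbacks tend to share the same ack/nack boilerplate, so a small wrapper can keep them focused on business logic. The sketch below assumes only what the usage example shows, namely that the handler instance exposes `ack(message)` and `nack(message)`. The `makeProcessor` helper and its `businessLogic` argument are hypothetical names introduced for illustration.

```javascript
// Hypothetical helper, not part of pxlt-rabbit-handler's API.
// Wraps a business-logic function into a consume callback that acks on
// success and nacks on failure.
function makeProcessor(handler, businessLogic) {
  return function processMessage(message) {
    return Promise.resolve()
      .then(() => businessLogic(message))
      // Two-argument then(): a failure inside ack() itself does not trigger a nack.
      .then(() => handler.ack(message), () => handler.nack(message));
  };
}
```

A callback built this way could be passed to consume in place of a hand-written one, for example `rabbitInst.consume('queueName1', makeProcessor(rabbitInst, handleOrder))`, where `handleOrder` is a placeholder for your own function.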
The 'publish' method accepts the mandatory parameters exchange, routing key and message, plus an optional 'options' object that complies with amqplib's documentation. In addition, a 'timeout' option can be provided; if it is, an exception is thrown when the timeout passes before the server sends an ack/nack.
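One common way to get this kind of behaviour is to race the pending confirmation against a timer. The sketch below shows that general technique under stated assumptions. It is not taken from the library's source, `withTimeout` is a hypothetical helper name, and the delay is assumed to be in milliseconds.

```javascript
// Hypothetical helper illustrating the technique, not the library's implementation.
// Rejects if `promise` has not settled within timeoutMs milliseconds.
function withTimeout(promise, timeoutMs) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Timed out after ${timeoutMs} ms`)),
      timeoutMs,
    );
  });
  // Whichever settles first wins; the timer is always cleared afterwards
  // so it cannot keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```

Note that a timeout of this kind only stops the caller from waiting. It does not tell you whether the broker eventually received the message, so callers that retry after a timeout should be prepared for duplicates.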
All publish/sendMessage calls use one channel, which is separate from any channel used for consuming.
The 'getMessage' method supports RabbitMQ's pull API, meaning it pulls single messages from a specific queue.
All getMessage calls use one channel, the same one used by publish/sendMessage.
The 'getConnectionState' method returns the connection state: 'active', 'closed', 'error', 'blocked'.
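As an illustration, the state string can drive a simple readiness check before publishing. The sketch below assumes only that `getConnectionState()` returns one of the four strings listed above. `canPublish` is a hypothetical helper, and treating 'blocked' as "not ready" is a design assumption rather than documented library behaviour.

```javascript
// The four states listed in this README.
const KNOWN_STATES = ['active', 'closed', 'error', 'blocked'];

// Hypothetical helper, not part of pxlt-rabbit-handler's API.
// Returns true only when the handler reports an 'active' connection.
function canPublish(handler) {
  const state = handler.getConnectionState();
  if (!KNOWN_STATES.includes(state)) {
    throw new Error(`Unexpected connection state: ${state}`);
  }
  return state === 'active';
}
```

A check like this could back a health endpoint or guard a publish call, for example `if (canPublish(rabbitInst)) { ... }`.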
const rabbit = require('pxlt-rabbit-handler');

// Assumption: any logger exposing info() and error() works; console is used here.
const logger = console;

const rabbitInst = new rabbit('amqp://<USERNAME>:<PASSWORD>@<CLUSTER_ENDPOINT>/<VHOST>', { heartbeat: 10, maxBackoffIntervalMs: 60000, logLevel: 'debug' });

// Consumer

// Callback for new incoming messages on queue1
function processMessage1(message) {
  logger.info(Buffer.from(message.message.content).toString('utf8'));
  return Promise.resolve() // do business logic of msgs from queue1
    .then(() => rabbitInst.ack(message))
    .catch((err) => rabbitInst.nack(message));
}

// Callback for new incoming messages on queue2
function processMessage2(message) {
  logger.info(Buffer.from(message.message.content).toString('utf8'));
  return Promise.resolve() // do business logic of msgs from queue2
    .then(() => rabbitInst.ack(message))
    .catch((err) => rabbitInst.nack(message));
}

// If connection was closed and could not be re-established after maxBackoffCount or re-consume failed, exit
rabbitInst.on('connectionClosed', (err) => {
  logger.error(`Failed to connect to rabbit: ${err}, Exiting...`);
  process.exit(1);
});

rabbitInst.connect()
  .then(() => {
    const queues = [{ q: 'queueName1', cb: processMessage1 }, { q: 'queueName2', cb: processMessage2 }];
    const options = { prefetch: 10 };
    // Consume every queue; a failure on one queue is logged and does not stop the others
    return Promise.all(queues.map((q) => rabbitInst.consume(q.q, q.cb, options).catch((err) => { logger.error(err); })));
  })
  .then(() => rabbitInst.cancelConsumer('queueName1'))
  .catch((err) => {
    logger.error(err);
  });

// Publisher

rabbitInst.connect()
  .then(() => rabbitInst.checkExchange('<EXCHANGE_NAME>'))
  .then(() => rabbitInst.publish('<EXCHANGE_NAME>', '<ROUTING_KEY>', '<MESSAGE_BUFFER>', { timeout: 10000 }))
  .then((res) => {
    console.log(res);
    return rabbitInst.gracefullyDisconnect();
  })
  .catch((err) => {
    // error logic
  });
npm run test
npm run test:integration
Integration tests require an RMQ instance. There is a docker-compose.yaml that can launch one. Alternatively, any other instance may be used, but the needed configuration must be provided.
For minikube users: PXLT_RMQ_HOST=$(minikube ip) npm run test:integration
We found that pxlt-rabbit-handler has an unhealthy version release cadence and level of project activity because the last version was released a year ago. It has 3 open source maintainers collaborating on the project.