Research
Security News
Malicious PyPI Package ‘pycord-self’ Targets Discord Developers with Token Theft and Backdoor Exploit
Socket researchers uncover the risks of a malicious Python package targeting Discord developers.
amqplib-rpc
Advanced tools
Thin amqplib utils for RabbitMQ RPC in Node.js. Uses replyTo
and correlationId
message properties as the RabbitMQ rpc tutorial suggests.
npm i --save amqplib-rpc
Make an rpc request, publish a message to an rpc queue.
Sets properties.replyTo
and properties.correlationId
on the request message.
/**
* @param {AmqplibConnection} connection rabbitmq connection
* @param {String} queue name of rpc-queue to send the message to
* @param {Buffer|Object} content message content (objects get converted to buffer automatically)
* @param {Object} [opts] request options
* @param {Number} [opts.timeout] timeout in ms, will reject w/ TimeoutError, default: undefined (no timeout)
* @param {Object} [opts.sendOpts] sendToQueue options
* @param {Object} [opts.queueOpts] assertQueue options for replyTo queue, queueOpts.exclusive defaults to true
* @param {Object} [opts.consumeOpts] consume options for replyTo queue, consumeOpts.noAck defaults to true
* @param {String} [opts.exchangeName] exchange name for request, exchangeName defaults to default exchange ('')
* @param {Function} [cb] optional callback, if using callback api
* @return {Promise} returns a promise, if using promise api
*/
// Promise api example
// Connects, sends one rpc request to 'multiply-queue' and logs the reply body.
var amqplib = require('amqplib')
var request = require('amqplib-rpc').request
amqplib.connect()
  .then(function (connection) {
    // plain object content gets converted to buffer automatically
    return request(connection, 'multiply-queue', { a: 10, b: 20 })
  })
  .then(function (replyMessage) {
    console.log(replyMessage.content.toString()) // 200
  })
  .catch(function (err) {
    // reached when connecting or the rpc request rejects
    console.error(err)
  })
// Callback api example
// Connects, sends one rpc request to 'multiply-queue' and logs the reply body.
var amqplib = require('amqplib/callback_api')
var request = require('amqplib-rpc').request
amqplib.connect(function onConnect (connectErr, connection) {
  if (connectErr) throw connectErr

  // plain object; gets converted to buffer automatically
  var payload = { a: 10, b: 20 }

  // RPC request
  request(connection, 'multiply-queue', payload, logReply)

  // Prints the reply body, rethrowing any rpc error.
  function logReply (rpcErr, replyMessage) {
    if (rpcErr) throw rpcErr
    console.log(replyMessage.content.toString()) // 200
  }
})
// Timeout Error example
// triggered by timeout option
// The callback-style connect(cb) used below comes from 'amqplib/callback_api';
// the bare 'amqplib' entry point is the promise api.
var amqplib = require('amqplib/callback_api')
var request = require('amqplib-rpc').request
var TimeoutError = require('amqplib-rpc').TimeoutError
amqplib.connect(function (err, connection) {
  if (err) throw err
  var content = { a: 10, b: 20 } // gets converted to buffer automatically
  var opts = { timeout: 100 } // timeout in ms
  // RPC request
  request(connection, 'multiply-queue', content, opts, function (err, replyMessage) {
    console.log(err) // [TimeoutError: 'rpc timed out']
    console.log(err instanceof TimeoutError) // true
    console.log(err.data)
    /*
    {
      timeout: 100,
      queue: 'multiply-queue',
      content: { a: 10, b: 20 },
      opts: { // shows default opts filled in, since only timeout was passed
        timeout: 100,
        sendOpts: {},
        queueOpts: {
          exclusive: true
        },
        consumeOpts: {
          noAck: true
        }
      }
    }
    */
  })
})
// Channel Close Error example
// occurs if channel is closed before the response is received from rpc
// The callback-style connect(cb) used below comes from 'amqplib/callback_api';
// the bare 'amqplib' entry point is the promise api.
var amqplib = require('amqplib/callback_api')
var request = require('amqplib-rpc').request
var ChannelCloseError = require('amqplib-rpc').ChannelCloseError
amqplib.connect(function (err, connection) {
  if (err) throw err
  var content = { a: 10, b: 20 } // gets converted to buffer automatically
  var opts = { timeout: 100 } // timeout in ms
  // RPC request
  request(connection, 'multiply-queue', content, opts, function (err, replyMessage) {
    console.log(err) // [ChannelCloseError: 'rpc channel closed before receiving the response message']
    console.log(err instanceof ChannelCloseError) // true
    console.log(err.data) // same shape as in the Timeout Error example
  })
})
Reply to an rpc request, publish a message to replyTo queue.
Replies to a message using properties.replyTo
and properties.correlationId.
Reply will NOT error if the "replyTo" queue does not exist. If you need it to, use checkReplyQueue
(example below).
Reply will NOT ack
the message.
Ack/Nack must be done manually.
/**
* @param {AmqplibChannel} channel on which the message was received
* @param {Object} message incoming message on channel
* @param {Buffer|Object|Array|String} content message content
* @param {Object} opts publish options
* @return {Boolean} replyWriteSuccess - amqplib docs do not mention this ever failing..
*/
var amqplib = require('amqplib/callback_api')
var reply = require('amqplib-rpc').reply

// Example rpc server: consumes 'multiply-queue' on one channel and publishes
// each reply on a second channel.
amqplib.connect(function (connectErr, connection) {
  if (connectErr) throw connectErr
  connection.createChannel(function (consumerErr, consumerChannel) {
    if (consumerErr) throw consumerErr
    connection.createChannel(function (publisherErr, publisherChannel) {
      if (publisherErr) throw publisherErr

      // Multiplies the request operands, replies, then acks the request.
      function messageHandler (message) {
        var operands = JSON.parse(message.content.toString())
        var product = operands.a * operands.b // gets converted to buffer automatically
        var publishOpts = {} // optional
        // RPC reply
        reply(publisherChannel, message, product, publishOpts)
        // "ack" message
        consumerChannel.ack(message)
      }

      consumerChannel.consume('multiply-queue', messageHandler, function (consumeErr) {
        if (consumeErr) throw consumeErr
      })
    })
  })
})
Create a channel, check if replyTo queue exists, and close the channel.
checkReplyQueue
creates its own channel, because checking for a non-existent queue errors and closes the channel.
/**
* @param {AmqplibConnection} connection amqplib rabbitmq connection
* @param {Object} message request queue message
* @param {Function} [cb] not required if using promises
* @return {Promise} if using promises
*/
var amqplib = require('amqplib/callback_api')
var reply = require('amqplib-rpc').reply
var checkReplyQueue = require('amqplib-rpc').checkReplyQueue
var QueueNotFound = require('amqplib-rpc').QueueNotFound

// Example rpc server that checks the "replyTo" queue still exists before
// replying to and acking each request.
amqplib.connect(function (connectErr, connection) {
  if (connectErr) throw connectErr
  connection.createChannel(function (consumerErr, consumerChannel) {
    if (consumerErr) throw consumerErr
    connection.createChannel(function (publisherErr, publisherChannel) {
      if (publisherErr) throw publisherErr

      // Computes the product, then replies only if the reply queue exists.
      function messageHandler (message) {
        var operands = JSON.parse(message.content.toString())
        var product = operands.a * operands.b // gets converted to buffer automatically
        var publishOpts = {} // optional
        // Check replyTo queue exists, note: also support promise api
        checkReplyQueue(connection, message, function (checkErr, exists) {
          if (checkErr) throw checkErr
          if (exists) {
            // RPC reply
            reply(publisherChannel, message, product, publishOpts)
            // "ack" message
            consumerChannel.ack(message)
            return
          }
          // "replyTo" queue no longer exists
          // ack, nack, or etc.
        })
      }

      consumerChannel.consume('multiply-queue', messageHandler, function (consumeErr) {
        if (consumeErr) throw consumeErr
      })
    })
  })
})
Create a channel, check if the queue exists, and close the channel. This is not rpc related but was implemented for checkReplyQueue, so I've exported it.
In some cases, it may be useful to check if a queue exists before publishing to it.
/**
* @param {AmqplibConnection} connection amqplib rabbitmq connection
* @param {String} queue queue name
* @param {Function} [cb] callback, not required if using promises
* @return {Promise} if using promises
*/
var amqplib = require('amqplib/callback_api')
var reply = require('amqplib-rpc').reply
var checkQueue = require('amqplib-rpc').checkQueue
var QueueNotFound = require('amqplib-rpc').QueueNotFound

// Example: only publish to a queue after confirming that it exists.
amqplib.connect(function (err, connection) {
  if (err) throw err
  connection.createChannel(function (err, channel) {
    if (err) throw err
    var queue = 'some-queue'
    // message body to publish; channel.sendToQueue takes a Buffer
    var content = Buffer.from('hello')
    // Check the queue exists, note: also supports promise api
    checkQueue(connection, queue, function (err, exists) {
      if (err) throw err
      if (!exists) {
        // queue does not exist, do something special
        return
      }
      // publish message to queue
      channel.sendToQueue(queue, content)
    })
  })
})
https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html
MIT
3.1.0
FAQs
Thin amqplib utils for RabbitMQ RPC in Node.js
The npm package amqplib-rpc receives a total of 458 weekly downloads. As such, amqplib-rpc popularity was classified as not popular.
We found that amqplib-rpc demonstrated an unhealthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
Socket researchers uncover the risks of a malicious Python package targeting Discord developers.
Security News
The UK is proposing a bold ban on ransomware payments by public entities to disrupt cybercrime, protect critical services, and lead global cybersecurity efforts.
Security News
Snyk's use of malicious npm packages for research raises ethical concerns, highlighting risks in public deployment, data exfiltration, and unauthorized testing.