Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@pager/jackrabbit

Package Overview
Dependencies
Maintainers
15
Versions
48
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@pager/jackrabbit - npm Package Compare versions

Comparing version 4.6.1 to 4.7.0-rc.1

142

lib/exchange.js

@@ -0,1 +1,3 @@

'use strict';
var amqp = require('amqplib/callback_api');

@@ -67,9 +69,10 @@ var extend = require('lodash.assignin');

function rpcClient (key, msg, options, replyTo) {
if(!key) {
function rpcClient(key, msg, options, cb) {
if (!key) {
throw new Error('missing rpc method');
}
if(!replyTo && typeof options === 'function') {
replyTo = options;
if (!cb && typeof options === 'function') {
cb = options;
}

@@ -81,35 +84,16 @@

let replied = false;
var timeout = setTimeout(() => {
replyTo(new Error('Timeout'));
replied = true;
}, options.timeout || DEFAULT_RPC_CLIENT_OPTIONS.timeout);
function onReply(reply) {
clearTimeout(timeout);
if (!replied) {
replyTo(reply);
}
}
function onNotFound(notFound) {
clearTimeout(timeout);
if (!replied) {
replyTo(new Error('Not Found'));
}
}
publish(msg, {
key: key,
mandatory: true,
reply: onReply,
notFound: onNotFound
rpcCallback: cb
});
}
function rpcServer (key, handler) {
var rpcQueue = createQueue({ key: key, name: key, prefetch: 1, durable: false, autoDelete: true });
function rpcServer(key, handler) {
var rpcQueue = createQueue({
key: key,
name: key,
prefetch: 1,
durable: false,
autoDelete: true
});
rpcQueue.consume(handler);

@@ -129,3 +113,3 @@ }

newQueue.on('close', bail.bind(this));
newQueue.once('ready', function() {
newQueue.once('ready', function () {
// the default exchange has implicit bindings to all queues

@@ -144,5 +128,4 @@ if (!isNameless(emitter.name)) {

newQueue.connect(connection);
}
else {
emitter.once('ready', function() {
} else {
emitter.once('ready', function () {
newQueue.connect(connection);

@@ -171,6 +154,14 @@ });

function publish(message, options) {
publishing++;
options = options || {};
if (ready) sendMessage();
else emitter.once('ready', sendMessage);
var sendMessageRef = options.rpcCallback ? sendRpcMessage : sendMessage;
if (ready) {
sendMessageRef();
} else {
emitter.once('ready', sendMessageRef);
}
return emitter;

@@ -182,2 +173,3 @@

var msg = encodeMessage(message, opts.contentType);
if (opts.reply) {

@@ -189,5 +181,57 @@ opts.replyTo = replyQueue.name;

}
if (opts.mandatory && opts.notFound) {
channel.on('return', opts.notFound);
var drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts);
if (drained) onDrain();
}
function sendRpcMessage() {
var opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options);
var msg = encodeMessage(message, opts.contentType);
var replied = false;
var correlationId = uuid();
var rpcCallback = opts.rpcCallback;
function onReply(reply) {
clearTimeout(timeout);
channel.removeListener('return', onNotFound);
if (!replied) {
replied = true;
rpcCallback(reply);
}
}
function onNotFound(notFound) {
clearTimeout(timeout);
clearPendingReply(correlationId);
channel.removeListener('return', onNotFound);
if (!replied) {
replied = true;
rpcCallback(new Error('Not Found'));
}
}
var timeout = setTimeout(() => {
clearPendingReply(correlationId);
channel.removeListener('return', onNotFound);
if (!replied) {
replied = true;
rpcCallback(new Error('Timeout'));
}
}, options.timeout || DEFAULT_RPC_CLIENT_OPTIONS.timeout);
opts.replyTo = replyQueue.name;
opts.correlationId = correlationId;
opts.mandatory = true;
pendingReplies[opts.correlationId] = onReply;
channel.once('return', onNotFound);
var drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts);

@@ -206,4 +250,9 @@ if (drained) onDrain();

if (replyCallback) replyCallback(data);
clearPendingReply(msg.properties.correlationId);
}
function clearPendingReply(correlationId) {
delete pendingReplies[correlationId];
}
function bail(err) {

@@ -217,3 +266,3 @@ // TODO: close all queue channels?

function onDrain() {
setImmediate(function() {
setImmediate(function () {
publishing--;

@@ -233,5 +282,6 @@ if (publishing === 0) {

if (isDefault(emitter.name, emitter.type) || isNameless(emitter.name)) {
onExchange(undefined, { exchange: emitter.name });
}
else {
onExchange(undefined, {
exchange: emitter.name
});
} else {
channel.assertExchange(emitter.name, emitter.type, emitter.options, onExchange);

@@ -244,3 +294,3 @@ }

replyQueue.connect(connection);
replyQueue.once('ready', function() {
replyQueue.once('ready', function () {
ready = true;

@@ -258,2 +308,2 @@ emitter.emit('ready');

}
}
}
{
"name": "@pager/jackrabbit",
"version": "4.6.1",
"version": "4.7.0-rc.1",
"description": "Easy RabbitMQ for node",

@@ -32,3 +32,3 @@ "keywords": [

"lodash.assignin": "4.x.x",
"uuid": "^3.0.1"
"uuid": "3.x.x"
},

@@ -35,0 +35,0 @@ "devDependencies": {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc