Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@pager/jackrabbit

Package Overview
Dependencies
Maintainers
5
Versions
48
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@pager/jackrabbit - npm Package Compare versions

Comparing version 4.7.0 to 4.7.1

README.md

293

lib/exchange.js

@@ -1,17 +0,16 @@

'use strict';
'use strict'
var amqp = require('amqplib/callback_api');
var extend = require('lodash.assignin');
var EventEmitter = require('events').EventEmitter;
var uuid = require('uuid/v4');
const extend = require('lodash.assignin')
const EventEmitter = require('events').EventEmitter
const uuid = require('uuid/v4')
var queue = require('./queue');
const queue = require('./queue')
var DEFAULT_EXCHANGES = {
const DEFAULT_EXCHANGES = {
'direct': 'amq.direct',
'fanout': 'amq.fanout',
'topic': 'amq.topic'
};
}
var DEFAULT_EXCHANGE_OPTIONS = {
const DEFAULT_EXCHANGE_OPTIONS = {
durable: true,

@@ -21,5 +20,5 @@ internal: false,

alternateExchange: undefined
};
}
var DEFAULT_PUBLISH_OPTIONS = {
const DEFAULT_PUBLISH_OPTIONS = {
contentType: 'application/json',

@@ -32,28 +31,28 @@ mandatory: false,

BCC: undefined
};
}
var DEFAULT_RPC_CLIENT_OPTIONS = {
const DEFAULT_RPC_CLIENT_OPTIONS = {
timeout: 3000
};
}
module.exports = exchange;
module.exports = exchange
function exchange(name, type, options) {
function exchange (name, type, options) {
if (!type) {
throw new Error('missing exchange type');
throw new Error('missing exchange type')
}
if (!isNameless(name)) {
name = name || DEFAULT_EXCHANGES[type];
name = name || DEFAULT_EXCHANGES[type]
if (!name) {
throw new Error('missing exchange name');
throw new Error('missing exchange name')
}
}
var ready = false;
var connection, channel;
var publishing = 0;
var replyQueue = queue({ exclusive: true });
var pendingReplies = {};
let ready = false
let connection, channel
let publishing = 0
const replyQueue = queue({ exclusive: true })
const pendingReplies = {}
var emitter = extend(new EventEmitter(), {
const emitter = extend(new EventEmitter(), {
name: name,

@@ -67,30 +66,29 @@ type: type,

rpcServer: rpcServer
});
})
return emitter;
return emitter
function rpcClient(key, msg, options, cb) {
function rpcClient (key, msg, options, cb) {
if (!key) {
throw new Error('missing rpc method');
throw new Error('missing rpc method')
}
if (!cb && typeof options === 'function') {
cb = options;
cb = options
}
if (!options || typeof options !== 'object') {
options = DEFAULT_RPC_CLIENT_OPTIONS;
options = DEFAULT_RPC_CLIENT_OPTIONS
}
var opts = extend({}, {
const opts = extend({}, {
key: key,
rpcCallback: cb
}, options);
}, options)
publish(msg, opts);
publish(msg, opts)
}
function rpcServer(key, handler) {
var rpcQueue = createQueue({
function rpcServer (key, handler) {
const rpcQueue = createQueue({
key: key,

@@ -101,50 +99,50 @@ name: key,

autoDelete: true
});
rpcQueue.consume(handler);
})
rpcQueue.consume(handler)
}
function connect(con) {
connection = con;
connection.createChannel(onChannel);
replyQueue.on('close', bail.bind(this));
replyQueue.consume(onReply, { noAck: true });
return emitter;
function connect (con) {
connection = con
connection.createChannel(onChannel)
replyQueue.on('close', bail.bind(this))
replyQueue.consume(onReply, { noAck: true })
return emitter
}
function createQueue(options) {
var newQueue = queue(options);
newQueue.on('close', bail.bind(this));
function createQueue (options) {
const newQueue = queue(options)
newQueue.on('close', bail.bind(this))
newQueue.once('ready', function () {
// the default exchange has implicit bindings to all queues
if (!isNameless(emitter.name)) {
var keys = options.keys || [options.key];
const keys = options.keys || [options.key]
bindKeys(keys)
.then(function emitBoundEvent(res) {
.then(function emitBoundEvent (res) {
newQueue.emit('bound')
})
.catch(bail);
.catch(bail)
}
});
})
if (connection) {
newQueue.connect(connection);
newQueue.connect(connection)
} else {
emitter.once('ready', function () {
newQueue.connect(connection);
});
newQueue.connect(connection)
})
}
return newQueue;
return newQueue
// return a promise when all keys are bound
function bindKeys(keys) {
return Promise.all(keys.map(bindKey));
function bindKeys (keys) {
return Promise.all(keys.map(bindKey))
// returns a promise when a key is bound
function bindKey(key) {
function bindKey (key) {
return new Promise(function (resolve, reject) {
channel.bindQueue(newQueue.name, emitter.name, key, {}, function onBind(err, ok) {
if (err) return reject(err);
return resolve(ok);
});
});
channel.bindQueue(newQueue.name, emitter.name, key, {}, function onBind (err, ok) {
if (err) return reject(err)
return resolve(ok)
})
})
}

@@ -154,150 +152,145 @@ }

function publish(message, options) {
function publish (message, options) {
publishing++
options = options || {}
publishing++;
options = options || {};
const sendMessageRef = options.rpcCallback ? sendRpcMessage : sendMessage
var sendMessageRef = options.rpcCallback ? sendRpcMessage : sendMessage;
if (ready) {
sendMessageRef();
sendMessageRef()
} else {
emitter.once('ready', sendMessageRef);
emitter.once('ready', sendMessageRef)
}
return emitter;
return emitter
function sendMessage() {
function sendMessage () {
// TODO: better blacklisting/whitelisting of properties
var opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options);
var msg = encodeMessage(message, opts.contentType);
const opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options)
const msg = encodeMessage(message, opts.contentType)
if (opts.reply) {
opts.replyTo = replyQueue.name;
opts.correlationId = uuid();
pendingReplies[opts.correlationId] = opts.reply;
delete opts.reply;
opts.replyTo = replyQueue.name
opts.correlationId = uuid()
pendingReplies[opts.correlationId] = opts.reply
delete opts.reply
}
var drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts);
if (drained) onDrain();
const drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts)
if (drained) onDrain()
}
function sendRpcMessage() {
function sendRpcMessage () {
const opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options)
const msg = encodeMessage(message, opts.contentType)
var opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options);
var msg = encodeMessage(message, opts.contentType);
let replied = false
const correlationId = uuid()
const rpcCallback = opts.rpcCallback
var replied = false;
var correlationId = uuid();
var rpcCallback = opts.rpcCallback;
function onReply (reply) {
clearTimeout(timeout)
channel.removeListener('return', onNotFound)
function onReply(reply) {
clearTimeout(timeout);
channel.removeListener('return', onNotFound);
if (!replied) {
replied = true;
rpcCallback(reply);
replied = true
rpcCallback(reply)
}
}
function onNotFound(notFound) {
function onNotFound (notFound) {
clearTimeout(timeout)
clearPendingReply(correlationId)
channel.removeListener('return', onNotFound)
clearTimeout(timeout);
clearPendingReply(correlationId);
channel.removeListener('return', onNotFound);
if (!replied) {
replied = true;
rpcCallback(new Error('Not Found'));
replied = true
rpcCallback(new Error('Not Found'))
}
}
var timeout = setTimeout(function () {
const timeout = setTimeout(function () {
clearPendingReply(correlationId)
channel.removeListener('return', onNotFound)
clearPendingReply(correlationId);
channel.removeListener('return', onNotFound);
if (!replied) {
replied = true;
rpcCallback(new Error('Timeout'));
replied = true
rpcCallback(new Error('Timeout'))
}
}, options.timeout || DEFAULT_RPC_CLIENT_OPTIONS.timeout);
}, options.timeout || DEFAULT_RPC_CLIENT_OPTIONS.timeout)
opts.replyTo = replyQueue.name;
opts.correlationId = correlationId;
opts.mandatory = true;
opts.replyTo = replyQueue.name
opts.correlationId = correlationId
opts.mandatory = true
pendingReplies[opts.correlationId] = onReply;
channel.once('return', onNotFound);
pendingReplies[opts.correlationId] = onReply
channel.once('return', onNotFound)
var drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts);
if (drained) onDrain();
const drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts)
if (drained) onDrain()
}
}
function encodeMessage(message, contentType) {
if (contentType === 'application/json') return JSON.stringify(message);
return message;
function encodeMessage (message, contentType) {
if (contentType === 'application/json') return JSON.stringify(message)
return message
}
function onReply(data, ack, nack, msg) {
var replyCallback = pendingReplies[msg.properties.correlationId];
if (replyCallback) replyCallback(data);
clearPendingReply(msg.properties.correlationId);
function onReply (data, ack, nack, msg) {
const replyCallback = pendingReplies[msg.properties.correlationId]
if (replyCallback) replyCallback(data)
clearPendingReply(msg.properties.correlationId)
}
function clearPendingReply(correlationId) {
delete pendingReplies[correlationId];
function clearPendingReply (correlationId) {
delete pendingReplies[correlationId]
}
function bail(err) {
function bail (err) {
// TODO: close all queue channels?
connection = undefined;
channel = undefined;
emitter.emit('close', err);
connection = undefined
channel = undefined
emitter.emit('close', err)
}
function onDrain() {
function onDrain () {
setImmediate(function () {
publishing--;
publishing--
if (publishing === 0) {
emitter.emit('drain');
emitter.emit('drain')
}
});
})
}
function onChannel(err, chan) {
if (err) return bail(err);
channel = chan;
channel.on('close', bail.bind(this, new Error('channel closed')));
channel.on('drain', onDrain);
emitter.emit('connected');
function onChannel (err, chan) {
if (err) return bail(err)
channel = chan
channel.on('close', bail.bind(this, new Error('channel closed')))
channel.on('drain', onDrain)
emitter.emit('connected')
if (isDefault(emitter.name, emitter.type) || isNameless(emitter.name)) {
onExchange(undefined, {
exchange: emitter.name
});
})
} else {
channel.assertExchange(emitter.name, emitter.type, emitter.options, onExchange);
channel.assertExchange(emitter.name, emitter.type, emitter.options, onExchange)
}
}
function onExchange(err, info) {
if (err) return bail(err);
replyQueue.connect(connection);
function onExchange (err, info) {
if (err) return bail(err)
replyQueue.connect(connection)
replyQueue.once('ready', function () {
ready = true;
emitter.emit('ready');
});
ready = true
emitter.emit('ready')
})
}
function isDefault(name, type) {
return DEFAULT_EXCHANGES[type] === name;
function isDefault (name, type) {
return DEFAULT_EXCHANGES[type] === name
}
function isNameless(name) {
return name === '';
function isNameless (name) {
return name === ''
}
}

@@ -1,6 +0,8 @@

var amqp = require('amqplib/callback_api');
var extend = require('lodash.assignin');
var EventEmitter = require('events').EventEmitter;
var exchange = require('./exchange');
'use strict';
const amqp = require('amqplib/callback_api');
const extend = require('lodash.assignin');
const EventEmitter = require('events').EventEmitter;
const exchange = require('./exchange');
module.exports = jackrabbit;

@@ -12,5 +14,5 @@

// state
var connection;
let connection;
var rabbit = extend(new EventEmitter(), {
const rabbit = extend(new EventEmitter(), {
default: createDefaultExchange,

@@ -62,3 +64,3 @@ direct: createExchange().bind(null, 'direct'),

return function(type, name, options) {
var newExchange = exchange(name, type, options);
const newExchange = exchange(name, type, options);
if (connection) {

@@ -65,0 +67,0 @@ newExchange.connect(connection);

@@ -1,6 +0,8 @@

var amqp = require('amqplib/callback_api');
var extend = require('lodash.assignin');
var EventEmitter = require('events').EventEmitter;
'use strict';
var DEFAULT_QUEUE_OPTIONS = {
const amqp = require('amqplib/callback_api');
const extend = require('lodash.assignin');
const EventEmitter = require('events').EventEmitter;
const DEFAULT_QUEUE_OPTIONS = {
exclusive: false,

@@ -13,3 +15,3 @@ durable: true,

var DEFAULT_CONSUME_OPTIONS = {
const DEFAULT_CONSUME_OPTIONS = {
consumerTag: undefined,

@@ -25,4 +27,4 @@ noAck: false,

options = options || {};
var channel, consumerTag;
var emitter = extend(new EventEmitter(), {
let channel, consumerTag, ready;
const emitter = extend(new EventEmitter(), {
name: options.name,

@@ -43,9 +45,16 @@ options: extend({}, DEFAULT_QUEUE_OPTIONS, options),

function consume(callback, options) {
emitter.once('ready', function() {
var opts = extend({}, DEFAULT_CONSUME_OPTIONS, options);
if( ready ){
const opts = extend({}, DEFAULT_CONSUME_OPTIONS, options);
channel.consume(emitter.name, onMessage, opts, onConsume);
});
}
else {
emitter.once('ready', function() {
const opts = extend({}, DEFAULT_CONSUME_OPTIONS, options);
channel.consume(emitter.name, onMessage, opts, onConsume);
});
}
function onMessage(msg) {
var data = parseMessage(msg);
const data = parseMessage(msg);
if (!data) return;

@@ -56,6 +65,6 @@

function ack(reply) {
var replyTo = msg.properties.replyTo;
var id = msg.properties.correlationId;
const replyTo = msg.properties.replyTo;
const id = msg.properties.correlationId;
if (replyTo && id) {
var buffer = encodeMessage(reply, msg.properties.contentType);
const buffer = encodeMessage(reply, msg.properties.contentType);
channel.publish('', replyTo, buffer, {

@@ -145,4 +154,5 @@ correlationId: id,

emitter.name = info.queue;
ready = true;
emitter.emit('ready');
}
}
{
"name": "@pager/jackrabbit",
"version": "4.7.0",
"version": "4.7.1",
"description": "Easy RabbitMQ for node",

@@ -22,3 +22,4 @@ "keywords": [

"scripts": {
"test": "mocha -R spec"
"test": "mocha -R spec",
"lint": "eslint ."
},

@@ -36,5 +37,8 @@ "engines": {

"devDependencies": {
"chai": "3.x.x",
"chai": "4.x",
"eslint": "6.x.x",
"eslint-config-hapi": "12.x.x",
"eslint-plugin-hapi": "4.x.x",
"mocha": "3.x.x"
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc