@pager/jackrabbit
Advanced tools
Comparing version 4.7.0 to 4.7.1
@@ -1,17 +0,16 @@ | ||
'use strict'; | ||
'use strict' | ||
var amqp = require('amqplib/callback_api'); | ||
var extend = require('lodash.assignin'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var uuid = require('uuid/v4'); | ||
const extend = require('lodash.assignin') | ||
const EventEmitter = require('events').EventEmitter | ||
const uuid = require('uuid/v4') | ||
var queue = require('./queue'); | ||
const queue = require('./queue') | ||
var DEFAULT_EXCHANGES = { | ||
const DEFAULT_EXCHANGES = { | ||
'direct': 'amq.direct', | ||
'fanout': 'amq.fanout', | ||
'topic': 'amq.topic' | ||
}; | ||
} | ||
var DEFAULT_EXCHANGE_OPTIONS = { | ||
const DEFAULT_EXCHANGE_OPTIONS = { | ||
durable: true, | ||
@@ -21,5 +20,5 @@ internal: false, | ||
alternateExchange: undefined | ||
}; | ||
} | ||
var DEFAULT_PUBLISH_OPTIONS = { | ||
const DEFAULT_PUBLISH_OPTIONS = { | ||
contentType: 'application/json', | ||
@@ -32,28 +31,28 @@ mandatory: false, | ||
BCC: undefined | ||
}; | ||
} | ||
var DEFAULT_RPC_CLIENT_OPTIONS = { | ||
const DEFAULT_RPC_CLIENT_OPTIONS = { | ||
timeout: 3000 | ||
}; | ||
} | ||
module.exports = exchange; | ||
module.exports = exchange | ||
function exchange(name, type, options) { | ||
function exchange (name, type, options) { | ||
if (!type) { | ||
throw new Error('missing exchange type'); | ||
throw new Error('missing exchange type') | ||
} | ||
if (!isNameless(name)) { | ||
name = name || DEFAULT_EXCHANGES[type]; | ||
name = name || DEFAULT_EXCHANGES[type] | ||
if (!name) { | ||
throw new Error('missing exchange name'); | ||
throw new Error('missing exchange name') | ||
} | ||
} | ||
var ready = false; | ||
var connection, channel; | ||
var publishing = 0; | ||
var replyQueue = queue({ exclusive: true }); | ||
var pendingReplies = {}; | ||
let ready = false | ||
let connection, channel | ||
let publishing = 0 | ||
const replyQueue = queue({ exclusive: true }) | ||
const pendingReplies = {} | ||
var emitter = extend(new EventEmitter(), { | ||
const emitter = extend(new EventEmitter(), { | ||
name: name, | ||
@@ -67,30 +66,29 @@ type: type, | ||
rpcServer: rpcServer | ||
}); | ||
}) | ||
return emitter; | ||
return emitter | ||
function rpcClient(key, msg, options, cb) { | ||
function rpcClient (key, msg, options, cb) { | ||
if (!key) { | ||
throw new Error('missing rpc method'); | ||
throw new Error('missing rpc method') | ||
} | ||
if (!cb && typeof options === 'function') { | ||
cb = options; | ||
cb = options | ||
} | ||
if (!options || typeof options !== 'object') { | ||
options = DEFAULT_RPC_CLIENT_OPTIONS; | ||
options = DEFAULT_RPC_CLIENT_OPTIONS | ||
} | ||
var opts = extend({}, { | ||
const opts = extend({}, { | ||
key: key, | ||
rpcCallback: cb | ||
}, options); | ||
}, options) | ||
publish(msg, opts); | ||
publish(msg, opts) | ||
} | ||
function rpcServer(key, handler) { | ||
var rpcQueue = createQueue({ | ||
function rpcServer (key, handler) { | ||
const rpcQueue = createQueue({ | ||
key: key, | ||
@@ -101,50 +99,50 @@ name: key, | ||
autoDelete: true | ||
}); | ||
rpcQueue.consume(handler); | ||
}) | ||
rpcQueue.consume(handler) | ||
} | ||
function connect(con) { | ||
connection = con; | ||
connection.createChannel(onChannel); | ||
replyQueue.on('close', bail.bind(this)); | ||
replyQueue.consume(onReply, { noAck: true }); | ||
return emitter; | ||
function connect (con) { | ||
connection = con | ||
connection.createChannel(onChannel) | ||
replyQueue.on('close', bail.bind(this)) | ||
replyQueue.consume(onReply, { noAck: true }) | ||
return emitter | ||
} | ||
function createQueue(options) { | ||
var newQueue = queue(options); | ||
newQueue.on('close', bail.bind(this)); | ||
function createQueue (options) { | ||
const newQueue = queue(options) | ||
newQueue.on('close', bail.bind(this)) | ||
newQueue.once('ready', function () { | ||
// the default exchange has implicit bindings to all queues | ||
if (!isNameless(emitter.name)) { | ||
var keys = options.keys || [options.key]; | ||
const keys = options.keys || [options.key] | ||
bindKeys(keys) | ||
.then(function emitBoundEvent(res) { | ||
.then(function emitBoundEvent (res) { | ||
newQueue.emit('bound') | ||
}) | ||
.catch(bail); | ||
.catch(bail) | ||
} | ||
}); | ||
}) | ||
if (connection) { | ||
newQueue.connect(connection); | ||
newQueue.connect(connection) | ||
} else { | ||
emitter.once('ready', function () { | ||
newQueue.connect(connection); | ||
}); | ||
newQueue.connect(connection) | ||
}) | ||
} | ||
return newQueue; | ||
return newQueue | ||
// return a promise when all keys are bound | ||
function bindKeys(keys) { | ||
return Promise.all(keys.map(bindKey)); | ||
function bindKeys (keys) { | ||
return Promise.all(keys.map(bindKey)) | ||
// returns a promise when a key is bound | ||
function bindKey(key) { | ||
function bindKey (key) { | ||
return new Promise(function (resolve, reject) { | ||
channel.bindQueue(newQueue.name, emitter.name, key, {}, function onBind(err, ok) { | ||
if (err) return reject(err); | ||
return resolve(ok); | ||
}); | ||
}); | ||
channel.bindQueue(newQueue.name, emitter.name, key, {}, function onBind (err, ok) { | ||
if (err) return reject(err) | ||
return resolve(ok) | ||
}) | ||
}) | ||
} | ||
@@ -154,150 +152,145 @@ } | ||
function publish(message, options) { | ||
function publish (message, options) { | ||
publishing++ | ||
options = options || {} | ||
publishing++; | ||
options = options || {}; | ||
const sendMessageRef = options.rpcCallback ? sendRpcMessage : sendMessage | ||
var sendMessageRef = options.rpcCallback ? sendRpcMessage : sendMessage; | ||
if (ready) { | ||
sendMessageRef(); | ||
sendMessageRef() | ||
} else { | ||
emitter.once('ready', sendMessageRef); | ||
emitter.once('ready', sendMessageRef) | ||
} | ||
return emitter; | ||
return emitter | ||
function sendMessage() { | ||
function sendMessage () { | ||
// TODO: better blacklisting/whitelisting of properties | ||
var opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options); | ||
var msg = encodeMessage(message, opts.contentType); | ||
const opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options) | ||
const msg = encodeMessage(message, opts.contentType) | ||
if (opts.reply) { | ||
opts.replyTo = replyQueue.name; | ||
opts.correlationId = uuid(); | ||
pendingReplies[opts.correlationId] = opts.reply; | ||
delete opts.reply; | ||
opts.replyTo = replyQueue.name | ||
opts.correlationId = uuid() | ||
pendingReplies[opts.correlationId] = opts.reply | ||
delete opts.reply | ||
} | ||
var drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts); | ||
if (drained) onDrain(); | ||
const drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts) | ||
if (drained) onDrain() | ||
} | ||
function sendRpcMessage() { | ||
function sendRpcMessage () { | ||
const opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options) | ||
const msg = encodeMessage(message, opts.contentType) | ||
var opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options); | ||
var msg = encodeMessage(message, opts.contentType); | ||
let replied = false | ||
const correlationId = uuid() | ||
const rpcCallback = opts.rpcCallback | ||
var replied = false; | ||
var correlationId = uuid(); | ||
var rpcCallback = opts.rpcCallback; | ||
function onReply (reply) { | ||
clearTimeout(timeout) | ||
channel.removeListener('return', onNotFound) | ||
function onReply(reply) { | ||
clearTimeout(timeout); | ||
channel.removeListener('return', onNotFound); | ||
if (!replied) { | ||
replied = true; | ||
rpcCallback(reply); | ||
replied = true | ||
rpcCallback(reply) | ||
} | ||
} | ||
function onNotFound(notFound) { | ||
function onNotFound (notFound) { | ||
clearTimeout(timeout) | ||
clearPendingReply(correlationId) | ||
channel.removeListener('return', onNotFound) | ||
clearTimeout(timeout); | ||
clearPendingReply(correlationId); | ||
channel.removeListener('return', onNotFound); | ||
if (!replied) { | ||
replied = true; | ||
rpcCallback(new Error('Not Found')); | ||
replied = true | ||
rpcCallback(new Error('Not Found')) | ||
} | ||
} | ||
var timeout = setTimeout(function () { | ||
const timeout = setTimeout(function () { | ||
clearPendingReply(correlationId) | ||
channel.removeListener('return', onNotFound) | ||
clearPendingReply(correlationId); | ||
channel.removeListener('return', onNotFound); | ||
if (!replied) { | ||
replied = true; | ||
rpcCallback(new Error('Timeout')); | ||
replied = true | ||
rpcCallback(new Error('Timeout')) | ||
} | ||
}, options.timeout || DEFAULT_RPC_CLIENT_OPTIONS.timeout); | ||
}, options.timeout || DEFAULT_RPC_CLIENT_OPTIONS.timeout) | ||
opts.replyTo = replyQueue.name; | ||
opts.correlationId = correlationId; | ||
opts.mandatory = true; | ||
opts.replyTo = replyQueue.name | ||
opts.correlationId = correlationId | ||
opts.mandatory = true | ||
pendingReplies[opts.correlationId] = onReply; | ||
channel.once('return', onNotFound); | ||
pendingReplies[opts.correlationId] = onReply | ||
channel.once('return', onNotFound) | ||
var drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts); | ||
if (drained) onDrain(); | ||
const drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts) | ||
if (drained) onDrain() | ||
} | ||
} | ||
function encodeMessage(message, contentType) { | ||
if (contentType === 'application/json') return JSON.stringify(message); | ||
return message; | ||
function encodeMessage (message, contentType) { | ||
if (contentType === 'application/json') return JSON.stringify(message) | ||
return message | ||
} | ||
function onReply(data, ack, nack, msg) { | ||
var replyCallback = pendingReplies[msg.properties.correlationId]; | ||
if (replyCallback) replyCallback(data); | ||
clearPendingReply(msg.properties.correlationId); | ||
function onReply (data, ack, nack, msg) { | ||
const replyCallback = pendingReplies[msg.properties.correlationId] | ||
if (replyCallback) replyCallback(data) | ||
clearPendingReply(msg.properties.correlationId) | ||
} | ||
function clearPendingReply(correlationId) { | ||
delete pendingReplies[correlationId]; | ||
function clearPendingReply (correlationId) { | ||
delete pendingReplies[correlationId] | ||
} | ||
function bail(err) { | ||
function bail (err) { | ||
// TODO: close all queue channels? | ||
connection = undefined; | ||
channel = undefined; | ||
emitter.emit('close', err); | ||
connection = undefined | ||
channel = undefined | ||
emitter.emit('close', err) | ||
} | ||
function onDrain() { | ||
function onDrain () { | ||
setImmediate(function () { | ||
publishing--; | ||
publishing-- | ||
if (publishing === 0) { | ||
emitter.emit('drain'); | ||
emitter.emit('drain') | ||
} | ||
}); | ||
}) | ||
} | ||
function onChannel(err, chan) { | ||
if (err) return bail(err); | ||
channel = chan; | ||
channel.on('close', bail.bind(this, new Error('channel closed'))); | ||
channel.on('drain', onDrain); | ||
emitter.emit('connected'); | ||
function onChannel (err, chan) { | ||
if (err) return bail(err) | ||
channel = chan | ||
channel.on('close', bail.bind(this, new Error('channel closed'))) | ||
channel.on('drain', onDrain) | ||
emitter.emit('connected') | ||
if (isDefault(emitter.name, emitter.type) || isNameless(emitter.name)) { | ||
onExchange(undefined, { | ||
exchange: emitter.name | ||
}); | ||
}) | ||
} else { | ||
channel.assertExchange(emitter.name, emitter.type, emitter.options, onExchange); | ||
channel.assertExchange(emitter.name, emitter.type, emitter.options, onExchange) | ||
} | ||
} | ||
function onExchange(err, info) { | ||
if (err) return bail(err); | ||
replyQueue.connect(connection); | ||
function onExchange (err, info) { | ||
if (err) return bail(err) | ||
replyQueue.connect(connection) | ||
replyQueue.once('ready', function () { | ||
ready = true; | ||
emitter.emit('ready'); | ||
}); | ||
ready = true | ||
emitter.emit('ready') | ||
}) | ||
} | ||
function isDefault(name, type) { | ||
return DEFAULT_EXCHANGES[type] === name; | ||
function isDefault (name, type) { | ||
return DEFAULT_EXCHANGES[type] === name | ||
} | ||
function isNameless(name) { | ||
return name === ''; | ||
function isNameless (name) { | ||
return name === '' | ||
} | ||
} |
@@ -1,6 +0,8 @@ | ||
var amqp = require('amqplib/callback_api'); | ||
var extend = require('lodash.assignin'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var exchange = require('./exchange'); | ||
'use strict'; | ||
const amqp = require('amqplib/callback_api'); | ||
const extend = require('lodash.assignin'); | ||
const EventEmitter = require('events').EventEmitter; | ||
const exchange = require('./exchange'); | ||
module.exports = jackrabbit; | ||
@@ -12,5 +14,5 @@ | ||
// state | ||
var connection; | ||
let connection; | ||
var rabbit = extend(new EventEmitter(), { | ||
const rabbit = extend(new EventEmitter(), { | ||
default: createDefaultExchange, | ||
@@ -62,3 +64,3 @@ direct: createExchange().bind(null, 'direct'), | ||
return function(type, name, options) { | ||
var newExchange = exchange(name, type, options); | ||
const newExchange = exchange(name, type, options); | ||
if (connection) { | ||
@@ -65,0 +67,0 @@ newExchange.connect(connection); |
@@ -1,6 +0,8 @@ | ||
var amqp = require('amqplib/callback_api'); | ||
var extend = require('lodash.assignin'); | ||
var EventEmitter = require('events').EventEmitter; | ||
'use strict'; | ||
var DEFAULT_QUEUE_OPTIONS = { | ||
const amqp = require('amqplib/callback_api'); | ||
const extend = require('lodash.assignin'); | ||
const EventEmitter = require('events').EventEmitter; | ||
const DEFAULT_QUEUE_OPTIONS = { | ||
exclusive: false, | ||
@@ -13,3 +15,3 @@ durable: true, | ||
var DEFAULT_CONSUME_OPTIONS = { | ||
const DEFAULT_CONSUME_OPTIONS = { | ||
consumerTag: undefined, | ||
@@ -25,4 +27,4 @@ noAck: false, | ||
options = options || {}; | ||
var channel, consumerTag; | ||
var emitter = extend(new EventEmitter(), { | ||
let channel, consumerTag, ready; | ||
const emitter = extend(new EventEmitter(), { | ||
name: options.name, | ||
@@ -43,9 +45,16 @@ options: extend({}, DEFAULT_QUEUE_OPTIONS, options), | ||
function consume(callback, options) { | ||
emitter.once('ready', function() { | ||
var opts = extend({}, DEFAULT_CONSUME_OPTIONS, options); | ||
if( ready ){ | ||
const opts = extend({}, DEFAULT_CONSUME_OPTIONS, options); | ||
channel.consume(emitter.name, onMessage, opts, onConsume); | ||
}); | ||
} | ||
else { | ||
emitter.once('ready', function() { | ||
const opts = extend({}, DEFAULT_CONSUME_OPTIONS, options); | ||
channel.consume(emitter.name, onMessage, opts, onConsume); | ||
}); | ||
} | ||
function onMessage(msg) { | ||
var data = parseMessage(msg); | ||
const data = parseMessage(msg); | ||
if (!data) return; | ||
@@ -56,6 +65,6 @@ | ||
function ack(reply) { | ||
var replyTo = msg.properties.replyTo; | ||
var id = msg.properties.correlationId; | ||
const replyTo = msg.properties.replyTo; | ||
const id = msg.properties.correlationId; | ||
if (replyTo && id) { | ||
var buffer = encodeMessage(reply, msg.properties.contentType); | ||
const buffer = encodeMessage(reply, msg.properties.contentType); | ||
channel.publish('', replyTo, buffer, { | ||
@@ -145,4 +154,5 @@ correlationId: id, | ||
emitter.name = info.queue; | ||
ready = true; | ||
emitter.emit('ready'); | ||
} | ||
} |
{ | ||
"name": "@pager/jackrabbit", | ||
"version": "4.7.0", | ||
"version": "4.7.1", | ||
"description": "Easy RabbitMQ for node", | ||
@@ -22,3 +22,4 @@ "keywords": [ | ||
"scripts": { | ||
"test": "mocha -R spec" | ||
"test": "mocha -R spec", | ||
"lint": "eslint ." | ||
}, | ||
@@ -36,5 +37,8 @@ "engines": { | ||
"devDependencies": { | ||
"chai": "3.x.x", | ||
"chai": "4.x", | ||
"eslint": "6.x.x", | ||
"eslint-config-hapi": "12.x.x", | ||
"eslint-plugin-hapi": "4.x.x", | ||
"mocha": "3.x.x" | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
16011
450
88
5
1