@pager/jackrabbit
Advanced tools
Comparing version 4.6.1 to 4.7.0-rc.1
@@ -0,1 +1,3 @@ | ||
'use strict'; | ||
var amqp = require('amqplib/callback_api'); | ||
@@ -67,9 +69,10 @@ var extend = require('lodash.assignin'); | ||
function rpcClient (key, msg, options, replyTo) { | ||
if(!key) { | ||
function rpcClient(key, msg, options, cb) { | ||
if (!key) { | ||
throw new Error('missing rpc method'); | ||
} | ||
if(!replyTo && typeof options === 'function') { | ||
replyTo = options; | ||
if (!cb && typeof options === 'function') { | ||
cb = options; | ||
} | ||
@@ -81,35 +84,16 @@ | ||
let replied = false; | ||
var timeout = setTimeout(() => { | ||
replyTo(new Error('Timeout')); | ||
replied = true; | ||
}, options.timeout || DEFAULT_RPC_CLIENT_OPTIONS.timeout); | ||
function onReply(reply) { | ||
clearTimeout(timeout); | ||
if (!replied) { | ||
replyTo(reply); | ||
} | ||
} | ||
function onNotFound(notFound) { | ||
clearTimeout(timeout); | ||
if (!replied) { | ||
replyTo(new Error('Not Found')); | ||
} | ||
} | ||
publish(msg, { | ||
key: key, | ||
mandatory: true, | ||
reply: onReply, | ||
notFound: onNotFound | ||
rpcCallback: cb | ||
}); | ||
} | ||
function rpcServer (key, handler) { | ||
var rpcQueue = createQueue({ key: key, name: key, prefetch: 1, durable: false, autoDelete: true }); | ||
function rpcServer(key, handler) { | ||
var rpcQueue = createQueue({ | ||
key: key, | ||
name: key, | ||
prefetch: 1, | ||
durable: false, | ||
autoDelete: true | ||
}); | ||
rpcQueue.consume(handler); | ||
@@ -129,3 +113,3 @@ } | ||
newQueue.on('close', bail.bind(this)); | ||
newQueue.once('ready', function() { | ||
newQueue.once('ready', function () { | ||
// the default exchange has implicit bindings to all queues | ||
@@ -144,5 +128,4 @@ if (!isNameless(emitter.name)) { | ||
newQueue.connect(connection); | ||
} | ||
else { | ||
emitter.once('ready', function() { | ||
} else { | ||
emitter.once('ready', function () { | ||
newQueue.connect(connection); | ||
@@ -171,6 +154,14 @@ }); | ||
function publish(message, options) { | ||
publishing++; | ||
options = options || {}; | ||
if (ready) sendMessage(); | ||
else emitter.once('ready', sendMessage); | ||
var sendMessageRef = options.rpcCallback ? sendRpcMessage : sendMessage; | ||
if (ready) { | ||
sendMessageRef(); | ||
} else { | ||
emitter.once('ready', sendMessageRef); | ||
} | ||
return emitter; | ||
@@ -182,2 +173,3 @@ | ||
var msg = encodeMessage(message, opts.contentType); | ||
if (opts.reply) { | ||
@@ -189,5 +181,57 @@ opts.replyTo = replyQueue.name; | ||
} | ||
if (opts.mandatory && opts.notFound) { | ||
channel.on('return', opts.notFound); | ||
var drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts); | ||
if (drained) onDrain(); | ||
} | ||
function sendRpcMessage() { | ||
var opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options); | ||
var msg = encodeMessage(message, opts.contentType); | ||
var replied = false; | ||
var correlationId = uuid(); | ||
var rpcCallback = opts.rpcCallback; | ||
function onReply(reply) { | ||
clearTimeout(timeout); | ||
channel.removeListener('return', onNotFound); | ||
if (!replied) { | ||
replied = true; | ||
rpcCallback(reply); | ||
} | ||
} | ||
function onNotFound(notFound) { | ||
clearTimeout(timeout); | ||
clearPendingReply(correlationId); | ||
channel.removeListener('return', onNotFound); | ||
if (!replied) { | ||
replied = true; | ||
rpcCallback(new Error('Not Found')); | ||
} | ||
} | ||
var timeout = setTimeout(() => { | ||
clearPendingReply(correlationId); | ||
channel.removeListener('return', onNotFound); | ||
if (!replied) { | ||
replied = true; | ||
rpcCallback(new Error('Timeout')); | ||
} | ||
}, options.timeout || DEFAULT_RPC_CLIENT_OPTIONS.timeout); | ||
opts.replyTo = replyQueue.name; | ||
opts.correlationId = correlationId; | ||
opts.mandatory = true; | ||
pendingReplies[opts.correlationId] = onReply; | ||
channel.once('return', onNotFound); | ||
var drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts); | ||
@@ -206,4 +250,9 @@ if (drained) onDrain(); | ||
if (replyCallback) replyCallback(data); | ||
clearPendingReply(msg.properties.correlationId); | ||
} | ||
function clearPendingReply(correlationId) { | ||
delete pendingReplies[correlationId]; | ||
} | ||
function bail(err) { | ||
@@ -217,3 +266,3 @@ // TODO: close all queue channels? | ||
function onDrain() { | ||
setImmediate(function() { | ||
setImmediate(function () { | ||
publishing--; | ||
@@ -233,5 +282,6 @@ if (publishing === 0) { | ||
if (isDefault(emitter.name, emitter.type) || isNameless(emitter.name)) { | ||
onExchange(undefined, { exchange: emitter.name }); | ||
} | ||
else { | ||
onExchange(undefined, { | ||
exchange: emitter.name | ||
}); | ||
} else { | ||
channel.assertExchange(emitter.name, emitter.type, emitter.options, onExchange); | ||
@@ -244,3 +294,3 @@ } | ||
replyQueue.connect(connection); | ||
replyQueue.once('ready', function() { | ||
replyQueue.once('ready', function () { | ||
ready = true; | ||
@@ -258,2 +308,2 @@ emitter.emit('ready'); | ||
} | ||
} | ||
} |
{ | ||
"name": "@pager/jackrabbit", | ||
"version": "4.6.1", | ||
"version": "4.7.0-rc.1", | ||
"description": "Easy RabbitMQ for node", | ||
@@ -32,3 +32,3 @@ "keywords": [ | ||
"lodash.assignin": "4.x.x", | ||
"uuid": "^3.0.1" | ||
"uuid": "3.x.x" | ||
}, | ||
@@ -35,0 +35,0 @@ "devDependencies": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
15276
430
0
2
Updateduuid@3.x.x