@pager/jackrabbit
Advanced tools
Comparing version 4.7.1 to 4.8.0
@@ -1,291 +0,361 @@ | ||
'use strict' | ||
'use strict'; | ||
const extend = require('lodash.assignin') | ||
const EventEmitter = require('events').EventEmitter | ||
const uuid = require('uuid/v4') | ||
const Extend = require('lodash.assignin'); | ||
const EventEmitter = require('events').EventEmitter; | ||
const Uuid = require('uuid/v4'); | ||
const queue = require('./queue') | ||
const Queue = require('./queue'); | ||
const DEFAULT_EXCHANGES = { | ||
'direct': 'amq.direct', | ||
'fanout': 'amq.fanout', | ||
'topic': 'amq.topic' | ||
} | ||
'direct': 'amq.direct', | ||
'fanout': 'amq.fanout', | ||
'topic': 'amq.topic' | ||
}; | ||
const DEFAULT_EXCHANGE_OPTIONS = { | ||
durable: true, | ||
internal: false, | ||
autoDelete: false, | ||
alternateExchange: undefined | ||
} | ||
durable: true, | ||
noReply: false, | ||
internal: false, | ||
autoDelete: false, | ||
alternateExchange: undefined | ||
}; | ||
const DEFAULT_PUBLISH_OPTIONS = { | ||
contentType: 'application/json', | ||
mandatory: false, | ||
persistent: false, | ||
expiration: undefined, | ||
userId: undefined, | ||
CC: undefined, | ||
BCC: undefined | ||
} | ||
contentType: 'application/json', | ||
mandatory: false, | ||
persistent: false, | ||
expiration: undefined, | ||
userId: undefined, | ||
CC: undefined, | ||
BCC: undefined | ||
}; | ||
const DEFAULT_RPC_CLIENT_OPTIONS = { | ||
timeout: 3000 | ||
} | ||
timeout: 3000 | ||
}; | ||
module.exports = exchange | ||
function exchange (name, type, options) { | ||
if (!type) { | ||
throw new Error('missing exchange type') | ||
} | ||
if (!isNameless(name)) { | ||
name = name || DEFAULT_EXCHANGES[type] | ||
if (!name) { | ||
throw new Error('missing exchange name') | ||
} | ||
} | ||
const isDefault = (name, type) => { | ||
let ready = false | ||
let connection, channel | ||
let publishing = 0 | ||
const replyQueue = queue({ exclusive: true }) | ||
const pendingReplies = {} | ||
return DEFAULT_EXCHANGES[type] === name; | ||
}; | ||
const emitter = extend(new EventEmitter(), { | ||
name: name, | ||
type: type, | ||
options: extend({}, DEFAULT_EXCHANGE_OPTIONS, options), | ||
queue: createQueue, | ||
connect: connect, | ||
publish: publish, | ||
rpcClient: rpcClient, | ||
rpcServer: rpcServer | ||
}) | ||
const isNameless = (name) => { | ||
return emitter | ||
return name === ''; | ||
}; | ||
function rpcClient (key, msg, options, cb) { | ||
if (!key) { | ||
throw new Error('missing rpc method') | ||
} | ||
const exchange = (name, type, exchangeOptions) => { | ||
if (!cb && typeof options === 'function') { | ||
cb = options | ||
if (!type) { | ||
throw new Error('missing exchange type'); | ||
} | ||
if (!options || typeof options !== 'object') { | ||
options = DEFAULT_RPC_CLIENT_OPTIONS | ||
if (!isNameless(name)) { | ||
name = name || DEFAULT_EXCHANGES[type]; | ||
if (!name) { | ||
throw new Error('missing exchange name'); | ||
} | ||
} | ||
const opts = extend({}, { | ||
key: key, | ||
rpcCallback: cb | ||
}, options) | ||
let ready = false; | ||
let channel; | ||
let connection; | ||
let publishing = 0; | ||
const options = Extend({}, DEFAULT_EXCHANGE_OPTIONS, exchangeOptions); | ||
const replyQueue = options.noReply ? null : Queue({ exclusive: true }); | ||
const pendingReplies = {}; | ||
publish(msg, opts) | ||
} | ||
const rpcClient = (key, msg, rpcOptions, cb) => { | ||
function rpcServer (key, handler) { | ||
const rpcQueue = createQueue({ | ||
key: key, | ||
name: key, | ||
prefetch: 1, | ||
durable: false, | ||
autoDelete: true | ||
}) | ||
rpcQueue.consume(handler) | ||
} | ||
if (!key) { | ||
throw new Error('missing rpc method'); | ||
} | ||
function connect (con) { | ||
connection = con | ||
connection.createChannel(onChannel) | ||
replyQueue.on('close', bail.bind(this)) | ||
replyQueue.consume(onReply, { noAck: true }) | ||
return emitter | ||
} | ||
if (!cb && typeof rpcOptions === 'const') { | ||
function createQueue (options) { | ||
const newQueue = queue(options) | ||
newQueue.on('close', bail.bind(this)) | ||
newQueue.once('ready', function () { | ||
// the default exchange has implicit bindings to all queues | ||
if (!isNameless(emitter.name)) { | ||
const keys = options.keys || [options.key] | ||
bindKeys(keys) | ||
.then(function emitBoundEvent (res) { | ||
newQueue.emit('bound') | ||
}) | ||
.catch(bail) | ||
} | ||
}) | ||
cb = rpcOptions; | ||
} | ||
if (connection) { | ||
newQueue.connect(connection) | ||
} else { | ||
emitter.once('ready', function () { | ||
newQueue.connect(connection) | ||
}) | ||
} | ||
return newQueue | ||
if (!rpcOptions || typeof rpcOptions !== 'object') { | ||
rpcOptions = DEFAULT_RPC_CLIENT_OPTIONS; | ||
} | ||
// return a promise when all keys are bound | ||
function bindKeys (keys) { | ||
return Promise.all(keys.map(bindKey)) | ||
const opts = Extend({}, { | ||
key, | ||
rpcCallback: cb | ||
}, rpcOptions); | ||
// returns a promise when a key is bound | ||
function bindKey (key) { | ||
return new Promise(function (resolve, reject) { | ||
channel.bindQueue(newQueue.name, emitter.name, key, {}, function onBind (err, ok) { | ||
if (err) return reject(err) | ||
return resolve(ok) | ||
}) | ||
}) | ||
} | ||
} | ||
} | ||
publish(msg, opts); | ||
}; | ||
function publish (message, options) { | ||
publishing++ | ||
options = options || {} | ||
const rpcServer = (key, handler) => { | ||
const sendMessageRef = options.rpcCallback ? sendRpcMessage : sendMessage | ||
const rpcQueue = createQueue({ | ||
key, | ||
name: key, | ||
prefetch: 1, | ||
durable: false, | ||
autoDelete: true | ||
}); | ||
rpcQueue.consume(handler); | ||
}; | ||
if (ready) { | ||
sendMessageRef() | ||
} else { | ||
emitter.once('ready', sendMessageRef) | ||
} | ||
const connect = (con) => { | ||
return emitter | ||
connection = con; | ||
connection.createChannel(onChannel); | ||
function sendMessage () { | ||
// TODO: better blacklisting/whitelisting of properties | ||
const opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options) | ||
const msg = encodeMessage(message, opts.contentType) | ||
if (replyQueue) { | ||
replyQueue.on('close', bail.bind(this)); | ||
replyQueue.consume(onReply, { noAck: true }); | ||
} | ||
if (opts.reply) { | ||
opts.replyTo = replyQueue.name | ||
opts.correlationId = uuid() | ||
pendingReplies[opts.correlationId] = opts.reply | ||
delete opts.reply | ||
} | ||
return emitter; | ||
}; | ||
const drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts) | ||
if (drained) onDrain() | ||
} | ||
const createQueue = (queueOptions) => { | ||
function sendRpcMessage () { | ||
const opts = extend({}, DEFAULT_PUBLISH_OPTIONS, options) | ||
const msg = encodeMessage(message, opts.contentType) | ||
// return a promise when all keys are bound | ||
const bindKeys = (keys) => { | ||
let replied = false | ||
const correlationId = uuid() | ||
const rpcCallback = opts.rpcCallback | ||
// returns a promise when a key is bound | ||
const bindKey = (key) => { | ||
function onReply (reply) { | ||
clearTimeout(timeout) | ||
channel.removeListener('return', onNotFound) | ||
return new Promise(((resolve, reject) => { | ||
if (!replied) { | ||
replied = true | ||
rpcCallback(reply) | ||
channel.bindQueue(newQueue.name, emitter.name, key, {}, (err, ok) => { | ||
if (err) { | ||
return reject(err); | ||
} | ||
return resolve(ok); | ||
}); | ||
})); | ||
}; | ||
return Promise.all(keys.map(bindKey)); | ||
}; | ||
const newQueue = Queue(queueOptions); | ||
newQueue.on('close', bail.bind(this)); | ||
newQueue.once('ready', () => { | ||
// the default exchange has implicit bindings to all queues | ||
if (!isNameless(emitter.name)) { | ||
const keys = queueOptions.keys || [queueOptions.key]; | ||
bindKeys(keys) | ||
.then((res) => { | ||
newQueue.emit('bound'); | ||
}) | ||
.catch(bail); | ||
} | ||
}); | ||
if (connection) { | ||
newQueue.connect(connection); | ||
} | ||
} | ||
else { | ||
emitter.once('ready', () => { | ||
function onNotFound (notFound) { | ||
clearTimeout(timeout) | ||
clearPendingReply(correlationId) | ||
channel.removeListener('return', onNotFound) | ||
newQueue.connect(connection); | ||
}); | ||
} | ||
if (!replied) { | ||
replied = true | ||
rpcCallback(new Error('Not Found')) | ||
return newQueue; | ||
}; | ||
const publish = (message, publishOptions) => { | ||
const sendMessage = () => { | ||
// TODO: better blacklisting/whitelisting of properties | ||
const opts = Extend({}, DEFAULT_PUBLISH_OPTIONS, publishOptions); | ||
const msg = encodeMessage(message, opts.contentType); | ||
if (opts.reply) { | ||
if (!replyQueue) { | ||
throw new Error('reply queue not found'); | ||
} | ||
opts.replyTo = replyQueue.name; | ||
opts.correlationId = Uuid(); | ||
pendingReplies[opts.correlationId] = opts.reply; | ||
delete opts.reply; | ||
} | ||
const drained = channel.publish(emitter.name, opts.key, Buffer.from(msg), opts); | ||
if (drained) { | ||
onDrain(); | ||
} | ||
}; | ||
const sendRpcMessage = () => { | ||
const opts = Extend({}, DEFAULT_PUBLISH_OPTIONS, publishOptions); | ||
const msg = encodeMessage(message, opts.contentType); | ||
let replied = false; | ||
const correlationId = Uuid(); | ||
const rpcCallback = opts.rpcCallback; | ||
const onReply = (reply) => { | ||
clearTimeout(timeout); | ||
channel.removeListener('return', onNotFound); | ||
if (!replied) { | ||
replied = true; | ||
rpcCallback(reply); | ||
} | ||
}; | ||
const onNotFound = (notFound) => { | ||
clearTimeout(timeout); | ||
clearPendingReply(correlationId); | ||
channel.removeListener('return', onNotFound); | ||
if (!replied) { | ||
replied = true; | ||
rpcCallback(new Error('Not Found')); | ||
} | ||
}; | ||
const timeout = setTimeout(() => { | ||
clearPendingReply(correlationId); | ||
channel.removeListener('return', onNotFound); | ||
if (!replied) { | ||
replied = true; | ||
rpcCallback(new Error('Timeout')); | ||
} | ||
}, publishOptions.timeout || DEFAULT_RPC_CLIENT_OPTIONS.timeout); | ||
opts.replyTo = replyQueue.name; | ||
opts.correlationId = correlationId; | ||
opts.mandatory = true; | ||
pendingReplies[opts.correlationId] = onReply; | ||
channel.once('return', onNotFound); | ||
const drained = channel.publish(emitter.name, opts.key, Buffer.from(msg), opts); | ||
if (drained) { | ||
onDrain(); | ||
} | ||
}; | ||
publishing++; | ||
publishOptions = publishOptions || {}; | ||
const sendMessageRef = publishOptions.rpcCallback ? sendRpcMessage : sendMessage; | ||
if (ready) { | ||
sendMessageRef(); | ||
} | ||
} | ||
else { | ||
emitter.once('ready', sendMessageRef); | ||
} | ||
const timeout = setTimeout(function () { | ||
clearPendingReply(correlationId) | ||
channel.removeListener('return', onNotFound) | ||
return emitter; | ||
}; | ||
if (!replied) { | ||
replied = true | ||
rpcCallback(new Error('Timeout')) | ||
const encodeMessage = (message, contentType) => { | ||
if (contentType === 'application/json') { | ||
return JSON.stringify(message); | ||
} | ||
}, options.timeout || DEFAULT_RPC_CLIENT_OPTIONS.timeout) | ||
opts.replyTo = replyQueue.name | ||
opts.correlationId = correlationId | ||
opts.mandatory = true | ||
return message; | ||
}; | ||
pendingReplies[opts.correlationId] = onReply | ||
channel.once('return', onNotFound) | ||
const onReply = (data, ack, nack, msg) => { | ||
const drained = channel.publish(emitter.name, opts.key, new Buffer(msg), opts) | ||
if (drained) onDrain() | ||
} | ||
} | ||
const replyCallback = pendingReplies[msg.properties.correlationId]; | ||
if (replyCallback) { | ||
replyCallback(data); | ||
} | ||
function encodeMessage (message, contentType) { | ||
if (contentType === 'application/json') return JSON.stringify(message) | ||
return message | ||
} | ||
clearPendingReply(msg.properties.correlationId); | ||
}; | ||
function onReply (data, ack, nack, msg) { | ||
const replyCallback = pendingReplies[msg.properties.correlationId] | ||
if (replyCallback) replyCallback(data) | ||
clearPendingReply(msg.properties.correlationId) | ||
} | ||
const clearPendingReply = (correlationId) => { | ||
function clearPendingReply (correlationId) { | ||
delete pendingReplies[correlationId] | ||
} | ||
delete pendingReplies[correlationId]; | ||
}; | ||
function bail (err) { | ||
// TODO: close all queue channels? | ||
connection = undefined | ||
channel = undefined | ||
emitter.emit('close', err) | ||
} | ||
const bail = (err) => { | ||
function onDrain () { | ||
setImmediate(function () { | ||
publishing-- | ||
if (publishing === 0) { | ||
emitter.emit('drain') | ||
} | ||
}) | ||
} | ||
// TODO: close all queue channels? | ||
connection = undefined; | ||
channel = undefined; | ||
emitter.emit('close', err); | ||
}; | ||
function onChannel (err, chan) { | ||
if (err) return bail(err) | ||
channel = chan | ||
channel.on('close', bail.bind(this, new Error('channel closed'))) | ||
channel.on('drain', onDrain) | ||
emitter.emit('connected') | ||
if (isDefault(emitter.name, emitter.type) || isNameless(emitter.name)) { | ||
onExchange(undefined, { | ||
exchange: emitter.name | ||
}) | ||
} else { | ||
channel.assertExchange(emitter.name, emitter.type, emitter.options, onExchange) | ||
} | ||
} | ||
const onDrain = () => { | ||
function onExchange (err, info) { | ||
if (err) return bail(err) | ||
replyQueue.connect(connection) | ||
replyQueue.once('ready', function () { | ||
ready = true | ||
emitter.emit('ready') | ||
}) | ||
} | ||
setImmediate(() => { | ||
function isDefault (name, type) { | ||
return DEFAULT_EXCHANGES[type] === name | ||
} | ||
publishing--; | ||
if (publishing === 0) { | ||
emitter.emit('drain'); | ||
} | ||
}); | ||
}; | ||
function isNameless (name) { | ||
return name === '' | ||
} | ||
} | ||
const onChannel = (err, chan) => { | ||
if (err) { | ||
return bail(err); | ||
} | ||
channel = chan; | ||
channel.on('close', bail.bind(this, new Error('channel closed'))); | ||
channel.on('drain', onDrain); | ||
emitter.emit('connected'); | ||
if (isDefault(emitter.name, DEFAULT_EXCHANGES[emitter.type]) || isNameless(emitter.name)) { | ||
onExchange(undefined, { | ||
exchange: emitter.name | ||
}); | ||
} | ||
else { | ||
channel.assertExchange(emitter.name, emitter.type, emitter.options, onExchange); | ||
} | ||
}; | ||
const onExchange = (err, info) => { | ||
if (err) { | ||
return bail(err); | ||
} | ||
if (!replyQueue) { | ||
ready = true; | ||
emitter.emit('ready'); | ||
return; | ||
} | ||
replyQueue.connect(connection); | ||
replyQueue.once('ready', () => { | ||
ready = true; | ||
emitter.emit('ready'); | ||
}); | ||
}; | ||
const emitter = Extend(new EventEmitter(), { | ||
name, | ||
type, | ||
options, | ||
queue: createQueue, | ||
connect, | ||
publish, | ||
rpcClient, | ||
rpcServer | ||
}); | ||
return emitter; | ||
}; | ||
module.exports = exchange; |
'use strict'; | ||
const amqp = require('amqplib/callback_api'); | ||
const extend = require('lodash.assignin'); | ||
const Amqp = require('amqplib/callback_api'); | ||
const Extend = require('lodash.assignin'); | ||
const EventEmitter = require('events').EventEmitter; | ||
const exchange = require('./exchange'); | ||
const Exchange = require('./exchange'); | ||
module.exports = jackrabbit; | ||
const jackrabbit = (url) => { | ||
function jackrabbit(url) { | ||
if (!url) throw new Error('url required for jackrabbit connection'); | ||
if (!url) { | ||
throw new Error('url required for jackrabbit connection'); | ||
} | ||
// state | ||
let connection; | ||
// state | ||
const rabbit = extend(new EventEmitter(), { | ||
default: createDefaultExchange, | ||
direct: createExchange().bind(null, 'direct'), | ||
fanout: createExchange().bind(null, 'fanout'), | ||
topic: createExchange().bind(null, 'topic'), | ||
exchange: createExchange(), | ||
close: close, | ||
getInternals: getInternals | ||
}); | ||
let connection; | ||
amqp.connect(url, onConnection); | ||
return rabbit; | ||
// public | ||
// public | ||
const getInternals = () => { | ||
function getInternals() { | ||
return { | ||
amqp: amqp, | ||
connection: connection | ||
return { | ||
amqp: Amqp, | ||
connection | ||
}; | ||
}; | ||
} | ||
function close(callback) { | ||
if (!connection) { | ||
if (callback) callback(); | ||
return; | ||
} | ||
try { | ||
// I don't think amqplib should be throwing here, as this is an async function | ||
// TODO: figure out how to test whether or not amqplib will throw | ||
// (eg, how do they determine if closing is an illegal operation?) | ||
connection.close(function(err) { | ||
if (callback) callback(err); | ||
rabbit.emit('close'); | ||
}); | ||
} | ||
catch (e) { | ||
if (callback) callback(e); | ||
} | ||
} | ||
const close = (callback) => { | ||
function createDefaultExchange() { | ||
return createExchange()('direct', ''); | ||
} | ||
if (!connection) { | ||
if (callback) { | ||
callback(); | ||
} | ||
function createExchange() { | ||
return function(type, name, options) { | ||
const newExchange = exchange(name, type, options); | ||
if (connection) { | ||
newExchange.connect(connection); | ||
} | ||
else { | ||
rabbit.once('connected', function() { | ||
newExchange.connect(connection); | ||
}); | ||
} | ||
return newExchange; | ||
return; | ||
} | ||
try { | ||
// I don't think amqplib should be throwing here, as this is an async const | ||
// TODO: figure out how to test whether or not amqplib will throw | ||
// (eg, how do they determine if closing is an illegal operation?) | ||
connection.close((err) => { | ||
if (callback) { | ||
callback(err); | ||
} | ||
rabbit.emit('close'); | ||
}); | ||
} | ||
catch (e) { | ||
if (callback) { | ||
callback(e); | ||
} | ||
} | ||
}; | ||
} | ||
// private | ||
const createDefaultExchange = () => { | ||
function bail(err) { | ||
// TODO close any connections or channels that remain open | ||
connection = undefined; | ||
channel = undefined; | ||
if (err) rabbit.emit('error', err); | ||
} | ||
return createExchange()('direct', ''); | ||
}; | ||
function onConnection(err, conn) { | ||
if (err) return bail(err); | ||
connection = conn; | ||
connection.on('close', bail.bind(this)); | ||
rabbit.emit('connected'); | ||
} | ||
} | ||
const createExchange = () => { | ||
return (type, name, options) => { | ||
const newExchange = Exchange(name, type, options); | ||
if (connection) { | ||
newExchange.connect(connection); | ||
} | ||
else { | ||
rabbit.once('connected', () => { | ||
newExchange.connect(connection); | ||
}); | ||
} | ||
return newExchange; | ||
}; | ||
}; | ||
// private | ||
const bail = (err) => { | ||
// TODO close any connections or channels that remain open | ||
connection = undefined; | ||
if (err) { | ||
rabbit.emit('error', err); | ||
} | ||
}; | ||
const onConnection = (err, conn) => { | ||
if (err) { | ||
return bail(err); | ||
} | ||
connection = conn; | ||
connection.on('close', bail.bind(this)); | ||
rabbit.emit('connected'); | ||
}; | ||
const rabbit = Extend(new EventEmitter(), { | ||
default: createDefaultExchange, | ||
direct: createExchange().bind(null, 'direct'), | ||
fanout: createExchange().bind(null, 'fanout'), | ||
topic: createExchange().bind(null, 'topic'), | ||
exchange: createExchange(), | ||
close, | ||
getInternals | ||
}); | ||
Amqp.connect(url, onConnection); | ||
return rabbit; | ||
}; | ||
module.exports = jackrabbit; |
284
lib/queue.js
'use strict'; | ||
const amqp = require('amqplib/callback_api'); | ||
const extend = require('lodash.assignin'); | ||
const Extend = require('lodash.assignin'); | ||
const EventEmitter = require('events').EventEmitter; | ||
const DEFAULT_QUEUE_OPTIONS = { | ||
exclusive: false, | ||
durable: true, | ||
prefetch: 1, // can be set on the queue because we use a per-queue channel | ||
messageTtl: undefined, | ||
maxLength: undefined | ||
exclusive: false, | ||
durable: true, | ||
prefetch: 1, // can be set on the queue because we use a per-queue channel | ||
messageTtl: undefined, | ||
maxLength: undefined | ||
}; | ||
const DEFAULT_CONSUME_OPTIONS = { | ||
consumerTag: undefined, | ||
noAck: false, | ||
exclusive: false, | ||
priority: undefined | ||
consumerTag: undefined, | ||
noAck: false, | ||
exclusive: false, | ||
priority: undefined | ||
}; | ||
module.exports = queue; | ||
const queue = (options) => { | ||
function queue(options) { | ||
options = options || {}; | ||
let channel, consumerTag, ready; | ||
const emitter = extend(new EventEmitter(), { | ||
name: options.name, | ||
options: extend({}, DEFAULT_QUEUE_OPTIONS, options), | ||
connect: connect, | ||
consume: consume, | ||
cancel: cancel, | ||
purge: purge | ||
}); | ||
const connect = (connection) => { | ||
return emitter; | ||
connection.createChannel(onChannel); | ||
}; | ||
function connect(connection) { | ||
connection.createChannel(onChannel); | ||
} | ||
const consume = (callback, consumeOptions) => { | ||
function consume(callback, options) { | ||
if( ready ){ | ||
const opts = extend({}, DEFAULT_CONSUME_OPTIONS, options); | ||
channel.consume(emitter.name, onMessage, opts, onConsume); | ||
} | ||
if ( ready ){ | ||
const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions); | ||
channel.consume(emitter.name, onMessage, opts, onConsume); | ||
} | ||
else { | ||
emitter.once('ready', () => { | ||
else { | ||
emitter.once('ready', function() { | ||
const opts = extend({}, DEFAULT_CONSUME_OPTIONS, options); | ||
channel.consume(emitter.name, onMessage, opts, onConsume); | ||
}); | ||
} | ||
const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions); | ||
channel.consume(emitter.name, onMessage, opts, onConsume); | ||
}); | ||
} | ||
function onMessage(msg) { | ||
const data = parseMessage(msg); | ||
if (!data) return; | ||
const onMessage = (msg) => { | ||
callback(data, ack, nack, msg); | ||
const data = parseMessage(msg); | ||
if (!data) { | ||
return; | ||
} | ||
function ack(reply) { | ||
const replyTo = msg.properties.replyTo; | ||
const id = msg.properties.correlationId; | ||
if (replyTo && id) { | ||
const buffer = encodeMessage(reply, msg.properties.contentType); | ||
channel.publish('', replyTo, buffer, { | ||
correlationId: id, | ||
contentType: msg.properties.contentType | ||
}); | ||
const ack = (reply) => { | ||
const replyTo = msg.properties.replyTo; | ||
const id = msg.properties.correlationId; | ||
if (replyTo && id) { | ||
const buffer = encodeMessage(reply, msg.properties.contentType); | ||
channel.publish('', replyTo, buffer, { | ||
correlationId: id, | ||
contentType: msg.properties.contentType | ||
}); | ||
} | ||
channel.ack(msg); | ||
}; | ||
const nack = (opts) => { | ||
opts = opts || {}; | ||
opts.allUpTo = opts.allUpTo !== undefined ? opts.allUpTo : false; | ||
opts.requeue = opts.requeue !== undefined ? opts.requeue : true; | ||
channel.nack(msg, opts.allUpTo, opts.requeue); | ||
}; | ||
callback(data, ack, nack, msg); | ||
}; | ||
}; | ||
const cancel = (done) => { | ||
if (!consumerTag) { | ||
return; | ||
} | ||
channel.ack(msg); | ||
} | ||
function nack(opts) { | ||
opts = opts || {}; | ||
opts.allUpTo = opts.allUpTo !== undefined ? opts.allUpTo : false; | ||
opts.requeue = opts.requeue !== undefined ? opts.requeue : true; | ||
channel.nack(msg, opts.allUpTo, opts.requeue); | ||
} | ||
} | ||
} | ||
if (!channel) { | ||
return; | ||
} | ||
function cancel(done) { | ||
if (!consumerTag) return; | ||
if (!channel) return; | ||
channel.cancel(consumerTag, done); | ||
} | ||
channel.cancel(consumerTag, done); | ||
}; | ||
function purge(done) { | ||
if (channel) { | ||
channel.purgeQueue(emitter.name, onPurged); | ||
} else { | ||
emitter.once('ready', function() { | ||
channel.purgeQueue(emitter.name, onPurged); | ||
}); | ||
} | ||
const purge = (done) => { | ||
function onPurged(err, obj) { | ||
if (err) return done(err); | ||
done(undefined, obj.messageCount); | ||
} | ||
} | ||
const onPurged = (err, obj) => { | ||
function encodeMessage(message, contentType) { | ||
if (contentType === 'application/json') { | ||
return new Buffer(JSON.stringify(message)); | ||
} | ||
return new Buffer(message.toString()); | ||
} | ||
if (err) { | ||
return done(err); | ||
} | ||
function parseMessage(msg) { | ||
if (msg.properties.contentType === 'application/json') { | ||
try { | ||
return JSON.parse(msg.content.toString()); | ||
} | ||
catch (e) { | ||
emitter.emit('error', new Error('unable to parse message as JSON')); | ||
return; | ||
} | ||
} | ||
return msg.content; | ||
} | ||
done(undefined, obj.messageCount); | ||
}; | ||
function onConsume(err, info) { | ||
if (err) return bail(err); | ||
consumerTag = info.consumerTag; // required to stop consuming | ||
emitter.emit('consuming'); | ||
} | ||
if (channel) { | ||
channel.purgeQueue(emitter.name, onPurged); | ||
} | ||
else { | ||
emitter.once('ready', () => { | ||
function bail(err) { | ||
channel.purgeQueue(emitter.name, onPurged); | ||
}); | ||
} | ||
}; | ||
const encodeMessage = (message, contentType) => { | ||
if (contentType === 'application/json') { | ||
return Buffer.from(JSON.stringify(message)); | ||
} | ||
return Buffer.from(message.toString()); | ||
}; | ||
const parseMessage = (msg) => { | ||
if (msg.properties.contentType === 'application/json') { | ||
try { | ||
return JSON.parse(msg.content.toString()); | ||
} | ||
catch (e) { | ||
emitter.emit('error', new Error('unable to parse message as JSON')); | ||
return; | ||
} | ||
} | ||
return msg.content; | ||
}; | ||
const onConsume = (err, info) => { | ||
if (err) { | ||
return bail(err); | ||
} | ||
consumerTag = info.consumerTag; // required to stop consuming | ||
emitter.emit('consuming'); | ||
}; | ||
const bail = (err) => { | ||
// TODO: close the channel if still open | ||
channel = undefined; | ||
emitter.name = undefined; | ||
consumerTag = undefined; | ||
emitter.emit('close', err); | ||
} | ||
channel = undefined; | ||
emitter.name = undefined; | ||
consumerTag = undefined; | ||
emitter.emit('close', err); | ||
}; | ||
function onChannel(err, chan) { | ||
if (err) return bail(err); | ||
channel = chan; | ||
channel.prefetch(emitter.options.prefetch); | ||
channel.on('close', bail.bind(this, new Error('channel closed'))); | ||
emitter.emit('connected'); | ||
channel.assertQueue(emitter.name, emitter.options, onQueue); | ||
} | ||
const onChannel = (err, chan) => { | ||
function onQueue(err, info) { | ||
if (err) return bail(err); | ||
emitter.name = info.queue; | ||
ready = true; | ||
emitter.emit('ready'); | ||
} | ||
} | ||
if (err) { | ||
return bail(err); | ||
} | ||
channel = chan; | ||
channel.prefetch(emitter.options.prefetch); | ||
channel.on('close', bail.bind(this, new Error('channel closed'))); | ||
emitter.emit('connected'); | ||
channel.assertQueue(emitter.name, emitter.options, onQueue); | ||
}; | ||
const onQueue = (err, info) => { | ||
if (err) { | ||
return bail(err); | ||
} | ||
emitter.name = info.queue; | ||
ready = true; | ||
emitter.emit('ready'); | ||
}; | ||
options = options || {}; | ||
let channel; let consumerTag; let ready; | ||
const emitter = Extend(new EventEmitter(), { | ||
name: options.name, | ||
options: Extend({}, DEFAULT_QUEUE_OPTIONS, options), | ||
connect, | ||
consume, | ||
cancel, | ||
purge | ||
}); | ||
return emitter; | ||
}; | ||
module.exports = queue; |
{ | ||
"name": "@pager/jackrabbit", | ||
"version": "4.7.1", | ||
"version": "4.8.0", | ||
"description": "Easy RabbitMQ for node", | ||
@@ -22,7 +22,7 @@ "keywords": [ | ||
"scripts": { | ||
"test": "mocha -R spec", | ||
"lint": "eslint ." | ||
"test": "mocha", | ||
"lint": "eslint lib/** test/**" | ||
}, | ||
"engines": { | ||
"node": ">= 0.10.x" | ||
"node": ">= 10.x" | ||
}, | ||
@@ -32,13 +32,17 @@ "author": "Hunter Loftis <hunter@hunterloftis.com>", | ||
"dependencies": { | ||
"amqplib": "^0.5.1", | ||
"amqplib": "0.5.x", | ||
"lodash.assignin": "4.x.x", | ||
"uuid": "^3.0.1" | ||
"uuid": "3.x.x" | ||
}, | ||
"devDependencies": { | ||
"chai": "4.x", | ||
"chai": "4.x.x", | ||
"eslint": "6.x.x", | ||
"eslint-config-hapi": "12.x.x", | ||
"eslint-plugin-hapi": "4.x.x", | ||
"mocha": "3.x.x" | ||
"mocha": "6.x.x" | ||
}, | ||
"mocha": { | ||
"bail": true, | ||
"recursive": true | ||
} | ||
} |
@@ -78,11 +78,3 @@ # Jackrabbit | ||
``` | ||
$ docker-compose run jackrabbit npm test | ||
$ docker-compose up | ||
``` | ||
If using Docker-Machine on OSX: | ||
``` | ||
$ docker-machine start | ||
$ eval "$(docker-machine env default)" | ||
$ docker-compose run jackrabbit npm test | ||
``` |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
18961
505
80
Updatedamqplib@0.5.x
Updateduuid@3.x.x