jackrabbit
Advanced tools
Comparing version 3.0.9 to 4.0.0
@@ -1,135 +0,76 @@ | ||
var amqp = require('amqplib'); | ||
var amqp = require('amqplib/callback_api'); | ||
var _ = require('lodash'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var exchange = require('./exchange'); | ||
var Queue = require('./queue'); | ||
module.exports = jackrabbit; | ||
function JackRabbit(url, prefetch) { | ||
function jackrabbit(url) { | ||
if (!url) throw new Error('url required for jackrabbit connection'); | ||
EventEmitter.call(this); | ||
// state | ||
var connection; | ||
this.connection = null; | ||
this.channel = null; | ||
this.prefetch = prefetch || 1; | ||
this.queues = {}; | ||
var rabbit = _.extend(new EventEmitter(), { | ||
default: createDefaultExchange, | ||
direct: createExchange('direct'), | ||
fanout: createExchange('fanout'), | ||
topic: createExchange('topic'), | ||
close: close, | ||
getInternals: getInternals | ||
}); | ||
amqp | ||
.connect(url) | ||
.then(this.createChannel.bind(this)) | ||
.then(this.onChannel.bind(this)) | ||
.catch(this.onConnectionErr.bind(this)); | ||
} | ||
amqp.connect(url, onConnection); | ||
return rabbit; | ||
module.exports = function createJackRabbit(url, prefetch) { | ||
return new JackRabbit(url, prefetch); | ||
}; | ||
// public | ||
JackRabbit.prototype = Object.create(EventEmitter.prototype); | ||
function getInternals() { | ||
return { | ||
amqp: amqp, | ||
connection: connection | ||
}; | ||
} | ||
JackRabbit.prototype.createChannel = function(connection) { | ||
this.connection = connection; | ||
this.connection.once('close', this.onClose.bind(this)); | ||
return connection.createChannel(); | ||
}; | ||
function close(callback) { | ||
connection.close(function(err) { | ||
if (callback) callback(err); | ||
rabbit.emit('close'); | ||
}); | ||
} | ||
JackRabbit.prototype.onChannel = function(channel) { | ||
this.channel = channel; | ||
function createDefaultExchange() { | ||
return createExchange('direct')(''); | ||
} | ||
// Create a reply queue | ||
this.channel.replyHandlers = {}; | ||
this.channel.assertQueue('', { exclusive: true }) | ||
.then(function(replyTo) { | ||
this.channel.replyName = replyTo.queue; | ||
this.channel.consume(this.replyName, this.onReply.bind(this), { noAck: true }); | ||
this.emit('connected'); | ||
}.bind(this)); | ||
function createExchange(type) { | ||
return function(name) { | ||
var newExchange = exchange(name, type); | ||
if (connection) { | ||
newExchange.connect(connection); | ||
} | ||
else { | ||
rabbit.once('connected', function() { | ||
newExchange.connect(connection); | ||
}); | ||
} | ||
return newExchange; | ||
}; | ||
} | ||
this.channel.on('close', this.onChannelClose.bind(this)); | ||
this.channel.prefetch(this.prefetch); | ||
}; | ||
// private | ||
JackRabbit.prototype.onReply = function(msg) { | ||
var id = msg.properties.correlationId; | ||
var replyHandler = this.channel.replyHandlers[id]; | ||
if (!replyHandler) return; | ||
function bail(err) { | ||
// TODO close any connections or channels that remain open | ||
connection = undefined; | ||
channel = undefined; | ||
rabbit.emit('error', err); | ||
} | ||
var body = msg.content.toString(); | ||
var obj = JSON.parse(body); | ||
replyHandler(null, obj); | ||
//release the replyHandler in this.channel.replyHandlers | ||
delete this.channel.replyHandlers[id]; | ||
}; | ||
JackRabbit.prototype.close = function() { | ||
this.connection.close(); | ||
}; | ||
JackRabbit.prototype.onClose = function() { | ||
this.emit('disconnected'); | ||
}; | ||
JackRabbit.prototype.onConnectionErr = function(err) { | ||
this.emit('disconnected', err); | ||
}; | ||
JackRabbit.prototype.onChannelClose = function() { | ||
this.emit('disconnected'); | ||
}; | ||
JackRabbit.prototype.queue = function(name, options) { | ||
return new Queue(this.channel, name, options); | ||
}; | ||
JackRabbit.prototype.create = function(name, options, done) { | ||
if (!done) { | ||
if (typeof options === 'function') { | ||
done = options; | ||
options = {}; | ||
} | ||
else { | ||
done = function() {}; | ||
} | ||
function onConnection(err, conn) { | ||
if (err) return bail(err); | ||
connection = conn; | ||
connection.on('close', bail.bind(this)); | ||
rabbit.emit('connected'); | ||
} | ||
var queue = new Queue(this.channel, name, options); | ||
queue | ||
.once('ready', function onQueueReady(info) { | ||
this.queues[name] = queue; | ||
done(null, queue, info); | ||
}.bind(this)) | ||
.once('error', function onQueueErr(err) { | ||
done(err); | ||
}); | ||
}; | ||
JackRabbit.prototype.destroy = function(name, done) { | ||
this.channel | ||
.deleteQueue(name) | ||
.then(onSuccess) | ||
.catch(onFail); | ||
function onSuccess() { done(null, true); } | ||
function onFail(err) { done(err); } | ||
}; | ||
JackRabbit.prototype.purge = function(name, done) { | ||
this.channel | ||
.purgeQueue(name) | ||
.then(onSuccess) | ||
.catch(onFail); | ||
function onSuccess(response) { done(null, response.messageCount); } | ||
function onFail(err) { done(err); } | ||
}; | ||
JackRabbit.prototype.publish = function(name, obj, replyHandler) { | ||
this.queues[name].publish(obj, replyHandler); | ||
}; | ||
JackRabbit.prototype.handle = function(name, handler) { | ||
this.queues[name].subscribe(handler); | ||
}; | ||
JackRabbit.prototype.ignore = function(name) { | ||
this.queues[name].unsubscribe(); | ||
}; | ||
} |
168
lib/queue.js
@@ -0,81 +1,117 @@ | ||
var amqp = require('amqplib/callback_api'); | ||
var _ = require('lodash'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var uuid = require('node-uuid'); | ||
var _ = require('lodash'); | ||
function Queue(channel, name, options) { | ||
EventEmitter.call(this); | ||
options = _.extend({}, { durable: true, noAck: false, messageTtl: 1000 }, options); | ||
var DEFAULT_QUEUE_OPTIONS = { | ||
exclusive: false, | ||
durable: true, | ||
prefetch: 1, // can be set on the queue because we use a per-queue channel | ||
messageTtl: undefined, | ||
maxLength: undefined | ||
}; | ||
this.handler = function() {}; | ||
this.name = name; | ||
this.replyName = null; | ||
this.channel = channel; | ||
this.durable = options.durable; | ||
this.noAck = options.noAck; | ||
this.tag = null; | ||
this.messageTtl = options.messageTtl; | ||
var DEFAULT_CONSUME_OPTIONS = { | ||
consumerTag: undefined, | ||
noAck: false, | ||
exclusive: false, | ||
priority: undefined | ||
}; | ||
this.channel | ||
.assertQueue(name, { | ||
durable: this.durable | ||
}) | ||
.then(function createReplyQueue(info) { | ||
this.emit('ready', info); | ||
}.bind(this)); | ||
} | ||
module.exports = queue; | ||
Queue.prototype = Object.create(EventEmitter.prototype); | ||
function queue(options) { | ||
options = options || {}; | ||
var channel, consumerTag; | ||
var emitter = _.extend(new EventEmitter(), { | ||
name: options.name, | ||
options: _.extend({}, DEFAULT_QUEUE_OPTIONS, options), | ||
connect: connect, | ||
consume: consume | ||
}); | ||
module.exports = Queue; | ||
return emitter; | ||
Queue.prototype.subscribe = function(handler) { | ||
this.handler = handler; | ||
var tag = this.channel | ||
.consume(this.name, this.onMessage.bind(this), { noAck: this.noAck }) | ||
.then(saveTag.bind(this)); | ||
// TODO: is there a race condition here if this isn't called before unsubscribe? | ||
function saveTag(obj) { | ||
this.tag = obj.consumerTag; | ||
function connect(connection) { | ||
connection.createChannel(onChannel); | ||
} | ||
}; | ||
Queue.prototype.unsubscribe = function() { | ||
this.channel.cancel(this.tag); | ||
this.handler = null; | ||
this.tag = null; | ||
}; | ||
function consume(callback, options) { | ||
emitter.once('ready', function() { | ||
var opts = _.extend({}, DEFAULT_CONSUME_OPTIONS, options); | ||
channel.consume(emitter.name, onMessage, opts, onConsume); | ||
}); | ||
Queue.prototype.onMessage = function(msg) { | ||
if (!msg) return; | ||
var body = msg.content.toString(); | ||
var obj = JSON.parse(body); | ||
var hasReply = msg.properties.replyTo; | ||
function onMessage(msg) { | ||
var data = parseMessage(msg); | ||
if (!data) return; | ||
if (hasReply && !this.noAck) this.channel.ack(msg); | ||
this.handler(obj, function(reply) { | ||
if (hasReply) { | ||
var replyBuffer = new Buffer(JSON.stringify(reply || '')); | ||
this.channel.sendToQueue(msg.properties.replyTo, replyBuffer, { | ||
correlationId: msg.properties.correlationId | ||
}); | ||
callback(data, ack, nack, msg); | ||
function ack(reply) { | ||
var replyTo = msg.properties.replyTo; | ||
var id = msg.properties.correlationId; | ||
if (replyTo && id) { | ||
var buffer = encodeMessage(reply, msg.properties.contentType); | ||
channel.publish('', replyTo, buffer, { | ||
correlationId: id, | ||
contentType: msg.properties.contentType | ||
}); | ||
} | ||
channel.ack(msg); | ||
} | ||
function nack() { | ||
channel.nack(msg); | ||
} | ||
} | ||
else if (!this.noAck) { | ||
this.channel.ack(msg); | ||
} | ||
function encodeMessage(message, contentType) { | ||
if (contentType === 'application/json') { | ||
return new Buffer(JSON.stringify(message)); | ||
} | ||
}.bind(this)); | ||
}; | ||
return new Buffer(message.toString()); | ||
} | ||
Queue.prototype.publish = function(obj, replyHandler) { | ||
var msg = JSON.stringify(obj); | ||
var id = uuid.v4(); | ||
if (replyHandler) { | ||
this.channel.replyHandlers[id] = replyHandler; | ||
function parseMessage(msg) { | ||
if (msg.properties.contentType === 'application/json') { | ||
try { | ||
return JSON.parse(msg.content.toString()); | ||
} | ||
catch (e) { | ||
emitter.emit('error', new Error('unable to parse message as JSON')); | ||
return; | ||
} | ||
} | ||
return msg.content; | ||
} | ||
this.channel.sendToQueue(this.name, new Buffer(msg), { | ||
persistent: !replyHandler, | ||
correlationId: id, | ||
expiration: this.messageTtl, | ||
replyTo: replyHandler ? this.channel.replyName : undefined | ||
}); | ||
}; | ||
function onConsume(err, info) { | ||
if (err) return bail(err); | ||
consumerTag = info.consumerTag; // required to stop consuming | ||
emitter.emit('consuming'); | ||
} | ||
function bail(err) { | ||
// TODO: close the channel if still open | ||
channel = undefined; | ||
emitter.name = undefined; | ||
consumerTag = undefined; | ||
emitter.emit('close', err); | ||
} | ||
function onChannel(err, chan) { | ||
if (err) return bail(err); | ||
channel = chan; | ||
channel.prefetch(emitter.options.prefetch); | ||
channel.on('close', bail.bind(this, new Error('channel closed'))); | ||
emitter.emit('connected'); | ||
channel.assertQueue(emitter.name, emitter.options, onQueue); | ||
} | ||
function onQueue(err, info) { | ||
if (err) return bail(err); | ||
emitter.name = info.queue; | ||
emitter.emit('ready'); | ||
} | ||
} |
{ | ||
"name": "jackrabbit", | ||
"version": "3.0.9", | ||
"description": "Simple AMQP / RabbitMQ job queues for node", | ||
"version": "4.0.0", | ||
"description": "Easy RabbitMQ for node", | ||
"readme": "https://github.com/hunterloftis/jackrabbit/blob/master/readme.md", | ||
"keywords": [ | ||
@@ -19,3 +20,3 @@ "amqp", | ||
"scripts": { | ||
"test": "mocha" | ||
"test": "mocha -R spec" | ||
}, | ||
@@ -28,12 +29,10 @@ "engines": { | ||
"dependencies": { | ||
"amqplib": "^0.2.1", | ||
"lodash": "^2.4.1", | ||
"amqplib": "^0.3.2", | ||
"lodash": "^3.10.1", | ||
"node-uuid": "^1.4.1" | ||
}, | ||
"devDependencies": { | ||
"chai": "^1.9.1", | ||
"express": "^4.8.7", | ||
"mocha": "^1.21.4", | ||
"supertest": "^0.13.0" | ||
"chai": "^3.2.0", | ||
"mocha": "^2.2.5" | ||
} | ||
} |
126
readme.md
# Jackrabbit | ||
Simple AMQP / RabbitMQ job queues for node | ||
RabbitMQ in Node.js without hating life. | ||
```js | ||
var queue = jackrabbit('amqp://localhost'); | ||
*producer.js:* | ||
queue.on('connected', function() { | ||
queue.create('jobs.greet', { prefetch: 5 }, onReady); | ||
function onReady() { | ||
queue.handle('jobs.greet', onJob); | ||
queue.publish('jobs.greet', { name: 'Hunter' }); | ||
} | ||
function onJob(job, ack) { | ||
console.log('Hello, ' + job.name); | ||
ack(); | ||
} | ||
}); | ||
``` | ||
## Installation | ||
``` | ||
npm install --save jackrabbit | ||
``` | ||
```js | ||
var jackrabbit = require('jackrabbit'); | ||
``` | ||
var rabbit = jackrabbit(process.env.RABBIT_URL); | ||
## Use | ||
First, create a queue and connect to an amqp server: | ||
```js | ||
var queue = jackrabbit(amqp_url, prefetch) | ||
rabbit | ||
.default() | ||
.publish('Hello World!', { key: 'hello' }) | ||
.on('drain', rabbit.close); | ||
``` | ||
- amqp_url: eg, 'amqp://localhost' | ||
- prefetch: messages to prefetch (default = 1) | ||
*consumer.js:* | ||
#### create | ||
Create (or assert) a queue. | ||
```js | ||
queue.create(name, options, callback) | ||
``` | ||
var jackrabbit = require('jackrabbit'); | ||
var rabbit = jackrabbit(process.env.RABBIT_URL); | ||
- name: name of the queue (eg, 'jobs.scrape') | ||
- options: object with... | ||
- durable (Boolean, default = true) | ||
- prefetch (Number, default = 1) | ||
- callback: callback function for result (err, queue_instance, queue_info) | ||
rabbit | ||
.default() | ||
.queue({ name: 'hello' }) | ||
.consume(onMessage, { noAck: true }); | ||
#### destroy | ||
Destroy a queue. | ||
```js | ||
queue.destroy(name, callback) | ||
function onMessage(data) { | ||
console.log('received:', data); | ||
} | ||
``` | ||
- name: name of the queue | ||
- callback: callback for result (err, destroyed) | ||
## Use | ||
You can destroy queues that exist or don't exist; | ||
if you try to destroy a queue that doesn't exist, | ||
destroyed = false. If a queue was destroyed, | ||
destroyed = true. | ||
For now, the best usage help is found in the examples, | ||
which map 1-to-1 with the official RabbitMQ tutorials. | ||
#### publish | ||
## Installation | ||
Publish a job to a queue. | ||
```js | ||
queue.publish(name, message) | ||
``` | ||
- name: name of the queue | ||
- message: an Object that is your message / job to add to the queue | ||
#### handle | ||
Start handling jobs in a queue. | ||
```js | ||
queue.handle(name, handler) | ||
npm install --save jackrabbit | ||
``` | ||
- name: name of the queue | ||
- handler: a Function to receive jobs (job, ack) | ||
## Tests | ||
Jobs must be acknowledged. You can either ack immediately | ||
(so all jobs will be run at most once) or | ||
you can ack on job completion (so all jobs will run at least once). | ||
The tests are set up with Docker + Docker-Compose, | ||
so you don't need to install rabbitmq (or even node) | ||
to run them: | ||
#### ignore | ||
Stop handling a queue. | ||
```js | ||
queue.ignore(name) | ||
``` | ||
- name: name of the queue | ||
#### purge | ||
Purge a queue. | ||
```js | ||
queue.purge(name, callback); | ||
$ docker-compose run jackrabbit npm test | ||
``` | ||
- name: name of the queue | ||
- callback: Function to be called on completion with (err, countOfPurgedMessages) | ||
## Tests | ||
1. Run rabbit on 'amqp://localhost' | ||
2. `npm test` |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
23948
2
22
693
53
30
1
+ Addedamqplib@0.3.2(transitive)
+ Addedlodash@3.10.1(transitive)
+ Addedwhen@3.6.4(transitive)
- Removedamqplib@0.2.1(transitive)
- Removedlodash@2.4.2(transitive)
- Removedwhen@3.2.3(transitive)
Updatedamqplib@^0.3.2
Updatedlodash@^3.10.1