New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

jackrabbit

Package Overview
Dependencies
Maintainers
1
Versions
24
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

jackrabbit - npm Package Compare versions

Comparing version 3.0.9 to 4.0.0

docker-compose.yml

179

lib/jackrabbit.js

@@ -1,135 +0,76 @@

var amqp = require('amqplib');
var amqp = require('amqplib/callback_api');
var _ = require('lodash');
var EventEmitter = require('events').EventEmitter;
var exchange = require('./exchange');
var Queue = require('./queue');
module.exports = jackrabbit;
function JackRabbit(url, prefetch) {
function jackrabbit(url) {
if (!url) throw new Error('url required for jackrabbit connection');
EventEmitter.call(this);
// state
var connection;
this.connection = null;
this.channel = null;
this.prefetch = prefetch || 1;
this.queues = {};
var rabbit = _.extend(new EventEmitter(), {
default: createDefaultExchange,
direct: createExchange('direct'),
fanout: createExchange('fanout'),
topic: createExchange('topic'),
close: close,
getInternals: getInternals
});
amqp
.connect(url)
.then(this.createChannel.bind(this))
.then(this.onChannel.bind(this))
.catch(this.onConnectionErr.bind(this));
}
amqp.connect(url, onConnection);
return rabbit;
module.exports = function createJackRabbit(url, prefetch) {
return new JackRabbit(url, prefetch);
};
// public
JackRabbit.prototype = Object.create(EventEmitter.prototype);
function getInternals() {
return {
amqp: amqp,
connection: connection
};
}
JackRabbit.prototype.createChannel = function(connection) {
this.connection = connection;
this.connection.once('close', this.onClose.bind(this));
return connection.createChannel();
};
function close(callback) {
connection.close(function(err) {
if (callback) callback(err);
rabbit.emit('close');
});
}
JackRabbit.prototype.onChannel = function(channel) {
this.channel = channel;
function createDefaultExchange() {
return createExchange('direct')('');
}
// Create a reply queue
this.channel.replyHandlers = {};
this.channel.assertQueue('', { exclusive: true })
.then(function(replyTo) {
this.channel.replyName = replyTo.queue;
this.channel.consume(this.replyName, this.onReply.bind(this), { noAck: true });
this.emit('connected');
}.bind(this));
function createExchange(type) {
return function(name) {
var newExchange = exchange(name, type);
if (connection) {
newExchange.connect(connection);
}
else {
rabbit.once('connected', function() {
newExchange.connect(connection);
});
}
return newExchange;
};
}
this.channel.on('close', this.onChannelClose.bind(this));
this.channel.prefetch(this.prefetch);
};
// private
JackRabbit.prototype.onReply = function(msg) {
var id = msg.properties.correlationId;
var replyHandler = this.channel.replyHandlers[id];
if (!replyHandler) return;
function bail(err) {
// TODO close any connections or channels that remain open
connection = undefined;
channel = undefined;
rabbit.emit('error', err);
}
var body = msg.content.toString();
var obj = JSON.parse(body);
replyHandler(null, obj);
//release the replyHandler in this.channel.replyHandlers
delete this.channel.replyHandlers[id];
};
JackRabbit.prototype.close = function() {
this.connection.close();
};
JackRabbit.prototype.onClose = function() {
this.emit('disconnected');
};
JackRabbit.prototype.onConnectionErr = function(err) {
this.emit('disconnected', err);
};
JackRabbit.prototype.onChannelClose = function() {
this.emit('disconnected');
};
JackRabbit.prototype.queue = function(name, options) {
return new Queue(this.channel, name, options);
};
JackRabbit.prototype.create = function(name, options, done) {
if (!done) {
if (typeof options === 'function') {
done = options;
options = {};
}
else {
done = function() {};
}
function onConnection(err, conn) {
if (err) return bail(err);
connection = conn;
connection.on('close', bail.bind(this));
rabbit.emit('connected');
}
var queue = new Queue(this.channel, name, options);
queue
.once('ready', function onQueueReady(info) {
this.queues[name] = queue;
done(null, queue, info);
}.bind(this))
.once('error', function onQueueErr(err) {
done(err);
});
};
JackRabbit.prototype.destroy = function(name, done) {
this.channel
.deleteQueue(name)
.then(onSuccess)
.catch(onFail);
function onSuccess() { done(null, true); }
function onFail(err) { done(err); }
};
JackRabbit.prototype.purge = function(name, done) {
this.channel
.purgeQueue(name)
.then(onSuccess)
.catch(onFail);
function onSuccess(response) { done(null, response.messageCount); }
function onFail(err) { done(err); }
};
JackRabbit.prototype.publish = function(name, obj, replyHandler) {
this.queues[name].publish(obj, replyHandler);
};
JackRabbit.prototype.handle = function(name, handler) {
this.queues[name].subscribe(handler);
};
JackRabbit.prototype.ignore = function(name) {
this.queues[name].unsubscribe();
};
}

@@ -0,81 +1,117 @@

var amqp = require('amqplib/callback_api');
var _ = require('lodash');
var EventEmitter = require('events').EventEmitter;
var uuid = require('node-uuid');
var _ = require('lodash');
function Queue(channel, name, options) {
EventEmitter.call(this);
options = _.extend({}, { durable: true, noAck: false, messageTtl: 1000 }, options);
var DEFAULT_QUEUE_OPTIONS = {
exclusive: false,
durable: true,
prefetch: 1, // can be set on the queue because we use a per-queue channel
messageTtl: undefined,
maxLength: undefined
};
this.handler = function() {};
this.name = name;
this.replyName = null;
this.channel = channel;
this.durable = options.durable;
this.noAck = options.noAck;
this.tag = null;
this.messageTtl = options.messageTtl;
var DEFAULT_CONSUME_OPTIONS = {
consumerTag: undefined,
noAck: false,
exclusive: false,
priority: undefined
};
this.channel
.assertQueue(name, {
durable: this.durable
})
.then(function createReplyQueue(info) {
this.emit('ready', info);
}.bind(this));
}
module.exports = queue;
Queue.prototype = Object.create(EventEmitter.prototype);
function queue(options) {
options = options || {};
var channel, consumerTag;
var emitter = _.extend(new EventEmitter(), {
name: options.name,
options: _.extend({}, DEFAULT_QUEUE_OPTIONS, options),
connect: connect,
consume: consume
});
module.exports = Queue;
return emitter;
Queue.prototype.subscribe = function(handler) {
this.handler = handler;
var tag = this.channel
.consume(this.name, this.onMessage.bind(this), { noAck: this.noAck })
.then(saveTag.bind(this));
// TODO: is there a race condition here if this isn't called before unsubscribe?
function saveTag(obj) {
this.tag = obj.consumerTag;
function connect(connection) {
connection.createChannel(onChannel);
}
};
Queue.prototype.unsubscribe = function() {
this.channel.cancel(this.tag);
this.handler = null;
this.tag = null;
};
function consume(callback, options) {
emitter.once('ready', function() {
var opts = _.extend({}, DEFAULT_CONSUME_OPTIONS, options);
channel.consume(emitter.name, onMessage, opts, onConsume);
});
Queue.prototype.onMessage = function(msg) {
if (!msg) return;
var body = msg.content.toString();
var obj = JSON.parse(body);
var hasReply = msg.properties.replyTo;
function onMessage(msg) {
var data = parseMessage(msg);
if (!data) return;
if (hasReply && !this.noAck) this.channel.ack(msg);
this.handler(obj, function(reply) {
if (hasReply) {
var replyBuffer = new Buffer(JSON.stringify(reply || ''));
this.channel.sendToQueue(msg.properties.replyTo, replyBuffer, {
correlationId: msg.properties.correlationId
});
callback(data, ack, nack, msg);
function ack(reply) {
var replyTo = msg.properties.replyTo;
var id = msg.properties.correlationId;
if (replyTo && id) {
var buffer = encodeMessage(reply, msg.properties.contentType);
channel.publish('', replyTo, buffer, {
correlationId: id,
contentType: msg.properties.contentType
});
}
channel.ack(msg);
}
function nack() {
channel.nack(msg);
}
}
else if (!this.noAck) {
this.channel.ack(msg);
}
function encodeMessage(message, contentType) {
if (contentType === 'application/json') {
return new Buffer(JSON.stringify(message));
}
}.bind(this));
};
return new Buffer(message.toString());
}
Queue.prototype.publish = function(obj, replyHandler) {
var msg = JSON.stringify(obj);
var id = uuid.v4();
if (replyHandler) {
this.channel.replyHandlers[id] = replyHandler;
function parseMessage(msg) {
if (msg.properties.contentType === 'application/json') {
try {
return JSON.parse(msg.content.toString());
}
catch (e) {
emitter.emit('error', new Error('unable to parse message as JSON'));
return;
}
}
return msg.content;
}
this.channel.sendToQueue(this.name, new Buffer(msg), {
persistent: !replyHandler,
correlationId: id,
expiration: this.messageTtl,
replyTo: replyHandler ? this.channel.replyName : undefined
});
};
function onConsume(err, info) {
if (err) return bail(err);
consumerTag = info.consumerTag; // required to stop consuming
emitter.emit('consuming');
}
function bail(err) {
// TODO: close the channel if still open
channel = undefined;
emitter.name = undefined;
consumerTag = undefined;
emitter.emit('close', err);
}
function onChannel(err, chan) {
if (err) return bail(err);
channel = chan;
channel.prefetch(emitter.options.prefetch);
channel.on('close', bail.bind(this, new Error('channel closed')));
emitter.emit('connected');
channel.assertQueue(emitter.name, emitter.options, onQueue);
}
function onQueue(err, info) {
if (err) return bail(err);
emitter.name = info.queue;
emitter.emit('ready');
}
}
{
"name": "jackrabbit",
"version": "3.0.9",
"description": "Simple AMQP / RabbitMQ job queues for node",
"version": "4.0.0",
"description": "Easy RabbitMQ for node",
"readme": "https://github.com/hunterloftis/jackrabbit/blob/master/readme.md",
"keywords": [

@@ -19,3 +20,3 @@ "amqp",

"scripts": {
"test": "mocha"
"test": "mocha -R spec"
},

@@ -28,12 +29,10 @@ "engines": {

"dependencies": {
"amqplib": "^0.2.1",
"lodash": "^2.4.1",
"amqplib": "^0.3.2",
"lodash": "^3.10.1",
"node-uuid": "^1.4.1"
},
"devDependencies": {
"chai": "^1.9.1",
"express": "^4.8.7",
"mocha": "^1.21.4",
"supertest": "^0.13.0"
"chai": "^3.2.0",
"mocha": "^2.2.5"
}
}
# Jackrabbit
Simple AMQP / RabbitMQ job queues for node
RabbitMQ in Node.js without hating life.
```js
var queue = jackrabbit('amqp://localhost');
*producer.js:*
queue.on('connected', function() {
queue.create('jobs.greet', { prefetch: 5 }, onReady);
function onReady() {
queue.handle('jobs.greet', onJob);
queue.publish('jobs.greet', { name: 'Hunter' });
}
function onJob(job, ack) {
console.log('Hello, ' + job.name);
ack();
}
});
```
## Installation
```
npm install --save jackrabbit
```
```js
var jackrabbit = require('jackrabbit');
```
var rabbit = jackrabbit(process.env.RABBIT_URL);
## Use
First, create a queue and connect to an amqp server:
```js
var queue = jackrabbit(amqp_url, prefetch)
rabbit
.default()
.publish('Hello World!', { key: 'hello' })
.on('drain', rabbit.close);
```
- amqp_url: eg, 'amqp://localhost'
- prefetch: messages to prefetch (default = 1)
*consumer.js:*
#### create
Create (or assert) a queue.
```js
queue.create(name, options, callback)
```
var jackrabbit = require('jackrabbit');
var rabbit = jackrabbit(process.env.RABBIT_URL);
- name: name of the queue (eg, 'jobs.scrape')
- options: object with...
- durable (Boolean, default = true)
- prefetch (Number, default = 1)
- callback: callback function for result (err, queue_instance, queue_info)
rabbit
.default()
.queue({ name: 'hello' })
.consume(onMessage, { noAck: true });
#### destroy
Destroy a queue.
```js
queue.destroy(name, callback)
function onMessage(data) {
console.log('received:', data);
}
```
- name: name of the queue
- callback: callback for result (err, destroyed)
## Use
You can destroy queues that exist or don't exist;
if you try to destroy a queue that doesn't exist,
destroyed = false. If a queue was destroyed,
destroyed = true.
For now, the best usage help is found in the examples,
which map 1-to-1 with the official RabbitMQ tutorials.
#### publish
## Installation
Publish a job to a queue.
```js
queue.publish(name, message)
```
- name: name of the queue
- message: an Object that is your message / job to add to the queue
#### handle
Start handling jobs in a queue.
```js
queue.handle(name, handler)
npm install --save jackrabbit
```
- name: name of the queue
- handler: a Function to receive jobs (job, ack)
## Tests
Jobs must be acknowledged. You can either ack immediately
(so all jobs will be run at most once) or
you can ack on job completion (so all jobs will run at least once).
The tests are set up with Docker + Docker-Compose,
so you don't need to install rabbitmq (or even node)
to run them:
#### ignore
Stop handling a queue.
```js
queue.ignore(name)
```
- name: name of the queue
#### purge
Purge a queue.
```js
queue.purge(name, callback);
$ docker-compose run jackrabbit npm test
```
- name: name of the queue
- callback: Function to be called on completion with (err, countOfPurgedMessages)
## Tests
1. Run rabbit on 'amqp://localhost'
2. `npm test`
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc