Socket
Socket
Sign inDemoInstall

amqper

Package Overview
Dependencies
32
Maintainers
1
Versions
24
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.4.0 to 0.5.0

4

examples/round-robin/consumer.js
"use strict";
var amqper = require('../../');
const amqper = require('../..');
var client = amqper.connect('amqp://guest:guest@localhost:5672');
const client = amqper.connect('amqp://guest:guest@localhost:5672');

@@ -7,0 +7,0 @@ client.route('test.a', {}, function (message) {

"use strict";
var amqper = require('../../');
const PromiseA = require('bluebird');
const amqper = require('../..');
var client = amqper.connect('amqp://guest:guest@localhost:5672');
const client = amqper.connect('amqp://guest:guest@localhost:5672');
for (var i = 0; i < 10; i++) {
client.publish('amq.topic', 'test.a', i);
function publish(i) {
client.publish('amq.topic', 'test.a', i).then(() => {
console.log('published ' + i);
});
}
let i = 0;
(async () => {
while (true) {
await PromiseA.delay(1000);
publish(i++);
}
})();

@@ -7,5 +7,5 @@ "use strict";

var Connection = require('amqplib/lib/connection').Connection;
const Connection = require('amqplib/lib/connection').Connection;
var close = Connection.prototype.close;
const close = Connection.prototype.close;

@@ -12,0 +12,0 @@ Connection.prototype.close = function (cb) {

"use strict";
var debug = require('debug')('amqper:client');
var deprecate = require('depd')('amqper');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var Promise = require('bluebird');
var codecs = require('./codecs');
const debug = require('debug')('amqper:client');
const deprecate = require('depd')('amqper');
const EventEmitter = require('events').EventEmitter;
const PromiseA = require('bluebird');
const codecs = require('./codecs');
module.exports = Client;
class Client extends EventEmitter {
constructor(options) {
super();
this.__amqper__ = true;
// Do the tedious string-or-buffer conversion. If I was using byte
// streams, this would be done automatically; however I'm using
// streams in object mode.
function bufferify(chunk, encoding) {
return (typeof chunk === 'string') ? new Buffer(chunk, encoding || 'utf8') : chunk;
}
this.options = options = options || {};
function Client(options) {
if (!(this instanceof Client)) {
return new Client(options);
this.name = options.name;
this.label = options.label || (options.name ? '#' + options.name : '');
this.codec = codecs.byName(this.options.format || 'json');
this.conn = null;
this.channel = null;
this.connected = false;
this.routers = [];
this.$promise = null;
}
EventEmitter.call(this);
this.__amqper__ = true;
format(fmt) {
this.options.format = fmt;
this.codec = codecs.byName(fmt || 'json');
};
this.options = options = options || {};
ready(done) {
return this.$promise.then(function () {
if (done) done();
}, function (err) {
if (done) return done(err);
return err;
});
};
this.name = options.name;
this.label = options.label || (options.name ? '#' + options.name : '');
this.codec = codecs.byName(this.options.format || 'json');
/**
*
* @param {String} exchange
* @param {String} routingKey
* @param {Object|String|Number} content
* @returns {*|PromiseA}
*/
publish(exchange, routingKey, content) {
const that = this;
return this.$promise.then(function () {
const channel = that.channel;
const codec = that.codec;
return PromiseA.try(function () {
content = bufferify(codec.encode(content));
return channel.publish(exchange, routingKey, content);
});
});
};
this.conn = null;
this.channel = null;
this.connected = false;
this.routers = [];
this.$promise = null;
}
/**
*
* @param {String} route
* @param {Object|Function} [options]
* @param {Function} [handler] function (err, message)
* @returns {*|PromiseA}
*/
route(route, options, handler) {
if (typeof options === 'function') {
handler = options;
options = null;
}
util.inherits(Client, EventEmitter);
if (handler && handler.length > 1) {
deprecate('route handler signature changed from route(err, message) to route(message)');
}
Client.prototype.format = function (fmt) {
this.options.format = fmt;
this.codec = codecs.byName(fmt || 'json');
};
function fn(message) {
if (!handler) return;
if (handler.length > 1) {
handler(null, message);
} else {
handler(message);
}
}
Client.prototype.ready = function (done) {
return this.$promise.then(function () {
if (done) done();
}, function (err) {
if (done) return done(err);
return err;
});
};
/**
*
* @param {String} exchange
* @param {String} routingKey
* @param {Object|String|Number} content
* @returns {*|Promise}
*/
Client.prototype.publish = function (exchange, routingKey, content) {
var that = this;
return this.$promise.then(function () {
var channel = that.channel;
var codec = that.codec;
return Promise.try(function () {
content = bufferify(codec.encode(content));
return channel.publish(exchange, routingKey, content);
const that = this;
return this.$promise.then(function () {
const codec = that.codec;
const router = that.context.route(route, options, function (message) {
PromiseA.try(function () {
message.payload = codec.decode(message.content);
return fn(message);
}).then(function () {
return message.ack();
}).catch(function (err) {
debug('error', 'Error thrown in routing handler, not acking message. Error: ', err.stack);
that.emit('error', err);
});
});
that.routers.push(router);
return router.$promise;
});
});
};
};
/**
*
* @param {String} route
* @param {Object|Function} [options]
* @param {Function} [handler] function (err, message)
* @returns {*|Promise}
*/
Client.prototype.route = function (route, options, handler) {
if (typeof options === 'function') {
handler = options;
options = null;
/**
*
* @param {String} route
* @param {Object|Function} [options]
* @param {Function} [handler] function (err, message)
* @returns {*|PromiseA}
*/
subscribe(route, options, handler) {
return this.route(...arguments);
}
if (handler && handler.length > 1) {
deprecate('route handler signature changed from route(err, message) to route(message)');
}
function fn(message) {
if (!handler) return;
if (handler.length > 1) {
handler(null, message);
} else {
handler(message);
close(cb) {
if (this.closing || this.closed) {
if (cb) cb();
return PromiseA.resolve();
}
}
var that = this;
return this.$promise.then(function () {
var codec = that.codec;
var router = that.context.route(route, options, function (message) {
Promise.try(function () {
message.payload = codec.decode(message.content);
return fn(message);
}).then(function () {
return message.ack();
}).catch(function (err) {
debug('error', 'Error thrown in routing handler, not acking message. Error: ', err.stack);
that.emit('error', err);
});
this.closing = true;
const that = this;
return close_connection(this.conn).then(function () {
return PromiseA.all(PromiseA.map(that.routers, function (router) {
return router.connection.then(function (conn) {
if (conn === that.conn) return;
return close_connection(conn);
});
}));
}).then(function () {
that.routers = [];
that.closed = true;
that.closing = false;
if (cb) cb();
});
that.routers.push(router);
return router.$promise;
});
};
};
}
Client.prototype.subscribe = Client.prototype.route;
// Do the tedious string-or-buffer conversion. If I was using byte
// streams, this would be done automatically; however I'm using
// streams in object mode.
function bufferify(chunk, encoding) {
return (typeof chunk === 'string') ? new Buffer(chunk, encoding || 'utf8') : chunk;
}
Client.prototype.close = function (cb) {
if (this.closing || this.closed) {
if (cb) cb();
return Promise.resolve();
}
this.closing = true;
var that = this;
return close_connection(this.conn).then(function () {
return Promise.all(Promise.map(that.routers, function (router) {
return router.connection.then(function (conn) {
if (conn === that.conn) return;
return close_connection(conn);
});
}));
}).then(function () {
that.routers = [];
that.closed = true;
that.closing = false;
if (cb) cb();
});
};
function close_connection(conn) {
if (!conn || conn.cloing || conn.closed) return Promise.resolve();
return Promise.try(function () {
return new Promise(function (resolve) {
if (!conn || conn.cloing || conn.closed) return PromiseA.resolve();
return PromiseA.try(function () {
return new PromiseA(function (resolve) {
conn.once('close', function () {

@@ -158,1 +160,3 @@ resolve();

}
module.exports = Client;
"use strict";
var msgpack = require('msgpack');
const msgpack = require('msgpack');

@@ -5,0 +5,0 @@ exports.define = function (name, encoding, fns) {

"use strict";
var debug = require('debug')('amqper:connect');
var Promise = require('bluebird');
var amqp = require('./amqp');
var routify = require('./routify');
var Client = require('./client');
const debug = require('debug')('amqper:connect');
const PromiseA = require('bluebird');
const amqp = require('./amqp');
const routify = require('./routify');
const Client = require('./client');

@@ -15,6 +15,6 @@ module.exports = function (options) {

var client = new Client(options);
const client = new Client(options);
client.$promise = Promise.try(function () {
var url = options.url;
client.$promise = PromiseA.try(function () {
const url = options.url;

@@ -27,6 +27,6 @@ if (!url) {

//var closeErr = new Error('AMQP connection closed by remote host (AMQP server).');
var routerErr = new Error('Error in router acting on AMQP connection.');
//const closeErr = new Error('AMQP connection closed by remote host (AMQP server).');
const routerErr = new Error('Error in router acting on AMQP connection.');
var open = Promise.resolve(amqp.connect(url));
const open = PromiseA.resolve(amqp.connect(url));

@@ -37,3 +37,3 @@ client.context = routify.createContext({

var setup = open.then(function (conn) {
const setup = open.then(function (conn) {
debug('connected');

@@ -55,3 +55,3 @@ client.conn = conn;

// now we're set up, get a channel for publishes
return Promise.resolve(conn.createChannel()).then(function (channel) {
return PromiseA.resolve(conn.createChannel()).then(function (channel) {
client.channel = channel;

@@ -58,0 +58,0 @@ debug('channel created');

"use strict";
var _ = require('lodash');
var Router = require('./router');
const _ = require('lodash');
const Router = require('./router');
module.exports = function Context(settings) {
if (!(this instanceof Context)) return new Context(settings);
class Context {
constructor(settings) {
this._settings = settings || {};
}
settings = settings || {};
this.route = function (route, options, handler) {
route(route, options, handler) {
if ('function' !== typeof handler) throw new Error('handler function required');
options = _.defaults(options, settings);
var router = new Router(route, options, handler);
options = _.defaults(options, this._settings);
const router = new Router(route, options, handler);
router.init();
return router;
};
};
}
}
module.exports = Context;
"use strict";
exports.createContext = require('./context');
const Context = require('./context');
exports.createContext = settings => new Context(settings);
'use strict';
var debug = require('debug')('amqper:parser');
var _ = require('lodash');
var Houkou = require('houkou');
const debug = require('debug')('amqper:parser');
const _ = require('lodash');
const Houkou = require('houkou');

@@ -21,8 +21,8 @@ module.exports = Parser;

var params = route.match(/\:([a-zA-Z0-9]+)/g);
var requirements = {};
const params = route.match(/\:([a-zA-Z0-9]+)/g);
const requirements = {};
if (Array.isArray(params)) {
params.forEach(function cleanParams(param) {
var sparam = param.replace(/:/, '');
const sparam = param.replace(/:/, '');
requirements[sparam] = "[a-zA-Z0-9]+";

@@ -37,7 +37,7 @@ });

Parser.prototype.parse = function (data) {
var routingKey = data.fields.routingKey;
const routingKey = data.fields.routingKey;
debug('pattern', this.route.pattern);
debug('routingKey', routingKey);
var result = {};
let result = {};
if (routingKey) {

@@ -44,0 +44,0 @@ result = this.route.match(routingKey);

'use strict';
var debug = require('debug')('amqper:router');
var crypto = require('crypto');
var amqp = require('../amqp');
var Promise = require('bluebird');
var _ = require('lodash');
var Parser = require('./parser');
const debug = require('debug')('amqper:router');
const crypto = require('crypto');
const amqp = require('../amqp');
const PromiseA = require('bluebird');
const _ = require('lodash');
const Parser = require('./parser');
module.exports = Router;
var defaults = {
const defaults = {
url: 'amqp://guest:guest@localhost:5672',

@@ -36,3 +36,3 @@ exchange: 'amq.topic',

var that = this;
const that = this;

@@ -49,3 +49,3 @@ function consumerTag() {

var message = data;
const message = data;

@@ -75,6 +75,6 @@ message.params = that.parser.parse(data);

var routingKey = that.options.route.replace(/:[a-zA-Z0-9]+/g, '*');
const routingKey = that.options.route.replace(/:[a-zA-Z0-9]+/g, '*');
debug('routingKey', routingKey);
return Promise.all([
return PromiseA.all([
ch.assertExchange(that.options.exchange, that.options.exchangeType),

@@ -96,7 +96,7 @@ ch.assertQueue(that.options.queue, that.options.queueOpts),

var open = this.connection = options.connection || amqp.connect(options.url);
const open = this.connection = options.connection || amqp.connect(options.url);
this.$promise = open.then(function (conn) {
conn.on('error', that.cleanup);
var ok = conn.createChannel();
const ok = conn.createChannel();
return ok.then(subscribe).then(function () {

@@ -103,0 +103,0 @@ return ok;

{
"name": "amqper",
"version": "0.4.0",
"version": "0.5.0",
"description": "A simple and elegant AMQP client for node based on amqplib.",

@@ -18,6 +18,6 @@ "homepage": "https://github.com/taoyuan/amqper",

"dependencies": {
"amqplib": "^0.5.1",
"bluebird": "^3.5.0",
"debug": "^2.6.6",
"depd": "^1.1.0",
"amqplib": "^0.5.2",
"bluebird": "^3.5.1",
"debug": "^3.1.0",
"depd": "^1.1.1",
"houkou": "^0.2.2",

@@ -28,6 +28,7 @@ "lodash": "^4.17.4",

"devDependencies": {
"chai": "^3.5.0",
"@types/chai": "^4.0.10",
"chai": "^4.1.2",
"gulp": "^3.9.1",
"gulp-exclude-gitignore": "^1.1.1",
"gulp-istanbul": "^1.1.1",
"gulp-exclude-gitignore": "^1.2.0",
"gulp-istanbul": "^1.1.2",
"gulp-jshint": "^2.0.4",

@@ -38,9 +39,9 @@ "gulp-mocha": "^4.3.1",

"jshint-stylish": "^2.2.1",
"mocha": "^3.3.0",
"uuid": "^3.0.1"
"mocha": "^4.0.1",
"uuid": "^3.1.0"
},
"scripts": {
"test": "gulp"
"test": "mocha test/**.test.js"
},
"license": "MIT"
}
'use strict';
var t = require('chai').assert;
var amqper = require('../');
const assert = require('chai').assert;
const amqper = require('..');

@@ -21,5 +21,5 @@ function randomQueue() {

it('should connect to rabbit server', function (done) {
var client = amqper.connect('amqp://guest:guest@localhost:5672');
const client = amqper.connect('amqp://guest:guest@localhost:5672');
client.$promise.then(function (conn) {
t.ok(conn);
assert.ok(conn);
client.close(done);

@@ -32,10 +32,10 @@ });

it('should publish and received in route', function (done) {
var data = {
const data = {
foo: 'bar1'
};
var client = amqper.connect('amqp://guest:guest@localhost:5672');
const client = amqper.connect('amqp://guest:guest@localhost:5672');
client.$promise.then(function () {
client.route('test1.:arg', {queue: randomQueue()}, function (message) {
t.deepEqual(message.payload, data);
assert.deepEqual(message.payload, data);
delayCloseClient(client, done);

@@ -51,11 +51,11 @@ }).then(function () {

it('should publish and received in route with msgpack format', function (done) {
var data = {
const data = {
hello: 'world'
};
var client = amqper.connect('amqp://guest:guest@localhost:5672');
const client = amqper.connect('amqp://guest:guest@localhost:5672');
client.$promise.then(function () {
client.format('msgpack');
client.route('test2.:arg', {queue: randomQueue()}, function (message) {
t.deepEqual(message.payload, data);
assert.deepEqual(message.payload, data);
delayCloseClient(client, done);

@@ -69,7 +69,7 @@ }).then(function () {

it('should handle the error in handler', function (done) {
var data = {
const data = {
hello: 'world'
};
var client = amqper.connect('amqp://guest:guest@localhost:5672');
const client = amqper.connect('amqp://guest:guest@localhost:5672');
client.$promise.then(function () {

@@ -83,3 +83,3 @@ client.route('test2.:arg', {queue: randomQueue()}, function () {

client.on('error', function (err) {
t.equal(err.message, 'boom');
assert.equal(err.message, 'boom');
delayCloseClient(client, done);

@@ -86,0 +86,0 @@ });

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc