amqper
Advanced tools
Comparing version 0.4.0 to 0.5.0
"use strict"; | ||
var amqper = require('../../'); | ||
const amqper = require('../..'); | ||
var client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
const client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
@@ -7,0 +7,0 @@ client.route('test.a', {}, function (message) { |
"use strict"; | ||
var amqper = require('../../'); | ||
const PromiseA = require('bluebird'); | ||
const amqper = require('../..'); | ||
var client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
const client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
for (var i = 0; i < 10; i++) { | ||
client.publish('amq.topic', 'test.a', i); | ||
function publish(i) { | ||
client.publish('amq.topic', 'test.a', i).then(() => { | ||
console.log('published ' + i); | ||
}); | ||
} | ||
let i = 0; | ||
(async () => { | ||
while (true) { | ||
await PromiseA.delay(1000); | ||
publish(i++); | ||
} | ||
})(); |
@@ -7,5 +7,5 @@ "use strict"; | ||
var Connection = require('amqplib/lib/connection').Connection; | ||
const Connection = require('amqplib/lib/connection').Connection; | ||
var close = Connection.prototype.close; | ||
const close = Connection.prototype.close; | ||
@@ -12,0 +12,0 @@ Connection.prototype.close = function (cb) { |
"use strict"; | ||
var debug = require('debug')('amqper:client'); | ||
var deprecate = require('depd')('amqper'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var util = require('util'); | ||
var Promise = require('bluebird'); | ||
var codecs = require('./codecs'); | ||
const debug = require('debug')('amqper:client'); | ||
const deprecate = require('depd')('amqper'); | ||
const EventEmitter = require('events').EventEmitter; | ||
const PromiseA = require('bluebird'); | ||
const codecs = require('./codecs'); | ||
module.exports = Client; | ||
class Client extends EventEmitter { | ||
constructor(options) { | ||
super(); | ||
this.__amqper__ = true; | ||
// Do the tedious string-or-buffer conversion. If I was using byte | ||
// streams, this would be done automatically; however I'm using | ||
// streams in object mode. | ||
function bufferify(chunk, encoding) { | ||
return (typeof chunk === 'string') ? new Buffer(chunk, encoding || 'utf8') : chunk; | ||
} | ||
this.options = options = options || {}; | ||
function Client(options) { | ||
if (!(this instanceof Client)) { | ||
return new Client(options); | ||
this.name = options.name; | ||
this.label = options.label || (options.name ? '#' + options.name : ''); | ||
this.codec = codecs.byName(this.options.format || 'json'); | ||
this.conn = null; | ||
this.channel = null; | ||
this.connected = false; | ||
this.routers = []; | ||
this.$promise = null; | ||
} | ||
EventEmitter.call(this); | ||
this.__amqper__ = true; | ||
format(fmt) { | ||
this.options.format = fmt; | ||
this.codec = codecs.byName(fmt || 'json'); | ||
}; | ||
this.options = options = options || {}; | ||
ready(done) { | ||
return this.$promise.then(function () { | ||
if (done) done(); | ||
}, function (err) { | ||
if (done) return done(err); | ||
return err; | ||
}); | ||
}; | ||
this.name = options.name; | ||
this.label = options.label || (options.name ? '#' + options.name : ''); | ||
this.codec = codecs.byName(this.options.format || 'json'); | ||
/** | ||
* | ||
* @param {String} exchange | ||
* @param {String} routingKey | ||
* @param {Object|String|Number} content | ||
* @returns {*|PromiseA} | ||
*/ | ||
publish(exchange, routingKey, content) { | ||
const that = this; | ||
return this.$promise.then(function () { | ||
const channel = that.channel; | ||
const codec = that.codec; | ||
return PromiseA.try(function () { | ||
content = bufferify(codec.encode(content)); | ||
return channel.publish(exchange, routingKey, content); | ||
}); | ||
}); | ||
}; | ||
this.conn = null; | ||
this.channel = null; | ||
this.connected = false; | ||
this.routers = []; | ||
this.$promise = null; | ||
} | ||
/** | ||
* | ||
* @param {String} route | ||
* @param {Object|Function} [options] | ||
* @param {Function} [handler] function (err, message) | ||
* @returns {*|PromiseA} | ||
*/ | ||
route(route, options, handler) { | ||
if (typeof options === 'function') { | ||
handler = options; | ||
options = null; | ||
} | ||
util.inherits(Client, EventEmitter); | ||
if (handler && handler.length > 1) { | ||
deprecate('route handler signature changed from route(err, message) to route(message)'); | ||
} | ||
Client.prototype.format = function (fmt) { | ||
this.options.format = fmt; | ||
this.codec = codecs.byName(fmt || 'json'); | ||
}; | ||
function fn(message) { | ||
if (!handler) return; | ||
if (handler.length > 1) { | ||
handler(null, message); | ||
} else { | ||
handler(message); | ||
} | ||
} | ||
Client.prototype.ready = function (done) { | ||
return this.$promise.then(function () { | ||
if (done) done(); | ||
}, function (err) { | ||
if (done) return done(err); | ||
return err; | ||
}); | ||
}; | ||
/** | ||
* | ||
* @param {String} exchange | ||
* @param {String} routingKey | ||
* @param {Object|String|Number} content | ||
* @returns {*|Promise} | ||
*/ | ||
Client.prototype.publish = function (exchange, routingKey, content) { | ||
var that = this; | ||
return this.$promise.then(function () { | ||
var channel = that.channel; | ||
var codec = that.codec; | ||
return Promise.try(function () { | ||
content = bufferify(codec.encode(content)); | ||
return channel.publish(exchange, routingKey, content); | ||
const that = this; | ||
return this.$promise.then(function () { | ||
const codec = that.codec; | ||
const router = that.context.route(route, options, function (message) { | ||
PromiseA.try(function () { | ||
message.payload = codec.decode(message.content); | ||
return fn(message); | ||
}).then(function () { | ||
return message.ack(); | ||
}).catch(function (err) { | ||
debug('error', 'Error thrown in routing handler, not acking message. Error: ', err.stack); | ||
that.emit('error', err); | ||
}); | ||
}); | ||
that.routers.push(router); | ||
return router.$promise; | ||
}); | ||
}); | ||
}; | ||
}; | ||
/** | ||
* | ||
* @param {String} route | ||
* @param {Object|Function} [options] | ||
* @param {Function} [handler] function (err, message) | ||
* @returns {*|Promise} | ||
*/ | ||
Client.prototype.route = function (route, options, handler) { | ||
if (typeof options === 'function') { | ||
handler = options; | ||
options = null; | ||
/** | ||
* | ||
* @param {String} route | ||
* @param {Object|Function} [options] | ||
* @param {Function} [handler] function (err, message) | ||
* @returns {*|PromiseA} | ||
*/ | ||
subscribe(route, options, handler) { | ||
return this.route(...arguments); | ||
} | ||
if (handler && handler.length > 1) { | ||
deprecate('route handler signature changed from route(err, message) to route(message)'); | ||
} | ||
function fn(message) { | ||
if (!handler) return; | ||
if (handler.length > 1) { | ||
handler(null, message); | ||
} else { | ||
handler(message); | ||
close(cb) { | ||
if (this.closing || this.closed) { | ||
if (cb) cb(); | ||
return PromiseA.resolve(); | ||
} | ||
} | ||
var that = this; | ||
return this.$promise.then(function () { | ||
var codec = that.codec; | ||
var router = that.context.route(route, options, function (message) { | ||
Promise.try(function () { | ||
message.payload = codec.decode(message.content); | ||
return fn(message); | ||
}).then(function () { | ||
return message.ack(); | ||
}).catch(function (err) { | ||
debug('error', 'Error thrown in routing handler, not acking message. Error: ', err.stack); | ||
that.emit('error', err); | ||
}); | ||
this.closing = true; | ||
const that = this; | ||
return close_connection(this.conn).then(function () { | ||
return PromiseA.all(PromiseA.map(that.routers, function (router) { | ||
return router.connection.then(function (conn) { | ||
if (conn === that.conn) return; | ||
return close_connection(conn); | ||
}); | ||
})); | ||
}).then(function () { | ||
that.routers = []; | ||
that.closed = true; | ||
that.closing = false; | ||
if (cb) cb(); | ||
}); | ||
that.routers.push(router); | ||
return router.$promise; | ||
}); | ||
}; | ||
}; | ||
} | ||
Client.prototype.subscribe = Client.prototype.route; | ||
// Do the tedious string-or-buffer conversion. If I was using byte | ||
// streams, this would be done automatically; however I'm using | ||
// streams in object mode. | ||
function bufferify(chunk, encoding) { | ||
return (typeof chunk === 'string') ? new Buffer(chunk, encoding || 'utf8') : chunk; | ||
} | ||
Client.prototype.close = function (cb) { | ||
if (this.closing || this.closed) { | ||
if (cb) cb(); | ||
return Promise.resolve(); | ||
} | ||
this.closing = true; | ||
var that = this; | ||
return close_connection(this.conn).then(function () { | ||
return Promise.all(Promise.map(that.routers, function (router) { | ||
return router.connection.then(function (conn) { | ||
if (conn === that.conn) return; | ||
return close_connection(conn); | ||
}); | ||
})); | ||
}).then(function () { | ||
that.routers = []; | ||
that.closed = true; | ||
that.closing = false; | ||
if (cb) cb(); | ||
}); | ||
}; | ||
function close_connection(conn) { | ||
if (!conn || conn.cloing || conn.closed) return Promise.resolve(); | ||
return Promise.try(function () { | ||
return new Promise(function (resolve) { | ||
if (!conn || conn.cloing || conn.closed) return PromiseA.resolve(); | ||
return PromiseA.try(function () { | ||
return new PromiseA(function (resolve) { | ||
conn.once('close', function () { | ||
@@ -158,1 +160,3 @@ resolve(); | ||
} | ||
module.exports = Client; |
"use strict"; | ||
var msgpack = require('msgpack'); | ||
const msgpack = require('msgpack'); | ||
@@ -5,0 +5,0 @@ exports.define = function (name, encoding, fns) { |
"use strict"; | ||
var debug = require('debug')('amqper:connect'); | ||
var Promise = require('bluebird'); | ||
var amqp = require('./amqp'); | ||
var routify = require('./routify'); | ||
var Client = require('./client'); | ||
const debug = require('debug')('amqper:connect'); | ||
const PromiseA = require('bluebird'); | ||
const amqp = require('./amqp'); | ||
const routify = require('./routify'); | ||
const Client = require('./client'); | ||
@@ -15,6 +15,6 @@ module.exports = function (options) { | ||
var client = new Client(options); | ||
const client = new Client(options); | ||
client.$promise = Promise.try(function () { | ||
var url = options.url; | ||
client.$promise = PromiseA.try(function () { | ||
const url = options.url; | ||
@@ -27,6 +27,6 @@ if (!url) { | ||
//var closeErr = new Error('AMQP connection closed by remote host (AMQP server).'); | ||
var routerErr = new Error('Error in router acting on AMQP connection.'); | ||
//const closeErr = new Error('AMQP connection closed by remote host (AMQP server).'); | ||
const routerErr = new Error('Error in router acting on AMQP connection.'); | ||
var open = Promise.resolve(amqp.connect(url)); | ||
const open = PromiseA.resolve(amqp.connect(url)); | ||
@@ -37,3 +37,3 @@ client.context = routify.createContext({ | ||
var setup = open.then(function (conn) { | ||
const setup = open.then(function (conn) { | ||
debug('connected'); | ||
@@ -55,3 +55,3 @@ client.conn = conn; | ||
// now we're set up, get a channel for publishes | ||
return Promise.resolve(conn.createChannel()).then(function (channel) { | ||
return PromiseA.resolve(conn.createChannel()).then(function (channel) { | ||
client.channel = channel; | ||
@@ -58,0 +58,0 @@ debug('channel created'); |
"use strict"; | ||
var _ = require('lodash'); | ||
var Router = require('./router'); | ||
const _ = require('lodash'); | ||
const Router = require('./router'); | ||
module.exports = function Context(settings) { | ||
if (!(this instanceof Context)) return new Context(settings); | ||
class Context { | ||
constructor(settings) { | ||
this._settings = settings || {}; | ||
} | ||
settings = settings || {}; | ||
this.route = function (route, options, handler) { | ||
route(route, options, handler) { | ||
if ('function' !== typeof handler) throw new Error('handler function required'); | ||
options = _.defaults(options, settings); | ||
var router = new Router(route, options, handler); | ||
options = _.defaults(options, this._settings); | ||
const router = new Router(route, options, handler); | ||
router.init(); | ||
return router; | ||
}; | ||
}; | ||
} | ||
} | ||
module.exports = Context; |
"use strict"; | ||
exports.createContext = require('./context'); | ||
const Context = require('./context'); | ||
exports.createContext = settings => new Context(settings); |
'use strict'; | ||
var debug = require('debug')('amqper:parser'); | ||
var _ = require('lodash'); | ||
var Houkou = require('houkou'); | ||
const debug = require('debug')('amqper:parser'); | ||
const _ = require('lodash'); | ||
const Houkou = require('houkou'); | ||
@@ -21,8 +21,8 @@ module.exports = Parser; | ||
var params = route.match(/\:([a-zA-Z0-9]+)/g); | ||
var requirements = {}; | ||
const params = route.match(/\:([a-zA-Z0-9]+)/g); | ||
const requirements = {}; | ||
if (Array.isArray(params)) { | ||
params.forEach(function cleanParams(param) { | ||
var sparam = param.replace(/:/, ''); | ||
const sparam = param.replace(/:/, ''); | ||
requirements[sparam] = "[a-zA-Z0-9]+"; | ||
@@ -37,7 +37,7 @@ }); | ||
Parser.prototype.parse = function (data) { | ||
var routingKey = data.fields.routingKey; | ||
const routingKey = data.fields.routingKey; | ||
debug('pattern', this.route.pattern); | ||
debug('routingKey', routingKey); | ||
var result = {}; | ||
let result = {}; | ||
if (routingKey) { | ||
@@ -44,0 +44,0 @@ result = this.route.match(routingKey); |
'use strict'; | ||
var debug = require('debug')('amqper:router'); | ||
var crypto = require('crypto'); | ||
var amqp = require('../amqp'); | ||
var Promise = require('bluebird'); | ||
var _ = require('lodash'); | ||
var Parser = require('./parser'); | ||
const debug = require('debug')('amqper:router'); | ||
const crypto = require('crypto'); | ||
const amqp = require('../amqp'); | ||
const PromiseA = require('bluebird'); | ||
const _ = require('lodash'); | ||
const Parser = require('./parser'); | ||
module.exports = Router; | ||
var defaults = { | ||
const defaults = { | ||
url: 'amqp://guest:guest@localhost:5672', | ||
@@ -36,3 +36,3 @@ exchange: 'amq.topic', | ||
var that = this; | ||
const that = this; | ||
@@ -49,3 +49,3 @@ function consumerTag() { | ||
var message = data; | ||
const message = data; | ||
@@ -75,6 +75,6 @@ message.params = that.parser.parse(data); | ||
var routingKey = that.options.route.replace(/:[a-zA-Z0-9]+/g, '*'); | ||
const routingKey = that.options.route.replace(/:[a-zA-Z0-9]+/g, '*'); | ||
debug('routingKey', routingKey); | ||
return Promise.all([ | ||
return PromiseA.all([ | ||
ch.assertExchange(that.options.exchange, that.options.exchangeType), | ||
@@ -96,7 +96,7 @@ ch.assertQueue(that.options.queue, that.options.queueOpts), | ||
var open = this.connection = options.connection || amqp.connect(options.url); | ||
const open = this.connection = options.connection || amqp.connect(options.url); | ||
this.$promise = open.then(function (conn) { | ||
conn.on('error', that.cleanup); | ||
var ok = conn.createChannel(); | ||
const ok = conn.createChannel(); | ||
return ok.then(subscribe).then(function () { | ||
@@ -103,0 +103,0 @@ return ok; |
{ | ||
"name": "amqper", | ||
"version": "0.4.0", | ||
"version": "0.5.0", | ||
"description": "A simple and elegant AMQP client for node based on amqplib.", | ||
@@ -18,6 +18,6 @@ "homepage": "https://github.com/taoyuan/amqper", | ||
"dependencies": { | ||
"amqplib": "^0.5.1", | ||
"bluebird": "^3.5.0", | ||
"debug": "^2.6.6", | ||
"depd": "^1.1.0", | ||
"amqplib": "^0.5.2", | ||
"bluebird": "^3.5.1", | ||
"debug": "^3.1.0", | ||
"depd": "^1.1.1", | ||
"houkou": "^0.2.2", | ||
@@ -28,6 +28,7 @@ "lodash": "^4.17.4", | ||
"devDependencies": { | ||
"chai": "^3.5.0", | ||
"@types/chai": "^4.0.10", | ||
"chai": "^4.1.2", | ||
"gulp": "^3.9.1", | ||
"gulp-exclude-gitignore": "^1.1.1", | ||
"gulp-istanbul": "^1.1.1", | ||
"gulp-exclude-gitignore": "^1.2.0", | ||
"gulp-istanbul": "^1.1.2", | ||
"gulp-jshint": "^2.0.4", | ||
@@ -38,9 +39,9 @@ "gulp-mocha": "^4.3.1", | ||
"jshint-stylish": "^2.2.1", | ||
"mocha": "^3.3.0", | ||
"uuid": "^3.0.1" | ||
"mocha": "^4.0.1", | ||
"uuid": "^3.1.0" | ||
}, | ||
"scripts": { | ||
"test": "gulp" | ||
"test": "mocha test/**.test.js" | ||
}, | ||
"license": "MIT" | ||
} |
'use strict'; | ||
var t = require('chai').assert; | ||
var amqper = require('../'); | ||
const assert = require('chai').assert; | ||
const amqper = require('..'); | ||
@@ -21,5 +21,5 @@ function randomQueue() { | ||
it('should connect to rabbit server', function (done) { | ||
var client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
const client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
client.$promise.then(function (conn) { | ||
t.ok(conn); | ||
assert.ok(conn); | ||
client.close(done); | ||
@@ -32,10 +32,10 @@ }); | ||
it('should publish and received in route', function (done) { | ||
var data = { | ||
const data = { | ||
foo: 'bar1' | ||
}; | ||
var client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
const client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
client.$promise.then(function () { | ||
client.route('test1.:arg', {queue: randomQueue()}, function (message) { | ||
t.deepEqual(message.payload, data); | ||
assert.deepEqual(message.payload, data); | ||
delayCloseClient(client, done); | ||
@@ -51,11 +51,11 @@ }).then(function () { | ||
it('should publish and received in route with msgpack format', function (done) { | ||
var data = { | ||
const data = { | ||
hello: 'world' | ||
}; | ||
var client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
const client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
client.$promise.then(function () { | ||
client.format('msgpack'); | ||
client.route('test2.:arg', {queue: randomQueue()}, function (message) { | ||
t.deepEqual(message.payload, data); | ||
assert.deepEqual(message.payload, data); | ||
delayCloseClient(client, done); | ||
@@ -69,7 +69,7 @@ }).then(function () { | ||
it('should handle the error in handler', function (done) { | ||
var data = { | ||
const data = { | ||
hello: 'world' | ||
}; | ||
var client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
const client = amqper.connect('amqp://guest:guest@localhost:5672'); | ||
client.$promise.then(function () { | ||
@@ -83,3 +83,3 @@ client.route('test2.:arg', {queue: randomQueue()}, function () { | ||
client.on('error', function (err) { | ||
t.equal(err.message, 'boom'); | ||
assert.equal(err.message, 'boom'); | ||
delayCloseClient(client, done); | ||
@@ -86,0 +86,0 @@ }); |
84336
12
20
489
+ Addeddebug@3.2.7(transitive)
+ Addedms@2.1.3(transitive)
Updatedamqplib@^0.5.2
Updatedbluebird@^3.5.1
Updateddebug@^3.1.0
Updateddepd@^1.1.1