Comparing version 2.0.0-beta.2 to 2.0.0-beta.3
34
emit.js
const remit = require('./')() | ||
;(function loop () { | ||
remit.treq('jtd.test', {}, loop) | ||
// remit.request.timeout(() => {console.log('global timeout')}) | ||
// remit.request.sent(() => {console.log('global sent')}) | ||
// remit.request.data(() => {console.log('global data')}) | ||
remit | ||
.respond('one.two.three') | ||
.data((event, callback) => { | ||
return callback(null, {bar: 'baz'}) | ||
}) | ||
.ready(() => { | ||
console.log('ready') | ||
}) | ||
// remit.respond('foo') | ||
// remit.respond('bar') | ||
// remit.respond('baz') | ||
// remit.respond('qux') | ||
const foo = remit | ||
.request('one.two.three') | ||
// .data(() => { | ||
// console.log('data') | ||
// }) | ||
// .sent(() => { | ||
// console.log('sent') | ||
// }) | ||
// .timeout(() => { | ||
// console.log('timeout') | ||
// }) | ||
;(function bar () { | ||
foo.send({foo: 'bar'}).then(bar) | ||
})() |
@@ -0,1 +1,2 @@ | ||
const packageJson = require('./package.json') | ||
const EventEmitter = require('eventemitter3') | ||
@@ -6,2 +7,3 @@ const Request = require('./lib/Request') | ||
const connect = require('./lib/assertions/connection') | ||
const bootWorkChannelPool = require('./lib/assertions/bootWorkChannelPool') | ||
@@ -11,2 +13,3 @@ function Remit (options) { | ||
this.version = packageJson.version | ||
this._emitter = new EventEmitter() | ||
@@ -70,2 +73,3 @@ | ||
connect.apply(this, [this._options]) | ||
this._workChannelPool = bootWorkChannelPool.apply(this) | ||
@@ -72,0 +76,0 @@ return this |
@@ -23,3 +23,3 @@ const debug = require('debug')('remit:publishChannel') | ||
channel.prefetch(128) | ||
channel.prefetch(48) | ||
@@ -26,0 +26,0 @@ return resolve(channel) |
const debug = require('debug')('remit:callback-handler') | ||
const getConsumeChannel = require('./assertions/consumeChannel') | ||
const getWorkChannel = require('./assertions/workChannel') | ||
// const getConsumeChannel = require('./assertions/consumeChannel') | ||
// const getWorkChannel = require('./assertions/workChannel') | ||
const getPublishChannel = require('./assertions/publishChannel') | ||
module.exports = function getCallbackHandler (type, message) { | ||
module.exports = function getCallbackHandler (type, response, message, event) { | ||
const remit = this | ||
@@ -12,3 +12,4 @@ | ||
const responseData = new Buffer(JSON.stringify(Array.from(arguments).slice(0, 2))) | ||
event.finished = new Date() | ||
const responseData = JSON.stringify(Array.from(arguments).slice(0, 2)) | ||
const shouldAck = !!type.options.shouldAck | ||
@@ -18,3 +19,3 @@ const shouldReply = !!message.properties.replyTo | ||
if (shouldReply) { | ||
return reply.apply(remit, [message, responseData, shouldAck]) | ||
return reply.apply(remit, [message, responseData, shouldAck, event, type, response]) | ||
} | ||
@@ -25,10 +26,11 @@ | ||
if (shouldAck) { | ||
return ack.apply(remit, [message]) | ||
return ack.apply(remit, [message, event, type, undefined, response]) | ||
} | ||
debug('No need to acknowledge') | ||
type._emitter.emit('done', event) | ||
} | ||
} | ||
function ack (message) { | ||
function ack (message, event, type, responseData, response) { | ||
const remit = this | ||
@@ -39,16 +41,14 @@ | ||
return new Promise((resolve, reject) => { | ||
getConsumeChannel.apply(remit) | ||
.then((consumeChannel) => { | ||
try { | ||
consumeChannel.ack(message) | ||
return resolve() | ||
} catch (e) { | ||
return reject(e) | ||
} | ||
}) | ||
try { | ||
response._channel.ack(message) | ||
resolve() | ||
} catch (e) { | ||
reject(e) | ||
} finally { | ||
type._emitter.emit('done', event, responseData) | ||
} | ||
}) | ||
} | ||
function nack (message, requeue) { | ||
function nack (message, requeue, event, type, response) { | ||
const remit = this | ||
@@ -59,16 +59,14 @@ | ||
return new Promise((resolve, reject) => { | ||
getConsumeChannel.apply(remit) | ||
.then((consumeChannel) => { | ||
try { | ||
consumeChannel.nack(message, false, requeue) | ||
return resolve() | ||
} catch (e) { | ||
return reject(e) | ||
} | ||
}) | ||
try { | ||
response._channel.nack(message, false, requeue) | ||
resolve() | ||
} catch (e) { | ||
reject(e) | ||
} finally { | ||
type._emitter.emit('done', event) | ||
} | ||
}) | ||
} | ||
function reply (message, responseData, shouldAck) { | ||
function reply (message, responseData, shouldAck, event, type, response) { | ||
const remit = this | ||
@@ -78,21 +76,34 @@ | ||
getWorkChannel.apply(remit).then((workChannel) => { | ||
let workChannel | ||
remit._workChannelPool.acquire().then((channel) => { | ||
// getWorkChannel.apply(remit).then((workChannel) => { | ||
workChannel = channel | ||
return workChannel.checkQueue(message.properties.replyTo) | ||
}).then((ok) => { | ||
remit._workChannelPool.release(workChannel) | ||
return getPublishChannel.apply(remit) | ||
}).then((publishChannel) => { | ||
publishChannel.sendToQueue(message.properties.replyTo, responseData, message.properties) | ||
publishChannel.sendToQueue(message.properties.replyTo, new Buffer(responseData), message.properties) | ||
if (!shouldAck) { | ||
type._emitter.emit('done', event, responseData) | ||
return | ||
} | ||
return ack.apply(remit, [message]) | ||
return ack.apply(remit, [message, event, type, responseData, response]) | ||
}).catch((err) => { | ||
remit._workChannelPool.destroy(workChannel) | ||
if (err.message && err.message.substr(0, 16) === 'Operation failed') { | ||
if (!shouldAck) { | ||
type._emitter.emit('done', event) | ||
return | ||
} | ||
return nack.apply(remit, [message, false]) | ||
return nack.apply(remit, [message, false, event, type, response]) | ||
} | ||
@@ -102,4 +113,4 @@ | ||
return reply.apply(remit, [message, responseData, shouldAck]) | ||
return reply.apply(remit, [message, responseData, shouldAck, event, type, response]) | ||
}) | ||
} |
@@ -24,4 +24,2 @@ const getCallbackHandler = require('./handleCallback') | ||
response._emitter.once('done', getCallbackHandler.apply(remit, [type, message])) | ||
let event = { | ||
@@ -35,2 +33,6 @@ eventId: message.properties.messageId, | ||
if (message.properties.headers) { | ||
if (message.properties.headers.uuid) { | ||
event.eventId = message.properties.headers.uuid | ||
} | ||
if (message.properties.headers.scheduled) { | ||
@@ -53,2 +55,5 @@ event.scheduled = new Date(message.properties.headers.scheduled) | ||
event.started = new Date() | ||
response._emitter.once('done', getCallbackHandler.apply(remit, [type, response, message, event])) | ||
type._emitter.emit('data', event, callback) | ||
@@ -55,0 +60,0 @@ response._emitter.emit('data', event, callback) |
const debug = require('debug')('remit:request') | ||
const EventEmitter = require('eventemitter3') | ||
const getPublishChannel = require('./assertions/publishChannel') | ||
const getWorkChannel = require('./assertions/workChannel') | ||
// const getWorkChannel = require('./assertions/workChannel') | ||
const consumeReplies = require('./assertions/reply') | ||
@@ -44,2 +44,8 @@ const uuid = require('uuid') | ||
requestType.timeout = function onTimeout (callback) { | ||
requestType._emitter.on('timeout', callback) | ||
return requestType | ||
} | ||
return requestType | ||
@@ -58,2 +64,3 @@ } | ||
let expiration | ||
let timeout | ||
@@ -86,4 +93,8 @@ try { | ||
return new Promise((resolve, reject) => { | ||
getWorkChannel.apply(remit).then((workChannel) => { | ||
return workChannel.assertQueue(`demission-${messageId}`, { | ||
let workChannel | ||
remit._workChannelPool.acquire().then((channel) => { | ||
workChannel = channel | ||
return workChannel.assertQueue(`d:${remit._options.exchange}:${options.event}:${expiration}`, { | ||
messageTtl: expiration, | ||
@@ -99,5 +110,10 @@ exclusive: false, | ||
}).then((ok) => { | ||
remit._workChannelPool.release(workChannel) | ||
return getPublishChannel.apply(remit) | ||
}).then((publishChannel) => { | ||
return resolve(publishChannel) | ||
}).catch((err) => { | ||
remit._workChannelPool.destroy(workChannel) | ||
throw err | ||
}) | ||
@@ -125,2 +141,3 @@ }) | ||
let messageContent | ||
clearTimeout(timeout) | ||
@@ -136,2 +153,12 @@ try { | ||
}) | ||
timeout = setTimeout(() => { | ||
const timeoutOpts = { | ||
code: 'timeout', | ||
message: 'Request timed out after 5000ms' | ||
} | ||
type._emitter.emit('timeout', timeoutOpts) | ||
request._emitter.emit('timeout', timeoutOpts) | ||
}, 30000) | ||
} | ||
@@ -143,3 +170,3 @@ | ||
publishChannel.sendToQueue( | ||
`demission-${messageOptions.messageId}`, | ||
`d:${remit._options.exchange}:${options.event}:${expiration}`, | ||
new Buffer(JSON.stringify(data)), | ||
@@ -162,12 +189,14 @@ messageOptions | ||
return new Promise((resolve, reject) => { | ||
request._emitter.on('data', (err, result) => { | ||
if (err) { | ||
return reject(err) | ||
} | ||
const cleanUp = (err, result) => { | ||
request._emitter.removeListener('data', cleanUp) | ||
request._emitter.removeListener('timeout', cleanUp) | ||
request._emitter.removeListener('error', cleanUp) | ||
if (err) return reject(err) | ||
return resolve(result) | ||
}) | ||
} | ||
request._emitter.once('timeout', reject) | ||
request._emitter.once('error', reject) | ||
request._emitter.on('data', cleanUp) | ||
request._emitter.on('timeout', cleanUp) | ||
request._emitter.on('error', cleanUp) | ||
}) | ||
@@ -191,2 +220,8 @@ } | ||
request.timeout = function onTimeout (callback) { | ||
request._emitter.on('timeout', callback) | ||
return request | ||
} | ||
return request | ||
@@ -193,0 +228,0 @@ } |
const debug = require('debug')('remit:response') | ||
const async = require('async') | ||
const EventEmitter = require('eventemitter3') | ||
const getWorkChannel = require('./assertions/workChannel') | ||
const getConsumeChannel = require('./assertions/consumeChannel') | ||
// const getWorkChannel = require('./assertions/workChannel') | ||
// const getConsumeChannel = require('./assertions/consumeChannel') | ||
const handleMessage = require('./handleMessage') | ||
@@ -52,2 +53,8 @@ | ||
responseType.done = function onDone (callback) { | ||
responseType._emitter.on('done', callback) | ||
return responseType | ||
} | ||
return responseType | ||
@@ -76,15 +83,38 @@ } | ||
response.data = function onData (callback) { | ||
response._emitter.on('data', callback) | ||
response.data = function onData (callbacks) { | ||
callbacks = Array.isArray(callbacks) ? callbacks : [callbacks] | ||
const finalCallback = callbacks.pop() | ||
if (!callbacks.length) { | ||
response._emitter.on('data', finalCallback) | ||
} else { | ||
const run = async.seq(...callbacks) | ||
response._emitter.on('data', (event, callback) => { | ||
run(event, (err, event) => { | ||
if (err) { | ||
return callback(err) | ||
} | ||
finalCallback(event, callback) | ||
}) | ||
}) | ||
} | ||
return response | ||
} | ||
getWorkChannel.apply(remit).then((workChannel) => { | ||
let workChannel | ||
// getWorkChannel.apply(remit).then((workChannel) => { | ||
remit._workChannelPool.acquire().then((channel) => { | ||
debug('Asserting endpoint', options.event) | ||
workChannel = channel | ||
return workChannel.assertQueue(options.queue, { | ||
exclusive: false, | ||
durable: true, | ||
autoDelete: false | ||
autoDelete: false, | ||
maxPriority: 10 | ||
}) | ||
@@ -94,7 +124,14 @@ }).then((queueData) => { | ||
return getConsumeChannel.apply(remit) | ||
remit._workChannelPool.release(workChannel) | ||
return remit._connection | ||
}).then((connection) => { | ||
return connection.createChannel() | ||
}).then((consumeChannel) => { | ||
debug('Binding event') | ||
return consumeChannel.bindQueue( | ||
consumeChannel.prefetch(48) | ||
response._channel = consumeChannel | ||
return response._channel.bindQueue( | ||
options.queue, | ||
@@ -105,7 +142,5 @@ remit._options.exchange, | ||
}).then(() => { | ||
return getConsumeChannel.apply(remit) | ||
}).then((consumeChannel) => { | ||
debug('Consuming messages') | ||
return consumeChannel.consume(options.queue, (message) => { | ||
return response._channel.consume(options.queue, (message) => { | ||
if (!message) { | ||
@@ -121,6 +156,7 @@ return console.trace('Consumer cancelled') | ||
}).then(() => { | ||
type._emitter.emit('ready') | ||
response._emitter.emit('ready') | ||
type._emitter.emit('ready', options) | ||
response._emitter.emit('ready', options) | ||
}).catch((err) => { | ||
console.trace('Threw error here unexpectedly', err) | ||
remit._workChannelPool.destroy(workChannel) | ||
}) | ||
@@ -127,0 +163,0 @@ |
@@ -24,4 +24,15 @@ const url = require('url') | ||
parsedUrl.query = { | ||
frameMax: '0xf000', // 61,440 (~62KB) | ||
channelMax: '3', // We should never have more than this | ||
// Maximum permissible size of a frame (in bytes) | ||
// to negotiate with clients. Setting to 0 means | ||
// "unlimited" but will trigger a bug in some QPid | ||
// clients. Setting a larger value may improve | ||
// throughput; setting a smaller value may improve | ||
// latency. | ||
// I default it to 0x1000, i.e. 4kb, which is the | ||
// allowed minimum, will fit many purposes, and not | ||
// chug through Node.JS's buffer pooling. | ||
// | ||
// frameMax: '0x20000', // 131,072 (128kb) | ||
// | ||
frameMax: '0x1000', // 4,096 (4kb) | ||
heartbeat: '15' // Frequent hearbeat | ||
@@ -28,0 +39,0 @@ } |
{ | ||
"name": "remit", | ||
"version": "2.0.0-beta.2", | ||
"version": "2.0.0-beta.3", | ||
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.", | ||
@@ -18,6 +18,7 @@ "main": "index.js", | ||
"amqplib": "^0.5.1", | ||
"debug": "^2.4.1", | ||
"stack-trace": "0.0.9", | ||
"async": "^2.4.1", | ||
"debug": "^2.4.4", | ||
"eventemitter3": "^1.2.0", | ||
"generic-pool": "^3.1.7", | ||
"stack-trace": "0.0.9", | ||
"uuid": "^3.0.1" | ||
@@ -24,0 +25,0 @@ }, |
@@ -21,3 +21,5 @@ /* global describe, it, expect, sinon, before */ | ||
}) | ||
.ready(done) | ||
.ready((options) => { | ||
return done() | ||
}) | ||
}) | ||
@@ -24,0 +26,0 @@ |
@@ -16,3 +16,5 @@ /* global describe, it, expect, sinon, remit */ | ||
.endpoint('holistic.request.response') | ||
.ready(done) | ||
.ready((options) => { | ||
return done() | ||
}) | ||
@@ -19,0 +21,0 @@ expect(endpoint).to.be.an('object') |
137473
1532
7
+ Addedasync@^2.4.1
+ Addedgeneric-pool@^3.1.7
+ Addedasync@2.6.4(transitive)
+ Addedgeneric-pool@3.9.0(transitive)
+ Addedlodash@4.17.21(transitive)