Comparing version 2.0.0-beta.7 to 2.0.0-beta.8
@@ -14,2 +14,3 @@ const packageJson = require('./package.json') | ||
this._emitter = new EventEmitter() | ||
this._publishPools = {} | ||
@@ -16,0 +17,0 @@ this._options = { |
@@ -7,11 +7,5 @@ const debug = require('debug')('remit:publishChannel') | ||
if (remit._publishChannel) { | ||
debug('Providing existing publish channel') | ||
return remit._publishChannel | ||
} | ||
debug('Requesting new publish channel') | ||
remit._publishChannel = new Promise((resolve, reject) => { | ||
return new Promise((resolve, reject) => { | ||
connect.apply(remit).then((connection) => { | ||
@@ -24,3 +18,3 @@ debug('Creating new publish channel') | ||
channel.prefetch(48) | ||
channel.prefetch(49) | ||
@@ -32,6 +26,4 @@ return resolve(channel) | ||
}) | ||
return remit._publishChannel | ||
} | ||
module.exports = getPublishChannel |
const debug = require('debug')('remit:reply') | ||
const getPublishChannel = require('./publishChannel') | ||
function consumeReplies () { | ||
function consumeReplies (type, request) { | ||
const remit = this | ||
if (remit._engaged) { | ||
debug('Providing existing reply consumption') | ||
debug('Requesting reply consumption') | ||
return remit._engaged | ||
} | ||
let channel | ||
debug('Requesting reply consumption') | ||
remit._engaged = new Promise((resolve, reject) => { | ||
return new Promise((resolve, reject) => { | ||
getPublishChannel.apply(remit) | ||
@@ -20,3 +16,9 @@ .then((publishChannel) => { | ||
return publishChannel.consume('amq.rabbitmq.reply-to', function (message) { | ||
channel = publishChannel | ||
if (!type.options.expectReply) { | ||
return | ||
} | ||
return channel.consume('amq.rabbitmq.reply-to', function (message) { | ||
debug('Consumed a reply', message.properties.correlationId) | ||
@@ -29,6 +31,6 @@ | ||
}) | ||
}).then((ok) => { | ||
}).then(() => { | ||
debug('Consuming new replies') | ||
return resolve() | ||
return resolve(channel) | ||
}).catch((err) => { | ||
@@ -38,6 +40,4 @@ return reject(err) | ||
}) | ||
return remit._engaged | ||
} | ||
module.exports = consumeReplies |
const debug = require('debug')('remit:callback-handler') | ||
const getPublishChannel = require('./assertions/publishChannel') | ||
@@ -31,4 +30,2 @@ module.exports = function getCallbackHandler (type, response, message, event) { | ||
function ack (message, event, type, responseData, response) { | ||
const remit = this | ||
debug('Acking') | ||
@@ -38,3 +35,3 @@ | ||
try { | ||
response._channel.ack(message) | ||
response._consumeChannel.ack(message) | ||
resolve() | ||
@@ -50,4 +47,2 @@ } catch (e) { | ||
function nack (message, requeue, event, type, response) { | ||
const remit = this | ||
debug('Nacking') | ||
@@ -57,3 +52,3 @@ | ||
try { | ||
response._channel.nack(message, false, requeue) | ||
response._consumeChannel.nack(message, false, requeue) | ||
resolve() | ||
@@ -82,5 +77,7 @@ } catch (e) { | ||
return getPublishChannel.apply(remit) | ||
}).then((publishChannel) => { | ||
publishChannel.sendToQueue(message.properties.replyTo, new Buffer(responseData), message.properties) | ||
response._publishChannel.sendToQueue( | ||
message.properties.replyTo, | ||
Buffer.from(responseData), | ||
message.properties | ||
) | ||
@@ -87,0 +84,0 @@ if (!shouldAck) { |
const debug = require('debug')('remit:request') | ||
const EventEmitter = require('eventemitter3') | ||
const getPublishChannel = require('./assertions/publishChannel') | ||
const consumeReplies = require('./assertions/reply') | ||
const uuid = require('uuid') | ||
const parseEvent = require('./utils/parseEvent') | ||
const getPublishChannel = require('./assertions/reply') | ||
@@ -27,2 +26,5 @@ function RequestType (masterOptions) { | ||
requestType._emitter = new EventEmitter() | ||
requestType._sentCallbacks = [] | ||
requestType._dataCallbacks = [] | ||
requestType._timeoutCallbacks = [] | ||
@@ -40,3 +42,4 @@ requestType.options = { | ||
requestType.sent = function onSent (callback) { | ||
requestType._emitter.on('sent', callback) | ||
// requestType._emitter.on('sent', callback) | ||
requestType._sentCallbacks.push(callback) | ||
@@ -64,6 +67,11 @@ return requestType | ||
const messageId = uuid.v4() | ||
let trace | ||
let expiration | ||
let expirationGroup | ||
let timeout | ||
let event | ||
debug(`${messageId} - Request for ${options.event} created`) | ||
try { | ||
@@ -76,29 +84,29 @@ const re = /^\s{4}.+?(\/[^/].+?)\)/gm | ||
;(type.options.expectReply ? consumeReplies.apply(remit) : Promise.resolve()) | ||
.then(() => { | ||
if (isNaN(localOpts.delay) && !localOpts.schedule) { | ||
return getPublishChannel.apply(remit) | ||
} | ||
let demitQueue | ||
if ((!localOpts.delay || isNaN(localOpts.delay)) && (!localOpts.schedule || !(localOpts.schedule instanceof Date) || localOpts.schedule.toString() === 'Invalid Date')) { | ||
throw new Error('Invalid schedule date or delay duration when attempting to send a delayed emission') | ||
} | ||
if (!isNaN(localOpts.delay) || localOpts.schedule) { | ||
if ((!localOpts.delay || isNaN(localOpts.delay)) && (!localOpts.schedule || !(localOpts.schedule instanceof Date) || localOpts.schedule.toString() === 'Invalid Date')) { | ||
throw new Error('Invalid schedule date or delay duration when attempting to send a delayed emission') | ||
} | ||
if (localOpts.schedule) { | ||
expiration = +localOpts.schedule - now | ||
} else { | ||
expiration = localOpts.delay | ||
} | ||
if (localOpts.schedule) { | ||
expirationGroup = +localOpts.schedule | ||
expiration = +localOpts.schedule - now | ||
} else { | ||
expirationGroup = localOpts.delay | ||
expiration = localOpts.delay | ||
} | ||
if (expiration <= 0) { | ||
return getPublishChannel.apply(remit) | ||
} | ||
return new Promise((resolve, reject) => { | ||
if (expiration > 0) { | ||
demitQueue = new Promise((resolve, reject) => { | ||
let workChannel | ||
console.log('acquiring') | ||
remit._workChannelPool.acquire().then((channel) => { | ||
workChannel = channel | ||
return workChannel.assertQueue(`d:${remit._options.exchange}:${options.event}:${expiration}`, { | ||
console.log('got') | ||
return workChannel.assertQueue(`d:${remit._options.exchange}:${options.event}:${expirationGroup}`, { | ||
messageTtl: expiration, | ||
@@ -110,95 +118,116 @@ exclusive: false, | ||
deadLetterRoutingKey: options.event, | ||
expires: expiration + 5000 | ||
expires: expiration * 2 | ||
}) | ||
}).then((ok) => { | ||
}).then(() => { | ||
console.log('asserted') | ||
remit._workChannelPool.release(workChannel) | ||
return getPublishChannel.apply(remit) | ||
}).then((publishChannel) => { | ||
return resolve(publishChannel) | ||
}).catch((err) => { | ||
return remit._publishPools[options.event] | ||
}).then(resolve).catch((err) => { | ||
remit._workChannelPool.destroy(workChannel) | ||
throw err | ||
reject(err) | ||
}) | ||
}) | ||
}).then((publishChannel) => { | ||
debug('Sending message...') | ||
} else { | ||
demitQueue = remit._publishPools[options.event] | ||
} | ||
} else { | ||
demitQueue = remit._publishPools[options.event] | ||
} | ||
let messageOptions = { | ||
mandatory: true, | ||
messageId: messageId, | ||
appId: remit._options.name, | ||
timestamp: now, | ||
let messageOptions = { | ||
mandatory: true, | ||
messageId: messageId, | ||
appId: remit._options.name, | ||
timestamp: now, | ||
headers: { | ||
trace: trace | ||
} | ||
headers: { | ||
trace: trace | ||
} | ||
} | ||
if (type.options.expectReply) { | ||
messageOptions.correlationId = messageOptions.messageId | ||
messageOptions.replyTo = 'amq.rabbitmq.reply-to' | ||
remit._emitter.once(messageOptions.correlationId, (message) => { | ||
let messageContent | ||
clearTimeout(timeout) | ||
try { | ||
messageContent = JSON.parse(message.content.toString()) | ||
} catch (e) { | ||
console.trace('Error processing message') | ||
} | ||
if (type.options.expectReply) { | ||
messageOptions.correlationId = messageOptions.messageId | ||
messageOptions.replyTo = 'amq.rabbitmq.reply-to' | ||
type._emitter.emit(`data-${messageId}`, ...messageContent) | ||
request._emitter.emit(`data-${messageId}`, ...messageContent) | ||
}) | ||
remit._emitter.once(messageOptions.correlationId, (message) => { | ||
let messageContent | ||
clearTimeout(timeout) | ||
timeout = setTimeout(() => { | ||
const timeoutOpts = { | ||
code: 'timeout', | ||
message: 'Request timed out after 30000ms' | ||
} | ||
try { | ||
messageContent = JSON.parse(message.content.toString()) | ||
} catch (e) { | ||
console.trace('Error processing message') | ||
} | ||
type._emitter.emit(`timeout-${messageId}`, timeoutOpts) | ||
request._emitter.emit(`timeout-${messageId}`, timeoutOpts) | ||
}, 30000) | ||
} | ||
type._emitter.emit('data', ...messageContent) | ||
request._emitter.emit('data', ...messageContent) | ||
}) | ||
if (localOpts.priority) { | ||
if (localOpts.priority > 10 || localOpts.priority < 0) { | ||
throw new Error('Invalid priority', localOpts.priority, 'when pushing message') | ||
} | ||
timeout = setTimeout(() => { | ||
const timeoutOpts = { | ||
code: 'timeout', | ||
message: 'Request timed out after 5000ms' | ||
} | ||
messageOptions.priority = localOpts.priority | ||
} | ||
type._emitter.emit('timeout', timeoutOpts) | ||
request._emitter.emit('timeout', timeoutOpts) | ||
}, 30000) | ||
} | ||
if (localOpts.schedule || localOpts.delay) { | ||
if (localOpts.schedule) { | ||
messageOptions.headers.schedule = +localOpts.schedule | ||
} else { | ||
messageOptions.headers.delay = localOpts.delay | ||
} | ||
} | ||
if (localOpts.priority) { | ||
if (localOpts.priority > 10 || localOpts.priority < 0) { | ||
throw new Error('Invalid priority', localOpts.priority, 'when pushing message') | ||
} | ||
event = parseEvent(messageOptions, { | ||
routingKey: options.event | ||
}, data) | ||
messageOptions.priority = localOpts.priority | ||
} | ||
debug(`${messageId} - Getting publish channel and setting up queues`) | ||
if (localOpts.schedule || localOpts.delay) { | ||
if (localOpts.schedule) { | ||
messageOptions.headers.schedule = +localOpts.schedule | ||
} else { | ||
messageOptions.headers.delay = localOpts.delay | ||
} | ||
demitQueue.then((publishChannel) => { | ||
debug(`${messageId} - Pushing message to queue`) | ||
publishChannel.sendToQueue( | ||
`d:${remit._options.exchange}:${options.event}:${expiration}`, | ||
new Buffer(JSON.stringify(data)), | ||
messageOptions | ||
) | ||
if (localOpts.schedule || localOpts.delay) { | ||
if (localOpts.schedule) { | ||
messageOptions.headers.schedule = +localOpts.schedule | ||
} else { | ||
publishChannel.publish( | ||
remit._options.exchange, | ||
options.event, | ||
new Buffer(JSON.stringify(data)), | ||
messageOptions | ||
) | ||
messageOptions.headers.delay = localOpts.delay | ||
} | ||
const sentEvent = parseEvent(messageOptions, { | ||
routingKey: options.event | ||
}, data) | ||
console.log('sending') | ||
type._emitter.emit('sent', sentEvent) | ||
request._emitter.emit('sent', sentEvent) | ||
}) | ||
publishChannel.sendToQueue( | ||
`d:${remit._options.exchange}:${options.event}:${expirationGroup}`, | ||
Buffer.from(JSON.stringify(data)), | ||
messageOptions | ||
) | ||
console.log('sent to queue') | ||
} else { | ||
publishChannel.publish( | ||
remit._options.exchange, | ||
options.event, | ||
Buffer.from(JSON.stringify(data)), | ||
messageOptions | ||
) | ||
} | ||
type._emitter.emit(`sent-${event.eventId}`, event) | ||
request._emitter.emit(`sent-${event.eventId}`, event) | ||
}) | ||
return new Promise((resolve, reject) => { | ||
@@ -209,11 +238,11 @@ const cleanUp = (err, result) => { | ||
if (type.options.expectReply) { | ||
request._emitter.removeListener('data', cleanUp) | ||
request._emitter.removeListener('timeout', cleanUp) | ||
request._emitter.removeAllListeners(`data-${messageId}`) | ||
request._emitter.removeAllListeners(`timeout-${messageId}`) | ||
} else { | ||
request._emitter.removeListener('sent', cleanUp) | ||
request._emitter.removeAllListeners(`sent-${messageId}`) | ||
} | ||
request._emitter.removeListener('error', cleanUp) | ||
request._emitter.removeAllListeners(`error-${messageId}`) | ||
if (err) return reject(err) | ||
if (err) return reject((err && err.message) || err) | ||
return resolve(result) | ||
@@ -223,9 +252,28 @@ } | ||
if (type.options.expectReply) { | ||
request._emitter.on('data', cleanUp) | ||
request._emitter.on('timeout', cleanUp) | ||
request._emitter.once(`data-${messageId}`, cleanUp) | ||
type._dataCallbacks.forEach((callback) => { | ||
request._emitter.once(`data-${messageId}`, callback) | ||
}) | ||
request._dataCallbacks.forEach((callback) => { | ||
request._emitter.once(`data-${messageId}`, callback) | ||
}) | ||
request._emitter.once(`timeout-${messageId}`, cleanUp) | ||
type._timeoutCallbacks.forEach((callback) => { | ||
request._emitter.once(`timeout-${messageId}`, callback) | ||
}) | ||
request._timeoutCallbacks.forEach((callback) => { | ||
request._emitter.once(`timeout-${messageId}`, callback) | ||
}) | ||
} else { | ||
request._emitter.on('sent', a => cleanUp(null, a)) | ||
request._emitter.once(`sent-${messageId}`, a => cleanUp(null, a)) | ||
type._sentCallbacks.forEach((callback) => { | ||
request._emitter.once(`sent-${messageId}`, callback) | ||
}) | ||
request._sentCallbacks.forEach((callback) => { | ||
request._emitter.once(`sent-${messageId}`, callback) | ||
}) | ||
} | ||
request._emitter.on('error', cleanUp) | ||
request._emitter.on(`error-${messageId}`, cleanUp) | ||
}) | ||
@@ -236,2 +284,10 @@ } | ||
request._options = {} | ||
request._dataCallbacks = [] | ||
request._sentCallbacks = [] | ||
request._timeoutCallbacks = [] | ||
if (!remit._publishPools[options.event]) { | ||
remit._publishPools[options.event] = getPublishChannel.apply(remit, [type, request]) | ||
} | ||
request.send = request | ||
@@ -246,3 +302,3 @@ | ||
request.data = function onData (callback) { | ||
request._emitter.on('data', callback) | ||
request._dataCallbacks.push(callback) | ||
@@ -253,3 +309,3 @@ return request | ||
request.sent = function onSent (callback) { | ||
request._emitter.on('sent', callback) | ||
request._sentCallbacks.push(callback) | ||
@@ -260,3 +316,3 @@ return request | ||
request.timeout = function onTimeout (callback) { | ||
request._emitter.on('timeout', callback) | ||
request._timeoutCallbacks.push(callback) | ||
@@ -263,0 +319,0 @@ return request |
@@ -119,10 +119,15 @@ const debug = require('debug')('remit:response') | ||
}).then((connection) => { | ||
return connection.createChannel() | ||
}).then((consumeChannel) => { | ||
return Promise.all([ | ||
connection.createChannel(), | ||
connection.createChannel() | ||
]) | ||
}).then(([ consumeChannel, publishChannel ]) => { | ||
debug('Binding event') | ||
consumeChannel.prefetch(48) | ||
response._channel = consumeChannel | ||
publishChannel.prefetch(49) | ||
response._consumeChannel = consumeChannel | ||
response._publishChannel = publishChannel | ||
return response._channel.bindQueue( | ||
return response._consumeChannel.bindQueue( | ||
options.queue, | ||
@@ -135,3 +140,3 @@ remit._options.exchange, | ||
return response._channel.consume(options.queue, (message) => { | ||
return response._consumeChannel.consume(options.queue, (message) => { | ||
if (!message) { | ||
@@ -138,0 +143,0 @@ return console.trace('Consumer cancelled') |
{ | ||
"name": "remit", | ||
"version": "2.0.0-beta.7", | ||
"version": "2.0.0-beta.8", | ||
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -12,37 +12,27 @@ # _Remit_ | ||
``` js | ||
// Create our Remit connection | ||
const remit = require('remit')({ | ||
url: 'localhost', | ||
name: 'a.micro.service' | ||
}) | ||
const remit = require('remit')() | ||
// Set up an endpoint with the name 'micro.service.info' | ||
remit | ||
.endpoint('micro.service.info') | ||
;(async function () { | ||
const endpoint = remit | ||
.endpoint('user.profile') | ||
.data((event, callback) => { | ||
const user = getUser({ id: event.data.id }) | ||
// When the endpoint is hit, run this function | ||
.data((data, callback) => { | ||
console.log('Endpoint was hit!') | ||
callback(null, user) | ||
}) | ||
data.foo = 'bar' | ||
await endpoint.ready() | ||
// Reply with the mutated data | ||
return callback(null, data) | ||
try { | ||
var user = await remit | ||
.request('user.profile') | ||
.send({ id: 123 }) | ||
} catch (e) { | ||
console.error('Error getting user', e) | ||
} | ||
// Once the endpoint is ready... | ||
}).ready().then(() => { | ||
// Make a request to the 'micro.service.save' endpoint | ||
// with the data {name: 'a.micro.service'} | ||
return remit | ||
.request('micro.service.save') | ||
.send({name: 'a.micro.service'}) | ||
}).then((result) => { | ||
// When the reply comes back, log the response. | ||
console.log('Saved microservice info', result) | ||
}).catch((err) => { | ||
// If the request failed (the replying microservice returned | ||
// an error, the request timed out or couldn't be routed), | ||
// log the error. | ||
console.error('Couldn\'t seem to save microservice info', err) | ||
}) | ||
if (user) { | ||
console.log('Got user', user) | ||
} | ||
})() | ||
``` | ||
@@ -109,5 +99,3 @@ | ||
* [#request / #req](#remitrequestendpoint-options--request-) | ||
* [#persistentRequest / #preq](#templates) | ||
* [#emit](#templates) | ||
* [#delayedEmit / #demit](#templates) | ||
* [Response](#responsedata-) | ||
@@ -125,14 +113,6 @@ * [#respond / #res / #endpoint](#remitrespondendpoint-options--response-) | ||
* [`request`](#remitrequestendpoint-options--request-) | ||
* [`req`](#remitrequestendpoint-options--request-) | ||
* [`persistentRequest`](#templates) | ||
* [`preq`](#templates) | ||
* [`request` (`req`)](#remitrequestendpoint-options--request-) | ||
* [`emit`](#templates) | ||
* [`delayedEmit`](#templates) | ||
* [`demit`](#templates) | ||
* [`respond`](#remitrespondendpoint-options--response-) | ||
* [`res`](#remitrespondendpoint-options--response-) | ||
* [`endpoint`](#remitrespondendpoint-options--response-) | ||
* [`listen`](#templates-1) | ||
* [`on`](#templates-1) | ||
* [`respond` (`res`, `endpoint`)](#remitrespondendpoint-options--response-) | ||
* [`listen` (`on`)](#templates-1) | ||
@@ -199,25 +179,2 @@ ------ | ||
##### Templates | ||
Some basic options templates are also set up as separate functions to allow for some semantically-pleasing set-ups without having to skim through objects to figure out what's happening. | ||
``` js | ||
// remit.persistentRequest | ||
// remit.preq | ||
{ | ||
"some": "thing" | ||
} | ||
// remit.emit | ||
{ | ||
"some": "thing" | ||
} | ||
// remit.delayedEmit | ||
// remit.demit | ||
{ | ||
"some": "thing" | ||
} | ||
``` | ||
##### AMQ behaviour | ||
@@ -224,0 +181,0 @@ |
43662
985
237