Comparing version 1.1.7 to 1.2.0
499
index.js
'use strict' | ||
const master_debug = require('debug') | ||
const os = require('os') | ||
const uuid = require('node-uuid').v4 | ||
const amqplib = require('amqplib/callback_api') | ||
const os = require('os') | ||
const amqplib = require('amqplib') | ||
@@ -19,25 +18,24 @@ module.exports = function (opts) { | ||
if (!opts) opts = {} | ||
// Exposed items | ||
this._service_name = opts.name || '' | ||
this._url = opts.url || 'amqp://localhost' | ||
// Global items | ||
this._connection = opts.connection || null | ||
this._channel = opts.channel || null | ||
this._exchange = opts.exchange || null | ||
// Callback queues | ||
this._connection_callbacks = [] | ||
this._channel = opts.channel || null | ||
this._emit_on_error = opts.emit_on_error || null | ||
this._exchange = null | ||
this._exchange_name = 'remit' | ||
this._exchange_callbacks = [] | ||
this._results_queue = null | ||
this._results_name = null | ||
this._results_callback = {} | ||
// Callback trackers | ||
this._results_callbacks = {} | ||
this._results_timeouts = {} | ||
this._results_callbacks = [] | ||
// States | ||
this._consuming_results = false | ||
this._listener_count = 0 | ||
this._events = {} | ||
return this | ||
@@ -59,15 +57,14 @@ } | ||
Remit.prototype.res = function res (event, callback, context, options) { | ||
const debug = master_debug('remit.res') | ||
if (this._events[event]) return false | ||
if (!options) options = {} | ||
debug(`Creating "${event}" endpoint`) | ||
const self = this | ||
// Set up default options if we haven't been given any. | ||
if (!options) { | ||
options = {} | ||
} | ||
self.__connect(() => { | ||
self.__assert_exchange(() => { | ||
const chosen_queue = options.queueName || event | ||
// TODO Check this for a valid response | ||
self._channel.assertQueue(chosen_queue, { | ||
@@ -78,25 +75,14 @@ exclusive: false, | ||
}) | ||
self._channel.bindQueue(chosen_queue, 'remit', event, {}, (err, ok) => { | ||
if (err) throw new Error(err) | ||
self._channel.bindQueue(chosen_queue, 'remit', event).then(() => { | ||
self._channel.consume(chosen_queue, (message) => { | ||
debug(`New message`) | ||
if (!message.properties.timestamp) { | ||
debug('No timestamp; processing message now') | ||
self.__consume_res(message, callback, context) | ||
} else { | ||
debug('Timestamp found') | ||
const time_to_wait = parseInt(message.properties.timestamp) - new Date().getTime() | ||
const time_to_wait = parseInt(message.properties.timestamp - new Date().getTime()) | ||
if (time_to_wait < 0) { | ||
debug('Timestamp obsolete; processing message now') | ||
self.__consume_res(message, callback, context) | ||
} else { | ||
debug(`Processing message in ${time_to_wait}ms`) | ||
setTimeout(() => { | ||
debug(`Processing message after waiting for ${time_to_wait}ms...`) | ||
self.__consume_res(message, callback, context) | ||
@@ -109,3 +95,3 @@ }, time_to_wait) | ||
}) | ||
}) | ||
}).then(null, console.error) | ||
}) | ||
@@ -120,56 +106,32 @@ }) | ||
Remit.prototype.treq = function reqt (event, args, callback, options) { | ||
const debug = master_debug('remit.treq') | ||
const self = this | ||
if (!options) options = {} | ||
if (!options.expiration) options.expiration = 5000 | ||
if (!options.timeout) options.timeout = 5000 | ||
self.req(event, args, callback, options) | ||
} | ||
Remit.prototype.req = function req (event, args, callback, options) { | ||
const debug = master_debug('remit.req') | ||
if (!options) options = {} | ||
const self = this | ||
const correlation_id = uuid() | ||
if (!options) { | ||
options = {} | ||
} | ||
self.__connect(() => { | ||
self.__assert_exchange(() => { | ||
if (!callback) { | ||
debug(`No callback specified for req; publishing "${event}" message now`) | ||
debug(options) | ||
return self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options) | ||
} | ||
self._results_name = `${this._service_name}:callback:${os.hostname()}:${process.pid}` | ||
self._channel.assertQueue(self._results_name, { | ||
exclusive: false, | ||
durable: false, | ||
autoDelete: true | ||
}) | ||
self._channel.bindQueue(self._results_name, 'remit', self._results_name, {}, (err, ok) => { | ||
if (err) throw new Error(err) | ||
if (!self._results_queue) { | ||
self._results_queue = true | ||
self._channel.consume(self._results_name, function (message) { | ||
self.__on_result.apply(self, arguments) | ||
}) | ||
} | ||
self._results_callback[correlation_id] = { | ||
if (!self._consuming_results) { | ||
self._consuming_results = true | ||
self._channel.consume('amq.rabbitmq.reply-to', function (message) { | ||
self.__on_result.apply(self, arguments) | ||
}, { | ||
exclusive: true, | ||
noAck: true | ||
}).then(send_message).then(null, console.warn) | ||
} else { | ||
send_message() | ||
} | ||
function send_message () { | ||
const correlation_id = uuid() | ||
self._results_callbacks[correlation_id] = { | ||
callback: callback, | ||
@@ -179,13 +141,15 @@ context: null, | ||
} | ||
options.mandatory = true | ||
options.replyTo = self._results_name, | ||
options.replyTo = 'amq.rabbitmq.reply-to' | ||
options.correlationId = correlation_id | ||
self._results_timeouts[correlation_id] = setTimeout(function () { | ||
if (!self._results_callback[correlation_id]) return | ||
delete self._results_callback[correlation_id] | ||
if (!self._results_callbacks[correlation_id]) { | ||
return | ||
} | ||
delete self._results_callbacks[correlation_id] | ||
delete self._results_timeouts[correlation_id] | ||
try { | ||
@@ -206,9 +170,28 @@ callback({ | ||
}, options.timeout || 5000) | ||
self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options) | ||
}) | ||
} | ||
}) | ||
}) | ||
} | ||
return correlation_id | ||
Remit.prototype.listen = function listen (event, callback, context, options) { | ||
const self = this | ||
if (!self._service_name) { | ||
throw new Error('Must provide a service name if listening') | ||
} | ||
if (!options) { | ||
options = {} | ||
} | ||
options.queueName = `${event}:emission:${self._service_name}:${++self._listener_count}` | ||
self.res.call(self, event, callback, context, options) | ||
} | ||
@@ -222,12 +205,11 @@ | ||
Remit.prototype.emit = function emit (event, args, options) { | ||
const debug = master_debug('remit.emit') | ||
const self = this | ||
if (!options) options = {} | ||
if (!options) { | ||
options = {} | ||
} | ||
options.broadcast = true | ||
options.autoDeleteCallback = options.ttl ? false : true | ||
debug('Emitting ', args) | ||
self.req.call(self, event, args, options.onResponse, options) | ||
@@ -242,13 +224,15 @@ } | ||
Remit.prototype.demit = function demit (event, delay, args, options) { | ||
const debug = master_debug('remit.demit') | ||
const self = this | ||
if (!options) options = {} | ||
if (!options) { | ||
options = {} | ||
} | ||
options.broadcast = true | ||
if (Object.prototype.toString.call(delay) === '[object Date]') options.timestamp = delay.getTime() | ||
options.autoDeleteCallback = options.ttl ? false : true | ||
debug ('Demitting ', args) | ||
if (Object.prototype.toString.call(delay) === '[object Date]') { | ||
options.timestamp = delay.getTime() | ||
} | ||
self.req.call(self, event, args, options.onResponse, options) | ||
@@ -262,11 +246,18 @@ } | ||
Remit.prototype.listen = function listen (event, callback, context, options) { | ||
Remit.prototype.treq = function treq (event, args, callback, options) { | ||
const self = this | ||
if (!self._service_name) throw new Error('Must provide a service name if listening') | ||
if (!options) options = {} | ||
options.queueName = `${event}:emission:${self._service_name}:${++self._listener_count}` | ||
self.res.call(self, event, callback, context, options) | ||
if (!options) { | ||
options = {} | ||
} | ||
if (!options.expiration) { | ||
options.expiration = 5000 | ||
} | ||
if (!options.timeout) { | ||
options.timeout = 5000 | ||
} | ||
self.req(event, args, callback, options) | ||
} | ||
@@ -280,53 +271,65 @@ | ||
Remit.prototype.__connect = function __connect (callback) { | ||
const debug = master_debug('remit.__connect') | ||
debug(`Checking connection`) | ||
const self = this | ||
if (!callback) callback = function () {} | ||
// If no callback was given, we still pretend there | ||
// is one. We use this to signify queue presence. | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
// If a connection already exists | ||
if (self._connection) { | ||
// If there are still callbacks being processed, | ||
// hop into the queue; no need to trigger it now. | ||
// Be British and get in line! | ||
if (self._connection_callbacks.length) { | ||
self._connection_callbacks.push(callback) | ||
return true | ||
return | ||
} | ||
debug(`Instantly hitting callback`) | ||
return callback(self.connection) | ||
// Otherwise we do need to trigger now. We missed | ||
// the queue. #awkward | ||
return callback() | ||
} | ||
// If we're here, a connection doesn't currently exist. | ||
// Now we check whether we're the first call to do this. | ||
// If we are, we'll be the ones to try and connect. | ||
const first = !self._connection_callbacks.length | ||
// Push our callback in to the queue, eh? | ||
self._connection_callbacks.push(callback) | ||
if (!first) return | ||
debug(`Connection not yet made; attemtping now...`) | ||
amqplib.connect(self._url, (err, con) => { | ||
if (err) throw new Error(err) | ||
debug(`Connected to ${self._url}`) | ||
self._connection = con | ||
self._connection.createChannel((err, channel) => { | ||
if (err) throw new Error(err) | ||
debug(`Channel created`) | ||
if (!first) { | ||
return | ||
} | ||
// So let's connect! | ||
amqplib.connect(self._url).then((connection) => { | ||
// Everything's go fine, so we'll set this global | ||
// object to our new connection. | ||
self._connection = connection | ||
// We now need to make a channel to communicate | ||
// through. | ||
self._connection.createChannel().then((channel) => { | ||
// Everything went awesome. Let's set our new | ||
// global channel. | ||
self._channel = channel | ||
// Time to run the callbacks. Let's grab them and | ||
// take them out of the queue. | ||
// TODO Remove these one at a time as we process | ||
// them. | ||
const callbacks = self._connection_callbacks | ||
self._connection_callbacks = [] | ||
debug(`Running ${callbacks.length} callbacks`) | ||
// Loop through and make everything happen! | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i](self._connection) | ||
callbacks[i]() | ||
} | ||
}) | ||
}) | ||
}).then(null, console.error) | ||
}).then(null, console.error) | ||
} | ||
@@ -341,35 +344,57 @@ | ||
const self = this | ||
const debug = master_debug('remit.__assert_exchange') | ||
if (!callback) callback = function () {} | ||
// If no callback was given, we still pretend there | ||
// is one. We use this to signify queue presence. | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
// If the exchange already exists | ||
if (self._exchange) { | ||
// If there are still callbacks being processed, | ||
// hop into the queue; no need to trigger it now. | ||
// Be British and get in line! | ||
if (self._exchange_callbacks.length) { | ||
self._exchange_callbacks.push(callback) | ||
return true | ||
return | ||
} | ||
debug(`Instantly hitting exchange callback`) | ||
// Otherwise we do need to trigger now. We missed | ||
// the queue. #awkward | ||
return callback() | ||
} | ||
// If we're here, an exchange doesn't currently exist. | ||
// Now we check whether we're the first call to do this. | ||
// If we are, we'll be the ones to try and connect. | ||
const first = !self._exchange_callbacks.length | ||
// Push our callback in to the queue, eh? | ||
self._exchange_callbacks.push(callback) | ||
if (!first) { | ||
return | ||
} | ||
// Let's try making this exchange! | ||
self._channel.assertExchange('remit', 'topic', { | ||
autoDelete: true | ||
}, (err, ok) => { | ||
if (err) throw new Error(err) | ||
}).then(() => { | ||
// Everything went awesome so we'll let everything | ||
// know that the exchange is up. | ||
self._exchange = true | ||
// Time to run any callbacks that were waiting on | ||
// this exchange being made. | ||
// TODO Remove these one at a time as we process | ||
// them. | ||
const callbacks = self._exchange_callbacks | ||
self._exchange_callbacks = [] | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i]() | ||
} | ||
}) | ||
}).then(null, console.error) | ||
} | ||
@@ -382,63 +407,14 @@ | ||
Remit.prototype.__on_result = function __on_result (message) { | ||
const self = this | ||
const debug = master_debug('remit.__on_result') | ||
if (!self._results_callback[message.properties.correlationId]) { | ||
// If we've got it when we don't need it, we'll at least `nack` it first | ||
return self._channel.nack(message, false, false) | ||
} | ||
const callback = self._results_callback[message.properties.correlationId] | ||
let args = [] | ||
debug(`Got result`, JSON.parse(message.content.toString())) | ||
let data = JSON.parse(message.content.toString()) | ||
if (!Array.isArray(data)) data = [data] | ||
delete self._results_timeouts[message.properties.correlationId] | ||
try { | ||
callback.callback.apply(callback.context, data) | ||
} catch (e) { | ||
self._channel.nack(message, false, false) | ||
delete self._results_callback[message.properties.correlationId] | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw e | ||
} | ||
} | ||
self._channel.ack(message) | ||
delete self._results_callback[message.properties.correlationId] | ||
} | ||
Remit.prototype.__consume_res = function __consume_res (message, callback, context) { | ||
const debug = master_debug('remit.__consume_res') | ||
const self = this | ||
let data | ||
try { | ||
data = JSON.parse(message.content.toString()) | ||
} catch (e) { | ||
debug(`Failed to parse JSON data; NACKing`) | ||
return self._channel.nack(message, false, false) | ||
} | ||
debug(`Parsed JSON data`) | ||
if (!message.properties.correlationId || !message.properties.replyTo) { | ||
debug(`No callback found; acting as listener`) | ||
try { | ||
@@ -453,6 +429,7 @@ callback.call(context, data, function (err) { | ||
message.properties.headers = increment_headers(message.properties.headers) | ||
self._channel.publish(message.fields.exchange, message.fields.routingKey, message.content, message.properties) | ||
self._channel.publish('', message.properties.replyTo, message.content, message.properties) | ||
self._channel.ack(message) | ||
} | ||
if (self.on_error) { | ||
@@ -465,27 +442,15 @@ self.on_error(e) | ||
} else { | ||
debug(`Callback found; aiming to respond`) | ||
try { | ||
callback.call(context, data, function (err, data) { | ||
debug(`Received callback from responder`) | ||
const options = {correlationId: message.properties.correlationId} | ||
debug(`Publishing message to ${message.properties.replyTo}`) | ||
self._channel.publish('remit', message.properties.replyTo, new Buffer(JSON.stringify(Array.prototype.slice.call(arguments))), options) | ||
self._channel.publish('', message.properties.replyTo, new Buffer(JSON.stringify(Array.prototype.slice.call(arguments))), options) | ||
self._channel.ack(message) | ||
}) | ||
} catch (e) { | ||
debug(`Callback errored`) | ||
if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) { | ||
debug(`Run out of patience. NACKing...`) | ||
self._channel.nack(message, false, false) | ||
} else { | ||
debug(`Giving the message another chance...`) | ||
message.properties.headers = increment_headers(message.properties.headers) | ||
self._channel.publish(message.fields.exchange, message.fields.routingKey, message.content, message.properties) | ||
self._channel.publish('', message.properties.replyTo, message.content, message.properties) | ||
self._channel.ack(message) | ||
@@ -508,14 +473,48 @@ } | ||
Remit.prototype.__on_result = function __on_result (message) { | ||
const self = this | ||
const callback = self._results_callbacks[message.properties.correlationId] | ||
let data = JSON.parse(message.content.toString()) | ||
if (!Array.isArray(data)) data = [data] | ||
delete self._results_timeouts[message.properties.correlationId] | ||
try { | ||
callback.callback.apply(callback.context, data) | ||
} catch (e) { | ||
delete self._results_callbacks[message.properties.correlationId] | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw e | ||
} | ||
} | ||
delete self._results_callbacks[message.properties.correlationId] | ||
} | ||
function increment_headers (headers) { | ||
if (!headers) return {attempts: 1} | ||
if (!headers) { | ||
return { | ||
attempts: 1 | ||
} | ||
} | ||
if (!headers.attempts) { | ||
headers.attempts = 1 | ||
return headers | ||
} | ||
headers.attempts = parseInt(headers.attempts) + 1 | ||
return headers | ||
} |
{ | ||
"name": "remit", | ||
"version": "1.1.7", | ||
"version": "1.2.0", | ||
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -75,3 +75,3 @@ # What's Remit? | ||
To use `remit` you'll need: | ||
* A _RabbitMQ_ server | ||
* A _RabbitMQ_ server (_Remit_ `1.2.0+` requires `>=3.4.0`) | ||
* _Node v4.x.x_ | ||
@@ -417,3 +417,3 @@ * _npm_ | ||
* Connection retrying when losing connection to the AMQ | ||
* Use promises instead of callbacks | ||
* ~~Use promises instead of callbacks~~ | ||
* Warnings for duplicate `req` subscriptions | ||
@@ -420,0 +420,0 @@ * ~~Better handling of `req` timeouts~~ |
31056
346