Comparing version 1.4.0 to 1.4.1
406
index.js
@@ -27,3 +27,5 @@ 'use strict' | ||
this._connection = opts.connection || null | ||
this._channel = opts.channel || null | ||
this._consume_channel = null | ||
this._publish_channel = null | ||
this._work_channel = null | ||
this._exchange = opts.exchange || null | ||
@@ -34,2 +36,5 @@ | ||
this._exchange_callbacks = [] | ||
this._consume_channel_callbacks = [] | ||
this._publish_channel_callbacks = [] | ||
this._work_channel_callbacks = [] | ||
@@ -71,28 +76,32 @@ // Callback trackers | ||
// TODO Check this for a valid response | ||
self._channel.assertQueue(chosen_queue, { | ||
exclusive: false, | ||
durable: true, | ||
autoDelete: false | ||
self.__use_consume_channel(() => { | ||
// TODO Check this for a valid response | ||
self._consume_channel.assertQueue(chosen_queue, { | ||
exclusive: false, | ||
durable: true, | ||
autoDelete: false | ||
}) | ||
}) | ||
self._channel.bindQueue(chosen_queue, 'remit', event).then(() => { | ||
self._channel.consume(chosen_queue, (message) => { | ||
if (!message.properties.timestamp) { | ||
self.__consume_res(message, callbacks, context) | ||
} else { | ||
const time_to_wait = parseInt(message.properties.timestamp - new Date().getTime()) | ||
if (time_to_wait <= 0) { | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.bindQueue(chosen_queue, 'remit', event).then(() => { | ||
self._consume_channel.consume(chosen_queue, (message) => { | ||
if (!message.properties.timestamp) { | ||
self.__consume_res(message, callbacks, context) | ||
} else { | ||
setTimeout(() => { | ||
const time_to_wait = parseInt(message.properties.timestamp - new Date().getTime()) | ||
if (time_to_wait <= 0) { | ||
self.__consume_res(message, callbacks, context) | ||
}, time_to_wait) | ||
} else { | ||
setTimeout(() => { | ||
self.__consume_res(message, callbacks, context) | ||
}, time_to_wait) | ||
} | ||
} | ||
} | ||
}, { | ||
exclusive: false | ||
}) | ||
}).then(null, console.error) | ||
}, { | ||
exclusive: false | ||
}) | ||
}).then(null, console.error) | ||
}) | ||
}) | ||
@@ -127,3 +136,5 @@ }) | ||
if (!callback) { | ||
return self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options) | ||
return self.__use_publish_channel(() => { | ||
self._publish_channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options) | ||
}) | ||
} | ||
@@ -133,9 +144,11 @@ | ||
self._consuming_results = true | ||
self._channel.consume('amq.rabbitmq.reply-to', function (message) { | ||
self.__on_result.apply(self, arguments) | ||
}, { | ||
exclusive: true, | ||
noAck: true | ||
}).then(send_message).then(null, console.warn) | ||
self.__use_publish_channel(() => { | ||
self._publish_channel.consume('amq.rabbitmq.reply-to', function (message) { | ||
self.__on_result.apply(self, arguments) | ||
}, { | ||
exclusive: true, | ||
noAck: true | ||
}).then(send_message).then(null, console.warn) | ||
}) | ||
} else { | ||
@@ -182,3 +195,5 @@ send_message() | ||
self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options) | ||
self.__use_publish_channel(() => { | ||
self._publish_channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options) | ||
}) | ||
} | ||
@@ -338,24 +353,184 @@ }) | ||
self._connection = connection | ||
// Time to run the callbacks. Let's grab them and | ||
// take them out of the queue. | ||
// We now need to make a channel to communicate | ||
// through. | ||
// TODO Remove these one at a time as we process | ||
// them. | ||
const callbacks = self._connection_callbacks | ||
self._connection_callbacks = [] | ||
// Loop through and make everything happen! | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i]() | ||
} | ||
}).then(null, console.error) | ||
} | ||
Remit.prototype.__use_consume_channel = function __use_consume_channel (callback) { | ||
const self = this | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
if (self._consume_channel) { | ||
if (self._consume_channel_callbacks.length) { | ||
self._consume_channel_callbacks.push(callback) | ||
return | ||
} | ||
return callback() | ||
} | ||
const first = !self._consume_channel_callbacks.length | ||
self._consume_channel_callbacks.push(callback) | ||
if (!first) { | ||
return | ||
} | ||
self.__connect(() => { | ||
self._connection.createChannel().then((channel) => { | ||
// Everything went awesome. Let's set our new | ||
// global channel. | ||
self._channel = channel | ||
channel.on('error', (err) => { | ||
self._consume_channel = null | ||
self.__use_consume_channel() | ||
}) | ||
channel.on('close', () => { | ||
self._consume_channel = null | ||
self.__use_consume_channel() | ||
}) | ||
self._consume_channel = channel | ||
const callbacks = self._consume_channel_callbacks | ||
self._consume_channel_callbacks = [] | ||
const len = callbacks.length | ||
// Time to run the callbacks. Let's grab them and | ||
// take them out of the queue. | ||
for (var i = 0; i < len; i++) { | ||
callbacks[0]() | ||
callbacks.shift() | ||
} | ||
}) | ||
}) | ||
} | ||
Remit.prototype.__use_publish_channel = function __use_publish_channel (callback) { | ||
const self = this | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
if (self._publish_channel) { | ||
if (self._publish_channel_callbacks.length) { | ||
self._publish_channel_callbacks.push(callback) | ||
return | ||
} | ||
return callback() | ||
} | ||
const first = !self._publish_channel_callbacks.length | ||
self._publish_channel_callbacks.push(callback) | ||
if (!first) { | ||
return | ||
} | ||
self.__connect(() => { | ||
self._connection.createChannel().then((channel) => { | ||
channel.on('error', (err) => { | ||
self._publish_channel = null | ||
self.__use_publish_channel() | ||
}) | ||
channel.on('close', () => { | ||
self._publish_channel = null | ||
self.__use_publish_channel() | ||
}) | ||
self._publish_channel = channel | ||
const callbacks = self._publish_channel_callbacks | ||
self._publish_channel_callbacks = [] | ||
const len = callbacks.length | ||
// TODO Remove these one at a time as we process | ||
// them. | ||
const callbacks = self._connection_callbacks | ||
self._connection_callbacks = [] | ||
for (var i = 0; i < len; i++) { | ||
callbacks[0]() | ||
callbacks.shift() | ||
} | ||
}) | ||
}) | ||
} | ||
Remit.prototype.__use_work_channel = function __use_work_channel (callback) { | ||
const self = this | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
if (self._work_channel) { | ||
if (self._work_channel_callbacks.length) { | ||
self._work_channel_callbacks.push(callback) | ||
return | ||
} | ||
return callback() | ||
} | ||
const first = !self._work_channel_callbacks.length | ||
self._work_channel_callbacks.push(callback) | ||
if (!first) { | ||
return | ||
} | ||
self.__connect(() => { | ||
self._connection.createChannel().then((channel) => { | ||
channel.on('error', (err) => { | ||
self._work_channel = null | ||
self.__use_work_channel() | ||
}) | ||
channel.on('close', () => { | ||
self._work_channel = null | ||
self.__use_work_channel() | ||
}) | ||
self._work_channel = channel | ||
const callbacks = self._work_channel_callbacks | ||
self._work_channel_callbacks = [] | ||
const len = callbacks.length | ||
// Loop through and make everything happen! | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i]() | ||
for (var i = 0; i < len; i++) { | ||
callbacks[0]() | ||
callbacks.shift() | ||
} | ||
}).then(null, console.error) | ||
}).then(null, console.error) | ||
}) | ||
}) | ||
} | ||
@@ -406,21 +581,23 @@ | ||
// Let's try making this exchange! | ||
self._channel.assertExchange('remit', 'topic', { | ||
autoDelete: true | ||
}).then(() => { | ||
// Everything went awesome so we'll let everything | ||
// know that the exchange is up. | ||
self._exchange = true | ||
// Time to run any callbacks that were waiting on | ||
// this exchange being made. | ||
// TODO Remove these one at a time as we process | ||
// them. | ||
const callbacks = self._exchange_callbacks | ||
self._exchange_callbacks = [] | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i]() | ||
} | ||
}).then(null, console.error) | ||
self.__use_work_channel(() => { | ||
self._work_channel.assertExchange('remit', 'topic', { | ||
autoDelete: true | ||
}).then(() => { | ||
// Everything went awesome so we'll let everything | ||
// know that the exchange is up. | ||
self._exchange = true | ||
// Time to run any callbacks that were waiting on | ||
// this exchange being made. | ||
// TODO Remove these one at a time as we process | ||
// them. | ||
const callbacks = self._exchange_callbacks | ||
self._exchange_callbacks = [] | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i]() | ||
} | ||
}).then(null, console.error) | ||
}) | ||
} | ||
@@ -441,3 +618,5 @@ | ||
} catch (e) { | ||
return self._channel.nack(message, false, false) | ||
return self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} | ||
@@ -453,3 +632,5 @@ | ||
function done (err, data) { | ||
self._channel.ack(message) | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.ack(message) | ||
}) | ||
} | ||
@@ -461,8 +642,33 @@ | ||
if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) { | ||
self._channel.nack(message, false, false) | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} else { | ||
message.properties.headers = increment_headers(message.properties.headers) | ||
self._channel.publish('', message.properties.replyTo, message.content, message.properties) | ||
self._channel.ack(message) | ||
function check_and_republish() { | ||
self.__use_work_channel(() => { | ||
self._work_channel.checkQueue(message.properties.replyTo).then(() => { | ||
self.__use_publish_channel(() => { | ||
self._publish_channel.publish('', message.properties.replyTo, message.content, message.properties) | ||
}) | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.ack(message) | ||
}) | ||
}).then(null, (err) => { | ||
// If we got a proper queue error then the queue must | ||
// just not be around. | ||
if (err.message.substr(0, 16) === 'Operation failed') { | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} else { | ||
check_and_republish() | ||
} | ||
}) | ||
}) | ||
} | ||
check_and_republish() | ||
} | ||
@@ -479,5 +685,29 @@ | ||
const options = {correlationId: message.properties.correlationId} | ||
const res_data = new Buffer(JSON.stringify(Array.prototype.slice.call(arguments))) | ||
self._channel.publish('', message.properties.replyTo, new Buffer(JSON.stringify(Array.prototype.slice.call(arguments))), options) | ||
self._channel.ack(message) | ||
function check_and_publish () { | ||
self.__use_work_channel(() => { | ||
self._work_channel.checkQueue(message.properties.replyTo).then(() => { | ||
self.__use_publish_channel(() => { | ||
self._publish_channel.publish('', message.properties.replyTo, res_data, options) | ||
}) | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.ack(message) | ||
}) | ||
}).then(null, (err) => { | ||
// If we got a proper queue error then the queue must | ||
// just not be around. | ||
if (err.message.substr(0, 16) === 'Operation failed') { | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} else { | ||
check_and_publish() | ||
} | ||
}) | ||
}) | ||
} | ||
check_and_publish() | ||
} | ||
@@ -489,7 +719,33 @@ | ||
if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) { | ||
self._channel.nack(message, false, false) | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} else { | ||
message.properties.headers = increment_headers(message.properties.headers) | ||
self._channel.publish('', message.properties.replyTo, message.content, message.properties) | ||
self._channel.ack(message) | ||
function check_and_republish () { | ||
self.__use_work_channel(() => { | ||
self._work_channel.checkQueue(message.properties.replyTo).then(() => { | ||
self.__use_publish_channel(() => { | ||
self._publish_channel.publish('', message.properties.replyTo, message.content, message.properties) | ||
}) | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.ack(message) | ||
}) | ||
}).then(null, (err) => { | ||
// If we got a proper queue error then the queue must | ||
// just not be around. | ||
if (err.message.substr(0, 16) === 'Operation failed') { | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} else { | ||
check_and_republish() | ||
} | ||
}) | ||
}) | ||
} | ||
check_and_republish() | ||
} | ||
@@ -496,0 +752,0 @@ |
{ | ||
"name": "remit", | ||
"version": "1.4.0", | ||
"version": "1.4.1", | ||
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -33,5 +33,5 @@ 'use strict' | ||
throw new Error('Dummy') | ||
// throw new Error('Dummy') | ||
return done(null, 'here is the reply') | ||
}) |
@@ -8,2 +8,10 @@ 'use strict' | ||
// const log = require('remit-bunyan')('test-service', remit) | ||
// setTimeout(function () { | ||
// throw new Error('Balls') | ||
// }, 2000) | ||
console.log('Okay') | ||
remit.req('test.replies', { | ||
@@ -10,0 +18,0 @@ username: 'jack' |
42043
641