Comparing version 1.0.9 to 1.1.0
487
index.js
'use strict' | ||
const amqp = require('amqp') | ||
const debug = require('debug')('remit') | ||
const master_debug = require('debug') | ||
const uuid = require('node-uuid').v4 | ||
const amqplib = require('amqplib/callback_api') | ||
const os = require('os') | ||
@@ -12,2 +12,7 @@ | ||
function Remit (opts) { | ||
@@ -21,2 +26,3 @@ if (!opts) opts = {} | ||
this._connection_callbacks = [] | ||
this._channel = opts.channel || null | ||
this._emit_on_error = opts.emit_on_error || null | ||
@@ -34,2 +40,4 @@ | ||
this._listener_count = 0 | ||
this._events = {} | ||
@@ -40,72 +48,113 @@ | ||
Remit.prototype.on_error = null | ||
Remit.prototype.res = function res (event, callback, context, options) { | ||
debug('res') | ||
const debug = master_debug('remit.res') | ||
if (this._events[event]) return false | ||
if (!options) options = {} | ||
let self = this | ||
debug(`Creating "${event}" endpoint`) | ||
this.__connect(function () { | ||
self._connection.queue(options.queueName || event, { | ||
exclusive: false, | ||
durable: true, | ||
autoDelete: false | ||
}, function (queue) { | ||
debug(queue.name) | ||
const self = this | ||
self._events[event] = { queue } | ||
self.__connect(() => { | ||
self.__assert_exchange(() => { | ||
const chosen_queue = options.queueName || event | ||
debug(self._events) | ||
self._channel.assertQueue(chosen_queue, { | ||
exclusive: false, | ||
durable: true, | ||
autoDelete: false | ||
}) | ||
queue.subscribe({ | ||
ack: true, | ||
prefetchCount: 0 | ||
}, function (message, headers, deliveryInfo, messageObject) { | ||
let event_info = { | ||
event: messageObject.routingKey, | ||
exchange: messageObject.exchange, | ||
contentType: messageObject.contentType, | ||
size: messageObject.size | ||
} | ||
self._channel.bindQueue(chosen_queue, 'remit', event, {}, (err, ok) => { | ||
if (err) throw new Error(err) | ||
debug(event_info) | ||
self._channel.consume(chosen_queue, (message) => { | ||
debug(`New message`) | ||
if (!messageObject.correlationId || !messageObject.replyTo) { | ||
let data | ||
try { | ||
messageObject.acknowledge() | ||
data = JSON.parse(message.content.toString()) | ||
} catch (e) { | ||
debug(`Failed to parse JSON data; NACKing`) | ||
callback(context, message, null, event_info) | ||
} catch (e) { | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw new Error(e) | ||
} | ||
return self._channel.nack(message, false, false) | ||
} | ||
} else { | ||
try { | ||
callback.call(context, message, function (err, data) { | ||
let options = {correlationId: messageObject.correlationId} | ||
self._exchange.publish(messageObject.replyTo, Array.prototype.slice.call(arguments), options) | ||
debug(`Parsed JSON data`) | ||
messageObject.acknowledge() | ||
}) | ||
} catch (e) { | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw new Error(e) | ||
if (!message.properties.correlationId || !message.properties.replyTo) { | ||
debug(`No callback found; acting as listener`) | ||
try { | ||
callback(data) | ||
self._channel.ack(message) | ||
} catch (e) { | ||
if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) { | ||
self._channel.nack(message, false, false) | ||
} else { | ||
message.properties.headers = increment_headers(message.properties.headers) | ||
self._channel.publish(message.fields.exchange, message.fields.routingKey, message.content, message.properties) | ||
self._channel.ack(message) | ||
} | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw new Error(e) | ||
} | ||
} | ||
} else { | ||
debug(`Callback found; aiming to respond`) | ||
try { | ||
callback.call(context, data, function (err, data) { | ||
debug(`Received callback from responder`) | ||
const options = {correlationId: message.properties.correlationId} | ||
debug(`Publishing message to ${message.properties.replyTo}`) | ||
self._channel.publish('remit', message.properties.replyTo, new Buffer(JSON.stringify(Array.prototype.slice.call(arguments))), options) | ||
self._channel.ack(message) | ||
}) | ||
} catch (e) { | ||
debug(`Callback errored`) | ||
if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) { | ||
debug(`Run out of patience. NACKing...`) | ||
self._channel.nack(message, false, false) | ||
} else { | ||
debug(`Giving the message another chance...`) | ||
message.properties.headers = increment_headers(message.properties.headers) | ||
self._channel.publish(message.fields.exchange, message.fields.routingKey, message.content, message.properties) | ||
self._channel.ack(message) | ||
} | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw new Error(e) | ||
} | ||
} | ||
} | ||
} | ||
}, { | ||
exclusive: false | ||
}) | ||
}) | ||
self.__assert_exchange(function () { | ||
queue.bind(self._exchange, event) | ||
}) | ||
}) | ||
@@ -115,81 +164,91 @@ }) | ||
Remit.prototype.req = function req (event, args, callback, options) { | ||
debug('req') | ||
const debug = master_debug('remit.req') | ||
if (!options) options = {} | ||
let self = this | ||
let correlation_id = uuid() | ||
const self = this | ||
const correlation_id = uuid() | ||
this.__connect(function () { | ||
if (!callback) { | ||
self.__assert_exchange(function () { | ||
debug(event) | ||
self.__connect(() => { | ||
self.__assert_exchange(() => { | ||
if (!callback) { | ||
debug(`No callback specified for req; publishing "${event}" message now`) | ||
self._exchange.publish(event, args || {}, {}) | ||
return self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), {}) | ||
} | ||
self._results_name = `${this._service_name}:callback:${os.hostname()}:${process.pid}` | ||
self._channel.assertQueue(self._results_name, { | ||
exclusive: false, | ||
durable: false, | ||
autoDelete: true | ||
}) | ||
} else { | ||
self.__assert_exchange(function () { | ||
debug('GOT EXCHANGE') | ||
self.__assert_result_queue(function () { | ||
debug('GOT RESULT QUEUE') | ||
self._channel.bindQueue(self._results_name, 'remit', self._results_name, {}, (err, ok) => { | ||
if (err) throw new Error(err) | ||
self._results_callback[correlation_id] = { | ||
callback, | ||
context: null, | ||
autoDeleteCallback: true | ||
} | ||
if (!self._results_queue) { | ||
self._results_queue = true | ||
options.mandatory = true | ||
options.replyTo = self._results_name | ||
options.correlationId = correlation_id | ||
self._channel.consume(self._results_name, function (message) { | ||
self.__on_result.apply(self, arguments) | ||
}) | ||
} | ||
debug('Publishing ' + event + ' with ::', args) | ||
self._results_callback[correlation_id] = { | ||
callback: callback, | ||
context: null, | ||
autoDeleteCallback: true | ||
} | ||
self._results_timeouts[correlation_id] = setTimeout(function () { | ||
if (self._results_callback[correlation_id]) { | ||
debug('Timing out') | ||
options.mandatory = true | ||
options.replyTo = self._results_name, | ||
options.correlationId = correlation_id | ||
delete self._results_callback[correlation_id] | ||
delete self._results_timeouts[correlation_id] | ||
self._results_timeouts[correlation_id] = setTimeout(function () { | ||
if (!self._results_callback[correlation_id]) return | ||
try { | ||
callback({ | ||
event, | ||
args, | ||
options, | ||
message: 'Timed out after no response for ' + (options.timeout || 5000) + 'ms' | ||
}) | ||
} catch (e) { | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw new Error(e) | ||
} | ||
} | ||
delete self._results_callback[correlation_id] | ||
delete self._results_timeouts[correlation_id] | ||
try { | ||
callback({ | ||
event: event, | ||
args: args, | ||
options: options, | ||
message: `Timed out after no response for ${options.timeout || 5000}ms` | ||
}) | ||
} catch (e) { | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw new Error(err) | ||
} | ||
}, options.timeout || 5000) | ||
} | ||
}, options.timeout || 5000) | ||
self._exchange.publish(event, args, options, function (err) { | ||
if (err) { | ||
delete self._results_callback[correlation_id] | ||
callback(err) | ||
} | ||
}) | ||
}) | ||
self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options) | ||
}) | ||
} | ||
}) | ||
}) | ||
debug('Returning ' + correlation_id) | ||
return correlation_id | ||
} | ||
Remit.prototype.emit = function emit (event, args, options) { | ||
debug('emit') | ||
const debug = master_debug('remit.emit') | ||
let self = this | ||
const self = this | ||
if (!options) options = {} | ||
@@ -200,104 +259,97 @@ | ||
let correlation_id = this.req.call(self, event, args, options.onResponse, options.context, options) | ||
debug('Emitting ', args) | ||
self.req.call(self, event, args, options.onResponse, options.context, options) | ||
} | ||
Remit.prototype.listen = function listen (event, callback, context, options) { | ||
if (!this._service_name) { | ||
console.log('MUST PROVIDE A SERVICE NAME IF LISTENING') | ||
const self = this | ||
process.exit(2) | ||
} | ||
if (!self._service_name) throw new Error('Must provide a service name if listening') | ||
if (!options) options = {} | ||
options.queueName = `emission:${this._service_name}:${event}` | ||
options.queueName = `${event}:emission:${self._service_name}:${++self._listener_count}` | ||
debug('NAME INC') | ||
debug(options.queueName) | ||
return this.res.call(this, event, callback, context, options) | ||
self.res.call(self, event, callback, context, options) | ||
} | ||
Remit.prototype.__connect = function __connect (callback) { | ||
debug('__connect') | ||
const debug = master_debug('remit.__connect') | ||
debug(`Checking connection`) | ||
const self = this | ||
if (!callback) callback = function () {} | ||
if (self._connection) { | ||
if (self._connection_callbacks.length) { | ||
self._connection_callbacks.push(callback) | ||
if (this._connection) { | ||
if (this._connection_callbacks.length) { | ||
this._connection_callbacks.push(callback) | ||
return true | ||
} | ||
return callback(this._connection) | ||
} else { | ||
this._connection = null | ||
debug(`Instantly hitting callback`) | ||
return callback(self.connection) | ||
} | ||
let self = this | ||
const first = !self._connection_callbacks.length | ||
self._connection_callbacks.push(callback) | ||
this._connection_callbacks.push(callback) | ||
if (!first) return | ||
this._connection = amqp.createConnection({ | ||
url: this._url | ||
}) | ||
debug(`Connection not yet made; attemtping now...`) | ||
this._connection.on('ready', function () { | ||
debug('Connected to AMQP') | ||
amqplib.connect(self._url, (err, con) => { | ||
if (err) throw new Error(err) | ||
let callbacks = self._connection_callbacks | ||
self._connection_callbacks = [] | ||
debug(`Connected to ${self._url}`) | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i](self._connection) | ||
} | ||
}) | ||
self._connection = con | ||
this._connection.on('error', function (err) { | ||
throw new Error(err) | ||
}) | ||
} | ||
self._connection.createChannel((err, channel) => { | ||
if (err) throw new Error(err) | ||
Remit.prototype.__assert_exchange = function __assert_exchange (callback) { | ||
debug('__assert_exchange') | ||
debug(`Channel created`) | ||
if (!callback) callback = function () {} | ||
self._channel = channel | ||
if (this._exchange) { | ||
if (this._exchange_callbacks.length) { | ||
this._exchange_callbacks.push(callback) | ||
const callbacks = self._connection_callbacks | ||
self._connection_callbacks = [] | ||
return true | ||
} | ||
debug(`Running ${callbacks.length} callbacks`) | ||
return callback(this._exchange) | ||
} | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i](self._connection) | ||
} | ||
}) | ||
}) | ||
} | ||
let self = this | ||
this._exchange_callbacks.push(callback) | ||
this._exchange = this._connection.exchange(this._exchange_name, { | ||
autoDelete: true | ||
}, function (exchange) { | ||
debug('Exchange ' + self._exchange_name + ' open') | ||
let callbacks = self._exchange_callbacks | ||
self._exchange_callbacks = [] | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i](self._exchange) | ||
} | ||
}) | ||
} | ||
Remit.prototype.__assert_result_queue = function __assert_result_queue (callback) { | ||
debug('__assert_result_queue') | ||
Remit.prototype.__assert_exchange = function __assert_exchange (callback) { | ||
const self = this | ||
const debug = master_debug('remit.__assert_exchange') | ||
if (!callback) callback = function () {} | ||
if (this._results_queue) { | ||
if (this._results_callbacks.length) { | ||
this._results_callbacks.push(callback) | ||
if (self._exchange) { | ||
if (self._exchange_callbacks.length) { | ||
self._exchange_callbacks.push(callback) | ||
@@ -307,59 +359,52 @@ return true | ||
return callback(this._results_queue) | ||
debug(`Instantly hitting exchange callback`) | ||
return callback() | ||
} | ||
let self = this | ||
self._exchange_callbacks.push(callback) | ||
this._results_name = this.__generate_queue_name('callback') | ||
this._results_callbacks.push(callback) | ||
self._channel.assertExchange('remit', 'topic', { | ||
autoDelete: true | ||
}, (err, ok) => { | ||
if (err) throw new Error(err) | ||
debug(this._results_name) | ||
self._exchange = true | ||
this.__assert_exchange(function () { | ||
self._results_queue = self._connection.queue(self._results_name, { | ||
exclusive: false, | ||
durable: false, | ||
autoDelete: true | ||
}, function (queue) { | ||
debug('Callback queue is open') | ||
const callbacks = self._exchange_callbacks | ||
self._exchange_callbacks = [] | ||
queue.subscribe(function () { | ||
self.__on_result.apply(self, arguments) | ||
}) | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i]() | ||
} | ||
}) | ||
} | ||
queue.bind(self._exchange, self._results_name) | ||
let callbacks = self._results_callbacks | ||
self._results_callbacks = [] | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i](queue) | ||
} | ||
}) | ||
}) | ||
} | ||
Remit.prototype.__on_result = function __on_result (msg, headers, info) { | ||
debug('__on_result') | ||
if (!this._results_callback[info.correlationId]) return | ||
let self = this | ||
Remit.prototype.__on_result = function __on_result (message) { | ||
const self = this | ||
const debug = master_debug('remit.__on_result') | ||
let callback = this._results_callback[info.correlationId] | ||
if (!self._results_callback[message.properties.correlationId]) return | ||
const callback = self._results_callback[message.properties.correlationId] | ||
let args = [] | ||
if (Array.isArray(msg)) { | ||
for (var i = 0; i < msg.length; i++) { | ||
args.push(msg[i]) | ||
} | ||
} else { | ||
args.push(msg) | ||
} | ||
debug(`Got result`, JSON.parse(message.content.toString())) | ||
delete this._results_timeouts[info.correlationId] | ||
let data = JSON.parse(message.content.toString()) | ||
if (!Array.isArray(data)) data = [data] | ||
delete self._results_timeouts[message.properties.correlationId] | ||
try { | ||
callback.callback.apply(callback.context, args) | ||
callback.callback.apply(callback.context, data) | ||
} catch (e) { | ||
self._channel.nack(message, false, false) | ||
delete self._results_callback[message.properties.correlationId] | ||
if (self.on_error) { | ||
@@ -372,13 +417,23 @@ self.on_error(e) | ||
// if (callback.autoDeleteCallback !== false) delete this._results_callback[info.correlationId] | ||
delete this._results_callback[info.correlationId] | ||
self._channel.ack(message) | ||
delete self._results_callback[message.properties.correlationId] | ||
} | ||
if (this._emit_on_error && args[0]) this.emit(this._emit_on_error, args[0]) | ||
debug(this._results_callback) | ||
debug(this._results_timeouts) | ||
} | ||
Remit.prototype.__generate_queue_name = function (type) { | ||
return `${this._service_name}:${os.hostname()}:pid${process.pid}:${type}` | ||
function increment_headers (headers) { | ||
if (!headers) return {attempts: 1} | ||
if (!headers.attempts) { | ||
headers.attempts = 1 | ||
return headers | ||
} | ||
headers.attempts = parseInt(headers.attempts) + 1 | ||
return headers | ||
} |
{ | ||
"name": "remit", | ||
"version": "1.0.9", | ||
"version": "1.1.0", | ||
"description": "`remit` is intended to be a small set of functionality used to create simple microservices that don't need to be aware of one-another's existence.", | ||
@@ -12,6 +12,6 @@ "main": "index.js", | ||
"dependencies": { | ||
"amqp": "^0.2.4", | ||
"amqplib": "^0.4.0", | ||
"debug": "^2.2.0", | ||
"node-uuid": "^1.4.3" | ||
"node-uuid": "^1.4.7" | ||
} | ||
} |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
18365
274
1
+ Addedamqplib@^0.4.0
+ Addedamqplib@0.4.2(transitive)
+ Addedbitsyntax@0.0.4(transitive)
+ Addedbuffer-more-ints@0.0.2(transitive)
+ Addedcore-util-is@1.0.3(transitive)
+ Addedinherits@2.0.4(transitive)
+ Addedisarray@0.0.1(transitive)
+ Addedreadable-stream@1.1.14(transitive)
+ Addedstring_decoder@0.10.31(transitive)
+ Addedwhen@3.6.4(transitive)
- Removedamqp@^0.2.4
- Removedamqp@0.2.7(transitive)
- Removedlodash@4.17.21(transitive)
Updatednode-uuid@^1.4.7