Comparing version 1.6.0 to 2.0.0-alpha.2
895
index.js
@@ -1,867 +0,68 @@ | ||
'use strict' | ||
const EventEmitter = require('eventemitter3') | ||
const Request = require('./lib/Request') | ||
const Response = require('./lib/Response') | ||
const aliases = require('./resources/aliases') | ||
const connect = require('./lib/assertions/connection') | ||
const os = require('os') | ||
const uuid = require('uuid').v4 | ||
const trace = require('stack-trace') | ||
const amqplib = require('amqplib/callback_api') | ||
function Remit (options) { | ||
options = options || {} | ||
module.exports = function (opts) { | ||
return new Remit(opts) | ||
} | ||
this._emitter = new EventEmitter() | ||
this._options = { | ||
exchange: options.exchange || 'remit', | ||
name: options.name || process.env.REMIT_NAME || '', | ||
url: options.url || process.env.REMIT_URL || 'amqp://localhost' | ||
} | ||
this._internal = { | ||
listenerCount: 0 | ||
} | ||
this.request = Request.apply(this, [{ | ||
expectReply: true | ||
}]) | ||
this.emit = Request.apply(this, [{ | ||
expectReply: false | ||
}]) | ||
this.delayedEmit = Request.apply(this, [{ | ||
expectReply: false | ||
}]) | ||
function Remit (opts) { | ||
if (!opts) opts = {} | ||
this.respond = Response.apply(this, [{ | ||
shouldAck: false, | ||
shouldReply: true | ||
}]) | ||
// Exposed items | ||
this._service_name = opts.name || '' | ||
this._url = opts.url || 'amqp://localhost' | ||
this._trace = opts.trace === false ? false : true | ||
this._exchange_name = opts.exchange || 'remit' | ||
this.respondQueue = Response.apply(this, [{ | ||
shouldAck: true, | ||
shouldReply: true | ||
}]) | ||
// Global items | ||
this._connection = opts.connection || null | ||
this._consume_channel = null | ||
this._publish_channel = null | ||
this._work_channel = null | ||
this._exchange = null | ||
this.listen = Response.apply(this, [{ | ||
shouldAck: true, | ||
shouldReply: false, | ||
// Callback queues | ||
this._connection_callbacks = [] | ||
this._exchange_callbacks = [] | ||
this._consume_channel_callbacks = [] | ||
this._publish_channel_callbacks = [] | ||
this._work_channel_callbacks = [] | ||
before: (remit, options) => { | ||
options.queue = options.queue || `${options.event}:l:${remit._options.name}:${++remit._internal.listenerCount}` | ||
// Callback trackers | ||
this._results_callbacks = {} | ||
this._results_timeouts = {} | ||
// States | ||
this._consuming_results = false | ||
this._listener_counts = {} | ||
return this | ||
} | ||
Remit.prototype.on_error = null | ||
Remit.prototype.version = require('./package.json').version | ||
Remit.prototype.res = function res (event, callbacks, context, options) { | ||
const self = this | ||
// Set up default options if we haven't been given any. | ||
if (!options) { | ||
options = {} | ||
return options | ||
} | ||
}]) | ||
self.__connect(() => { | ||
self.__assert_exchange(() => { | ||
const chosen_queue = options.queueName || event | ||
self.__use_consume_channel(() => { | ||
// TODO Check this for a valid response | ||
self._consume_channel.assertQueue(chosen_queue, { | ||
exclusive: false, | ||
durable: true, | ||
autoDelete: false | ||
}) | ||
}) | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.bindQueue(chosen_queue, self._exchange_name, event, {}, (err, ok) => { | ||
if (err) { | ||
console.error(err) | ||
} | ||
self._consume_channel.consume(chosen_queue, (message) => { | ||
if (!message.properties.timestamp) { | ||
self.__consume_res(message, callbacks, context) | ||
} else { | ||
const time_to_wait = parseInt(message.properties.timestamp - new Date().getTime()) | ||
if (time_to_wait <= 0) { | ||
self.__consume_res(message, callbacks, context) | ||
} else { | ||
setTimeout(() => { | ||
self.__consume_res(message, callbacks, context) | ||
}, time_to_wait) | ||
} | ||
} | ||
}, { | ||
exclusive: false | ||
}) | ||
}) | ||
}) | ||
}) | ||
Object.keys(aliases).forEach((key) => { | ||
aliases[key].forEach((alias) => { | ||
this[alias] = this[key] | ||
}) | ||
} | ||
}) | ||
connect.apply(this, [this._options]) | ||
Remit.prototype.req = function req (event, args, callback, options, caller) { | ||
const self = this | ||
if (!options) { | ||
options = {} | ||
} | ||
if (self._trace) { | ||
if (!caller) { | ||
caller = trace.get(Remit.prototype.req)[0].toString() | ||
} | ||
options.appId = self._service_name | ||
options.messageId = caller | ||
options.type = event | ||
} | ||
options.headers = options.headers || {} | ||
options.headers.uuid = uuid() | ||
self.__connect(() => { | ||
self.__assert_exchange(() => { | ||
if (!callback) { | ||
return self.__use_publish_channel(() => { | ||
self._publish_channel.publish(self._exchange_name, event, new Buffer(JSON.stringify(args || {})), options) | ||
}) | ||
} | ||
if (!self._consuming_results) { | ||
self._consuming_results = true | ||
self.__use_publish_channel(() => { | ||
self._publish_channel.consume('amq.rabbitmq.reply-to', function (message) { | ||
self.__on_result.apply(self, arguments) | ||
}, { | ||
exclusive: true, | ||
noAck: true | ||
}, (err, ok) => { | ||
if (err) { | ||
console.warn(err) | ||
} else { | ||
send_message() | ||
} | ||
}) | ||
}) | ||
} else { | ||
send_message() | ||
} | ||
function send_message () { | ||
const correlation_id = uuid() | ||
self._results_callbacks[correlation_id] = { | ||
callback: callback, | ||
context: null, | ||
autoDeleteCallback: true | ||
} | ||
options.mandatory = true | ||
options.replyTo = 'amq.rabbitmq.reply-to' | ||
options.correlationId = correlation_id | ||
self._results_timeouts[correlation_id] = setTimeout(function () { | ||
if (!self._results_callbacks[correlation_id]) { | ||
return | ||
} | ||
delete self._results_callbacks[correlation_id] | ||
delete self._results_timeouts[correlation_id] | ||
try { | ||
callback({ | ||
event: event, | ||
args: args, | ||
options: options, | ||
message: `Timed out after no response for ${options.timeout || 5000}ms` | ||
}) | ||
} catch (e) { | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw e | ||
} | ||
} | ||
}, options.timeout || 5000) | ||
self.__use_publish_channel(() => { | ||
self._publish_channel.publish(self._exchange_name, event, new Buffer(JSON.stringify(args || {})), options) | ||
}) | ||
} | ||
}) | ||
}) | ||
return this | ||
} | ||
Remit.prototype.listen = function listen (event, callback, context, options) { | ||
const self = this | ||
if (!self._service_name) { | ||
throw new Error('Must provide a service name if listening') | ||
} | ||
if (!options) { | ||
options = {} | ||
} | ||
self._listener_counts[event] = self._listener_counts[event] || 0 | ||
options.queueName = `${event}:emission:${self._service_name}:${++self._listener_counts[event]}` | ||
self.res.call(self, event, callback, context, options) | ||
module.exports = function (options) { | ||
return new Remit(options) | ||
} | ||
Remit.prototype.emit = function emit (event, args, options) { | ||
const self = this | ||
if (!options) { | ||
options = {} | ||
} | ||
options.broadcast = true | ||
options.autoDeleteCallback = options.ttl ? false : true | ||
let caller | ||
if (self._trace) { | ||
caller = trace.get(Remit.prototype.emit)[0].toString() | ||
} | ||
self.req.call(self, event, args, options.onResponse, options, caller) | ||
} | ||
Remit.prototype.demit = function demit (event, delay, args, options) { | ||
const self = this | ||
if (!options) { | ||
options = {} | ||
} | ||
options.broadcast = true | ||
options.autoDeleteCallback = options.ttl ? false : true | ||
if (Object.prototype.toString.call(delay) === '[object Date]') { | ||
options.timestamp = delay.getTime() | ||
} | ||
let caller | ||
if (self._trace) { | ||
caller = trace.get(Remit.prototype.demit)[0].toString() | ||
} | ||
self.req.call(self, event, args, options.onResponse, options, caller) | ||
} | ||
Remit.prototype.treq = function treq (event, args, callback, options) { | ||
const self = this | ||
if (!options) { | ||
options = {} | ||
} | ||
if (!options.expiration) { | ||
options.expiration = 5000 | ||
} | ||
if (!options.timeout) { | ||
options.timeout = 5000 | ||
} | ||
let caller | ||
if (self._trace) { | ||
caller = trace.get(Remit.prototype.treq)[0].toString() | ||
} | ||
self.req(event, args, callback, options, caller) | ||
} | ||
Remit.prototype.__connect = function __connect (callback) { | ||
const self = this | ||
// If no callback was given, we still pretend there | ||
// is one. We use this to signify queue presence. | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
// If a connection already exists | ||
if (self._connection) { | ||
// If there are still callbacks being processed, | ||
// hop into the queue; no need to trigger it now. | ||
// Be British and get in line! | ||
if (self._connection_callbacks.length) { | ||
self._connection_callbacks.push(callback) | ||
return | ||
} | ||
// Otherwise we do need to trigger now. We missed | ||
// the queue. #awkward | ||
return callback() | ||
} | ||
// If we're here, a connection doesn't currently exist. | ||
// Now we check whether we're the first call to do this. | ||
// If we are, we'll be the ones to try and connect. | ||
const first = !self._connection_callbacks.length | ||
// Push our callback in to the queue, eh? | ||
self._connection_callbacks.push(callback) | ||
if (!first) { | ||
return | ||
} | ||
let connection_options = {} | ||
if (self._service_name) { | ||
connection_options.clientProperties = { | ||
connection_name: self._service_name | ||
} | ||
} | ||
// So let's connect! | ||
amqplib.connect(self._url, connection_options, (err, connection) => { | ||
if (err) { | ||
throw err | ||
} | ||
// Everything's go fine, so we'll set this global | ||
// object to our new connection. | ||
self._connection = connection | ||
// Time to run the callbacks. Let's run them and | ||
// take them out of the queue. | ||
// Loop through and make everything happen! | ||
while (self._connection_callbacks.length > 0) { | ||
self._connection_callbacks[0]() | ||
self._connection_callbacks.shift() | ||
} | ||
}) | ||
} | ||
Remit.prototype.__use_consume_channel = function __use_consume_channel (callback) { | ||
const self = this | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
if (self._consume_channel) { | ||
if (self._consume_channel_callbacks.length) { | ||
self._consume_channel_callbacks.push(callback) | ||
return | ||
} | ||
return callback() | ||
} | ||
const first = !self._consume_channel_callbacks.length | ||
self._consume_channel_callbacks.push(callback) | ||
if (!first) { | ||
return | ||
} | ||
self.__connect(() => { | ||
self._connection.createChannel((err, channel) => { | ||
channel.on('error', (err) => { | ||
self._consume_channel = null | ||
self.__use_consume_channel() | ||
}) | ||
channel.on('close', () => { | ||
self._consume_channel = null | ||
self.__use_consume_channel() | ||
}) | ||
self._consume_channel = channel | ||
// Loop through and make everything happen! | ||
while (self._consume_channel_callbacks.length > 0) { | ||
self._consume_channel_callbacks[0]() | ||
self._consume_channel_callbacks.shift() | ||
} | ||
}) | ||
}) | ||
} | ||
Remit.prototype.__use_publish_channel = function __use_publish_channel (callback) { | ||
const self = this | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
if (self._publish_channel) { | ||
if (self._publish_channel_callbacks.length) { | ||
self._publish_channel_callbacks.push(callback) | ||
return | ||
} | ||
return callback() | ||
} | ||
const first = !self._publish_channel_callbacks.length | ||
self._publish_channel_callbacks.push(callback) | ||
if (!first) { | ||
return | ||
} | ||
self.__connect(() => { | ||
self._connection.createChannel((err, channel) => { | ||
channel.on('error', (err) => { | ||
self._publish_channel = null | ||
self.__use_publish_channel() | ||
}) | ||
channel.on('close', () => { | ||
self._publish_channel = null | ||
self.__use_publish_channel() | ||
}) | ||
self._publish_channel = channel | ||
// Loop through and make everything happen! | ||
while (self._publish_channel_callbacks.length > 0) { | ||
self._publish_channel_callbacks[0]() | ||
self._publish_channel_callbacks.shift() | ||
} | ||
}) | ||
}) | ||
} | ||
Remit.prototype.__use_work_channel = function __use_work_channel (callback) { | ||
const self = this | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
if (self._work_channel) { | ||
if (self._work_channel_callbacks.length) { | ||
self._work_channel_callbacks.push(callback) | ||
return | ||
} | ||
return callback() | ||
} | ||
const first = !self._work_channel_callbacks.length | ||
self._work_channel_callbacks.push(callback) | ||
if (!first) { | ||
return | ||
} | ||
self.__connect(() => { | ||
self._connection.createChannel((err, channel) => { | ||
channel.on('error', (err) => { | ||
self._work_channel = null | ||
self.__use_work_channel() | ||
}) | ||
channel.on('close', () => { | ||
self._work_channel = null | ||
self.__use_work_channel() | ||
}) | ||
self._work_channel = channel | ||
// Loop through and make everything happen! | ||
while (self._work_channel_callbacks.length > 0) { | ||
self._work_channel_callbacks[0]() | ||
self._work_channel_callbacks.shift() | ||
} | ||
}) | ||
}) | ||
} | ||
Remit.prototype.__assert_exchange = function __assert_exchange (callback) { | ||
const self = this | ||
// If no callback was given, we still pretend there | ||
// is one. We use this to signify queue presence. | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
// If the exchange already exists | ||
if (self._exchange) { | ||
// If there are still callbacks being processed, | ||
// hop into the queue; no need to trigger it now. | ||
// Be British and get in line! | ||
if (self._exchange_callbacks.length) { | ||
self._exchange_callbacks.push(callback) | ||
return | ||
} | ||
// Otherwise we do need to trigger now. We missed | ||
// the queue. #awkward | ||
return callback() | ||
} | ||
// If we're here, an exchange doesn't currently exist. | ||
// Now we check whether we're the first call to do this. | ||
// If we are, we'll be the ones to try and connect. | ||
const first = !self._exchange_callbacks.length | ||
// Push our callback in to the queue, eh? | ||
self._exchange_callbacks.push(callback) | ||
if (!first) { | ||
return | ||
} | ||
// Let's try making this exchange! | ||
self.__use_work_channel(() => { | ||
self._work_channel.assertExchange(self._exchange_name, 'topic', { | ||
autoDelete: true | ||
}, (err, ok) => { | ||
if (err) { | ||
console.error(err) | ||
} | ||
// Everything went awesome so we'll let everything | ||
// know that the exchange is up. | ||
self._exchange = true | ||
// Time to run any callbacks that were waiting on | ||
// this exchange being made. | ||
// Loop through and make everything happen! | ||
while (self._exchange_callbacks.length > 0) { | ||
self._exchange_callbacks[0]() | ||
self._exchange_callbacks.shift() | ||
} | ||
}) | ||
}) | ||
} | ||
Remit.prototype.__consume_res = function __consume_res (message, callbacks, context) { | ||
const self = this | ||
let data | ||
try { | ||
data = JSON.parse(message.content.toString()) | ||
} catch (e) { | ||
return self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} | ||
const extra = { | ||
service: message.properties.appId, | ||
event: message.properties.type, | ||
caller: message.properties.messageId, | ||
uuid: message.properties.headers.uuid | ||
} | ||
if (!message.properties.correlationId || !message.properties.replyTo) { | ||
function done (err, data) { | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.ack(message) | ||
}) | ||
} | ||
try { | ||
step_through_callbacks(callbacks, data, extra, done) | ||
} catch (e) { | ||
if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) { | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} else { | ||
message.properties.headers = increment_headers(message.properties.headers) | ||
function check_and_republish() { | ||
self.__use_work_channel(() => { | ||
self._work_channel.checkQueue(message.properties.replyTo, (err, ok) => { | ||
if (err) { | ||
// If we got a proper queue error then the queue must | ||
// just not be around. | ||
if (err.message.substr(0, 16) === 'Operation failed') { | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} else { | ||
check_and_republish() | ||
} | ||
} else { | ||
self.__use_publish_channel(() => { | ||
self._publish_channel.publish('', message.properties.replyTo, message.content, message.properties) | ||
}) | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.ack(message) | ||
}) | ||
} | ||
}) | ||
}) | ||
} | ||
check_and_republish() | ||
} | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw e | ||
} | ||
} | ||
} else { | ||
function done (err, data) { | ||
const options = {correlationId: message.properties.correlationId} | ||
const res_data = new Buffer(JSON.stringify(Array.prototype.slice.call(arguments))) | ||
function check_and_publish () { | ||
self.__use_work_channel(() => { | ||
self._work_channel.checkQueue(message.properties.replyTo, (err, ok) => { | ||
if (err) { | ||
// If we got a proper queue error then the queue must | ||
// just not be around. | ||
if (err.message.substr(0, 16) === 'Operation failed') { | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} else { | ||
check_and_publish() | ||
} | ||
} else { | ||
self.__use_publish_channel(() => { | ||
self._publish_channel.publish('', message.properties.replyTo, res_data, options) | ||
}) | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.ack(message) | ||
}) | ||
} | ||
}) | ||
}) | ||
} | ||
check_and_publish() | ||
} | ||
try { | ||
step_through_callbacks(callbacks, data, extra, done) | ||
} catch (e) { | ||
if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) { | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} else { | ||
message.properties.headers = increment_headers(message.properties.headers) | ||
function check_and_republish () { | ||
self.__use_work_channel(() => { | ||
self._work_channel.checkQueue(message.properties.replyTo, (err, ok) => { | ||
if (err) { | ||
// If we got a proper queue error then the queue must | ||
// just not be around. | ||
if (err.message.substr(0, 16) === 'Operation failed') { | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.nack(message, false, false) | ||
}) | ||
} else { | ||
check_and_republish() | ||
} | ||
} else { | ||
self.__use_publish_channel(() => { | ||
self._publish_channel.publish('', message.properties.replyTo, message.content, message.properties) | ||
}) | ||
self.__use_consume_channel(() => { | ||
self._consume_channel.ack(message) | ||
}) | ||
} | ||
}) | ||
}) | ||
} | ||
check_and_republish() | ||
} | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw e | ||
} | ||
} | ||
} | ||
} | ||
Remit.prototype.__on_result = function __on_result (message) { | ||
const self = this | ||
const callback = self._results_callbacks[message.properties.correlationId] | ||
let data = JSON.parse(message.content.toString()) | ||
if (!Array.isArray(data)) data = [data] | ||
delete self._results_timeouts[message.properties.correlationId] | ||
// If it turns out we don't have a callback here (this can | ||
// happen if the timeout manages to get there first) then | ||
// let's exit before we get ourselves into trouble. | ||
if (!callback) { | ||
return | ||
} | ||
try { | ||
callback.callback.apply(callback.context, data) | ||
} catch (e) { | ||
delete self._results_callbacks[message.properties.correlationId] | ||
if (self.on_error) { | ||
self.on_error(e) | ||
} else { | ||
throw e | ||
} | ||
} | ||
delete self._results_callbacks[message.properties.correlationId] | ||
} | ||
function increment_headers (headers) { | ||
if (!headers) { | ||
return { | ||
attempts: 1 | ||
} | ||
} | ||
if (!headers.attempts) { | ||
headers.attempts = 1 | ||
return headers | ||
} | ||
headers.attempts = parseInt(headers.attempts) + 1 | ||
return headers | ||
} | ||
function step_through_callbacks (callbacks, args, extra, done, index) { | ||
args = args !== undefined ? args : {} | ||
extra = extra || {} | ||
if (!index) { | ||
index = 0 | ||
if (!Array.isArray(callbacks)) { | ||
return callbacks(args, done, extra) | ||
} | ||
if (callbacks.length === 1) { | ||
return callbacks[index](args, done, extra) | ||
} | ||
return callbacks[index](args, (err, args) => { | ||
if (err) { | ||
return done(err, args) | ||
} | ||
return step_through_callbacks(callbacks, args, extra, done, ++index) | ||
}, extra) | ||
} | ||
if (!callbacks[index]) { | ||
return done(null, args) | ||
} | ||
return callbacks[index](args, (err, args) => { | ||
if (err) { | ||
return done(err, args) | ||
} | ||
return step_through_callbacks(callbacks, args, extra, done, ++index) | ||
}, extra) | ||
} |
{ | ||
"name": "remit", | ||
"version": "1.6.0", | ||
"version": "2.0.0-alpha.2", | ||
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.", | ||
"main": "index.js", | ||
"engines": { | ||
"node": ">=6" | ||
}, | ||
"scripts": { | ||
"test": "./node_modules/.bin/mocha ./test/index.js -w --debug", | ||
"coverage": "./node_modules/.bin/istanbul cover ./node_modules/.bin/_mocha -- ./test/index.js --timeout=60000" | ||
"test": "./node_modules/.bin/mocha --require test/bootstrap test/*.test.js test/**/*.test.js", | ||
"coverage": "./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha -- -R spec --require test/bootstrap test/*.test.js test/**/*.test.js", | ||
"travis": "./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec --require test/bootstrap test/*.test.js test/**/*.test.js && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage" | ||
}, | ||
"author": "Jack Williams <jack@tagstr.co>", | ||
"license": "ISC", | ||
"author": "Jack Williams <jack@wildfire.gg>", | ||
"license": "MIT", | ||
"dependencies": { | ||
"amqplib": "^0.4.2", | ||
"amqplib": "^0.5.1", | ||
"debug": "^2.4.1", | ||
"stack-trace": "0.0.9", | ||
"debug": "^2.4.4", | ||
"eventemitter3": "^1.2.0", | ||
"uuid": "^3.0.1" | ||
@@ -20,10 +26,11 @@ }, | ||
"chai": "^3.5.0", | ||
"chalk": "^1.1.3", | ||
"coveralls": "^2.11.15", | ||
"istanbul": "^0.4.5", | ||
"mocha": "^2.5.3", | ||
"prompt": "^1.0.0" | ||
"mocha": "^3.2.0", | ||
"sinon": "^1.17.6", | ||
"sinon-chai": "^2.8.0" | ||
}, | ||
"repository": { | ||
"type": "git", | ||
"url": "git+ssh://git@github.com/jpwilliams/remit.git" | ||
"url": "https://github.com/jpwilliams/remit.git" | ||
}, | ||
@@ -42,3 +49,6 @@ "keywords": [ | ||
"emit", | ||
"listen" | ||
"listen", | ||
"distributed", | ||
"events", | ||
"messaging" | ||
], | ||
@@ -45,0 +55,0 @@ "bugs": { |
517
README.md
@@ -1,424 +0,279 @@ | ||
![Dependencies](https://david-dm.org/jpwilliams/remit.svg) | ||
![Downloads](https://img.shields.io/npm/dm/remit.svg) | ||
# _Remit_ | ||
# What's Remit? | ||
A small set of functionality used to create microservices that don't need to be aware of one-another's existence. It uses AMQP at its core to manage service discovery-like behaviour without the need to explicitly connect one service to another. | ||
[![Build Status](https://travis-ci.org/jpwilliams/remit.svg?branch=overhaul)](https://travis-ci.org/jpwilliams/remit) [![Coverage Status](https://coveralls.io/repos/github/jpwilliams/remit/badge.svg?branch=overhaul)](https://coveralls.io/github/jpwilliams/remit?branch=overhaul) [![npm downloads per month](https://img.shields.io/npm/dm/remit.svg)](https://www.npmjs.com/package/remit) [![npm version](https://img.shields.io/npm/v/remit.svg)](https://www.npmjs.com/package/remit) | ||
# Contents | ||
A small set of functionality used to create microservices that don't need to be aware of one-another's existence. | ||
* [Simple usage](#simple-usage) | ||
* [Pre-requisites](#pre-requisites) | ||
* [Installation](#installation) | ||
* [Key examples](#key-examples) | ||
* [API reference](#api-reference) | ||
* [Improvements](#improvements) | ||
``` sh | ||
npm install remit | ||
``` | ||
# Simple usage | ||
`remit` makes use of four simple commands: `req` (request), `res` (respond), `emit` and `listen`. | ||
* `req` requests data from a defined endpoint which, in turn, is created using `res` | ||
* `listen` waits for messages `emit`ted from anywhere in the system. | ||
A connection to your AMQP server's required before you can get going, but you can easily do that! | ||
```javascript | ||
``` js | ||
// Create our Remit connection | ||
const remit = require('remit')({ | ||
name: 'my_service', // this is required for a service that has a listener | ||
url: 'amqp://localhost' | ||
url: 'localhost', | ||
name: 'a.micro.service' | ||
}) | ||
``` | ||
After that, the world is yours! Here are some basic examples of the four commands mentioned above. | ||
// Set up an endpoint with the name 'micro.service.info' | ||
remit | ||
.endpoint('micro.service.info') | ||
```javascript | ||
// API | ||
remit.req('add', { | ||
first: 2, | ||
second: 7 | ||
}, function (err, data) { | ||
console.log('The result is ' + data) | ||
}) | ||
// When the endpoint is hit, run this function | ||
.data((data, callback) => { | ||
console.log('Endpoint was hit!') | ||
// Server | ||
remit.res('add', function (args, done) { | ||
done(null, (args.first + args.second)) | ||
data.foo = 'bar' | ||
remit.emit('something.happened', args) | ||
}) | ||
// Reply with the mutated data | ||
return callback(null, data) | ||
// Listener 1 | ||
remit.listen('something.happened', function (args, done) { | ||
console.log(args) | ||
// We return done() to acknowledge that the task has been completed | ||
return done() | ||
}) | ||
// Listener 2 | ||
remit.listen('something.#', function (args) { | ||
console.log('Something... did something...') | ||
return done() | ||
}) | ||
/* | ||
1. The API requests the 'add' endpoint. | ||
2. The Server responds with the result of the sum. | ||
3. The API logs 'The result is 9'. | ||
4. The Server emits the 'something.happened' event. | ||
5. Listener 1 logs the arguments the API sent. | ||
6. Listener 2 logs 'Something... did something...'. | ||
*/ | ||
// Once the endpoint is ready... | ||
}).ready().then(() => { | ||
// Make a request to the 'micro.service.save' endpoint | ||
// with the data {name: 'a.micro.service'} | ||
return remit | ||
.request('micro.service.save') | ||
.send({name: 'a.micro.service'}) | ||
}).then((result) => { | ||
// When the reply comes back, log the response. | ||
console.log('Saved microservice info', result) | ||
}).catch((err) => { | ||
// If the request failed (the replying microservice returned | ||
// an error, the request timed out or couldn't be routed), | ||
// log the error. | ||
console.error('Couldn\'t seem to save microservice info', err) | ||
}) | ||
``` | ||
# Pre-requisites | ||
## Contents | ||
To use `remit` you'll need: | ||
* A _RabbitMQ_ server (_Remit_ `1.2.0+` requires `>=3.4.0`) | ||
* _Node v4.x.x_ | ||
* _npm_ | ||
* [Getting started](#) | ||
* [Concepts](#concepts) | ||
* [API reference](#api-reference) | ||
# Installation | ||
## Concepts | ||
Once your _RabbitMQ_ server's up and running, simply use `npm` to install `remit`! | ||
```javascript | ||
npm install remit | ||
``` | ||
##### Basic | ||
# Key examples | ||
_Remit_, primarily, makes use of four simple commands: `request`, `respond`, `emit` and `listen`. | ||
There are two methods for sending messages with `remit`: _request_ or _emit_. | ||
`request` requests data from an endpoint defined using `respond`. | ||
`emit` "emits" data to any and all "listeners" defined using `listen`. | ||
A _request_ implies that the requester wants a response back, whereas using an _emission_ means you wish to notify other services of an event without requiring their input. | ||
##### Example | ||
Let's start with a simple authentication example. We'll set up an API that our user can request to log in. | ||
A simple example here allows us to define an endpoint that increments a counter and emits that change to the rest of the system. | ||
```javascript | ||
// Import remit and connect to our AMQP server | ||
``` js | ||
// This service sets up our counter and incrementer. | ||
const remit = require('remit')() | ||
let counter = 0 | ||
// Import whatever HTTP API creator we want | ||
const api = require('some-api-maker') | ||
remit | ||
.endpoint('counter.increment') | ||
.data((event, callback) => { | ||
remit.emit('counter.incremented').send(++counter) | ||
// Set up a route using our API creator | ||
api.get('/login', function (req, res) { | ||
// Send a request via remit to the 'user.login' endpoint | ||
remit.req('user.login', { | ||
username: req.username, | ||
password: req.password | ||
}, function (err, data) { | ||
//If there's something wrong... | ||
if (err) return res.failure(err) | ||
// Otherwise, woohoo! We're logged in! | ||
return res.success(data.user) | ||
}) | ||
}) | ||
return callback(null, counter) | ||
}) | ||
``` | ||
Awesome! Now we'll set up the authentication service that'll respond to the request. | ||
```javascript | ||
// Import remit and connect to our AMQP server | ||
``` js | ||
// Here we set up a listener for when the counter's been incremented | ||
const remit = require('remit')() | ||
// Respond to 'user.login' events | ||
remit.res('user.login', function (args, done) { | ||
// If it's not Mr. Bean, send back an error! | ||
if (args.username !== 'Mr. Bean') return done('You\'re not Mr. Bean!') | ||
remit | ||
.listen('counter.incremented') | ||
.data((event, callback) => { | ||
console.log(`Counter is now ${event.data}`) | ||
// Otherwise, let's "log in" | ||
done(null, { | ||
username: 'Mr. Bean', | ||
birthday: '14/06/1961' | ||
}) | ||
}) | ||
return callback() | ||
}) | ||
``` | ||
Done. That's it. Our `API` service will request an answer to the `user.login` endpoint and our server will respond. Simples. | ||
``` js | ||
// And here we increment the counter! | ||
const remit = require('remit')() | ||
Let's now say that we want a service to listen out for if it's a user's birthday and send them an email if they've logged in on that day! With most other systems, this would require adding business logic to our login service to explicitly call some `birthday` service and check, but not with `remit`. | ||
At the end of our `authentication` service, let's add an emission of `user.login.success`. | ||
```javascript | ||
// Respond to 'user.login' events | ||
remit.res('user.login', function (args, done) { | ||
// If it's not Mr. Bean, send back an error! | ||
if (args.username !== 'Mr. Bean') return done('You\'re not Mr. Bean!') | ||
// Otherwise, let's "log in" | ||
let user = { | ||
username: 'Mr. Bean', | ||
birthday: '14/06/1961' | ||
} | ||
done(null, user) | ||
// After we've logged the user in, let's emit that everything went well! | ||
remit.emit('user.login.success', { user }) | ||
}) | ||
remit.request('counter.increment')() | ||
``` | ||
Now that we've done that, _any_ other services on the network can listen in on that event and react accordingly! | ||
## API reference | ||
Let's make our `birthday` service. | ||
* [Remit](#remit-1) | ||
* [remit](#requireremitoptions--remit-) | ||
* [Request](#requestdata-) | ||
* [#request / #req](#remitrequestendpoint-options--request-) | ||
* [#persistentRequest / #preq](#templates) | ||
* [#emit](#templates) | ||
* [#delayedEmit / #demit](#templates) | ||
* [Response](#responsedata-) | ||
* [#respond / #res / #endpoint](#remitrespondendpoint-options--response-) | ||
* [#listen / #on](#templates-1) | ||
```javascript | ||
const remit = require('remit')({ | ||
name: 'birthday' | ||
}) | ||
------ | ||
const beanmail = require('send-mail-to-mr-bean') | ||
#### `Remit` | ||
remit.listen('user.login.success', function (args, done) { | ||
let today = '14/06/1961' | ||
A _Remit_ instance representing a single connection to the AMQ in use. | ||
if (today === args.user.birthday) { | ||
beanmail.send() | ||
} | ||
##### Properties | ||
return done() | ||
}) | ||
``` | ||
* [`request`](#remitrequestendpoint-options--request-) | ||
* [`req`](#remitrequestendpoint-options--request-) | ||
* [`persistentRequest`](#templates) | ||
* [`preq`](#templates) | ||
* [`emit`](#templates) | ||
* [`delayedEmit`](#templates) | ||
* [`demit`](#templates) | ||
* [`respond`](#remitrespondendpoint-options--response-) | ||
* [`res`](#remitrespondendpoint-options--response-) | ||
* [`endpoint`](#remitrespondendpoint-options--response-) | ||
* [`listen`](#templates-1) | ||
* [`on`](#templates-1) | ||
Sorted. Now every time someone logs in successfully, we run a check to see if it's their birthday. | ||
------ | ||
Emissions can be hooked into by any number of different services, but only one "worker" per service will receive each emission. | ||
#### `require('remit')([options])` > [`Remit`](#remit-1) [^](#api-reference) | ||
So let's also start logging every time a user performs _any_ action. We can do this by using the `#` wildcard. | ||
Creates a new _Remit_ instance, using the given options, and connects to the message queue. | ||
```javascript | ||
const remit = require('remit')({ | ||
name: 'logger' | ||
}) | ||
##### Arguments | ||
let user_action_counter = 0 | ||
* `options` (_Optional_) An object containing a mixture of _Remit_ and AMQ options. Acceptable values are currently: | ||
* `url` (_Optional, defaults to `'amqp://localhost'`_) The [RabbitMQ URI](https://www.rabbitmq.com/uri-spec.html) to use to connect to the AMQ instance. If not defined, tries to fall back to the `REMIT_URL` environment variable before the default. Any query string options defined are overwritten. | ||
* `name` (_Optional, defaults to `''`_) The friendly connection name to give the service. This is used heavily for load balancing requests, so instances of the same service (that should load balance requests between themselves) should have the same name. If not defined, tries to fall back to the `REMIT_NAME` environment variable before the default. | ||
* `exchange` (_Optional, defaults to `'remit'`_) The AMQ exchange to connect to. | ||
remit.listen('user.#', function (args, done) { | ||
user_action_counter++ | ||
##### Returns [`Remit`](#remit-1) | ||
return done() | ||
}) | ||
``` | ||
##### AMQ behaviour | ||
# API reference | ||
1. Connects to the AMQ | ||
* [`Remit`](#requireremitoptions) - Instantiate Remit | ||
* [`req`](#reqendpoint-data-callback-options--timeout-5000) - Make a request to an endpoint | ||
* [`treq`](#treqendpoint-data-callback-options--timeout-5000) - Make a transient request to an endpoint | ||
* [`res`](#resendpoint-callback-context-options--queuename-my_queue) - Define an endpoint | ||
* [`emit`](#emitevent-data-options) - Emit to all listeners of an event | ||
* [`demit`](#demitevent-eta-data-options) - Emit to all listeners of an event at a specified time | ||
* [`listen`](#listenevent-callback-context-options--queuename-my_queue) - Listen to emissions of an event | ||
------ | ||
## require('remit')([options]) | ||
#### `Request([data])` [^](#api-reference) | ||
Creates a Remit object, with the specified `options` (if any), ready for use with further functions. | ||
A request set up for a specific endpoint, ready to send and receive data. | ||
#### Arguments | ||
##### Arguments | ||
* `options` - _Optional_ An object containing options to give to the Remit instantiation. Currently-acceptable options are: | ||
* `name` - The name to give the current service. This is used heavily for load balancing requests, so instances of the same service (that should load balance requests between themselves) should have the same name. Is _required_ if using [`listen`](#listenevent-callback-context-options--queuename-my_queue). | ||
* `url` - The URL to use to connect to the AMQ. Defaults to `amqp://localhost`. | ||
* `connection` - If you already have a valid AMQ connection, you can provide and use it here. The use cases for this are slim but present. | ||
* `data` (_Optional_) Can be any JSON-compatible data that you wish to send to the endpoint. | ||
## req(endpoint, data, [callback], [options = {timeout: 5000}]) | ||
##### Properties | ||
Makes a request to the specified `endpoint` with `data`, optionally returning a `callback` detailing the response. It's also possible to provide `options`, namely a `timeout`. | ||
* `send` Synonymous with calling the `Request` object. | ||
* `data(callback)` Provide a callback to be run when a reply is received from a request. | ||
* `sent(callback)` Provide a callback to be run when the request successfully sends data to an endpoint. | ||
#### Arguments | ||
##### Returns | ||
* `endpoint` - A string endpoint that can be defined using [`res`](#resendpoint-callback-context-options--queuename-my_queue). | ||
* `data` - Can be any of `boolean`, `string`, `array` or `object` and will be passed to the responder. | ||
* `callback(err, data)` - _Optional_ A callback which is called either when the responder has handled the message or the message "timed out" waiting for a response. In the case of a timeout, `err` will be populated, though the responder can also explicitly control what is sent back in both `err` and `data`. | ||
* `options` - _Optional_ Supply an object here to explicitly define certain options for the AMQ message. `timeout` is the amount of time in milliseconds to wait for a response before returning an error. There is currently only one _defined_ use case for this, though it gives you total freedom as to what options you provide. | ||
Returns a promise that resolves when the reply is received. If `timeout` or `error` are emitted, the promise is rejected. If `data` is emitted with a falsey `err`, the promise is resolved. If the `err` is truthy, the promise is rejected. | ||
#### Examples | ||
------ | ||
```javascript | ||
// Calls the 'user.profile', endpoint, but doesn't ask for a response. | ||
remit.req('user.profile', { | ||
username: 'jacob123' | ||
}) | ||
``` | ||
#### `remit.request(endpoint[, options])` > [`Request`](#requestdata-) [^](#api-reference) | ||
```javascript | ||
// Calls the 'user.profile' endpoint asking for a response but timing out after the default of 5 seconds. | ||
remit.req('user.profile', { | ||
username: 'jacob123' | ||
}, (err, data) => { | ||
if (err) console.error('Oh no! Something went wrong!', err) | ||
_Aliases: `req`_ | ||
return console.log('Got the result back!', data) | ||
}) | ||
``` | ||
Sets up a request pointed at the specified `endpoint` with the given `options`. | ||
```javascript | ||
// Calls the 'user.profile', endpoint asking for a response but timing out after a custom wait of 20 seconds. | ||
remit.req('user.profile', { | ||
username: 'jacob123' | ||
}, (err, data) => { | ||
if (err) console.error('Oh no! Something went wrong!', err) | ||
##### Arguments | ||
return console.log('Got the result back!', data) | ||
}, { | ||
timeout: 20000 | ||
}) | ||
``` | ||
* `endpoint` A string representing an endpoint, defined using [`respond` / `res` / `endpoint`](#remitrespondevent-options--response-). | ||
* `options` (_Optional_) An object containing a mixture of _Remit_ and AMQ options. Acceptable values are currently: | ||
* `something` | ||
#### AMQ behaviour | ||
##### Properties | ||
1. Confirms connection and exchange exists. | ||
2. If a callback's provided, confirm the existence of and consume from a "result queue" specific to this process. | ||
3. Publish the message using the provided `endpoint` as a routing key. | ||
* `data(callback)` Provide a global callback to be run when a reply is received from _any_ request. | ||
* `sent(callback)` Provide a global callback to be run when _any_ request successfully sends data to an endpoint. | ||
## treq(endpoint, data, [callback], [options = {timeout: 5000}]) | ||
##### Returns [`Request`](#requestdata-) | ||
Identical to [`req`](#reqendpoint-data-callback-options--timeout-5000) but will remove the request message upon timing out. Useful for calls from APIs. For example, if a client makes a request to delete a piece of content but that request times out, it'd be jarring to have that action suddenly undertaken at an unspecified interval afterwards. `treq` is useful for avoiding that circumstance. | ||
##### Templates | ||
#### AMQ behaviour | ||
Some basic options templates are also set up as separate functions to allow for some semantically-pleasing set-ups without having to skim through objects to figure out what's happening. | ||
Like [`req`](#reqendpoint-data-callback-options--timeout-5000) but adds an `expiration` field to the message. | ||
``` js | ||
// remit.persistentRequest | ||
// remit.preq | ||
{ | ||
"some": "thing" | ||
} | ||
## res(endpoint, callback, [context], [options = {queueName: 'my_queue'}]) | ||
// remit.emit | ||
{ | ||
"some": "thing" | ||
} | ||
Defines an endpoint that responds to [`req`](#reqendpoint-data-callback-options--timeout-5000)s. Returning the provided `callback` is a nessecity regardless of whether the requester wants a response as it is to used to acknowledge messages as being handled. | ||
#### Arguments | ||
* `endpoint` - A string endpoint that requetsers will use to reach this function. | ||
* `callback(args, done)` - A callback containing data from the requester in `args` and requiring the running of `done(err, data)` to signify completion regardless of the requester's requirement for a response. | ||
* `context` - _Optional_ The context in which `callback(args, done)` will be called. | ||
* `options` - _Optional_ An object that can contain a custom queue to listen for messages on. | ||
#### Examples | ||
```javascript | ||
// Defines the 'user.profile' profile endpoint, retrieving a user from our dummy database | ||
remit.res('user.profile', function (args, done) { | ||
if (args.username) return done('No username provided!') | ||
mydb.get_user(args.username, function (err, user) { | ||
return done(err, user) | ||
}) | ||
}) | ||
// remit.delayedEmit | ||
// remit.demit | ||
{ | ||
"some": "thing" | ||
} | ||
``` | ||
#### AMQ behaviour | ||
##### AMQ behaviour | ||
1. Confirms connection and exchange exists. | ||
2. Binds to and consumes from the queue with the name defined by `endpoint` | ||
1. If a reply is sought, follow sub-steps, otherwise skip to step #2 | ||
1. Ensure a connection is available | ||
2. Ensure the channel used for publishing is available | ||
3. Start consuming from `amq.rabbitmq.reply-to` | ||
2. Ensure a connection is available | ||
3. Ensure the channel used for publishing is available | ||
4. Publish the message | ||
## emit(event, [data], [options]) | ||
------ | ||
Emits to all [`listen`](#listenevent-callback-context-options--queuename-my_queue)ers of the specified event, optionally with some `data`. This is essentially the same as [`req`](#reqendpoint-data-callback-options--timeout-5000) but no `callback` can be defined and `broadcast` is set to `true` in the message options. | ||
#### `Response` [^](#api-reference) | ||
#### Arguments | ||
An active endpoint set up and ready to receive data. | ||
* `event` - The "event" to emit to [`listen`](#listenevent-callback-context-options--queuename-my_queue)ers. | ||
* `data` - _Optional_ Data to send to [`listen`](#listenevent-callback-context-options--queuename-my_queue)ers. Can be any of `boolean`, `string`, `array` or `object`. | ||
* `options` - _Optional_ Like [`req`](#reqendpoint-data-callback-options--timeout-5000), supply an object here to explicitly define certain options for the AMQ message. | ||
##### Properties | ||
#### Examples | ||
* `data` Provide a callback to be run when a request is received. | ||
* `ready` Provide a callback to be run when the endpoint becomes live and ready. | ||
```javascript | ||
// Emits the 'user.registered' event to all listeners | ||
remit.emit('user.registered') | ||
``` | ||
------ | ||
```javascript | ||
// Emits the 'user.registered' event, supplying some of the user's basic information | ||
remit.emit('user.registered', { | ||
username: 'jacob123', | ||
name: 'Jacob Four', | ||
email: 'jacob@five.com', | ||
website: 'www.six.com' | ||
}) | ||
``` | ||
#### `remit.respond(endpoint[, options])` > [`Response`](#responsedata-) [^](#api-reference) | ||
#### AMQ behaviour | ||
_Aliases: `res`, `endpoint`_ | ||
1. Confirms connection and exchange exists. | ||
2. Publish the message using the provided `endpoint` as a routing key and with the `broadcast` option set to `true`. | ||
##### Arguments | ||
## demit(event, eta, [data], [options]) | ||
* `endpoint` A string representing the name of the endpoint. This is used as a routing key (see the [RabbitMQ Topics Tutorial](https://www.rabbitmq.com/tutorials/tutorial-five-javascript.html)) so the only allowed characters are `a-zA-Z0-9_.-`. | ||
* `options` (_Optional_) An object containing a mixture of _Remit_ and AMQ options. Acceptable values are currently: | ||
* `something` | ||
Like [`emit`](#emitevent-data-options) but tells [`listen`](#listenevent-callback-context-options--queuename-my_queue)ers to wait until `eta` to running their respective functions. Similar in design and functionality to [Celery's `eta` usage](http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown). Largely useful for tasks that should repeat like session health checks. | ||
##### Properties | ||
#### Arguments | ||
* `data` Provide a global callback to be run when _any_ request is received. | ||
* `ready` Provide a global callback to be run when _any_ endpoint becomes live and ready. | ||
* `event` - The "event" to emit to [`listen`](#listenevent-callback-context-options--queuename-my_queue)ers. | ||
* `eta` - A `date` object being the earliest time you wish listeners to respond to the emission. | ||
* `data` - _Optional_ Data to send to [`listen`](#listenevent-callback-context-options--queuename-my_queue)ers. Can be any of `boolean`, `string`, `array` or `object`. | ||
* `options` - _Optional_ Like [`req`](#reqendpoint-data-callback-options--timeout-5000), supply an object here to explicitly define certain options for the AMQ message. | ||
##### Returns [`Response`](#responsedata-) | ||
#### Examples | ||
##### Templates | ||
```javascript | ||
// Emits a "health.check" event that should be processed in 24 hours | ||
let tomorrow = new Date() | ||
tomorrow.setDate(tomorrow.getDate() + 1) | ||
Some basic options templates are also set up as separate functions to allow for some semantically-pleasing set-ups without having to skim through objects to figure out what's happening | ||
remit.demit('health.check', tomorrow) | ||
``` js | ||
// remit.listen | ||
// remit.on | ||
{ | ||
"some": "thing" | ||
} | ||
``` | ||
```javascript | ||
// Emits a "health.check" event that should be processed in 24 hours, providing some relevant data | ||
let tomorrow = new Date() | ||
tomorrow.setDate(tomorrow.getDate() + 1) | ||
##### AMQ behaviour | ||
remit.demit('health.check', tomorrow, { | ||
current_health: 52 | ||
}) | ||
``` | ||
#### AMQ behaviour | ||
Like [`emit`](#emitevent-data-options) but adds a `timestamp` field to the message which is understood by [`listen`](#listenevent-callback-context-options--queuename-my_queue)-based functions. | ||
## listen(event, callback, [context], [options = {queueName: 'my_queue'}]) | ||
Listens to events emitted using [`emit`](#emitevent-data-options). Listeners are grouped for load balancing using their `name` provided when instantiating Remit. | ||
While listeners can't sent data back to the [`emit`](#emitevent-data-options)ter, calling the `callback` is still required for confirming successful message delivery. | ||
#### Arguments | ||
* `event` - The "event" to listen for emissions of. | ||
* `callback(args, done)` - A callback containing data from the emitter in `args` and requiring the running of `done(err)` to signify completion. | ||
* `context` - _Optional_ The context in which `callback(args, done)` will be called. | ||
* `options` - _Optional_ An object that can contain a custom queue to listen for messages on. | ||
#### Examples | ||
```javascript | ||
// Listens for the "user.registered" event, logging the outputted data | ||
remit.listen('user.registered', function (args, done) { | ||
console.log('User registered!', args) | ||
return done() | ||
}) | ||
``` | ||
#### AMQ behaviour | ||
1. Confirms connection and exchange exists. | ||
2. Sets a service-unique queue name and confirms it exists | ||
3. Binds the queue to the routing key defined by `event` and starts consuming from said queue | ||
# Improvements | ||
`remit`'s in its very early stages. Basic use is working well, but here are some features I'm looking at implementing to make things a bit more diverse. | ||
* Ability to specify exchange per connection, endpoint or event | ||
* Cleaner error handling (along with some standards) | ||
* ~~Removal of all use of `process.exit()`~~ | ||
* Connection retrying when losing connection to the AMQ | ||
* ~~Use promises instead of callbacks~~ | ||
* Warnings for duplicate `req` subscriptions | ||
* ~~Better handling of `req` timeouts~~ | ||
* Ability for emissions to receive (multiple) results from listeners if required (I really want to use generators for this) | ||
* Obey the `JSON-RPC 2.0` spec | ||
* Tests! | ||
1. Ensure a connection is available | ||
2. Ensure the channel used for miscellaneous work is available | ||
3. Assert the presence of the queue named after the event given | ||
4. Ensure the channel used for consuming is available | ||
5. Bind the asserted queue using a duplicate routing key | ||
6. Ensure the channel used for consuming is available | ||
7. Start consuming from the event |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
Deprecated
MaintenanceThe maintainer of the package marked it as deprecated. This could indicate that a single version should not be used, or that the package is no longer maintained and any new vulnerabilities will not be fixed.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
51824
30
903
0
5
6
1
280
2
+ Addedeventemitter3@^1.2.0
+ Addedamqplib@0.5.6(transitive)
+ Addedbitsyntax@0.1.0(transitive)
+ Addedbluebird@3.7.2(transitive)
+ Addedbuffer-more-ints@1.0.0(transitive)
+ Addedeventemitter3@1.2.0(transitive)
+ Addedquerystringify@2.2.0(transitive)
+ Addedrequires-port@1.0.0(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedurl-parse@1.4.7(transitive)
- Removedamqplib@0.4.2(transitive)
- Removedbitsyntax@0.0.4(transitive)
- Removedbuffer-more-ints@0.0.2(transitive)
- Removedwhen@3.6.4(transitive)
Updatedamqplib@^0.5.1
Updateddebug@^2.4.4