New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

remit

Package Overview
Dependencies
Maintainers
1
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

remit - npm Package Compare versions

Comparing version 1.1.7 to 1.2.0

499

index.js
'use strict'
const master_debug = require('debug')
const os = require('os')
const uuid = require('node-uuid').v4
const amqplib = require('amqplib/callback_api')
const os = require('os')
const amqplib = require('amqplib')

@@ -19,25 +18,24 @@ module.exports = function (opts) {

if (!opts) opts = {}
// Exposed items
this._service_name = opts.name || ''
this._url = opts.url || 'amqp://localhost'
// Global items
this._connection = opts.connection || null
this._channel = opts.channel || null
this._exchange = opts.exchange || null
// Callback queues
this._connection_callbacks = []
this._channel = opts.channel || null
this._emit_on_error = opts.emit_on_error || null
this._exchange = null
this._exchange_name = 'remit'
this._exchange_callbacks = []
this._results_queue = null
this._results_name = null
this._results_callback = {}
// Callback trackers
this._results_callbacks = {}
this._results_timeouts = {}
this._results_callbacks = []
// States
this._consuming_results = false
this._listener_count = 0
this._events = {}
return this

@@ -59,15 +57,14 @@ }

Remit.prototype.res = function res (event, callback, context, options) {
const debug = master_debug('remit.res')
if (this._events[event]) return false
if (!options) options = {}
debug(`Creating "${event}" endpoint`)
const self = this
// Set up default options if we haven't been given any.
if (!options) {
options = {}
}
self.__connect(() => {
self.__assert_exchange(() => {
const chosen_queue = options.queueName || event
// TODO Check this for a valid response
self._channel.assertQueue(chosen_queue, {

@@ -78,25 +75,14 @@ exclusive: false,

})
self._channel.bindQueue(chosen_queue, 'remit', event, {}, (err, ok) => {
if (err) throw new Error(err)
self._channel.bindQueue(chosen_queue, 'remit', event).then(() => {
self._channel.consume(chosen_queue, (message) => {
debug(`New message`)
if (!message.properties.timestamp) {
debug('No timestamp; processing message now')
self.__consume_res(message, callback, context)
} else {
debug('Timestamp found')
const time_to_wait = parseInt(message.properties.timestamp) - new Date().getTime()
const time_to_wait = parseInt(message.properties.timestamp - new Date().getTime())
if (time_to_wait < 0) {
debug('Timestamp obsolete; processing message now')
self.__consume_res(message, callback, context)
} else {
debug(`Processing message in ${time_to_wait}ms`)
setTimeout(() => {
debug(`Processing message after waiting for ${time_to_wait}ms...`)
self.__consume_res(message, callback, context)

@@ -109,3 +95,3 @@ }, time_to_wait)

})
})
}).then(null, console.error)
})

@@ -120,56 +106,32 @@ })

Remit.prototype.treq = function reqt (event, args, callback, options) {
const debug = master_debug('remit.treq')
const self = this
if (!options) options = {}
if (!options.expiration) options.expiration = 5000
if (!options.timeout) options.timeout = 5000
self.req(event, args, callback, options)
}
Remit.prototype.req = function req (event, args, callback, options) {
const debug = master_debug('remit.req')
if (!options) options = {}
const self = this
const correlation_id = uuid()
if (!options) {
options = {}
}
self.__connect(() => {
self.__assert_exchange(() => {
if (!callback) {
debug(`No callback specified for req; publishing "${event}" message now`)
debug(options)
return self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options)
}
self._results_name = `${this._service_name}:callback:${os.hostname()}:${process.pid}`
self._channel.assertQueue(self._results_name, {
exclusive: false,
durable: false,
autoDelete: true
})
self._channel.bindQueue(self._results_name, 'remit', self._results_name, {}, (err, ok) => {
if (err) throw new Error(err)
if (!self._results_queue) {
self._results_queue = true
self._channel.consume(self._results_name, function (message) {
self.__on_result.apply(self, arguments)
})
}
self._results_callback[correlation_id] = {
if (!self._consuming_results) {
self._consuming_results = true
self._channel.consume('amq.rabbitmq.reply-to', function (message) {
self.__on_result.apply(self, arguments)
}, {
exclusive: true,
noAck: true
}).then(send_message).then(null, console.warn)
} else {
send_message()
}
function send_message () {
const correlation_id = uuid()
self._results_callbacks[correlation_id] = {
callback: callback,

@@ -179,13 +141,15 @@ context: null,

}
options.mandatory = true
options.replyTo = self._results_name,
options.replyTo = 'amq.rabbitmq.reply-to'
options.correlationId = correlation_id
self._results_timeouts[correlation_id] = setTimeout(function () {
if (!self._results_callback[correlation_id]) return
delete self._results_callback[correlation_id]
if (!self._results_callbacks[correlation_id]) {
return
}
delete self._results_callbacks[correlation_id]
delete self._results_timeouts[correlation_id]
try {

@@ -206,9 +170,28 @@ callback({

}, options.timeout || 5000)
self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options)
})
}
})
})
}
return correlation_id
Remit.prototype.listen = function listen (event, callback, context, options) {
const self = this
if (!self._service_name) {
throw new Error('Must provide a service name if listening')
}
if (!options) {
options = {}
}
options.queueName = `${event}:emission:${self._service_name}:${++self._listener_count}`
self.res.call(self, event, callback, context, options)
}

@@ -222,12 +205,11 @@

Remit.prototype.emit = function emit (event, args, options) {
const debug = master_debug('remit.emit')
const self = this
if (!options) options = {}
if (!options) {
options = {}
}
options.broadcast = true
options.autoDeleteCallback = options.ttl ? false : true
debug('Emitting ', args)
self.req.call(self, event, args, options.onResponse, options)

@@ -242,13 +224,15 @@ }

Remit.prototype.demit = function demit (event, delay, args, options) {
const debug = master_debug('remit.demit')
const self = this
if (!options) options = {}
if (!options) {
options = {}
}
options.broadcast = true
if (Object.prototype.toString.call(delay) === '[object Date]') options.timestamp = delay.getTime()
options.autoDeleteCallback = options.ttl ? false : true
debug ('Demitting ', args)
if (Object.prototype.toString.call(delay) === '[object Date]') {
options.timestamp = delay.getTime()
}
self.req.call(self, event, args, options.onResponse, options)

@@ -262,11 +246,18 @@ }

Remit.prototype.listen = function listen (event, callback, context, options) {
Remit.prototype.treq = function treq (event, args, callback, options) {
const self = this
if (!self._service_name) throw new Error('Must provide a service name if listening')
if (!options) options = {}
options.queueName = `${event}:emission:${self._service_name}:${++self._listener_count}`
self.res.call(self, event, callback, context, options)
if (!options) {
options = {}
}
if (!options.expiration) {
options.expiration = 5000
}
if (!options.timeout) {
options.timeout = 5000
}
self.req(event, args, callback, options)
}

@@ -280,53 +271,65 @@

Remit.prototype.__connect = function __connect (callback) {
const debug = master_debug('remit.__connect')
debug(`Checking connection`)
const self = this
if (!callback) callback = function () {}
// If no callback was given, we still pretend there
// is one. We use this to signify queue presence.
if (!callback) {
callback = function () {}
}
// If a connection already exists
if (self._connection) {
// If there are still callbacks being processed,
// hop into the queue; no need to trigger it now.
// Be British and get in line!
if (self._connection_callbacks.length) {
self._connection_callbacks.push(callback)
return true
return
}
debug(`Instantly hitting callback`)
return callback(self.connection)
// Otherwise we do need to trigger now. We missed
// the queue. #awkward
return callback()
}
// If we're here, a connection doesn't currently exist.
// Now we check whether we're the first call to do this.
// If we are, we'll be the ones to try and connect.
const first = !self._connection_callbacks.length
// Push our callback in to the queue, eh?
self._connection_callbacks.push(callback)
if (!first) return
debug(`Connection not yet made; attemtping now...`)
amqplib.connect(self._url, (err, con) => {
if (err) throw new Error(err)
debug(`Connected to ${self._url}`)
self._connection = con
self._connection.createChannel((err, channel) => {
if (err) throw new Error(err)
debug(`Channel created`)
if (!first) {
return
}
// So let's connect!
amqplib.connect(self._url).then((connection) => {
// Everything's go fine, so we'll set this global
// object to our new connection.
self._connection = connection
// We now need to make a channel to communicate
// through.
self._connection.createChannel().then((channel) => {
// Everything went awesome. Let's set our new
// global channel.
self._channel = channel
// Time to run the callbacks. Let's grab them and
// take them out of the queue.
// TODO Remove these one at a time as we process
// them.
const callbacks = self._connection_callbacks
self._connection_callbacks = []
debug(`Running ${callbacks.length} callbacks`)
// Loop through and make everything happen!
for (var i = 0; i < callbacks.length; i++) {
callbacks[i](self._connection)
callbacks[i]()
}
})
})
}).then(null, console.error)
}).then(null, console.error)
}

@@ -341,35 +344,57 @@

const self = this
const debug = master_debug('remit.__assert_exchange')
if (!callback) callback = function () {}
// If no callback was given, we still pretend there
// is one. We use this to signify queue presence.
if (!callback) {
callback = function () {}
}
// If the exchange already exists
if (self._exchange) {
// If there are still callbacks being processed,
// hop into the queue; no need to trigger it now.
// Be British and get in line!
if (self._exchange_callbacks.length) {
self._exchange_callbacks.push(callback)
return true
return
}
debug(`Instantly hitting exchange callback`)
// Otherwise we do need to trigger now. We missed
// the queue. #awkward
return callback()
}
// If we're here, an exchange doesn't currently exist.
// Now we check whether we're the first call to do this.
// If we are, we'll be the ones to try and connect.
const first = !self._exchange_callbacks.length
// Push our callback in to the queue, eh?
self._exchange_callbacks.push(callback)
if (!first) {
return
}
// Let's try making this exchange!
self._channel.assertExchange('remit', 'topic', {
autoDelete: true
}, (err, ok) => {
if (err) throw new Error(err)
}).then(() => {
// Everything went awesome so we'll let everything
// know that the exchange is up.
self._exchange = true
// Time to run any callbacks that were waiting on
// this exchange being made.
// TODO Remove these one at a time as we process
// them.
const callbacks = self._exchange_callbacks
self._exchange_callbacks = []
for (var i = 0; i < callbacks.length; i++) {
callbacks[i]()
}
})
}).then(null, console.error)
}

@@ -382,63 +407,14 @@

Remit.prototype.__on_result = function __on_result (message) {
const self = this
const debug = master_debug('remit.__on_result')
if (!self._results_callback[message.properties.correlationId]) {
// If we've got it when we don't need it, we'll at least `nack` it first
return self._channel.nack(message, false, false)
}
const callback = self._results_callback[message.properties.correlationId]
let args = []
debug(`Got result`, JSON.parse(message.content.toString()))
let data = JSON.parse(message.content.toString())
if (!Array.isArray(data)) data = [data]
delete self._results_timeouts[message.properties.correlationId]
try {
callback.callback.apply(callback.context, data)
} catch (e) {
self._channel.nack(message, false, false)
delete self._results_callback[message.properties.correlationId]
if (self.on_error) {
self.on_error(e)
} else {
throw e
}
}
self._channel.ack(message)
delete self._results_callback[message.properties.correlationId]
}
Remit.prototype.__consume_res = function __consume_res (message, callback, context) {
const debug = master_debug('remit.__consume_res')
const self = this
let data
try {
data = JSON.parse(message.content.toString())
} catch (e) {
debug(`Failed to parse JSON data; NACKing`)
return self._channel.nack(message, false, false)
}
debug(`Parsed JSON data`)
if (!message.properties.correlationId || !message.properties.replyTo) {
debug(`No callback found; acting as listener`)
try {

@@ -453,6 +429,7 @@ callback.call(context, data, function (err) {

message.properties.headers = increment_headers(message.properties.headers)
self._channel.publish(message.fields.exchange, message.fields.routingKey, message.content, message.properties)
self._channel.publish('', message.properties.replyTo, message.content, message.properties)
self._channel.ack(message)
}
if (self.on_error) {

@@ -465,27 +442,15 @@ self.on_error(e)

} else {
debug(`Callback found; aiming to respond`)
try {
callback.call(context, data, function (err, data) {
debug(`Received callback from responder`)
const options = {correlationId: message.properties.correlationId}
debug(`Publishing message to ${message.properties.replyTo}`)
self._channel.publish('remit', message.properties.replyTo, new Buffer(JSON.stringify(Array.prototype.slice.call(arguments))), options)
self._channel.publish('', message.properties.replyTo, new Buffer(JSON.stringify(Array.prototype.slice.call(arguments))), options)
self._channel.ack(message)
})
} catch (e) {
debug(`Callback errored`)
if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) {
debug(`Run out of patience. NACKing...`)
self._channel.nack(message, false, false)
} else {
debug(`Giving the message another chance...`)
message.properties.headers = increment_headers(message.properties.headers)
self._channel.publish(message.fields.exchange, message.fields.routingKey, message.content, message.properties)
self._channel.publish('', message.properties.replyTo, message.content, message.properties)
self._channel.ack(message)

@@ -508,14 +473,48 @@ }

Remit.prototype.__on_result = function __on_result (message) {
const self = this
const callback = self._results_callbacks[message.properties.correlationId]
let data = JSON.parse(message.content.toString())
if (!Array.isArray(data)) data = [data]
delete self._results_timeouts[message.properties.correlationId]
try {
callback.callback.apply(callback.context, data)
} catch (e) {
delete self._results_callbacks[message.properties.correlationId]
if (self.on_error) {
self.on_error(e)
} else {
throw e
}
}
delete self._results_callbacks[message.properties.correlationId]
}
function increment_headers (headers) {
if (!headers) return {attempts: 1}
if (!headers) {
return {
attempts: 1
}
}
if (!headers.attempts) {
headers.attempts = 1
return headers
}
headers.attempts = parseInt(headers.attempts) + 1
return headers
}
{
"name": "remit",
"version": "1.1.7",
"version": "1.2.0",
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -75,3 +75,3 @@ # What's Remit?

To use `remit` you'll need:
* A _RabbitMQ_ server
* A _RabbitMQ_ server (_Remit_ `1.2.0+` requires `>=3.4.0`)
* _Node v4.x.x_

@@ -417,3 +417,3 @@ * _npm_

* Connection retrying when losing connection to the AMQ
* Use promises instead of callbacks
* ~~Use promises instead of callbacks~~
* Warnings for duplicate `req` subscriptions

@@ -420,0 +420,0 @@ * ~~Better handling of `req` timeouts~~

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc