New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

remit

Package Overview
Dependencies
Maintainers
1
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

remit - npm Package Compare versions

Comparing version 1.0.9 to 1.1.0

487

index.js
'use strict'
const amqp = require('amqp')
const debug = require('debug')('remit')
const master_debug = require('debug')
const uuid = require('node-uuid').v4
const amqplib = require('amqplib/callback_api')
const os = require('os')

@@ -12,2 +12,7 @@

function Remit (opts) {

@@ -21,2 +26,3 @@ if (!opts) opts = {}

this._connection_callbacks = []
this._channel = opts.channel || null
this._emit_on_error = opts.emit_on_error || null

@@ -34,2 +40,4 @@

this._listener_count = 0
this._events = {}

@@ -40,72 +48,113 @@

Remit.prototype.on_error = null
Remit.prototype.res = function res (event, callback, context, options) {
debug('res')
const debug = master_debug('remit.res')
if (this._events[event]) return false
if (!options) options = {}
let self = this
debug(`Creating "${event}" endpoint`)
this.__connect(function () {
self._connection.queue(options.queueName || event, {
exclusive: false,
durable: true,
autoDelete: false
}, function (queue) {
debug(queue.name)
const self = this
self._events[event] = { queue }
self.__connect(() => {
self.__assert_exchange(() => {
const chosen_queue = options.queueName || event
debug(self._events)
self._channel.assertQueue(chosen_queue, {
exclusive: false,
durable: true,
autoDelete: false
})
queue.subscribe({
ack: true,
prefetchCount: 0
}, function (message, headers, deliveryInfo, messageObject) {
let event_info = {
event: messageObject.routingKey,
exchange: messageObject.exchange,
contentType: messageObject.contentType,
size: messageObject.size
}
self._channel.bindQueue(chosen_queue, 'remit', event, {}, (err, ok) => {
if (err) throw new Error(err)
debug(event_info)
self._channel.consume(chosen_queue, (message) => {
debug(`New message`)
if (!messageObject.correlationId || !messageObject.replyTo) {
let data
try {
messageObject.acknowledge()
data = JSON.parse(message.content.toString())
} catch (e) {
debug(`Failed to parse JSON data; NACKing`)
callback(context, message, null, event_info)
} catch (e) {
if (self.on_error) {
self.on_error(e)
} else {
throw new Error(e)
}
return self._channel.nack(message, false, false)
}
} else {
try {
callback.call(context, message, function (err, data) {
let options = {correlationId: messageObject.correlationId}
self._exchange.publish(messageObject.replyTo, Array.prototype.slice.call(arguments), options)
debug(`Parsed JSON data`)
messageObject.acknowledge()
})
} catch (e) {
if (self.on_error) {
self.on_error(e)
} else {
throw new Error(e)
if (!message.properties.correlationId || !message.properties.replyTo) {
debug(`No callback found; acting as listener`)
try {
callback(data)
self._channel.ack(message)
} catch (e) {
if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) {
self._channel.nack(message, false, false)
} else {
message.properties.headers = increment_headers(message.properties.headers)
self._channel.publish(message.fields.exchange, message.fields.routingKey, message.content, message.properties)
self._channel.ack(message)
}
if (self.on_error) {
self.on_error(e)
} else {
throw new Error(e)
}
}
} else {
debug(`Callback found; aiming to respond`)
try {
callback.call(context, data, function (err, data) {
debug(`Received callback from responder`)
const options = {correlationId: message.properties.correlationId}
debug(`Publishing message to ${message.properties.replyTo}`)
self._channel.publish('remit', message.properties.replyTo, new Buffer(JSON.stringify(Array.prototype.slice.call(arguments))), options)
self._channel.ack(message)
})
} catch (e) {
debug(`Callback errored`)
if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) {
debug(`Run out of patience. NACKing...`)
self._channel.nack(message, false, false)
} else {
debug(`Giving the message another chance...`)
message.properties.headers = increment_headers(message.properties.headers)
self._channel.publish(message.fields.exchange, message.fields.routingKey, message.content, message.properties)
self._channel.ack(message)
}
if (self.on_error) {
self.on_error(e)
} else {
throw new Error(e)
}
}
}
}
}, {
exclusive: false
})
})
self.__assert_exchange(function () {
queue.bind(self._exchange, event)
})
})

@@ -115,81 +164,91 @@ })

Remit.prototype.req = function req (event, args, callback, options) {
debug('req')
const debug = master_debug('remit.req')
if (!options) options = {}
let self = this
let correlation_id = uuid()
const self = this
const correlation_id = uuid()
this.__connect(function () {
if (!callback) {
self.__assert_exchange(function () {
debug(event)
self.__connect(() => {
self.__assert_exchange(() => {
if (!callback) {
debug(`No callback specified for req; publishing "${event}" message now`)
self._exchange.publish(event, args || {}, {})
return self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), {})
}
self._results_name = `${this._service_name}:callback:${os.hostname()}:${process.pid}`
self._channel.assertQueue(self._results_name, {
exclusive: false,
durable: false,
autoDelete: true
})
} else {
self.__assert_exchange(function () {
debug('GOT EXCHANGE')
self.__assert_result_queue(function () {
debug('GOT RESULT QUEUE')
self._channel.bindQueue(self._results_name, 'remit', self._results_name, {}, (err, ok) => {
if (err) throw new Error(err)
self._results_callback[correlation_id] = {
callback,
context: null,
autoDeleteCallback: true
}
if (!self._results_queue) {
self._results_queue = true
options.mandatory = true
options.replyTo = self._results_name
options.correlationId = correlation_id
self._channel.consume(self._results_name, function (message) {
self.__on_result.apply(self, arguments)
})
}
debug('Publishing ' + event + ' with ::', args)
self._results_callback[correlation_id] = {
callback: callback,
context: null,
autoDeleteCallback: true
}
self._results_timeouts[correlation_id] = setTimeout(function () {
if (self._results_callback[correlation_id]) {
debug('Timing out')
options.mandatory = true
options.replyTo = self._results_name,
options.correlationId = correlation_id
delete self._results_callback[correlation_id]
delete self._results_timeouts[correlation_id]
self._results_timeouts[correlation_id] = setTimeout(function () {
if (!self._results_callback[correlation_id]) return
try {
callback({
event,
args,
options,
message: 'Timed out after no response for ' + (options.timeout || 5000) + 'ms'
})
} catch (e) {
if (self.on_error) {
self.on_error(e)
} else {
throw new Error(e)
}
}
delete self._results_callback[correlation_id]
delete self._results_timeouts[correlation_id]
try {
callback({
event: event,
args: args,
options: options,
message: `Timed out after no response for ${options.timeout || 5000}ms`
})
} catch (e) {
if (self.on_error) {
self.on_error(e)
} else {
throw new Error(err)
}
}, options.timeout || 5000)
}
}, options.timeout || 5000)
self._exchange.publish(event, args, options, function (err) {
if (err) {
delete self._results_callback[correlation_id]
callback(err)
}
})
})
self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options)
})
}
})
})
debug('Returning ' + correlation_id)
return correlation_id
}
Remit.prototype.emit = function emit (event, args, options) {
debug('emit')
const debug = master_debug('remit.emit')
let self = this
const self = this
if (!options) options = {}

@@ -200,104 +259,97 @@

let correlation_id = this.req.call(self, event, args, options.onResponse, options.context, options)
debug('Emitting ', args)
self.req.call(self, event, args, options.onResponse, options.context, options)
}
Remit.prototype.listen = function listen (event, callback, context, options) {
if (!this._service_name) {
console.log('MUST PROVIDE A SERVICE NAME IF LISTENING')
const self = this
process.exit(2)
}
if (!self._service_name) throw new Error('Must provide a service name if listening')
if (!options) options = {}
options.queueName = `emission:${this._service_name}:${event}`
options.queueName = `${event}:emission:${self._service_name}:${++self._listener_count}`
debug('NAME INC')
debug(options.queueName)
return this.res.call(this, event, callback, context, options)
self.res.call(self, event, callback, context, options)
}
Remit.prototype.__connect = function __connect (callback) {
debug('__connect')
const debug = master_debug('remit.__connect')
debug(`Checking connection`)
const self = this
if (!callback) callback = function () {}
if (self._connection) {
if (self._connection_callbacks.length) {
self._connection_callbacks.push(callback)
if (this._connection) {
if (this._connection_callbacks.length) {
this._connection_callbacks.push(callback)
return true
}
return callback(this._connection)
} else {
this._connection = null
debug(`Instantly hitting callback`)
return callback(self.connection)
}
let self = this
const first = !self._connection_callbacks.length
self._connection_callbacks.push(callback)
this._connection_callbacks.push(callback)
if (!first) return
this._connection = amqp.createConnection({
url: this._url
})
debug(`Connection not yet made; attemtping now...`)
this._connection.on('ready', function () {
debug('Connected to AMQP')
amqplib.connect(self._url, (err, con) => {
if (err) throw new Error(err)
let callbacks = self._connection_callbacks
self._connection_callbacks = []
debug(`Connected to ${self._url}`)
for (var i = 0; i < callbacks.length; i++) {
callbacks[i](self._connection)
}
})
self._connection = con
this._connection.on('error', function (err) {
throw new Error(err)
})
}
self._connection.createChannel((err, channel) => {
if (err) throw new Error(err)
Remit.prototype.__assert_exchange = function __assert_exchange (callback) {
debug('__assert_exchange')
debug(`Channel created`)
if (!callback) callback = function () {}
self._channel = channel
if (this._exchange) {
if (this._exchange_callbacks.length) {
this._exchange_callbacks.push(callback)
const callbacks = self._connection_callbacks
self._connection_callbacks = []
return true
}
debug(`Running ${callbacks.length} callbacks`)
return callback(this._exchange)
}
for (var i = 0; i < callbacks.length; i++) {
callbacks[i](self._connection)
}
})
})
}
let self = this
this._exchange_callbacks.push(callback)
this._exchange = this._connection.exchange(this._exchange_name, {
autoDelete: true
}, function (exchange) {
debug('Exchange ' + self._exchange_name + ' open')
let callbacks = self._exchange_callbacks
self._exchange_callbacks = []
for (var i = 0; i < callbacks.length; i++) {
callbacks[i](self._exchange)
}
})
}
Remit.prototype.__assert_result_queue = function __assert_result_queue (callback) {
debug('__assert_result_queue')
Remit.prototype.__assert_exchange = function __assert_exchange (callback) {
const self = this
const debug = master_debug('remit.__assert_exchange')
if (!callback) callback = function () {}
if (this._results_queue) {
if (this._results_callbacks.length) {
this._results_callbacks.push(callback)
if (self._exchange) {
if (self._exchange_callbacks.length) {
self._exchange_callbacks.push(callback)

@@ -307,59 +359,52 @@ return true

return callback(this._results_queue)
debug(`Instantly hitting exchange callback`)
return callback()
}
let self = this
self._exchange_callbacks.push(callback)
this._results_name = this.__generate_queue_name('callback')
this._results_callbacks.push(callback)
self._channel.assertExchange('remit', 'topic', {
autoDelete: true
}, (err, ok) => {
if (err) throw new Error(err)
debug(this._results_name)
self._exchange = true
this.__assert_exchange(function () {
self._results_queue = self._connection.queue(self._results_name, {
exclusive: false,
durable: false,
autoDelete: true
}, function (queue) {
debug('Callback queue is open')
const callbacks = self._exchange_callbacks
self._exchange_callbacks = []
queue.subscribe(function () {
self.__on_result.apply(self, arguments)
})
for (var i = 0; i < callbacks.length; i++) {
callbacks[i]()
}
})
}
queue.bind(self._exchange, self._results_name)
let callbacks = self._results_callbacks
self._results_callbacks = []
for (var i = 0; i < callbacks.length; i++) {
callbacks[i](queue)
}
})
})
}
Remit.prototype.__on_result = function __on_result (msg, headers, info) {
debug('__on_result')
if (!this._results_callback[info.correlationId]) return
let self = this
Remit.prototype.__on_result = function __on_result (message) {
const self = this
const debug = master_debug('remit.__on_result')
let callback = this._results_callback[info.correlationId]
if (!self._results_callback[message.properties.correlationId]) return
const callback = self._results_callback[message.properties.correlationId]
let args = []
if (Array.isArray(msg)) {
for (var i = 0; i < msg.length; i++) {
args.push(msg[i])
}
} else {
args.push(msg)
}
debug(`Got result`, JSON.parse(message.content.toString()))
delete this._results_timeouts[info.correlationId]
let data = JSON.parse(message.content.toString())
if (!Array.isArray(data)) data = [data]
delete self._results_timeouts[message.properties.correlationId]
try {
callback.callback.apply(callback.context, args)
callback.callback.apply(callback.context, data)
} catch (e) {
self._channel.nack(message, false, false)
delete self._results_callback[message.properties.correlationId]
if (self.on_error) {

@@ -372,13 +417,23 @@ self.on_error(e)

// if (callback.autoDeleteCallback !== false) delete this._results_callback[info.correlationId]
delete this._results_callback[info.correlationId]
self._channel.ack(message)
delete self._results_callback[message.properties.correlationId]
}
if (this._emit_on_error && args[0]) this.emit(this._emit_on_error, args[0])
debug(this._results_callback)
debug(this._results_timeouts)
}
Remit.prototype.__generate_queue_name = function (type) {
return `${this._service_name}:${os.hostname()}:pid${process.pid}:${type}`
function increment_headers (headers) {
if (!headers) return {attempts: 1}
if (!headers.attempts) {
headers.attempts = 1
return headers
}
headers.attempts = parseInt(headers.attempts) + 1
return headers
}
{
"name": "remit",
"version": "1.0.9",
"version": "1.1.0",
"description": "`remit` is intended to be a small set of functionality used to create simple microservices that don't need to be aware of one-another's existence.",

@@ -12,6 +12,6 @@ "main": "index.js",

"dependencies": {
"amqp": "^0.2.4",
"amqplib": "^0.4.0",
"debug": "^2.2.0",
"node-uuid": "^1.4.3"
"node-uuid": "^1.4.7"
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc