New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

remit

Package Overview
Dependencies
Maintainers
1
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

remit - npm Package Compare versions

Comparing version 1.4.0 to 1.4.1

406

index.js

@@ -27,3 +27,5 @@ 'use strict'

this._connection = opts.connection || null
this._channel = opts.channel || null
this._consume_channel = null
this._publish_channel = null
this._work_channel = null
this._exchange = opts.exchange || null

@@ -34,2 +36,5 @@

this._exchange_callbacks = []
this._consume_channel_callbacks = []
this._publish_channel_callbacks = []
this._work_channel_callbacks = []

@@ -71,28 +76,32 @@ // Callback trackers

// TODO Check this for a valid response
self._channel.assertQueue(chosen_queue, {
exclusive: false,
durable: true,
autoDelete: false
self.__use_consume_channel(() => {
// TODO Check this for a valid response
self._consume_channel.assertQueue(chosen_queue, {
exclusive: false,
durable: true,
autoDelete: false
})
})
self._channel.bindQueue(chosen_queue, 'remit', event).then(() => {
self._channel.consume(chosen_queue, (message) => {
if (!message.properties.timestamp) {
self.__consume_res(message, callbacks, context)
} else {
const time_to_wait = parseInt(message.properties.timestamp - new Date().getTime())
if (time_to_wait <= 0) {
self.__use_consume_channel(() => {
self._consume_channel.bindQueue(chosen_queue, 'remit', event).then(() => {
self._consume_channel.consume(chosen_queue, (message) => {
if (!message.properties.timestamp) {
self.__consume_res(message, callbacks, context)
} else {
setTimeout(() => {
const time_to_wait = parseInt(message.properties.timestamp - new Date().getTime())
if (time_to_wait <= 0) {
self.__consume_res(message, callbacks, context)
}, time_to_wait)
} else {
setTimeout(() => {
self.__consume_res(message, callbacks, context)
}, time_to_wait)
}
}
}
}, {
exclusive: false
})
}).then(null, console.error)
}, {
exclusive: false
})
}).then(null, console.error)
})
})

@@ -127,3 +136,5 @@ })

if (!callback) {
return self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options)
return self.__use_publish_channel(() => {
self._publish_channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options)
})
}

@@ -133,9 +144,11 @@

self._consuming_results = true
self._channel.consume('amq.rabbitmq.reply-to', function (message) {
self.__on_result.apply(self, arguments)
}, {
exclusive: true,
noAck: true
}).then(send_message).then(null, console.warn)
self.__use_publish_channel(() => {
self._publish_channel.consume('amq.rabbitmq.reply-to', function (message) {
self.__on_result.apply(self, arguments)
}, {
exclusive: true,
noAck: true
}).then(send_message).then(null, console.warn)
})
} else {

@@ -182,3 +195,5 @@ send_message()

self._channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options)
self.__use_publish_channel(() => {
self._publish_channel.publish('remit', event, new Buffer(JSON.stringify(args || {})), options)
})
}

@@ -338,24 +353,184 @@ })

self._connection = connection
// Time to run the callbacks. Let's grab them and
// take them out of the queue.
// We now need to make a channel to communicate
// through.
// TODO Remove these one at a time as we process
// them.
const callbacks = self._connection_callbacks
self._connection_callbacks = []
// Loop through and make everything happen!
for (var i = 0; i < callbacks.length; i++) {
callbacks[i]()
}
}).then(null, console.error)
}
Remit.prototype.__use_consume_channel = function __use_consume_channel (callback) {
const self = this
if (!callback) {
callback = function () {}
}
if (self._consume_channel) {
if (self._consume_channel_callbacks.length) {
self._consume_channel_callbacks.push(callback)
return
}
return callback()
}
const first = !self._consume_channel_callbacks.length
self._consume_channel_callbacks.push(callback)
if (!first) {
return
}
self.__connect(() => {
self._connection.createChannel().then((channel) => {
// Everything went awesome. Let's set our new
// global channel.
self._channel = channel
channel.on('error', (err) => {
self._consume_channel = null
self.__use_consume_channel()
})
channel.on('close', () => {
self._consume_channel = null
self.__use_consume_channel()
})
self._consume_channel = channel
const callbacks = self._consume_channel_callbacks
self._consume_channel_callbacks = []
const len = callbacks.length
// Time to run the callbacks. Let's grab them and
// take them out of the queue.
for (var i = 0; i < len; i++) {
callbacks[0]()
callbacks.shift()
}
})
})
}
Remit.prototype.__use_publish_channel = function __use_publish_channel (callback) {
const self = this
if (!callback) {
callback = function () {}
}
if (self._publish_channel) {
if (self._publish_channel_callbacks.length) {
self._publish_channel_callbacks.push(callback)
return
}
return callback()
}
const first = !self._publish_channel_callbacks.length
self._publish_channel_callbacks.push(callback)
if (!first) {
return
}
self.__connect(() => {
self._connection.createChannel().then((channel) => {
channel.on('error', (err) => {
self._publish_channel = null
self.__use_publish_channel()
})
channel.on('close', () => {
self._publish_channel = null
self.__use_publish_channel()
})
self._publish_channel = channel
const callbacks = self._publish_channel_callbacks
self._publish_channel_callbacks = []
const len = callbacks.length
// TODO Remove these one at a time as we process
// them.
const callbacks = self._connection_callbacks
self._connection_callbacks = []
for (var i = 0; i < len; i++) {
callbacks[0]()
callbacks.shift()
}
})
})
}
Remit.prototype.__use_work_channel = function __use_work_channel (callback) {
const self = this
if (!callback) {
callback = function () {}
}
if (self._work_channel) {
if (self._work_channel_callbacks.length) {
self._work_channel_callbacks.push(callback)
return
}
return callback()
}
const first = !self._work_channel_callbacks.length
self._work_channel_callbacks.push(callback)
if (!first) {
return
}
self.__connect(() => {
self._connection.createChannel().then((channel) => {
channel.on('error', (err) => {
self._work_channel = null
self.__use_work_channel()
})
channel.on('close', () => {
self._work_channel = null
self.__use_work_channel()
})
self._work_channel = channel
const callbacks = self._work_channel_callbacks
self._work_channel_callbacks = []
const len = callbacks.length
// Loop through and make everything happen!
for (var i = 0; i < callbacks.length; i++) {
callbacks[i]()
for (var i = 0; i < len; i++) {
callbacks[0]()
callbacks.shift()
}
}).then(null, console.error)
}).then(null, console.error)
})
})
}

@@ -406,21 +581,23 @@

// Let's try making this exchange!
self._channel.assertExchange('remit', 'topic', {
autoDelete: true
}).then(() => {
// Everything went awesome so we'll let everything
// know that the exchange is up.
self._exchange = true
// Time to run any callbacks that were waiting on
// this exchange being made.
// TODO Remove these one at a time as we process
// them.
const callbacks = self._exchange_callbacks
self._exchange_callbacks = []
for (var i = 0; i < callbacks.length; i++) {
callbacks[i]()
}
}).then(null, console.error)
self.__use_work_channel(() => {
self._work_channel.assertExchange('remit', 'topic', {
autoDelete: true
}).then(() => {
// Everything went awesome so we'll let everything
// know that the exchange is up.
self._exchange = true
// Time to run any callbacks that were waiting on
// this exchange being made.
// TODO Remove these one at a time as we process
// them.
const callbacks = self._exchange_callbacks
self._exchange_callbacks = []
for (var i = 0; i < callbacks.length; i++) {
callbacks[i]()
}
}).then(null, console.error)
})
}

@@ -441,3 +618,5 @@

} catch (e) {
return self._channel.nack(message, false, false)
return self.__use_consume_channel(() => {
self._consume_channel.nack(message, false, false)
})
}

@@ -453,3 +632,5 @@

function done (err, data) {
self._channel.ack(message)
self.__use_consume_channel(() => {
self._consume_channel.ack(message)
})
}

@@ -461,8 +642,33 @@

if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) {
self._channel.nack(message, false, false)
self.__use_consume_channel(() => {
self._consume_channel.nack(message, false, false)
})
} else {
message.properties.headers = increment_headers(message.properties.headers)
self._channel.publish('', message.properties.replyTo, message.content, message.properties)
self._channel.ack(message)
function check_and_republish() {
self.__use_work_channel(() => {
self._work_channel.checkQueue(message.properties.replyTo).then(() => {
self.__use_publish_channel(() => {
self._publish_channel.publish('', message.properties.replyTo, message.content, message.properties)
})
self.__use_consume_channel(() => {
self._consume_channel.ack(message)
})
}).then(null, (err) => {
// If we got a proper queue error then the queue must
// just not be around.
if (err.message.substr(0, 16) === 'Operation failed') {
self.__use_consume_channel(() => {
self._consume_channel.nack(message, false, false)
})
} else {
check_and_republish()
}
})
})
}
check_and_republish()
}

@@ -479,5 +685,29 @@

const options = {correlationId: message.properties.correlationId}
const res_data = new Buffer(JSON.stringify(Array.prototype.slice.call(arguments)))
self._channel.publish('', message.properties.replyTo, new Buffer(JSON.stringify(Array.prototype.slice.call(arguments))), options)
self._channel.ack(message)
function check_and_publish () {
self.__use_work_channel(() => {
self._work_channel.checkQueue(message.properties.replyTo).then(() => {
self.__use_publish_channel(() => {
self._publish_channel.publish('', message.properties.replyTo, res_data, options)
})
self.__use_consume_channel(() => {
self._consume_channel.ack(message)
})
}).then(null, (err) => {
// If we got a proper queue error then the queue must
// just not be around.
if (err.message.substr(0, 16) === 'Operation failed') {
self.__use_consume_channel(() => {
self._consume_channel.nack(message, false, false)
})
} else {
check_and_publish()
}
})
})
}
check_and_publish()
}

@@ -489,7 +719,33 @@

if (message.properties.headers && message.properties.headers.attempts && message.properties.headers.attempts > 4) {
self._channel.nack(message, false, false)
self.__use_consume_channel(() => {
self._consume_channel.nack(message, false, false)
})
} else {
message.properties.headers = increment_headers(message.properties.headers)
self._channel.publish('', message.properties.replyTo, message.content, message.properties)
self._channel.ack(message)
function check_and_republish () {
self.__use_work_channel(() => {
self._work_channel.checkQueue(message.properties.replyTo).then(() => {
self.__use_publish_channel(() => {
self._publish_channel.publish('', message.properties.replyTo, message.content, message.properties)
})
self.__use_consume_channel(() => {
self._consume_channel.ack(message)
})
}).then(null, (err) => {
// If we got a proper queue error then the queue must
// just not be around.
if (err.message.substr(0, 16) === 'Operation failed') {
self.__use_consume_channel(() => {
self._consume_channel.nack(message, false, false)
})
} else {
check_and_republish()
}
})
})
}
check_and_republish()
}

@@ -496,0 +752,0 @@

2

package.json
{
"name": "remit",
"version": "1.4.0",
"version": "1.4.1",
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -33,5 +33,5 @@ 'use strict'

throw new Error('Dummy')
// throw new Error('Dummy')
return done(null, 'here is the reply')
})

@@ -8,2 +8,10 @@ 'use strict'

// const log = require('remit-bunyan')('test-service', remit)
// setTimeout(function () {
// throw new Error('Balls')
// }, 2000)
console.log('Okay')
remit.req('test.replies', {

@@ -10,0 +18,0 @@ username: 'jack'

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc