New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

remit

Package Overview
Dependencies
Maintainers
3
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

remit - npm Package Compare versions

Comparing version 2.0.0-beta.2 to 2.0.0-beta.3

grafana.js

34

emit.js
const remit = require('./')()
;(function loop () {
remit.treq('jtd.test', {}, loop)
// remit.request.timeout(() => {console.log('global timeout')})
// remit.request.sent(() => {console.log('global sent')})
// remit.request.data(() => {console.log('global data')})
remit
.respond('one.two.three')
.data((event, callback) => {
return callback(null, {bar: 'baz'})
})
.ready(() => {
console.log('ready')
})
// remit.respond('foo')
// remit.respond('bar')
// remit.respond('baz')
// remit.respond('qux')
const foo = remit
.request('one.two.three')
// .data(() => {
// console.log('data')
// })
// .sent(() => {
// console.log('sent')
// })
// .timeout(() => {
// console.log('timeout')
// })
;(function bar () {
foo.send({foo: 'bar'}).then(bar)
})()

@@ -0,1 +1,2 @@

const packageJson = require('./package.json')
const EventEmitter = require('eventemitter3')

@@ -6,2 +7,3 @@ const Request = require('./lib/Request')

const connect = require('./lib/assertions/connection')
const bootWorkChannelPool = require('./lib/assertions/bootWorkChannelPool')

@@ -11,2 +13,3 @@ function Remit (options) {

this.version = packageJson.version
this._emitter = new EventEmitter()

@@ -70,2 +73,3 @@

connect.apply(this, [this._options])
this._workChannelPool = bootWorkChannelPool.apply(this)

@@ -72,0 +76,0 @@ return this

2

lib/assertions/publishChannel.js

@@ -23,3 +23,3 @@ const debug = require('debug')('remit:publishChannel')

channel.prefetch(128)
channel.prefetch(48)

@@ -26,0 +26,0 @@ return resolve(channel)

const debug = require('debug')('remit:callback-handler')
const getConsumeChannel = require('./assertions/consumeChannel')
const getWorkChannel = require('./assertions/workChannel')
// const getConsumeChannel = require('./assertions/consumeChannel')
// const getWorkChannel = require('./assertions/workChannel')
const getPublishChannel = require('./assertions/publishChannel')
module.exports = function getCallbackHandler (type, message) {
module.exports = function getCallbackHandler (type, response, message, event) {
const remit = this

@@ -12,3 +12,4 @@

const responseData = new Buffer(JSON.stringify(Array.from(arguments).slice(0, 2)))
event.finished = new Date()
const responseData = JSON.stringify(Array.from(arguments).slice(0, 2))
const shouldAck = !!type.options.shouldAck

@@ -18,3 +19,3 @@ const shouldReply = !!message.properties.replyTo

if (shouldReply) {
return reply.apply(remit, [message, responseData, shouldAck])
return reply.apply(remit, [message, responseData, shouldAck, event, type, response])
}

@@ -25,10 +26,11 @@

if (shouldAck) {
return ack.apply(remit, [message])
return ack.apply(remit, [message, event, type, undefined, response])
}
debug('No need to acknowledge')
type._emitter.emit('done', event)
}
}
function ack (message) {
function ack (message, event, type, responseData, response) {
const remit = this

@@ -39,16 +41,14 @@

return new Promise((resolve, reject) => {
getConsumeChannel.apply(remit)
.then((consumeChannel) => {
try {
consumeChannel.ack(message)
return resolve()
} catch (e) {
return reject(e)
}
})
try {
response._channel.ack(message)
resolve()
} catch (e) {
reject(e)
} finally {
type._emitter.emit('done', event, responseData)
}
})
}
function nack (message, requeue) {
function nack (message, requeue, event, type, response) {
const remit = this

@@ -59,16 +59,14 @@

return new Promise((resolve, reject) => {
getConsumeChannel.apply(remit)
.then((consumeChannel) => {
try {
consumeChannel.nack(message, false, requeue)
return resolve()
} catch (e) {
return reject(e)
}
})
try {
response._channel.nack(message, false, requeue)
resolve()
} catch (e) {
reject(e)
} finally {
type._emitter.emit('done', event)
}
})
}
function reply (message, responseData, shouldAck) {
function reply (message, responseData, shouldAck, event, type, response) {
const remit = this

@@ -78,21 +76,34 @@

getWorkChannel.apply(remit).then((workChannel) => {
let workChannel
remit._workChannelPool.acquire().then((channel) => {
// getWorkChannel.apply(remit).then((workChannel) => {
workChannel = channel
return workChannel.checkQueue(message.properties.replyTo)
}).then((ok) => {
remit._workChannelPool.release(workChannel)
return getPublishChannel.apply(remit)
}).then((publishChannel) => {
publishChannel.sendToQueue(message.properties.replyTo, responseData, message.properties)
publishChannel.sendToQueue(message.properties.replyTo, new Buffer(responseData), message.properties)
if (!shouldAck) {
type._emitter.emit('done', event, responseData)
return
}
return ack.apply(remit, [message])
return ack.apply(remit, [message, event, type, responseData, response])
}).catch((err) => {
remit._workChannelPool.destroy(workChannel)
if (err.message && err.message.substr(0, 16) === 'Operation failed') {
if (!shouldAck) {
type._emitter.emit('done', event)
return
}
return nack.apply(remit, [message, false])
return nack.apply(remit, [message, false, event, type, response])
}

@@ -102,4 +113,4 @@

return reply.apply(remit, [message, responseData, shouldAck])
return reply.apply(remit, [message, responseData, shouldAck, event, type, response])
})
}

@@ -24,4 +24,2 @@ const getCallbackHandler = require('./handleCallback')

response._emitter.once('done', getCallbackHandler.apply(remit, [type, message]))
let event = {

@@ -35,2 +33,6 @@ eventId: message.properties.messageId,

if (message.properties.headers) {
if (message.properties.headers.uuid) {
event.eventId = message.properties.headers.uuid
}
if (message.properties.headers.scheduled) {

@@ -53,2 +55,5 @@ event.scheduled = new Date(message.properties.headers.scheduled)

event.started = new Date()
response._emitter.once('done', getCallbackHandler.apply(remit, [type, response, message, event]))
type._emitter.emit('data', event, callback)

@@ -55,0 +60,0 @@ response._emitter.emit('data', event, callback)

const debug = require('debug')('remit:request')
const EventEmitter = require('eventemitter3')
const getPublishChannel = require('./assertions/publishChannel')
const getWorkChannel = require('./assertions/workChannel')
// const getWorkChannel = require('./assertions/workChannel')
const consumeReplies = require('./assertions/reply')

@@ -44,2 +44,8 @@ const uuid = require('uuid')

requestType.timeout = function onTimeout (callback) {
requestType._emitter.on('timeout', callback)
return requestType
}
return requestType

@@ -58,2 +64,3 @@ }

let expiration
let timeout

@@ -86,4 +93,8 @@ try {

return new Promise((resolve, reject) => {
getWorkChannel.apply(remit).then((workChannel) => {
return workChannel.assertQueue(`demission-${messageId}`, {
let workChannel
remit._workChannelPool.acquire().then((channel) => {
workChannel = channel
return workChannel.assertQueue(`d:${remit._options.exchange}:${options.event}:${expiration}`, {
messageTtl: expiration,

@@ -99,5 +110,10 @@ exclusive: false,

}).then((ok) => {
remit._workChannelPool.release(workChannel)
return getPublishChannel.apply(remit)
}).then((publishChannel) => {
return resolve(publishChannel)
}).catch((err) => {
remit._workChannelPool.destroy(workChannel)
throw err
})

@@ -125,2 +141,3 @@ })

let messageContent
clearTimeout(timeout)

@@ -136,2 +153,12 @@ try {

})
timeout = setTimeout(() => {
const timeoutOpts = {
code: 'timeout',
message: 'Request timed out after 5000ms'
}
type._emitter.emit('timeout', timeoutOpts)
request._emitter.emit('timeout', timeoutOpts)
}, 30000)
}

@@ -143,3 +170,3 @@

publishChannel.sendToQueue(
`demission-${messageOptions.messageId}`,
`d:${remit._options.exchange}:${options.event}:${expiration}`,
new Buffer(JSON.stringify(data)),

@@ -162,12 +189,14 @@ messageOptions

return new Promise((resolve, reject) => {
request._emitter.on('data', (err, result) => {
if (err) {
return reject(err)
}
const cleanUp = (err, result) => {
request._emitter.removeListener('data', cleanUp)
request._emitter.removeListener('timeout', cleanUp)
request._emitter.removeListener('error', cleanUp)
if (err) return reject(err)
return resolve(result)
})
}
request._emitter.once('timeout', reject)
request._emitter.once('error', reject)
request._emitter.on('data', cleanUp)
request._emitter.on('timeout', cleanUp)
request._emitter.on('error', cleanUp)
})

@@ -191,2 +220,8 @@ }

request.timeout = function onTimeout (callback) {
request._emitter.on('timeout', callback)
return request
}
return request

@@ -193,0 +228,0 @@ }

const debug = require('debug')('remit:response')
const async = require('async')
const EventEmitter = require('eventemitter3')
const getWorkChannel = require('./assertions/workChannel')
const getConsumeChannel = require('./assertions/consumeChannel')
// const getWorkChannel = require('./assertions/workChannel')
// const getConsumeChannel = require('./assertions/consumeChannel')
const handleMessage = require('./handleMessage')

@@ -52,2 +53,8 @@

responseType.done = function onDone (callback) {
responseType._emitter.on('done', callback)
return responseType
}
return responseType

@@ -76,15 +83,38 @@ }

response.data = function onData (callback) {
response._emitter.on('data', callback)
response.data = function onData (callbacks) {
callbacks = Array.isArray(callbacks) ? callbacks : [callbacks]
const finalCallback = callbacks.pop()
if (!callbacks.length) {
response._emitter.on('data', finalCallback)
} else {
const run = async.seq(...callbacks)
response._emitter.on('data', (event, callback) => {
run(event, (err, event) => {
if (err) {
return callback(err)
}
finalCallback(event, callback)
})
})
}
return response
}
getWorkChannel.apply(remit).then((workChannel) => {
let workChannel
// getWorkChannel.apply(remit).then((workChannel) => {
remit._workChannelPool.acquire().then((channel) => {
debug('Asserting endpoint', options.event)
workChannel = channel
return workChannel.assertQueue(options.queue, {
exclusive: false,
durable: true,
autoDelete: false
autoDelete: false,
maxPriority: 10
})

@@ -94,7 +124,14 @@ }).then((queueData) => {

return getConsumeChannel.apply(remit)
remit._workChannelPool.release(workChannel)
return remit._connection
}).then((connection) => {
return connection.createChannel()
}).then((consumeChannel) => {
debug('Binding event')
return consumeChannel.bindQueue(
consumeChannel.prefetch(48)
response._channel = consumeChannel
return response._channel.bindQueue(
options.queue,

@@ -105,7 +142,5 @@ remit._options.exchange,

}).then(() => {
return getConsumeChannel.apply(remit)
}).then((consumeChannel) => {
debug('Consuming messages')
return consumeChannel.consume(options.queue, (message) => {
return response._channel.consume(options.queue, (message) => {
if (!message) {

@@ -121,6 +156,7 @@ return console.trace('Consumer cancelled')

}).then(() => {
type._emitter.emit('ready')
response._emitter.emit('ready')
type._emitter.emit('ready', options)
response._emitter.emit('ready', options)
}).catch((err) => {
console.trace('Threw error here unexpectedly', err)
remit._workChannelPool.destroy(workChannel)
})

@@ -127,0 +163,0 @@

@@ -24,4 +24,15 @@ const url = require('url')

parsedUrl.query = {
frameMax: '0xf000', // 61,440 (~62KB)
channelMax: '3', // We should never have more than this
// Maximum permissible size of a frame (in bytes)
// to negotiate with clients. Setting to 0 means
// "unlimited" but will trigger a bug in some QPid
// clients. Setting a larger value may improve
// throughput; setting a smaller value may improve
// latency.
// I default it to 0x1000, i.e. 4kb, which is the
// allowed minimum, will fit many purposes, and not
// chug through Node.JS's buffer pooling.
//
// frameMax: '0x20000', // 131,072 (128kb)
//
frameMax: '0x1000', // 4,096 (4kb)
heartbeat: '15' // Frequent hearbeat

@@ -28,0 +39,0 @@ }

{
"name": "remit",
"version": "2.0.0-beta.2",
"version": "2.0.0-beta.3",
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.",

@@ -18,6 +18,7 @@ "main": "index.js",

"amqplib": "^0.5.1",
"debug": "^2.4.1",
"stack-trace": "0.0.9",
"async": "^2.4.1",
"debug": "^2.4.4",
"eventemitter3": "^1.2.0",
"generic-pool": "^3.1.7",
"stack-trace": "0.0.9",
"uuid": "^3.0.1"

@@ -24,0 +25,0 @@ },

@@ -21,3 +21,5 @@ /* global describe, it, expect, sinon, before */

})
.ready(done)
.ready((options) => {
return done()
})
})

@@ -24,0 +26,0 @@

@@ -16,3 +16,5 @@ /* global describe, it, expect, sinon, remit */

.endpoint('holistic.request.response')
.ready(done)
.ready((options) => {
return done()
})

@@ -19,0 +21,0 @@ expect(endpoint).to.be.an('object')

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc