New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

remit

Package Overview
Dependencies
Maintainers
3
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

remit - npm Package Compare versions

Comparing version 2.0.0-beta.7 to 2.0.0-beta.8

1

index.js

@@ -14,2 +14,3 @@ const packageJson = require('./package.json')

this._emitter = new EventEmitter()
this._publishPools = {}

@@ -16,0 +17,0 @@ this._options = {

12

lib/assertions/publishChannel.js

@@ -7,11 +7,5 @@ const debug = require('debug')('remit:publishChannel')

if (remit._publishChannel) {
debug('Providing existing publish channel')
return remit._publishChannel
}
debug('Requesting new publish channel')
remit._publishChannel = new Promise((resolve, reject) => {
return new Promise((resolve, reject) => {
connect.apply(remit).then((connection) => {

@@ -24,3 +18,3 @@ debug('Creating new publish channel')

channel.prefetch(48)
channel.prefetch(49)

@@ -32,6 +26,4 @@ return resolve(channel)

})
return remit._publishChannel
}
module.exports = getPublishChannel
const debug = require('debug')('remit:reply')
const getPublishChannel = require('./publishChannel')
function consumeReplies () {
function consumeReplies (type, request) {
const remit = this
if (remit._engaged) {
debug('Providing existing reply consumption')
debug('Requesting reply consumption')
return remit._engaged
}
let channel
debug('Requesting reply consumption')
remit._engaged = new Promise((resolve, reject) => {
return new Promise((resolve, reject) => {
getPublishChannel.apply(remit)

@@ -20,3 +16,9 @@ .then((publishChannel) => {

return publishChannel.consume('amq.rabbitmq.reply-to', function (message) {
channel = publishChannel
if (!type.options.expectReply) {
return
}
return channel.consume('amq.rabbitmq.reply-to', function (message) {
debug('Consumed a reply', message.properties.correlationId)

@@ -29,6 +31,6 @@

})
}).then((ok) => {
}).then(() => {
debug('Consuming new replies')
return resolve()
return resolve(channel)
}).catch((err) => {

@@ -38,6 +40,4 @@ return reject(err)

})
return remit._engaged
}
module.exports = consumeReplies
const debug = require('debug')('remit:callback-handler')
const getPublishChannel = require('./assertions/publishChannel')

@@ -31,4 +30,2 @@ module.exports = function getCallbackHandler (type, response, message, event) {

function ack (message, event, type, responseData, response) {
const remit = this
debug('Acking')

@@ -38,3 +35,3 @@

try {
response._channel.ack(message)
response._consumeChannel.ack(message)
resolve()

@@ -50,4 +47,2 @@ } catch (e) {

function nack (message, requeue, event, type, response) {
const remit = this
debug('Nacking')

@@ -57,3 +52,3 @@

try {
response._channel.nack(message, false, requeue)
response._consumeChannel.nack(message, false, requeue)
resolve()

@@ -82,5 +77,7 @@ } catch (e) {

return getPublishChannel.apply(remit)
}).then((publishChannel) => {
publishChannel.sendToQueue(message.properties.replyTo, new Buffer(responseData), message.properties)
response._publishChannel.sendToQueue(
message.properties.replyTo,
Buffer.from(responseData),
message.properties
)

@@ -87,0 +84,0 @@ if (!shouldAck) {

const debug = require('debug')('remit:request')
const EventEmitter = require('eventemitter3')
const getPublishChannel = require('./assertions/publishChannel')
const consumeReplies = require('./assertions/reply')
const uuid = require('uuid')
const parseEvent = require('./utils/parseEvent')
const getPublishChannel = require('./assertions/reply')

@@ -27,2 +26,5 @@ function RequestType (masterOptions) {

requestType._emitter = new EventEmitter()
requestType._sentCallbacks = []
requestType._dataCallbacks = []
requestType._timeoutCallbacks = []

@@ -40,3 +42,4 @@ requestType.options = {

requestType.sent = function onSent (callback) {
requestType._emitter.on('sent', callback)
// requestType._emitter.on('sent', callback)
requestType._sentCallbacks.push(callback)

@@ -64,6 +67,11 @@ return requestType

const messageId = uuid.v4()
let trace
let expiration
let expirationGroup
let timeout
let event
debug(`${messageId} - Request for ${options.event} created`)
try {

@@ -76,29 +84,29 @@ const re = /^\s{4}.+?(\/[^/].+?)\)/gm

;(type.options.expectReply ? consumeReplies.apply(remit) : Promise.resolve())
.then(() => {
if (isNaN(localOpts.delay) && !localOpts.schedule) {
return getPublishChannel.apply(remit)
}
let demitQueue
if ((!localOpts.delay || isNaN(localOpts.delay)) && (!localOpts.schedule || !(localOpts.schedule instanceof Date) || localOpts.schedule.toString() === 'Invalid Date')) {
throw new Error('Invalid schedule date or delay duration when attempting to send a delayed emission')
}
if (!isNaN(localOpts.delay) || localOpts.schedule) {
if ((!localOpts.delay || isNaN(localOpts.delay)) && (!localOpts.schedule || !(localOpts.schedule instanceof Date) || localOpts.schedule.toString() === 'Invalid Date')) {
throw new Error('Invalid schedule date or delay duration when attempting to send a delayed emission')
}
if (localOpts.schedule) {
expiration = +localOpts.schedule - now
} else {
expiration = localOpts.delay
}
if (localOpts.schedule) {
expirationGroup = +localOpts.schedule
expiration = +localOpts.schedule - now
} else {
expirationGroup = localOpts.delay
expiration = localOpts.delay
}
if (expiration <= 0) {
return getPublishChannel.apply(remit)
}
return new Promise((resolve, reject) => {
if (expiration > 0) {
demitQueue = new Promise((resolve, reject) => {
let workChannel
console.log('acquiring')
remit._workChannelPool.acquire().then((channel) => {
workChannel = channel
return workChannel.assertQueue(`d:${remit._options.exchange}:${options.event}:${expiration}`, {
console.log('got')
return workChannel.assertQueue(`d:${remit._options.exchange}:${options.event}:${expirationGroup}`, {
messageTtl: expiration,

@@ -110,95 +118,116 @@ exclusive: false,

deadLetterRoutingKey: options.event,
expires: expiration + 5000
expires: expiration * 2
})
}).then((ok) => {
}).then(() => {
console.log('asserted')
remit._workChannelPool.release(workChannel)
return getPublishChannel.apply(remit)
}).then((publishChannel) => {
return resolve(publishChannel)
}).catch((err) => {
return remit._publishPools[options.event]
}).then(resolve).catch((err) => {
remit._workChannelPool.destroy(workChannel)
throw err
reject(err)
})
})
}).then((publishChannel) => {
debug('Sending message...')
} else {
demitQueue = remit._publishPools[options.event]
}
} else {
demitQueue = remit._publishPools[options.event]
}
let messageOptions = {
mandatory: true,
messageId: messageId,
appId: remit._options.name,
timestamp: now,
let messageOptions = {
mandatory: true,
messageId: messageId,
appId: remit._options.name,
timestamp: now,
headers: {
trace: trace
}
headers: {
trace: trace
}
}
if (type.options.expectReply) {
messageOptions.correlationId = messageOptions.messageId
messageOptions.replyTo = 'amq.rabbitmq.reply-to'
remit._emitter.once(messageOptions.correlationId, (message) => {
let messageContent
clearTimeout(timeout)
try {
messageContent = JSON.parse(message.content.toString())
} catch (e) {
console.trace('Error processing message')
}
if (type.options.expectReply) {
messageOptions.correlationId = messageOptions.messageId
messageOptions.replyTo = 'amq.rabbitmq.reply-to'
type._emitter.emit(`data-${messageId}`, ...messageContent)
request._emitter.emit(`data-${messageId}`, ...messageContent)
})
remit._emitter.once(messageOptions.correlationId, (message) => {
let messageContent
clearTimeout(timeout)
timeout = setTimeout(() => {
const timeoutOpts = {
code: 'timeout',
message: 'Request timed out after 30000ms'
}
try {
messageContent = JSON.parse(message.content.toString())
} catch (e) {
console.trace('Error processing message')
}
type._emitter.emit(`timeout-${messageId}`, timeoutOpts)
request._emitter.emit(`timeout-${messageId}`, timeoutOpts)
}, 30000)
}
type._emitter.emit('data', ...messageContent)
request._emitter.emit('data', ...messageContent)
})
if (localOpts.priority) {
if (localOpts.priority > 10 || localOpts.priority < 0) {
throw new Error('Invalid priority', localOpts.priority, 'when pushing message')
}
timeout = setTimeout(() => {
const timeoutOpts = {
code: 'timeout',
message: 'Request timed out after 5000ms'
}
messageOptions.priority = localOpts.priority
}
type._emitter.emit('timeout', timeoutOpts)
request._emitter.emit('timeout', timeoutOpts)
}, 30000)
}
if (localOpts.schedule || localOpts.delay) {
if (localOpts.schedule) {
messageOptions.headers.schedule = +localOpts.schedule
} else {
messageOptions.headers.delay = localOpts.delay
}
}
if (localOpts.priority) {
if (localOpts.priority > 10 || localOpts.priority < 0) {
throw new Error('Invalid priority', localOpts.priority, 'when pushing message')
}
event = parseEvent(messageOptions, {
routingKey: options.event
}, data)
messageOptions.priority = localOpts.priority
}
debug(`${messageId} - Getting publish channel and setting up queues`)
if (localOpts.schedule || localOpts.delay) {
if (localOpts.schedule) {
messageOptions.headers.schedule = +localOpts.schedule
} else {
messageOptions.headers.delay = localOpts.delay
}
demitQueue.then((publishChannel) => {
debug(`${messageId} - Pushing message to queue`)
publishChannel.sendToQueue(
`d:${remit._options.exchange}:${options.event}:${expiration}`,
new Buffer(JSON.stringify(data)),
messageOptions
)
if (localOpts.schedule || localOpts.delay) {
if (localOpts.schedule) {
messageOptions.headers.schedule = +localOpts.schedule
} else {
publishChannel.publish(
remit._options.exchange,
options.event,
new Buffer(JSON.stringify(data)),
messageOptions
)
messageOptions.headers.delay = localOpts.delay
}
const sentEvent = parseEvent(messageOptions, {
routingKey: options.event
}, data)
console.log('sending')
type._emitter.emit('sent', sentEvent)
request._emitter.emit('sent', sentEvent)
})
publishChannel.sendToQueue(
`d:${remit._options.exchange}:${options.event}:${expirationGroup}`,
Buffer.from(JSON.stringify(data)),
messageOptions
)
console.log('sent to queue')
} else {
publishChannel.publish(
remit._options.exchange,
options.event,
Buffer.from(JSON.stringify(data)),
messageOptions
)
}
type._emitter.emit(`sent-${event.eventId}`, event)
request._emitter.emit(`sent-${event.eventId}`, event)
})
return new Promise((resolve, reject) => {

@@ -209,11 +238,11 @@ const cleanUp = (err, result) => {

if (type.options.expectReply) {
request._emitter.removeListener('data', cleanUp)
request._emitter.removeListener('timeout', cleanUp)
request._emitter.removeAllListeners(`data-${messageId}`)
request._emitter.removeAllListeners(`timeout-${messageId}`)
} else {
request._emitter.removeListener('sent', cleanUp)
request._emitter.removeAllListeners(`sent-${messageId}`)
}
request._emitter.removeListener('error', cleanUp)
request._emitter.removeAllListeners(`error-${messageId}`)
if (err) return reject(err)
if (err) return reject((err && err.message) || err)
return resolve(result)

@@ -223,9 +252,28 @@ }

if (type.options.expectReply) {
request._emitter.on('data', cleanUp)
request._emitter.on('timeout', cleanUp)
request._emitter.once(`data-${messageId}`, cleanUp)
type._dataCallbacks.forEach((callback) => {
request._emitter.once(`data-${messageId}`, callback)
})
request._dataCallbacks.forEach((callback) => {
request._emitter.once(`data-${messageId}`, callback)
})
request._emitter.once(`timeout-${messageId}`, cleanUp)
type._timeoutCallbacks.forEach((callback) => {
request._emitter.once(`timeout-${messageId}`, callback)
})
request._timeoutCallbacks.forEach((callback) => {
request._emitter.once(`timeout-${messageId}`, callback)
})
} else {
request._emitter.on('sent', a => cleanUp(null, a))
request._emitter.once(`sent-${messageId}`, a => cleanUp(null, a))
type._sentCallbacks.forEach((callback) => {
request._emitter.once(`sent-${messageId}`, callback)
})
request._sentCallbacks.forEach((callback) => {
request._emitter.once(`sent-${messageId}`, callback)
})
}
request._emitter.on('error', cleanUp)
request._emitter.on(`error-${messageId}`, cleanUp)
})

@@ -236,2 +284,10 @@ }

request._options = {}
request._dataCallbacks = []
request._sentCallbacks = []
request._timeoutCallbacks = []
if (!remit._publishPools[options.event]) {
remit._publishPools[options.event] = getPublishChannel.apply(remit, [type, request])
}
request.send = request

@@ -246,3 +302,3 @@

request.data = function onData (callback) {
request._emitter.on('data', callback)
request._dataCallbacks.push(callback)

@@ -253,3 +309,3 @@ return request

request.sent = function onSent (callback) {
request._emitter.on('sent', callback)
request._sentCallbacks.push(callback)

@@ -260,3 +316,3 @@ return request

request.timeout = function onTimeout (callback) {
request._emitter.on('timeout', callback)
request._timeoutCallbacks.push(callback)

@@ -263,0 +319,0 @@ return request

@@ -119,10 +119,15 @@ const debug = require('debug')('remit:response')

}).then((connection) => {
return connection.createChannel()
}).then((consumeChannel) => {
return Promise.all([
connection.createChannel(),
connection.createChannel()
])
}).then(([ consumeChannel, publishChannel ]) => {
debug('Binding event')
consumeChannel.prefetch(48)
response._channel = consumeChannel
publishChannel.prefetch(49)
response._consumeChannel = consumeChannel
response._publishChannel = publishChannel
return response._channel.bindQueue(
return response._consumeChannel.bindQueue(
options.queue,

@@ -135,3 +140,3 @@ remit._options.exchange,

return response._channel.consume(options.queue, (message) => {
return response._consumeChannel.consume(options.queue, (message) => {
if (!message) {

@@ -138,0 +143,0 @@ return console.trace('Consumer cancelled')

{
"name": "remit",
"version": "2.0.0-beta.7",
"version": "2.0.0-beta.8",
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -12,37 +12,27 @@ # _Remit_

``` js
// Create our Remit connection
const remit = require('remit')({
url: 'localhost',
name: 'a.micro.service'
})
const remit = require('remit')()
// Set up an endpoint with the name 'micro.service.info'
remit
.endpoint('micro.service.info')
;(async function () {
const endpoint = remit
.endpoint('user.profile')
.data((event, callback) => {
const user = getUser({ id: event.data.id })
// When the endpoint is hit, run this function
.data((data, callback) => {
console.log('Endpoint was hit!')
callback(null, user)
})
data.foo = 'bar'
await endpoint.ready()
// Reply with the mutated data
return callback(null, data)
try {
var user = await remit
.request('user.profile')
.send({ id: 123 })
} catch (e) {
console.error('Error getting user', e)
}
// Once the endpoint is ready...
}).ready().then(() => {
// Make a request to the 'micro.service.save' endpoint
// with the data {name: 'a.micro.service'}
return remit
.request('micro.service.save')
.send({name: 'a.micro.service'})
}).then((result) => {
// When the reply comes back, log the response.
console.log('Saved microservice info', result)
}).catch((err) => {
// If the request failed (the replying microservice returned
// an error, the request timed out or couldn't be routed),
// log the error.
console.error('Couldn\'t seem to save microservice info', err)
})
if (user) {
console.log('Got user', user)
}
})()
```

@@ -109,5 +99,3 @@

* [#request / #req](#remitrequestendpoint-options--request-)
* [#persistentRequest / #preq](#templates)
* [#emit](#templates)
* [#delayedEmit / #demit](#templates)
* [Response](#responsedata-)

@@ -125,14 +113,6 @@ * [#respond / #res / #endpoint](#remitrespondendpoint-options--response-)

* [`request`](#remitrequestendpoint-options--request-)
* [`req`](#remitrequestendpoint-options--request-)
* [`persistentRequest`](#templates)
* [`preq`](#templates)
* [`request` (`req`)](#remitrequestendpoint-options--request-)
* [`emit`](#templates)
* [`delayedEmit`](#templates)
* [`demit`](#templates)
* [`respond`](#remitrespondendpoint-options--response-)
* [`res`](#remitrespondendpoint-options--response-)
* [`endpoint`](#remitrespondendpoint-options--response-)
* [`listen`](#templates-1)
* [`on`](#templates-1)
* [`respond` (`res`, `endpoint`)](#remitrespondendpoint-options--response-)
* [`listen` (`on`)](#templates-1)

@@ -199,25 +179,2 @@ ------

##### Templates
Some basic options templates are also set up as separate functions to allow for some semantically-pleasing set-ups without having to skim through objects to figure out what's happening.
``` js
// remit.persistentRequest
// remit.preq
{
"some": "thing"
}
// remit.emit
{
"some": "thing"
}
// remit.delayedEmit
// remit.demit
{
"some": "thing"
}
```
##### AMQ behaviour

@@ -224,0 +181,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc