New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

remit

Package Overview
Dependencies
Maintainers
4
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

remit - npm Package Compare versions

Comparing version 2.3.3 to 2.4.0

50

lib/Emitter.js

@@ -1,2 +0,2 @@

const bubble = require('../utils/bubble')
const opentracing = require('opentracing')
const CallableInstance = require('callable-instance')

@@ -7,2 +7,3 @@ const EventEmitter = require('eventemitter3')

const getStackLine = require('../utils/getStackLine')
const ms = require('ms')

@@ -56,7 +57,7 @@ class Emitter extends CallableInstance {

if (!bubble.active) args[2] = true
if (!this._remit._namespace.active) args[2] = true
return bubble.active
return this._remit._namespace.active
? this._send(...args)
: bubble.runAndReturn(this._send.bind(this, ...args))
: this._remit._namespace.runAndReturn(this._send.bind(this, ...args))
}

@@ -72,7 +73,2 @@

const originId = bubble.get('originId') || messageId
const bubbleId = bubble.get('bubbleId') || null
bubble.set('originId', originId)
bubble.set('bubbleId', bubbleId)
const message = {

@@ -85,6 +81,5 @@ mandatory: false,

trace: getStackLine.parse(callsites),
originId: originId,
bubbleId: bubbleId,
fromBubbleId: bubbleId
}
context: {}
},
persistent: true
}

@@ -106,3 +101,3 @@

if (typeof parsedData === 'undefined') {
console.warn('[WARN] Remit request sent with unparsable JSON; this could be a function or an undefined variable. Data instead set to NULL.')
console.warn('[WARN] Remit emit sent with unparsable JSON; this could be a function or an undefined variable. Data instead set to NULL.')

@@ -113,2 +108,18 @@ // string here coerces to actual NULL once JSON.parse is performed

const parentContext = this._remit._namespace.get('context')
const span = this._remit._tracer.startSpan(`Remit Emit: ${parsedOptions.event}`, {
tags: {
'remit.version': this._remit.version,
[opentracing.Tags.SAMPLING_PRIORITY]: 1,
[opentracing.Tags.COMPONENT]: 'remit',
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: parsedOptions.event,
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_MESSAGING_PRODUCER,
'data.outgoing': data
},
childOf: parentContext
})
this._remit._tracer.inject(span.context(), opentracing.FORMAT_TEXT_MAP, message.headers.context)
const demitQueue = await this._setupDemitQueue(parsedOptions, now)

@@ -143,2 +154,3 @@ const worker = await this._remit._workers.acquire()

this._remit._workers.release(worker)
span.finish()

@@ -166,3 +178,11 @@ // We do this to make room for multiple emits.

_generateOptions (opts = {}) {
return Object.assign({}, this._options || {}, opts)
const parsedOpts = {}
if (opts.hasOwnProperty('delay')) {
parsedOpts.delay = (typeof opts.delay === 'string')
? ms(opts.delay)
: opts.delay
}
return Object.assign({}, this._options || {}, opts, parsedOpts)
}

@@ -169,0 +189,0 @@

const EventEmitter = require('eventemitter3')
const bubble = require('../utils/bubble')
const { ulid } = require('ulid')
const opentracing = require('opentracing')
const parseEvent = require('../utils/parseEvent')

@@ -116,3 +115,3 @@ const waterfall = require('../utils/asyncWaterfall')

this._options.queue,
bubble.bind(this._incoming.bind(this)),
this._remit._namespace.bind(this._incoming.bind(this)),
{

@@ -153,9 +152,19 @@ noAck: true,

if (message.properties.headers) {
bubble.set('originId', message.properties.headers.originId)
if (!bubble.get('bubbleId')) bubble.set('bubbleId', ulid())
}
const parentContext = this._remit._tracer.extract(opentracing.FORMAT_TEXT_MAP, (message.properties.headers && message.properties.headers.context) || {}) || null
const span = this._remit._tracer.startSpan(`Remit Endpoint: ${this._options.event}`, {
tags: {
'remit.version': this._remit.version,
[opentracing.Tags.SAMPLING_PRIORITY]: 1,
[opentracing.Tags.COMPONENT]: 'remit',
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: this._options.event,
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_RPC_SERVER,
'data.incoming': data
},
childOf: parentContext
})
this._remit._namespace.set('context', span.context())
const event = parseEvent(message.properties, message.fields, data, {
flowType: 'entry',
isReceiver: true

@@ -174,10 +183,18 @@ })

if (!canReply) {
await resultOp
let finalData = await resultOp
const [ resErr, resData ] = finalData
if (resErr) {
span.setTag(opentracing.Tags.ERROR, true)
span.setTag('data.outgoing', resErr)
} else {
let finalData = await resultOp
span.setTag('data.outgoing', resData)
}
// if a cold pause has been requested, don't process this
if (this._cold) return
span.finish()
// if a cold pause has been requested, don't process this
if (this._cold) return
if (canReply) {
finalData = serializeData(finalData)

@@ -190,8 +207,2 @@

if (message.properties.headers) {
message.properties.headers.originId = message.properties.headers.originId || bubble.get('originId')
message.properties.headers.fromBubbleId = message.properties.headers.bubbleId
message.properties.headers.bubbleId = bubble.get('bubbleId')
}
try {

@@ -198,0 +209,0 @@ await worker.sendToQueue(

@@ -1,3 +0,2 @@

const bubble = require('../utils/bubble')
const { ulid } = require('ulid')
const opentracing = require('opentracing')
const EventEmitter = require('eventemitter3')

@@ -109,2 +108,3 @@ const parseEvent = require('../utils/parseEvent')

this._resuming = new Promise(async (resolve, reject) => {
const shouldSubscribe = Boolean(this._options.subscribe)
let consumeResult

@@ -114,7 +114,7 @@

consumeResult = await this._consumer.consume(
this._options.queue,
bubble.bind(this._incoming.bind(this)),
this._consumerQueueName,
this._remit._namespace.bind(this._incoming.bind(this)),
{
noAck: false,
exclusive: false
noAck: shouldSubscribe,
exclusive: shouldSubscribe
}

@@ -153,9 +153,19 @@ )

if (message.properties.headers) {
bubble.set('originId', message.properties.headers.originId)
if (!bubble.get('bubbleId')) bubble.set('bubbleId', ulid())
}
const parentContext = this._remit._tracer.extract(opentracing.FORMAT_TEXT_MAP, (message.properties.headers && message.properties.headers.context) || {}) || null
const span = this._remit._tracer.startSpan(`Remit Listener: ${this._options.event}`, {
tags: {
'remit.version': this._remit.version,
[opentracing.Tags.SAMPLING_PRIORITY]: 1,
[opentracing.Tags.COMPONENT]: 'remit',
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: this._options.event,
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_MESSAGING_CONSUMER,
'data.incoming': data
},
references: [opentracing.followsFrom(parentContext)]
})
this._remit._namespace.set('context', span.context())
const event = parseEvent(message.properties, message.fields, data, {
flowType: 'entry',
isReceiver: true

@@ -173,2 +183,3 @@ })

await resultOp
span.finish()

@@ -181,13 +192,15 @@ // if a cold pause has been requested, don't process this

async _setup ({ queue, event, prefetch = 48 }) {
async _setup ({ queue, event, prefetch = 48, subscribe }) {
this._starting = true
const shouldSubscribe = Boolean(subscribe)
try {
const worker = await this._remit._workers.acquire()
let ok
try {
await worker.assertQueue(queue, {
exclusive: false,
durable: true,
autoDelete: false,
ok = await worker.assertQueue(shouldSubscribe ? '' : queue, {
exclusive: shouldSubscribe,
durable: !shouldSubscribe,
autoDelete: shouldSubscribe,
maxPriority: 10

@@ -203,2 +216,3 @@ })

this._consumerQueueName = ok.queue
const connection = await this._remit._connection

@@ -216,3 +230,3 @@ this._consumer = await connection.createChannel()

await this._consumer.bindQueue(
queue,
this._consumerQueueName,
this._remit._exchange,

@@ -219,0 +233,0 @@ event

const url = require('url')
const amqplib = require('amqplib')
const { Tracer } = require('opentracing')
const EventEmitter = require('eventemitter3')

@@ -14,2 +15,3 @@ const packageJson = require('../package.json')

const Emitter = require('./Emitter')
const { createNamespace } = require('cls-hooked')

@@ -28,3 +30,3 @@ class Remit {

this._options.exchange = options.exchange || 'remit'
this._options.name = options.name || process.env.REMIT_NAME || ''
this._options.name = options.name || process.env.REMIT_NAME || 'remit'
this._options.url = options.url || process.env.REMIT_URL || 'amqp://localhost'

@@ -37,2 +39,12 @@

this._tracer = options.tracer || new Tracer({
serviceName: this._options.name,
reporter: {
logSpans: true,
flushIntervalMs: 10
}
})
this._namespace = options.namespace || createNamespace('remit')
// TODO make this better

@@ -39,0 +51,0 @@ this._eventCounters = {}

@@ -0,1 +1,2 @@

const opentracing = require('opentracing')
const CallableInstance = require('callable-instance')

@@ -7,3 +8,3 @@ const EventEmitter = require('eventemitter3')

const throwAsException = require('../utils/throwAsException')
const bubble = require('../utils/bubble')
const ms = require('ms')

@@ -71,7 +72,7 @@ class Request extends CallableInstance {

if (!bubble.active) args[2] = true
if (!this._remit._namespace.active) args[2] = true
return bubble.active
return this._remit._namespace.active
? this._send(...args)
: bubble.runAndReturn(this._send.bind(this, ...args))
: this._remit._namespace.runAndReturn(this._send.bind(this, ...args))
}

@@ -89,5 +90,2 @@

const originId = bubble.get('originId') || messageId
const bubbleId = bubble.get('bubbleId') || null
const message = {

@@ -99,6 +97,4 @@ mandatory: false,

headers: {
trace: trace,
originId: originId,
bubbleId: bubbleId,
fromBubbleId: bubbleId
trace,
context: {}
},

@@ -137,2 +133,18 @@ correlationId: messageId,

const parentContext = this._remit._namespace.get('context')
const span = this._remit._tracer.startSpan(`Remit Request: ${parsedOptions.event}`, {
tags: {
'remit.version': this._remit.version,
[opentracing.Tags.SAMPLING_PRIORITY]: 1,
[opentracing.Tags.COMPONENT]: 'remit',
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: parsedOptions.event,
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_RPC_CLIENT,
'data.outgoing': eventData
},
childOf: parentContext
})
this._remit._tracer.inject(span.context(), opentracing.FORMAT_TEXT_MAP, message.headers.context)
this._channel.publish(

@@ -147,5 +159,3 @@ this._remit._exchange,

routingKey: parsedOptions.event
}, eventData, {
flowType: 'exit'
})
}, eventData)

@@ -158,7 +168,15 @@ this._emitter.emit('sent', event)

return this._waitForResult(messageId)
return this._waitForResult(messageId, span)
}
_generateOptions (opts = {}) {
return Object.assign({}, this._options || {}, opts)
const parsedOpts = {}
if (opts.hasOwnProperty('timeout')) {
parsedOpts.timeout = (typeof opts.timeout === 'string')
? ms(opts.timeout)
: opts.timeout
}
return Object.assign({}, this._options || {}, opts, parsedOpts)
}

@@ -192,3 +210,3 @@

'amq.rabbitmq.reply-to',
bubble.bind(this._remit._incoming.bind(this._remit)),
this._remit._namespace.bind(this._remit._incoming.bind(this._remit)),
{

@@ -209,3 +227,3 @@ noAck: true,

_waitForResult (messageId) {
_waitForResult (messageId, span) {
const types = ['data', 'timeout']

@@ -230,13 +248,16 @@

}
span.setTag(opentracing.Tags.ERROR, true)
span.setTag('data.incoming', err)
} else {
resolve(result)
this._emitter.emit('success', result, message)
span.setTag('data.incoming', result)
}
span.finish()
this._emitter.emit(
'data',
parseEvent(message.properties, message.fields, err || result, {
switchBubbles: true,
isReceiver: true
})
parseEvent(message.properties, message.fields, err || result)
)

@@ -243,0 +264,0 @@ }

{
"name": "remit",
"version": "2.3.3",
"version": "2.4.0",
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.",

@@ -14,3 +14,3 @@ "main": "index.js",

},
"author": "Jack Williams <jack@wildfire.gg>",
"author": "Jack Williams <jpwilliamsphotography@gmail.com>",
"license": "MIT",

@@ -24,2 +24,4 @@ "dependencies": {

"generic-pool": "^3.1.7",
"ms": "^2.1.1",
"opentracing": "^0.14.3",
"serialize-error": "^2.1.0",

@@ -26,0 +28,0 @@ "ulid": "^2.2.0"

# remit
[![Build Status](https://travis-ci.org/jpwilliams/remit.svg?branch=master)](https://travis-ci.org/jpwilliams/remit) [![Coverage Status](https://coveralls.io/repos/github/jpwilliams/remit/badge.svg?branch=master)](https://coveralls.io/github/jpwilliams/remit?branch=v2) [![npm downloads per month](https://img.shields.io/npm/dm/remit.svg)](https://www.npmjs.com/package/remit) [![npm version](https://img.shields.io/npm/v/remit.svg)](https://www.npmjs.com/package/remit)
[![Build Status](https://travis-ci.org/jpwilliams/remit.svg?branch=master)](https://travis-ci.org/jpwilliams/remit) [![Coverage Status](https://coveralls.io/repos/github/jpwilliams/remit/badge.svg?branch=master)](https://coveralls.io/github/jpwilliams/remit?branch=master) [![npm downloads per month](https://img.shields.io/npm/dm/remit.svg)](https://www.npmjs.com/package/remit) [![npm version](https://img.shields.io/npm/v/remit.svg)](https://www.npmjs.com/package/remit) [![OpenTracing Badge](https://img.shields.io/badge/OpenTracing-enabled-blue.svg)](http://opentracing.io) [![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit?ref=badge_shield)

@@ -490,1 +490,5 @@ A wrapper for RabbitMQ for communication between microservices. No service discovery needed.

See [`remitrace`](https://github.com/jpwilliams/remitrace).
## License
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit?ref=badge_large)

@@ -76,2 +76,10 @@ /* global describe, it, before, expect */

it('should parse timestrings in a delay option', function () {
const emit = emitRemit.emit('options-timestring-test')
emit.options({delay: '30m'})
expect(emit._options).to.have.property('delay', 1800000)
emit.options({delay: '2s'})
expect(emit._options).to.have.property('delay', 2000)
})
it('should return promise on send that resolves on sent')

@@ -78,0 +86,0 @@ it('should emit "sent" on sending')

@@ -85,2 +85,10 @@ /* global describe, it, before, expect */

it('should parse timestrings in a timeout option', function () {
const request = remit.request('options-timestring-test')
request.options({timeout: '30m'})
expect(request._options).to.have.property('timeout', 1800000)
request.options({timeout: '2s'})
expect(request._options).to.have.property('timeout', 2000)
})
it('should return \'ready\' promise when request is ready to be used', function () {

@@ -87,0 +95,0 @@ this.slow(200)

@@ -1,4 +0,1 @@

const { ulid } = require('ulid')
const bubble = require('../utils/bubble')
function parseEvent (properties = {}, fields = {}, data, opts = {}) {

@@ -9,12 +6,5 @@ const event = {

resource: properties.appId,
data: data,
metadata: {
instanceId: ulid()
}
data: data
}
if (opts.flowType) {
event.metadata.flowType = opts.flowType
}
if (opts.isReceiver) {

@@ -25,10 +15,4 @@ event.started = new Date()

if (properties.headers) {
event.metadata.originId = properties.headers.originId
if (opts.switchBubbles) {
event.metadata.bubbleId = properties.headers.fromBubbleId
event.metadata.fromBubbleId = properties.headers.bubbleId
} else {
event.metadata.fromBubbleId = properties.headers.fromBubbleId
event.metadata.bubbleId = bubble.get('bubbleId') || null
if (properties.headers.context) {
event.context = properties.headers.context
}

@@ -35,0 +19,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc