Comparing version 2.3.3 to 2.4.0
@@ -1,2 +0,2 @@ | ||
const bubble = require('../utils/bubble') | ||
const opentracing = require('opentracing') | ||
const CallableInstance = require('callable-instance') | ||
@@ -7,2 +7,3 @@ const EventEmitter = require('eventemitter3') | ||
const getStackLine = require('../utils/getStackLine') | ||
const ms = require('ms') | ||
@@ -56,7 +57,7 @@ class Emitter extends CallableInstance { | ||
if (!bubble.active) args[2] = true | ||
if (!this._remit._namespace.active) args[2] = true | ||
return bubble.active | ||
return this._remit._namespace.active | ||
? this._send(...args) | ||
: bubble.runAndReturn(this._send.bind(this, ...args)) | ||
: this._remit._namespace.runAndReturn(this._send.bind(this, ...args)) | ||
} | ||
@@ -72,7 +73,2 @@ | ||
const originId = bubble.get('originId') || messageId | ||
const bubbleId = bubble.get('bubbleId') || null | ||
bubble.set('originId', originId) | ||
bubble.set('bubbleId', bubbleId) | ||
const message = { | ||
@@ -85,6 +81,5 @@ mandatory: false, | ||
trace: getStackLine.parse(callsites), | ||
originId: originId, | ||
bubbleId: bubbleId, | ||
fromBubbleId: bubbleId | ||
} | ||
context: {} | ||
}, | ||
persistent: true | ||
} | ||
@@ -106,3 +101,3 @@ | ||
if (typeof parsedData === 'undefined') { | ||
console.warn('[WARN] Remit request sent with unparsable JSON; this could be a function or an undefined variable. Data instead set to NULL.') | ||
console.warn('[WARN] Remit emit sent with unparsable JSON; this could be a function or an undefined variable. Data instead set to NULL.') | ||
@@ -113,2 +108,18 @@ // string here coerces to actual NULL once JSON.parse is performed | ||
const parentContext = this._remit._namespace.get('context') | ||
const span = this._remit._tracer.startSpan(`Remit Emit: ${parsedOptions.event}`, { | ||
tags: { | ||
'remit.version': this._remit.version, | ||
[opentracing.Tags.SAMPLING_PRIORITY]: 1, | ||
[opentracing.Tags.COMPONENT]: 'remit', | ||
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: parsedOptions.event, | ||
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_MESSAGING_PRODUCER, | ||
'data.outgoing': data | ||
}, | ||
childOf: parentContext | ||
}) | ||
this._remit._tracer.inject(span.context(), opentracing.FORMAT_TEXT_MAP, message.headers.context) | ||
const demitQueue = await this._setupDemitQueue(parsedOptions, now) | ||
@@ -143,2 +154,3 @@ const worker = await this._remit._workers.acquire() | ||
this._remit._workers.release(worker) | ||
span.finish() | ||
@@ -166,3 +178,11 @@ // We do this to make room for multiple emits. | ||
_generateOptions (opts = {}) { | ||
return Object.assign({}, this._options || {}, opts) | ||
const parsedOpts = {} | ||
if (opts.hasOwnProperty('delay')) { | ||
parsedOpts.delay = (typeof opts.delay === 'string') | ||
? ms(opts.delay) | ||
: opts.delay | ||
} | ||
return Object.assign({}, this._options || {}, opts, parsedOpts) | ||
} | ||
@@ -169,0 +189,0 @@ |
const EventEmitter = require('eventemitter3') | ||
const bubble = require('../utils/bubble') | ||
const { ulid } = require('ulid') | ||
const opentracing = require('opentracing') | ||
const parseEvent = require('../utils/parseEvent') | ||
@@ -116,3 +115,3 @@ const waterfall = require('../utils/asyncWaterfall') | ||
this._options.queue, | ||
bubble.bind(this._incoming.bind(this)), | ||
this._remit._namespace.bind(this._incoming.bind(this)), | ||
{ | ||
@@ -153,9 +152,19 @@ noAck: true, | ||
if (message.properties.headers) { | ||
bubble.set('originId', message.properties.headers.originId) | ||
if (!bubble.get('bubbleId')) bubble.set('bubbleId', ulid()) | ||
} | ||
const parentContext = this._remit._tracer.extract(opentracing.FORMAT_TEXT_MAP, (message.properties.headers && message.properties.headers.context) || {}) || null | ||
const span = this._remit._tracer.startSpan(`Remit Endpoint: ${this._options.event}`, { | ||
tags: { | ||
'remit.version': this._remit.version, | ||
[opentracing.Tags.SAMPLING_PRIORITY]: 1, | ||
[opentracing.Tags.COMPONENT]: 'remit', | ||
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: this._options.event, | ||
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_RPC_SERVER, | ||
'data.incoming': data | ||
}, | ||
childOf: parentContext | ||
}) | ||
this._remit._namespace.set('context', span.context()) | ||
const event = parseEvent(message.properties, message.fields, data, { | ||
flowType: 'entry', | ||
isReceiver: true | ||
@@ -174,10 +183,18 @@ }) | ||
if (!canReply) { | ||
await resultOp | ||
let finalData = await resultOp | ||
const [ resErr, resData ] = finalData | ||
if (resErr) { | ||
span.setTag(opentracing.Tags.ERROR, true) | ||
span.setTag('data.outgoing', resErr) | ||
} else { | ||
let finalData = await resultOp | ||
span.setTag('data.outgoing', resData) | ||
} | ||
// if a cold pause has been requested, don't process this | ||
if (this._cold) return | ||
span.finish() | ||
// if a cold pause has been requested, don't process this | ||
if (this._cold) return | ||
if (canReply) { | ||
finalData = serializeData(finalData) | ||
@@ -190,8 +207,2 @@ | ||
if (message.properties.headers) { | ||
message.properties.headers.originId = message.properties.headers.originId || bubble.get('originId') | ||
message.properties.headers.fromBubbleId = message.properties.headers.bubbleId | ||
message.properties.headers.bubbleId = bubble.get('bubbleId') | ||
} | ||
try { | ||
@@ -198,0 +209,0 @@ await worker.sendToQueue( |
@@ -1,3 +0,2 @@ | ||
const bubble = require('../utils/bubble') | ||
const { ulid } = require('ulid') | ||
const opentracing = require('opentracing') | ||
const EventEmitter = require('eventemitter3') | ||
@@ -109,2 +108,3 @@ const parseEvent = require('../utils/parseEvent') | ||
this._resuming = new Promise(async (resolve, reject) => { | ||
const shouldSubscribe = Boolean(this._options.subscribe) | ||
let consumeResult | ||
@@ -114,7 +114,7 @@ | ||
consumeResult = await this._consumer.consume( | ||
this._options.queue, | ||
bubble.bind(this._incoming.bind(this)), | ||
this._consumerQueueName, | ||
this._remit._namespace.bind(this._incoming.bind(this)), | ||
{ | ||
noAck: false, | ||
exclusive: false | ||
noAck: shouldSubscribe, | ||
exclusive: shouldSubscribe | ||
} | ||
@@ -153,9 +153,19 @@ ) | ||
if (message.properties.headers) { | ||
bubble.set('originId', message.properties.headers.originId) | ||
if (!bubble.get('bubbleId')) bubble.set('bubbleId', ulid()) | ||
} | ||
const parentContext = this._remit._tracer.extract(opentracing.FORMAT_TEXT_MAP, (message.properties.headers && message.properties.headers.context) || {}) || null | ||
const span = this._remit._tracer.startSpan(`Remit Listener: ${this._options.event}`, { | ||
tags: { | ||
'remit.version': this._remit.version, | ||
[opentracing.Tags.SAMPLING_PRIORITY]: 1, | ||
[opentracing.Tags.COMPONENT]: 'remit', | ||
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: this._options.event, | ||
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_MESSAGING_CONSUMER, | ||
'data.incoming': data | ||
}, | ||
references: [opentracing.followsFrom(parentContext)] | ||
}) | ||
this._remit._namespace.set('context', span.context()) | ||
const event = parseEvent(message.properties, message.fields, data, { | ||
flowType: 'entry', | ||
isReceiver: true | ||
@@ -173,2 +183,3 @@ }) | ||
await resultOp | ||
span.finish() | ||
@@ -181,13 +192,15 @@ // if a cold pause has been requested, don't process this | ||
async _setup ({ queue, event, prefetch = 48 }) { | ||
async _setup ({ queue, event, prefetch = 48, subscribe }) { | ||
this._starting = true | ||
const shouldSubscribe = Boolean(subscribe) | ||
try { | ||
const worker = await this._remit._workers.acquire() | ||
let ok | ||
try { | ||
await worker.assertQueue(queue, { | ||
exclusive: false, | ||
durable: true, | ||
autoDelete: false, | ||
ok = await worker.assertQueue(shouldSubscribe ? '' : queue, { | ||
exclusive: shouldSubscribe, | ||
durable: !shouldSubscribe, | ||
autoDelete: shouldSubscribe, | ||
maxPriority: 10 | ||
@@ -203,2 +216,3 @@ }) | ||
this._consumerQueueName = ok.queue | ||
const connection = await this._remit._connection | ||
@@ -216,3 +230,3 @@ this._consumer = await connection.createChannel() | ||
await this._consumer.bindQueue( | ||
queue, | ||
this._consumerQueueName, | ||
this._remit._exchange, | ||
@@ -219,0 +233,0 @@ event |
const url = require('url') | ||
const amqplib = require('amqplib') | ||
const { Tracer } = require('opentracing') | ||
const EventEmitter = require('eventemitter3') | ||
@@ -14,2 +15,3 @@ const packageJson = require('../package.json') | ||
const Emitter = require('./Emitter') | ||
const { createNamespace } = require('cls-hooked') | ||
@@ -28,3 +30,3 @@ class Remit { | ||
this._options.exchange = options.exchange || 'remit' | ||
this._options.name = options.name || process.env.REMIT_NAME || '' | ||
this._options.name = options.name || process.env.REMIT_NAME || 'remit' | ||
this._options.url = options.url || process.env.REMIT_URL || 'amqp://localhost' | ||
@@ -37,2 +39,12 @@ | ||
this._tracer = options.tracer || new Tracer({ | ||
serviceName: this._options.name, | ||
reporter: { | ||
logSpans: true, | ||
flushIntervalMs: 10 | ||
} | ||
}) | ||
this._namespace = options.namespace || createNamespace('remit') | ||
// TODO make this better | ||
@@ -39,0 +51,0 @@ this._eventCounters = {} |
@@ -0,1 +1,2 @@ | ||
const opentracing = require('opentracing') | ||
const CallableInstance = require('callable-instance') | ||
@@ -7,3 +8,3 @@ const EventEmitter = require('eventemitter3') | ||
const throwAsException = require('../utils/throwAsException') | ||
const bubble = require('../utils/bubble') | ||
const ms = require('ms') | ||
@@ -71,7 +72,7 @@ class Request extends CallableInstance { | ||
if (!bubble.active) args[2] = true | ||
if (!this._remit._namespace.active) args[2] = true | ||
return bubble.active | ||
return this._remit._namespace.active | ||
? this._send(...args) | ||
: bubble.runAndReturn(this._send.bind(this, ...args)) | ||
: this._remit._namespace.runAndReturn(this._send.bind(this, ...args)) | ||
} | ||
@@ -89,5 +90,2 @@ | ||
const originId = bubble.get('originId') || messageId | ||
const bubbleId = bubble.get('bubbleId') || null | ||
const message = { | ||
@@ -99,6 +97,4 @@ mandatory: false, | ||
headers: { | ||
trace: trace, | ||
originId: originId, | ||
bubbleId: bubbleId, | ||
fromBubbleId: bubbleId | ||
trace, | ||
context: {} | ||
}, | ||
@@ -137,2 +133,18 @@ correlationId: messageId, | ||
const parentContext = this._remit._namespace.get('context') | ||
const span = this._remit._tracer.startSpan(`Remit Request: ${parsedOptions.event}`, { | ||
tags: { | ||
'remit.version': this._remit.version, | ||
[opentracing.Tags.SAMPLING_PRIORITY]: 1, | ||
[opentracing.Tags.COMPONENT]: 'remit', | ||
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: parsedOptions.event, | ||
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_RPC_CLIENT, | ||
'data.outgoing': eventData | ||
}, | ||
childOf: parentContext | ||
}) | ||
this._remit._tracer.inject(span.context(), opentracing.FORMAT_TEXT_MAP, message.headers.context) | ||
this._channel.publish( | ||
@@ -147,5 +159,3 @@ this._remit._exchange, | ||
routingKey: parsedOptions.event | ||
}, eventData, { | ||
flowType: 'exit' | ||
}) | ||
}, eventData) | ||
@@ -158,7 +168,15 @@ this._emitter.emit('sent', event) | ||
return this._waitForResult(messageId) | ||
return this._waitForResult(messageId, span) | ||
} | ||
_generateOptions (opts = {}) { | ||
return Object.assign({}, this._options || {}, opts) | ||
const parsedOpts = {} | ||
if (opts.hasOwnProperty('timeout')) { | ||
parsedOpts.timeout = (typeof opts.timeout === 'string') | ||
? ms(opts.timeout) | ||
: opts.timeout | ||
} | ||
return Object.assign({}, this._options || {}, opts, parsedOpts) | ||
} | ||
@@ -192,3 +210,3 @@ | ||
'amq.rabbitmq.reply-to', | ||
bubble.bind(this._remit._incoming.bind(this._remit)), | ||
this._remit._namespace.bind(this._remit._incoming.bind(this._remit)), | ||
{ | ||
@@ -209,3 +227,3 @@ noAck: true, | ||
_waitForResult (messageId) { | ||
_waitForResult (messageId, span) { | ||
const types = ['data', 'timeout'] | ||
@@ -230,13 +248,16 @@ | ||
} | ||
span.setTag(opentracing.Tags.ERROR, true) | ||
span.setTag('data.incoming', err) | ||
} else { | ||
resolve(result) | ||
this._emitter.emit('success', result, message) | ||
span.setTag('data.incoming', result) | ||
} | ||
span.finish() | ||
this._emitter.emit( | ||
'data', | ||
parseEvent(message.properties, message.fields, err || result, { | ||
switchBubbles: true, | ||
isReceiver: true | ||
}) | ||
parseEvent(message.properties, message.fields, err || result) | ||
) | ||
@@ -243,0 +264,0 @@ } |
{ | ||
"name": "remit", | ||
"version": "2.3.3", | ||
"version": "2.4.0", | ||
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.", | ||
@@ -14,3 +14,3 @@ "main": "index.js", | ||
}, | ||
"author": "Jack Williams <jack@wildfire.gg>", | ||
"author": "Jack Williams <jpwilliamsphotography@gmail.com>", | ||
"license": "MIT", | ||
@@ -24,2 +24,4 @@ "dependencies": { | ||
"generic-pool": "^3.1.7", | ||
"ms": "^2.1.1", | ||
"opentracing": "^0.14.3", | ||
"serialize-error": "^2.1.0", | ||
@@ -26,0 +28,0 @@ "ulid": "^2.2.0" |
# remit | ||
[![Build Status](https://travis-ci.org/jpwilliams/remit.svg?branch=master)](https://travis-ci.org/jpwilliams/remit) [![Coverage Status](https://coveralls.io/repos/github/jpwilliams/remit/badge.svg?branch=master)](https://coveralls.io/github/jpwilliams/remit?branch=v2) [![npm downloads per month](https://img.shields.io/npm/dm/remit.svg)](https://www.npmjs.com/package/remit) [![npm version](https://img.shields.io/npm/v/remit.svg)](https://www.npmjs.com/package/remit) | ||
[![Build Status](https://travis-ci.org/jpwilliams/remit.svg?branch=master)](https://travis-ci.org/jpwilliams/remit) [![Coverage Status](https://coveralls.io/repos/github/jpwilliams/remit/badge.svg?branch=master)](https://coveralls.io/github/jpwilliams/remit?branch=master) [![npm downloads per month](https://img.shields.io/npm/dm/remit.svg)](https://www.npmjs.com/package/remit) [![npm version](https://img.shields.io/npm/v/remit.svg)](https://www.npmjs.com/package/remit) [![OpenTracing Badge](https://img.shields.io/badge/OpenTracing-enabled-blue.svg)](http://opentracing.io) [![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit?ref=badge_shield) | ||
@@ -490,1 +490,5 @@ A wrapper for RabbitMQ for communication between microservices. No service discovery needed. | ||
See [`remitrace`](https://github.com/jpwilliams/remitrace). | ||
## License | ||
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit?ref=badge_large) |
@@ -76,2 +76,10 @@ /* global describe, it, before, expect */ | ||
it('should parse timestrings in a delay option', function () { | ||
const emit = emitRemit.emit('options-timestring-test') | ||
emit.options({delay: '30m'}) | ||
expect(emit._options).to.have.property('delay', 1800000) | ||
emit.options({delay: '2s'}) | ||
expect(emit._options).to.have.property('delay', 2000) | ||
}) | ||
it('should return promise on send that resolves on sent') | ||
@@ -78,0 +86,0 @@ it('should emit "sent" on sending') |
@@ -85,2 +85,10 @@ /* global describe, it, before, expect */ | ||
it('should parse timestrings in a timeout option', function () { | ||
const request = remit.request('options-timestring-test') | ||
request.options({timeout: '30m'}) | ||
expect(request._options).to.have.property('timeout', 1800000) | ||
request.options({timeout: '2s'}) | ||
expect(request._options).to.have.property('timeout', 2000) | ||
}) | ||
it('should return \'ready\' promise when request is ready to be used', function () { | ||
@@ -87,0 +95,0 @@ this.slow(200) |
@@ -1,4 +0,1 @@ | ||
const { ulid } = require('ulid') | ||
const bubble = require('../utils/bubble') | ||
function parseEvent (properties = {}, fields = {}, data, opts = {}) { | ||
@@ -9,12 +6,5 @@ const event = { | ||
resource: properties.appId, | ||
data: data, | ||
metadata: { | ||
instanceId: ulid() | ||
} | ||
data: data | ||
} | ||
if (opts.flowType) { | ||
event.metadata.flowType = opts.flowType | ||
} | ||
if (opts.isReceiver) { | ||
@@ -25,10 +15,4 @@ event.started = new Date() | ||
if (properties.headers) { | ||
event.metadata.originId = properties.headers.originId | ||
if (opts.switchBubbles) { | ||
event.metadata.bubbleId = properties.headers.fromBubbleId | ||
event.metadata.fromBubbleId = properties.headers.bubbleId | ||
} else { | ||
event.metadata.fromBubbleId = properties.headers.fromBubbleId | ||
event.metadata.bubbleId = bubble.get('bubbleId') || null | ||
if (properties.headers.context) { | ||
event.context = properties.headers.context | ||
} | ||
@@ -35,0 +19,0 @@ |
493
94074
10
36
2066
+ Addedms@^2.1.1
+ Addedopentracing@^0.14.3
+ Addedms@2.1.3(transitive)
+ Addedopentracing@0.14.7(transitive)