Comparing version 2.0.0-beta.17 to 2.0.0-beta.18
@@ -165,8 +165,10 @@ const CallableInstance = require('callable-instance') | ||
deadLetterExchange: this._remit._exchange, | ||
deadLetterRoutingKey: opts.event, | ||
expires: expiration * 2 | ||
deadLetterRoutingKey: opts.event | ||
} | ||
if (!opts.schedule) { | ||
if (opts.delay) { | ||
queueOpts.messageTtl = expiration | ||
queueOpts.expires = expiration * 2 | ||
} else { | ||
queueOpts.expires = expiration + 60000 | ||
} | ||
@@ -183,3 +185,10 @@ | ||
this._remit._workers.destroy(worker) | ||
throw e | ||
// if we're scheduling an emission and we have an inequivalent | ||
// x-expires argument, that's fine; that'll happen | ||
if (opts.schedule && e.message && e.message.substr(94, 28) === 'inequivalent arg \'x-expires\'') { | ||
return { queue, expiration } | ||
} else { | ||
throw e | ||
} | ||
} | ||
@@ -186,0 +195,0 @@ } |
@@ -166,2 +166,4 @@ const CallableInstance = require('callable-instance') | ||
_waitForResult (messageId) { | ||
const types = ['data', 'timeout'] | ||
return new Promise((resolve, reject) => { | ||
@@ -172,4 +174,5 @@ const cleanUp = (err, result) => { | ||
this._emitter.removeAllListeners(`data-${messageId}`) | ||
this._emitter.removeAllListeners(`timeout-${messageId}`) | ||
types.forEach((type) => { | ||
this._emitter.removeAllListeners(`${type}-${messageId}`) | ||
}) | ||
@@ -190,13 +193,8 @@ if (err) { | ||
this._emitter.once(`data-${messageId}`, cleanUp) | ||
this._emitter.once(`data-${messageId}`, (...args) => { | ||
this._emitter.emit('data', ...args) | ||
types.forEach((type) => { | ||
this._emitter.once(`${type}-${messageId}`, (...args) => { | ||
cleanUp(...args) | ||
this._emitter.emit(type, ...args) | ||
}) | ||
}) | ||
this._emitter.once(`timeout-${messageId}`, cleanUp) | ||
this._emitter.once(`timeout-${messageId}`, (...args) => { | ||
this._emitter.emit('timeout', ...args) | ||
}) | ||
}) | ||
@@ -203,0 +201,0 @@ } |
{ | ||
"name": "remit", | ||
"version": "2.0.0-beta.17", | ||
"version": "2.0.0-beta.18", | ||
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -8,3 +8,3 @@ const genericPool = require('generic-pool') | ||
const channel = await con.createChannel() | ||
channel.on('error', console.error) | ||
channel.on('error', () => {}) | ||
channel.on('close', () => console.log('Worker channel closed')) | ||
@@ -11,0 +11,0 @@ |
66013
1468