Comparing version 2.0.0-beta.3 to 2.0.0-beta.4
@@ -5,5 +5,7 @@ const remit = require('../../')({ | ||
const demit = remit.demit('examples.basic.demission') | ||
remit.listen('examples.basic.demission') | ||
.data((event, callback) => { | ||
console.log('Got event', event) | ||
console.log('Got event', event.eventId, event.delay) | ||
@@ -13,5 +15,7 @@ return callback(null, true) | ||
.ready().then(() => { | ||
Array.from([5000, 7000, 1000, 3000, 4000, 9000, 2000, 10000, 6000, 8000]).forEach((time) => { | ||
remit.demit('examples.basic.demission')({foo: 'bar'}, (new Date(+new Date() + time))) | ||
Array.from([5000, 7000, 1000, 3000, 4000, 5000, 2000, 2000, 2000, 9000, 2000, 10000, 6000, 8000, 30000, 30000, 30000]).forEach((time) => { | ||
demit({foo: 'bar'}, { | ||
delay: time | ||
}) | ||
}) | ||
}) |
const debug = require('debug')('remit:callback-handler') | ||
// const getConsumeChannel = require('./assertions/consumeChannel') | ||
// const getWorkChannel = require('./assertions/workChannel') | ||
const getPublishChannel = require('./assertions/publishChannel') | ||
@@ -74,3 +72,2 @@ | ||
remit._workChannelPool.acquire().then((channel) => { | ||
// getWorkChannel.apply(remit).then((workChannel) => { | ||
workChannel = channel | ||
@@ -77,0 +74,0 @@ |
@@ -40,2 +40,6 @@ const getCallbackHandler = require('./handleCallback') | ||
if (message.properties.headers.delay) { | ||
event.delay = message.properties.headers.delay | ||
} | ||
if (message.properties.headers.trace) { | ||
@@ -42,0 +46,0 @@ event.resourceTrace = message.properties.headers.trace |
const debug = require('debug')('remit:request') | ||
const EventEmitter = require('eventemitter3') | ||
const getPublishChannel = require('./assertions/publishChannel') | ||
// const getWorkChannel = require('./assertions/workChannel') | ||
const consumeReplies = require('./assertions/reply') | ||
@@ -59,2 +58,3 @@ const uuid = require('uuid') | ||
const data = extra.shift() | ||
const localOpts = Object.assign({}, request._options, extra.shift() || {}) | ||
const now = +new Date() | ||
@@ -79,7 +79,11 @@ const messageId = uuid.v4() | ||
if (!(extra[0] instanceof Date) || extra[0].toString() === 'Invalid Date') { | ||
throw new Error('Invalid date object given when attempting to send a delayed emission') | ||
if ((!localOpts.delay || isNaN(localOpts.delay)) && (!localOpts.schedule || !(localOpts.schedule instanceof Date) || localOpts.schedule.toString() === 'Invalid Date')) { | ||
throw new Error('Invalid schedule date or delay duration when attempting to send a delayed emission') | ||
} | ||
expiration = +extra[0] - now | ||
if (localOpts.schedule) { | ||
expiration = +localOpts.schedule - now | ||
} else { | ||
expiration = localOpts.delay | ||
} | ||
@@ -105,3 +109,2 @@ if (expiration <= 0) { | ||
deadLetterRoutingKey: options.event, | ||
maxLength: 1, | ||
expires: expiration + 5000 | ||
@@ -163,4 +166,16 @@ }) | ||
if (localOpts.priority) { | ||
if (localOpts.priority > 10 || localOpts.priority < 0) { | ||
throw new Error('Invalid priority', localOpts.priority, 'when pushing message') | ||
} | ||
messageOptions.priority = localOpts.priority | ||
} | ||
if (options.demission) { | ||
messageOptions.headers.scheduled = +extra[0] | ||
if (localOpts.schedule) { | ||
messageOptions.headers.schedule = +localOpts.schedule | ||
} else { | ||
messageOptions.headers.delay = localOpts.delay | ||
} | ||
@@ -187,4 +202,11 @@ publishChannel.sendToQueue( | ||
const cleanUp = (err, result) => { | ||
request._emitter.removeListener('data', cleanUp) | ||
request._emitter.removeListener('timeout', cleanUp) | ||
clearTimeout(timeout) | ||
if (type.options.expectReply) { | ||
request._emitter.removeListener('data', cleanUp) | ||
request._emitter.removeListener('timeout', cleanUp) | ||
} else { | ||
request._emitter.removeListener('sent', cleanUp) | ||
} | ||
request._emitter.removeListener('error', cleanUp) | ||
@@ -196,4 +218,9 @@ | ||
request._emitter.on('data', cleanUp) | ||
request._emitter.on('timeout', cleanUp) | ||
if (type.options.expectReply) { | ||
request._emitter.on('data', a => cleanUp(null, a)) | ||
request._emitter.on('timeout', cleanUp) | ||
} else { | ||
request._emitter.on('sent', a => cleanUp(null, a)) | ||
} | ||
request._emitter.on('error', cleanUp) | ||
@@ -204,4 +231,11 @@ }) | ||
request._emitter = new EventEmitter() | ||
request._options = {} | ||
request.send = request | ||
request.options = function setOptions (options) { | ||
request._options = Object.assign({}, request._options, options || {}) | ||
return request | ||
} | ||
request.data = function onData (callback) { | ||
@@ -208,0 +242,0 @@ request._emitter.on('data', callback) |
const debug = require('debug')('remit:response') | ||
const async = require('async') | ||
const EventEmitter = require('eventemitter3') | ||
// const getWorkChannel = require('./assertions/workChannel') | ||
// const getConsumeChannel = require('./assertions/consumeChannel') | ||
const handleMessage = require('./handleMessage') | ||
@@ -107,3 +105,2 @@ | ||
// getWorkChannel.apply(remit).then((workChannel) => { | ||
remit._workChannelPool.acquire().then((channel) => { | ||
@@ -110,0 +107,0 @@ debug('Asserting endpoint', options.event) |
{ | ||
"name": "remit", | ||
"version": "2.0.0-beta.3", | ||
"version": "2.0.0-beta.4", | ||
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
138382
1559