Comparing version 2.0.0-alpha.2 to 2.0.0-beta.1
@@ -31,3 +31,9 @@ const EventEmitter = require('eventemitter3') | ||
this.delayedEmit = Request.apply(this, [{ | ||
expectReply: false | ||
expectReply: false, | ||
before: (remit, options) => { | ||
options.demission = true | ||
return options | ||
} | ||
}]) | ||
@@ -34,0 +40,0 @@ |
@@ -9,3 +9,2 @@ const getCallbackHandler = require('./handleCallback') | ||
messageContent = JSON.parse(message.content.toString()) | ||
messageContent = Array.isArray(messageContent) ? messageContent[0] : messageContent | ||
} catch (e) { | ||
@@ -21,3 +20,3 @@ console.trace('Error processing message') | ||
const event = { | ||
let event = { | ||
eventId: message.properties.messageId, | ||
@@ -31,2 +30,6 @@ timestamp: new Date(message.properties.timestamp), | ||
if (message.properties.headers.scheduled) { | ||
event.scheduled = new Date(message.properties.headers.scheduled) | ||
} | ||
type._emitter.emit('data', event, callback) | ||
@@ -33,0 +36,0 @@ response._emitter.emit('data', event, callback) |
const debug = require('debug')('remit:request') | ||
const EventEmitter = require('eventemitter3') | ||
const getPublishChannel = require('./assertions/publishChannel') | ||
const getWorkChannel = require('./assertions/workChannel') | ||
const consumeReplies = require('./assertions/reply') | ||
@@ -18,2 +19,6 @@ const uuid = require('uuid') | ||
if (masterOptions.before) { | ||
options = masterOptions.before(remit, options) | ||
} | ||
return Request.apply(remit, [null, requestType, options]) | ||
@@ -47,5 +52,8 @@ } | ||
let request = function () { | ||
const data = Array.from(arguments) | ||
const extra = Array.from(arguments) | ||
const data = extra.shift() | ||
const now = +new Date() | ||
const messageId = uuid.v4() | ||
let trace | ||
let expiration | ||
@@ -61,3 +69,36 @@ try { | ||
.then(() => { | ||
return getPublishChannel.apply(remit) | ||
if (!options.demission) { | ||
return getPublishChannel.apply(remit) | ||
} | ||
if (!(extra[0] instanceof Date) || extra[0].toString() === 'Invalid Date') { | ||
throw new Error('Invalid date object given when attempting to send a delayed emission') | ||
} | ||
expiration = +extra[0] - now | ||
if (expiration <= 0) { | ||
options.demission = false | ||
return getPublishChannel.apply(remit) | ||
} | ||
return new Promise((resolve, reject) => { | ||
getWorkChannel.apply(remit).then((workChannel) => { | ||
return workChannel.assertQueue(`demission-${messageId}`, { | ||
messageTtl: expiration, | ||
exclusive: false, | ||
durable: true, | ||
autoDelete: true, | ||
deadLetterExchange: remit._options.exchange, | ||
deadLetterRoutingKey: options.event, | ||
maxLength: 1, | ||
expires: expiration + 5000 | ||
}) | ||
}).then((ok) => { | ||
return getPublishChannel.apply(remit) | ||
}).then((publishChannel) => { | ||
return resolve(publishChannel) | ||
}) | ||
}) | ||
}).then((publishChannel) => { | ||
@@ -68,3 +109,3 @@ debug('Sending message...') | ||
mandatory: true, | ||
messageId: uuid.v4(), | ||
messageId: messageId, | ||
appId: remit._options.name, | ||
@@ -96,9 +137,19 @@ timestamp: now, | ||
publishChannel.publish( | ||
remit._options.exchange, | ||
options.event, | ||
new Buffer(JSON.stringify(data)), | ||
messageOptions | ||
) | ||
if (options.demission) { | ||
messageOptions.headers.scheduled = +extra[0] | ||
publishChannel.sendToQueue( | ||
`demission-${messageOptions.messageId}`, | ||
new Buffer(JSON.stringify(data)), | ||
messageOptions | ||
) | ||
} else { | ||
publishChannel.publish( | ||
remit._options.exchange, | ||
options.event, | ||
new Buffer(JSON.stringify(data)), | ||
messageOptions | ||
) | ||
} | ||
type._emitter.emit('sent', data) | ||
@@ -105,0 +156,0 @@ request._emitter.emit('sent', data) |
{ | ||
"name": "remit", | ||
"version": "2.0.0-alpha.2", | ||
"version": "2.0.0-beta.1", | ||
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -108,2 +108,5 @@ /* global describe, it, expect, sinon, remit */ | ||
it('should be hit as a result of calling the request', function (done) { | ||
this.timeout(500) | ||
this.slow(400) | ||
request | ||
@@ -117,3 +120,3 @@ .send({foo: 'bar'}) | ||
return done() | ||
}, 25) | ||
}, 250) | ||
}) | ||
@@ -120,0 +123,0 @@ }) |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
54075
31
968
0