@emartech/rabbitmq-client
Advanced tools
Comparing version 4.1.1 to 4.2.0
@@ -39,3 +39,3 @@ { | ||
}, | ||
"version": "4.1.1" | ||
"version": "4.2.0" | ||
} |
@@ -28,2 +28,3 @@ 'use strict'; | ||
this._cryptoLib = configuration.cryptoLib; | ||
this._extendedMessages = configuration.extendedMessages; | ||
@@ -65,9 +66,15 @@ if (this._prefetchCount < this._batchSize) { | ||
_handleSimpleCollectedMessages(groupBy, messageObjects) { | ||
let contents; | ||
let parsedMessages; | ||
try { | ||
contents = messageObjects.map((message) => JSON.parse(message.content.toString())); | ||
const getParsedContent = (message) => JSON.parse(message.content.toString()); | ||
const parseMessageContent = (message) => ({ | ||
...message, | ||
content: getParsedContent(message) | ||
}); | ||
parsedMessages = messageObjects.map(this._extendedMessages ? parseMessageContent : getParsedContent); | ||
} catch (error) { | ||
return this._consumerError(error, groupBy, messageObjects); | ||
} | ||
this._onMessages(groupBy, contents) | ||
this._onMessages(groupBy, parsedMessages) | ||
.then(() => { | ||
@@ -84,9 +91,22 @@ return this._consumerSuccess(groupBy, messageObjects); | ||
async _getDecryptedMessageContent(message) { | ||
const decryptedContent = await this._cryptoLib.decrypt(message.content.toString()); | ||
return JSON.parse(decryptedContent); | ||
} | ||
async _getMessageWithDecryptedContent(message) { | ||
const decryptedContent = await this._getDecryptedMessageContent(message); | ||
return { ...message, content: decryptedContent }; | ||
} | ||
_handleEncryptedCollectedMessages(groupBy, messageObjects) { | ||
return Promise.all(messageObjects.map((messsage) => this._cryptoLib.decrypt(messsage.content.toString()))) | ||
return Promise.all( | ||
messageObjects.map( | ||
this._extendedMessages ? | ||
this._getMessageWithDecryptedContent.bind(this) : | ||
this._getDecryptedMessageContent.bind(this) | ||
) | ||
) | ||
.then((decryptedMessages) => { | ||
this._onMessages( | ||
groupBy, | ||
decryptedMessages.map((message) => JSON.parse(message)) | ||
) | ||
this._onMessages(groupBy, decryptedMessages) | ||
.then(() => { | ||
@@ -93,0 +113,0 @@ return this._consumerSuccess(groupBy, messageObjects); |
@@ -262,2 +262,69 @@ 'use strict'; | ||
it('should call onMessages with batched message with meta data when extendedMessages is true', async function () { | ||
const onMessagesSpy = sinon.stub().resolves({}); | ||
const properties = { headers: { groupBy: 'testGroup', 'x-first-death-reason': 'rejected' } }; | ||
stubRabbitMq(); | ||
await createConsumer({ | ||
extendedMessages: true, | ||
onMessages: onMessagesSpy | ||
}); | ||
await consume(createMessage({ content: '{"foo":"bar"}', properties })); | ||
await consume(createMessage({ content: '{"abc":"123"}', properties })); | ||
expect(onMessagesSpy).not.to.be.called; | ||
clock.tick(60000); | ||
expect(onMessagesSpy).to.be.calledOnce; | ||
expect(onMessagesSpy).to.be.calledWith('testGroup', [ | ||
{ content: { foo: 'bar' }, properties }, | ||
{ content: { abc: '123' }, properties } | ||
]); | ||
}); | ||
// eslint-disable-next-line max-len | ||
it('should call onMessages with decrypted batched messages with meta data when extendedMessages is true', async function () { | ||
const properties = { headers: { groupBy: 'testGroup', 'x-first-death-reason': 'rejected' } }; | ||
const encryptedMessageContent1 = await dummyCrypto.encrypt('{"foo":"bar"}'); | ||
const encryptedMessageContent2 = await dummyCrypto.encrypt('{"abc":"123"}'); | ||
const message1 = createMessage({ content: encryptedMessageContent1, properties }); | ||
const message2 = createMessage({ content: encryptedMessageContent2, properties }); | ||
stubRabbitMq(); | ||
let onMessagesArguments = null; | ||
let resolveWait; | ||
const waitForOnMessages = new Promise((resolve) => { | ||
resolveWait = resolve; | ||
}); | ||
await createConsumer({ | ||
cryptoLib: dummyCrypto, | ||
extendedMessages: true, | ||
onMessages: async function () { | ||
onMessagesArguments = arguments; | ||
await resolveWait(); | ||
return Promise.resolve(); | ||
} | ||
}); | ||
await consume(message1); | ||
await consume(message2); | ||
expect(onMessagesArguments).to.be.null; | ||
clock.tick(60000); | ||
await waitForOnMessages; | ||
expect(onMessagesArguments[0]).to.be.eql('testGroup'); | ||
expect(onMessagesArguments[1]).to.be.eql([ | ||
{ content: { foo: 'bar' }, properties }, | ||
{ content: { abc: '123' }, properties } | ||
]); | ||
}); | ||
const stubRabbitMq = function () { | ||
@@ -264,0 +331,0 @@ sandbox.stub(RabbitMQ.prototype, 'getChannel').returns({ |
61180
22
1557