@emartech/rabbitmq-client
Advanced tools
Comparing version 1.2.0 to 1.3.0
@@ -44,3 +44,3 @@ { | ||
}, | ||
"version": "1.2.0" | ||
"version": "1.3.0" | ||
} |
@@ -43,3 +43,5 @@ 'use strict'; | ||
const connectionMock = { | ||
createChannel: () => Promise.resolve(true) | ||
createChannel: () => Promise.resolve({ | ||
assertQueue: () => {} | ||
}) | ||
}; | ||
@@ -46,0 +48,0 @@ sandbox.stub(amqp, 'connect').resolves(connectionMock); |
@@ -76,6 +76,2 @@ 'use strict'; | ||
}); | ||
process.on('uncaughtException', error => { | ||
logger.error('Consumer uncaught error', error.message, JSON.stringify({ error: error })); | ||
}); | ||
} catch (error) { | ||
@@ -82,0 +78,0 @@ logger.error('Consumer initialization error', error.message, JSON.stringify({ error: error })); |
@@ -42,3 +42,5 @@ 'use strict'; | ||
const connectionMock = { | ||
createChannel: () => Promise.resolve(true) | ||
createChannel: () => Promise.resolve({ | ||
assertQueue: () => {} | ||
}) | ||
}; | ||
@@ -45,0 +47,0 @@ sandbox.stub(amqp, 'connect').resolves(connectionMock); |
@@ -7,2 +7,3 @@ 'use strict'; | ||
let connections = {}; | ||
let assertedQueues = {}; | ||
@@ -13,3 +14,3 @@ module.exports = { | ||
await rabbitMq.connect(connections); | ||
await rabbitMq.createChannel(channels); | ||
await rabbitMq.createChannel(channels, assertedQueues); | ||
@@ -16,0 +17,0 @@ return rabbitMq; |
@@ -34,3 +34,3 @@ 'use strict'; | ||
async createChannel(channels = {}) { | ||
async createChannel(channels = {}, assertedQueues = {}) { | ||
this._validate(); | ||
@@ -43,4 +43,12 @@ | ||
this._channel = await channels[this._connectionType]; | ||
await this._assertQueue(assertedQueues); | ||
} | ||
async _assertQueue(assertedQueues) { | ||
if (!assertedQueues[this.queueName]) { | ||
assertedQueues[this.queueName] = this._channel.assertQueue(this.queueName, { durable: false }); | ||
} | ||
await assertedQueues[this.queueName]; | ||
} | ||
async closeConnection() { | ||
@@ -47,0 +55,0 @@ await this._connection.close(); |
@@ -23,3 +23,3 @@ 'use strict'; | ||
describe('RabbitMQ', function() { | ||
let subject; | ||
let rabbitMq; | ||
let sandbox = sinon.sandbox.create(); | ||
@@ -34,3 +34,4 @@ | ||
deleteQueue: sandbox.stub().resolves(true), | ||
purgeQueue: sandbox.stub().resolves(true) | ||
purgeQueue: sandbox.stub().resolves(true), | ||
assertQueue: sandbox.stub().resolves(true) | ||
}; | ||
@@ -44,3 +45,3 @@ | ||
sandbox.stub(amqp, 'connect').resolves(connectionMock); | ||
subject = new RabbitMq(config, queueName); | ||
rabbitMq = new RabbitMq(config, queueName); | ||
}); | ||
@@ -53,3 +54,3 @@ | ||
it('#connect should call amqp connect with rigth parameters', async function() { | ||
await subject.connect(); | ||
await rabbitMq.connect(); | ||
expect(amqp.connect).to.have.been.calledWith( | ||
@@ -63,3 +64,3 @@ 'amqp://test:secret@192.168.40.10:5672/cubebloc', | ||
const connections = {}; | ||
await subject.connect(connections); | ||
await rabbitMq.connect(connections); | ||
const connection = await connections.default; | ||
@@ -75,5 +76,5 @@ | ||
const connections = { default: Promise.resolve(localConnectionMock) }; | ||
await subject.connect(connections); | ||
await rabbitMq.connect(connections); | ||
await subject.closeConnection(); | ||
await rabbitMq.closeConnection(); | ||
expect(localConnectionMock.close).to.have.been.calledOnce; | ||
@@ -83,11 +84,15 @@ }); | ||
it('#createChannel should check if connection is ready', async function() { | ||
await expect(subject.createChannel()).to.be.rejectedWith('No RabbitMQ connection'); | ||
await subject.connect(); | ||
await expect(subject.createChannel()).to.be.fulfilled; | ||
await expect(rabbitMq.createChannel()).to.be.rejectedWith('No RabbitMQ connection'); | ||
await rabbitMq.connect(); | ||
await expect(rabbitMq.createChannel()).to.be.fulfilled; | ||
}); | ||
it('#createChannel should cache the channel', async function() { | ||
it('#createChannel should cache the channel and assert the queue', async function() { | ||
const assertQueueValue = { testing: 123 }; | ||
channelMock.assertQueue = sandbox.stub().resolves(assertQueueValue); | ||
const channels = {}; | ||
await subject.connect(); | ||
await subject.createChannel(channels); | ||
const assertedQueues = {}; | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(channels, assertedQueues); | ||
@@ -97,19 +102,24 @@ const channel = await channels.default; | ||
expect(channel).to.be.equal(channelMock); | ||
expect(channelMock.assertQueue).to.have.been.calledWith(queueName, { durable: false }); | ||
expect(await assertedQueues[queueName]).to.eq(assertQueueValue); | ||
}); | ||
it('#createChannel should reuse existing channel if it was already created', async function() { | ||
const channels = { default: Promise.resolve('channel') }; | ||
it('#createChannel should reuse existing channel and assertQueue if it was already created', async function() { | ||
const localChannelMock = Object.assign({}, channelMock); | ||
const channels = { default: Promise.resolve(localChannelMock) }; | ||
await subject.connect(); | ||
await subject.createChannel(channels); | ||
const assertedQueues = {}; | ||
assertedQueues[queueName] = 'called'; | ||
const channel = await channels.default; | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(channels, assertedQueues); | ||
expect(subject.getChannel()).to.be.equal(channel); | ||
expect(await rabbitMq.getChannel()).to.be.eq(localChannelMock); | ||
expect(localChannelMock.assertQueue).not.to.have.been.called; | ||
}); | ||
it('#createChannel should check if queueName was set', async function() { | ||
subject = new RabbitMq(config); | ||
await subject.connect(); | ||
await expect(subject.createChannel()).to.be.rejectedWith('No RabbitMQ queue'); | ||
rabbitMq = new RabbitMq(config); | ||
await rabbitMq.connect(); | ||
await expect(rabbitMq.createChannel()).to.be.rejectedWith('No RabbitMQ queue'); | ||
}); | ||
@@ -119,5 +129,5 @@ | ||
const data = { test: 'data' }; | ||
await subject.connect(); | ||
await subject.createChannel(); | ||
subject.insert(data); | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(); | ||
rabbitMq.insert(data); | ||
expect(channelMock.sendToQueue).to.have.been.calledWith(queueName, new Buffer(JSON.stringify(data))); | ||
@@ -129,6 +139,6 @@ }); | ||
const data = { test: 'data' }; | ||
await subject.connect(); | ||
await subject.createChannel(); | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(); | ||
subject.insertWithGroupBy(groupBy, data); | ||
rabbitMq.insertWithGroupBy(groupBy, data); | ||
expect(channelMock.sendToQueue).to.have.been.calledWith( | ||
@@ -142,6 +152,6 @@ queueName, | ||
it('#purge should empty the queue', async function() { | ||
await subject.connect(); | ||
await subject.createChannel(); | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(); | ||
await subject.purge(); | ||
await rabbitMq.purge(); | ||
@@ -152,6 +162,6 @@ expect(channelMock.purgeQueue).to.have.been.calledWith(queueName); | ||
it('#closeConnection should close the rabbitMq connection', async function() { | ||
await subject.connect(); | ||
await subject.createChannel(); | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(); | ||
await subject.closeConnection(); | ||
await rabbitMq.closeConnection(); | ||
expect(connectionMock.close).to.have.been.calledOnce; | ||
@@ -161,8 +171,8 @@ }); | ||
it('#destroy should delete the queue', async function() { | ||
await subject.connect(); | ||
await subject.createChannel(); | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(); | ||
await subject.destroy(); | ||
await rabbitMq.destroy(); | ||
expect(channelMock.deleteQueue).to.have.been.calledWith(queueName); | ||
}); | ||
}); |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
28603
737
0