@emartech/rabbitmq-client
Advanced tools
Comparing version 2.0.0 to 2.1.0
@@ -6,3 +6,3 @@ { | ||
"scripts": { | ||
"test": "mocha --reporter spec ./src/setup-tests.spec.js ./src/*.spec.js ./src/**/*.spec.js", | ||
"test": "mocha --reporter spec ./src/*.spec.js ./src/**/*.spec.js", | ||
"semantic-release": "CI=true semantic-release pre && npm publish --access public && semantic-release post" | ||
@@ -45,3 +45,3 @@ }, | ||
}, | ||
"version": "2.0.0" | ||
"version": "2.1.0" | ||
} |
@@ -24,3 +24,2 @@ 'use strict'; | ||
this._consumerCanceled = false; | ||
this._queueOptions = configuration.queueOptions; | ||
@@ -64,3 +63,3 @@ if (this._prefetchCount < this._batchSize) { | ||
if (!this._rabbitMqChannel) { | ||
this._rabbitMq = await RabbitMq.create(this._amqpConfig, this._channel, this._connectionType, this._queueOptions); | ||
this._rabbitMq = await RabbitMq.create(this._amqpConfig, this._channel, this._connectionType); | ||
this._rabbitMqChannel = this._rabbitMq.getChannel(); | ||
@@ -67,0 +66,0 @@ await this._rabbitMqChannel.prefetch(this._prefetchCount); |
@@ -24,5 +24,2 @@ 'use strict'; | ||
url: 'amqp://test:secret@192.168.40.10:5672/cubebloc' | ||
}, | ||
special: { | ||
url: 'amqp://spec:ial@192.168.40.10:5672/special' | ||
} | ||
@@ -48,4 +45,3 @@ }; | ||
createChannel: () => Promise.resolve({ | ||
assertQueue: () => {}, | ||
on: () => {} | ||
assertQueue: () => {} | ||
}) | ||
@@ -82,14 +78,2 @@ }; | ||
it('should create a RabbitMQ connection with the defined queueOptions', async function() { | ||
const rabbitMqSpy = sandbox.spy(RabbitMQSingleton, 'create'); | ||
stubRabbitMq(); | ||
await createConsumer({ | ||
connectionType: 'special', | ||
queueOptions: { durable: true } | ||
}); | ||
expect(rabbitMqSpy).have.been.calledWith(amqpConfig, channelName, 'special', { durable: true }); | ||
}); | ||
it('should call onMessages with batched messages', async function() { | ||
@@ -96,0 +80,0 @@ const message1 = createMessage({ content: '{"foo":"bar"}' }); |
@@ -17,4 +17,2 @@ 'use strict'; | ||
this._amqpConfig = amqpConfig; | ||
this._queueOptions = configuration.queueOptions; | ||
this._connectionType = configuration.connectionType || 'default'; | ||
} | ||
@@ -27,3 +25,3 @@ | ||
try { | ||
const rabbitMq = await RabbitMq.create(this._amqpConfig, this._channel, this._connectionType, this._queueOptions); | ||
const rabbitMq = await RabbitMq.create(this._amqpConfig, this._channel); | ||
const channel = rabbitMq.getChannel(); | ||
@@ -30,0 +28,0 @@ await channel.prefetch(this._prefetchCount); |
@@ -23,5 +23,2 @@ 'use strict'; | ||
url: 'amqp://test:secret@192.168.40.10:5672/cubebloc' | ||
}, | ||
test: { | ||
url: 'amqp://test:test@test:5672/test' | ||
} | ||
@@ -91,34 +88,2 @@ }; | ||
it('should create a RabbitMQ connection with the right preferences', async function() { | ||
const configuration = { | ||
logger: loggerName, | ||
channel: channelName, | ||
connectionType: 'test', | ||
onMessage: async function() {} | ||
}; | ||
const rabbitMqStub = sandbox.stub(RabbitMQSingleton, 'create'); | ||
const rabbitMQConsumer = RabbitMQConsumer.create(amqpConfig, configuration); | ||
await rabbitMQConsumer.process(); | ||
expect(rabbitMqStub).have.been.calledWith(amqpConfig, channelName, 'test'); | ||
}); | ||
it('should create a RabbitMQ connection with the desired queue options', async function() { | ||
const configuration = { | ||
logger: loggerName, | ||
channel: channelName, | ||
queueOptions: { durable: true }, | ||
onMessage: async function() {} | ||
}; | ||
const rabbitMqStub = sandbox.stub(RabbitMQSingleton, 'create'); | ||
const rabbitMQConsumer = RabbitMQConsumer.create(amqpConfig, configuration); | ||
await rabbitMQConsumer.process(); | ||
expect(rabbitMqStub).have.been.calledWith(amqpConfig, channelName, 'default', { durable: true }); | ||
}); | ||
it('should not retry when message is not parsable as JSON', async function() { | ||
@@ -125,0 +90,0 @@ const message = { content: new Buffer('Not a JSON') }; |
'use strict'; | ||
const Pool = require('./pool'); | ||
const RabbitMq = require('./rabbit-mq'); | ||
let channels = {}; | ||
let connections = {}; | ||
let assertedQueues = {}; | ||
module.exports = { | ||
create: async (amqpConfig, queueName, connectionType, queueOptions) => { | ||
const rabbitMq = Pool.create(amqpConfig, connectionType).getClient(queueName, queueOptions); | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(); | ||
create: async (amqpConfig, queueName, connectionType) => { | ||
const rabbitMq = new RabbitMq(amqpConfig, queueName, connectionType); | ||
await rabbitMq.connect(connections); | ||
await rabbitMq.createChannel(channels, assertedQueues); | ||
@@ -11,0 +15,0 @@ return rabbitMq; |
@@ -10,21 +10,19 @@ 'use strict'; | ||
class RabbitMq { | ||
constructor(amqpConfig, queueName, queueOptions = {}) { | ||
constructor(amqpConfig, queueName, connectionType = 'default') { | ||
this.queueName = queueName; | ||
this.queueOptions = queueOptions; | ||
this._amqpConfig = amqpConfig; | ||
this._connectionType = connectionType; | ||
this._connection = null; | ||
this._queueAsserted = false; | ||
this._channel = null; | ||
} | ||
async connect() { | ||
if (!this._connectionProgress) { | ||
const options = this._getOpts(); | ||
this._connectionProgress = amqp.connect(this._config.url, options); | ||
this._connectionProgress.then(connection => { | ||
this._connection = connection; | ||
}); | ||
async connect(connections = {}) { | ||
if (connections[this._connectionType]) { | ||
this._connection = await connections[this._connectionType]; | ||
return; | ||
} | ||
await this._connectionProgress; | ||
const opts = this._getOpts(); | ||
connections[this._connectionType] = amqp.connect(this._config.url, opts); | ||
this._connection = await connections[this._connectionType]; | ||
} | ||
@@ -38,44 +36,34 @@ | ||
async createChannel() { | ||
async createChannel(channels = {}, assertedQueues = {}) { | ||
this._validate(); | ||
let registerCloseListener = false; | ||
if (!this._channelCreationProgress) { | ||
this._channelCreationProgress = this._connection.createChannel(). | ||
then(channel => { | ||
this._channel = channel; | ||
return channel; | ||
}). | ||
then(this._handleErrorEvent.bind(this)). | ||
then(this._handleCloseEvent.bind(this)). | ||
then(this._assertQueue.bind(this)); | ||
if (!channels[this._connectionType]) { | ||
channels[this._connectionType] = this._connection.createChannel(); | ||
registerCloseListener = true; | ||
} | ||
await this._channelCreationProgress; | ||
} | ||
this._channel = await channels[this._connectionType]; | ||
_handleErrorEvent(channel) { | ||
channel.on('error', error => { | ||
logger.error('Channel error', error.message, JSON.stringify(error)); | ||
}); | ||
return channel; | ||
} | ||
if (registerCloseListener) { | ||
this._channel.on('error', error => { | ||
logger.error('Channel error', error.message, JSON.stringify(error)); | ||
}); | ||
_handleCloseEvent(channel) { | ||
channel.on('close', () => { | ||
this._channel = null; | ||
this._channelCreationProgress = null; | ||
this._queueAsserted = false; | ||
logger.error('Channel close'); | ||
}); | ||
this._channel.on('close', () => { | ||
delete channels[this._connectionType]; | ||
delete assertedQueues[this.queueName]; | ||
logger.error('Channel close'); | ||
}); | ||
} | ||
return channel; | ||
await this._assertQueue(assertedQueues); | ||
} | ||
async _assertQueue(channel) { | ||
if (!this._queueAsserted) { | ||
this._queueAsserted = true; | ||
async _assertQueue(assertedQueues) { | ||
if (!assertedQueues[this.queueName]) { | ||
assertedQueues[this.queueName] = this._channel.assertQueue(this.queueName, { durable: false }); | ||
} | ||
await channel.assertQueue(this.queueName, this.queueOptions); | ||
await assertedQueues[this.queueName]; | ||
} | ||
@@ -91,14 +79,16 @@ | ||
insert(data) { | ||
return this._channel.sendToQueue(this.queueName, new Buffer(JSON.stringify(data))); | ||
} | ||
insertWithGroupBy(groupBy, data) { | ||
insert(data, options = {}) { | ||
return this._channel.sendToQueue( | ||
this.queueName, | ||
new Buffer(JSON.stringify(data)), | ||
{ headers: { groupBy } } | ||
options | ||
); | ||
} | ||
insertWithGroupBy(groupBy, data, options = {}) { | ||
const insertOptions = Object.assign({}, options, { headers: { groupBy } }); | ||
return this.insert(data, insertOptions); | ||
} | ||
async purge() { | ||
@@ -123,3 +113,3 @@ await this._channel.purgeQueue(this.queueName); | ||
get _config() { | ||
return this._amqpConfig; | ||
return this._amqpConfig[this._connectionType]; | ||
} | ||
@@ -126,0 +116,0 @@ } |
'use strict'; | ||
const amqp = require('amqplib'); | ||
const Pool = require('./pool'); | ||
const RabbitMq = require('./rabbit-mq'); | ||
const chai = require('chai'); | ||
const sinon = require('sinon'); | ||
const chaiAsPromised = require('chai-as-promised'); | ||
const sinonChai = require('sinon-chai'); | ||
const EventEmitter = require('events'); | ||
chai.use(sinonChai); | ||
chai.use(chaiAsPromised); | ||
const expect = chai.expect; | ||
const config = { | ||
@@ -17,3 +26,3 @@ default: { | ||
let rabbitMq; | ||
let sandbox; | ||
let sandbox = sinon.sandbox.create(); | ||
@@ -24,4 +33,2 @@ let connectionMock; | ||
beforeEach(async function() { | ||
sandbox = this.sandbox; | ||
channelMock = Object.assign(new EventEmitter(), { | ||
@@ -40,5 +47,9 @@ sendToQueue: sandbox.stub().returns(true), | ||
sandbox.stub(amqp, 'connect').resolves(connectionMock); | ||
rabbitMq = Pool.createFromNewPool(config, 'default').getClient(queueName, { durable: true, autoDelete: true }); | ||
rabbitMq = new RabbitMq(config, queueName); | ||
}); | ||
afterEach(async function() { | ||
sandbox.restore(); | ||
}); | ||
it('#connect should call amqp connect with rigth parameters', async function() { | ||
@@ -55,5 +66,16 @@ await rabbitMq.connect(); | ||
await rabbitMq.connect(connections); | ||
const connection = await connections.default; | ||
expect(connection).to.be.equal(connectionMock); | ||
}); | ||
it('#connect should reuse existing connection if it was already created', async function() { | ||
const localConnectionMock = { | ||
close: sandbox.stub().resolves(true) | ||
}; | ||
const connections = { default: Promise.resolve(localConnectionMock) }; | ||
await rabbitMq.connect(connections); | ||
expect(amqp.connect).to.have.been.calledOnce; | ||
await rabbitMq.closeConnection(); | ||
expect(localConnectionMock.close).to.have.been.calledOnce; | ||
}); | ||
@@ -71,13 +93,30 @@ | ||
const channels = {}; | ||
const assertedQueues = {}; | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(); | ||
await rabbitMq.createChannel(); | ||
await rabbitMq.createChannel(channels, assertedQueues); | ||
expect(channelMock.assertQueue).to.have.been.calledWith(queueName, { durable: true, autoDelete: true }); | ||
expect(channelMock.assertQueue).to.have.been.calledOnce; | ||
expect(connectionMock.createChannel).to.have.been.calledOnce; | ||
const channel = await channels.default; | ||
expect(channel).to.be.equal(channelMock); | ||
expect(channelMock.assertQueue).to.have.been.calledWith(queueName, { durable: false }); | ||
expect(await assertedQueues[queueName]).to.eq(assertQueueValue); | ||
}); | ||
it('#createChannel should reuse existing channel and assertQueue if it was already created', async function() { | ||
const localChannelMock = Object.assign({}, channelMock); | ||
const channels = { default: Promise.resolve(localChannelMock) }; | ||
const assertedQueues = {}; | ||
assertedQueues[queueName] = 'called'; | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(channels, assertedQueues); | ||
expect(await rabbitMq.getChannel()).to.be.eq(localChannelMock); | ||
expect(localChannelMock.assertQueue).not.to.have.been.called; | ||
}); | ||
it('#createChannel should check if queueName was set', async function() { | ||
rabbitMq = Pool.createFromNewPool(config, 'default').getClient(); | ||
rabbitMq = new RabbitMq(config); | ||
await rabbitMq.connect(); | ||
@@ -95,2 +134,11 @@ await expect(rabbitMq.createChannel()).to.be.rejectedWith('No RabbitMQ queue'); | ||
it('#insert should support options parameter', async function() { | ||
const data = { test: 'data' }; | ||
const options = { test: 'ing' }; | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(); | ||
rabbitMq.insert(data, options); | ||
expect(channelMock.sendToQueue).to.have.been.calledWith(queueName, new Buffer(JSON.stringify(data)), options); | ||
}); | ||
it('#insertWithGroupBy should call sentToQueue', async function() { | ||
@@ -110,2 +158,17 @@ const groupBy = 'me.login'; | ||
it('#insertWithGroupBy should support options parameter', async function() { | ||
const groupBy = 'me.login'; | ||
const data = { test: 'data' }; | ||
const options = { test: 'ing' }; | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(); | ||
rabbitMq.insertWithGroupBy(groupBy, data, options); | ||
expect(channelMock.sendToQueue).to.have.been.calledWith( | ||
queueName, | ||
new Buffer(JSON.stringify(data)), | ||
Object.assign({ headers: { groupBy } }, options) | ||
); | ||
}); | ||
it('#purge should empty the queue', async function() { | ||
@@ -135,2 +198,21 @@ await rabbitMq.connect(); | ||
}); | ||
describe('with dead channel', function() { | ||
it('should remove channel from the cache', async function() { | ||
const channels = {}; | ||
const assertedQueues = {}; | ||
await rabbitMq.connect(); | ||
await rabbitMq.createChannel(channels, assertedQueues); | ||
expect(channels.default).not.to.be.undefined; | ||
expect(assertedQueues[queueName]).not.to.be.undefined; | ||
rabbitMq.getChannel().emit('close'); | ||
expect(channels.default).to.be.undefined; | ||
expect(assertedQueues[queueName]).to.be.undefined; | ||
}); | ||
}); | ||
}); |
30630
15
786