New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@emartech/rabbitmq-client

Package Overview
Dependencies
Maintainers
44
Versions
40
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@emartech/rabbitmq-client - npm Package Compare versions

Comparing version 2.0.0 to 2.1.0

4

package.json

@@ -6,3 +6,3 @@ {

"scripts": {
"test": "mocha --reporter spec ./src/setup-tests.spec.js ./src/*.spec.js ./src/**/*.spec.js",
"test": "mocha --reporter spec ./src/*.spec.js ./src/**/*.spec.js",
"semantic-release": "CI=true semantic-release pre && npm publish --access public && semantic-release post"

@@ -45,3 +45,3 @@ },

},
"version": "2.0.0"
"version": "2.1.0"
}

@@ -24,3 +24,2 @@ 'use strict';

this._consumerCanceled = false;
this._queueOptions = configuration.queueOptions;

@@ -64,3 +63,3 @@ if (this._prefetchCount < this._batchSize) {

if (!this._rabbitMqChannel) {
this._rabbitMq = await RabbitMq.create(this._amqpConfig, this._channel, this._connectionType, this._queueOptions);
this._rabbitMq = await RabbitMq.create(this._amqpConfig, this._channel, this._connectionType);
this._rabbitMqChannel = this._rabbitMq.getChannel();

@@ -67,0 +66,0 @@ await this._rabbitMqChannel.prefetch(this._prefetchCount);

@@ -24,5 +24,2 @@ 'use strict';

url: 'amqp://test:secret@192.168.40.10:5672/cubebloc'
},
special: {
url: 'amqp://spec:ial@192.168.40.10:5672/special'
}

@@ -48,4 +45,3 @@ };

createChannel: () => Promise.resolve({
assertQueue: () => {},
on: () => {}
assertQueue: () => {}
})

@@ -82,14 +78,2 @@ };

it('should create a RabbitMQ connection with the defined queueOptions', async function() {
const rabbitMqSpy = sandbox.spy(RabbitMQSingleton, 'create');
stubRabbitMq();
await createConsumer({
connectionType: 'special',
queueOptions: { durable: true }
});
expect(rabbitMqSpy).have.been.calledWith(amqpConfig, channelName, 'special', { durable: true });
});
it('should call onMessages with batched messages', async function() {

@@ -96,0 +80,0 @@ const message1 = createMessage({ content: '{"foo":"bar"}' });

@@ -17,4 +17,2 @@ 'use strict';

this._amqpConfig = amqpConfig;
this._queueOptions = configuration.queueOptions;
this._connectionType = configuration.connectionType || 'default';
}

@@ -27,3 +25,3 @@

try {
const rabbitMq = await RabbitMq.create(this._amqpConfig, this._channel, this._connectionType, this._queueOptions);
const rabbitMq = await RabbitMq.create(this._amqpConfig, this._channel);
const channel = rabbitMq.getChannel();

@@ -30,0 +28,0 @@ await channel.prefetch(this._prefetchCount);

@@ -23,5 +23,2 @@ 'use strict';

url: 'amqp://test:secret@192.168.40.10:5672/cubebloc'
},
test: {
url: 'amqp://test:test@test:5672/test'
}

@@ -91,34 +88,2 @@ };

it('should create a RabbitMQ connection with the right preferences', async function() {
const configuration = {
logger: loggerName,
channel: channelName,
connectionType: 'test',
onMessage: async function() {}
};
const rabbitMqStub = sandbox.stub(RabbitMQSingleton, 'create');
const rabbitMQConsumer = RabbitMQConsumer.create(amqpConfig, configuration);
await rabbitMQConsumer.process();
expect(rabbitMqStub).have.been.calledWith(amqpConfig, channelName, 'test');
});
it('should create a RabbitMQ connection with the desired queue options', async function() {
const configuration = {
logger: loggerName,
channel: channelName,
queueOptions: { durable: true },
onMessage: async function() {}
};
const rabbitMqStub = sandbox.stub(RabbitMQSingleton, 'create');
const rabbitMQConsumer = RabbitMQConsumer.create(amqpConfig, configuration);
await rabbitMQConsumer.process();
expect(rabbitMqStub).have.been.calledWith(amqpConfig, channelName, 'default', { durable: true });
});
it('should not retry when message is not parsable as JSON', async function() {

@@ -125,0 +90,0 @@ const message = { content: new Buffer('Not a JSON') };

'use strict';
const Pool = require('./pool');
const RabbitMq = require('./rabbit-mq');
let channels = {};
let connections = {};
let assertedQueues = {};
module.exports = {
create: async (amqpConfig, queueName, connectionType, queueOptions) => {
const rabbitMq = Pool.create(amqpConfig, connectionType).getClient(queueName, queueOptions);
await rabbitMq.connect();
await rabbitMq.createChannel();
create: async (amqpConfig, queueName, connectionType) => {
const rabbitMq = new RabbitMq(amqpConfig, queueName, connectionType);
await rabbitMq.connect(connections);
await rabbitMq.createChannel(channels, assertedQueues);

@@ -11,0 +15,0 @@ return rabbitMq;

@@ -10,21 +10,19 @@ 'use strict';

class RabbitMq {
constructor(amqpConfig, queueName, queueOptions = {}) {
constructor(amqpConfig, queueName, connectionType = 'default') {
this.queueName = queueName;
this.queueOptions = queueOptions;
this._amqpConfig = amqpConfig;
this._connectionType = connectionType;
this._connection = null;
this._queueAsserted = false;
this._channel = null;
}
async connect() {
if (!this._connectionProgress) {
const options = this._getOpts();
this._connectionProgress = amqp.connect(this._config.url, options);
this._connectionProgress.then(connection => {
this._connection = connection;
});
async connect(connections = {}) {
if (connections[this._connectionType]) {
this._connection = await connections[this._connectionType];
return;
}
await this._connectionProgress;
const opts = this._getOpts();
connections[this._connectionType] = amqp.connect(this._config.url, opts);
this._connection = await connections[this._connectionType];
}

@@ -38,44 +36,34 @@

async createChannel() {
async createChannel(channels = {}, assertedQueues = {}) {
this._validate();
let registerCloseListener = false;
if (!this._channelCreationProgress) {
this._channelCreationProgress = this._connection.createChannel().
then(channel => {
this._channel = channel;
return channel;
}).
then(this._handleErrorEvent.bind(this)).
then(this._handleCloseEvent.bind(this)).
then(this._assertQueue.bind(this));
if (!channels[this._connectionType]) {
channels[this._connectionType] = this._connection.createChannel();
registerCloseListener = true;
}
await this._channelCreationProgress;
}
this._channel = await channels[this._connectionType];
_handleErrorEvent(channel) {
channel.on('error', error => {
logger.error('Channel error', error.message, JSON.stringify(error));
});
return channel;
}
if (registerCloseListener) {
this._channel.on('error', error => {
logger.error('Channel error', error.message, JSON.stringify(error));
});
_handleCloseEvent(channel) {
channel.on('close', () => {
this._channel = null;
this._channelCreationProgress = null;
this._queueAsserted = false;
logger.error('Channel close');
});
this._channel.on('close', () => {
delete channels[this._connectionType];
delete assertedQueues[this.queueName];
logger.error('Channel close');
});
}
return channel;
await this._assertQueue(assertedQueues);
}
async _assertQueue(channel) {
if (!this._queueAsserted) {
this._queueAsserted = true;
async _assertQueue(assertedQueues) {
if (!assertedQueues[this.queueName]) {
assertedQueues[this.queueName] = this._channel.assertQueue(this.queueName, { durable: false });
}
await channel.assertQueue(this.queueName, this.queueOptions);
await assertedQueues[this.queueName];
}

@@ -91,14 +79,16 @@

insert(data) {
return this._channel.sendToQueue(this.queueName, new Buffer(JSON.stringify(data)));
}
insertWithGroupBy(groupBy, data) {
insert(data, options = {}) {
return this._channel.sendToQueue(
this.queueName,
new Buffer(JSON.stringify(data)),
{ headers: { groupBy } }
options
);
}
insertWithGroupBy(groupBy, data, options = {}) {
const insertOptions = Object.assign({}, options, { headers: { groupBy } });
return this.insert(data, insertOptions);
}
async purge() {

@@ -123,3 +113,3 @@ await this._channel.purgeQueue(this.queueName);

get _config() {
return this._amqpConfig;
return this._amqpConfig[this._connectionType];
}

@@ -126,0 +116,0 @@ }

'use strict';
const amqp = require('amqplib');
const Pool = require('./pool');
const RabbitMq = require('./rabbit-mq');
const chai = require('chai');
const sinon = require('sinon');
const chaiAsPromised = require('chai-as-promised');
const sinonChai = require('sinon-chai');
const EventEmitter = require('events');
chai.use(sinonChai);
chai.use(chaiAsPromised);
const expect = chai.expect;
const config = {

@@ -17,3 +26,3 @@ default: {

let rabbitMq;
let sandbox;
let sandbox = sinon.sandbox.create();

@@ -24,4 +33,2 @@ let connectionMock;

beforeEach(async function() {
sandbox = this.sandbox;
channelMock = Object.assign(new EventEmitter(), {

@@ -40,5 +47,9 @@ sendToQueue: sandbox.stub().returns(true),

sandbox.stub(amqp, 'connect').resolves(connectionMock);
rabbitMq = Pool.createFromNewPool(config, 'default').getClient(queueName, { durable: true, autoDelete: true });
rabbitMq = new RabbitMq(config, queueName);
});
afterEach(async function() {
sandbox.restore();
});
it('#connect should call amqp connect with rigth parameters', async function() {

@@ -55,5 +66,16 @@ await rabbitMq.connect();

await rabbitMq.connect(connections);
const connection = await connections.default;
expect(connection).to.be.equal(connectionMock);
});
it('#connect should reuse existing connection if it was already created', async function() {
const localConnectionMock = {
close: sandbox.stub().resolves(true)
};
const connections = { default: Promise.resolve(localConnectionMock) };
await rabbitMq.connect(connections);
expect(amqp.connect).to.have.been.calledOnce;
await rabbitMq.closeConnection();
expect(localConnectionMock.close).to.have.been.calledOnce;
});

@@ -71,13 +93,30 @@

const channels = {};
const assertedQueues = {};
await rabbitMq.connect();
await rabbitMq.createChannel();
await rabbitMq.createChannel();
await rabbitMq.createChannel(channels, assertedQueues);
expect(channelMock.assertQueue).to.have.been.calledWith(queueName, { durable: true, autoDelete: true });
expect(channelMock.assertQueue).to.have.been.calledOnce;
expect(connectionMock.createChannel).to.have.been.calledOnce;
const channel = await channels.default;
expect(channel).to.be.equal(channelMock);
expect(channelMock.assertQueue).to.have.been.calledWith(queueName, { durable: false });
expect(await assertedQueues[queueName]).to.eq(assertQueueValue);
});
it('#createChannel should reuse existing channel and assertQueue if it was already created', async function() {
const localChannelMock = Object.assign({}, channelMock);
const channels = { default: Promise.resolve(localChannelMock) };
const assertedQueues = {};
assertedQueues[queueName] = 'called';
await rabbitMq.connect();
await rabbitMq.createChannel(channels, assertedQueues);
expect(await rabbitMq.getChannel()).to.be.eq(localChannelMock);
expect(localChannelMock.assertQueue).not.to.have.been.called;
});
it('#createChannel should check if queueName was set', async function() {
rabbitMq = Pool.createFromNewPool(config, 'default').getClient();
rabbitMq = new RabbitMq(config);
await rabbitMq.connect();

@@ -95,2 +134,11 @@ await expect(rabbitMq.createChannel()).to.be.rejectedWith('No RabbitMQ queue');

it('#insert should support options parameter', async function() {
const data = { test: 'data' };
const options = { test: 'ing' };
await rabbitMq.connect();
await rabbitMq.createChannel();
rabbitMq.insert(data, options);
expect(channelMock.sendToQueue).to.have.been.calledWith(queueName, new Buffer(JSON.stringify(data)), options);
});
it('#insertWithGroupBy should call sentToQueue', async function() {

@@ -110,2 +158,17 @@ const groupBy = 'me.login';

it('#insertWithGroupBy should support options parameter', async function() {
const groupBy = 'me.login';
const data = { test: 'data' };
const options = { test: 'ing' };
await rabbitMq.connect();
await rabbitMq.createChannel();
rabbitMq.insertWithGroupBy(groupBy, data, options);
expect(channelMock.sendToQueue).to.have.been.calledWith(
queueName,
new Buffer(JSON.stringify(data)),
Object.assign({ headers: { groupBy } }, options)
);
});
it('#purge should empty the queue', async function() {

@@ -135,2 +198,21 @@ await rabbitMq.connect();

});
describe('with dead channel', function() {
it('should remove channel from the cache', async function() {
const channels = {};
const assertedQueues = {};
await rabbitMq.connect();
await rabbitMq.createChannel(channels, assertedQueues);
expect(channels.default).not.to.be.undefined;
expect(assertedQueues[queueName]).not.to.be.undefined;
rabbitMq.getChannel().emit('close');
expect(channels.default).to.be.undefined;
expect(assertedQueues[queueName]).to.be.undefined;
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc