Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@elastic.io/amqp-rpc

Package Overview
Dependencies
Maintainers
2
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@elastic.io/amqp-rpc - npm Package Compare versions

Comparing version 2.0.0-dev.1 to 2.0.0

2

package.json
{
"name": "@elastic.io/amqp-rpc",
"description": "RPC over RabbitMQ for Node.js",
"version": "2.0.0-dev.1",
"version": "2.0.0",
"homepage": "http://elastic.io",

@@ -6,0 +6,0 @@ "author": "elastic.io GmbH <info@elastic.io>",

@@ -9,3 +9,8 @@ 'use strict';

class AMQPEndpoint {
//@todo fix jsdoc
/**
*
* @param {*} connection Connection reference created from `amqplib` library
*
* @param {Object} [params]
*/
constructor(connection, params = {}) {

@@ -17,2 +22,9 @@ this._connection = connection;

/**
* Initialization before starting working
* NOTE! Race condition is not handled here,
* so it's better to not invoke the method several times (e.g. from multiple "threads")
*
* @return {Promise<void>}
*/
async start() {

@@ -27,5 +39,7 @@ if (this._channel) {

/**
* Opposite to this.start() – clearing
* NOTE! Race condition is not handled here,
* so it's better to not invoke the method several times (e.g. from multiple "threads")
*
*
* @returns {Promise}
* @return {Promise<void>}
*/

@@ -32,0 +46,0 @@ async disconnect() {

@@ -43,2 +43,3 @@ const assert = require('assert');

* @returns {Promise<String>} name of endpoint to send messages
* @override
*/

@@ -61,2 +62,3 @@ async start() {

* Stop listening for messages
* @override
*/

@@ -73,4 +75,3 @@ async disconnect() {

} catch (e) {
//it's ok to ignore this error, as queue
//may be deleted by AMQPStreamSender
//it's ok to ignore this error, as the queue might have been deleted by by AMQPStreamSender
}

@@ -86,2 +87,3 @@ }

this.emit('end');
//FIXME disconnect returns promise
this.disconnect();

@@ -88,0 +90,0 @@ return;

@@ -54,4 +54,7 @@ const EventEmitter = require('events');

/**
* Disconnect from event channel
* @returns {Promise}
* Opposite to this.start() – closing communication channel
* NOTE! Race condition is not handled here,
* so it's better to not invoke the method several times (e.g. from multiple "threads")
*
* @return {Promise<void>}
*/

@@ -73,4 +76,7 @@ async disconnect() {

/**
* Create and intialize amqp channel for communication
* @returns {Promise}
* Channel initialization, has to be done before starting working
* NOTE! Race condition is not handled here,
* so it's better to not invoke the method several times (e.g. from multiple "threads")
*
* @return {Promise<void>}
*/

@@ -77,0 +83,0 @@ async start() {

@@ -26,2 +26,5 @@ const Command = require('./Command');

if (!params.requestsQueue) {
throw new Error('params.requestsQueue is required');
}
super(connection, params);

@@ -92,3 +95,3 @@

/**
* Opposite to start.
* Opposite to this.start()
*

@@ -95,0 +98,0 @@ * @returns {Promise}

@@ -49,3 +49,3 @@ const Command = require('./Command');

/**
* Opposite to start.
* Opposite to this.start()
*

@@ -52,0 +52,0 @@ * @returns {Promise}

@@ -140,3 +140,3 @@ 'use strict';

it('Should handle timeouts', async () => {
const client = new AMQPRPCClient(connection, {timeout: 300});
const client = new AMQPRPCClient(connection, {timeout: 300, requestsQueue: 'tmp-queue-1'});
await client.start();

@@ -152,3 +152,3 @@

it('Should reject all requests in flight on disconnecting', async () => {
const client = new AMQPRPCClient(connection);
const client = new AMQPRPCClient(connection, {requestsQueue: 'tmp-queue-2'});
await client.start();

@@ -155,0 +155,0 @@

@@ -29,5 +29,9 @@ 'use strict';

describe('#contructor', () => {
it('should throw when params.requestsQueue is omitted', () => {
expect(() => new AMQPRPCClient(connectionStub, {})).to.throw('params.requestsQueue is required');
});
it('should consider params.repliesQueue', () => {
const repliesQueue = 'q';
const client = new AMQPRPCClient(connectionStub, {repliesQueue});
const repliesQueue = 'replies';
const client = new AMQPRPCClient(connectionStub, {repliesQueue, requestsQueue: 'q'});
expect(client.repliesQueue).to.equal(repliesQueue);

@@ -37,3 +41,3 @@ });

const timeout = 57;
const client = new AMQPRPCClient(connectionStub, {timeout});
const client = new AMQPRPCClient(connectionStub, {timeout, requestsQueue: 'q'});
expect(client._params.timeout).to.equal(timeout);

@@ -47,3 +51,3 @@ });

it('should create amqp channel for work', async () => {
const client = new AMQPRPCClient(connectionStub);
const client = new AMQPRPCClient(connectionStub, {requestsQueue: 'q'});
await client.start();

@@ -55,3 +59,3 @@ expect(connectionStub.createChannel).to.have.been.calledOnce;

it('should create generated amqp queue with options', async () => {
const client = new AMQPRPCClient(connectionStub);
const client = new AMQPRPCClient(connectionStub, {requestsQueue: 'q'});
const queueStub = {

@@ -71,3 +75,3 @@ queue: 'q1'

const repliesQueue = 'qq';
const client = new AMQPRPCClient(connectionStub, {repliesQueue});
const client = new AMQPRPCClient(connectionStub, {repliesQueue, requestsQueue: 'q'});
await client.start();

@@ -79,3 +83,3 @@ expect(channelStub.assertQueue).not.to.be.called;

it('should start listening from queue', async () => {
const client = new AMQPRPCClient(connectionStub);
const client = new AMQPRPCClient(connectionStub, {requestsQueue: 'q'});
let consumerMethod;

@@ -105,3 +109,3 @@ channelStub.consume = (queueName, cb) => {

it('should delete queue if it was created by client', async () => {
const client = new AMQPRPCClient(connectionStub);
const client = new AMQPRPCClient(connectionStub, {requestsQueue: 'q'});
await client.start();

@@ -115,3 +119,3 @@ const repliesQueue = client.repliesQueue;

it('should not delete queue if params.repliesQueue is set', async () => {
const client = new AMQPRPCClient(connectionStub, {repliesQueue: 'qqqq'});
const client = new AMQPRPCClient(connectionStub, {repliesQueue: 'replies', requestsQueue: 'q'});
await client.start();

@@ -127,3 +131,3 @@ await client.disconnect();

channelStub.consume = sinon.stub().returns(Promise.resolve({consumerTag}));
const server = new AMQPRPCClient(connectionStub, {repliesQueue});
const server = new AMQPRPCClient(connectionStub, {repliesQueue, requestsQueue: 'q'});
await server.start();

@@ -136,3 +140,3 @@ await server.disconnect();

it('should close channel', async () => {
const client = new AMQPRPCClient(connectionStub);
const client = new AMQPRPCClient(connectionStub, {requestsQueue: 'q'});
await client.start();

@@ -144,3 +148,3 @@ await client.disconnect();

it('should clear _requests map', async () => {
const client = new AMQPRPCClient(connectionStub);
const client = new AMQPRPCClient(connectionStub, {requestsQueue: 'q'});
await client.start();

@@ -162,3 +166,3 @@ setTimeout(() => {

if (e.message.indexOf('canceled due to client disconnect') === -1) {
//it's another error, than expected, time to harakiri
//this is another error than expected, so it's time to harakiri
throw e;

@@ -165,0 +169,0 @@ }

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc