New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

rabbit-queue

Package Overview
Dependencies
Maintainers
1
Versions
49
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbit-queue - npm Package Compare versions

Comparing version 4.7.1 to 5.0.0-beta.0

js/get-logger.d.ts

3

js/base-queue-handler.d.ts
import Rabbit from './rabbit';
import * as amqp from 'amqplib';
import Queue from './queue';
import getLogger from './logger';
declare abstract class BaseQueueHandler {

@@ -10,3 +11,3 @@ queueName: any;

retryDelay: number;
logger: any;
logger: ReturnType<typeof getLogger>;
queue: Queue;

@@ -13,0 +14,0 @@ dlq: Queue;

"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());

@@ -11,4 +12,4 @@ });

Object.defineProperty(exports, "__esModule", { value: true });
const log4js = require("@log4js-node/log4js-api");
const encode_decode_1 = require("./encode-decode");
const logger_1 = require("./logger");
class BaseQueueHandler {

@@ -18,3 +19,3 @@ constructor(queueName, rabbit, { retries = 3, retryDelay = 1000, logEnabled = true, scope = BaseQueueHandler.SCOPES.singleton, createAndSubscribeToQueue = true, prefetch = rabbit.prefetch } = {}) {

this.rabbit = rabbit;
const logger = log4js.getLogger(`rabbit-queue.${queueName}`);
const logger = logger_1.default(`rabbit-queue.${queueName}`);
this.prefetch = prefetch;

@@ -45,3 +46,3 @@ this.retries = retries;

const Constructor = this;
const instance = new Constructor(queueName, rabbit, Object.assign({}, options, { scope: BaseQueueHandler.SCOPES.prototype }));
const instance = new Constructor(queueName, rabbit, Object.assign(Object.assign({}, options), { scope: BaseQueueHandler.SCOPES.prototype }));
return instance;

@@ -52,3 +53,3 @@ }

this.queue = yield this.rabbit
.createQueue(this.queueName, Object.assign({}, this.getQueueOptions(), { prefetch: this.prefetch }), (msg, ack) => {
.createQueue(this.queueName, Object.assign(Object.assign({}, this.getQueueOptions()), { prefetch: this.prefetch }), (msg, ack) => {
if (this.scope === BaseQueueHandler.SCOPES.singleton) {

@@ -55,0 +56,0 @@ this.tryHandle(0, msg, ack).catch(e => this.logger.error(e));

"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());

@@ -23,5 +24,5 @@ });

const queue_1 = require("./queue");
const log4js = require("@log4js-node/log4js-api");
const encode_decode_1 = require("./encode-decode");
const logger = log4js.getLogger('rabbit-queue');
const logger_1 = require("./logger");
const logger = logger_1.default('rabbit-queue');
let delayedQueue = {};

@@ -57,3 +58,3 @@ let delayedQueueReply;

const timestamp = new Date().getTime();
queue_1.default.publish({ queueName, obj, timestamp }, Object.assign({ expiration }, headers, { contentType: 'application/json' }), channel, delayedQueue[name].name, delayedQueue[name]);
queue_1.default.publish({ queueName, obj, timestamp }, Object.assign(Object.assign({ expiration }, headers), { contentType: 'application/json' }), channel, delayedQueue[name].name, delayedQueue[name]);
});

@@ -70,3 +71,3 @@ }

Actually took ${new Date().getTime() - timestamp} ms.`);
yield queue_1.default.publish(obj, Object.assign({}, properties, { headers: rest }), channel, queueName);
yield queue_1.default.publish(obj, Object.assign(Object.assign({}, properties), { headers: rest }), channel, queueName);
ack();

@@ -73,0 +74,0 @@ });

"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());

@@ -13,6 +14,6 @@ });

const reply_queue_1 = require("./reply-queue");
const log4js = require("@log4js-node/log4js-api");
const encode_decode_1 = require("./encode-decode");
const uuid = require("uuid");
const logger = log4js.getLogger('rabbit-queue');
const logger_1 = require("./logger");
const logger = logger_1.default('rabbit-queue');
exports.default = {

@@ -19,0 +20,0 @@ defaultHeaders: {

"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());

@@ -21,6 +22,6 @@ });

const race_until_1 = require("race-until");
const log4js = require("@log4js-node/log4js-api");
const stream_1 = require("stream");
const encode_decode_1 = require("./encode-decode");
const logger = log4js.getLogger('rabbit-queue');
const logger_1 = require("./logger");
const logger = logger_1.default('rabbit-queue');
class Queue {

@@ -201,2 +202,3 @@ constructor(channel, name, options) {

}
exports.default = Queue;
Queue.STOP_PROPAGATION = { stopPropagation: true };

@@ -206,3 +208,2 @@ Queue.ERROR_DURING_REPLY = { error: true, error_code: 999 };

Queue.STOP_STREAM_MESSAGE = { stopStream: true };
exports.default = Queue;
//# sourceMappingURL=queue.js.map

@@ -6,2 +6,3 @@ /// <reference types="node" />

import Queue from './queue';
import getLogger from './logger';
export default class Rabbit extends EventEmitter {

@@ -13,4 +14,7 @@ url: string;

static STOP_STREAM: string;
connection: amqp.Connection;
channel: Channel;
static INSTANCE: Rabbit;
consumeConnection: amqp.Connection;
publishConnection: amqp.Connection;
consumeChannel: Channel;
publishChannel: Channel;
connected: Promise<any>;

@@ -27,2 +31,3 @@ lock: Promise<void>;

socketOptions: any;
logger: ReturnType<typeof getLogger>;
constructor(url: string, { prefetch, replyPattern, prefix, scheduledPublish, socketOptions }?: {

@@ -38,4 +43,4 @@ prefetch?: number;

private emitDisconnected;
createChannel(connection: amqp.Connection): Promise<amqp.Channel>;
initChannel(channel: Channel): Promise<void>;
createChannel(connection: amqp.Connection): Promise<amqp.ConfirmChannel>;
initChannel(channel: Channel, publish?: boolean): Promise<void>;
private updateName;

@@ -42,0 +47,0 @@ createQueue(name: string, options?: amqp.Options.AssertQueue & {

"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());

@@ -18,4 +19,3 @@ });

const assert = require("assert");
const log4js = require("@log4js-node/log4js-api");
const logger = log4js.getLogger('rabbit-queue');
const logger_1 = require("./logger");
class Rabbit extends events_1.EventEmitter {

@@ -27,2 +27,6 @@ constructor(url, { prefetch = 1, replyPattern = true, prefix = '', scheduledPublish = false, socketOptions = {} } = {}) {

this.connecting = false;
if (!Rabbit.INSTANCE) {
Rabbit.INSTANCE = this;
}
this.logger = logger_1.default('rabbit-queue');
assert(url, 'Url is required!');

@@ -42,5 +46,10 @@ this.prefetch = prefetch;

this.connecting = true;
let connection = yield amqp.connect(this.url, this.socketOptions);
let channel = yield this.createChannel(connection);
yield this.initChannel(channel);
this.consumeConnection = yield amqp.connect(this.url, this.socketOptions);
this.consumeChannel = yield this.createChannel(this.consumeConnection);
yield this.initChannel(this.consumeChannel);
this.publishConnection = yield amqp.connect(this.url, this.socketOptions);
this.publishChannel = yield this.createChannel(this.publishConnection);
yield this.initChannel(this.publishChannel, true);
this.emit('connected');
this.connecting = false;
});

@@ -60,21 +69,17 @@ }

return __awaiter(this, void 0, void 0, function* () {
this.connection = connection;
this.connection.once('close', error => this.emitDisconnected(error));
this.connection.on('error', error => this.emitDisconnected(error));
connection.once('close', error => this.emitDisconnected(error));
connection.on('error', error => this.emitDisconnected(error));
return connection.createConfirmChannel();
});
}
initChannel(channel) {
initChannel(channel, publish = false) {
return __awaiter(this, void 0, void 0, function* () {
this.channel = channel;
this.channel.prefetch(this.prefetch);
this.channel.on('close', error => this.emitDisconnected(error));
if (this.replyPattern) {
yield reply_queue_1.createReplyQueue(this.channel);
channel.prefetch(this.prefetch);
channel.on('close', error => this.emitDisconnected(error));
if (!publish && this.replyPattern) {
yield reply_queue_1.createReplyQueue(this.consumeChannel);
}
if (this.scheduledPublish) {
yield delay_queue_1.createDelayQueueReply(this.channel, this.updateName('delay'));
if (!publish && this.scheduledPublish) {
yield delay_queue_1.createDelayQueueReply(this.consumeChannel, this.updateName('delay'));
}
this.emit('connected');
this.connecting = false;
});

@@ -98,6 +103,6 @@ }

yield this.connected;
const queue = new queue_1.default(this.channel, name, options);
const queue = new queue_1.default(this.consumeChannel, name, options);
this.queues[name] = queue;
yield queue.created;
logger.debug(`created queue ${name}`);
this.logger.debug(`created queue ${name}`);
if (handler) {

@@ -111,3 +116,3 @@ let localLock;

this.prefetch = options.prefetch;
this.lock = Promise.resolve(this.channel.prefetch(options.prefetch)).then(() => queue.subscribe(handler));
this.lock = Promise.resolve(this.consumeChannel.prefetch(options.prefetch)).then(() => queue.subscribe(handler));
yield this.lock;

@@ -127,3 +132,3 @@ }

yield this.connected;
yield queue_1.default.destroy(this.channel, name);
yield queue_1.default.destroy(this.consumeChannel, name);
});

@@ -149,3 +154,3 @@ }

yield this.connected;
yield queue_1.default.publish(obj, headers, this.channel, name, this.queues[name]);
yield queue_1.default.publish(obj, headers, this.publishChannel, name, this.queues[name]);
});

@@ -160,3 +165,3 @@ }

yield this.connected;
yield delay_queue_1.publishWithDelay(this.updateName('delay'), obj, properties, this.channel, name);
yield delay_queue_1.publishWithDelay(this.updateName('delay'), obj, properties, this.consumeChannel, name);
});

@@ -168,3 +173,3 @@ }

yield this.connected;
return yield queue_1.default.getReply(obj, properties, this.channel, name, this.queues[name], timeout);
return yield queue_1.default.getReply(obj, properties, this.publishChannel, name, this.queues[name], timeout);
});

@@ -176,3 +181,3 @@ }

yield this.connected;
return yield exchange_1.default.getReply(this.channel, 'amq.topic', topicName, content, properties, timeout);
return yield exchange_1.default.getReply(this.publishChannel, 'amq.topic', topicName, content, properties, timeout);
});

@@ -184,3 +189,3 @@ }

yield this.connected;
yield exchange_1.default.publish(this.channel, exchange, routingKey, content, headers);
yield exchange_1.default.publish(this.publishChannel, exchange, routingKey, content, headers);
});

@@ -192,3 +197,3 @@ }

yield this.connected;
yield exchange_1.default.publish(this.channel, 'amq.topic', topicName, content, headers);
yield exchange_1.default.publish(this.publishChannel, 'amq.topic', topicName, content, headers);
});

@@ -200,3 +205,3 @@ }

yield this.connected;
yield queue_1.default.bindToExchange(exchange, routingKey, this.channel, queueName, this.queues[queueName]);
yield queue_1.default.bindToExchange(exchange, routingKey, this.consumeChannel, queueName, this.queues[queueName]);
});

@@ -208,3 +213,3 @@ }

yield this.connected;
yield queue_1.default.unbindFromExchange(exchange, topicName, this.channel, queueName, this.queues[queueName]);
yield queue_1.default.unbindFromExchange(exchange, topicName, this.consumeChannel, queueName, this.queues[queueName]);
});

@@ -226,9 +231,10 @@ }

return __awaiter(this, void 0, void 0, function* () {
yield this.connection.close();
yield this.consumeConnection.close();
yield this.publishConnection.close();
});
}
}
exports.default = Rabbit;
Rabbit.STOP_PROPAGATION = queue_1.default.STOP_PROPAGATION;
Rabbit.STOP_STREAM = queue_1.default.STOP_STREAM;
exports.default = Rabbit;
//# sourceMappingURL=rabbit.js.map
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());

@@ -14,6 +15,6 @@ });

const queue_1 = require("./queue");
const log4js = require("@log4js-node/log4js-api");
const stream_1 = require("stream");
const encode_decode_1 = require("./encode-decode");
const logger = log4js.getLogger('rabbit-queue');
const logger_1 = require("./logger");
const logger = logger_1.default('rabbit-queue');
let replyHandlers = {};

@@ -45,3 +46,3 @@ let streamHandlers = {};

correlationId,
replyTo: channel.replyName,
replyTo: options.channel.replyName,
contentType: 'application/json'

@@ -48,0 +49,0 @@ }, properties);

{
"name": "rabbit-queue",
"version": "4.7.1",
"version": "5.0.0-beta.0",
"description": "AMQP/RabbitMQ queue management library.",

@@ -54,31 +54,26 @@ "main": "js/index.js",

"dependencies": {
"@log4js-node/log4js-api": "^1.0.0",
"amqplib": "^0.5.5",
"race-until": "^2.2.0",
"uuid": "^3.0.1"
"uuid": "^7.0.0"
},
"typings": "js/index",
"peerDependencies": {
"log4js": "*"
},
"devDependencies": {
"@types/amqplib": "0.3.29",
"@types/mocha": "2.2.32",
"@types/amqplib": "^0.5.13",
"@types/mocha": "^7.0.1",
"@types/node": "12.6.8",
"@types/node-uuid": "0.0.28",
"@types/should": "13.0.0",
"@types/sinon": "1.16.30",
"@types/source-map-support": "0.2.28",
"@types/uuid": "2.0.29",
"@types/should": "^13.0.0",
"@types/sinon": "^7.5.1",
"@types/source-map-support": "^0.5.1",
"@types/uuid": "^3.4.7",
"husky": "^3.0.0",
"lint-staged": "^9.0.2",
"mocha": "^6.2.0",
"nyc": "^14.1.1",
"should": "13.2.3",
"sinon": "1.17.5",
"source-map-support": "^0.4.18",
"ts-node": "^8.3.0",
"tslint": "5.18.0",
"typescript": "3.5.3"
"mocha": "^7.0.1",
"nyc": "^15.0.0",
"should": "^13.2.3",
"sinon": "^9.0.0",
"source-map-support": "^0.5.16",
"ts-node": "^8.6.2",
"tslint": "^6.0.0",
"typescript": "^3.8.2"
}
}

@@ -187,23 +187,23 @@ # Rabbit Queue

```javascript
const rabbit = new Rabbit(process.env.RABBIT_URL || 'amqp://localhost');
class DemoHandler extends BaseQueueHandler {
handle({ msg, event, correlationId, startTime }) {
const stream = new Readable({ read() {} });
stream.push('streaming');
stream.push('events');
stream.push(null); //the end
return stream;
}
}
const rabbit = new Rabbit(process.env.RABBIT_URL || 'amqp://localhost');
class DemoHandler extends BaseQueueHandler {
handle({ msg, event, correlationId, startTime }) {
const stream = new Readable({ read() {} });
stream.push('streaming');
stream.push('events');
stream.push(null); //the end
return stream;
}
}
new DemoHandler('demoQueue', rabbit, {
retries: 3,
retryDelay: 1,
logEnabled: true
});
new DemoHandler('demoQueue', rabbit, {
retries: 3,
retryDelay: 1,
logEnabled: true
});
const reply = await rabbit.getReply('demoQueue', { test: 'data' }, { correlationId: '5' });
for await (const chunk of reply) {
console.log(`Received chunk: ${chunk.toString()}`);
}
const reply = await rabbit.getReply('demoQueue', { test: 'data' }, { correlationId: '5' });
for await (const chunk of reply) {
console.log(`Received chunk: ${chunk.toString()}`);
}
```

@@ -213,23 +213,9 @@

Rabbit-queue uses log4js-api so you need to install log4js for logging to work.
Rabbit-queue logs to console by default.
It also emits events for each log so that you can use your own logger
Creations of queues, bindings are logged in debug level.
Message enqueues, dequeues are logged in info level.
Errors in BaseQueueHandler are logged in error level. (You can add your own error logging logic in afterDlq method.)
eg:
Using log4js v2 and custom appenders like [log4js_honeybadger_appender](https://www.npmjs.com/package/log4js_honeybadger_appender) you can directly log rabbit-queue errors directly to honeybadger.
log4js configuration example
```javascript
log4js.configure({
appenders: {
out: { type: 'stdout', layout: { type: 'basic' } }
},
honeybadger: { type: 'log4js_honeybadger_appender' },
categories: {
default: { appenders: ['out'], level: 'info' },
'rabbit-queue': { appenders: ['out', 'honeybadger'], level: 'debug' }
}
});
rabbit.on('log', (component, level, ...args) => console.log(`[${level}] ${component}`, ...args));
```

@@ -239,2 +225,8 @@

### v4.x.x to v5.x.x
- Support for Node LTS v6 and v8 was dropped. You should use Node v10 and higher.
- Two connections are created by default to rabbitMQ. One for consuming and one for producing messages.
- Dependency to log4js was dropped. Console is used by default but you can easily plug your own logger.
### New in v4.7.x

@@ -245,9 +237,8 @@

```js
rabbit
.createQueue('queueName', { durable: false, prefetch: 100 }, (msg, ack) => {
console.log(msg.content.toString());
ack(null, 'response');
});
rabbit.createQueue('queueName', { durable: false, prefetch: 100 }, (msg, ack) => {
console.log(msg.content.toString());
ack(null, 'response');
});
// or
// or

@@ -258,3 +249,3 @@ class DemoHandler extends BaseQueueHandler {

new DemoHandler('demoQueue', rabbit, { prefetch: 100 })
new DemoHandler('demoQueue', rabbit, { prefetch: 100 });
```

@@ -265,2 +256,5 @@

Also note that if you use rabbit prior to 3.3.0 the behavior might be different.
See https://www.rabbitmq.com/consumer-prefetch.html for more details
### v4.2.x to > v4.4.x

@@ -273,8 +267,12 @@

```js
const reply = await rabbit.getReply('demoQueue', { test: 'data' }, { headers: { test: 1, backpressure: true }, correlationId: '1' });
const reply = await rabbit.getReply(
'demoQueue',
{ test: 'data' },
{ headers: { test: 1, backpressure: true }, correlationId: '1' }
);
for await (const chunk of reply) {
console.log(`Received chunk: ${chunk.toString()}`);
if ("sufficient_data_received") {
reply.emit(Queue.STOP_STREAM);
}
console.log(`Received chunk: ${chunk.toString()}`);
if ('sufficient_data_received') {
reply.emit(Queue.STOP_STREAM);
}
}

@@ -300,3 +298,3 @@ ```

If used the rpc that responds to this request will stop sending messages until the receiving stream has consumed those messages or has buffered them (By default nodejs stream buffer for json streams is 16 objects). If this does not happen within the timeout the process will stop.
If used the rpc that responds to this request will stop sending messages until the receiving stream has consumed those messages or has buffered them (By default nodejs stream buffer for json streams is 16 objects). If this does not happen within the timeout the process will stop.

@@ -303,0 +301,0 @@ Use this feature only if both requesting and receiving part have rabbit-queue > 4.2.0

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc