🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
Book a DemoInstallSign in
Socket

rabbit-queue

Package Overview
Dependencies
Maintainers
3
Versions
50
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbit-queue - npm Package Compare versions

Comparing version

to
5.6.0

js/migrate.d.ts

16

js/base-queue-handler.d.ts

@@ -6,3 +6,3 @@ import Rabbit from './rabbit';

declare abstract class BaseQueueHandler {
queueName: any;
queueName: string;
rabbit: Rabbit;

@@ -19,2 +19,4 @@ dlqName: string;

prefetch?: number;
options: amqp.Options.AssertQueue & amqp.Options.Consume;
migrateQueue: boolean;
static SCOPES: {

@@ -24,3 +26,3 @@ singleton: 'SINGLETON';

};
constructor(queueName: any, rabbit: Rabbit, { retries, retryDelay, logEnabled, scope, createAndSubscribeToQueue, prefetch, }?: {
constructor(queueName: string, rabbit: Rabbit, { retries, retryDelay, logEnabled, scope, createAndSubscribeToQueue, prefetch, migrateQueue, options }?: {
retries?: number;

@@ -32,8 +34,10 @@ retryDelay?: number;

prefetch?: number;
migrateQueue?: boolean;
options?: amqp.Options.AssertQueue;
});
getDlq(): string;
getCorrelationId(msg: amqp.Message, event?: any): any;
getQueueOptions(): {};
getDlqOptions(): any;
static prototypeFactory<T extends BaseQueueHandler>(queueName: any, rabbit: Rabbit, options?: {}): T;
getCorrelationId(msg: amqp.Message, _event?: any): any;
getQueueOptions(): amqp.Options.AssertQueue & amqp.Options.Consume;
getDlqOptions(): amqp.Options.AssertQueue & amqp.Options.Consume;
static prototypeFactory<T extends BaseQueueHandler>(queueName: string, rabbit: Rabbit, options?: {}): T;
createQueues(): Promise<void>;

@@ -40,0 +44,0 @@ tryHandle(retries: any, msg: amqp.Message, ack: (error: any, reply: any) => any): Promise<void>;

@@ -14,4 +14,5 @@ "use strict";

const logger_1 = require("./logger");
const migrator_1 = require("./migrator");
class BaseQueueHandler {
constructor(queueName, rabbit, { retries = 3, retryDelay = 1000, logEnabled = true, scope = BaseQueueHandler.SCOPES.singleton, createAndSubscribeToQueue = true, prefetch = rabbit.prefetch, } = {}) {
constructor(queueName, rabbit, { retries = 3, retryDelay = 1000, logEnabled = true, scope = BaseQueueHandler.SCOPES.singleton, createAndSubscribeToQueue = true, prefetch = rabbit.prefetch, migrateQueue = false, options = {} } = {}) {
this.queueName = queueName;

@@ -27,2 +28,4 @@ this.rabbit = rabbit;

this.dlqName = this.getDlq();
this.options = options;
this.migrateQueue = migrateQueue;
if (createAndSubscribeToQueue) {

@@ -35,10 +38,10 @@ this.created = this.createQueues();

}
getCorrelationId(msg, event) {
getCorrelationId(msg, _event) {
return msg.properties.correlationId;
}
getQueueOptions() {
return {};
return this.options;
}
getDlqOptions() {
return undefined;
return this.options;
}

@@ -52,6 +55,10 @@ static prototypeFactory(queueName, rabbit, options = {}) {

return __awaiter(this, void 0, void 0, function* () {
if (this.migrateQueue) {
yield this.rabbit.connected;
yield new migrator_1.Migrator(this.rabbit.consumeConnection).tryMigrateQueue(this.queueName, this.getQueueOptions());
}
this.queue = yield this.rabbit
.createQueue(this.queueName, Object.assign(Object.assign({}, this.getQueueOptions()), { prefetch: this.prefetch }), (msg, ack) => {
if (this.scope === BaseQueueHandler.SCOPES.singleton) {
this.tryHandle(0, msg, ack).catch((e) => this.logger.error(e));
this.tryHandle(0, msg, ack).catch(e => this.logger.error(e));
}

@@ -65,11 +72,14 @@ else {

scope: this.scope,
createAndSubscribeToQueue: false,
createAndSubscribeToQueue: false
});
instance.tryHandle(0, msg, ack).catch((e) => this.logger.error(e));
instance.tryHandle(0, msg, ack).catch(e => this.logger.error(e));
}
})
.catch((error) => this.logger.error(error));
.catch(error => this.logger.error(error));
if (this.migrateQueue) {
yield new migrator_1.Migrator(this.rabbit.consumeConnection).tryMigrateQueue(this.dlqName, this.getDlqOptions());
}
this.dlq = yield this.rabbit
.createQueue(this.dlqName, this.getDlqOptions())
.catch((error) => this.logger.error(error));
.catch(error => this.logger.error(error));
});

@@ -93,3 +103,3 @@ }

this.handleError(err, msg);
this.retry(retries, msg, ack).catch((error) => this.logger.error(error));
this.retry(retries, msg, ack).catch(error => this.logger.error(error));
}

@@ -107,3 +117,3 @@ });

stack: err.stack && err.stack.substr(0, 200),
time: new Date().toString(),
time: new Date().toString()
};

@@ -135,3 +145,3 @@ }

afterDlq(data) {
this.logger.info(`[${this.getCorrelationId(data.msg)}] Added to dlq`);
this.logger.info(`[${this.getCorrelationId(data.msg)}] Running afterDlq`);
}

@@ -158,5 +168,5 @@ addToDLQ(retries, msg, ack) {

singleton: 'SINGLETON',
prototype: 'PROTOTYPE',
prototype: 'PROTOTYPE'
};
exports.default = BaseQueueHandler;
//# sourceMappingURL=base-queue-handler.js.map

@@ -14,4 +14,4 @@ /// <reference types="node" />

static INSTANCE: Rabbit;
consumeConnection: amqp.Connection;
publishConnection: amqp.Connection;
consumeConnection: amqp.ChannelModel;
publishConnection: amqp.ChannelModel;
consumeChannel: Channel;

@@ -41,3 +41,3 @@ publishChannel: Channel;

private emitDisconnected;
createChannel(connection: amqp.Connection): Promise<amqp.ConfirmChannel>;
createChannel(connection: amqp.ChannelModel): Promise<amqp.ConfirmChannel>;
initChannel(channel: Channel, publish?: boolean): Promise<void>;

@@ -44,0 +44,0 @@ private updateName;

@@ -57,3 +57,3 @@ "use strict";

this.connected = this.connect();
yield this.connected.catch((error) => this.emitDisconnected(error));
yield this.connected.catch(error => this.emitDisconnected(error));
});

@@ -67,4 +67,4 @@ }

return __awaiter(this, void 0, void 0, function* () {
connection.once('close', (error) => this.emitDisconnected(error));
connection.on('error', (error) => this.emitDisconnected(error));
connection.once('close', error => this.emitDisconnected(error));
connection.on('error', error => this.emitDisconnected(error));
return connection.createConfirmChannel();

@@ -76,3 +76,3 @@ });

channel.prefetch(this.prefetch);
channel.on('close', (error) => this.emitDisconnected(error));
channel.on('close', error => this.emitDisconnected(error));
if (!publish && this.replyPattern) {

@@ -79,0 +79,0 @@ yield reply_queue_1.createReplyQueue(this.consumeChannel);

{
"name": "rabbit-queue",
"version": "5.5.0",
"version": "5.6.0",
"description": "AMQP/RabbitMQ queue management library.",

@@ -54,3 +54,3 @@ "main": "js/index.js",

"dependencies": {
"amqplib": "*",
"amqplib": ">=0.10.0",
"race-until": "~2.3.1",

@@ -61,3 +61,3 @@ "uuid": "~8.3.2"

"devDependencies": {
"@types/amqplib": "0.8.2",
"@types/amqplib": "^0.10.7",
"@types/mocha": "~9.0.0",

@@ -73,2 +73,3 @@ "@types/node": "~16.4.1",

"nyc": "~15.1.0",
"prettier": "^3.5.3",
"should": "^13.2.3",

@@ -75,0 +76,0 @@ "sinon": "~11.1.1",

@@ -220,3 +220,36 @@ # Rabbit Queue

```
const rabbit = new Rabbit('amqp://localhost');
class DemoHandler extends BaseQueueHandler {
handle({ msg, event, correlationId, startTime }) {
console.log('Received: ', event);
console.log('With correlation id: ' + correlationId);
}
afterDlq({ msg, event }) {
// Something to do after added to dlq
}
}
new DemoHandler('demoQueue', rabbit, {
retries: 3,
retryDelay: 1000,
logEnabled: true, //log queue processing time
scope: 'SINGLETON', //can also be 'PROTOTYPE' to create a new instance every time
createAndSubscribeToQueue: true, // used internally no need to overwrite
migrateQueue: true // if you want to migrate the queue from one configuration to another - otherwise it will throw an error if the queue already exists with different configuration
});
rabbit.publish('demoQueue', { test: 'data' }, { correlationId: '4' });
```
### Migrating Queues
Rabbit-queue supports migrating queues from one configuration to another.
RabbitMQ will fail and close the channel if you try to create a queue with the same name but different configuration.
To migrate a queue, you can use the `migrateQueue` option.
```javascript
const rabbit = new Rabbit(process.env.RABBIT_URL || 'amqp://localhost');
### Changelog

@@ -223,0 +256,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet