rabbit-queue
Advanced tools
Comparing version
@@ -6,3 +6,3 @@ import Rabbit from './rabbit'; | ||
declare abstract class BaseQueueHandler { | ||
queueName: any; | ||
queueName: string; | ||
rabbit: Rabbit; | ||
@@ -19,2 +19,4 @@ dlqName: string; | ||
prefetch?: number; | ||
options: amqp.Options.AssertQueue & amqp.Options.Consume; | ||
migrateQueue: boolean; | ||
static SCOPES: { | ||
@@ -24,3 +26,3 @@ singleton: 'SINGLETON'; | ||
}; | ||
constructor(queueName: any, rabbit: Rabbit, { retries, retryDelay, logEnabled, scope, createAndSubscribeToQueue, prefetch, }?: { | ||
constructor(queueName: string, rabbit: Rabbit, { retries, retryDelay, logEnabled, scope, createAndSubscribeToQueue, prefetch, migrateQueue, options }?: { | ||
retries?: number; | ||
@@ -32,8 +34,10 @@ retryDelay?: number; | ||
prefetch?: number; | ||
migrateQueue?: boolean; | ||
options?: amqp.Options.AssertQueue; | ||
}); | ||
getDlq(): string; | ||
getCorrelationId(msg: amqp.Message, event?: any): any; | ||
getQueueOptions(): {}; | ||
getDlqOptions(): any; | ||
static prototypeFactory<T extends BaseQueueHandler>(queueName: any, rabbit: Rabbit, options?: {}): T; | ||
getCorrelationId(msg: amqp.Message, _event?: any): any; | ||
getQueueOptions(): amqp.Options.AssertQueue & amqp.Options.Consume; | ||
getDlqOptions(): amqp.Options.AssertQueue & amqp.Options.Consume; | ||
static prototypeFactory<T extends BaseQueueHandler>(queueName: string, rabbit: Rabbit, options?: {}): T; | ||
createQueues(): Promise<void>; | ||
@@ -40,0 +44,0 @@ tryHandle(retries: any, msg: amqp.Message, ack: (error: any, reply: any) => any): Promise<void>; |
@@ -14,4 +14,5 @@ "use strict"; | ||
const logger_1 = require("./logger"); | ||
const migrator_1 = require("./migrator"); | ||
class BaseQueueHandler { | ||
constructor(queueName, rabbit, { retries = 3, retryDelay = 1000, logEnabled = true, scope = BaseQueueHandler.SCOPES.singleton, createAndSubscribeToQueue = true, prefetch = rabbit.prefetch, } = {}) { | ||
constructor(queueName, rabbit, { retries = 3, retryDelay = 1000, logEnabled = true, scope = BaseQueueHandler.SCOPES.singleton, createAndSubscribeToQueue = true, prefetch = rabbit.prefetch, migrateQueue = false, options = {} } = {}) { | ||
this.queueName = queueName; | ||
@@ -27,2 +28,4 @@ this.rabbit = rabbit; | ||
this.dlqName = this.getDlq(); | ||
this.options = options; | ||
this.migrateQueue = migrateQueue; | ||
if (createAndSubscribeToQueue) { | ||
@@ -35,10 +38,10 @@ this.created = this.createQueues(); | ||
} | ||
getCorrelationId(msg, event) { | ||
getCorrelationId(msg, _event) { | ||
return msg.properties.correlationId; | ||
} | ||
getQueueOptions() { | ||
return {}; | ||
return this.options; | ||
} | ||
getDlqOptions() { | ||
return undefined; | ||
return this.options; | ||
} | ||
@@ -52,6 +55,10 @@ static prototypeFactory(queueName, rabbit, options = {}) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
if (this.migrateQueue) { | ||
yield this.rabbit.connected; | ||
yield new migrator_1.Migrator(this.rabbit.consumeConnection).tryMigrateQueue(this.queueName, this.getQueueOptions()); | ||
} | ||
this.queue = yield this.rabbit | ||
.createQueue(this.queueName, Object.assign(Object.assign({}, this.getQueueOptions()), { prefetch: this.prefetch }), (msg, ack) => { | ||
if (this.scope === BaseQueueHandler.SCOPES.singleton) { | ||
this.tryHandle(0, msg, ack).catch((e) => this.logger.error(e)); | ||
this.tryHandle(0, msg, ack).catch(e => this.logger.error(e)); | ||
} | ||
@@ -65,11 +72,14 @@ else { | ||
scope: this.scope, | ||
createAndSubscribeToQueue: false, | ||
createAndSubscribeToQueue: false | ||
}); | ||
instance.tryHandle(0, msg, ack).catch((e) => this.logger.error(e)); | ||
instance.tryHandle(0, msg, ack).catch(e => this.logger.error(e)); | ||
} | ||
}) | ||
.catch((error) => this.logger.error(error)); | ||
.catch(error => this.logger.error(error)); | ||
if (this.migrateQueue) { | ||
yield new migrator_1.Migrator(this.rabbit.consumeConnection).tryMigrateQueue(this.dlqName, this.getDlqOptions()); | ||
} | ||
this.dlq = yield this.rabbit | ||
.createQueue(this.dlqName, this.getDlqOptions()) | ||
.catch((error) => this.logger.error(error)); | ||
.catch(error => this.logger.error(error)); | ||
}); | ||
@@ -93,3 +103,3 @@ } | ||
this.handleError(err, msg); | ||
this.retry(retries, msg, ack).catch((error) => this.logger.error(error)); | ||
this.retry(retries, msg, ack).catch(error => this.logger.error(error)); | ||
} | ||
@@ -107,3 +117,3 @@ }); | ||
stack: err.stack && err.stack.substr(0, 200), | ||
time: new Date().toString(), | ||
time: new Date().toString() | ||
}; | ||
@@ -135,3 +145,3 @@ } | ||
afterDlq(data) { | ||
this.logger.info(`[${this.getCorrelationId(data.msg)}] Added to dlq`); | ||
this.logger.info(`[${this.getCorrelationId(data.msg)}] Running afterDlq`); | ||
} | ||
@@ -158,5 +168,5 @@ addToDLQ(retries, msg, ack) { | ||
singleton: 'SINGLETON', | ||
prototype: 'PROTOTYPE', | ||
prototype: 'PROTOTYPE' | ||
}; | ||
exports.default = BaseQueueHandler; | ||
//# sourceMappingURL=base-queue-handler.js.map |
@@ -14,4 +14,4 @@ /// <reference types="node" /> | ||
static INSTANCE: Rabbit; | ||
consumeConnection: amqp.Connection; | ||
publishConnection: amqp.Connection; | ||
consumeConnection: amqp.ChannelModel; | ||
publishConnection: amqp.ChannelModel; | ||
consumeChannel: Channel; | ||
@@ -41,3 +41,3 @@ publishChannel: Channel; | ||
private emitDisconnected; | ||
createChannel(connection: amqp.Connection): Promise<amqp.ConfirmChannel>; | ||
createChannel(connection: amqp.ChannelModel): Promise<amqp.ConfirmChannel>; | ||
initChannel(channel: Channel, publish?: boolean): Promise<void>; | ||
@@ -44,0 +44,0 @@ private updateName; |
@@ -57,3 +57,3 @@ "use strict"; | ||
this.connected = this.connect(); | ||
yield this.connected.catch((error) => this.emitDisconnected(error)); | ||
yield this.connected.catch(error => this.emitDisconnected(error)); | ||
}); | ||
@@ -67,4 +67,4 @@ } | ||
return __awaiter(this, void 0, void 0, function* () { | ||
connection.once('close', (error) => this.emitDisconnected(error)); | ||
connection.on('error', (error) => this.emitDisconnected(error)); | ||
connection.once('close', error => this.emitDisconnected(error)); | ||
connection.on('error', error => this.emitDisconnected(error)); | ||
return connection.createConfirmChannel(); | ||
@@ -76,3 +76,3 @@ }); | ||
channel.prefetch(this.prefetch); | ||
channel.on('close', (error) => this.emitDisconnected(error)); | ||
channel.on('close', error => this.emitDisconnected(error)); | ||
if (!publish && this.replyPattern) { | ||
@@ -79,0 +79,0 @@ yield reply_queue_1.createReplyQueue(this.consumeChannel); |
{ | ||
"name": "rabbit-queue", | ||
"version": "5.5.0", | ||
"version": "5.6.0", | ||
"description": "AMQP/RabbitMQ queue management library.", | ||
@@ -54,3 +54,3 @@ "main": "js/index.js", | ||
"dependencies": { | ||
"amqplib": "*", | ||
"amqplib": ">=0.10.0", | ||
"race-until": "~2.3.1", | ||
@@ -61,3 +61,3 @@ "uuid": "~8.3.2" | ||
"devDependencies": { | ||
"@types/amqplib": "0.8.2", | ||
"@types/amqplib": "^0.10.7", | ||
"@types/mocha": "~9.0.0", | ||
@@ -73,2 +73,3 @@ "@types/node": "~16.4.1", | ||
"nyc": "~15.1.0", | ||
"prettier": "^3.5.3", | ||
"should": "^13.2.3", | ||
@@ -75,0 +76,0 @@ "sinon": "~11.1.1", |
@@ -220,3 +220,36 @@ # Rabbit Queue | ||
``` | ||
const rabbit = new Rabbit('amqp://localhost'); | ||
class DemoHandler extends BaseQueueHandler { | ||
handle({ msg, event, correlationId, startTime }) { | ||
console.log('Received: ', event); | ||
console.log('With correlation id: ' + correlationId); | ||
} | ||
afterDlq({ msg, event }) { | ||
// Something to do after added to dlq | ||
} | ||
} | ||
new DemoHandler('demoQueue', rabbit, { | ||
retries: 3, | ||
retryDelay: 1000, | ||
logEnabled: true, //log queue processing time | ||
scope: 'SINGLETON', //can also be 'PROTOTYPE' to create a new instance every time | ||
createAndSubscribeToQueue: true, // used internally no need to overwrite | ||
migrateQueue: true // if you want to migrate the queue from one configuration to another - otherwise it will throw an error if the queue already exists with different configuration | ||
}); | ||
rabbit.publish('demoQueue', { test: 'data' }, { correlationId: '4' }); | ||
``` | ||
### Migrating Queues | ||
Rabbit-queue supports migrating queues from one configuration to another. | ||
RabbitMQ will fail and close the channel if you try to create a queue with the same name but different configuration. | ||
To migrate a queue, you can use the `migrateQueue` option. | ||
```javascript | ||
const rabbit = new Rabbit(process.env.RABBIT_URL || 'amqp://localhost'); | ||
### Changelog | ||
@@ -223,0 +256,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Wildcard dependency
QualityPackage has a dependency with a floating version range. This can cause issues if the dependency publishes a new major version.
Found 1 instance in 1 package
108689
15.35%41
17.14%1251
15.3%0
-100%367
9.88%18
5.88%Updated