@coolgk/amqp
Advanced tools
Comparing version 1.1.4 to 2.0.0
/*! | ||
* @package @coolgk/utils | ||
* @version 1.1.4 | ||
* @version 2.0.0 | ||
* @link https://www.npmjs.com/package/@coolgk/utils | ||
@@ -20,3 +20,3 @@ * @license MIT | ||
export interface IConsumeConfig { | ||
route?: string; | ||
routes?: string | string[]; | ||
queueName?: string; | ||
@@ -27,2 +27,3 @@ exchangeName?: string; | ||
exchangeType?: string; | ||
fallbackExchange?: string; | ||
} | ||
@@ -48,9 +49,10 @@ export interface IMessage { | ||
closeConnection(): void; | ||
publish(message: any, callback?: (message: IResponseMessage) => any, {route, exchangeName}?: { | ||
route?: string; | ||
publish(message: any, callback?: (message: IResponseMessage) => any, {routes, exchangeName}?: { | ||
routes?: string; | ||
exchangeName?: string; | ||
}): Promise<boolean>; | ||
consume(callback: (message: IMessage) => any, {route, queueName, exchangeName, exchangeType, priority, prefetch}?: IConsumeConfig): Promise<Replies.Consume>; | ||
private _getChannel(); | ||
}): Promise<boolean[]>; | ||
consume(callback: (message: IMessage) => any, {routes, queueName, exchangeName, exchangeType, priority, prefetch, fallbackExchange}?: IConsumeConfig): Promise<Replies.Consume[]>; | ||
getChannel(): any; | ||
private _publish(channel, exchangeName, routes, message, options); | ||
} | ||
export default Amqp; |
68
amqp.js
/*! | ||
* @package @coolgk/utils | ||
* @version 1.1.4 | ||
* @version 2.0.0 | ||
* @link https://www.npmjs.com/package/@coolgk/utils | ||
@@ -13,2 +13,3 @@ * @license MIT | ||
const fs_1 = require("fs"); | ||
const array_1 = require("@coolgk/array"); | ||
class Amqp { | ||
@@ -26,4 +27,4 @@ constructor(options) { | ||
} | ||
publish(message, callback, { route = '#', exchangeName = 'defaultExchange' } = {}) { | ||
return this._getChannel().then((channel) => { | ||
publish(message, callback, { routes = '#', exchangeName = 'defaultExchange' } = {}) { | ||
return this.getChannel().then((channel) => { | ||
if (callback) { | ||
@@ -39,6 +40,5 @@ const messageId = this._uuid(); | ||
channel.ack(rawResponseMessage); | ||
channel.deleteQueue(queue.queue); | ||
} | ||
}); | ||
return channel.publish(exchangeName, String(route), Buffer.from(JSON.stringify(message)), { | ||
return this._publish(channel, exchangeName, routes, message, { | ||
persistent: true, | ||
@@ -50,3 +50,3 @@ correlationId: messageId, | ||
} | ||
return channel.publish(exchangeName, String(route), Buffer.from(JSON.stringify(message)), { | ||
return this._publish(channel, exchangeName, routes, message, { | ||
persistent: true | ||
@@ -56,28 +56,37 @@ }); | ||
} | ||
consume(callback, { route = '#', queueName = '', exchangeName = 'defaultExchange', exchangeType = 'topic', priority = 0, prefetch = 0 } = {}) { | ||
return this._getChannel().then((channel) => { | ||
consume(callback, { routes = ['#'], queueName = '', exchangeName = 'defaultExchange', exchangeType = 'topic', priority = 0, prefetch = 1, fallbackExchange = '' } = {}) { | ||
return this.getChannel().then((channel) => { | ||
return channel.prefetch(prefetch).then(() => { | ||
return channel.assertExchange(exchangeName, exchangeType, { durable: true }); | ||
return channel.assertExchange(exchangeName, exchangeType, { | ||
durable: false, | ||
arguments: fallbackExchange ? { 'alternate-exchange': fallbackExchange } : {} | ||
}); | ||
}).then(() => { | ||
return channel.assertQueue(queueName, { durable: false }); | ||
return channel.assertQueue(queueName, { durable: false, exclusive: !queueName }); | ||
}).then((queue) => { | ||
return channel.bindQueue(queue.queue, exchangeName, String(route)).then(() => channel.consume(queue.queue, (rawMessage) => { | ||
Promise.resolve(callback({ | ||
rawMessage, | ||
message: JSON.parse(rawMessage.content.toString()) | ||
})).then((response = '') => { | ||
if (rawMessage | ||
&& rawMessage.properties.replyTo && rawMessage.properties.correlationId) { | ||
channel.sendToQueue(rawMessage.properties.replyTo, Buffer.from(JSON.stringify(response)), { | ||
persistent: true, | ||
correlationId: rawMessage.properties.correlationId | ||
}); | ||
} | ||
channel.ack(rawMessage); | ||
}); | ||
}, { priority })); | ||
const promises = []; | ||
for (const route of array_1.toArray(routes)) { | ||
promises.push(channel.bindQueue(queue.queue, exchangeName, String(route))); | ||
} | ||
return Promise.all(promises).then(() => { | ||
return channel.consume(queue.queue, (rawMessage) => { | ||
Promise.resolve(callback({ | ||
rawMessage, | ||
message: JSON.parse(rawMessage.content.toString()) | ||
})).then((response = '') => { | ||
if (rawMessage | ||
&& rawMessage.properties.replyTo && rawMessage.properties.correlationId) { | ||
channel.sendToQueue(rawMessage.properties.replyTo, Buffer.from(JSON.stringify(response)), { | ||
persistent: true, | ||
correlationId: rawMessage.properties.correlationId | ||
}); | ||
} | ||
channel.ack(rawMessage); | ||
}); | ||
}, { priority }); | ||
}); | ||
}); | ||
}); | ||
} | ||
_getChannel() { | ||
getChannel() { | ||
if (!this._channel) { | ||
@@ -101,4 +110,11 @@ if (this._sslPem) { | ||
} | ||
_publish(channel, exchangeName, routes, message, options) { | ||
const promises = []; | ||
for (const route of array_1.toArray(routes)) { | ||
promises.push(channel.publish(exchangeName, String(route), Buffer.from(JSON.stringify(message)), options)); | ||
} | ||
return Promise.all(promises); | ||
} | ||
} | ||
exports.Amqp = Amqp; | ||
exports.default = Amqp; |
@@ -1,1 +0,1 @@ | ||
{"name":"@coolgk/amqp","version":"1.1.4","author":"Daniel Gong <daniel.k.gong@gmail.com>","homepage":"https://www.npmjs.com/package/@coolgk/utils","bugs":{"url":"https://github.com/coolgk/node-utils/issues"},"repository":{"type":"git","url":"https://github.com/coolgk/node-utils.git"},"description":"a simple RabbitMQ (amqp wrapper) class for publishing and consuming messages","keywords":["ampq","rabbitmq","typescript"],"dependencies":{"@types/amqplib":"^0.5.5","amqplib":"^0.5.2","uuid":"^3.1.0","@types/uuid":"^3.4.3"},"pre-commit":{"silent":false,"run":["test:precommit","tslint"]},"main":"./amqp.js","types":"./amqp.d.ts","license":"MIT"} | ||
{"name":"@coolgk/amqp","version":"2.0.0","author":"Daniel Gong <daniel.k.gong@gmail.com>","homepage":"https://www.npmjs.com/package/@coolgk/utils","bugs":{"url":"https://github.com/coolgk/node-utils/issues"},"repository":{"type":"git","url":"https://github.com/coolgk/node-utils.git"},"description":"a simple RabbitMQ (amqp wrapper) class for publishing and consuming messages","keywords":["ampq","rabbitmq","typescript"],"dependencies":{"@types/amqplib":"^0.5.5","amqplib":"^0.5.2","uuid":"^3.1.0","@types/uuid":"^3.4.3","@coolgk/array":"^1.1.4"},"pre-commit":{"silent":false,"run":["tslint","test:precommit"]},"main":"./amqp.js","types":"./amqp.d.ts","license":"MIT"} |
@@ -53,4 +53,5 @@ | ||
* [.closeConnection()](#Amqp+closeConnection) ⇒ <code>void</code> | ||
* [.publish(message, [callback], [options])](#Amqp+publish) ⇒ <code>promise.<boolean></code> | ||
* [.publish(message, [callback], [options])](#Amqp+publish) ⇒ <code>promise.<Array.<boolean>></code> | ||
* [.consume(callback, [options])](#Amqp+consume) ⇒ <code>promise</code> | ||
* [.getChannel()](#Amqp+getChannel) ⇒ <code>promise</code> | ||
@@ -75,3 +76,3 @@ <a name="new_Amqp_new"></a> | ||
### amqp.publish(message, [callback], [options]) ⇒ <code>promise.<boolean></code> | ||
### amqp.publish(message, [callback], [options]) ⇒ <code>promise.<Array.<boolean>></code> | ||
**Kind**: instance method of [<code>Amqp</code>](#Amqp) | ||
@@ -84,3 +85,3 @@ | ||
| [options] | <code>object</code> | | | | ||
| [options.route] | <code>string</code> | <code>"'#'"</code> | route name | | ||
| [options.routes] | <code>string</code> \| <code>Array.<string></code> | <code>"['#']"</code> | route names | | ||
| [options.exchangeName] | <code>string</code> | <code>"'defaultExchange'"</code> | exchange name | | ||
@@ -97,8 +98,13 @@ | ||
| [options] | <code>object</code> | | | | ||
| [options.route] | <code>string</code> | <code>"'#'"</code> | exchange route | | ||
| [options.queueName] | <code>string</code> | <code>"''"</code> | queue name for processing request | | ||
| [options.routes] | <code>string</code> \| <code>Array.<string></code> | <code>"['#']"</code> | exchange routes | | ||
| [options.queueName] | <code>string</code> | <code>"''"</code> | queue name for processing messages. consumers with the same queue name process messages in round robin style | | ||
| [options.exchangeName] | <code>string</code> | <code>"'defaultExchange'"</code> | exchange name | | ||
| [options.exchangeType] | <code>string</code> | <code>"'topic'"</code> | exchange type | | ||
| [options.priority] | <code>number</code> | <code>0</code> | priority, larger numbers indicate higher priority | | ||
| [options.prefetch] | <code>number</code> | <code>0</code> | 1 or 0, if to process request one at a time | | ||
| [options.prefetch] | <code>number</code> | <code>1</code> | 1 or 0, if to process request one at a time | | ||
<a name="Amqp+getChannel"></a> | ||
### amqp.getChannel() ⇒ <code>promise</code> | ||
**Kind**: instance method of [<code>Amqp</code>](#Amqp) | ||
**Returns**: <code>promise</code> - - promise<channel> |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
11769
167
107
5
+ Added@coolgk/array@^1.1.4
+ Added@coolgk/array@1.1.4(transitive)