Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@coolgk/amqp

Package Overview
Dependencies
Maintainers
1
Versions
17
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@coolgk/amqp - npm Package Compare versions

Comparing version 1.1.4 to 2.0.0

16

amqp.d.ts
/*!
* @package @coolgk/utils
* @version 1.1.4
* @version 2.0.0
* @link https://www.npmjs.com/package/@coolgk/utils

@@ -20,3 +20,3 @@ * @license MIT

export interface IConsumeConfig {
route?: string;
routes?: string | string[];
queueName?: string;

@@ -27,2 +27,3 @@ exchangeName?: string;

exchangeType?: string;
fallbackExchange?: string;
}

@@ -48,9 +49,10 @@ export interface IMessage {

closeConnection(): void;
publish(message: any, callback?: (message: IResponseMessage) => any, {route, exchangeName}?: {
route?: string;
publish(message: any, callback?: (message: IResponseMessage) => any, {routes, exchangeName}?: {
routes?: string;
exchangeName?: string;
}): Promise<boolean>;
consume(callback: (message: IMessage) => any, {route, queueName, exchangeName, exchangeType, priority, prefetch}?: IConsumeConfig): Promise<Replies.Consume>;
private _getChannel();
}): Promise<boolean[]>;
consume(callback: (message: IMessage) => any, {routes, queueName, exchangeName, exchangeType, priority, prefetch, fallbackExchange}?: IConsumeConfig): Promise<Replies.Consume[]>;
getChannel(): any;
private _publish(channel, exchangeName, routes, message, options);
}
export default Amqp;
/*!
* @package @coolgk/utils
* @version 1.1.4
* @version 2.0.0
* @link https://www.npmjs.com/package/@coolgk/utils

@@ -13,2 +13,3 @@ * @license MIT

const fs_1 = require("fs");
const array_1 = require("@coolgk/array");
class Amqp {

@@ -26,4 +27,4 @@ constructor(options) {

}
publish(message, callback, { route = '#', exchangeName = 'defaultExchange' } = {}) {
return this._getChannel().then((channel) => {
publish(message, callback, { routes = '#', exchangeName = 'defaultExchange' } = {}) {
return this.getChannel().then((channel) => {
if (callback) {

@@ -39,6 +40,5 @@ const messageId = this._uuid();

channel.ack(rawResponseMessage);
channel.deleteQueue(queue.queue);
}
});
return channel.publish(exchangeName, String(route), Buffer.from(JSON.stringify(message)), {
return this._publish(channel, exchangeName, routes, message, {
persistent: true,

@@ -50,3 +50,3 @@ correlationId: messageId,

}
return channel.publish(exchangeName, String(route), Buffer.from(JSON.stringify(message)), {
return this._publish(channel, exchangeName, routes, message, {
persistent: true

@@ -56,28 +56,37 @@ });

}
consume(callback, { route = '#', queueName = '', exchangeName = 'defaultExchange', exchangeType = 'topic', priority = 0, prefetch = 0 } = {}) {
return this._getChannel().then((channel) => {
consume(callback, { routes = ['#'], queueName = '', exchangeName = 'defaultExchange', exchangeType = 'topic', priority = 0, prefetch = 1, fallbackExchange = '' } = {}) {
return this.getChannel().then((channel) => {
return channel.prefetch(prefetch).then(() => {
return channel.assertExchange(exchangeName, exchangeType, { durable: true });
return channel.assertExchange(exchangeName, exchangeType, {
durable: false,
arguments: fallbackExchange ? { 'alternate-exchange': fallbackExchange } : {}
});
}).then(() => {
return channel.assertQueue(queueName, { durable: false });
return channel.assertQueue(queueName, { durable: false, exclusive: !queueName });
}).then((queue) => {
return channel.bindQueue(queue.queue, exchangeName, String(route)).then(() => channel.consume(queue.queue, (rawMessage) => {
Promise.resolve(callback({
rawMessage,
message: JSON.parse(rawMessage.content.toString())
})).then((response = '') => {
if (rawMessage
&& rawMessage.properties.replyTo && rawMessage.properties.correlationId) {
channel.sendToQueue(rawMessage.properties.replyTo, Buffer.from(JSON.stringify(response)), {
persistent: true,
correlationId: rawMessage.properties.correlationId
});
}
channel.ack(rawMessage);
});
}, { priority }));
const promises = [];
for (const route of array_1.toArray(routes)) {
promises.push(channel.bindQueue(queue.queue, exchangeName, String(route)));
}
return Promise.all(promises).then(() => {
return channel.consume(queue.queue, (rawMessage) => {
Promise.resolve(callback({
rawMessage,
message: JSON.parse(rawMessage.content.toString())
})).then((response = '') => {
if (rawMessage
&& rawMessage.properties.replyTo && rawMessage.properties.correlationId) {
channel.sendToQueue(rawMessage.properties.replyTo, Buffer.from(JSON.stringify(response)), {
persistent: true,
correlationId: rawMessage.properties.correlationId
});
}
channel.ack(rawMessage);
});
}, { priority });
});
});
});
}
_getChannel() {
getChannel() {
if (!this._channel) {

@@ -101,4 +110,11 @@ if (this._sslPem) {

}
_publish(channel, exchangeName, routes, message, options) {
const promises = [];
for (const route of array_1.toArray(routes)) {
promises.push(channel.publish(exchangeName, String(route), Buffer.from(JSON.stringify(message)), options));
}
return Promise.all(promises);
}
}
exports.Amqp = Amqp;
exports.default = Amqp;

@@ -1,1 +0,1 @@

{"name":"@coolgk/amqp","version":"1.1.4","author":"Daniel Gong <daniel.k.gong@gmail.com>","homepage":"https://www.npmjs.com/package/@coolgk/utils","bugs":{"url":"https://github.com/coolgk/node-utils/issues"},"repository":{"type":"git","url":"https://github.com/coolgk/node-utils.git"},"description":"a simple RabbitMQ (amqp wrapper) class for publishing and consuming messages","keywords":["ampq","rabbitmq","typescript"],"dependencies":{"@types/amqplib":"^0.5.5","amqplib":"^0.5.2","uuid":"^3.1.0","@types/uuid":"^3.4.3"},"pre-commit":{"silent":false,"run":["test:precommit","tslint"]},"main":"./amqp.js","types":"./amqp.d.ts","license":"MIT"}
{"name":"@coolgk/amqp","version":"2.0.0","author":"Daniel Gong <daniel.k.gong@gmail.com>","homepage":"https://www.npmjs.com/package/@coolgk/utils","bugs":{"url":"https://github.com/coolgk/node-utils/issues"},"repository":{"type":"git","url":"https://github.com/coolgk/node-utils.git"},"description":"a simple RabbitMQ (amqp wrapper) class for publishing and consuming messages","keywords":["ampq","rabbitmq","typescript"],"dependencies":{"@types/amqplib":"^0.5.5","amqplib":"^0.5.2","uuid":"^3.1.0","@types/uuid":"^3.4.3","@coolgk/array":"^1.1.4"},"pre-commit":{"silent":false,"run":["tslint","test:precommit"]},"main":"./amqp.js","types":"./amqp.d.ts","license":"MIT"}

@@ -53,4 +53,5 @@

* [.closeConnection()](#Amqp+closeConnection) ⇒ <code>void</code>
* [.publish(message, [callback], [options])](#Amqp+publish) ⇒ <code>promise.&lt;boolean&gt;</code>
* [.publish(message, [callback], [options])](#Amqp+publish) ⇒ <code>promise.&lt;Array.&lt;boolean&gt;&gt;</code>
* [.consume(callback, [options])](#Amqp+consume) ⇒ <code>promise</code>
* [.getChannel()](#Amqp+getChannel) ⇒ <code>promise</code>

@@ -75,3 +76,3 @@ <a name="new_Amqp_new"></a>

### amqp.publish(message, [callback], [options]) ⇒ <code>promise.&lt;boolean&gt;</code>
### amqp.publish(message, [callback], [options]) ⇒ <code>promise.&lt;Array.&lt;boolean&gt;&gt;</code>
**Kind**: instance method of [<code>Amqp</code>](#Amqp)

@@ -84,3 +85,3 @@

| [options] | <code>object</code> | | |
| [options.route] | <code>string</code> | <code>&quot;&#x27;#&#x27;&quot;</code> | route name |
| [options.routes] | <code>string</code> \| <code>Array.&lt;string&gt;</code> | <code>&quot;[&#x27;#&#x27;]&quot;</code> | route names |
| [options.exchangeName] | <code>string</code> | <code>&quot;&#x27;defaultExchange&#x27;&quot;</code> | exchange name |

@@ -97,8 +98,13 @@

| [options] | <code>object</code> | | |
| [options.route] | <code>string</code> | <code>&quot;&#x27;#&#x27;&quot;</code> | exchange route |
| [options.queueName] | <code>string</code> | <code>&quot;&#x27;&#x27;&quot;</code> | queue name for processing request |
| [options.routes] | <code>string</code> \| <code>Array.&lt;string&gt;</code> | <code>&quot;[&#x27;#&#x27;]&quot;</code> | exchange routes |
| [options.queueName] | <code>string</code> | <code>&quot;&#x27;&#x27;&quot;</code> | queue name for processing messages. consumers with the same queue name process messages in round robin style |
| [options.exchangeName] | <code>string</code> | <code>&quot;&#x27;defaultExchange&#x27;&quot;</code> | exchange name |
| [options.exchangeType] | <code>string</code> | <code>&quot;&#x27;topic&#x27;&quot;</code> | exchange type |
| [options.priority] | <code>number</code> | <code>0</code> | priority, larger numbers indicate higher priority |
| [options.prefetch] | <code>number</code> | <code>0</code> | 1 or 0, if to process request one at a time |
| [options.prefetch] | <code>number</code> | <code>1</code> | 1 or 0, if to process request one at a time |
<a name="Amqp+getChannel"></a>
### amqp.getChannel() ⇒ <code>promise</code>
**Kind**: instance method of [<code>Amqp</code>](#Amqp)
**Returns**: <code>promise</code> - - promise<channel>
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc