appolo-rabbit
Advanced tools
Comparing version 8.1.0 to 8.1.1
@@ -7,5 +7,6 @@ import {Rabbit} from "./src/rabbit" | ||
import {IExchangeOptions} from "./src/exchanges/IExchangeOptions"; | ||
import {IConnectionOptions} from "./src/connection/IConnectionOptions"; | ||
import {IConnectionOptions, IConnectionParams} from "./src/connection/IConnectionOptions"; | ||
import {Handler} from "./src/handlers/handler"; | ||
import {IRequestOptions, IPublishOptions,IRetry} from "./src/exchanges/IPublishOptions"; | ||
import {QueueMessageModel, QueueModel} from "./src/api/models/queueModel"; | ||
import {IRequestOptions, IPublishOptions, IRetry} from "./src/exchanges/IPublishOptions"; | ||
import {App, createApp} from '@appolo/engine'; | ||
@@ -25,3 +26,3 @@ import {Defaults} from "./src/common/defaults"; | ||
IPublishOptions, | ||
IMessage, IBindingOptions | ||
IMessage, IBindingOptions, IConnectionParams, QueueMessageModel, QueueModel | ||
} | ||
@@ -28,0 +29,0 @@ |
@@ -19,3 +19,3 @@ { | ||
"main": "./index.js", | ||
"version": "8.1.0", | ||
"version": "8.1.1", | ||
"license": "MIT", | ||
@@ -31,2 +31,3 @@ "repository": { | ||
"@appolo/events": "^8.0.2", | ||
"@appolo/http": "^8.1.2", | ||
"@appolo/inject": "^8.0.18", | ||
@@ -33,0 +34,0 @@ "@appolo/utils": "^8.0.38", |
@@ -57,3 +57,7 @@ "use strict"; | ||
} | ||
sendToQueue(queue, content, options) { | ||
async sendToQueue(queue, content, options) { | ||
if (options.confirm !== undefined ? options.confirm : this._options.confirm) { | ||
await utils_1.Promises.fromCallback(c => this._channel.sendToQueue(queue, content, options, c)); | ||
return; | ||
} | ||
this._channel.sendToQueue(queue, content, options); | ||
@@ -64,12 +68,8 @@ } | ||
} | ||
publish(exchange, routingKey, content, options) { | ||
async publish(exchange, routingKey, content, options) { | ||
if (options.confirm !== undefined ? options.confirm : this._options.confirm) { | ||
return new Promise((resolve, reject) => { | ||
this._channel.publish(exchange, routingKey, content, options, (err, ok) => { | ||
err ? reject(err) : resolve(); | ||
}); | ||
}); | ||
await utils_1.Promises.fromCallback(c => this._channel.publish(exchange, routingKey, content, options, c)); | ||
return; | ||
} | ||
this._channel.publish(exchange, routingKey, content, options); | ||
return Promise.resolve(); | ||
} | ||
@@ -76,0 +76,0 @@ _onChannelClose() { |
@@ -11,3 +11,3 @@ import { | ||
import {EventsDispatcher} from "../events/eventsDispatcher"; | ||
import {Objects} from "@appolo/utils"; | ||
import {Objects, Promises} from "@appolo/utils"; | ||
import {IExchangeOptions} from "../exchanges/IExchangeOptions"; | ||
@@ -91,4 +91,12 @@ import {IChannelOptions} from "./IChannelOptions"; | ||
public sendToQueue(queue: string, content: Buffer, options?: Options.Publish) { | ||
this._channel.sendToQueue(queue, content, options) | ||
public async sendToQueue(queue: string, content: Buffer, options?: Options.Publish & { | ||
confirm?: boolean | ||
}): Promise<void> { | ||
if (options.confirm !== undefined ? options.confirm : this._options.confirm) { | ||
await Promises.fromCallback<any>(c => this._channel.sendToQueue(queue, content, options, c)) | ||
return; | ||
} | ||
this._channel.sendToQueue(queue, content, options); | ||
} | ||
@@ -100,10 +108,9 @@ | ||
public publish(exchange: string, routingKey: string, content: Buffer, options: Options.Publish & { confirm?: boolean }): Promise<void> { | ||
public async publish(exchange: string, routingKey: string, content: Buffer, options: Options.Publish & { | ||
confirm?: boolean | ||
}): Promise<void> { | ||
if (options.confirm !== undefined ? options.confirm : this._options.confirm) { | ||
return new Promise<void>((resolve, reject) => { | ||
this._channel.publish(exchange, routingKey, content, options, (err, ok) =>{ | ||
err ? reject(err) : resolve() | ||
}) | ||
}) | ||
await Promises.fromCallback(c => this._channel.publish(exchange, routingKey, content, options, c)) | ||
return; | ||
} | ||
@@ -113,3 +120,2 @@ | ||
return Promise.resolve(); | ||
} | ||
@@ -116,0 +122,0 @@ |
@@ -17,2 +17,3 @@ "use strict"; | ||
connection = Object.assign({}, connectionsDefaults_1.ConnectionsDefaults, connection); | ||
this._connectionParams = connection; | ||
this._connection = await (0, amqplib_1.connect)(connection); | ||
@@ -73,2 +74,5 @@ this._connection.on('close', () => this._onConnectionClose()); | ||
} | ||
get connectionParams() { | ||
return this._connectionParams; | ||
} | ||
}; | ||
@@ -75,0 +79,0 @@ tslib_1.__decorate([ |
@@ -15,2 +15,3 @@ import {define, inject, singleton} from '@appolo/inject'; | ||
import {Channel} from "../channel/channel"; | ||
import {IConnectionParams} from "./IConnectionOptions"; | ||
@@ -27,2 +28,3 @@ @define() | ||
private _connectionParams: IConnectionParams | ||
@@ -39,2 +41,4 @@ public async createConnection(): Promise<void> { | ||
this._connectionParams = connection as IConnectionParams | ||
this._connection = await connect(connection); | ||
@@ -59,3 +63,3 @@ | ||
private _parseUri(uri: string) { | ||
private _parseUri(uri: string): IConnectionParams { | ||
let amqp = url.parse(uri); | ||
@@ -120,2 +124,8 @@ return { | ||
} | ||
public get connectionParams(): IConnectionParams { | ||
return this._connectionParams; | ||
} | ||
} |
@@ -23,1 +23,10 @@ export interface IConnectionOptions { | ||
} | ||
export interface IConnectionParams { | ||
username: string, | ||
password: string, | ||
hostname: string, | ||
port: number, | ||
vhost: string | ||
} |
@@ -27,3 +27,3 @@ "use strict"; | ||
contentEncoding: "utf8", | ||
}, utils_1.Objects.omit(msg, "body", "routingKey", "delay", "retry")); | ||
}, utils_1.Objects.omit(msg, "body", "routingKey", "delay", "retry", "debounce", "throttle", "deduplicationId")); | ||
opts.contentType = this.serializers.getContentType(msg); | ||
@@ -36,19 +36,63 @@ if (msg.persistent !== undefined ? msg.persistent : this._options.persistent) { | ||
} | ||
if (msg.deduplicationId) { | ||
opts.headers["x-deduplication-id"] = msg.deduplicationId; | ||
} | ||
let content = this.serializers.getSerializer(opts.contentType).serialize(msg.body); | ||
if (msg.delay > 0) { | ||
let queueName = `${msg.routingKey}_${utils_2.Guid.guid()}`; | ||
await this._channel.assertQueue(queueName, { | ||
deadLetterRoutingKey: msg.routingKey, | ||
deadLetterExchange: this._options.name, | ||
autoDelete: false, | ||
durable: true, | ||
messageTtl: msg.delay, | ||
expires: msg.delay + 1000 | ||
}); | ||
this._channel.sendToQueue(queueName, content, opts); | ||
await this._handleDelay(msg, opts, content); | ||
return; | ||
} | ||
let result = await this._channel.publish(this._options.name, msg.routingKey, content, opts); | ||
return result; | ||
if (msg.throttle > 0 && msg.deduplicationId) { | ||
await this._handleThrottle(msg, opts, content); | ||
return; | ||
} | ||
if (msg.debounce > 0 && msg.deduplicationId) { | ||
await this._handleDebounce(msg, opts, content); | ||
return; | ||
} | ||
await this._channel.publish(this._options.name, msg.routingKey, content, opts); | ||
} | ||
async _handleDelay(msg, opts, content) { | ||
let queueName = `${msg.routingKey}_${utils_2.Guid.guid()}`; | ||
let params = this._getDelayQueueParams({ msg, delay: msg.delay, expires: msg.delay + 1000 }); | ||
await this._channel.assertQueue(queueName, params); | ||
await this._channel.sendToQueue(queueName, content, opts); | ||
return true; | ||
} | ||
async _handleThrottle(msg, opts, content) { | ||
let queueName = `${msg.routingKey}_${msg.deduplicationId}`; | ||
let queue = await this.rabbitApi.getQueue({ name: queueName }); | ||
if (queue && queue.arguments["x-created"] + msg.throttle > Date.now()) { | ||
return true; | ||
} | ||
if (queue && queue.arguments["x-created"] + msg.throttle <= Date.now()) { | ||
await this.rabbitApi.deleteQueue({ name: queueName }); | ||
} | ||
let params = this._getDelayQueueParams({ msg, delay: msg.throttle, expires: msg.throttle + 1000 }); | ||
await this._channel.assertQueue(queueName, params); | ||
await this._channel.sendToQueue(queueName, content, { ...opts, confirm: true }); | ||
return true; | ||
} | ||
async _handleDebounce(msg, opts, content) { | ||
let queueName = `${msg.routingKey}_${msg.deduplicationId}`; | ||
let queue = await this.rabbitApi.getQueue({ name: queueName }); | ||
if (queue) { | ||
await this.rabbitApi.deleteQueue({ name: queueName }); | ||
} | ||
let params = this._getDelayQueueParams({ msg, delay: msg.debounce, expires: msg.debounce + 1000 }); | ||
await this._channel.assertQueue(queueName, params); | ||
await this._channel.sendToQueue(queueName, content, opts); | ||
return true; | ||
} | ||
_getDelayQueueParams(params) { | ||
return { | ||
deadLetterRoutingKey: params.msg.routingKey, | ||
deadLetterExchange: this._options.name, | ||
autoDelete: false, | ||
durable: true, | ||
messageTtl: params.delay, | ||
expires: params.expires, | ||
arguments: { "x-created": Date.now() } | ||
}; | ||
} | ||
}; | ||
@@ -64,2 +108,5 @@ tslib_1.__decorate([ | ||
], Exchange.prototype, "connection", void 0); | ||
tslib_1.__decorate([ | ||
(0, inject_1.inject)() | ||
], Exchange.prototype, "rabbitApi", void 0); | ||
Exchange = tslib_1.__decorate([ | ||
@@ -66,0 +113,0 @@ (0, inject_1.define)() |
@@ -12,2 +12,3 @@ import amqplib = require('amqplib'); | ||
import {Connection} from "../connection/connection"; | ||
import {RabbitApi} from "../api/rabbitApi"; | ||
@@ -20,2 +21,3 @@ @define() | ||
@inject() private connection: Connection; | ||
@inject() private rabbitApi: RabbitApi; | ||
@@ -49,3 +51,3 @@ | ||
contentEncoding: "utf8", | ||
} as Partial<Options.Publish>, Objects.omit(msg, "body", "routingKey", "delay", "retry")); | ||
} as Partial<Options.Publish>, Objects.omit(msg, "body", "routingKey", "delay", "retry", "debounce", "throttle", "deduplicationId")); | ||
@@ -62,25 +64,97 @@ opts.contentType = this.serializers.getContentType(msg); | ||
if (msg.deduplicationId) { | ||
opts.headers["x-deduplication-id"] = msg.deduplicationId; | ||
} | ||
let content = this.serializers.getSerializer(opts.contentType).serialize(msg.body); | ||
if (msg.delay > 0) { | ||
let queueName = `${msg.routingKey}_${Guid.guid()}` | ||
await this._channel.assertQueue(queueName, { | ||
deadLetterRoutingKey: msg.routingKey, | ||
deadLetterExchange: this._options.name, | ||
autoDelete: false, | ||
durable: true, | ||
messageTtl: msg.delay, | ||
expires: msg.delay + 1000 | ||
}) | ||
await this._handleDelay(msg, opts, content) | ||
return; | ||
} | ||
this._channel.sendToQueue(queueName, content, opts); | ||
if (msg.throttle > 0 && msg.deduplicationId) { | ||
await this._handleThrottle(msg, opts, content); | ||
return; | ||
} | ||
if (msg.debounce > 0 && msg.deduplicationId) { | ||
await this._handleDebounce(msg, opts, content); | ||
return; | ||
} | ||
let result = await this._channel.publish(this._options.name, msg.routingKey, content, opts); | ||
await this._channel.publish(this._options.name, msg.routingKey, content, opts); | ||
return result; | ||
} | ||
private async _handleDelay(msg: IPublishOptions, opts: Options.Publish, content: any): Promise<boolean> { | ||
let queueName = `${msg.routingKey}_${Guid.guid()}` | ||
let params = this._getDelayQueueParams({msg, delay: msg.delay, expires: msg.delay + 1000}) | ||
await this._channel.assertQueue(queueName, params) | ||
await this._channel.sendToQueue(queueName, content, opts); | ||
return true; | ||
} | ||
private async _handleThrottle(msg: IPublishOptions, opts: Options.Publish, content: any): Promise<boolean> { | ||
let queueName = `${msg.routingKey}_${msg.deduplicationId}` | ||
let queue = await this.rabbitApi.getQueue({name: queueName}) | ||
if (queue && queue.arguments["x-created"] + msg.throttle > Date.now()) { | ||
return true; | ||
} | ||
if (queue && queue.arguments["x-created"] + msg.throttle <= Date.now()) { | ||
await this.rabbitApi.deleteQueue({name: queueName}); | ||
} | ||
let params = this._getDelayQueueParams({msg, delay: msg.throttle, expires: msg.throttle + 1000}) | ||
await this._channel.assertQueue(queueName, params); | ||
await this._channel.sendToQueue(queueName, content, {...opts, confirm: true}) | ||
return true; | ||
} | ||
private async _handleDebounce(msg: IPublishOptions, opts: Options.Publish, content: any): Promise<boolean> { | ||
let queueName = `${msg.routingKey}_${msg.deduplicationId}` | ||
let queue = await this.rabbitApi.getQueue({name: queueName}) | ||
if (queue) { | ||
await this.rabbitApi.deleteQueue({name: queueName}); | ||
} | ||
let params = this._getDelayQueueParams({msg, delay: msg.debounce, expires: msg.debounce + 1000}) | ||
await this._channel.assertQueue(queueName, params); | ||
await this._channel.sendToQueue(queueName, content, opts); | ||
return true; | ||
} | ||
private _getDelayQueueParams(params: { msg: IPublishOptions, delay: number, expires: number }) { | ||
return { | ||
deadLetterRoutingKey: params.msg.routingKey, | ||
deadLetterExchange: this._options.name, | ||
autoDelete: false, | ||
durable: true, | ||
messageTtl: params.delay, | ||
expires: params.expires, | ||
arguments: {"x-created": Date.now()} | ||
} | ||
} | ||
} |
@@ -25,2 +25,5 @@ export interface IPublishOptions { | ||
delay?: number | ||
debounce?: number | ||
throttle?: number | ||
deduplicationId?: string | ||
retry?: IRetry | ||
@@ -27,0 +30,0 @@ } |
@@ -83,2 +83,8 @@ "use strict"; | ||
} | ||
get api() { | ||
return this.rabbitApi; | ||
} | ||
get connectionParams() { | ||
return this.connection.connectionParams; | ||
} | ||
}; | ||
@@ -101,2 +107,5 @@ tslib_1.__decorate([ | ||
tslib_1.__decorate([ | ||
(0, inject_1.inject)() | ||
], Rabbit.prototype, "rabbitApi", void 0); | ||
tslib_1.__decorate([ | ||
(0, inject_1.init)() | ||
@@ -103,0 +112,0 @@ ], Rabbit.prototype, "_initialize", null); |
@@ -18,2 +18,3 @@ import {} from '@appolo/engine'; | ||
import {IBindingOptions} from "./queues/IQueueOptions"; | ||
import {RabbitApi} from "./api/rabbitApi"; | ||
@@ -28,2 +29,3 @@ @define() | ||
@inject() private eventsDispatcher: EventsDispatcher; | ||
@inject() private rabbitApi: RabbitApi; | ||
@@ -45,3 +47,3 @@ @init() | ||
public bind(item: IBindingOptions):Promise<void>{ | ||
public bind(item: IBindingOptions): Promise<void> { | ||
return this.topology.bindKey(item) | ||
@@ -61,3 +63,3 @@ } | ||
public request<T, K = any>(exchangeName: string, msg: IRequestOptions): Promise<T> { | ||
public request<T, K = any>(exchangeName: string, msg: IRequestOptions): Promise<T> { | ||
@@ -143,3 +145,10 @@ let exchange = this._getExchange(exchangeName); | ||
public get api(): RabbitApi { | ||
return this.rabbitApi | ||
} | ||
public get connectionParams() { | ||
return this.connection.connectionParams | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
152884
111
3020
6
+ Added@appolo/http@^8.1.2
+ Added@appolo/http@8.1.10(transitive)
+ Addedappolo-cache@8.0.0(transitive)
+ Addedasynckit@0.4.0(transitive)
+ Addedaxios@1.7.8(transitive)
+ Addedcombined-stream@1.0.8(transitive)
+ Addeddelayed-stream@1.0.0(transitive)
+ Addedfollow-redirects@1.15.9(transitive)
+ Addedform-data@4.0.1(transitive)
+ Addedmime-db@1.52.0(transitive)
+ Addedmime-types@2.1.35(transitive)
+ Addedproxy-from-env@1.1.0(transitive)