Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

appolo-rabbit

Package Overview
Dependencies
Maintainers
1
Versions
16
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

appolo-rabbit - npm Package Compare versions

Comparing version 8.1.0 to 8.1.1

config/modules/all.js

7

index.ts

@@ -7,5 +7,6 @@ import {Rabbit} from "./src/rabbit"

import {IExchangeOptions} from "./src/exchanges/IExchangeOptions";
import {IConnectionOptions} from "./src/connection/IConnectionOptions";
import {IConnectionOptions, IConnectionParams} from "./src/connection/IConnectionOptions";
import {Handler} from "./src/handlers/handler";
import {IRequestOptions, IPublishOptions,IRetry} from "./src/exchanges/IPublishOptions";
import {QueueMessageModel, QueueModel} from "./src/api/models/queueModel";
import {IRequestOptions, IPublishOptions, IRetry} from "./src/exchanges/IPublishOptions";
import {App, createApp} from '@appolo/engine';

@@ -25,3 +26,3 @@ import {Defaults} from "./src/common/defaults";

IPublishOptions,
IMessage, IBindingOptions
IMessage, IBindingOptions, IConnectionParams, QueueMessageModel, QueueModel
}

@@ -28,0 +29,0 @@

@@ -19,3 +19,3 @@ {

"main": "./index.js",
"version": "8.1.0",
"version": "8.1.1",
"license": "MIT",

@@ -31,2 +31,3 @@ "repository": {

"@appolo/events": "^8.0.2",
"@appolo/http": "^8.1.2",
"@appolo/inject": "^8.0.18",

@@ -33,0 +34,0 @@ "@appolo/utils": "^8.0.38",

@@ -57,3 +57,7 @@ "use strict";

}
sendToQueue(queue, content, options) {
async sendToQueue(queue, content, options) {
if (options.confirm !== undefined ? options.confirm : this._options.confirm) {
await utils_1.Promises.fromCallback(c => this._channel.sendToQueue(queue, content, options, c));
return;
}
this._channel.sendToQueue(queue, content, options);

@@ -64,12 +68,8 @@ }

}
publish(exchange, routingKey, content, options) {
async publish(exchange, routingKey, content, options) {
if (options.confirm !== undefined ? options.confirm : this._options.confirm) {
return new Promise((resolve, reject) => {
this._channel.publish(exchange, routingKey, content, options, (err, ok) => {
err ? reject(err) : resolve();
});
});
await utils_1.Promises.fromCallback(c => this._channel.publish(exchange, routingKey, content, options, c));
return;
}
this._channel.publish(exchange, routingKey, content, options);
return Promise.resolve();
}

@@ -76,0 +76,0 @@ _onChannelClose() {

@@ -11,3 +11,3 @@ import {

import {EventsDispatcher} from "../events/eventsDispatcher";
import {Objects} from "@appolo/utils";
import {Objects, Promises} from "@appolo/utils";
import {IExchangeOptions} from "../exchanges/IExchangeOptions";

@@ -91,4 +91,12 @@ import {IChannelOptions} from "./IChannelOptions";

public sendToQueue(queue: string, content: Buffer, options?: Options.Publish) {
this._channel.sendToQueue(queue, content, options)
public async sendToQueue(queue: string, content: Buffer, options?: Options.Publish & {
confirm?: boolean
}): Promise<void> {
if (options.confirm !== undefined ? options.confirm : this._options.confirm) {
await Promises.fromCallback<any>(c => this._channel.sendToQueue(queue, content, options, c))
return;
}
this._channel.sendToQueue(queue, content, options);
}

@@ -100,10 +108,9 @@

public publish(exchange: string, routingKey: string, content: Buffer, options: Options.Publish & { confirm?: boolean }): Promise<void> {
public async publish(exchange: string, routingKey: string, content: Buffer, options: Options.Publish & {
confirm?: boolean
}): Promise<void> {
if (options.confirm !== undefined ? options.confirm : this._options.confirm) {
return new Promise<void>((resolve, reject) => {
this._channel.publish(exchange, routingKey, content, options, (err, ok) =>{
err ? reject(err) : resolve()
})
})
await Promises.fromCallback(c => this._channel.publish(exchange, routingKey, content, options, c))
return;
}

@@ -113,3 +120,2 @@

return Promise.resolve();
}

@@ -116,0 +122,0 @@

@@ -17,2 +17,3 @@ "use strict";

connection = Object.assign({}, connectionsDefaults_1.ConnectionsDefaults, connection);
this._connectionParams = connection;
this._connection = await (0, amqplib_1.connect)(connection);

@@ -73,2 +74,5 @@ this._connection.on('close', () => this._onConnectionClose());

}
get connectionParams() {
return this._connectionParams;
}
};

@@ -75,0 +79,0 @@ tslib_1.__decorate([

@@ -15,2 +15,3 @@ import {define, inject, singleton} from '@appolo/inject';

import {Channel} from "../channel/channel";
import {IConnectionParams} from "./IConnectionOptions";

@@ -27,2 +28,3 @@ @define()

private _connectionParams: IConnectionParams

@@ -39,2 +41,4 @@ public async createConnection(): Promise<void> {

this._connectionParams = connection as IConnectionParams
this._connection = await connect(connection);

@@ -59,3 +63,3 @@

private _parseUri(uri: string) {
private _parseUri(uri: string): IConnectionParams {
let amqp = url.parse(uri);

@@ -120,2 +124,8 @@ return {

}
public get connectionParams(): IConnectionParams {
return this._connectionParams;
}
}

@@ -23,1 +23,10 @@ export interface IConnectionOptions {

}
export interface IConnectionParams {
username: string,
password: string,
hostname: string,
port: number,
vhost: string
}

@@ -27,3 +27,3 @@ "use strict";

contentEncoding: "utf8",
}, utils_1.Objects.omit(msg, "body", "routingKey", "delay", "retry"));
}, utils_1.Objects.omit(msg, "body", "routingKey", "delay", "retry", "debounce", "throttle", "deduplicationId"));
opts.contentType = this.serializers.getContentType(msg);

@@ -36,19 +36,63 @@ if (msg.persistent !== undefined ? msg.persistent : this._options.persistent) {

}
if (msg.deduplicationId) {
opts.headers["x-deduplication-id"] = msg.deduplicationId;
}
let content = this.serializers.getSerializer(opts.contentType).serialize(msg.body);
if (msg.delay > 0) {
let queueName = `${msg.routingKey}_${utils_2.Guid.guid()}`;
await this._channel.assertQueue(queueName, {
deadLetterRoutingKey: msg.routingKey,
deadLetterExchange: this._options.name,
autoDelete: false,
durable: true,
messageTtl: msg.delay,
expires: msg.delay + 1000
});
this._channel.sendToQueue(queueName, content, opts);
await this._handleDelay(msg, opts, content);
return;
}
let result = await this._channel.publish(this._options.name, msg.routingKey, content, opts);
return result;
if (msg.throttle > 0 && msg.deduplicationId) {
await this._handleThrottle(msg, opts, content);
return;
}
if (msg.debounce > 0 && msg.deduplicationId) {
await this._handleDebounce(msg, opts, content);
return;
}
await this._channel.publish(this._options.name, msg.routingKey, content, opts);
}
async _handleDelay(msg, opts, content) {
let queueName = `${msg.routingKey}_${utils_2.Guid.guid()}`;
let params = this._getDelayQueueParams({ msg, delay: msg.delay, expires: msg.delay + 1000 });
await this._channel.assertQueue(queueName, params);
await this._channel.sendToQueue(queueName, content, opts);
return true;
}
async _handleThrottle(msg, opts, content) {
let queueName = `${msg.routingKey}_${msg.deduplicationId}`;
let queue = await this.rabbitApi.getQueue({ name: queueName });
if (queue && queue.arguments["x-created"] + msg.throttle > Date.now()) {
return true;
}
if (queue && queue.arguments["x-created"] + msg.throttle <= Date.now()) {
await this.rabbitApi.deleteQueue({ name: queueName });
}
let params = this._getDelayQueueParams({ msg, delay: msg.throttle, expires: msg.throttle + 1000 });
await this._channel.assertQueue(queueName, params);
await this._channel.sendToQueue(queueName, content, { ...opts, confirm: true });
return true;
}
async _handleDebounce(msg, opts, content) {
let queueName = `${msg.routingKey}_${msg.deduplicationId}`;
let queue = await this.rabbitApi.getQueue({ name: queueName });
if (queue) {
await this.rabbitApi.deleteQueue({ name: queueName });
}
let params = this._getDelayQueueParams({ msg, delay: msg.debounce, expires: msg.debounce + 1000 });
await this._channel.assertQueue(queueName, params);
await this._channel.sendToQueue(queueName, content, opts);
return true;
}
_getDelayQueueParams(params) {
return {
deadLetterRoutingKey: params.msg.routingKey,
deadLetterExchange: this._options.name,
autoDelete: false,
durable: true,
messageTtl: params.delay,
expires: params.expires,
arguments: { "x-created": Date.now() }
};
}
};

@@ -64,2 +108,5 @@ tslib_1.__decorate([

], Exchange.prototype, "connection", void 0);
tslib_1.__decorate([
(0, inject_1.inject)()
], Exchange.prototype, "rabbitApi", void 0);
Exchange = tslib_1.__decorate([

@@ -66,0 +113,0 @@ (0, inject_1.define)()

@@ -12,2 +12,3 @@ import amqplib = require('amqplib');

import {Connection} from "../connection/connection";
import {RabbitApi} from "../api/rabbitApi";

@@ -20,2 +21,3 @@ @define()

@inject() private connection: Connection;
@inject() private rabbitApi: RabbitApi;

@@ -49,3 +51,3 @@

contentEncoding: "utf8",
} as Partial<Options.Publish>, Objects.omit(msg, "body", "routingKey", "delay", "retry"));
} as Partial<Options.Publish>, Objects.omit(msg, "body", "routingKey", "delay", "retry", "debounce", "throttle", "deduplicationId"));

@@ -62,25 +64,97 @@ opts.contentType = this.serializers.getContentType(msg);

if (msg.deduplicationId) {
opts.headers["x-deduplication-id"] = msg.deduplicationId;
}
let content = this.serializers.getSerializer(opts.contentType).serialize(msg.body);
if (msg.delay > 0) {
let queueName = `${msg.routingKey}_${Guid.guid()}`
await this._channel.assertQueue(queueName, {
deadLetterRoutingKey: msg.routingKey,
deadLetterExchange: this._options.name,
autoDelete: false,
durable: true,
messageTtl: msg.delay,
expires: msg.delay + 1000
})
await this._handleDelay(msg, opts, content)
return;
}
this._channel.sendToQueue(queueName, content, opts);
if (msg.throttle > 0 && msg.deduplicationId) {
await this._handleThrottle(msg, opts, content);
return;
}
if (msg.debounce > 0 && msg.deduplicationId) {
await this._handleDebounce(msg, opts, content);
return;
}
let result = await this._channel.publish(this._options.name, msg.routingKey, content, opts);
await this._channel.publish(this._options.name, msg.routingKey, content, opts);
return result;
}
private async _handleDelay(msg: IPublishOptions, opts: Options.Publish, content: any): Promise<boolean> {
let queueName = `${msg.routingKey}_${Guid.guid()}`
let params = this._getDelayQueueParams({msg, delay: msg.delay, expires: msg.delay + 1000})
await this._channel.assertQueue(queueName, params)
await this._channel.sendToQueue(queueName, content, opts);
return true;
}
private async _handleThrottle(msg: IPublishOptions, opts: Options.Publish, content: any): Promise<boolean> {
let queueName = `${msg.routingKey}_${msg.deduplicationId}`
let queue = await this.rabbitApi.getQueue({name: queueName})
if (queue && queue.arguments["x-created"] + msg.throttle > Date.now()) {
return true;
}
if (queue && queue.arguments["x-created"] + msg.throttle <= Date.now()) {
await this.rabbitApi.deleteQueue({name: queueName});
}
let params = this._getDelayQueueParams({msg, delay: msg.throttle, expires: msg.throttle + 1000})
await this._channel.assertQueue(queueName, params);
await this._channel.sendToQueue(queueName, content, {...opts, confirm: true})
return true;
}
private async _handleDebounce(msg: IPublishOptions, opts: Options.Publish, content: any): Promise<boolean> {
let queueName = `${msg.routingKey}_${msg.deduplicationId}`
let queue = await this.rabbitApi.getQueue({name: queueName})
if (queue) {
await this.rabbitApi.deleteQueue({name: queueName});
}
let params = this._getDelayQueueParams({msg, delay: msg.debounce, expires: msg.debounce + 1000})
await this._channel.assertQueue(queueName, params);
await this._channel.sendToQueue(queueName, content, opts);
return true;
}
private _getDelayQueueParams(params: { msg: IPublishOptions, delay: number, expires: number }) {
return {
deadLetterRoutingKey: params.msg.routingKey,
deadLetterExchange: this._options.name,
autoDelete: false,
durable: true,
messageTtl: params.delay,
expires: params.expires,
arguments: {"x-created": Date.now()}
}
}
}

@@ -25,2 +25,5 @@ export interface IPublishOptions {

delay?: number
debounce?: number
throttle?: number
deduplicationId?: string
retry?: IRetry

@@ -27,0 +30,0 @@ }

@@ -83,2 +83,8 @@ "use strict";

}
get api() {
return this.rabbitApi;
}
get connectionParams() {
return this.connection.connectionParams;
}
};

@@ -101,2 +107,5 @@ tslib_1.__decorate([

tslib_1.__decorate([
(0, inject_1.inject)()
], Rabbit.prototype, "rabbitApi", void 0);
tslib_1.__decorate([
(0, inject_1.init)()

@@ -103,0 +112,0 @@ ], Rabbit.prototype, "_initialize", null);

@@ -18,2 +18,3 @@ import {} from '@appolo/engine';

import {IBindingOptions} from "./queues/IQueueOptions";
import {RabbitApi} from "./api/rabbitApi";

@@ -28,2 +29,3 @@ @define()

@inject() private eventsDispatcher: EventsDispatcher;
@inject() private rabbitApi: RabbitApi;

@@ -45,3 +47,3 @@ @init()

public bind(item: IBindingOptions):Promise<void>{
public bind(item: IBindingOptions): Promise<void> {
return this.topology.bindKey(item)

@@ -61,3 +63,3 @@ }

public request<T, K = any>(exchangeName: string, msg: IRequestOptions): Promise<T> {
public request<T, K = any>(exchangeName: string, msg: IRequestOptions): Promise<T> {

@@ -143,3 +145,10 @@ let exchange = this._getExchange(exchangeName);

public get api(): RabbitApi {
return this.rabbitApi
}
public get connectionParams() {
return this.connection.connectionParams
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc