New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

rabbit-queue

Package Overview
Dependencies
Maintainers
1
Versions
49
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbit-queue - npm Package Compare versions

Comparing version 1.0.6 to 1.1.0

tsconfig.json

6

.vscode/launch.json

@@ -28,3 +28,3 @@ {

"request": "launch",
"program": "${workspaceRoot}/test/js/baseQueueHandlerTest.js",
"program": "${workspaceRoot}/test/js/index.js",
"stopOnEntry": false,

@@ -43,4 +43,4 @@ "args": [],

"externalConsole": false,
"sourceMaps": false,
"outDir": null
"sourceMaps": true,
"outDir": "${workspaceRoot}/test/js"
},

@@ -47,0 +47,0 @@ {

@@ -6,2 +6,4 @@ // Place your settings in this file to overwrite default and user settings.

}
,
"typescript.check.workspaceVersion": false
}

@@ -0,1 +1,2 @@

/// <reference types="amqplib" />
import Rabbit from './rabbit';

@@ -2,0 +3,0 @@ import * as amqp from 'amqplib';

@@ -17,4 +17,4 @@ "use strict";

assert(typeof logger.debug === 'function', 'logger has no debug method');
assert(typeof logger.warn === 'function', 'logger has no debug method');
assert(typeof logger.error === 'function', 'logger has no debug method');
assert(typeof logger.warn === 'function', 'logger has no warn method');
assert(typeof logger.error === 'function', 'logger has no error method');
this.retries = retries;

@@ -59,3 +59,3 @@ this.retryDelay = retryDelay;

this.logger.debug('[%s] #%s Dequeueing %s ', correlationId, retries + 1, this.queueName);
const result = yield this.handle({ msg: msg, event: event, correlationId: correlationId, startTime: startTime });
const result = yield this.handle({ msg, event, correlationId, startTime });
this.logger.debug('[%s] #%s Acknowledging %s ', correlationId, retries + 1, this.queueName);

@@ -114,3 +114,3 @@ ack(result);

yield this.rabbit.publish(this.dlqName, event, msg.properties);
yield this.afterDlq({ msg: msg, event: event });
yield this.afterDlq({ msg, event });
ack();

@@ -117,0 +117,0 @@ }

@@ -0,1 +1,3 @@

/// <reference types="node" />
/// <reference types="amqplib" />
import * as amqp from 'amqplib';

@@ -2,0 +4,0 @@ export interface Channel extends amqp.Channel {

@@ -0,1 +1,2 @@

/// <reference types="amqplib" />
import * as amqp from 'amqplib';

@@ -8,3 +9,4 @@ import { Channel } from './channel';

publish(channel: Channel, exchange: string, routingKey: string, content: any, headers: amqp.Options.Publish): Promise<{}>;
getReply(channel: Channel, exchange: string, routingKey: string, content: any, headers: amqp.Options.Publish, timeout?: number): Promise<any>;
};
export default _default;
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator.throw(value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments)).next());
});
};
const logger_1 = require('./logger');
const race_until_1 = require('race-until');
const replyQueue_1 = require('./replyQueue');
Object.defineProperty(exports, "__esModule", { value: true });

@@ -18,4 +28,18 @@ exports.default = {

});
},
getReply(channel, exchange, routingKey, content, headers, timeout) {
return __awaiter(this, void 0, void 0, function* () {
const reply = replyQueue_1.getReply(content, headers, channel, (bufferContent, headers, correlationId, cb) => {
logger_1.getLogger().debug(`[${correlationId}] <- Publishing to reply exchange ${exchange}-${routingKey} ${bufferContent.byteLength} bytes`);
channel.publish(exchange, routingKey, bufferContent, headers, cb);
});
if (timeout) {
return race_until_1.default(reply, timeout, false);
}
else {
return reply;
}
});
}
};
//# sourceMappingURL=exchange.js.map

@@ -0,1 +1,2 @@

/// <reference types="amqplib" />
import * as amqp from 'amqplib';

@@ -2,0 +3,0 @@ import { Channel } from './channel';

@@ -28,3 +28,3 @@ "use strict";

let queueOptions = {
exclusive: exclusive, durable: durable, autoDelete: autoDelete, messageTtl: messageTtl, expires: expires, deadLetterExchange: deadLetterExchange, deadLetterRoutingKey: deadLetterRoutingKey, maxLength: maxLength
exclusive, durable, autoDelete, messageTtl, expires, deadLetterExchange, deadLetterRoutingKey, maxLength
};

@@ -94,3 +94,3 @@ if (priority !== undefined) {

persistent: true,
correlationId: correlationId
correlationId
}, headers);

@@ -108,14 +108,5 @@ const bufferContent = new Buffer(msg);

}
const reply = new Promise((resolve, reject) => {
var msg = JSON.stringify(obj);
var correlationId = headers.correlationId || uuid.v4();
headers = Object.assign({
persistent: false,
correlationId: correlationId,
replyTo: channel.replyName
}, headers);
const bufferContent = new Buffer(msg);
const reply = replyQueue_1.getReply(obj, headers, channel, (bufferContent, headers, correlationId, cb) => {
logger_1.getLogger().debug(`[${correlationId}] <- Publishing to reply queue ${name} ${bufferContent.byteLength} bytes`);
replyQueue_1.addHandler(correlationId, (err, body) => err ? reject(err) : resolve(body));
channel.sendToQueue(name, bufferContent, headers, (err, ok) => err ? reject(err) : ({}));
channel.sendToQueue(name, bufferContent, headers, cb);
});

@@ -122,0 +113,0 @@ if (timeout) {

@@ -0,1 +1,3 @@

/// <reference types="node" />
/// <reference types="amqplib" />
import * as amqp from 'amqplib';

@@ -40,9 +42,10 @@ import { EventEmitter } from 'events';

getReply(name: string, obj: any, headers: amqp.Options.Publish, prefix?: string, timeout?: number): Promise<any>;
getTopicReply(topicName: string, content: any, headers: amqp.Options.Publish, prefix?: string, timeout?: number): Promise<void>;
publishExchange(exchange: string, routingKey: string, content: any, headers: amqp.Options.Publish, prefix?: string): Promise<void>;
publishTopic(routingKey: string, content: any, headers?: amqp.Options.Publish, prefix?: string): Promise<void>;
bindToExchange(name: string, exchange: string, routingKey: string, prefix?: string): Promise<void>;
unbindFromExchange(name: string, exchange: any, routingKey: any, prefix?: string): Promise<void>;
bindToTopic(name: string, routingKey: string, prefix?: string): Promise<void>;
unbindFromTopic(name: string, routingKey: string, prefix?: string): Promise<void>;
publishTopic(topicName: string, content: any, headers?: amqp.Options.Publish, prefix?: string): Promise<void>;
bindToExchange(queueName: string, exchange: string, routingKey: string, prefix?: string): Promise<void>;
unbindFromExchange(queueName: string, exchange: any, topicName: any, prefix?: string): Promise<void>;
bindToTopic(queueName: string, topicName: string, prefix?: string): Promise<void>;
unbindFromTopic(queueName: string, topicName: string, prefix?: string): Promise<void>;
close(): Promise<void>;
}

@@ -129,2 +129,9 @@ "use strict";

}
getTopicReply(topicName, content, headers, prefix, timeout) {
return __awaiter(this, void 0, void 0, function* () {
topicName = this.updateName(topicName, prefix);
yield this.connected;
yield exchange_1.default.getReply(this.channel, 'amq.topic', topicName, content, headers, timeout);
});
}
publishExchange(exchange, routingKey, content, headers, prefix) {

@@ -137,33 +144,33 @@ return __awaiter(this, void 0, void 0, function* () {

}
publishTopic(routingKey, content, headers = {}, prefix) {
publishTopic(topicName, content, headers = {}, prefix) {
return __awaiter(this, void 0, void 0, function* () {
routingKey = this.updateName(routingKey, prefix);
topicName = this.updateName(topicName, prefix);
yield this.connected;
yield exchange_1.default.publish(this.channel, 'amq.topic', routingKey, content, headers);
yield exchange_1.default.publish(this.channel, 'amq.topic', topicName, content, headers);
});
}
bindToExchange(name, exchange, routingKey, prefix) {
bindToExchange(queueName, exchange, routingKey, prefix) {
return __awaiter(this, void 0, void 0, function* () {
name = this.updateName(name, prefix);
queueName = this.updateName(queueName, prefix);
yield this.connected;
yield queue_1.default.bindToExchange(exchange, routingKey, this.channel, name, this.queues[name]);
yield queue_1.default.bindToExchange(exchange, routingKey, this.channel, queueName, this.queues[queueName]);
});
}
unbindFromExchange(name, exchange, routingKey, prefix) {
unbindFromExchange(queueName, exchange, topicName, prefix) {
return __awaiter(this, void 0, void 0, function* () {
name = this.updateName(name, prefix);
queueName = this.updateName(queueName, prefix);
yield this.connected;
yield queue_1.default.unbindFromExchange(exchange, routingKey, this.channel, name, this.queues[name]);
yield queue_1.default.unbindFromExchange(exchange, topicName, this.channel, queueName, this.queues[queueName]);
});
}
bindToTopic(name, routingKey, prefix) {
bindToTopic(queueName, topicName, prefix) {
return __awaiter(this, void 0, void 0, function* () {
name = this.updateName(name, prefix);
yield this.bindToExchange(name, 'amq.topic', routingKey, prefix);
queueName = this.updateName(queueName, prefix);
yield this.bindToExchange(queueName, 'amq.topic', topicName, prefix);
});
}
unbindFromTopic(name, routingKey, prefix) {
unbindFromTopic(queueName, topicName, prefix) {
return __awaiter(this, void 0, void 0, function* () {
name = this.updateName(name, prefix);
yield this.unbindFromExchange(name, 'amq.topic', routingKey, prefix);
queueName = this.updateName(queueName, prefix);
yield this.unbindFromExchange(queueName, 'amq.topic', topicName, prefix);
});

@@ -170,0 +177,0 @@ }

@@ -0,3 +1,7 @@

/// <reference types="node" />
/// <reference types="amqplib" />
import { Channel } from './channel';
import * as amqp from 'amqplib';
export declare function createReplyQueue(channel: Channel): Promise<void>;
export declare function addHandler(correlationId: any, handler: (err: Error, body: string) => void): void;
export declare function getReply(content: any, headers: amqp.Options.Publish, channel: Channel, cb: Function): Promise<{}>;

@@ -12,2 +12,3 @@ "use strict";

const assert = require('assert');
const uuid = require('node-uuid');
let replyHandlers = {};

@@ -30,2 +31,17 @@ function createReplyQueue(channel) {

exports.addHandler = addHandler;
function getReply(content, headers, channel, cb) {
return new Promise((resolve, reject) => {
var msg = JSON.stringify(content);
var correlationId = headers.correlationId || uuid.v4();
headers = Object.assign({
persistent: false,
correlationId,
replyTo: channel.replyName
}, headers);
const bufferContent = new Buffer(msg);
addHandler(correlationId, (err, body) => err ? reject(err) : resolve(body));
cb(bufferContent, headers, correlationId, (err, ok) => err ? reject(err) : ({}));
});
}
exports.getReply = getReply;
function onReply(msg) {

@@ -32,0 +48,0 @@ const id = msg.properties.correlationId;

{
"name": "rabbit-queue",
"version": "1.0.6",
"version": "1.1.0",
"description": "AMQP/RabbitMQ queue management library.",

@@ -14,7 +14,7 @@ "main": "js/index.js",

"scripts": {
"tsc": "tsc -p ts",
"tsc:w": "tsc -w -p ts",
"prepublish": "tsc -p ts",
"pretest": "tslint ts/**.ts && tsc -p test/ts && tsc -p ts",
"test": "istanbul test ./node_modules/.bin/_mocha test/js/**.js -- -R spec",
"build": "tsc -p .",
"watch": "tsc -w -p .",
"prepublish": "npm run build",
"pretest": "tslint ts/**.ts && npm run build",
"test": "istanbul test ./node_modules/.bin/_mocha js/test/**.js -- -R spec",
"test-docker": "docker-compose run rabbit-queue npm run test --coverage",

@@ -33,6 +33,16 @@ "posttest": "remap-istanbul -i coverage/coverage.json -o coverage/lcov-report -t html"

"node-uuid": "^1.4.7",
"race-until": "^1.0.2"
"race-until": "^1.0.2",
"source-map-support": "^0.4.5"
},
"typings": "js/index",
"devDependencies": {
"@types/amqplib": "^0.3.29",
"@types/log4js": "0.0.32",
"@types/mocha": "^2.2.32",
"@types/node": "^6.0.40",
"@types/node-uuid": "0.0.28",
"@types/should": "^8.1.30",
"@types/sinon": "^1.16.30",
"@types/source-map-support": "^0.2.28",
"@types/supertest": "^1.1.31",
"istanbul": "^0.4.4",

@@ -44,4 +54,4 @@ "mocha": "^3.0.2",

"tslint": "^3.14.0",
"typescript": "^1.8.10"
"typescript": "2.0.6"
}
}

@@ -40,2 +40,7 @@ # Rabbit Queue

});
rabbit.bindToTopic('queueName', 'routingKey');
rabbit.getTopicReply('routingKey', { test: 'data' }, { correlationId: '1' }, '', 100)
.then((reply) => console.log('received reply', reply))
.catch((error) => console.log('Timed out after 100ms'));
```

@@ -42,0 +47,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc