Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

exp-amqp-connection

Package Overview
Dependencies
Maintainers
5
Versions
58
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

exp-amqp-connection - npm Package Compare versions

Comparing version 3.1.1 to 3.1.2-beta

25

index.js

@@ -7,2 +7,3 @@ "use strict";

const transform = require("./lib/transform");
const async = require("async");

@@ -144,10 +145,10 @@ const TMP_Q_TTL = 60000;

const channel = bootstrapRes.pubChannel;
const setupTasks = [];
if (!delayedAssets[name]) {
behaviour.logger.info("Creating delayed queue/exchange pair", name);
channel.assertExchange(name, "fanout", {
behaviour.logger.info("Creating delayed queue/exchange pair:", name);
setupTasks.push(
(done) => channel.assertExchange(name, "fanout", {durable: true, autoDelete: true}, done)
);
const queueArgs = {
durable: true,
autoDelete: true
});
channel.assertQueue(name, {
durable: true,
autoDelete: true,

@@ -158,9 +159,13 @@ arguments: {

}
});
channel.bindQueue(name, name, "#", {});
};
setupTasks.push((done) => channel.assertQueue(name, queueArgs, done));
setupTasks.push((done) => channel.bindQueue(name, name, "#", {}, done));
delayedAssets[name] = true;
}
const encodedMsg = transform.encode(message, meta);
channel.publish(name, routingKey, encodedMsg.buffer, encodedMsg.props, cb);
if (!behaviour.confirm) setImmediate(cb);
async.series(setupTasks, (err) => {
if (err) return cb(err);
channel.publish(name, routingKey, encodedMsg.buffer, encodedMsg.props, cb);
if (!behaviour.confirm) setImmediate(cb);
});
});

@@ -167,0 +172,0 @@ };

2

package.json

@@ -10,3 +10,3 @@ {

],
"version": "3.1.1",
"version": "3.1.2-beta",
"scripts": {

@@ -13,0 +13,0 @@ "predockertest": "docker-compose up -d rabbitmq && docker-compose build app",

@@ -5,2 +5,3 @@ "use strict";

const assert = require("assert");
const async = require("async");

@@ -40,3 +41,9 @@ Feature("Publish", () => {

let received = null;
const delay = 2500;
after((done) => utils.shutdown(broker, done));
before((done) => async.parallel([
(cb) => utils.deleteRabbitQueue("e1-exp-amqp-delayed-" + delay, cb),
(cb) => utils.deleteRabbitExchange("e1-exp-amqp-delayed" + delay, cb)
], done));
When("We have a connection", () => {

@@ -51,3 +58,3 @@ broker = utils.init();

And("We publish a message with a 2.5 second deplay", () => {
broker.delayedPublish("testRoutingKey", "Hello hi", 2500);
broker.delayedPublish("testRoutingKey", "Hello hi", delay);
});

@@ -67,3 +74,21 @@

});
When("We publish another message with a 2.5 second deplay", () => {
received = null;
broker.delayedPublish("testRoutingKey", "Hello hi 2", delay);
});
When("We wait one second", (done) => {
setTimeout(done, 1000);
});
Then("It should not have arrived yet", () => {
assert.equal(null, received);
});
When("We wait 3 more seconds", (done) => {
setTimeout(done, 3000);
});
Then("The message should have arrived", () => {
assert.equal("Hello hi 2", received);
});
});
});

@@ -38,2 +38,6 @@ "use strict";

function deleteRabbitExchange(exchange, done) {
deleteResource(`${adminUrl()}/api/exchange/%2F/${exchange}`, done);
}
function deleteRabbitQueue(queue, done) {

@@ -86,2 +90,3 @@ deleteResource(`${adminUrl()}/api/queues/%2F/${queue}`, done);

shutdown,
deleteRabbitExchange,
deleteRabbitQueue,

@@ -88,0 +93,0 @@ killRabbitConnections,

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc