exp-amqp-connection
Advanced tools
Comparing version 3.1.1 to 3.1.2-beta
25
index.js
@@ -7,2 +7,3 @@ "use strict"; | ||
const transform = require("./lib/transform"); | ||
const async = require("async"); | ||
@@ -144,10 +145,10 @@ const TMP_Q_TTL = 60000; | ||
const channel = bootstrapRes.pubChannel; | ||
const setupTasks = []; | ||
if (!delayedAssets[name]) { | ||
behaviour.logger.info("Creating delayed queue/exchange pair", name); | ||
channel.assertExchange(name, "fanout", { | ||
behaviour.logger.info("Creating delayed queue/exchange pair:", name); | ||
setupTasks.push( | ||
(done) => channel.assertExchange(name, "fanout", {durable: true, autoDelete: true}, done) | ||
); | ||
const queueArgs = { | ||
durable: true, | ||
autoDelete: true | ||
}); | ||
channel.assertQueue(name, { | ||
durable: true, | ||
autoDelete: true, | ||
@@ -158,9 +159,13 @@ arguments: { | ||
} | ||
}); | ||
channel.bindQueue(name, name, "#", {}); | ||
}; | ||
setupTasks.push((done) => channel.assertQueue(name, queueArgs, done)); | ||
setupTasks.push((done) => channel.bindQueue(name, name, "#", {}, done)); | ||
delayedAssets[name] = true; | ||
} | ||
const encodedMsg = transform.encode(message, meta); | ||
channel.publish(name, routingKey, encodedMsg.buffer, encodedMsg.props, cb); | ||
if (!behaviour.confirm) setImmediate(cb); | ||
async.series(setupTasks, (err) => { | ||
if (err) return cb(err); | ||
channel.publish(name, routingKey, encodedMsg.buffer, encodedMsg.props, cb); | ||
if (!behaviour.confirm) setImmediate(cb); | ||
}); | ||
}); | ||
@@ -167,0 +172,0 @@ }; |
@@ -10,3 +10,3 @@ { | ||
], | ||
"version": "3.1.1", | ||
"version": "3.1.2-beta", | ||
"scripts": { | ||
@@ -13,0 +13,0 @@ "predockertest": "docker-compose up -d rabbitmq && docker-compose build app", |
@@ -5,2 +5,3 @@ "use strict"; | ||
const assert = require("assert"); | ||
const async = require("async"); | ||
@@ -40,3 +41,9 @@ Feature("Publish", () => { | ||
let received = null; | ||
const delay = 2500; | ||
after((done) => utils.shutdown(broker, done)); | ||
before((done) => async.parallel([ | ||
(cb) => utils.deleteRabbitQueue("e1-exp-amqp-delayed-" + delay, cb), | ||
(cb) => utils.deleteRabbitExchange("e1-exp-amqp-delayed" + delay, cb) | ||
], done)); | ||
When("We have a connection", () => { | ||
@@ -51,3 +58,3 @@ broker = utils.init(); | ||
And("We publish a message with a 2.5 second deplay", () => { | ||
broker.delayedPublish("testRoutingKey", "Hello hi", 2500); | ||
broker.delayedPublish("testRoutingKey", "Hello hi", delay); | ||
}); | ||
@@ -67,3 +74,21 @@ | ||
}); | ||
When("We publish another message with a 2.5 second deplay", () => { | ||
received = null; | ||
broker.delayedPublish("testRoutingKey", "Hello hi 2", delay); | ||
}); | ||
When("We wait one second", (done) => { | ||
setTimeout(done, 1000); | ||
}); | ||
Then("It should not have arrived yet", () => { | ||
assert.equal(null, received); | ||
}); | ||
When("We wait 3 more seconds", (done) => { | ||
setTimeout(done, 3000); | ||
}); | ||
Then("The message should have arrived", () => { | ||
assert.equal("Hello hi 2", received); | ||
}); | ||
}); | ||
}); |
@@ -38,2 +38,6 @@ "use strict"; | ||
function deleteRabbitExchange(exchange, done) { | ||
deleteResource(`${adminUrl()}/api/exchange/%2F/${exchange}`, done); | ||
} | ||
function deleteRabbitQueue(queue, done) { | ||
@@ -86,2 +90,3 @@ deleteResource(`${adminUrl()}/api/queues/%2F/${queue}`, done); | ||
shutdown, | ||
deleteRabbitExchange, | ||
deleteRabbitQueue, | ||
@@ -88,0 +93,0 @@ killRabbitConnections, |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
1244
0
55437
29
2