New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

exp-amqp-connection

Package Overview
Dependencies
Maintainers
5
Versions
58
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

exp-amqp-connection - npm Package Compare versions

Comparing version

to
3.1.1-beta-3

50

index.js

@@ -21,4 +21,4 @@ "use strict";

heartbeat: 10,
resubscribeOnError: true,
productName: getProductName(),
resubscribeOnError: true,
queueArguments: {},

@@ -32,2 +32,4 @@ prefetch: 20,

const api = new EventEmitter();
const amqpEvents = new EventEmitter();
amqpEvents.on("error", (err) => api.emit("error", err));
behaviour = Object.assign({}, defaultBehaviour, behaviour);

@@ -38,9 +40,15 @@

bootstrap(behaviour, (bootstrapErr, bootstrapRes) => {
if (bootstrapErr) api.emit("error", bootstrapErr);
if (bootstrapErr) amqpEvents.emit("error", bootstrapErr);
if (bootstrapRes && bootstrapRes.virgin) {
bootstrapRes.connection.on("error", (err) => api.emit("error", err));
bootstrapRes.connection.on("close", (why) => api.emit("error", why));
api.emit("connected");
bootstrapRes.pubChannel.on("error", (err) => api.emit("error", err));
bootstrapRes.subChannel.on("error", (err) => api.emit("error", err));
bootstrapRes.connection.on("error", (err) => amqpEvents.emit("error", `AMQP connection error: ${err}`));
bootstrapRes.connection.on("close", (err) => amqpEvents.emit("error", `AMQP connection error: ${err}`));
// Only way to detect explicit close from management console....
if (bootstrapRes.connection.connection && bootstrapRes.connection.connection.stream) {
bootstrapRes.connection.connection.stream.on(
"close", (why) => amqpEvents.emit("error", `AMQP connection closed: ${why}`)
);
}
bootstrapRes.pubChannel.on("error", (err) => amqpEvents.emit("error", `AMQP pub channel error: ${err}`));
bootstrapRes.subChannel.on("error", (err) => amqpEvents.emit("error", `AMQP sub channel error: ${err}`));
bootstrapRes.pubChannel.assertExchange(behaviour.exchange, "topic");

@@ -70,3 +78,3 @@ }

if (!message) {
return api.emit("error", "Subscription cancelled");
return amqpEvents.emit("error", "Subscription cancelled");
}

@@ -102,16 +110,15 @@ const ackFun = () => subChannel.ack(message);

let attempt = 1;
if (behaviour.resubscribeOnError) {
const resubscribeOnError = (err) => {
if (err && !resubTimer) {
behaviour.logger.info("Amqp error received. Resubscribing in 5 secs.", err.message);
resubTimer = setTimeout(() => {
attempt = attempt + 1;
doSubscribe(routingKeyOrKeys, queue, handler, attempt);
resubTimer = null;
}, 5000);
}
};
api.on("error", resubscribeOnError);
}
const resubscribeOnError = (err) => {
if (err && !resubTimer && behaviour.resubscribeOnError) {
behaviour.logger.info("Amqp error received. Resubscribing in 5 secs.", err.message);
resubTimer = setTimeout(() => {
attempt = attempt + 1;
doSubscribe(routingKeyOrKeys, queue, handler, attempt);
resubTimer = null;
}, 5000);
}
};
doSubscribe(routingKeyOrKeys, queue, handler, attempt);
amqpEvents.on("error", resubscribeOnError);
};

@@ -152,4 +159,3 @@

"x-dead-letter-exchange": behaviour.exchange,
"x-message-ttl": delay,
"x-expires": delay + 60000
"x-message-ttl": delay
}

@@ -156,0 +162,0 @@ });

@@ -10,3 +10,3 @@ {

],
"version": "3.1.1-beta-2",
"version": "3.1.1-beta-3",
"scripts": {

@@ -13,0 +13,0 @@ "predockertest": "docker-compose up -d rabbitmq && docker-compose build app",

@@ -33,3 +33,3 @@ "use strict";

And("We create a subscription that will nack one message without requeueing", (done) => {
broker.on("subscribed", () => done());
broker.once("subscribed", () => done());
broker.subscribeTmp("testNackRoutingKey", (msg, meta, ack) => {

@@ -36,0 +36,0 @@ received.push({

@@ -69,3 +69,2 @@ "use strict";

Scenario("Unparsable message", () => {

@@ -260,5 +259,5 @@ let nMessages = 0;

Then("An error 320 should be raised", () => {
assert.equal(320, error.code);
assert(error != null);
});
});
});

@@ -11,6 +11,8 @@ "use strict";

const rabbitUrl = process.env.RABBIT_URL || "amqp://guest:guest@localhost";
const defaultBehaviour = { exchange: "e1", confirm: true, url: rabbitUrl };
const defaultBehaviour = { exchange: "e1", confirm: true, url: rabbitUrl, resubscribeOnError: false };
function init(customBehaviour) {
return impl(Object.assign({}, defaultBehaviour, customBehaviour));
const broker = impl(Object.assign({}, defaultBehaviour, customBehaviour));
broker.on("error", () => {});
return broker;
}

@@ -17,0 +19,0 @@

Sorry, the diff of this file is not supported yet