Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@techloop/amqp-broker

Package Overview
Dependencies
Maintainers
6
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@techloop/amqp-broker - npm Package Compare versions

Comparing version 1.6.1 to 1.6.2

10

index.js

@@ -1,6 +0,6 @@

const Broker = require("./lib/broker")
const Broker = require("./lib/broker");
// TODO Sentry
module.exports = (tasks) => {
const broker = new Broker({ tasks })
return broker.setup()
}
module.exports = (tasks, additionalOptions) => {
const broker = new Broker({ tasks, ...additionalOptions });
return broker.setup();
};
if (!process.env.RABBITMQ_DSN) {
console.error("ERROR: Missing RABBITMQ_DSN env var. App will now exit")
process.exit(1)
console.error("ERROR: Missing RABBITMQ_DSN env var. App will now exit");
process.exit(1);
}
var amqp = require("amqp")
var amqp = require("amqp");
module.exports = function (callback) {
module.exports = function(callback) {
var connection = amqp.createConnection(
{
url: process.env.RABBITMQ_DSN
},
{
defaultExchangeName: process.env.EXCHANGE_NAME
}
)
);
// add this for better debuging
connection.on("error", function (e) {
console.log("Error from amqp: ", e)
callback(e)
})
connection.on("error", function(e) {
console.log("Error from amqp: ", e);
callback(e);
});
// Wait for connection to become established.
connection.on("ready", function () {
connection.on("ready", function() {
console.log("Connected to AMQP server");
const exchange = connection.exchange(process.env.EXCHANGE_NAME, {durable: true, autoDelete: false});
const exchange = connection.exchange(process.env.EXCHANGE_NAME, {
durable: true,
autoDelete: false
});
callback(false, connection, exchange);
})
}
});
};
if (!process.env.EXCHANGE_NAME) {
console.error('ERROR: Missing EXCHANGE_NAME env var. App will now exit')
process.exit(1)
console.error("ERROR: Missing EXCHANGE_NAME env var. App will now exit");
process.exit(1);
}
const AMQP = require('./amqp')
const QUEUE_BIND_TO = '#'
const AMQP = require("./amqp");
const QUEUE_BIND_TO = "#";
const defaultOpts = {

@@ -13,101 +13,114 @@ exchange: process.env.EXCHANGE_NAME,

onQueueBind: (name, bindTo) => {
console.log(`Queue ${name}.${bindTo} binded successfully`)
console.log(`Queue ${name}.${bindTo} binded successfully`);
},
tasks: [],
events: []
}
events: [],
bindToExchange: true,
queueOptions: { autoDelete: false, durable: true },
subscribeOptions: { ack: false },
createFailedQueue: true
};
class Broker {
constructor (opts) {
this.set = this.set.bind(this)
this.opts = Object.assign(defaultOpts, opts)
this.q = null
this.connection = null
this.exchange = null
constructor(opts) {
this.set = this.set.bind(this);
this.opts = Object.assign({}, defaultOpts, opts);
this.q = null;
this.connection = null;
this.exchange = null;
}
onQueueOpen (q) {
console.log(`Queue ${q.name} was opened`)
this.set('q', q)
const onQueueBind = this.opts.onQueueBind
const parseIncomingMessage = this.parseIncomingMessage.bind(this)
const splittedBinds = this.opts.bindTo.split(',')
splittedBinds.forEach(bindTo => {
console.log(bindTo)
q.bind(process.env.EXCHANGE_NAME, bindTo, onQueueBind(q.name, bindTo))
})
onQueueOpen(q) {
console.log(`Queue ${q.name} was opened`);
this.set("q", q);
const onQueueBind = this.opts.onQueueBind;
const parseIncomingMessage = this.parseIncomingMessage.bind(this);
if (this.opts.bindToExchange) {
const splittedBinds = this.opts.bindTo.split(",");
splittedBinds.forEach(bindTo => {
console.log(bindTo);
q.bind(process.env.EXCHANGE_NAME, bindTo, onQueueBind(q.name, bindTo));
});
}
q.subscribe(parseIncomingMessage)
q.subscribe(this.opts.subscribeOptions, parseIncomingMessage);
}
onFailedQueueOpen (q) {
console.log(`Queue ${q.name} was opened`)
this.set('q', q)
const onQueueBind = this.opts.onQueueBind
const splittedBinds = this.opts.bindTo.split(',')
splittedBinds.forEach(bindTo => {
bindTo = `failed.${bindTo}`
console.log(bindTo)
q.bind(this.exchange, bindTo, onQueueBind(q.name, bindTo))
})
if (process.env.PROCESS_FAILED_QUEUE === 'true') {
const parseIncomingMessage = this.parseIncomingMessage.bind(this)
q.subscribe(parseIncomingMessage)
onFailedQueueOpen(q) {
console.log(`Queue ${q.name} was opened`);
this.set("q", q);
const onQueueBind = this.opts.onQueueBind;
if (this.opts.bindToExchange) {
const splittedBinds = this.opts.bindTo.split(",");
splittedBinds.forEach(bindTo => {
bindTo = `failed.${bindTo}`;
console.log(bindTo);
q.bind(this.exchange, bindTo, onQueueBind(q.name, bindTo));
});
}
if (
process.env.PROCESS_FAILED_QUEUE === "true" &&
this.opts.createFailedQueue
) {
const parseIncomingMessage = this.parseIncomingMessage.bind(this);
q.subscribe(this.opts.subscribeOptions, parseIncomingMessage);
}
}
set (key, value) {
this[key] = value
set(key, value) {
this[key] = value;
}
setup () {
setup() {
if (this.opts.tasks.length === 0) {
throw new Error(
'No tasks were given! Microservice WILL NOT react to amqp messages.'
)
"No tasks were given! Microservice WILL NOT react to amqp messages."
);
}
const onQueueOpen = this.onQueueOpen.bind(this)
const onFailedQueueOpen = this.onFailedQueueOpen.bind(this)
const onQueueOpen = this.onQueueOpen.bind(this);
const onFailedQueueOpen = this.onFailedQueueOpen.bind(this);
AMQP((err, connection, exchange) => {
if (err) {
throw new Error(err)
process.exit(1)
throw new Error(err);
process.exit(1);
}
this.set('connection', connection)
this.set('exchange', exchange)
const options = { autoDelete: false, durable: true }
this.set("connection", connection);
this.set("exchange", exchange);
const options = this.opts.queueOptions;
connection.queue(process.env.QUEUE_NAME, options, q => {
onQueueOpen(q)
})
connection.queue(`${process.env.QUEUE_NAME}_failed`, options, q => {
onFailedQueueOpen(q)
})
})
onQueueOpen(q);
});
if (this.opts.createFailedQueue) {
connection.queue(`${process.env.QUEUE_NAME}_failed`, options, q => {
onFailedQueueOpen(q);
});
}
});
}
parseIncomingMessage (message, headers, deliveryInfo, messageObject) {
let data = new Buffer(message.data).toString()
parseIncomingMessage(message, headers, deliveryInfo, messageObject) {
let data = new Buffer(message.data).toString();
console.log(
'Received new AMQP message: ',
"Received new AMQP message: ",
data,
' deliveryInfo: ',
" deliveryInfo: ",
deliveryInfo,
' headers:',
" headers:",
headers
)
);
if (!deliveryInfo.routingKey) {
console.error('Received message without routing key, discarding')
return false
console.error("Received message without routing key, discarding");
return false;
}
if (data) {
try {
data = JSON.parse(data)
data = JSON.parse(data);
} catch (e) {
console.error('ERROR: Message body was not valid JSON: ', data, e)
return false
console.error("ERROR: Message body was not valid JSON: ", data, e);
return false;
}
}
console.log(this.opts.tasks)
const tasks = this.opts.tasks.tasks
console.log(this.opts.tasks);
const tasks = this.opts.tasks.tasks;
const matchingTasks = tasks.filter(

@@ -117,3 +130,3 @@ task =>

`failed.${task.key}` === deliveryInfo.routingKey
)
);
if (matchingTasks.length === 0) {

@@ -124,18 +137,39 @@ console.log(

}", please review your bindings.`
)
return false
);
return false;
}
const Task = matchingTasks[0]
console.log('Found matching Task: ', Task)
const Task = matchingTasks[0];
console.log("Found matching Task: ", Task);
Task.callback({
msg: data,
key: deliveryInfo.routingKey,
connection: this.connection,
exchange: this.exchange,
deliveryInfo
})
const options = this.opts;
Promise.resolve()
.then(() =>
Task.callback({
msg: data,
key: deliveryInfo.routingKey,
connection: this.connection,
exchange: this.exchange,
deliveryInfo
})
)
.then(function() {
console.log("Message processed successfully", data);
if (options.subscribeOptions.ack) {
messageObject.acknowledge();
}
})
.catch(function(error) {
console.log(
`Exception handler: rejecting message with routing key: ${
deliveryInfo.routingKey
}. Message: `,
data
);
if (options.subscribeOptions.ack) {
messageObject.reject(false);
}
});
}
}
module.exports = Broker
module.exports = Broker;
{
"name": "@techloop/amqp-broker",
"version": "1.6.1",
"version": "1.6.2",
"description": "AMQP broker for microservices communication",

@@ -5,0 +5,0 @@ "main": "index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc