@techloop/amqp-broker
Advanced tools
Comparing version 1.6.1 to 1.6.2
10
index.js
@@ -1,6 +0,6 @@ | ||
const Broker = require("./lib/broker") | ||
const Broker = require("./lib/broker"); | ||
// TODO Sentry | ||
module.exports = (tasks) => { | ||
const broker = new Broker({ tasks }) | ||
return broker.setup() | ||
} | ||
module.exports = (tasks, additionalOptions) => { | ||
const broker = new Broker({ tasks, ...additionalOptions }); | ||
return broker.setup(); | ||
}; |
if (!process.env.RABBITMQ_DSN) { | ||
console.error("ERROR: Missing RABBITMQ_DSN env var. App will now exit") | ||
process.exit(1) | ||
console.error("ERROR: Missing RABBITMQ_DSN env var. App will now exit"); | ||
process.exit(1); | ||
} | ||
var amqp = require("amqp") | ||
var amqp = require("amqp"); | ||
module.exports = function (callback) { | ||
module.exports = function(callback) { | ||
var connection = amqp.createConnection( | ||
{ | ||
url: process.env.RABBITMQ_DSN | ||
}, | ||
{ | ||
defaultExchangeName: process.env.EXCHANGE_NAME | ||
} | ||
) | ||
); | ||
// add this for better debuging | ||
connection.on("error", function (e) { | ||
console.log("Error from amqp: ", e) | ||
callback(e) | ||
}) | ||
connection.on("error", function(e) { | ||
console.log("Error from amqp: ", e); | ||
callback(e); | ||
}); | ||
// Wait for connection to become established. | ||
connection.on("ready", function () { | ||
connection.on("ready", function() { | ||
console.log("Connected to AMQP server"); | ||
const exchange = connection.exchange(process.env.EXCHANGE_NAME, {durable: true, autoDelete: false}); | ||
const exchange = connection.exchange(process.env.EXCHANGE_NAME, { | ||
durable: true, | ||
autoDelete: false | ||
}); | ||
callback(false, connection, exchange); | ||
}) | ||
} | ||
}); | ||
}; |
if (!process.env.EXCHANGE_NAME) { | ||
console.error('ERROR: Missing EXCHANGE_NAME env var. App will now exit') | ||
process.exit(1) | ||
console.error("ERROR: Missing EXCHANGE_NAME env var. App will now exit"); | ||
process.exit(1); | ||
} | ||
const AMQP = require('./amqp') | ||
const QUEUE_BIND_TO = '#' | ||
const AMQP = require("./amqp"); | ||
const QUEUE_BIND_TO = "#"; | ||
const defaultOpts = { | ||
@@ -13,101 +13,114 @@ exchange: process.env.EXCHANGE_NAME, | ||
onQueueBind: (name, bindTo) => { | ||
console.log(`Queue ${name}.${bindTo} binded successfully`) | ||
console.log(`Queue ${name}.${bindTo} binded successfully`); | ||
}, | ||
tasks: [], | ||
events: [] | ||
} | ||
events: [], | ||
bindToExchange: true, | ||
queueOptions: { autoDelete: false, durable: true }, | ||
subscribeOptions: { ack: false }, | ||
createFailedQueue: true | ||
}; | ||
class Broker { | ||
constructor (opts) { | ||
this.set = this.set.bind(this) | ||
this.opts = Object.assign(defaultOpts, opts) | ||
this.q = null | ||
this.connection = null | ||
this.exchange = null | ||
constructor(opts) { | ||
this.set = this.set.bind(this); | ||
this.opts = Object.assign({}, defaultOpts, opts); | ||
this.q = null; | ||
this.connection = null; | ||
this.exchange = null; | ||
} | ||
onQueueOpen (q) { | ||
console.log(`Queue ${q.name} was opened`) | ||
this.set('q', q) | ||
const onQueueBind = this.opts.onQueueBind | ||
const parseIncomingMessage = this.parseIncomingMessage.bind(this) | ||
const splittedBinds = this.opts.bindTo.split(',') | ||
splittedBinds.forEach(bindTo => { | ||
console.log(bindTo) | ||
q.bind(process.env.EXCHANGE_NAME, bindTo, onQueueBind(q.name, bindTo)) | ||
}) | ||
onQueueOpen(q) { | ||
console.log(`Queue ${q.name} was opened`); | ||
this.set("q", q); | ||
const onQueueBind = this.opts.onQueueBind; | ||
const parseIncomingMessage = this.parseIncomingMessage.bind(this); | ||
if (this.opts.bindToExchange) { | ||
const splittedBinds = this.opts.bindTo.split(","); | ||
splittedBinds.forEach(bindTo => { | ||
console.log(bindTo); | ||
q.bind(process.env.EXCHANGE_NAME, bindTo, onQueueBind(q.name, bindTo)); | ||
}); | ||
} | ||
q.subscribe(parseIncomingMessage) | ||
q.subscribe(this.opts.subscribeOptions, parseIncomingMessage); | ||
} | ||
onFailedQueueOpen (q) { | ||
console.log(`Queue ${q.name} was opened`) | ||
this.set('q', q) | ||
const onQueueBind = this.opts.onQueueBind | ||
const splittedBinds = this.opts.bindTo.split(',') | ||
splittedBinds.forEach(bindTo => { | ||
bindTo = `failed.${bindTo}` | ||
console.log(bindTo) | ||
q.bind(this.exchange, bindTo, onQueueBind(q.name, bindTo)) | ||
}) | ||
if (process.env.PROCESS_FAILED_QUEUE === 'true') { | ||
const parseIncomingMessage = this.parseIncomingMessage.bind(this) | ||
q.subscribe(parseIncomingMessage) | ||
onFailedQueueOpen(q) { | ||
console.log(`Queue ${q.name} was opened`); | ||
this.set("q", q); | ||
const onQueueBind = this.opts.onQueueBind; | ||
if (this.opts.bindToExchange) { | ||
const splittedBinds = this.opts.bindTo.split(","); | ||
splittedBinds.forEach(bindTo => { | ||
bindTo = `failed.${bindTo}`; | ||
console.log(bindTo); | ||
q.bind(this.exchange, bindTo, onQueueBind(q.name, bindTo)); | ||
}); | ||
} | ||
if ( | ||
process.env.PROCESS_FAILED_QUEUE === "true" && | ||
this.opts.createFailedQueue | ||
) { | ||
const parseIncomingMessage = this.parseIncomingMessage.bind(this); | ||
q.subscribe(this.opts.subscribeOptions, parseIncomingMessage); | ||
} | ||
} | ||
set (key, value) { | ||
this[key] = value | ||
set(key, value) { | ||
this[key] = value; | ||
} | ||
setup () { | ||
setup() { | ||
if (this.opts.tasks.length === 0) { | ||
throw new Error( | ||
'No tasks were given! Microservice WILL NOT react to amqp messages.' | ||
) | ||
"No tasks were given! Microservice WILL NOT react to amqp messages." | ||
); | ||
} | ||
const onQueueOpen = this.onQueueOpen.bind(this) | ||
const onFailedQueueOpen = this.onFailedQueueOpen.bind(this) | ||
const onQueueOpen = this.onQueueOpen.bind(this); | ||
const onFailedQueueOpen = this.onFailedQueueOpen.bind(this); | ||
AMQP((err, connection, exchange) => { | ||
if (err) { | ||
throw new Error(err) | ||
process.exit(1) | ||
throw new Error(err); | ||
process.exit(1); | ||
} | ||
this.set('connection', connection) | ||
this.set('exchange', exchange) | ||
const options = { autoDelete: false, durable: true } | ||
this.set("connection", connection); | ||
this.set("exchange", exchange); | ||
const options = this.opts.queueOptions; | ||
connection.queue(process.env.QUEUE_NAME, options, q => { | ||
onQueueOpen(q) | ||
}) | ||
connection.queue(`${process.env.QUEUE_NAME}_failed`, options, q => { | ||
onFailedQueueOpen(q) | ||
}) | ||
}) | ||
onQueueOpen(q); | ||
}); | ||
if (this.opts.createFailedQueue) { | ||
connection.queue(`${process.env.QUEUE_NAME}_failed`, options, q => { | ||
onFailedQueueOpen(q); | ||
}); | ||
} | ||
}); | ||
} | ||
parseIncomingMessage (message, headers, deliveryInfo, messageObject) { | ||
let data = new Buffer(message.data).toString() | ||
parseIncomingMessage(message, headers, deliveryInfo, messageObject) { | ||
let data = new Buffer(message.data).toString(); | ||
console.log( | ||
'Received new AMQP message: ', | ||
"Received new AMQP message: ", | ||
data, | ||
' deliveryInfo: ', | ||
" deliveryInfo: ", | ||
deliveryInfo, | ||
' headers:', | ||
" headers:", | ||
headers | ||
) | ||
); | ||
if (!deliveryInfo.routingKey) { | ||
console.error('Received message without routing key, discarding') | ||
return false | ||
console.error("Received message without routing key, discarding"); | ||
return false; | ||
} | ||
if (data) { | ||
try { | ||
data = JSON.parse(data) | ||
data = JSON.parse(data); | ||
} catch (e) { | ||
console.error('ERROR: Message body was not valid JSON: ', data, e) | ||
return false | ||
console.error("ERROR: Message body was not valid JSON: ", data, e); | ||
return false; | ||
} | ||
} | ||
console.log(this.opts.tasks) | ||
const tasks = this.opts.tasks.tasks | ||
console.log(this.opts.tasks); | ||
const tasks = this.opts.tasks.tasks; | ||
const matchingTasks = tasks.filter( | ||
@@ -117,3 +130,3 @@ task => | ||
`failed.${task.key}` === deliveryInfo.routingKey | ||
) | ||
); | ||
if (matchingTasks.length === 0) { | ||
@@ -124,18 +137,39 @@ console.log( | ||
}", please review your bindings.` | ||
) | ||
return false | ||
); | ||
return false; | ||
} | ||
const Task = matchingTasks[0] | ||
console.log('Found matching Task: ', Task) | ||
const Task = matchingTasks[0]; | ||
console.log("Found matching Task: ", Task); | ||
Task.callback({ | ||
msg: data, | ||
key: deliveryInfo.routingKey, | ||
connection: this.connection, | ||
exchange: this.exchange, | ||
deliveryInfo | ||
}) | ||
const options = this.opts; | ||
Promise.resolve() | ||
.then(() => | ||
Task.callback({ | ||
msg: data, | ||
key: deliveryInfo.routingKey, | ||
connection: this.connection, | ||
exchange: this.exchange, | ||
deliveryInfo | ||
}) | ||
) | ||
.then(function() { | ||
console.log("Message processed successfully", data); | ||
if (options.subscribeOptions.ack) { | ||
messageObject.acknowledge(); | ||
} | ||
}) | ||
.catch(function(error) { | ||
console.log( | ||
`Exception handler: rejecting message with routing key: ${ | ||
deliveryInfo.routingKey | ||
}. Message: `, | ||
data | ||
); | ||
if (options.subscribeOptions.ack) { | ||
messageObject.reject(false); | ||
} | ||
}); | ||
} | ||
} | ||
module.exports = Broker | ||
module.exports = Broker; |
{ | ||
"name": "@techloop/amqp-broker", | ||
"version": "1.6.1", | ||
"version": "1.6.2", | ||
"description": "AMQP broker for microservices communication", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
22074
196
12
1