off-sqs-debearloper
Advanced tools
Comparing version 0.1.0 to 0.1.1
258
app.js
@@ -1,257 +0,1 @@ | ||
/* OFF-SQS v0.1.6 | ||
* AWS SQS Manager from HUBALLIN.com | ||
* [developers] Rubén Sospedra, Edravis and Emilio Ríos, Debearloper | ||
* GNU License 2014 | ||
*/ | ||
var AWS = require('aws-sdk'), | ||
amqp = require('amqplib') | ||
_config = require('./config'), | ||
exec = require('exec'), | ||
when = require('when'); | ||
// Load credentials and rise up the SQS object | ||
if (_config.env.production){ | ||
AWS.config.update(_config.AWS_CONFIG_FILE); | ||
var sqs = new AWS.SQS(); | ||
} else if (_config.env.development){ | ||
exec('brew services start rabbitmq', function(err, out, code) { | ||
if (err instanceof Error) | ||
throw err; | ||
}); | ||
} | ||
var getSQSMessages = function(clbk,url){ | ||
var receiveParams = { | ||
QueueUrl: url, | ||
MaxNumberOfMessages: 10, | ||
WaitTimeSeconds: 20 //Change to 20 seconds? | ||
}; | ||
sqs.receiveMessage(receiveParams, function(err, data) { | ||
if (err) console.log("/38, " + err); | ||
else { | ||
if(data['Messages']){ | ||
// Start the crawling service | ||
var parsedData = parseMessage(data); | ||
clbk(parsedData); | ||
// deletes the received messages from the queue | ||
var toDelete=[]; | ||
for (var i=0; i<data['Messages'].length;i++){ | ||
toDelete.push({ | ||
'Id':data['Messages'][i]['MessageId'], | ||
'ReceiptHandle':data['Messages'][i]['ReceiptHandle'] | ||
}); | ||
} | ||
var deleteParams ={ | ||
QueueUrl: url, | ||
Entries : toDelete | ||
}; | ||
sqs.deleteMessageBatch(deleteParams, function(err, deleted) { | ||
if (err) { | ||
console.log("/54, " + err); | ||
} | ||
}); | ||
} | ||
//thanks to this recursive method, we can stay listening | ||
getMessages(clbk, url); | ||
} | ||
}); | ||
} | ||
var getRabbitMQMessages = function(clbk, url){ | ||
amqp.connect('amqp://localhost').then(function(conn){ | ||
process.once('SIGINT', function(){ | ||
conn.close(); | ||
}); | ||
return conn.createChannel().then(function(channel){ | ||
var ok = channel.assertQueue(url, {durable: false}); | ||
ok = ok.then(function(_qok){ | ||
return channel.consume(url, function(msg){ | ||
clbk(msg.content.toString()); | ||
}, {noAck: true}); | ||
}); | ||
return ok.then(function(_consumeOk){ | ||
console.log("[x] Waiting for messages."); | ||
}); | ||
}); | ||
}).then(null, console.warn); | ||
} | ||
//Consumer | ||
//the url param is the Unic Identificator of the queue | ||
var getMessages = function (clbk, url){ | ||
if (_config.env.production){ | ||
getSQSMessages(clbk, url); | ||
} else if(_config.env.development){ | ||
getRabbitMQMessages(clbk, url); | ||
} | ||
}; | ||
//In Amazon SQS, we need to create the queues, not in RabbitMQ | ||
var createQueue = function (_queueName){ | ||
var deferred = when.defer(); | ||
var cqParams = { | ||
QueueName: _queueName, | ||
Attributes: {} | ||
}; | ||
sqs.createQueue(cqParams, function(err, res){ | ||
if(err){ | ||
console.log("/67, " + err); | ||
// stop recursivity | ||
} else { | ||
deferred.resolve(); | ||
} | ||
}); | ||
return deferred.promise; | ||
}; | ||
var listRabbitQueues = function(_queueName){ | ||
var deferred = when.defer(); | ||
deferred.resolve(_queueName); | ||
return deferred.promise; | ||
} | ||
var listSQSQueues = function(_queueName){ | ||
var deferred = when.defer(); | ||
sqs.listQueues(function(err, res){ | ||
var queueFinder = false; | ||
if(res.QueueUrls) | ||
for(var i in res.QueueUrls) | ||
if(res.QueueUrls[i].split('/').pop() === _queueName) | ||
queueFinder = true; | ||
if(queueFinder){ | ||
getQueue(_queueName).then(function(queueUrl){ | ||
deferred.resolve(queueUrl); | ||
}); | ||
} else { | ||
createQueue(_queueName).then(function(){ | ||
getQueue(_queueName).then(function(queueUrl){ | ||
deferred.resolve(queueUrl); | ||
}) | ||
}); | ||
}; | ||
}); | ||
return deferred.promise; | ||
} | ||
var listQueues = function (_queueName){ | ||
var deferred = when.defer(); | ||
if (_config.env.production){ | ||
return listSQSQueues(_queueName); }; | ||
return listRabbitQueues(_queueName); | ||
}; | ||
var getQueue = function (_queueName){ | ||
var deferred = when.defer(); | ||
sqs.getQueueUrl({QueueName: _queueName}, function(err, data){ | ||
if(err){ | ||
console.log("/63, " + err); | ||
} else{ | ||
console.log("By using the SQS queue: " + data.QueueUrl); | ||
deferred.resolve(data.QueueUrl); | ||
} | ||
}); | ||
return deferred.promise; | ||
}; | ||
var sendWithSqs = function (sendingData, url){ | ||
if(typeof sendingData != 'string') | ||
sendingData = JSON.stringify(sendingData); | ||
var params = { | ||
MessageBody: sendingData, | ||
QueueUrl: url | ||
}; | ||
//At this moment, we send the message. | ||
sqs.sendMessage(params, function(err, data){ | ||
if(err){ | ||
console.log("" + err); | ||
} else { | ||
console.log("Success send"); | ||
} | ||
}); | ||
} | ||
var sendWithRabbitMQ = function (sendingData, url){ | ||
amqp.connect('amqp://localhost').then(function(conn){ | ||
return when(conn.createChannel().then(function(channel){ | ||
//nombre de la cola | ||
var queue = url; | ||
var ok = channel.assertQueue(queue, {durable: false}); | ||
return ok.then(function(_qok){ | ||
var dataToSend = JSON.stringify([sendingData]); | ||
channel.sendToQueue(queue, new Buffer(dataToSend)); | ||
console.log("[x] Send '%s'", sendingData); | ||
return channel.close(); | ||
}); | ||
})).ensure(function() { | ||
conn.close(); | ||
}); | ||
}).then(null, console.warn); | ||
} | ||
var sendMessages = function (sendingData, url){ | ||
if (_config.env.development){ | ||
sendWithRabbitMQ(sendingData, url); | ||
} | ||
else if (_config.env.production){ | ||
sendWithSqs(sendingData, url); | ||
} | ||
}; | ||
var parseMessage = function (data){ | ||
var url = []; | ||
for (var i in data.Messages){ | ||
url.push(data.Messages[i].Body); | ||
} | ||
return url; | ||
}; | ||
module.exports = { | ||
sender: function(queueName, data) { | ||
listQueues(queueName).then(function(queueUrl){ | ||
sendMessages(data, queueUrl); | ||
}); | ||
}, | ||
receiver: function(queueName) { | ||
var deferred = when.defer(); | ||
listQueues(queueName).then(function(queueUrl){ | ||
deferred.resolve({ | ||
receiver: getMessages, | ||
url: queueUrl | ||
}); | ||
}); | ||
return deferred.promise; | ||
}, | ||
getQueueUrl: function(queueName){ | ||
var deferred = when.defer(); | ||
listQueues(queueName).then(function(queueUrl){ | ||
deferred.resolve(queueUrl); | ||
}); | ||
return deferred.promise; | ||
}, | ||
parseMessage: function(data){ | ||
return parseMessage(data); | ||
} | ||
} | ||
module.exports = require('./lib'); |
{ | ||
"name": "off-sqs-debearloper", | ||
"description": "Helper app that manages SQS and RabbitMQ transactions", | ||
"version": "0.1.0", | ||
"version": "0.1.1", | ||
"repository": { | ||
@@ -6,0 +6,0 @@ "type": "git", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
10821
11
246
2
1