Socket
Socket
Sign inDemoInstall

queue-processor

Package Overview
Dependencies
39
Maintainers
1
Versions
7
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.1.0 to 1.2.0

70

index.js

@@ -1,33 +0,12 @@

var AWS = require('aws-sdk');
var AWS = require('aws-sdk'),
righto = require('righto');
function deleteMessage(sqs, queue, messageHandle, logger, recurse) {
function deleteMessage(sqs, queue, messageHandle, callback) {
sqs.deleteMessage({
QueueUrl: queue,
ReceiptHandle: messageHandle
}, function(error) {
if (error) {
logger.error('Error deleting message from SQS: ' + error);
}
recurse();
});
}, callback);
}
function createProcessedCallback(sqs, config, message, logger, recurse) {
return function(error) {
if (error) {
logger.error('Message processing error: ' + error);
if (!config.deleteOnError) {
return recurse();
}
}
deleteMessage(sqs, config.queueUrl, message.ReceiptHandle, logger, recurse);
};
}
function getMessage(sqs, config, logger, processingFunction, recurse) {
var currentMessage;
function getMessage(sqs, config, callback) {
sqs.receiveMessage(

@@ -40,16 +19,9 @@ {

function(error, data) {
if (error) {
logger.error('Error recieving message from SQS: ' + error);
}
if (data && data.Messages) {
currentMessage = data.Messages[0];
var nextMessage = data.Messages[0];
return processingFunction(
currentMessage,
createProcessedCallback(sqs, config, currentMessage, logger, recurse)
);
return callback(null, nextMessage);
}
recurse();
callback(error);
}

@@ -71,11 +43,29 @@ );

function recurse() {
setImmediate(function() {
getMessage(sqs, config, logger, processingFunction, recurse);
function processNextMessage() {
var message = righto(getMessage, sqs, config);
var processed = righto.mate(righto.after(righto(processingFunction, message)));
var shouldDelete = righto.handle(processed, function(error, done){
logger.error(error);
done(null, !!config.deleteOnError);
});
var complete = righto(function(message, shouldDelete, done){
if(shouldDelete !== false){
return deleteMessage(sqs, config.queueUrl, message.ReceiptHandle, done);
}
done();
}, message, shouldDelete);
complete(function(error){
if (error) {
logger.error(error);
}
processNextMessage();
});
}
recurse();
processNextMessage();
}
module.exports = setupListener;
{
"name": "queue-processor",
"version": "1.1.0",
"version": "1.2.0",
"description": "Processes an AWS SQS queue",

@@ -27,4 +27,5 @@ "main": "index.js",

"dependencies": {
"aws-sdk": "^2.1.43"
"aws-sdk": "^2.1.43",
"righto": "^2.5.4"
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc