queue-processor
Advanced tools
Comparing version 1.1.0 to 1.2.0
70
index.js
@@ -1,33 +0,12 @@ | ||
var AWS = require('aws-sdk'); | ||
var AWS = require('aws-sdk'), | ||
righto = require('righto'); | ||
function deleteMessage(sqs, queue, messageHandle, logger, recurse) { | ||
function deleteMessage(sqs, queue, messageHandle, callback) { | ||
sqs.deleteMessage({ | ||
QueueUrl: queue, | ||
ReceiptHandle: messageHandle | ||
}, function(error) { | ||
if (error) { | ||
logger.error('Error deleting message from SQS: ' + error); | ||
} | ||
recurse(); | ||
}); | ||
}, callback); | ||
} | ||
function createProcessedCallback(sqs, config, message, logger, recurse) { | ||
return function(error) { | ||
if (error) { | ||
logger.error('Message processing error: ' + error); | ||
if (!config.deleteOnError) { | ||
return recurse(); | ||
} | ||
} | ||
deleteMessage(sqs, config.queueUrl, message.ReceiptHandle, logger, recurse); | ||
}; | ||
} | ||
function getMessage(sqs, config, logger, processingFunction, recurse) { | ||
var currentMessage; | ||
function getMessage(sqs, config, callback) { | ||
sqs.receiveMessage( | ||
@@ -40,16 +19,9 @@ { | ||
function(error, data) { | ||
if (error) { | ||
logger.error('Error recieving message from SQS: ' + error); | ||
} | ||
if (data && data.Messages) { | ||
currentMessage = data.Messages[0]; | ||
var nextMessage = data.Messages[0]; | ||
return processingFunction( | ||
currentMessage, | ||
createProcessedCallback(sqs, config, currentMessage, logger, recurse) | ||
); | ||
return callback(null, nextMessage); | ||
} | ||
recurse(); | ||
callback(error); | ||
} | ||
@@ -71,11 +43,29 @@ ); | ||
function recurse() { | ||
setImmediate(function() { | ||
getMessage(sqs, config, logger, processingFunction, recurse); | ||
function processNextMessage() { | ||
var message = righto(getMessage, sqs, config); | ||
var processed = righto.mate(righto.after(righto(processingFunction, message))); | ||
var shouldDelete = righto.handle(processed, function(error, done){ | ||
logger.error(error); | ||
done(null, !!config.deleteOnError); | ||
}); | ||
var complete = righto(function(message, shouldDelete, done){ | ||
if(shouldDelete !== false){ | ||
return deleteMessage(sqs, config.queueUrl, message.ReceiptHandle, done); | ||
} | ||
done(); | ||
}, message, shouldDelete); | ||
complete(function(error){ | ||
if (error) { | ||
logger.error(error); | ||
} | ||
processNextMessage(); | ||
}); | ||
} | ||
recurse(); | ||
processNextMessage(); | ||
} | ||
module.exports = setupListener; |
{ | ||
"name": "queue-processor", | ||
"version": "1.1.0", | ||
"version": "1.2.0", | ||
"description": "Processes an AWS SQS queue", | ||
@@ -27,4 +27,5 @@ "main": "index.js", | ||
"dependencies": { | ||
"aws-sdk": "^2.1.43" | ||
"aws-sdk": "^2.1.43", | ||
"righto": "^2.5.4" | ||
} | ||
} |
5293
2
56
+ Addedrighto@^2.5.4
+ Addedabbott@1.1.3(transitive)
+ Addedrighto@2.6.0(transitive)