azure-queue-client
Advanced tools
Comparing version
@@ -89,2 +89,8 @@ /* | ||
// ensure we have also a metainformation object | ||
if (!message.messageMetaInformation) { message.messageMetaInformation = {}; } | ||
// set the current state to running | ||
self.setMetaInformation(message, self.runtimeDataKeys.state, 'running'); | ||
// now call the message handler and check what the result is | ||
@@ -107,2 +113,11 @@ _messageHandler(message).then(function () { | ||
/* | ||
* Defines the default listener meta data elements | ||
*/ | ||
self.runtimeDataKeys = { | ||
state: '_state', | ||
error: '_error', | ||
delayedJob: '_delayedJob' | ||
} | ||
self.onMessage = function (handler) { | ||
@@ -136,17 +151,24 @@ _messageHandler = handler; | ||
// process the next tiemout | ||
var nextTimeout = retryPolicy.nextTimeout(processedMessage); | ||
var nextTimeout = retryPolicy.nextTimeout(processedMessage, self); | ||
if (nextTimeout < 0) { | ||
return self.error(new Error("Retry counter for delayed job is exceeded")); | ||
return self.error(new Error("Retry counter for delayed job is exceeded"), processedMessage); | ||
} | ||
// adjust delay job data | ||
if (!processedMessage.messageData._delayedJob) { | ||
processedMessage.messageData._delayedJob = {count: 0, lastTimeout: 0}; | ||
} | ||
processedMessage.messageData._delayedJob.count += 1; | ||
processedMessage.messageData._delayedJob.lastTimeout = nextTimeout; | ||
// get the delayed job date | ||
var delayedJobInformation = self.getMetaInformation(processedMessage, self.runtimeDataKeys.delayedJob, {count: 0, lastTimeout: 0}); | ||
// increas the values | ||
delayedJobInformation.count += 1; | ||
delayedJobInformation.lastTimeout = nextTimeout; | ||
// write the meta information back | ||
self.setMetaInformation(processedMessage, self.runtimeDataKeys.delayedJob, delayedJobInformation); | ||
// adjust the state | ||
self.setMetaInformation(processedMessage, self.runtimeDataKeys.state, 'delayed'); | ||
self.setMetaInformation(processedMessage, self.runtimeDataKeys.error, null); | ||
// enqueue the job | ||
var azureQueue = new AzureQueue(_queueConfig.qName, _queueConfig.qStorageAccountKey, _queueConfig.qStorageAccountSecret); | ||
return azureQueue.sendMessage(processedMessage.messageType, processedMessage.messageData, {visibilityTimeout: nextTimeout}); | ||
return azureQueue.sendMessageWithMetaData(processedMessage.messageType, processedMessage.messageData, processedMessage.messageMetaInformation, {visibilityTimeout: nextTimeout}); | ||
}; | ||
@@ -157,3 +179,11 @@ | ||
*/ | ||
self.done = function() { | ||
self.done = function(processedMessage) { | ||
// adjust the state | ||
if (processedMessage) { | ||
self.setMetaInformation(processedMessage, self.runtimeDataKeys.state, 'finished'); | ||
self.setMetaInformation(processedMessage, self.runtimeDataKeys.error, null); | ||
} | ||
// done | ||
return q.resolve(); | ||
@@ -165,7 +195,36 @@ }; | ||
*/ | ||
self.error = function(error) { | ||
self.error = function(error, processedMessage) { | ||
// adjust the state | ||
if (processedMessage) { | ||
self.setMetaInformation(processedMessage, self.runtimeDataKeys.state, 'failed'); | ||
self.setMetaInformation(processedMessage, self.runtimeDataKeys.error, error); | ||
} | ||
// reject | ||
return q.reject(error); | ||
} | ||
/* | ||
* Returns the current state of the job | ||
*/ | ||
self.getState = function(processedMessage) { | ||
return self.getMetaInformation(processedMessage, self.runtimeDataKeys.state); | ||
} | ||
/* | ||
* Allows to set meta information from outside | ||
*/ | ||
self.setMetaInformation = function(processedMessage, key, valueObject) { | ||
processedMessage.messageMetaInformation[key] = valueObject; | ||
} | ||
/* | ||
* Returns meta information | ||
*/ | ||
self.getMetaInformation = function(processedMessage, key, defaultValue) { | ||
return processedMessage.messageMetaInformation[key] || defaultValue; | ||
} | ||
} | ||
module.exports = exports = AzureQueueListener; |
@@ -15,7 +15,7 @@ /* | ||
function sendMessageInternal(messageType, messageData, nameofQueue, queueService, options) { | ||
function sendMessageInternal(messageType, messageData, messageMetaInformation, nameofQueue, queueService, options) { | ||
var defer = q.defer(); | ||
queueService.createMessage(nameofQueue, JSON.stringify({messageType: messageType, messageData: messageData}), options, function(error, result, response){ | ||
queueService.createMessage(nameofQueue, JSON.stringify({messageType: messageType, messageData: messageData, messageMetaInformation: messageMetaInformation ? messageMetaInformation : {}}), options, function(error, result, response){ | ||
if(error !== null){ | ||
@@ -51,2 +51,6 @@ defer.reject(error); | ||
self.sendMessage = function(messageType, messageData, options) { | ||
return self.sendMessageWithMetaData(messageType, messageData, null, options); | ||
} | ||
self.sendMessageWithMetaData = function(messageType, messageData, messageMetaInformation, options) { | ||
var deferred = q.defer(); | ||
@@ -60,3 +64,3 @@ | ||
createQueueIfNeeded(queueName, queueService).finally(function() { | ||
sendMessageInternal(messageType, messageData, queueName, queueService, options).then(function() { | ||
sendMessageInternal(messageType, messageData, messageMetaInformation, queueName, queueService, options).then(function() { | ||
deferred.resolve(); | ||
@@ -63,0 +67,0 @@ }).catch(function(error) { |
@@ -7,10 +7,11 @@ var StaticDelayPolicy = require('./static-delay-policy.js'); | ||
self.nextTimeout = function(message) { | ||
var t = staticDelayPolicy.nextTimeout(message); | ||
self.nextTimeout = function(message, queueListener) { | ||
var t = staticDelayPolicy.nextTimeout(message, queueListener); | ||
if (t < 0) { | ||
return t; | ||
} else { | ||
t = message.messageData._delayedJob ? message.messageData._delayedJob.lastTimeout : t; | ||
var jobMetaData = queueListener.getMetaInformation(message, queueListener.runtimeDataKeys.delayedJob); | ||
t = jobMetaData ? jobMetaData.lastTimeout : t; | ||
if (self.count(message) === 0) { | ||
if (self.count(message, queueListener) === 0) { | ||
return t; | ||
@@ -23,4 +24,4 @@ } else { | ||
self.count = function(message) { | ||
return staticDelayPolicy.count(message); | ||
self.count = function(message, queueListener) { | ||
return staticDelayPolicy.count(message, queueListener); | ||
}; | ||
@@ -27,0 +28,0 @@ } |
@@ -11,6 +11,6 @@ function StaticDelayPolicy(timeout, maxRetryCount) { | ||
*/ | ||
self.nextTimeout = function(message) { | ||
self.nextTimeout = function(message, queueListener) { | ||
// get the last count | ||
var currentCount = self.count(message); | ||
var currentCount = self.count(message, queueListener); | ||
@@ -25,4 +25,5 @@ // check the max count | ||
self.count = function(message) { | ||
return message.messageData._delayedJob ? message.messageData._delayedJob.count : 0; | ||
self.count = function(message, queueListener) { | ||
var jobMetaData = queueListener.getMetaInformation(message, queueListener.runtimeDataKeys.delayedJob); | ||
return jobMetaData ? jobMetaData.count : 0; | ||
} | ||
@@ -29,0 +30,0 @@ } |
{ | ||
"name": "azure-queue-client", | ||
"description": "An azure queue client for node.js which allows to consume jobs", | ||
"version": "0.3.0", | ||
"version": "0.4.0", | ||
"author": "Dirk Eisenberg", | ||
@@ -6,0 +6,0 @@ "main": "./lib/azure-queue-client.js", |
@@ -12,2 +12,8 @@ # node-azure-queue-client | ||
## Install | ||
```shell | ||
npm install azure-queue-client --save | ||
``` | ||
## Sample | ||
@@ -40,2 +46,2 @@ | ||
queueListener.listen(qName, qStorageAccount, qStorageSecret, qPolling, null); | ||
``` | ||
``` |
@@ -24,4 +24,4 @@ // config with your settings | ||
// delay the job | ||
console.log("Job was delayed " + exponentialRetryPolicy.count(message) + " times"); | ||
console.log("Delaying the job by " + exponentialRetryPolicy.nextTimeout(message) + " seconds"); | ||
console.log("Job was delayed " + exponentialRetryPolicy.count(message, queueListener) + " times"); | ||
console.log("Delaying the job by " + exponentialRetryPolicy.nextTimeout(message, queueListener) + " seconds"); | ||
return queueListener.delay(message, exponentialRetryPolicy); | ||
@@ -28,0 +28,0 @@ }); |
18815
17.37%390
15.04%46
17.95%