🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
DemoInstallSign in
Socket

azure-queue-client

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

azure-queue-client - npm Package Compare versions

Comparing version

to
0.4.0

81

lib/azure-queue-listener.js

@@ -89,2 +89,8 @@ /*

// ensure we have also a metainformation object
if (!message.messageMetaInformation) { message.messageMetaInformation = {}; }
// set the current state to running
self.setMetaInformation(message, self.runtimeDataKeys.state, 'running');
// now call the message handler and check what the result is

@@ -107,2 +113,11 @@ _messageHandler(message).then(function () {

/*
* Defines the default listener meta data elements
*/
self.runtimeDataKeys = {
state: '_state',
error: '_error',
delayedJob: '_delayedJob'
}
self.onMessage = function (handler) {

@@ -136,17 +151,24 @@ _messageHandler = handler;

// process the next tiemout
var nextTimeout = retryPolicy.nextTimeout(processedMessage);
var nextTimeout = retryPolicy.nextTimeout(processedMessage, self);
if (nextTimeout < 0) {
return self.error(new Error("Retry counter for delayed job is exceeded"));
return self.error(new Error("Retry counter for delayed job is exceeded"), processedMessage);
}
// adjust delay job data
if (!processedMessage.messageData._delayedJob) {
processedMessage.messageData._delayedJob = {count: 0, lastTimeout: 0};
}
processedMessage.messageData._delayedJob.count += 1;
processedMessage.messageData._delayedJob.lastTimeout = nextTimeout;
// get the delayed job date
var delayedJobInformation = self.getMetaInformation(processedMessage, self.runtimeDataKeys.delayedJob, {count: 0, lastTimeout: 0});
// increas the values
delayedJobInformation.count += 1;
delayedJobInformation.lastTimeout = nextTimeout;
// write the meta information back
self.setMetaInformation(processedMessage, self.runtimeDataKeys.delayedJob, delayedJobInformation);
// adjust the state
self.setMetaInformation(processedMessage, self.runtimeDataKeys.state, 'delayed');
self.setMetaInformation(processedMessage, self.runtimeDataKeys.error, null);
// enqueue the job
var azureQueue = new AzureQueue(_queueConfig.qName, _queueConfig.qStorageAccountKey, _queueConfig.qStorageAccountSecret);
return azureQueue.sendMessage(processedMessage.messageType, processedMessage.messageData, {visibilityTimeout: nextTimeout});
return azureQueue.sendMessageWithMetaData(processedMessage.messageType, processedMessage.messageData, processedMessage.messageMetaInformation, {visibilityTimeout: nextTimeout});
};

@@ -157,3 +179,11 @@

*/
self.done = function() {
self.done = function(processedMessage) {
// adjust the state
if (processedMessage) {
self.setMetaInformation(processedMessage, self.runtimeDataKeys.state, 'finished');
self.setMetaInformation(processedMessage, self.runtimeDataKeys.error, null);
}
// done
return q.resolve();

@@ -165,7 +195,36 @@ };

*/
self.error = function(error) {
self.error = function(error, processedMessage) {
// adjust the state
if (processedMessage) {
self.setMetaInformation(processedMessage, self.runtimeDataKeys.state, 'failed');
self.setMetaInformation(processedMessage, self.runtimeDataKeys.error, error);
}
// reject
return q.reject(error);
}
/*
* Returns the current state of the job
*/
self.getState = function(processedMessage) {
return self.getMetaInformation(processedMessage, self.runtimeDataKeys.state);
}
/*
* Allows to set meta information from outside
*/
self.setMetaInformation = function(processedMessage, key, valueObject) {
processedMessage.messageMetaInformation[key] = valueObject;
}
/*
* Returns meta information
*/
self.getMetaInformation = function(processedMessage, key, defaultValue) {
return processedMessage.messageMetaInformation[key] || defaultValue;
}
}
module.exports = exports = AzureQueueListener;

10

lib/azure-queue.js

@@ -15,7 +15,7 @@ /*

function sendMessageInternal(messageType, messageData, nameofQueue, queueService, options) {
function sendMessageInternal(messageType, messageData, messageMetaInformation, nameofQueue, queueService, options) {
var defer = q.defer();
queueService.createMessage(nameofQueue, JSON.stringify({messageType: messageType, messageData: messageData}), options, function(error, result, response){
queueService.createMessage(nameofQueue, JSON.stringify({messageType: messageType, messageData: messageData, messageMetaInformation: messageMetaInformation ? messageMetaInformation : {}}), options, function(error, result, response){
if(error !== null){

@@ -51,2 +51,6 @@ defer.reject(error);

self.sendMessage = function(messageType, messageData, options) {
return self.sendMessageWithMetaData(messageType, messageData, null, options);
}
self.sendMessageWithMetaData = function(messageType, messageData, messageMetaInformation, options) {
var deferred = q.defer();

@@ -60,3 +64,3 @@

createQueueIfNeeded(queueName, queueService).finally(function() {
sendMessageInternal(messageType, messageData, queueName, queueService, options).then(function() {
sendMessageInternal(messageType, messageData, messageMetaInformation, queueName, queueService, options).then(function() {
deferred.resolve();

@@ -63,0 +67,0 @@ }).catch(function(error) {

@@ -7,10 +7,11 @@ var StaticDelayPolicy = require('./static-delay-policy.js');

self.nextTimeout = function(message) {
var t = staticDelayPolicy.nextTimeout(message);
self.nextTimeout = function(message, queueListener) {
var t = staticDelayPolicy.nextTimeout(message, queueListener);
if (t < 0) {
return t;
} else {
t = message.messageData._delayedJob ? message.messageData._delayedJob.lastTimeout : t;
var jobMetaData = queueListener.getMetaInformation(message, queueListener.runtimeDataKeys.delayedJob);
t = jobMetaData ? jobMetaData.lastTimeout : t;
if (self.count(message) === 0) {
if (self.count(message, queueListener) === 0) {
return t;

@@ -23,4 +24,4 @@ } else {

self.count = function(message) {
return staticDelayPolicy.count(message);
self.count = function(message, queueListener) {
return staticDelayPolicy.count(message, queueListener);
};

@@ -27,0 +28,0 @@ }

@@ -11,6 +11,6 @@ function StaticDelayPolicy(timeout, maxRetryCount) {

*/
self.nextTimeout = function(message) {
self.nextTimeout = function(message, queueListener) {
// get the last count
var currentCount = self.count(message);
var currentCount = self.count(message, queueListener);

@@ -25,4 +25,5 @@ // check the max count

self.count = function(message) {
return message.messageData._delayedJob ? message.messageData._delayedJob.count : 0;
self.count = function(message, queueListener) {
var jobMetaData = queueListener.getMetaInformation(message, queueListener.runtimeDataKeys.delayedJob);
return jobMetaData ? jobMetaData.count : 0;
}

@@ -29,0 +30,0 @@ }

{
"name": "azure-queue-client",
"description": "An azure queue client for node.js which allows to consume jobs",
"version": "0.3.0",
"version": "0.4.0",
"author": "Dirk Eisenberg",

@@ -6,0 +6,0 @@ "main": "./lib/azure-queue-client.js",

@@ -12,2 +12,8 @@ # node-azure-queue-client

## Install
```shell
npm install azure-queue-client --save
```
## Sample

@@ -40,2 +46,2 @@

queueListener.listen(qName, qStorageAccount, qStorageSecret, qPolling, null);
```
```

@@ -24,4 +24,4 @@ // config with your settings

// delay the job
console.log("Job was delayed " + exponentialRetryPolicy.count(message) + " times");
console.log("Delaying the job by " + exponentialRetryPolicy.nextTimeout(message) + " seconds");
console.log("Job was delayed " + exponentialRetryPolicy.count(message, queueListener) + " times");
console.log("Delaying the job by " + exponentialRetryPolicy.nextTimeout(message, queueListener) + " seconds");
return queueListener.delay(message, exponentialRetryPolicy);

@@ -28,0 +28,0 @@ });