🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
DemoInstallSign in
Socket

azure-queue-client

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

azure-queue-client - npm Package Compare versions

Comparing version

to
0.3.0

lib/delay-job-policies/exponential-delay-policy.js

7

lib/azure-queue-client.js
var exports = module.exports;
exports.AzureQueue = require('./azure-queue.js');
exports.AzureQueueListener = require('./azure-queue-listener.js');
exports.AzureQueueListener = require('./azure-queue-listener.js');
exports.AzureQueueDelayedJobPolicies = {
StaticDelayPolicy: require('./delay-job-policies/static-delay-policy.js'),
ExponentialDelayPolicy: require('./delay-job-policies/exponential-delay-policy.js')
};

208

lib/azure-queue-listener.js

@@ -14,110 +14,154 @@ /*

var util = require('util');
var q = require('q');
// some handlers
var _messageHandler = null;
var _queueConfig = { qName: null, qStorageAccountKey: null, qStorageAccountSecret: null, qPolling: 0 };
var _logger = null;
function AzureQueueListener() {
var self = this;
// the log message helper
function logMessage() {
// read the arguments
var logLevel = arguments[0];
var logFormat = arguments[1];
var logParameters = Array.prototype.slice.call(arguments, 2);
// some handlers
var _messageHandler = null;
var _queueConfig = {qName: null, qStorageAccountKey: null, qStorageAccountSecret: null, qPolling: 0};
var _logger = null;
// generate the parameter for the format function
var formatParameters = [];
// the log message helper
function logMessage() {
// read the arguments
var logLevel = arguments[0];
var logFormat = arguments[1];
var logParameters = Array.prototype.slice.call(arguments, 2);
if (_logger === null ) {
formatParameters.push('[%s] ' + logFormat);
formatParameters.push(logLevel);
} else {
formatParameters.push(logFormat);
// generate the parameter for the format function
var formatParameters = [];
if (_logger === null) {
formatParameters.push('[%s] ' + logFormat);
formatParameters.push(logLevel);
} else {
formatParameters.push(logFormat);
}
// add all other parameters
logParameters.forEach(function (param) {
formatParameters.push(param);
});
// build the log string
var logString = util.format.apply(this, formatParameters);
// send to the logger
if (_logger != null) {
_logger(logLevel, logString);
} else {
console.log(logString);
}
}
// add all other parameters
logParameters.forEach(function(param) {
formatParameters.push(param);
});
function scheduleListenerForNewJobs(processedMessage) {
// build the log string
var logString = util.format.apply(this, formatParameters);
// take the default polling
var polling = _queueConfig.qPolling * 1000;
// send to the logger
if (_logger != null) {
_logger(logLevel, logString);
} else {
console.log(logString);
// when we processed a message we should ask directly for new jobs
// and post a nice message
if (processedMessage) {
// reduce the polling to 0
polling = 0;
// show the nice message
logMessage("Info", "");
logMessage("Info", "Waiting for new jobs from message bus...");
}
setTimeout(startCheckingQueuePeriodically, polling);
}
}
function scheduleListenerForNewJobs(processedMessage) {
function startCheckingQueuePeriodically() {
// take the default polling
var polling = _queueConfig.qPolling * 1000;
// try to get a message from our queue
var azureQueue = new AzureQueue(_queueConfig.qName, _queueConfig.qStorageAccountKey, _queueConfig.qStorageAccountSecret);
azureQueue.getMessage().then(function (message) {
// when we processed a message we should ask directly for new jobs
// and post a nice message
if (processedMessage) {
// we got a message check our message handler
if (!_messageHandler || _messageHandler === undefined) {
logMessage("Info", "No message handler configured");
return;
}
// reduce the polling to 0
polling = 0;
// now call the message handler and check what the result is
_messageHandler(message).then(function () {
scheduleListenerForNewJobs(true);
}).catch(function (error) {
logMessage("Error", "Message raised error for last message: " + error);
scheduleListenerForNewJobs(true);
})
}).catch(function (error) {
if (error) {
logMessage("Error", error);
}
// show the nice message
logMessage("Info", "");
logMessage("Info", "Waiting for new jobs from message bus...");
// no message found
scheduleListenerForNewJobs(false);
});
}
setTimeout(startCheckingQueuePeriodically, polling);
}
self.onMessage = function (handler) {
_messageHandler = handler;
};
function startCheckingQueuePeriodically() {
self.listen = function (qName, qStorageAccountKey, qStorageAccountSecret, qPolling, qLogger) {
// try to get a message from our queue
var azureQueue = new AzureQueue(_queueConfig.qName, _queueConfig.qStorageAccountKey, _queueConfig.qStorageAccountSecret);
azureQueue.getMessage().then(function(message) {
// set the config
_queueConfig.qName = qName;
_queueConfig.qStorageAccountKey = qStorageAccountKey;
_queueConfig.qStorageAccountSecret = qStorageAccountSecret;
_queueConfig.qPolling = qPolling;
// we got a message check our message handler
if (!_messageHandler ||_messageHandler === undefined) {
logMessage("Info", "No message handler configured");
return;
}
// set the logger
_logger = qLogger;
// now call the message handler and check what the result is
_messageHandler(message).then(function() {
scheduleListenerForNewJobs(true);
}).catch(function(error) {
logMessage("Error", "Message raised error for last message: " + error);
scheduleListenerForNewJobs(true);
})
}).catch(function(error) {
if (error) {
logMessage("Error", error);
}
// log some status
logMessage('Info', 'Starting polling on job queue: %s (Account: %s)', qName, qStorageAccountKey);
// no message found
scheduleListenerForNewJobs(false);
});
}
// start the polling
startCheckingQueuePeriodically();
};
exports.onMessage = function(handler) {
_messageHandler = handler;
};
/*
* Allows to delay a job for a specific amount of seconds
*/
self.delay = function (processedMessage, retryPolicy) {
exports.listen = function(qName, qStorageAccountKey, qStorageAccountSecret, qPolling, qLogger) {
// process the next tiemout
var nextTimeout = retryPolicy.nextTimeout(processedMessage);
if (nextTimeout < 0) {
return self.error(new Error("Retry counter for delayed job is exceeded"));
}
// set the config
_queueConfig.qName = qName;
_queueConfig.qStorageAccountKey = qStorageAccountKey;
_queueConfig.qStorageAccountSecret = qStorageAccountSecret;
_queueConfig.qPolling = qPolling;
// adjust delay job data
if (!processedMessage.messageData._delayedJob) {
processedMessage.messageData._delayedJob = {count: 0, lastTimeout: 0};
}
processedMessage.messageData._delayedJob.count += 1;
processedMessage.messageData._delayedJob.lastTimeout = nextTimeout;
// set the logger
_logger = qLogger;
// enqueue the job
var azureQueue = new AzureQueue(_queueConfig.qName, _queueConfig.qStorageAccountKey, _queueConfig.qStorageAccountSecret);
return azureQueue.sendMessage(processedMessage.messageType, processedMessage.messageData, {visibilityTimeout: nextTimeout});
};
// log some status
logMessage('Info', 'Starting polling on job queue: %s (Account: %s)', qName, qStorageAccountKey);
/*
* Notifies the infrastructure that the job is done without errors
*/
self.done = function() {
return q.resolve();
};
// start the polling
startCheckingQueuePeriodically();
};
/*
* Notifies the infrastructure that the job is aborted
*/
self.error = function(error) {
return q.reject(error);
}
}
module.exports = exports = AzureQueueListener;

@@ -15,6 +15,7 @@ /*

function sendMessageInternal(messageType, messageData, nameofQueue, queueService) {
function sendMessageInternal(messageType, messageData, nameofQueue, queueService, options) {
var defer = q.defer();
queueService.createMessage(nameofQueue, JSON.stringify({messageType: messageType, messageData: messageData}), function(error, result, response){
queueService.createMessage(nameofQueue, JSON.stringify({messageType: messageType, messageData: messageData}), options, function(error, result, response){
if(error !== null){

@@ -49,3 +50,3 @@ defer.reject(error);

self.sendMessage = function(messageType, messageData) {
self.sendMessage = function(messageType, messageData, options) {
var deferred = q.defer();

@@ -59,3 +60,3 @@

createQueueIfNeeded(queueName, queueService).finally(function() {
sendMessageInternal(messageType, messageData, queueName, queueService).then(function() {
sendMessageInternal(messageType, messageData, queueName, queueService, options).then(function() {
deferred.resolve();

@@ -62,0 +63,0 @@ }).catch(function(error) {

{
"name": "azure-queue-client",
"description": "An azure queue client for node.js which allows to consume jobs",
"version": "0.2.1",
"version": "0.3.0",
"author": "Dirk Eisenberg",

@@ -6,0 +6,0 @@ "main": "./lib/azure-queue-client.js",

@@ -6,4 +6,7 @@ # node-azure-queue-client

* simple to listen a queue
* prepared for job routing
* simple to enqueue jobs
* simple to delay jobs
* prepared for job routing
* support for wrapped xml from Azure Scheduler
* listening on multiple queues supported

@@ -19,14 +22,16 @@ ## Sample

// load the modules
var queueListener = require('../lib/azure-queue-listener.js');
var q = require("Q");
// load the module
var azureQueueClient = new require('azure-queue-client');
// create the listener
var queueListener = new azureQueueClient.AzureQueueListener();
// establish a message handler
queueListener.onMessage(function(message) {
var defer = q.defer();
// log something
console.log('Message received: ' + JSON.stringify(message));
defer.resolve();
return defer.promise;
// job is finished without errors
return queueListener.done();
});

@@ -36,2 +41,2 @@

queueListener.listen(qName, qStorageAccount, qStorageSecret, qPolling, null);
```
```

@@ -7,14 +7,16 @@ // config with your settings

// load the modules
var queueListener = require('../lib/azure-queue-listener.js');
var q = require("Q");
// load the module
var azureQueueClient = new require('../lib/azure-queue-client.js');
// generate the listener
var queueListener = new azureQueueClient.AzureQueueListener();
// establish a message handler
queueListener.onMessage(function(message) {
var defer = q.defer();
// log something
console.log('Message received: ' + JSON.stringify(message));
defer.resolve();
return defer.promise;
// done without errors
return queueListener.done();
});

@@ -21,0 +23,0 @@