azure-queue-client
Advanced tools
Comparing version
var exports = module.exports; | ||
exports.AzureQueue = require('./azure-queue.js'); | ||
exports.AzureQueueListener = require('./azure-queue-listener.js'); | ||
exports.AzureQueueListener = require('./azure-queue-listener.js'); | ||
exports.AzureQueueDelayedJobPolicies = { | ||
StaticDelayPolicy: require('./delay-job-policies/static-delay-policy.js'), | ||
ExponentialDelayPolicy: require('./delay-job-policies/exponential-delay-policy.js') | ||
}; |
@@ -14,110 +14,154 @@ /* | ||
var util = require('util'); | ||
var q = require('q'); | ||
// some handlers | ||
var _messageHandler = null; | ||
var _queueConfig = { qName: null, qStorageAccountKey: null, qStorageAccountSecret: null, qPolling: 0 }; | ||
var _logger = null; | ||
function AzureQueueListener() { | ||
var self = this; | ||
// the log message helper | ||
function logMessage() { | ||
// read the arguments | ||
var logLevel = arguments[0]; | ||
var logFormat = arguments[1]; | ||
var logParameters = Array.prototype.slice.call(arguments, 2); | ||
// some handlers | ||
var _messageHandler = null; | ||
var _queueConfig = {qName: null, qStorageAccountKey: null, qStorageAccountSecret: null, qPolling: 0}; | ||
var _logger = null; | ||
// generate the parameter for the format function | ||
var formatParameters = []; | ||
// the log message helper | ||
function logMessage() { | ||
// read the arguments | ||
var logLevel = arguments[0]; | ||
var logFormat = arguments[1]; | ||
var logParameters = Array.prototype.slice.call(arguments, 2); | ||
if (_logger === null ) { | ||
formatParameters.push('[%s] ' + logFormat); | ||
formatParameters.push(logLevel); | ||
} else { | ||
formatParameters.push(logFormat); | ||
// generate the parameter for the format function | ||
var formatParameters = []; | ||
if (_logger === null) { | ||
formatParameters.push('[%s] ' + logFormat); | ||
formatParameters.push(logLevel); | ||
} else { | ||
formatParameters.push(logFormat); | ||
} | ||
// add all other parameters | ||
logParameters.forEach(function (param) { | ||
formatParameters.push(param); | ||
}); | ||
// build the log string | ||
var logString = util.format.apply(this, formatParameters); | ||
// send to the logger | ||
if (_logger != null) { | ||
_logger(logLevel, logString); | ||
} else { | ||
console.log(logString); | ||
} | ||
} | ||
// add all other parameters | ||
logParameters.forEach(function(param) { | ||
formatParameters.push(param); | ||
}); | ||
function scheduleListenerForNewJobs(processedMessage) { | ||
// build the log string | ||
var logString = util.format.apply(this, formatParameters); | ||
// take the default polling | ||
var polling = _queueConfig.qPolling * 1000; | ||
// send to the logger | ||
if (_logger != null) { | ||
_logger(logLevel, logString); | ||
} else { | ||
console.log(logString); | ||
// when we processed a message we should ask directly for new jobs | ||
// and post a nice message | ||
if (processedMessage) { | ||
// reduce the polling to 0 | ||
polling = 0; | ||
// show the nice message | ||
logMessage("Info", ""); | ||
logMessage("Info", "Waiting for new jobs from message bus..."); | ||
} | ||
setTimeout(startCheckingQueuePeriodically, polling); | ||
} | ||
} | ||
function scheduleListenerForNewJobs(processedMessage) { | ||
function startCheckingQueuePeriodically() { | ||
// take the default polling | ||
var polling = _queueConfig.qPolling * 1000; | ||
// try to get a message from our queue | ||
var azureQueue = new AzureQueue(_queueConfig.qName, _queueConfig.qStorageAccountKey, _queueConfig.qStorageAccountSecret); | ||
azureQueue.getMessage().then(function (message) { | ||
// when we processed a message we should ask directly for new jobs | ||
// and post a nice message | ||
if (processedMessage) { | ||
// we got a message check our message handler | ||
if (!_messageHandler || _messageHandler === undefined) { | ||
logMessage("Info", "No message handler configured"); | ||
return; | ||
} | ||
// reduce the polling to 0 | ||
polling = 0; | ||
// now call the message handler and check what the result is | ||
_messageHandler(message).then(function () { | ||
scheduleListenerForNewJobs(true); | ||
}).catch(function (error) { | ||
logMessage("Error", "Message raised error for last message: " + error); | ||
scheduleListenerForNewJobs(true); | ||
}) | ||
}).catch(function (error) { | ||
if (error) { | ||
logMessage("Error", error); | ||
} | ||
// show the nice message | ||
logMessage("Info", ""); | ||
logMessage("Info", "Waiting for new jobs from message bus..."); | ||
// no message found | ||
scheduleListenerForNewJobs(false); | ||
}); | ||
} | ||
setTimeout(startCheckingQueuePeriodically, polling); | ||
} | ||
self.onMessage = function (handler) { | ||
_messageHandler = handler; | ||
}; | ||
function startCheckingQueuePeriodically() { | ||
self.listen = function (qName, qStorageAccountKey, qStorageAccountSecret, qPolling, qLogger) { | ||
// try to get a message from our queue | ||
var azureQueue = new AzureQueue(_queueConfig.qName, _queueConfig.qStorageAccountKey, _queueConfig.qStorageAccountSecret); | ||
azureQueue.getMessage().then(function(message) { | ||
// set the config | ||
_queueConfig.qName = qName; | ||
_queueConfig.qStorageAccountKey = qStorageAccountKey; | ||
_queueConfig.qStorageAccountSecret = qStorageAccountSecret; | ||
_queueConfig.qPolling = qPolling; | ||
// we got a message check our message handler | ||
if (!_messageHandler ||_messageHandler === undefined) { | ||
logMessage("Info", "No message handler configured"); | ||
return; | ||
} | ||
// set the logger | ||
_logger = qLogger; | ||
// now call the message handler and check what the result is | ||
_messageHandler(message).then(function() { | ||
scheduleListenerForNewJobs(true); | ||
}).catch(function(error) { | ||
logMessage("Error", "Message raised error for last message: " + error); | ||
scheduleListenerForNewJobs(true); | ||
}) | ||
}).catch(function(error) { | ||
if (error) { | ||
logMessage("Error", error); | ||
} | ||
// log some status | ||
logMessage('Info', 'Starting polling on job queue: %s (Account: %s)', qName, qStorageAccountKey); | ||
// no message found | ||
scheduleListenerForNewJobs(false); | ||
}); | ||
} | ||
// start the polling | ||
startCheckingQueuePeriodically(); | ||
}; | ||
exports.onMessage = function(handler) { | ||
_messageHandler = handler; | ||
}; | ||
/* | ||
* Allows to delay a job for a specific amount of seconds | ||
*/ | ||
self.delay = function (processedMessage, retryPolicy) { | ||
exports.listen = function(qName, qStorageAccountKey, qStorageAccountSecret, qPolling, qLogger) { | ||
// process the next tiemout | ||
var nextTimeout = retryPolicy.nextTimeout(processedMessage); | ||
if (nextTimeout < 0) { | ||
return self.error(new Error("Retry counter for delayed job is exceeded")); | ||
} | ||
// set the config | ||
_queueConfig.qName = qName; | ||
_queueConfig.qStorageAccountKey = qStorageAccountKey; | ||
_queueConfig.qStorageAccountSecret = qStorageAccountSecret; | ||
_queueConfig.qPolling = qPolling; | ||
// adjust delay job data | ||
if (!processedMessage.messageData._delayedJob) { | ||
processedMessage.messageData._delayedJob = {count: 0, lastTimeout: 0}; | ||
} | ||
processedMessage.messageData._delayedJob.count += 1; | ||
processedMessage.messageData._delayedJob.lastTimeout = nextTimeout; | ||
// set the logger | ||
_logger = qLogger; | ||
// enqueue the job | ||
var azureQueue = new AzureQueue(_queueConfig.qName, _queueConfig.qStorageAccountKey, _queueConfig.qStorageAccountSecret); | ||
return azureQueue.sendMessage(processedMessage.messageType, processedMessage.messageData, {visibilityTimeout: nextTimeout}); | ||
}; | ||
// log some status | ||
logMessage('Info', 'Starting polling on job queue: %s (Account: %s)', qName, qStorageAccountKey); | ||
/* | ||
* Notifies the infrastructure that the job is done without errors | ||
*/ | ||
self.done = function() { | ||
return q.resolve(); | ||
}; | ||
// start the polling | ||
startCheckingQueuePeriodically(); | ||
}; | ||
/* | ||
* Notifies the infrastructure that the job is aborted | ||
*/ | ||
self.error = function(error) { | ||
return q.reject(error); | ||
} | ||
} | ||
module.exports = exports = AzureQueueListener; |
@@ -15,6 +15,7 @@ /* | ||
function sendMessageInternal(messageType, messageData, nameofQueue, queueService) { | ||
function sendMessageInternal(messageType, messageData, nameofQueue, queueService, options) { | ||
var defer = q.defer(); | ||
queueService.createMessage(nameofQueue, JSON.stringify({messageType: messageType, messageData: messageData}), function(error, result, response){ | ||
queueService.createMessage(nameofQueue, JSON.stringify({messageType: messageType, messageData: messageData}), options, function(error, result, response){ | ||
if(error !== null){ | ||
@@ -49,3 +50,3 @@ defer.reject(error); | ||
self.sendMessage = function(messageType, messageData) { | ||
self.sendMessage = function(messageType, messageData, options) { | ||
var deferred = q.defer(); | ||
@@ -59,3 +60,3 @@ | ||
createQueueIfNeeded(queueName, queueService).finally(function() { | ||
sendMessageInternal(messageType, messageData, queueName, queueService).then(function() { | ||
sendMessageInternal(messageType, messageData, queueName, queueService, options).then(function() { | ||
deferred.resolve(); | ||
@@ -62,0 +63,0 @@ }).catch(function(error) { |
{ | ||
"name": "azure-queue-client", | ||
"description": "An azure queue client for node.js which allows to consume jobs", | ||
"version": "0.2.1", | ||
"version": "0.3.0", | ||
"author": "Dirk Eisenberg", | ||
@@ -6,0 +6,0 @@ "main": "./lib/azure-queue-client.js", |
@@ -6,4 +6,7 @@ # node-azure-queue-client | ||
* simple to listen a queue | ||
* prepared for job routing | ||
* simple to enqueue jobs | ||
* simple to delay jobs | ||
* prepared for job routing | ||
* support for wrapped xml from Azure Scheduler | ||
* listening on multiple queues supported | ||
@@ -19,14 +22,16 @@ ## Sample | ||
// load the modules | ||
var queueListener = require('../lib/azure-queue-listener.js'); | ||
var q = require("Q"); | ||
// load the module | ||
var azureQueueClient = new require('azure-queue-client'); | ||
// create the listener | ||
var queueListener = new azureQueueClient.AzureQueueListener(); | ||
// establish a message handler | ||
queueListener.onMessage(function(message) { | ||
var defer = q.defer(); | ||
// log something | ||
console.log('Message received: ' + JSON.stringify(message)); | ||
defer.resolve(); | ||
return defer.promise; | ||
// job is finished without errors | ||
return queueListener.done(); | ||
}); | ||
@@ -36,2 +41,2 @@ | ||
queueListener.listen(qName, qStorageAccount, qStorageSecret, qPolling, null); | ||
``` | ||
``` |
@@ -7,14 +7,16 @@ // config with your settings | ||
// load the modules | ||
var queueListener = require('../lib/azure-queue-listener.js'); | ||
var q = require("Q"); | ||
// load the module | ||
var azureQueueClient = new require('../lib/azure-queue-client.js'); | ||
// generate the listener | ||
var queueListener = new azureQueueClient.AzureQueueListener(); | ||
// establish a message handler | ||
queueListener.onMessage(function(message) { | ||
var defer = q.defer(); | ||
// log something | ||
console.log('Message received: ' + JSON.stringify(message)); | ||
defer.resolve(); | ||
return defer.promise; | ||
// done without errors | ||
return queueListener.done(); | ||
}); | ||
@@ -21,0 +23,0 @@ |
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
16030
46.02%10
42.86%339
49.34%39
11.43%2
Infinity%