Comparing version 0.1.8 to 0.1.9
{ | ||
"name": "jqueue", | ||
"version": "0.1.8", | ||
"version": "0.1.9", | ||
"description": "MySQL backed plugable Node.js job queue based on the Beanstalk Job Lifecycle", | ||
@@ -5,0 +5,0 @@ "devDependencies": { |
@@ -7,3 +7,3 @@ var callBack = require('./callback'); | ||
function Queue (dataSource, name) { | ||
function Queue(dataSource, name) { | ||
var dataSource = dataSource; | ||
@@ -14,4 +14,4 @@ var self = this; | ||
function execQuery(query, params, cb) { | ||
dataSource.getConnection(function(error, connection) { | ||
if(error) { | ||
dataSource.getConnection(function (error, connection) { | ||
if (error) { | ||
cb(error); | ||
@@ -25,7 +25,7 @@ } else { | ||
this.getName = function() { | ||
this.getName = function () { | ||
return name; | ||
}; | ||
this.put = function(message, parameter1, parameter2, parameter3) { | ||
this.put = function (message, parameter1, parameter2, parameter3) { | ||
var delay, priority, cb; | ||
@@ -48,5 +48,5 @@ switch (arguments.length) { | ||
var queueMessage = new Message(dataSource, message, name, delay, priority); | ||
writeMessage(queueMessage, function(error, data) { | ||
writeMessage(queueMessage, function (error, data) { | ||
var insertedId = undefined; | ||
if(!error) { | ||
if (!error) { | ||
insertedId = data.insertId; | ||
@@ -58,3 +58,3 @@ } | ||
this.reserve = function(parameter1, parameter2) { | ||
this.reserve = function (parameter1, parameter2) { | ||
var cb, timeToRun; | ||
@@ -72,5 +72,5 @@ switch (arguments.length) { | ||
var version = Math.floor((Math.random() * 100000) + 1); | ||
retrieveMessage(name, timeToRun, version, function(error, data) { | ||
retrieveMessage(name, timeToRun, version, function (error, data) { | ||
var message = undefined; | ||
if(!error && data && data.length) { | ||
if (!error && data && data.length) { | ||
var messageObject = data[0]; | ||
@@ -85,3 +85,3 @@ message = new Message(dataSource, messageObject.data, name, 0, | ||
this.watch = function(parameter1, parameter2, parameter3) { | ||
this.watch = function (parameter1, parameter2, parameter3) { | ||
var timeout, timeToRun, cb; | ||
@@ -104,9 +104,10 @@ switch (arguments.length) { | ||
var watcher = { | ||
cancel: function() {} | ||
cancel: function () { | ||
} | ||
}; | ||
self.reserve(timeToRun, function(error, data) { | ||
if(!error && !data) { | ||
var interval = setInterval(function() { | ||
self.reserve(timeToRun, function(error, data) { | ||
if(error || data) { | ||
self.reserve(timeToRun, function (error, data) { | ||
if (!error && !data) { | ||
var interval = setInterval(function () { | ||
self.reserve(timeToRun, function (error, data) { | ||
if (error || data) { | ||
clearInterval(interval); | ||
@@ -128,3 +129,3 @@ callBack(cb, error, data); | ||
this.kick = function(parameter1, parameter2, parameter3) { | ||
this.kick = function (parameter1, parameter2, parameter3) { | ||
var max, delay, cb; | ||
@@ -146,3 +147,3 @@ switch (arguments.length) { | ||
var callback = function(error, data) { | ||
var callback = function (error, data) { | ||
data = data ? data.affectedRows : undefined; | ||
@@ -153,3 +154,3 @@ callBack(cb, error, data); | ||
delay = delay || 0; | ||
if(max) { | ||
if (max) { | ||
kickMessages(name, max, delay, callback); | ||
@@ -161,3 +162,3 @@ } else { | ||
this.kickMessage = function(id, parameter1, parameter2) { | ||
this.kickMessage = function (id, parameter1, parameter2) { | ||
var delay, cb; | ||
@@ -174,3 +175,3 @@ switch (arguments.length) { | ||
delay = delay || 0; | ||
kickOneMessage(name, id, delay, function(error, data){ | ||
kickOneMessage(name, id, delay, function (error, data) { | ||
callBack(cb, error, data); | ||
@@ -180,3 +181,3 @@ }) | ||
function writeMessage (message, cb) { | ||
function writeMessage(message, cb) { | ||
execQuery('INSERT INTO ?? (status, data, priority, date_time, created_at, modified_at) \ | ||
@@ -187,24 +188,39 @@ VALUES (?,?,?,DATE_ADD(CURRENT_TIMESTAMP, INTERVAL ? SECOND), now(), now())', | ||
function retrieveMessage (queueName, timeToRun, version, cb) { | ||
dataSource.getConnection(function(error, connection) { | ||
if(error) { | ||
function retrieveMessage(queueName, timeToRun, version, cb) { | ||
dataSource.getConnection(function (error, connection) { | ||
if (error) { | ||
cb(error); | ||
} else { | ||
connection.query('SELECT * FROM ?? \ | ||
connection.beginTransaction(function (err) { | ||
if (err) { | ||
cb(err); | ||
} else { | ||
connection.query('SELECT * FROM ?? \ | ||
WHERE (date_time <= CURRENT_TIMESTAMP AND status = ?) OR (time_to_run IS NOT NULL\ | ||
AND time_to_run < CURRENT_TIMESTAMP AND status = ?) ORDER BY priority desc,\ | ||
date_time asc LIMIT 1 FOR UPDATE', [queueName, 'ready', 'reserved'], function (error, data) { | ||
var message = data; | ||
if (!error && message && message.length) { | ||
message[0].status = 'reserved'; | ||
connection.query('UPDATE ?? SET status = ?, version = ?, \ | ||
var message = data; | ||
if (!error && message && message.length) { | ||
message[0].status = 'reserved'; | ||
connection.query('UPDATE ?? SET status = ?, version = ?, \ | ||
time_to_run = DATE_ADD(CURRENT_TIMESTAMP, INTERVAL ? SECOND) WHERE id = ?', | ||
[queueName, message[0].status, version, timeToRun, message[0].id], function (error) { | ||
cb(error, message); | ||
}); | ||
} else { | ||
cb(error, message); | ||
[queueName, message[0].status, version, timeToRun, message[0].id], function (error) { | ||
connection.commit(function (err) { | ||
if (!err) { | ||
connection.release(); | ||
} | ||
cb(error, message); | ||
}); | ||
}); | ||
} else { | ||
connection.commit(function (err) { | ||
if (!err) { | ||
connection.release(); | ||
} | ||
cb(error, message); | ||
}); | ||
} | ||
}); | ||
} | ||
}); | ||
connection.release(); | ||
} | ||
@@ -214,3 +230,3 @@ }); | ||
function kickMessages (queueName, max, delay, cb) { | ||
function kickMessages(queueName, max, delay, cb) { | ||
execQuery('UPDATE ?? SET status = ?, date_time = DATE_ADD(date_time, INTERVAL ? SECOND) \ | ||
@@ -220,3 +236,3 @@ WHERE status = ? ORDER BY date_time asc LIMIT ?', [queueName, 'ready', delay, 'buried', max], cb); | ||
function kickOneMessage (queueName, id, delay, cb) { | ||
function kickOneMessage(queueName, id, delay, cb) { | ||
execQuery('UPDATE ?? SET status = ?, date_time = DATE_ADD(date_time, INTERVAL ? SECOND) \ | ||
@@ -226,3 +242,3 @@ WHERE status = ? AND id = ?', [queueName, 'ready', delay, 'buried', id], cb); | ||
function kickAllMessages (queueName, delay, cb) { | ||
function kickAllMessages(queueName, delay, cb) { | ||
execQuery('UPDATE ?? SET status = ?, date_time = DATE_ADD(date_time, INTERVAL ? SECOND) WHERE status = ?', | ||
@@ -229,0 +245,0 @@ [queueName, 'ready', delay, 'buried'], cb); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
54797
1379
0