Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

jqueue

Package Overview
Dependencies
Maintainers
4
Versions
24
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

jqueue - npm Package Compare versions

Comparing version 0.1.8 to 0.1.9

2

package.json
{
"name": "jqueue",
"version": "0.1.8",
"version": "0.1.9",
"description": "MySQL backed plugable Node.js job queue based on the Beanstalk Job Lifecycle",

@@ -5,0 +5,0 @@ "devDependencies": {

@@ -7,3 +7,3 @@ var callBack = require('./callback');

function Queue (dataSource, name) {
function Queue(dataSource, name) {
var dataSource = dataSource;

@@ -14,4 +14,4 @@ var self = this;

function execQuery(query, params, cb) {
dataSource.getConnection(function(error, connection) {
if(error) {
dataSource.getConnection(function (error, connection) {
if (error) {
cb(error);

@@ -25,7 +25,7 @@ } else {

this.getName = function() {
this.getName = function () {
return name;
};
this.put = function(message, parameter1, parameter2, parameter3) {
this.put = function (message, parameter1, parameter2, parameter3) {
var delay, priority, cb;

@@ -48,5 +48,5 @@ switch (arguments.length) {

var queueMessage = new Message(dataSource, message, name, delay, priority);
writeMessage(queueMessage, function(error, data) {
writeMessage(queueMessage, function (error, data) {
var insertedId = undefined;
if(!error) {
if (!error) {
insertedId = data.insertId;

@@ -58,3 +58,3 @@ }

this.reserve = function(parameter1, parameter2) {
this.reserve = function (parameter1, parameter2) {
var cb, timeToRun;

@@ -72,5 +72,5 @@ switch (arguments.length) {

var version = Math.floor((Math.random() * 100000) + 1);
retrieveMessage(name, timeToRun, version, function(error, data) {
retrieveMessage(name, timeToRun, version, function (error, data) {
var message = undefined;
if(!error && data && data.length) {
if (!error && data && data.length) {
var messageObject = data[0];

@@ -85,3 +85,3 @@ message = new Message(dataSource, messageObject.data, name, 0,

this.watch = function(parameter1, parameter2, parameter3) {
this.watch = function (parameter1, parameter2, parameter3) {
var timeout, timeToRun, cb;

@@ -104,9 +104,10 @@ switch (arguments.length) {

var watcher = {
cancel: function() {}
cancel: function () {
}
};
self.reserve(timeToRun, function(error, data) {
if(!error && !data) {
var interval = setInterval(function() {
self.reserve(timeToRun, function(error, data) {
if(error || data) {
self.reserve(timeToRun, function (error, data) {
if (!error && !data) {
var interval = setInterval(function () {
self.reserve(timeToRun, function (error, data) {
if (error || data) {
clearInterval(interval);

@@ -128,3 +129,3 @@ callBack(cb, error, data);

this.kick = function(parameter1, parameter2, parameter3) {
this.kick = function (parameter1, parameter2, parameter3) {
var max, delay, cb;

@@ -146,3 +147,3 @@ switch (arguments.length) {

var callback = function(error, data) {
var callback = function (error, data) {
data = data ? data.affectedRows : undefined;

@@ -153,3 +154,3 @@ callBack(cb, error, data);

delay = delay || 0;
if(max) {
if (max) {
kickMessages(name, max, delay, callback);

@@ -161,3 +162,3 @@ } else {

this.kickMessage = function(id, parameter1, parameter2) {
this.kickMessage = function (id, parameter1, parameter2) {
var delay, cb;

@@ -174,3 +175,3 @@ switch (arguments.length) {

delay = delay || 0;
kickOneMessage(name, id, delay, function(error, data){
kickOneMessage(name, id, delay, function (error, data) {
callBack(cb, error, data);

@@ -180,3 +181,3 @@ })

function writeMessage (message, cb) {
function writeMessage(message, cb) {
execQuery('INSERT INTO ?? (status, data, priority, date_time, created_at, modified_at) \

@@ -187,24 +188,39 @@ VALUES (?,?,?,DATE_ADD(CURRENT_TIMESTAMP, INTERVAL ? SECOND), now(), now())',

function retrieveMessage (queueName, timeToRun, version, cb) {
dataSource.getConnection(function(error, connection) {
if(error) {
function retrieveMessage(queueName, timeToRun, version, cb) {
dataSource.getConnection(function (error, connection) {
if (error) {
cb(error);
} else {
connection.query('SELECT * FROM ?? \
connection.beginTransaction(function (err) {
if (err) {
cb(err);
} else {
connection.query('SELECT * FROM ?? \
WHERE (date_time <= CURRENT_TIMESTAMP AND status = ?) OR (time_to_run IS NOT NULL\
AND time_to_run < CURRENT_TIMESTAMP AND status = ?) ORDER BY priority desc,\
date_time asc LIMIT 1 FOR UPDATE', [queueName, 'ready', 'reserved'], function (error, data) {
var message = data;
if (!error && message && message.length) {
message[0].status = 'reserved';
connection.query('UPDATE ?? SET status = ?, version = ?, \
var message = data;
if (!error && message && message.length) {
message[0].status = 'reserved';
connection.query('UPDATE ?? SET status = ?, version = ?, \
time_to_run = DATE_ADD(CURRENT_TIMESTAMP, INTERVAL ? SECOND) WHERE id = ?',
[queueName, message[0].status, version, timeToRun, message[0].id], function (error) {
cb(error, message);
});
} else {
cb(error, message);
[queueName, message[0].status, version, timeToRun, message[0].id], function (error) {
connection.commit(function (err) {
if (!err) {
connection.release();
}
cb(error, message);
});
});
} else {
connection.commit(function (err) {
if (!err) {
connection.release();
}
cb(error, message);
});
}
});
}
});
connection.release();
}

@@ -214,3 +230,3 @@ });

function kickMessages (queueName, max, delay, cb) {
function kickMessages(queueName, max, delay, cb) {
execQuery('UPDATE ?? SET status = ?, date_time = DATE_ADD(date_time, INTERVAL ? SECOND) \

@@ -220,3 +236,3 @@ WHERE status = ? ORDER BY date_time asc LIMIT ?', [queueName, 'ready', delay, 'buried', max], cb);

function kickOneMessage (queueName, id, delay, cb) {
function kickOneMessage(queueName, id, delay, cb) {
execQuery('UPDATE ?? SET status = ?, date_time = DATE_ADD(date_time, INTERVAL ? SECOND) \

@@ -226,3 +242,3 @@ WHERE status = ? AND id = ?', [queueName, 'ready', delay, 'buried', id], cb);

function kickAllMessages (queueName, delay, cb) {
function kickAllMessages(queueName, delay, cb) {
execQuery('UPDATE ?? SET status = ?, date_time = DATE_ADD(date_time, INTERVAL ? SECOND) WHERE status = ?',

@@ -229,0 +245,0 @@ [queueName, 'ready', delay, 'buried'], cb);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc