Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

dbqueue

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

dbqueue - npm Package Compare versions

Comparing version 1.0.0 to 1.1.0

60

main.js

@@ -8,2 +8,3 @@ 'use strict';

this.db = attrs.db;
this.table = attrs.table || 'jobs';
this.worker = uuid.v4();

@@ -19,3 +20,4 @@ }

var queue = new DBQueue({
db: pool,
db: pool,
table: options.table_name,
});

@@ -27,10 +29,11 @@

DBQueue.prototype.insert = function(queue_name, data, done) {
var self = this;
var db = this.db;
var self = this;
var db = this.db;
var table = this.table;
var sql = ""
+ " INSERT INTO jobs (queue, data, worker, create_time, update_time)"
+ " INSERT INTO ?? (queue, data, worker, create_time, update_time)"
+ " VALUES (?, ?, 'unassigned', NOW(), NOW())"
;
db.query(sql, [queue_name, data], function(err, rows, fields) {
db.query(sql, [table, queue_name, data], function(err, rows, fields) {
if (err) {

@@ -46,2 +49,3 @@ return done(err);

var db = this.db;
var table = this.table;
var worker_id = this.worker

@@ -51,16 +55,11 @@ var self = this;

var queue_names = queue_input instanceof Array ? queue_input : [queue_input];
var queue_ph = queue_names.map(function() {
return '?';
}).join(',');
var free_job_ids_sql = ""
+ " SELECT *"
+ " FROM jobs"
+ " FROM ??"
+ " WHERE locked_until < NOW()"
+ " AND queue IN (" + queue_ph + ")"
+ " AND queue IN (?)"
+ " ORDER BY RAND()"
+ " LIMIT 1" // eventually query/update batches to be easier on the DB
;
db.query(free_job_ids_sql, queue_names, function(err, rows) {
db.query(free_job_ids_sql, [table, queue_input], function(err, rows) {
if (err) {

@@ -78,8 +77,4 @@ return done(err);

var placeholder_string = rows.map(function() {
return '?';
}).join(',');
var reserve_jobs_sql = ""
+ " UPDATE jobs"
+ " UPDATE ??"
+ " SET"

@@ -89,5 +84,3 @@ + " worker = ?"

+ " , update_time = NOW()"
+ " WHERE id IN ("
+ placeholder_string
+ " )"
+ " WHERE id IN (?)"
+ " AND locked_until < NOW()"

@@ -97,4 +90,3 @@ + " LIMIT 1"

var values = Array.prototype.concat.apply([worker_id, lock_time], job_ids);
db.query(reserve_jobs_sql, values, function(err, result) {
db.query(reserve_jobs_sql, [table, worker_id, lock_time, job_ids], function(err, result) {
if (err) {

@@ -106,11 +98,8 @@ return done(err);

+ " SELECT *"
+ " FROM jobs"
+ " WHERE id IN ("
+ placeholder_string
+ " )"
+ " FROM ??"
+ " WHERE id IN (?)"
+ " AND worker = ?"
;
var values = Array.prototype.concat.apply(job_ids, [worker_id]);
db.query(find_reserved_jobs_sql, values, function(err, rows) {
db.query(find_reserved_jobs_sql, [table, job_ids, worker_id], function(err, rows) {
if (err) {

@@ -144,14 +133,11 @@ return done(err);

DBQueue.prototype.size = function(queue_input, done) {
var db = this.db;
var queue_names = queue_input instanceof Array ? queue_input : [queue_input];
var queue_ph = queue_names.map(function() {
return '?';
}).join(',');
var db = this.db;
var table = this.table;
var total_jobs_sql = ""
+ " SELECT COUNT(1) AS total"
+ " FROM jobs"
+ " WHERE queue IN (" + queue_ph + ")"
+ " FROM ??"
+ " WHERE queue IN (?)"
;
db.query(total_jobs_sql, queue_names, function(err, rows) {
db.query(total_jobs_sql, [table, queue_input], function(err, rows) {
if (err) {

@@ -158,0 +144,0 @@ return done(err);

{
"name": "dbqueue",
"version": "1.0.0",
"version": "1.1.0",
"description": "A minimal, durable DB-based message queue system",

@@ -5,0 +5,0 @@ "homepage": "https://github.com/elliotf/node-dbqueue",

@@ -16,3 +16,5 @@ # DBQueue

host: '127.0.0.1',
port: 3306, // optional, defaults to 3306
user: 'root',
table_name: 'custom_jobs_table', // optional, defaults to `jobs`
password: '',

@@ -19,0 +21,0 @@ database: 'dbqueue_testing_db',

@@ -42,20 +42,44 @@ 'use strict';

// init the DB schema
db.query("SHOW TABLES LIKE 'jobs'", function(err, rows, fields) {
expect(err).to.not.exist();
var table_schema;
if (rows.length) {
return done();
}
function createTable(table_name, done) {
var sql = table_schema.replace('CREATE TABLE jobs', 'CREATE TABLE ' + table_name);
fs.readFile(__dirname + '/../schema.sql', function(err, buffer) {
return db.query(sql, function(err) {
expect(err).to.not.exist();
var sql = buffer.toString();
done();
});
}
db.query(sql, function(err) {
function lazilyCreateTable(table_name, done) {
db.query("SHOW TABLES LIKE '" + table_name + "'", function(err, rows, fields) {
expect(err).to.not.exist();
if (rows.length) {
return done();
}
if (table_schema) {
return createTable(table_name, done);
}
fs.readFile(__dirname + '/../schema.sql', function(err, buffer) {
expect(err).to.not.exist();
return done();
table_schema = buffer.toString();
return createTable(table_name, done);
});
});
}
lazilyCreateTable('jobs', function(err) {
expect(err).to.not.exist();
return lazilyCreateTable('custom_jobs_table', function(err) {
expect(err).to.not.exist();
return done();
});
});

@@ -68,3 +92,7 @@ });

return done();
db.query('DELETE FROM custom_jobs_table', function(err, rows, fields) {
expect(err).to.not.exist();
return done();
});
});

@@ -71,0 +99,0 @@ });

@@ -75,3 +75,3 @@ 'use strict';

}
])
]);

@@ -359,2 +359,57 @@ return done();

});
describe('integration tests', function() {
describe('custom table name', function() {
var queue;
beforeEach(function(done) {
uuid.v4.returns('fake_uuid_for_custom_table');
var custom_config = _.extend({}, helper.test_db_config, {
table_name: 'custom_jobs_table',
});
DBQueue.connect(custom_config, function(err, result) {
expect(err).to.not.exist();
queue = result;
return done();
});
});
context('when provided a custom table name', function() {
it('uses the provided table name', function(done) {
queue.insert('custom_table_queue', 'fake data for custom table queue', function(err) {
expect(err).to.not.exist();
queue.size('custom_table_queue', function(err, size) {
expect(err).to.not.exist();
expect(size).to.equal(1);
db.query('SELECT * FROM jobs', function(err, rows) {
expect(err).to.not.exist();
expect(rows).to.deep.equal([]);
db.query('SELECT * FROM custom_jobs_table', function(err, rows) {
expect(err).to.not.exist();
expect(rows).to.have.length(1);
queue.consume('custom_table_queue', function(err, job, completionCallback) {
expect(err).to.not.exist();
expect(job).to.exist();
expect(job.data).to.equal('fake data for custom table queue');
return done();
});
});
});
});
});
});
});
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc