Comparing version 1.0.0 to 1.1.0
60
main.js
@@ -8,2 +8,3 @@ 'use strict'; | ||
this.db = attrs.db; | ||
this.table = attrs.table || 'jobs'; | ||
this.worker = uuid.v4(); | ||
@@ -19,3 +20,4 @@ } | ||
var queue = new DBQueue({ | ||
db: pool, | ||
db: pool, | ||
table: options.table_name, | ||
}); | ||
@@ -27,10 +29,11 @@ | ||
DBQueue.prototype.insert = function(queue_name, data, done) { | ||
var self = this; | ||
var db = this.db; | ||
var self = this; | ||
var db = this.db; | ||
var table = this.table; | ||
var sql = "" | ||
+ " INSERT INTO jobs (queue, data, worker, create_time, update_time)" | ||
+ " INSERT INTO ?? (queue, data, worker, create_time, update_time)" | ||
+ " VALUES (?, ?, 'unassigned', NOW(), NOW())" | ||
; | ||
db.query(sql, [queue_name, data], function(err, rows, fields) { | ||
db.query(sql, [table, queue_name, data], function(err, rows, fields) { | ||
if (err) { | ||
@@ -46,2 +49,3 @@ return done(err); | ||
var db = this.db; | ||
var table = this.table; | ||
var worker_id = this.worker | ||
@@ -51,16 +55,11 @@ var self = this; | ||
var queue_names = queue_input instanceof Array ? queue_input : [queue_input]; | ||
var queue_ph = queue_names.map(function() { | ||
return '?'; | ||
}).join(','); | ||
var free_job_ids_sql = "" | ||
+ " SELECT *" | ||
+ " FROM jobs" | ||
+ " FROM ??" | ||
+ " WHERE locked_until < NOW()" | ||
+ " AND queue IN (" + queue_ph + ")" | ||
+ " AND queue IN (?)" | ||
+ " ORDER BY RAND()" | ||
+ " LIMIT 1" // eventually query/update batches to be easier on the DB | ||
; | ||
db.query(free_job_ids_sql, queue_names, function(err, rows) { | ||
db.query(free_job_ids_sql, [table, queue_input], function(err, rows) { | ||
if (err) { | ||
@@ -78,8 +77,4 @@ return done(err); | ||
var placeholder_string = rows.map(function() { | ||
return '?'; | ||
}).join(','); | ||
var reserve_jobs_sql = "" | ||
+ " UPDATE jobs" | ||
+ " UPDATE ??" | ||
+ " SET" | ||
@@ -89,5 +84,3 @@ + " worker = ?" | ||
+ " , update_time = NOW()" | ||
+ " WHERE id IN (" | ||
+ placeholder_string | ||
+ " )" | ||
+ " WHERE id IN (?)" | ||
+ " AND locked_until < NOW()" | ||
@@ -97,4 +90,3 @@ + " LIMIT 1" | ||
var values = Array.prototype.concat.apply([worker_id, lock_time], job_ids); | ||
db.query(reserve_jobs_sql, values, function(err, result) { | ||
db.query(reserve_jobs_sql, [table, worker_id, lock_time, job_ids], function(err, result) { | ||
if (err) { | ||
@@ -106,11 +98,8 @@ return done(err); | ||
+ " SELECT *" | ||
+ " FROM jobs" | ||
+ " WHERE id IN (" | ||
+ placeholder_string | ||
+ " )" | ||
+ " FROM ??" | ||
+ " WHERE id IN (?)" | ||
+ " AND worker = ?" | ||
; | ||
var values = Array.prototype.concat.apply(job_ids, [worker_id]); | ||
db.query(find_reserved_jobs_sql, values, function(err, rows) { | ||
db.query(find_reserved_jobs_sql, [table, job_ids, worker_id], function(err, rows) { | ||
if (err) { | ||
@@ -144,14 +133,11 @@ return done(err); | ||
DBQueue.prototype.size = function(queue_input, done) { | ||
var db = this.db; | ||
var queue_names = queue_input instanceof Array ? queue_input : [queue_input]; | ||
var queue_ph = queue_names.map(function() { | ||
return '?'; | ||
}).join(','); | ||
var db = this.db; | ||
var table = this.table; | ||
var total_jobs_sql = "" | ||
+ " SELECT COUNT(1) AS total" | ||
+ " FROM jobs" | ||
+ " WHERE queue IN (" + queue_ph + ")" | ||
+ " FROM ??" | ||
+ " WHERE queue IN (?)" | ||
; | ||
db.query(total_jobs_sql, queue_names, function(err, rows) { | ||
db.query(total_jobs_sql, [table, queue_input], function(err, rows) { | ||
if (err) { | ||
@@ -158,0 +144,0 @@ return done(err); |
{ | ||
"name": "dbqueue", | ||
"version": "1.0.0", | ||
"version": "1.1.0", | ||
"description": "A minimal, durable DB-based message queue system", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/elliotf/node-dbqueue", |
@@ -16,3 +16,5 @@ # DBQueue | ||
host: '127.0.0.1', | ||
port: 3306, // optional, defaults to 3306 | ||
user: 'root', | ||
table_name: 'custom_jobs_table', // optional, defaults to `jobs` | ||
password: '', | ||
@@ -19,0 +21,0 @@ database: 'dbqueue_testing_db', |
@@ -42,20 +42,44 @@ 'use strict'; | ||
// init the DB schema | ||
db.query("SHOW TABLES LIKE 'jobs'", function(err, rows, fields) { | ||
expect(err).to.not.exist(); | ||
var table_schema; | ||
if (rows.length) { | ||
return done(); | ||
} | ||
function createTable(table_name, done) { | ||
var sql = table_schema.replace('CREATE TABLE jobs', 'CREATE TABLE ' + table_name); | ||
fs.readFile(__dirname + '/../schema.sql', function(err, buffer) { | ||
return db.query(sql, function(err) { | ||
expect(err).to.not.exist(); | ||
var sql = buffer.toString(); | ||
done(); | ||
}); | ||
} | ||
db.query(sql, function(err) { | ||
function lazilyCreateTable(table_name, done) { | ||
db.query("SHOW TABLES LIKE '" + table_name + "'", function(err, rows, fields) { | ||
expect(err).to.not.exist(); | ||
if (rows.length) { | ||
return done(); | ||
} | ||
if (table_schema) { | ||
return createTable(table_name, done); | ||
} | ||
fs.readFile(__dirname + '/../schema.sql', function(err, buffer) { | ||
expect(err).to.not.exist(); | ||
return done(); | ||
table_schema = buffer.toString(); | ||
return createTable(table_name, done); | ||
}); | ||
}); | ||
} | ||
lazilyCreateTable('jobs', function(err) { | ||
expect(err).to.not.exist(); | ||
return lazilyCreateTable('custom_jobs_table', function(err) { | ||
expect(err).to.not.exist(); | ||
return done(); | ||
}); | ||
}); | ||
@@ -68,3 +92,7 @@ }); | ||
return done(); | ||
db.query('DELETE FROM custom_jobs_table', function(err, rows, fields) { | ||
expect(err).to.not.exist(); | ||
return done(); | ||
}); | ||
}); | ||
@@ -71,0 +99,0 @@ }); |
@@ -75,3 +75,3 @@ 'use strict'; | ||
} | ||
]) | ||
]); | ||
@@ -359,2 +359,57 @@ return done(); | ||
}); | ||
describe('integration tests', function() { | ||
describe('custom table name', function() { | ||
var queue; | ||
beforeEach(function(done) { | ||
uuid.v4.returns('fake_uuid_for_custom_table'); | ||
var custom_config = _.extend({}, helper.test_db_config, { | ||
table_name: 'custom_jobs_table', | ||
}); | ||
DBQueue.connect(custom_config, function(err, result) { | ||
expect(err).to.not.exist(); | ||
queue = result; | ||
return done(); | ||
}); | ||
}); | ||
context('when provided a custom table name', function() { | ||
it('uses the provided table name', function(done) { | ||
queue.insert('custom_table_queue', 'fake data for custom table queue', function(err) { | ||
expect(err).to.not.exist(); | ||
queue.size('custom_table_queue', function(err, size) { | ||
expect(err).to.not.exist(); | ||
expect(size).to.equal(1); | ||
db.query('SELECT * FROM jobs', function(err, rows) { | ||
expect(err).to.not.exist(); | ||
expect(rows).to.deep.equal([]); | ||
db.query('SELECT * FROM custom_jobs_table', function(err, rows) { | ||
expect(err).to.not.exist(); | ||
expect(rows).to.have.length(1); | ||
queue.consume('custom_table_queue', function(err, job, completionCallback) { | ||
expect(err).to.not.exist(); | ||
expect(job).to.exist(); | ||
expect(job.data).to.equal('fake data for custom table queue'); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
22754
511
89