Comparing version 1.1.0 to 2.0.0
60
main.js
@@ -47,15 +47,7 @@ 'use strict'; | ||
var table = this.table; | ||
var worker_id = this.worker | ||
var worker_id = uuid.v4(); | ||
var self = this; | ||
var lock_time = 60 * 15; | ||
var lock_time = 60 * 5; | ||
var free_job_ids_sql = "" | ||
+ " SELECT *" | ||
+ " FROM ??" | ||
+ " WHERE locked_until < NOW()" | ||
+ " AND queue IN (?)" | ||
+ " ORDER BY RAND()" | ||
+ " LIMIT 1" // eventually query/update batches to be easier on the DB | ||
; | ||
db.query(free_job_ids_sql, [table, queue_input], function(err, rows) { | ||
db.query("SELECT NOW() AS now, NOW() + INTERVAL ? SECOND AS lock_until", [lock_time], function(err, result) { | ||
if (err) { | ||
@@ -65,10 +57,5 @@ return done(err); | ||
if (!rows.length) { | ||
return done(); | ||
} | ||
var now = result[0].now; | ||
var lock_until = result[0].lock_until; | ||
var job_ids = rows.map(function(row) { | ||
return row.id; | ||
}); | ||
var reserve_jobs_sql = "" | ||
@@ -78,10 +65,10 @@ + " UPDATE ??" | ||
+ " worker = ?" | ||
+ " , locked_until = (NOW() + INTERVAL ? SECOND)" | ||
+ " , update_time = NOW()" | ||
+ " WHERE id IN (?)" | ||
+ " AND locked_until < NOW()" | ||
+ " , locked_until = ?" | ||
+ " , update_time = ?" | ||
+ " WHERE locked_until < ?" | ||
+ " AND queue IN (?)" | ||
+ " LIMIT 1" | ||
; | ||
db.query(reserve_jobs_sql, [table, worker_id, lock_time, job_ids], function(err, result) { | ||
db.query(reserve_jobs_sql, [table, worker_id, lock_until, now, now, queue_input], function(err, result) { | ||
if (err) { | ||
@@ -91,10 +78,14 @@ return done(err); | ||
if (!result.affectedRows) { | ||
return done(); | ||
} | ||
var find_reserved_jobs_sql = "" | ||
+ " SELECT *" | ||
+ " FROM ??" | ||
+ " WHERE id IN (?)" | ||
+ " AND worker = ?" | ||
+ " WHERE worker = ?" | ||
+ " AND locked_until = ?" | ||
; | ||
db.query(find_reserved_jobs_sql, [table, job_ids, worker_id], function(err, rows) { | ||
db.query(find_reserved_jobs_sql, [table, worker_id, lock_until], function(err, rows) { | ||
if (err) { | ||
@@ -105,3 +96,12 @@ return done(err); | ||
var job = rows[0]; | ||
function finished(done) { | ||
function finishedWithJob(err, done) { | ||
if (!(done instanceof Function)) { | ||
done = function() {}; | ||
} | ||
if (err) { | ||
return done(); | ||
} | ||
var remove_job_sql = "" | ||
@@ -116,9 +116,7 @@ + " DELETE FROM jobs" | ||
if (done instanceof Function) { | ||
done(); | ||
} | ||
done(); | ||
}); | ||
} | ||
return done(null, job, finished); | ||
return done(null, job.data, finishedWithJob); | ||
}); | ||
@@ -125,0 +123,0 @@ }); |
{ | ||
"name": "dbqueue", | ||
"version": "1.1.0", | ||
"version": "2.0.0", | ||
"description": "A minimal, durable DB-based message queue system", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/elliotf/node-dbqueue", |
@@ -55,6 +55,7 @@ # DBQueue | ||
// then let the queue know the job has been handled | ||
finished(); | ||
// passing an err to the finished callback will leave the job on the queue | ||
finished(some_err); | ||
// if you would like to get confirmation that the job has been cleared from the queue: | ||
finished(function(err) { | ||
// or if you would like to get confirmation that the job has been cleared from the queue: | ||
finished(null, function(err) { | ||
if (err) { | ||
@@ -61,0 +62,0 @@ // job is likely still on the queue |
@@ -27,4 +27,2 @@ 'use strict'; | ||
fake_uuid = 'fakeuuid-0000-1111-2222-333333333333'; | ||
this.sinon.stub(uuid, 'v4').returns(fake_uuid) | ||
}); | ||
@@ -125,9 +123,8 @@ | ||
it('returns a job from the queue', function(done) { | ||
fake_uuid = 'fakeuuid-0000-1111-2222-333333333333'; | ||
this.sinon.stub(uuid, 'v4').returns(fake_uuid) | ||
queue.consume('queue_a', function(err, job) { | ||
expect(err).to.not.exist(); | ||
expect(withoutTimestamps(job)).to.deep.equal({ | ||
queue: 'queue_a', | ||
data: 'fake data for a', | ||
worker: 'fakeuuid-0000-1111-2222-333333333333', | ||
}); | ||
expect(job).to.deep.equal('fake data for a'); | ||
@@ -199,3 +196,3 @@ return done(); | ||
finished(function(err) { | ||
finished(null, function(err) { | ||
expect(err).to.not.exist(); | ||
@@ -223,3 +220,3 @@ | ||
finished(function(err) { | ||
finished(null, function(err) { | ||
expect(err).to.not.exist(); | ||
@@ -239,2 +236,22 @@ | ||
context('with an error', function() { | ||
it('leaves the job on the queue', function(done) { | ||
queue.consume('queue_a', function(err, job, finishedWithJob) { | ||
expect(err).to.not.exist(); | ||
finishedWithJob(new Error('fake error'), function(err) { | ||
expect(err).to.not.exist(); | ||
db.query('SELECT * FROM jobs WHERE queue = ?', ['queue_a'], function(err, rows) { | ||
expect(err).to.not.exist(); | ||
expect(rows).to.have.length(1); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
context('without a callback', function() { | ||
@@ -257,3 +274,3 @@ it('removes the job from the queue without error', function(done) { | ||
}); | ||
}, 100); | ||
}, 50); | ||
}); | ||
@@ -265,2 +282,34 @@ }); | ||
context('when there is more than one job', function() { | ||
beforeEach(function(done) { | ||
queue.insert('queue_a', 'first', function(err) { | ||
expect(err).to.not.exist(); | ||
queue.insert('queue_a', 'second', function(err) { | ||
expect(err).to.not.exist(); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
it('returns each one', function(done) { | ||
queue.consume('queue_a', function(err, first) { | ||
expect(err).to.not.exist(); | ||
expect(first).to.exist(); | ||
queue.consume('queue_a', function(err, second) { | ||
expect(err).to.not.exist(); | ||
expect(second).to.exist(); | ||
expect(second).to.not.deep.equal(first); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
context('when the desired queue is empty', function() { | ||
@@ -288,2 +337,22 @@ it('returns nothing', function(done) { | ||
}); | ||
context('when all of the jobs are locked', function() { | ||
it('returns nothing', function(done) { | ||
queue.insert('queue_a', 'some data', function(err) { | ||
expect(err).to.not.exist(); | ||
queue.consume('queue_a', function(err, job) { | ||
expect(err).to.not.exist(); | ||
queue.consume('queue_a', function(err, job) { | ||
expect(err).to.not.exist(); | ||
expect(job).to.not.exist(); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
@@ -372,3 +441,2 @@ | ||
beforeEach(function(done) { | ||
uuid.v4.returns('fake_uuid_for_custom_table'); | ||
var custom_config = _.extend({}, helper.test_db_config, { | ||
@@ -410,3 +478,3 @@ table_name: 'custom_jobs_table', | ||
expect(job).to.exist(); | ||
expect(job.data).to.equal('fake data for custom table queue'); | ||
expect(job).to.equal('fake data for custom table queue'); | ||
@@ -413,0 +481,0 @@ return done(); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
25109
2
11
554
90