Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

dbqueue

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

dbqueue - npm Package Compare versions

Comparing version 1.1.0 to 2.0.0

TODO.md

60

main.js

@@ -47,15 +47,7 @@ 'use strict';

var table = this.table;
var worker_id = this.worker
var worker_id = uuid.v4();
var self = this;
var lock_time = 60 * 15;
var lock_time = 60 * 5;
var free_job_ids_sql = ""
+ " SELECT *"
+ " FROM ??"
+ " WHERE locked_until < NOW()"
+ " AND queue IN (?)"
+ " ORDER BY RAND()"
+ " LIMIT 1" // eventually query/update batches to be easier on the DB
;
db.query(free_job_ids_sql, [table, queue_input], function(err, rows) {
db.query("SELECT NOW() AS now, NOW() + INTERVAL ? SECOND AS lock_until", [lock_time], function(err, result) {
if (err) {

@@ -65,10 +57,5 @@ return done(err);

if (!rows.length) {
return done();
}
var now = result[0].now;
var lock_until = result[0].lock_until;
var job_ids = rows.map(function(row) {
return row.id;
});
var reserve_jobs_sql = ""

@@ -78,10 +65,10 @@ + " UPDATE ??"

+ " worker = ?"
+ " , locked_until = (NOW() + INTERVAL ? SECOND)"
+ " , update_time = NOW()"
+ " WHERE id IN (?)"
+ " AND locked_until < NOW()"
+ " , locked_until = ?"
+ " , update_time = ?"
+ " WHERE locked_until < ?"
+ " AND queue IN (?)"
+ " LIMIT 1"
;
db.query(reserve_jobs_sql, [table, worker_id, lock_time, job_ids], function(err, result) {
db.query(reserve_jobs_sql, [table, worker_id, lock_until, now, now, queue_input], function(err, result) {
if (err) {

@@ -91,10 +78,14 @@ return done(err);

if (!result.affectedRows) {
return done();
}
var find_reserved_jobs_sql = ""
+ " SELECT *"
+ " FROM ??"
+ " WHERE id IN (?)"
+ " AND worker = ?"
+ " WHERE worker = ?"
+ " AND locked_until = ?"
;
db.query(find_reserved_jobs_sql, [table, job_ids, worker_id], function(err, rows) {
db.query(find_reserved_jobs_sql, [table, worker_id, lock_until], function(err, rows) {
if (err) {

@@ -105,3 +96,12 @@ return done(err);

var job = rows[0];
function finished(done) {
function finishedWithJob(err, done) {
if (!(done instanceof Function)) {
done = function() {};
}
if (err) {
return done();
}
var remove_job_sql = ""

@@ -116,9 +116,7 @@ + " DELETE FROM jobs"

if (done instanceof Function) {
done();
}
done();
});
}
return done(null, job, finished);
return done(null, job.data, finishedWithJob);
});

@@ -125,0 +123,0 @@ });

{
"name": "dbqueue",
"version": "1.1.0",
"version": "2.0.0",
"description": "A minimal, durable DB-based message queue system",

@@ -5,0 +5,0 @@ "homepage": "https://github.com/elliotf/node-dbqueue",

@@ -55,6 +55,7 @@ # DBQueue

// then let the queue know the job has been handled
finished();
// passing an err to the finished callback will leave the job on the queue
finished(some_err);
// if you would like to get confirmation that the job has been cleared from the queue:
finished(function(err) {
// or if you would like to get confirmation that the job has been cleared from the queue:
finished(null, function(err) {
if (err) {

@@ -61,0 +62,0 @@ // job is likely still on the queue

@@ -27,4 +27,2 @@ 'use strict';

fake_uuid = 'fakeuuid-0000-1111-2222-333333333333';
this.sinon.stub(uuid, 'v4').returns(fake_uuid)
});

@@ -125,9 +123,8 @@

it('returns a job from the queue', function(done) {
fake_uuid = 'fakeuuid-0000-1111-2222-333333333333';
this.sinon.stub(uuid, 'v4').returns(fake_uuid)
queue.consume('queue_a', function(err, job) {
expect(err).to.not.exist();
expect(withoutTimestamps(job)).to.deep.equal({
queue: 'queue_a',
data: 'fake data for a',
worker: 'fakeuuid-0000-1111-2222-333333333333',
});
expect(job).to.deep.equal('fake data for a');

@@ -199,3 +196,3 @@ return done();

finished(function(err) {
finished(null, function(err) {
expect(err).to.not.exist();

@@ -223,3 +220,3 @@

finished(function(err) {
finished(null, function(err) {
expect(err).to.not.exist();

@@ -239,2 +236,22 @@

context('with an error', function() {
it('leaves the job on the queue', function(done) {
queue.consume('queue_a', function(err, job, finishedWithJob) {
expect(err).to.not.exist();
finishedWithJob(new Error('fake error'), function(err) {
expect(err).to.not.exist();
db.query('SELECT * FROM jobs WHERE queue = ?', ['queue_a'], function(err, rows) {
expect(err).to.not.exist();
expect(rows).to.have.length(1);
return done();
});
});
});
});
});
context('without a callback', function() {

@@ -257,3 +274,3 @@ it('removes the job from the queue without error', function(done) {

});
}, 100);
}, 50);
});

@@ -265,2 +282,34 @@ });

context('when there is more than one job', function() {
beforeEach(function(done) {
queue.insert('queue_a', 'first', function(err) {
expect(err).to.not.exist();
queue.insert('queue_a', 'second', function(err) {
expect(err).to.not.exist();
return done();
});
});
});
it('returns each one', function(done) {
queue.consume('queue_a', function(err, first) {
expect(err).to.not.exist();
expect(first).to.exist();
queue.consume('queue_a', function(err, second) {
expect(err).to.not.exist();
expect(second).to.exist();
expect(second).to.not.deep.equal(first);
return done();
});
});
});
});
context('when the desired queue is empty', function() {

@@ -288,2 +337,22 @@ it('returns nothing', function(done) {

});
context('when all of the jobs are locked', function() {
it('returns nothing', function(done) {
queue.insert('queue_a', 'some data', function(err) {
expect(err).to.not.exist();
queue.consume('queue_a', function(err, job) {
expect(err).to.not.exist();
queue.consume('queue_a', function(err, job) {
expect(err).to.not.exist();
expect(job).to.not.exist();
return done();
});
});
});
});
});
});

@@ -372,3 +441,2 @@

beforeEach(function(done) {
uuid.v4.returns('fake_uuid_for_custom_table');
var custom_config = _.extend({}, helper.test_db_config, {

@@ -410,3 +478,3 @@ table_name: 'custom_jobs_table',

expect(job).to.exist();
expect(job.data).to.equal('fake data for custom table queue');
expect(job).to.equal('fake data for custom table queue');

@@ -413,0 +481,0 @@ return done();

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc