Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

better-queue

Package Overview
Dependencies
Maintainers
2
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

better-queue - npm Package Compare versions

Comparing version 2.2.3 to 3.0.1

test/store.js

123

lib/queue.js

@@ -31,3 +31,3 @@ var uuid = require('node-uuid');

self.merge = opts.merge || function (oldTask, newTask, cb) { cb(null, newTask) };
self.id = opts.id || 'id';
self.id = opts.id || false;
self.priority = opts.priority || null;

@@ -39,9 +39,12 @@

self.batchSize = opts.batchSize || 1;
self.batchDelay = opts.batchDelay || 0;
self.afterProcessDelay = opts.afterProcessDelay || 0;
self.concurrent = opts.concurrent || 1;
self.processDelay = opts.processDelay || 0;
self.maxTimeout = opts.maxTimeout || Infinity;
self.idleTimeout = opts.idleTimeout || 0;
self.maxRetries = opts.maxRetries || 0;
self.storeMaxRetries = opts.storeMaxRetries || Infinity;
self.storeRetryTimeout = opts.storeRetryTimeout || 1000;
// Statuses
self.length = 0;
self._stopped = false;

@@ -52,2 +55,3 @@ self._saturated = false;

self._connected = false;
self._storeRetries = 0;

@@ -70,3 +74,3 @@ // Locks

// Initialize Storage
self.use(opts.store || 'memory');
self.use(opts.store || 'sqlite');
if (!self._store) {

@@ -124,5 +128,19 @@ throw new Error('Queue cannot continue without a valid store.')

self._tasksWaitingForConnect = [];
self._store.connect(function (err) {
if (err) return;
self._connectToStore();
}
Queue.prototype._connectToStore = function () {
var self = this;
if (self._connected) return;
if (self._storeRetries >= self.storeMaxRetries) {
return self.emit('error', new Error('failed_connect_to_store'));
}
self._storeRetries++;
self._store.connect(function (err, len) {
if (err) return setTimeout(function () {
self._connectToStore();
}, self.storeRetryTimeout);
self._connected = true;
self.length = len;
self._storeRetries = 0;
if (!self._stopped && self.autoResume) {

@@ -135,2 +153,3 @@ self.resume();

})
}

@@ -147,3 +166,3 @@

setTimeout(function () {
self._processNext();
self._processIfFull();
}, 0)

@@ -241,3 +260,2 @@ }

}
// If something else has written to taskId, then wait.

@@ -251,2 +269,3 @@ if (self._writing[taskId].id !== writeId) {

// Task is in the queue
self.length++;
self.emit('task_queued', taskId, task);

@@ -264,15 +283,7 @@ self._tickets[taskId].queued();

self._hasMore = true;
return finishedWrite();
}
// If we're not already waiting, wait processDelay before starting.
if (!self._timeout) {
self._timeout = setTimeout(function () {
self._timeout = null;
self._processNext();
}, self.processDelay)
}
// Write more
finishedWrite()
// Finish writing
finishedWrite();
self._processIfFull();
})

@@ -375,3 +386,2 @@ }

Queue.prototype._drained = function () {
this._emptied();
if (this._calledDrain) return;

@@ -386,3 +396,3 @@ this._calledDrain = true;

Queue.prototype._processNext = function () {
Queue.prototype._processIfFull = function () {
var self = this;

@@ -394,4 +404,33 @@ self._saturated = (self._running >= self.concurrent);

if (self._fetchingNext) return;
if (!self.length) {
if (!self._hasMore) {
self._emptied();
if (!self._running) {
self._drained();
}
}
return;
}
if (self.length >= self.batchSize) {
if (self._timeout) {
clearTimeout(self._timeout);
self._timeout = null;
}
setImmediate(function () {
self._processNext();
})
} else if (!self._timeout) {
self._timeout = setTimeout(function () {
self._timeout = null;
self._processNext();
}, self.batchDelay)
}
}
Queue.prototype._processNext = function () {
var self = this;
// Fetch next batch
// FIXME: There may still be things writing
self._hasMore = false;

@@ -402,17 +441,25 @@ self._fetchingNext = true;

if (err || !batch) return;
var batchSize = Object.keys(batch).length;
var isEmpty = (batchSize === 0);
if (!self._hasMore) {
var isEmpty = !Object.keys(batch).length;
if (isEmpty && !self._running) {
return self._drained();
if (self.length < batchSize) {
self.length = batchSize;
}
self.length -= batchSize;
if (!self._hasMore && isEmpty) {
self._emptied();
if (!self._running) {
self._drained();
}
if (isEmpty) {
return self._emptied();
}
} else if (!Object.keys(batch).length) {
return setImmediate(function () {
self._processNext();
})
return;
}
// TODO: Collect stats
// The write queue wasn't empty on fetch, so we should fetch more.
if (self._hasMore && isEmpty) {
return self._processIfFull()
}
var tickets = {};

@@ -430,5 +477,3 @@ Object.keys(batch).forEach(function (taskId) {

// Continue processing until saturated
setImmediate(function () {
self._processNext();
})
self._processIfFull();
}

@@ -504,9 +549,9 @@

});
if (self.idleTimeout) {
if (self.afterProcessDelay) {
setTimeout(function () {
self._processNext();
}, self.idleTimeout);
self._processIfFull();
}, self.afterProcessDelay);
} else {
setImmediate(function () {
self._processNext();
self._processIfFull();
})

@@ -513,0 +558,0 @@ }

@@ -21,3 +21,3 @@

MemoryStore.prototype.connect = function (cb) {
cb()
cb(null, this._queue.length);
}

@@ -24,0 +24,0 @@

@@ -35,4 +35,10 @@ var uuid = require('node-uuid');

self.db = new sqlite3.Database(self.path, function (err) {
if (err) return cb(err);
self.db.run("CREATE TABLE IF NOT EXISTS tasks (id TEXT UNIQUE, lock TEXT, task TEXT, priority NUMERIC, added INTEGER PRIMARY KEY AUTOINCREMENT)", cb);
if (err) return cb({ message: 'failed_to_open_sqlite_db' });
self.db.run("CREATE TABLE IF NOT EXISTS tasks (id TEXT UNIQUE, lock TEXT, task TEXT, priority NUMERIC, added INTEGER PRIMARY KEY AUTOINCREMENT)", function (err) {
if (err) return cb({ message: 'failed_to_create_table' });
self.db.get("SELECT count(*) as n FROM tasks WHERE lock = ''", function (err, row) {
if (err) return cb({ message: 'failed_to_fetch_count' });
cb(null, row.n);
});
});
});

@@ -39,0 +45,0 @@ }

{
"name": "better-queue",
"version": "2.2.3",
"version": "3.0.1",
"description": "Better Queue for NodeJS",

@@ -5,0 +5,0 @@ "main": "lib/queue.js",

@@ -357,3 +357,3 @@ # Better Queue - Powerful flow control

cb();
}, { batchSize: 5, processDelay: 2000 })
}, { batchSize: 5, batchDelay: 2000 })
q.push(1);

@@ -365,3 +365,3 @@ setTimeout(function () {

You can also set `idleTimeout`, which will delay processing between tasks.
You can also set `afterProcessDelay`, which will delay processing between tasks.

@@ -371,3 +371,3 @@ ```js

cb(); // Will wait 1 second before taking the next task
}, { idleTimeout: 1000 })
}, { afterProcessDelay: 1000 })
q.push(1);

@@ -590,3 +590,2 @@ q.push(2);

- `merge` - function to merge tasks with the same task ID. Will be run with `oldTask`, `newTask` and a callback `cb(error, mergedTask)`. If you define this function then the callback is expected to be called.
- `id` - By default, this will be the "id" property of the task (if it's an object.) This can be a string representing which property of the task to be used as the ID. It can also be a function that takes in a task and returns a callback `cb(error, taskId)`.
- `priority` - function to determine the priority of a task. Takes in a task and returns callback `cb(error, priority)`.

@@ -596,2 +595,3 @@

- `id` - The property to use as the task ID. This can be a string or a function (for more complicated IDs). The function `(task, cb)` and must call the callback with `cb(error, taskId)`.
- `cancelIfRunning` - If true, when a task with the same ID is running, its worker will be cancelled. Defaults to `false`.

@@ -601,7 +601,9 @@ - `autoResume` - If true, tasks in the store will automatically start processing once it connects to the store. Defaults to `true`.

- `batchSize` - The number of tasks (at most) that can be processed at once. Defaults to `1`.
- `batchDelay` - Number of milliseconds to delay before starting to popping items off the queue. Defaults to `0`.
- `concurrent` - Number of workers that can be running at any given time. Defaults to `1`.
- `processDelay` - Number of milliseconds to delay before starting to popping items off the queue. Defaults to `0`.
- `maxTimeout` - Number of milliseconds before a task is considered timed out. Defaults to `Infinity`.
- `idleTimeout` - Number of milliseconds to delay before processing the next batch of items. Defaults to `1`.
- `afterProcessDelay` - Number of milliseconds to delay before processing the next batch of items. Defaults to `1`.
- `maxRetries` - Maximum number of attempts to retry on a failed task. Defaults to `0`.
- `storeMaxRetries` - Maximum number of attempts before giving up on the store. Defaults to `Infinity`.
- `storeRetryTimeout` - Number of milliseconds to delay before trying to connect to the store again. Defaults to `1000`.
- `store` - Represents the options for the initial store. Can be an object containing `{ type: storeType, ... options ... }`, or the store instance itself.

@@ -608,0 +610,0 @@

@@ -96,11 +96,29 @@ var Queue = require('./lib/queue')

var counter = new Queue(function (task, cb) { cb() }, { id: 'id', store: { type: 'sqlite', path: './abc.db' } })
counter.on('task_finish', function (taskId, result) {
// taskId will be 'jim' or 'bob'
console.log(taskId)
})
counter.push({ id: 'jim', count: 2 });
counter.push({ id: 'jim', count: 2 });
counter.push({ id: 'bob', count: 1 });
// var counter = new Queue(function (task, cb) { cb() }, { id: 'id', store: { type: 'sqlite', path: './abc.db' } })
// counter.on('task_finish', function (taskId, result) {
// // taskId will be 'jim' or 'bob'
// console.log(taskId)
// })
// counter.push({ id: 'jim', count: 2 });
// counter.push({ id: 'jim', count: 2 });
// counter.push({ id: 'bob', count: 1 });
// var q = new Queue(function (b, cb) {
// console.log("Pushed %s.", b.join(', '));
// cb();
// }, { batchSize: 3, batchDelay: 950 })
// q.push(1);
// q.push(2);
// q.push(3);
// q.push(4);
// setTimeout(function () {
// q.push(5);
// q.push(6);
// setTimeout(function () {
// q.push(7);
// q.push(8);
// q.push(9);
// q.push(10);
// }, 1000)
// }, 400)

@@ -152,2 +152,39 @@ var assert = require('assert');

it('should batch delay', function (done) {
var batches = 0;
var q = new Queue(function (batch, cb) {
batches++;
if (batches === 1) {
assert.equal(batch.length, 2);
return cb();
}
if (batches === 2) {
assert.equal(batch.length, 1);
cb();
return done();
}
}, { batchSize: 2, batchDelay: 2 });
q.push(1);
q.push(2);
q.push(3);
})
it('should batch 2', function (done) {
var finished = 0;
var q = new Queue(function (batch, cb) {
finished++;
assert.equal(batch.length, 1);
if (finished >= 2) {
done();
}
cb();
}, { batchSize: 2, batchDelay: 1 });
q.push(1)
.on('queued', function () {
setTimeout(function () {
q.push(2);
}, 2)
})
})
it('should drain and empty', function (done) {

@@ -154,0 +191,0 @@ var emptied = false;

@@ -130,2 +130,3 @@ var assert = require('assert');

}, {
id: 'id',
merge: function (a, b, cb) {

@@ -254,3 +255,3 @@ a.x += b.x;

}
}, { cancelIfRunning: true })
}, { id: 'id', cancelIfRunning: true })
q.push({ id: 1 })

@@ -265,5 +266,3 @@ .on('started', function () {

// TODO: Test progress
// TODO: Test stores
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc