Comparing version 1.2.0 to 1.3.0
{ | ||
"name": "fastq", | ||
"version": "1.2.0", | ||
"version": "1.3.0", | ||
"description": "Fast, in memory work queue", | ||
@@ -16,3 +16,3 @@ "main": "queue.js", | ||
"type": "git", | ||
"url": "git+https://github.com/mcollina/fastqueue.git" | ||
"url": "git+https://github.com/mcollina/fastq.git" | ||
}, | ||
@@ -28,5 +28,5 @@ "keywords": [ | ||
"bugs": { | ||
"url": "https://github.com/mcollina/fastqueue/issues" | ||
"url": "https://github.com/mcollina/fastq/issues" | ||
}, | ||
"homepage": "https://github.com/mcollina/fastqueue#readme", | ||
"homepage": "https://github.com/mcollina/fastq#readme", | ||
"devDependencies": { | ||
@@ -33,0 +33,0 @@ "async": "^1.2.1", |
112
queue.js
@@ -5,5 +5,5 @@ 'use strict' | ||
function fastqueue (context, worker, limit) { | ||
function fastqueue (context, worker, concurrency) { | ||
if (typeof context === 'function') { | ||
limit = worker | ||
concurrency = worker | ||
worker = context | ||
@@ -16,4 +16,18 @@ context = null | ||
var queueTail = null | ||
var _running = 0 | ||
var self = { | ||
push: push | ||
push: push, | ||
drain: noop, | ||
saturated: noop, | ||
pause: pause, | ||
paused: false, | ||
concurrency: concurrency, | ||
running: running, | ||
resume: resume, | ||
idle: idle, | ||
length: length, | ||
unshift: unshift, | ||
empty: noop, | ||
kill: kill | ||
} | ||
@@ -23,2 +37,35 @@ | ||
function running () { | ||
return _running | ||
} | ||
function pause () { | ||
self.paused = true | ||
} | ||
function length () { | ||
var current = queueHead | ||
var counter = 0 | ||
while (current) { | ||
current = current.next | ||
counter++ | ||
} | ||
return counter | ||
} | ||
function resume () { | ||
if (!self.paused) return | ||
self.paused = false | ||
for (var i = 0; i < self.concurrency; i++) { | ||
_running++ | ||
release() | ||
} | ||
} | ||
function idle () { | ||
return _running === 0 | ||
} | ||
function push (value, done) { | ||
@@ -32,3 +79,3 @@ var current = cache.get() | ||
if (limit === 0) { | ||
if (_running === self.concurrency || self.paused) { | ||
if (queueTail) { | ||
@@ -40,5 +87,6 @@ queueTail.next = current | ||
queueTail = current | ||
self.saturated() | ||
} | ||
} else { | ||
limit-- | ||
_running++ | ||
worker.call(context, current.value, current.worked) | ||
@@ -48,16 +96,54 @@ } | ||
function unshift (value, done) { | ||
var current = cache.get() | ||
current.context = context | ||
current.release = release | ||
current.value = value | ||
current.callback = done | ||
if (_running === self.concurrency || self.paused) { | ||
if (queueHead) { | ||
current.next = queueHead | ||
queueHead = current | ||
} else { | ||
queueHead = current | ||
queueTail = current | ||
self.saturated() | ||
} | ||
} else { | ||
_running++ | ||
worker.call(context, current.value, current.worked) | ||
} | ||
} | ||
function release (holder) { | ||
cache.release(holder) | ||
if (holder) { | ||
cache.release(holder) | ||
} | ||
var next = queueHead | ||
if (next) { | ||
if (queueTail === queueHead) { | ||
queueTail = null | ||
if (!self.paused) { | ||
if (queueTail === queueHead) { | ||
queueTail = null | ||
} | ||
queueHead = next.next | ||
next.next = null | ||
worker.call(context, next.value, next.worked) | ||
if (queueTail === null) { | ||
self.empty() | ||
} | ||
} else { | ||
_running-- | ||
} | ||
queueHead = next.next | ||
next.next = null | ||
worker.call(context, next.value, next.worked) | ||
} else { | ||
limit++ | ||
} else if (--_running === 0) { | ||
self.drain() | ||
} | ||
} | ||
function kill () { | ||
queueHead = null | ||
queueTail = null | ||
self.drain = noop | ||
} | ||
} | ||
@@ -64,0 +150,0 @@ |
122
README.md
# fastq [![build status](https://secure.travis-ci.org/mcollina/fastq.png)](http://travis-ci.org/mcollina/fastq) | ||
Fast, in memory work queue. | ||
Fast, in memory work queue. `fastq` is API compatible with | ||
[`async.queue`](https://github.com/caolan/async#queueworker-concurrency) | ||
Benchmarks (1 million tasks): | ||
* setImmedidate: 1715ms | ||
* fastq: 1824ms | ||
* async.queue: 6158ms | ||
* setImmedidate: 1313ms | ||
* fastq: 1462ms | ||
* async.queue: 3989ms | ||
Obtained on node 0.12.3, on a HP Spectre 360 (the Build 2015 edition). | ||
Obtained on node 4.2.2, on a MacBook Pro 2014 (i7, 16GB of RAM). | ||
@@ -19,2 +20,6 @@ If you need zero-overhead series function call, check out | ||
* <a href="#install">Installation</a> | ||
* <a href="#basic">Basic Example</a> | ||
* <a href="#api">API</a> | ||
* <a href="#licence">Licence & copyright</a> | ||
@@ -62,4 +67,111 @@ ## Install | ||
## API | ||
* <a href="#fastqueue"><code>fastqueue()</code></a> | ||
* <a href="#push"><code>queue#<b>push()</b></code></a> | ||
* <a href="#unshift"><code>queue#<b>unshift()</b></code></a> | ||
* <a href="#pause"><code>queue#<b>pause()</b></code></a> | ||
* <a href="#resume"><code>queue#<b>resume()</b></code></a> | ||
* <a href="#idle"><code>queue#<b>idle()</b></code></a> | ||
* <a href="#length"><code>queue#<b>length()</b></code></a> | ||
* <a href="#kill"><code>queue#<b>kill()</b></code></a> | ||
* <a href="#concurrency"><code>queue#<b>concurrency</b></code></a> | ||
* <a href="#drain"><code>queue#<b>drain</b></code></a> | ||
* <a href="#empty"><code>queue#<b>empty</b></code></a> | ||
* <a href="#saturated"><code>queue#<b>saturated</b></code></a> | ||
------------------------------------------------------- | ||
<a name="fastqueue"></a> | ||
### fastqueue([that], worker, concurrency) | ||
Creates a new queue. | ||
Arguments: | ||
* `that`, optional context of the `worker` function. | ||
* `worker`, worker function, it would be called with `that` as `this`, | ||
if that is specified. | ||
* `concurrency`, number of concurrent tasks that could be executed in | ||
parallel. | ||
------------------------------------------------------- | ||
<a name="push"></a> | ||
### queue.push(task, done) | ||
Add a task at the end of the queue. `done(err, result)` will be called | ||
when the task was processed. | ||
------------------------------------------------------- | ||
<a name="unshift"></a> | ||
### queue.unshift(task, done) | ||
Add a task at the beginning of the queue. `done(err, result)` will be called | ||
when the task was processed. | ||
------------------------------------------------------- | ||
<a name="pause"></a> | ||
### queue.pause() | ||
Pause the processing of tasks. Currently worked tasks are not | ||
stopped. | ||
------------------------------------------------------- | ||
<a name="resume"></a> | ||
### queue.resume() | ||
Resume the processing of tasks. | ||
------------------------------------------------------- | ||
<a name="idle"></a> | ||
### queue.idle() | ||
Returns `true` if there are tasks being processed or waiting to be processed. | ||
`false` otherwise. | ||
------------------------------------------------------- | ||
<a name="length"></a> | ||
### queue.length() | ||
Returns the number of tasks waiting to be processed (in the queue). | ||
------------------------------------------------------- | ||
<a name="kill"></a> | ||
### queue.kill() | ||
Removes all tasks waiting to be processed, and reset `drain` to an empty | ||
function. | ||
------------------------------------------------------- | ||
<a name="concurrency"></a> | ||
### queue.concurrency | ||
Property that returns the number of concurrent tasks that could be executed in | ||
parallel. It can be altered at runtime. | ||
------------------------------------------------------- | ||
<a name="drain"></a> | ||
### queue.drain | ||
Function that will be called when the last | ||
item from the queue has been processed by a worker. | ||
It can be altered at runtime. | ||
------------------------------------------------------- | ||
<a name="empty"></a> | ||
### queue.empty | ||
Function that will be called when the last | ||
item from the queue has been assigned to a worker. | ||
It can be altered at runtime. | ||
------------------------------------------------------- | ||
<a name="saturated"></a> | ||
### queue.saturated | ||
Function that will be called when the queue hits the concurrency | ||
limit. | ||
It can be altered at runtime. | ||
## License | ||
ISC |
283
test.js
@@ -58,3 +58,2 @@ 'use strict' | ||
function worker (arg, cb) { | ||
console.log('received', arg) | ||
t.equal(arg, toExec[count], 'arg matches') | ||
@@ -107,1 +106,283 @@ count++ | ||
}) | ||
test('drain', function (t) { | ||
t.plan(4) | ||
var queue = buildQueue(worker, 1) | ||
var worked = false | ||
queue.push(42, function (err, result) { | ||
t.error(err, 'no error') | ||
t.equal(result, true, 'result matches') | ||
}) | ||
queue.drain = function () { | ||
t.equal(true, worked, 'drained') | ||
} | ||
function worker (arg, cb) { | ||
t.equal(arg, 42) | ||
worked = true | ||
setImmediate(cb, null, true) | ||
} | ||
}) | ||
test('pause && resume', function (t) { | ||
t.plan(7) | ||
var queue = buildQueue(worker, 1) | ||
var worked = false | ||
t.notOk(queue.paused, 'it should not be paused') | ||
queue.pause() | ||
queue.push(42, function (err, result) { | ||
t.error(err, 'no error') | ||
t.equal(result, true, 'result matches') | ||
}) | ||
t.notOk(worked, 'it should be paused') | ||
t.ok(queue.paused, 'it should be paused') | ||
queue.resume() | ||
t.notOk(queue.paused, 'it should not be paused') | ||
function worker (arg, cb) { | ||
t.equal(arg, 42) | ||
worked = true | ||
cb(null, true) | ||
} | ||
}) | ||
test('altering concurrency', function (t) { | ||
t.plan(7) | ||
var queue = buildQueue(worker, 1) | ||
var count = 0 | ||
queue.pause() | ||
queue.push(24, workDone) | ||
queue.push(24, workDone) | ||
queue.concurrency = 2 | ||
queue.resume() | ||
t.equal(queue.running(), 2, '2 jobs running') | ||
function workDone (err, result) { | ||
t.error(err, 'no error') | ||
t.equal(result, true, 'result matches') | ||
} | ||
function worker (arg, cb) { | ||
t.equal(0, count, 'works in parallel') | ||
setImmediate(function () { | ||
count++ | ||
cb(null, true) | ||
}) | ||
} | ||
}) | ||
test('idle()', function (t) { | ||
t.plan(12) | ||
var queue = buildQueue(worker, 1) | ||
t.ok(queue.idle(), 'queue is idle') | ||
queue.push(42, function (err, result) { | ||
t.error(err, 'no error') | ||
t.equal(result, true, 'result matches') | ||
t.notOk(queue.idle(), 'queue is not idle') | ||
}) | ||
queue.push(42, function (err, result) { | ||
t.error(err, 'no error') | ||
t.equal(result, true, 'result matches') | ||
// it will go idle after executing this function | ||
setImmediate(function () { | ||
t.ok(queue.idle(), 'queue is now idle') | ||
}) | ||
}) | ||
t.notOk(queue.idle(), 'queue is not idle') | ||
function worker (arg, cb) { | ||
t.notOk(queue.idle(), 'queue is not idle') | ||
t.equal(arg, 42) | ||
setImmediate(cb, null, true) | ||
} | ||
}) | ||
test('saturated', function (t) { | ||
t.plan(9) | ||
var queue = buildQueue(worker, 1) | ||
var preworked = 0 | ||
var worked = 0 | ||
queue.saturated = function () { | ||
t.pass('saturated') | ||
t.equal(preworked, 1, 'started 1 task') | ||
t.equal(worked, 0, 'worked zero task') | ||
} | ||
queue.push(42, done) | ||
queue.push(42, done) | ||
function done (err, result) { | ||
t.error(err, 'no error') | ||
t.equal(result, true, 'result matches') | ||
} | ||
function worker (arg, cb) { | ||
t.equal(arg, 42) | ||
preworked++ | ||
setImmediate(function () { | ||
worked++ | ||
cb(null, true) | ||
}) | ||
} | ||
}) | ||
test('length', function (t) { | ||
t.plan(7) | ||
var queue = buildQueue(worker, 1) | ||
t.equal(queue.length(), 0, 'nothing waiting') | ||
queue.push(42, done) | ||
t.equal(queue.length(), 0, 'nothing waiting') | ||
queue.push(42, done) | ||
t.equal(queue.length(), 1, 'one task waiting') | ||
queue.push(42, done) | ||
t.equal(queue.length(), 2, 'two tasks waiting') | ||
function done (err, result) { | ||
t.error(err, 'no error') | ||
} | ||
function worker (arg, cb) { | ||
setImmediate(function () { | ||
cb(null, true) | ||
}) | ||
} | ||
}) | ||
test('unshift', function (t) { | ||
t.plan(8) | ||
var queue = buildQueue(worker, 1) | ||
var expected = [1, 2, 3, 4] | ||
queue.push(1, done) | ||
queue.push(4, done) | ||
queue.unshift(3, done) | ||
queue.unshift(2, done) | ||
function done (err, result) { | ||
t.error(err, 'no error') | ||
} | ||
function worker (arg, cb) { | ||
t.equal(expected.shift(), arg, 'tasks come in order') | ||
setImmediate(function () { | ||
cb(null, true) | ||
}) | ||
} | ||
}) | ||
test('unshift && empty', function (t) { | ||
t.plan(2) | ||
var queue = buildQueue(worker, 1) | ||
var completed = false | ||
queue.pause() | ||
queue.empty = function () { | ||
t.notOk(completed, 'the task has not completed yet') | ||
} | ||
queue.unshift(1, done) | ||
queue.resume() | ||
function done (err, result) { | ||
completed = true | ||
t.error(err, 'no error') | ||
} | ||
function worker (arg, cb) { | ||
setImmediate(function () { | ||
cb(null, true) | ||
}) | ||
} | ||
}) | ||
test('push && empty', function (t) { | ||
t.plan(2) | ||
var queue = buildQueue(worker, 1) | ||
var completed = false | ||
queue.pause() | ||
queue.empty = function () { | ||
t.notOk(completed, 'the task has not completed yet') | ||
} | ||
queue.push(1, done) | ||
queue.resume() | ||
function done (err, result) { | ||
completed = true | ||
t.error(err, 'no error') | ||
} | ||
function worker (arg, cb) { | ||
setImmediate(function () { | ||
cb(null, true) | ||
}) | ||
} | ||
}) | ||
test('kill', function (t) { | ||
t.plan(5) | ||
var queue = buildQueue(worker, 1) | ||
var expected = [1] | ||
var predrain = queue.drain | ||
queue.drain = function drain () { | ||
t.fail('drain should never be called') | ||
} | ||
queue.push(1, done) | ||
queue.push(4, done) | ||
queue.unshift(3, done) | ||
queue.unshift(2, done) | ||
queue.kill() | ||
function done (err, result) { | ||
t.error(err, 'no error') | ||
setImmediate(function () { | ||
t.equal(queue.length(), 0, 'no queued tasks') | ||
t.equal(queue.running(), 0, 'no running tasks') | ||
t.equal(queue.drain, predrain, 'drain is back to default') | ||
}) | ||
} | ||
function worker (arg, cb) { | ||
t.equal(expected.shift(), arg, 'tasks come in order') | ||
setImmediate(function () { | ||
cb(null, true) | ||
}) | ||
} | ||
}) |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
18458
483
176