Comparing version 1.16.0 to 1.17.0
{ | ||
"name": "fastq", | ||
"version": "1.16.0", | ||
"version": "1.17.0", | ||
"description": "Fast, in memory work queue", | ||
@@ -5,0 +5,0 @@ "main": "queue.js", |
40
queue.js
@@ -7,5 +7,5 @@ 'use strict' | ||
function fastqueue (context, worker, concurrency) { | ||
function fastqueue (context, worker, _concurrency) { | ||
if (typeof context === 'function') { | ||
concurrency = worker | ||
_concurrency = worker | ||
worker = context | ||
@@ -15,4 +15,4 @@ context = null | ||
if (concurrency < 1) { | ||
throw new Error('fastqueue concurrency must be greater than 1') | ||
if (!(_concurrency >= 1)) { | ||
throw new Error('fastqueue concurrency must be equal to or greater than 1') | ||
} | ||
@@ -32,3 +32,19 @@ | ||
paused: false, | ||
concurrency: concurrency, | ||
get concurrency () { | ||
return _concurrency | ||
}, | ||
set concurrency (value) { | ||
if (!(value >= 1)) { | ||
throw new Error('fastqueue concurrency must be equal to or greater than 1') | ||
} | ||
_concurrency = value | ||
if (self.paused) return | ||
for (; queueHead && _running < _concurrency;) { | ||
_running++ | ||
release() | ||
} | ||
}, | ||
running: running, | ||
@@ -83,3 +99,3 @@ resume: resume, | ||
self.paused = false | ||
for (var i = 0; i < self.concurrency; i++) { | ||
for (; queueHead && _running < _concurrency;) { | ||
_running++ | ||
@@ -103,3 +119,3 @@ release() | ||
if (_running === self.concurrency || self.paused) { | ||
if (_running >= _concurrency || self.paused) { | ||
if (queueTail) { | ||
@@ -128,3 +144,3 @@ queueTail.next = current | ||
if (_running === self.concurrency || self.paused) { | ||
if (_running >= _concurrency || self.paused) { | ||
if (queueHead) { | ||
@@ -149,3 +165,3 @@ current.next = queueHead | ||
var next = queueHead | ||
if (next) { | ||
if (next && _running <= _concurrency) { | ||
if (!self.paused) { | ||
@@ -213,5 +229,5 @@ if (queueTail === queueHead) { | ||
function queueAsPromised (context, worker, concurrency) { | ||
function queueAsPromised (context, worker, _concurrency) { | ||
if (typeof context === 'function') { | ||
concurrency = worker | ||
_concurrency = worker | ||
worker = context | ||
@@ -228,3 +244,3 @@ context = null | ||
var queue = fastqueue(context, asyncWrapper, concurrency) | ||
var queue = fastqueue(context, asyncWrapper, _concurrency) | ||
@@ -231,0 +247,0 @@ var pushCb = queue.push |
@@ -9,6 +9,18 @@ 'use strict' | ||
test('concurrency', function (t) { | ||
t.plan(2) | ||
t.plan(6) | ||
t.throws(buildQueue.bind(null, worker, 0)) | ||
t.throws(buildQueue.bind(null, worker, NaN)) | ||
t.doesNotThrow(buildQueue.bind(null, worker, 1)) | ||
var queue = buildQueue(worker, 1) | ||
t.throws(function () { | ||
queue.concurrency = 0 | ||
}) | ||
t.throws(function () { | ||
queue.concurrency = NaN | ||
}) | ||
t.doesNotThrow(function () { | ||
queue.concurrency = 2 | ||
}) | ||
function worker (arg, cb) { | ||
@@ -141,6 +153,7 @@ cb(null, true) | ||
test('pause && resume', function (t) { | ||
t.plan(7) | ||
t.plan(13) | ||
var queue = buildQueue(worker, 1) | ||
var worked = false | ||
var expected = [42, 24] | ||
@@ -156,2 +169,7 @@ t.notOk(queue.paused, 'it should not be paused') | ||
queue.push(24, function (err, result) { | ||
t.error(err, 'no error') | ||
t.equal(result, true, 'result matches') | ||
}) | ||
t.notOk(worked, 'it should be paused') | ||
@@ -161,10 +179,12 @@ t.ok(queue.paused, 'it should be paused') | ||
queue.resume() | ||
queue.pause() | ||
queue.resume() | ||
queue.resume() // second resume is a no-op | ||
t.notOk(queue.paused, 'it should not be paused') | ||
function worker (arg, cb) { | ||
t.equal(arg, 42) | ||
t.notOk(queue.paused, 'it should not be paused') | ||
t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency') | ||
t.equal(arg, expected.shift()) | ||
worked = true | ||
cb(null, true) | ||
process.nextTick(function () { cb(null, true) }) | ||
} | ||
@@ -174,6 +194,6 @@ }) | ||
test('pause in flight && resume', function (t) { | ||
t.plan(9) | ||
t.plan(16) | ||
var queue = buildQueue(worker, 1) | ||
var expected = [42, 24] | ||
var expected = [42, 24, 12] | ||
@@ -186,3 +206,7 @@ t.notOk(queue.paused, 'it should not be paused') | ||
t.ok(queue.paused, 'it should be paused') | ||
process.nextTick(function () { queue.resume() }) | ||
process.nextTick(function () { | ||
queue.resume() | ||
queue.pause() | ||
queue.resume() | ||
}) | ||
}) | ||
@@ -196,5 +220,12 @@ | ||
queue.push(12, function (err, result) { | ||
t.error(err, 'no error') | ||
t.equal(result, true, 'result matches') | ||
t.notOk(queue.paused, 'it should not be paused') | ||
}) | ||
queue.pause() | ||
function worker (arg, cb) { | ||
t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency') | ||
t.equal(arg, expected.shift()) | ||
@@ -206,12 +237,13 @@ process.nextTick(function () { cb(null, true) }) | ||
test('altering concurrency', function (t) { | ||
t.plan(7) | ||
t.plan(24) | ||
var queue = buildQueue(worker, 1) | ||
var count = 0 | ||
queue.pause() | ||
queue.push(24, workDone) | ||
queue.push(24, workDone) | ||
queue.push(24, workDone) | ||
queue.pause() | ||
queue.concurrency = 3 // concurrency changes are ignored while paused | ||
queue.concurrency = 2 | ||
@@ -223,2 +255,15 @@ | ||
queue.concurrency = 3 | ||
t.equal(queue.running(), 3, '3 jobs running') | ||
queue.concurrency = 1 | ||
t.equal(queue.running(), 3, '3 jobs running') // running jobs can't be killed | ||
queue.push(24, workDone) | ||
queue.push(24, workDone) | ||
queue.push(24, workDone) | ||
queue.push(24, workDone) | ||
function workDone (err, result) { | ||
@@ -230,5 +275,4 @@ t.error(err, 'no error') | ||
function worker (arg, cb) { | ||
t.equal(0, count, 'works in parallel') | ||
t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency') | ||
setImmediate(function () { | ||
count++ | ||
cb(null, true) | ||
@@ -235,0 +279,0 @@ }) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
41721
1100