async-pool-cue
Advanced tools
Comparing version 0.9.0 to 0.10.0
18
index.js
@@ -31,3 +31,3 @@ exports.queue = queue; | ||
if (q.saturated && q.running() === q.concurrency) { | ||
if (q.saturated && q.running() === q.concurrency && q.length() == 0) { | ||
q.saturated(); | ||
@@ -42,3 +42,3 @@ } | ||
saturated: null, | ||
empty: null, | ||
space: null, | ||
drain: null, | ||
@@ -58,7 +58,4 @@ started: false, | ||
} | ||
if (workers < q.concurrency && q.tasks.length) { | ||
if (workers < concurrency && q.tasks.length) { | ||
var task = q.tasks.shift(); | ||
if (q.empty && q.tasks.length === 0) { | ||
q.empty(); | ||
} | ||
workers += 1; | ||
@@ -70,6 +67,11 @@ var next = function () { | ||
} | ||
if (q.drain && q.tasks.length + workers === 0) { | ||
var len = q.length(); | ||
if (q.space && len === 0 && workers === (concurrency - 1)) { | ||
q.space(); | ||
} | ||
if (q.drain && len + workers === 0) { | ||
q.drain(); | ||
} | ||
q.process(); | ||
}; | ||
@@ -138,3 +140,3 @@ worker(task.data, onlyOnce(next)); | ||
q.process(); | ||
if (q.saturated && q.running() === q.concurrency) { | ||
if (q.saturated && q.running() === q.concurrency && q.length() == 0) { | ||
q.saturated(); | ||
@@ -141,0 +143,0 @@ } |
{ | ||
"name": "async-pool-cue", | ||
"version": "0.9.0", | ||
"version": "0.10.0", | ||
"description": "Limit concurrency of actions with a queue for Node.js", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -26,3 +26,48 @@ /*eslint-env mocha*/ | ||
}); | ||
it("should emit various events"); | ||
it("should call 'saturated' whenever full", function() { | ||
var tasks = []; | ||
var pending = []; | ||
var q = poolCue.queue(function(task, next) { | ||
tasks.push(task); | ||
pending.push(next); | ||
}, 3); | ||
q.saturated = s.spy(); | ||
q.push(1); | ||
q.push(2); | ||
s.assert.notCalled(q.saturated); | ||
q.push(3); // 3 running 0 queued | ||
s.assert.calledOnce(q.saturated); | ||
q.push(4); // 3 running 1 queued | ||
s.assert.calledOnce(q.saturated); | ||
pending[0](); // 3 running 0 queued | ||
pending[1](); // 2 running 0 queued | ||
s.assert.calledOnce(q.saturated); | ||
q.push(5); // 3 running 0 queued | ||
s.assert.calledTwice(q.saturated); | ||
}); | ||
it("should call 'space' whenever a slot opens up", function() { | ||
var tasks = []; | ||
var pending = []; | ||
var q = poolCue.queue(function(task, next) { | ||
tasks.push(task); | ||
pending.push(next); | ||
}, 3); | ||
q.space = s.spy(); | ||
q.push(1); | ||
q.push(2); | ||
q.push(3); // 3 running 0 queued | ||
q.push(4); // 3 running 1 queued | ||
s.assert.notCalled(q.space); | ||
pending[0](); // 3 running 0 queued | ||
s.assert.notCalled(q.space); | ||
pending[1](); // 2 running 0 queued | ||
s.assert.calledOnce(q.space); | ||
pending[2](); // 1 running 0 queued | ||
s.assert.calledOnce(q.space); | ||
q.push(5); // 2 running 0 queued | ||
q.push(6); // 3 running 0 queued | ||
s.assert.calledOnce(q.space); | ||
pending[3](); | ||
s.assert.calledTwice(q.space); | ||
}); | ||
@@ -88,3 +133,49 @@ it("should pause and resume", function() { | ||
}); | ||
it("should call 'saturated' whenever full", function() { | ||
var tasks = []; | ||
var pending = []; | ||
var q = poolCue.priorityQueue(function(task, next) { | ||
tasks.push(task); | ||
pending.push(next); | ||
}, 3); | ||
q.saturated = s.spy(); | ||
q.push(1, 0); | ||
q.push(2, 0); | ||
s.assert.notCalled(q.saturated); | ||
q.push(3, 0); // 3 running 0 queued | ||
s.assert.calledOnce(q.saturated); | ||
q.push(4, 0); // 3 running 1 queued | ||
s.assert.calledOnce(q.saturated); | ||
pending[0](); // 3 running 0 queued | ||
pending[1](); // 2 running 0 queued | ||
s.assert.calledOnce(q.saturated); | ||
q.push(5, 0); // 3 running 0 queued | ||
s.assert.calledTwice(q.saturated); | ||
}); | ||
it("should call 'space' whenever a slot opens up", function() { | ||
var tasks = []; | ||
var pending = []; | ||
var q = poolCue.priorityQueue(function(task, next) { | ||
tasks.push(task); | ||
pending.push(next); | ||
}, 3); | ||
q.space = s.spy(); | ||
q.push(1, 0); | ||
q.push(2, 0); | ||
q.push(3, 0); // 3 running 0 queued | ||
q.push(4, 0); // 3 running 1 queued | ||
s.assert.notCalled(q.space); | ||
pending[0](); // 3 running 0 queued | ||
s.assert.notCalled(q.space); | ||
pending[1](); // 2 running 0 queued | ||
s.assert.calledOnce(q.space); | ||
pending[2](); // 1 running 0 queued | ||
s.assert.calledOnce(q.space); | ||
q.push(5, 0); // 2 running 0 queued | ||
q.push(6, 0); // 3 running 0 queued | ||
s.assert.calledOnce(q.space); | ||
pending[3](); | ||
s.assert.calledTwice(q.space); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
13752
305