drr-fair-queue
Advanced tools
Comparing version 1.0.0 to 1.1.0
19
index.js
@@ -13,4 +13,8 @@ var Deque = require('double-ended-queue'); | ||
this.length = 0; | ||
// A queue of flows that have some data to process | ||
this.activeList = new Deque(); | ||
this.activeFlow = null; | ||
// A map of flows from id -> Flow | ||
@@ -21,3 +25,3 @@ this.flows = {}; | ||
this.onUnidle = function () {}; | ||
this.onUnidle = options.onUnidle || function () {}; | ||
}; | ||
@@ -45,8 +49,10 @@ | ||
// then push the flow into the active list | ||
if (count === 1) { | ||
var activeCount = this.activeList.push(flow); | ||
if (activeCount) { | ||
this.onUnidle(); | ||
} | ||
if (count === 1 && this.activeFlow !== flow) { | ||
this.activeList.push(flow); | ||
} | ||
this.length++; | ||
if (this.length === 1) { | ||
this.onUnidle(); | ||
} | ||
}; | ||
@@ -62,2 +68,3 @@ | ||
queue.shift(); | ||
this.length--; | ||
return next.data; | ||
@@ -64,0 +71,0 @@ } else { |
{ | ||
"name": "drr-fair-queue", | ||
"version": "1.0.0", | ||
"version": "1.1.0", | ||
"description": "An fair queue based using Deficit Round-Robin", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "echo \"Error: no test specified\" && exit 1" | ||
"test": "if node --version | grep -q '^v0.10.'; then mocha --harmony_proxies --harmony_collections; else mocha --harmony_proxies; fi" | ||
}, | ||
"author": "Matt Lavin <matt.lavin@gmail.com>", | ||
"repository": { | ||
"type" : "git", | ||
"url" : "https://github.com/mdlavin/drr-fair-queue.git" | ||
"type": "git", | ||
"url": "https://github.com/mdlavin/drr-fair-queue.git" | ||
}, | ||
@@ -20,9 +20,8 @@ "license": "ISC", | ||
"chai": "^3.2.0", | ||
"harmony-reflect": "^1.4.4", | ||
"jscoverage": "^0.6.0", | ||
"mocha": "^2.2.5", | ||
"sinon": "^1.17.3", | ||
"uuid": "^2.0.1" | ||
}, | ||
"scripts": { | ||
"test": "mocha" | ||
} | ||
} |
@@ -45,1 +45,37 @@ A fair queue based on Deficit Round Robin | ||
Reacting to queued work | ||
----------------------- | ||
To know when work is ready to be processed, you can register a hook as an option | ||
to the constructor. Here is an example program that watches for work and | ||
processes when it's available: | ||
```js | ||
var FairQueue = require('drr-fair-queue'); | ||
var freeWorkers = 1; | ||
function processWork() { | ||
if (freeWorkers === 0) { | ||
// Skipping work because all workers are busy | ||
return; | ||
} | ||
freeWorkers--; | ||
var work = queue.pop(); | ||
if (!work) { | ||
freeWorkers++; | ||
return; | ||
} else { | ||
// Do something with the work | ||
setImmediate(function () { | ||
console.log('Processing work', work); | ||
freeWorkers++; | ||
processWork(); | ||
}); | ||
} | ||
} | ||
var queue = new FairQueue({onUnidle: processWork}); | ||
queue.push('source1', 'item1', 1); | ||
queue.push('source1', 'item2', 1); | ||
queue.push('source2', 'item3', 1); | ||
``` |
93
test.js
var DRRQueue = require('./index'); | ||
var expect = require('chai').expect; | ||
var uuid = require('uuid'); | ||
var sinon = require('sinon'); | ||
var Reflect = require('harmony-reflect'); | ||
function testInvariants(queue) { | ||
// The activeFlow should not also be in the active list | ||
if (queue.activeFlow) { | ||
for (var i=0;i<queue.activeList.length;i++) { | ||
expect(queue.activeList.get(i)).to.not.equal(queue.activeFlow); | ||
} | ||
} | ||
// The queued count should equal the number of elements in all flows | ||
var actualQueued = 0; | ||
if (queue.activeFlow) { | ||
actualQueued += queue.activeFlow.queue.length; | ||
} | ||
for (var a=0;a<queue.activeList.length;a++) { | ||
actualQueued += queue.activeList.get(a).queue.length; | ||
} | ||
expect(queue.length).to.equal(actualQueued); | ||
} | ||
function newQueue(options) { | ||
var queue = new DRRQueue(options); | ||
var proxy = new Proxy(queue, { | ||
get: function(target, name, receiver) { | ||
var result = Reflect.get(target, name, receiver); | ||
if (typeof(result) === 'function') { | ||
return function() { | ||
var returnValue = result.apply(target, arguments); | ||
testInvariants(target); | ||
return returnValue; | ||
}; | ||
} | ||
return result; | ||
} | ||
}); | ||
return proxy; | ||
} | ||
describe('DRRQueue', function () { | ||
@@ -9,3 +50,3 @@ | ||
it('returns undefined when the queue is empty', function () { | ||
var queue = new DRRQueue(); | ||
var queue = newQueue(); | ||
expect(queue.pop()).to.be.undefined; | ||
@@ -15,3 +56,3 @@ }); | ||
it('returns the only element if there is only one', function () { | ||
var queue = new DRRQueue(); | ||
var queue = newQueue(); | ||
var element = uuid.v4(); | ||
@@ -25,3 +66,3 @@ queue.push('only-flow', element, 1); | ||
function () { | ||
var queue = new DRRQueue(); | ||
var queue = newQueue(); | ||
var element1 = uuid.v4(); | ||
@@ -37,3 +78,3 @@ queue.push('only-flow', element1, 1); | ||
it('returns undefined again after emptying the queue', function () { | ||
var queue = new DRRQueue(); | ||
var queue = newQueue(); | ||
var element = uuid.v4(); | ||
@@ -48,3 +89,3 @@ queue.push('only-flow', element, 1); | ||
function () { | ||
var queue = new DRRQueue({quantumSize: 4}); | ||
var queue = newQueue({quantumSize: 4}); | ||
var element = uuid.v4(); | ||
@@ -59,3 +100,3 @@ queue.push('only-flow', element, 2); | ||
function () { | ||
var queue = new DRRQueue({quantumSize: 4}); | ||
var queue = newQueue({quantumSize: 4}); | ||
var element = uuid.v4(); | ||
@@ -70,3 +111,3 @@ queue.push('only-flow', element, 5); | ||
function () { | ||
var queue = new DRRQueue({quantumSize: 4}); | ||
var queue = newQueue({quantumSize: 4}); | ||
var element = uuid.v4(); | ||
@@ -81,3 +122,3 @@ queue.push('only-flow', element, 0); | ||
function () { | ||
var queue = new DRRQueue(); | ||
var queue = newQueue(); | ||
var element1 = 'element1'; | ||
@@ -100,3 +141,3 @@ queue.push('flow1', element1, 1); | ||
function () { | ||
var queue = new DRRQueue({quantumSize: 2}); | ||
var queue = newQueue({quantumSize: 2}); | ||
var element1 = 'element1'; | ||
@@ -119,3 +160,3 @@ queue.push('flow1', element1, 1); | ||
it('requires that the size be a number', function () { | ||
var queue = new DRRQueue({quantumSize: 4}); | ||
var queue = newQueue({quantumSize: 4}); | ||
var callWithAString = function () { | ||
@@ -130,3 +171,3 @@ queue.push('only-flow', 'element', {test: true}); | ||
it('requires that the size be a finite number', function () { | ||
var queue = new DRRQueue({quantumSize: 4}); | ||
var queue = newQueue({quantumSize: 4}); | ||
var callWithAString = function () { | ||
@@ -139,4 +180,34 @@ queue.push('only-flow', 'element', Number.NaN); | ||
}); | ||
it('calls onUnidle if it\'s the first work pushed', function () { | ||
var onUnidle = sinon.stub(); | ||
var queue = newQueue({onUnidle: onUnidle}); | ||
sinon.assert.notCalled(onUnidle); | ||
queue.push('one', 'element', 1); | ||
sinon.assert.calledOnce(onUnidle); | ||
}); | ||
it('calls onUnidle when work is pushed to a deficited active flow', function () { | ||
var onUnidle = sinon.stub(); | ||
var queue = newQueue({onUnidle: onUnidle, quantumSize: 4}); | ||
sinon.assert.notCalled(onUnidle); | ||
queue.push('one', 'element', 1); | ||
sinon.assert.calledOnce(onUnidle); | ||
queue.pop(); | ||
queue.push('one', 'element 2', 1); | ||
sinon.assert.calledTwice(onUnidle); | ||
}); | ||
it('does not call onUnidle if there is pending work in the same queue', function () { | ||
var onUnidle = sinon.stub(); | ||
var queue = newQueue({onUnidle: onUnidle}); | ||
queue.push('one', 'element', 1); | ||
queue.push('two', 'element 1', 1); | ||
sinon.assert.calledOnce(onUnidle); | ||
}); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
16452
15
280
81
6