Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

drr-fair-queue

Package Overview
Dependencies
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

drr-fair-queue - npm Package Compare versions

Comparing version 1.0.0 to 1.1.0

example.js~

19

index.js

@@ -13,4 +13,8 @@ var Deque = require('double-ended-queue');

this.length = 0;
// A queue of flows that have some data to process
this.activeList = new Deque();
this.activeFlow = null;
// A map of flows from id -> Flow

@@ -21,3 +25,3 @@ this.flows = {};

this.onUnidle = function () {};
this.onUnidle = options.onUnidle || function () {};
};

@@ -45,8 +49,10 @@

// then push the flow into the active list
if (count === 1) {
var activeCount = this.activeList.push(flow);
if (activeCount) {
this.onUnidle();
}
if (count === 1 && this.activeFlow !== flow) {
this.activeList.push(flow);
}
this.length++;
if (this.length === 1) {
this.onUnidle();
}
};

@@ -62,2 +68,3 @@

queue.shift();
this.length--;
return next.data;

@@ -64,0 +71,0 @@ } else {

{
"name": "drr-fair-queue",
"version": "1.0.0",
"version": "1.1.0",
"description": "An fair queue based using Deficit Round-Robin",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"test": "if node --version | grep -q '^v0.10.'; then mocha --harmony_proxies --harmony_collections; else mocha --harmony_proxies; fi"
},
"author": "Matt Lavin <matt.lavin@gmail.com>",
"repository": {
"type" : "git",
"url" : "https://github.com/mdlavin/drr-fair-queue.git"
"type": "git",
"url": "https://github.com/mdlavin/drr-fair-queue.git"
},

@@ -20,9 +20,8 @@ "license": "ISC",

"chai": "^3.2.0",
"harmony-reflect": "^1.4.4",
"jscoverage": "^0.6.0",
"mocha": "^2.2.5",
"sinon": "^1.17.3",
"uuid": "^2.0.1"
},
"scripts": {
"test": "mocha"
}
}

@@ -45,1 +45,37 @@ A fair queue based on Deficit Round Robin

Reacting to queued work
-----------------------
To know when work is ready to be processed, you can register a hook as an option
to the constructor. Here is an example program that watches for work and
processes when it's available:
```js
var FairQueue = require('drr-fair-queue');
var freeWorkers = 1;
function processWork() {
if (freeWorkers === 0) {
// Skipping work because all workers are busy
return;
}
freeWorkers--;
var work = queue.pop();
if (!work) {
freeWorkers++;
return;
} else {
// Do something with the work
setImmediate(function () {
console.log('Processing work', work);
freeWorkers++;
processWork();
});
}
}
var queue = new FairQueue({onUnidle: processWork});
queue.push('source1', 'item1', 1);
queue.push('source1', 'item2', 1);
queue.push('source2', 'item3', 1);
```
var DRRQueue = require('./index');
var expect = require('chai').expect;
var uuid = require('uuid');
var sinon = require('sinon');
var Reflect = require('harmony-reflect');
function testInvariants(queue) {
// The activeFlow should not also be in the active list
if (queue.activeFlow) {
for (var i=0;i<queue.activeList.length;i++) {
expect(queue.activeList.get(i)).to.not.equal(queue.activeFlow);
}
}
// The queued count should equal the number of elements in all flows
var actualQueued = 0;
if (queue.activeFlow) {
actualQueued += queue.activeFlow.queue.length;
}
for (var a=0;a<queue.activeList.length;a++) {
actualQueued += queue.activeList.get(a).queue.length;
}
expect(queue.length).to.equal(actualQueued);
}
function newQueue(options) {
var queue = new DRRQueue(options);
var proxy = new Proxy(queue, {
get: function(target, name, receiver) {
var result = Reflect.get(target, name, receiver);
if (typeof(result) === 'function') {
return function() {
var returnValue = result.apply(target, arguments);
testInvariants(target);
return returnValue;
};
}
return result;
}
});
return proxy;
}
describe('DRRQueue', function () {

@@ -9,3 +50,3 @@

it('returns undefined when the queue is empty', function () {
var queue = new DRRQueue();
var queue = newQueue();
expect(queue.pop()).to.be.undefined;

@@ -15,3 +56,3 @@ });

it('returns the only element if there is only one', function () {
var queue = new DRRQueue();
var queue = newQueue();
var element = uuid.v4();

@@ -25,3 +66,3 @@ queue.push('only-flow', element, 1);

function () {
var queue = new DRRQueue();
var queue = newQueue();
var element1 = uuid.v4();

@@ -37,3 +78,3 @@ queue.push('only-flow', element1, 1);

it('returns undefined again after emptying the queue', function () {
var queue = new DRRQueue();
var queue = newQueue();
var element = uuid.v4();

@@ -48,3 +89,3 @@ queue.push('only-flow', element, 1);

function () {
var queue = new DRRQueue({quantumSize: 4});
var queue = newQueue({quantumSize: 4});
var element = uuid.v4();

@@ -59,3 +100,3 @@ queue.push('only-flow', element, 2);

function () {
var queue = new DRRQueue({quantumSize: 4});
var queue = newQueue({quantumSize: 4});
var element = uuid.v4();

@@ -70,3 +111,3 @@ queue.push('only-flow', element, 5);

function () {
var queue = new DRRQueue({quantumSize: 4});
var queue = newQueue({quantumSize: 4});
var element = uuid.v4();

@@ -81,3 +122,3 @@ queue.push('only-flow', element, 0);

function () {
var queue = new DRRQueue();
var queue = newQueue();
var element1 = 'element1';

@@ -100,3 +141,3 @@ queue.push('flow1', element1, 1);

function () {
var queue = new DRRQueue({quantumSize: 2});
var queue = newQueue({quantumSize: 2});
var element1 = 'element1';

@@ -119,3 +160,3 @@ queue.push('flow1', element1, 1);

it('requires that the size be a number', function () {
var queue = new DRRQueue({quantumSize: 4});
var queue = newQueue({quantumSize: 4});
var callWithAString = function () {

@@ -130,3 +171,3 @@ queue.push('only-flow', 'element', {test: true});

it('requires that the size be a finite number', function () {
var queue = new DRRQueue({quantumSize: 4});
var queue = newQueue({quantumSize: 4});
var callWithAString = function () {

@@ -139,4 +180,34 @@ queue.push('only-flow', 'element', Number.NaN);

});
it('calls onUnidle if it\'s the first work pushed', function () {
var onUnidle = sinon.stub();
var queue = newQueue({onUnidle: onUnidle});
sinon.assert.notCalled(onUnidle);
queue.push('one', 'element', 1);
sinon.assert.calledOnce(onUnidle);
});
it('calls onUnidle when work is pushed to a deficited active flow', function () {
var onUnidle = sinon.stub();
var queue = newQueue({onUnidle: onUnidle, quantumSize: 4});
sinon.assert.notCalled(onUnidle);
queue.push('one', 'element', 1);
sinon.assert.calledOnce(onUnidle);
queue.pop();
queue.push('one', 'element 2', 1);
sinon.assert.calledTwice(onUnidle);
});
it('does not call onUnidle if there is pending work in the same queue', function () {
var onUnidle = sinon.stub();
var queue = newQueue({onUnidle: onUnidle});
queue.push('one', 'element', 1);
queue.push('two', 'element 1', 1);
sinon.assert.calledOnce(onUnidle);
});
});
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc