bottleneckp
Advanced tools
Comparing version 1.0.3 to 1.1.0
@@ -60,3 +60,3 @@ var PriorityQueue = function(size) { | ||
function Bottleneck(maxConcurrent, rateLimit, priorityRange, defaultPriority) { | ||
function Bottleneck(maxConcurrent, rateLimit, priorityRange, defaultPriority, cluster) { | ||
if(isNaN(maxConcurrent) || isNaN(rateLimit)) { | ||
@@ -78,2 +78,3 @@ throw "maxConcurrent and rateLimit must be numbers"; | ||
this.cluster = cluster; | ||
this.rateLimit = parseInt(rateLimit); | ||
@@ -88,2 +89,6 @@ this.maxConcurrent = this.rateLimit ? 1 : parseInt(maxConcurrent); | ||
Bottleneck.prototype.setName = function(name) { | ||
this.name = name; | ||
} | ||
Bottleneck.prototype.submit = function(options, clientCallback) { | ||
@@ -106,7 +111,9 @@ var self = this; | ||
var self = this; | ||
if(self._tasksRunning < self.maxConcurrent && self._waitingClients.size()) { | ||
if(self._tasksRunning < self.maxConcurrent && self.hasWaitingClients()) { | ||
++self._tasksRunning; | ||
var wait = Math.max(this._nextRequest - Date.now(), 0); | ||
var wait = Math.max(self._nextRequest - Date.now(), 0); | ||
self._nextRequest = Date.now() + wait + self.rateLimit; | ||
var next = self._waitingClients.dequeue(); | ||
var obj = self.dequeue(); | ||
var next = obj.next; | ||
var limiter = obj.limiter; | ||
setTimeout(function(){ | ||
@@ -117,3 +124,3 @@ var done = function() { | ||
} | ||
next(done); | ||
next(done, limiter); | ||
}, wait); | ||
@@ -124,4 +131,24 @@ } | ||
Bottleneck.prototype.hasWaitingClients = function() { | ||
if(this._waitingClients.size()) { | ||
return true; | ||
} | ||
if(this.cluster && this.cluster._waitingClients()) { | ||
return true; | ||
} | ||
return false; | ||
} | ||
Bottleneck.prototype.dequeue = function() { | ||
if(this._waitingClients.size()) { | ||
return { | ||
next: this._waitingClients.dequeue(), | ||
limiter: null | ||
}; | ||
} | ||
return this.cluster.dequeue(this.name); | ||
} | ||
Bottleneck.Cluster = Bottleneck.prototype.Cluster = require("./Cluster"); | ||
module.exports = Bottleneck; | ||
module.exports = Bottleneck; |
hasProp = {}.hasOwnProperty; | ||
var Cluster = function(maxConcurrent, rateLimit, priorityRange) { | ||
var Cluster = function(maxConcurrent, rateLimit, priorityRange, defaultPriority, homogeneous) { | ||
this.maxConcurrent = maxConcurrent; | ||
this.rateLimit = rateLimit; | ||
this.priorityRange = priorityRange; | ||
this.defaultPrioty = defaultPriority; | ||
this.homogeneous = homogeneous ? true : false; | ||
this.limiters = {}; | ||
@@ -16,3 +18,7 @@ this.Bottleneck = require("./Bottleneck"); | ||
} | ||
return (ref = this.limiters[key]) != null ? ref : (this.limiters[key] = new this.Bottleneck(this.maxConcurrent, this.rateLimit, this.priorityRange)); | ||
if((ref = this.limiters[key]) == null) { | ||
ref = this.limiters[key] = new this.Bottleneck(this.maxConcurrent, this.rateLimit, this.priorityRange, this.defaultPriority, this.homogeneous ? this : null); | ||
ref.setName(key); | ||
} | ||
return ref; | ||
}; | ||
@@ -43,2 +49,36 @@ | ||
Cluster.prototype._waitingClients = function() { | ||
var count = 0; | ||
var keys = this.keys(); | ||
keys.forEach(function(key) { | ||
count += this.limiters[key]._waitingClients.size(); | ||
}, this); | ||
return count; | ||
}; | ||
Cluster.prototype.dequeue = function(name) { | ||
var keys = this.keys(); | ||
for(var i = 0; i < keys.length; ++i) { | ||
if(this.limiters[keys[i]]._waitingClients.size()) { | ||
return { | ||
next: this.limiters[keys[i]]._waitingClients.dequeue(), | ||
limiter: name | ||
}; | ||
} | ||
} | ||
} | ||
Cluster.prototype.status = function() { | ||
var status = []; | ||
var keys = this.keys(); | ||
keys.forEach(function(key) { | ||
status.push([ | ||
'key: '+key, | ||
'running: '+this.limiters[key]._tasksRunning, | ||
'waiting: '+this.limiters[key]._waitingClients.size() | ||
].join()); | ||
}, this); | ||
return status.join(';'); | ||
} | ||
Cluster.prototype.startAutoCleanup = function() { | ||
@@ -70,2 +110,2 @@ var base; | ||
module.exports = Cluster; | ||
module.exports = Cluster; |
{ | ||
"name": "bottleneckp", | ||
"version": "1.0.3", | ||
"version": "1.1.0", | ||
"description": "asynchronous rate limiter with priority", | ||
@@ -5,0 +5,0 @@ "main": "./lib/Bottleneck.js", |
@@ -114,3 +114,25 @@ describe('General', function () { | ||
}) | ||
describe('homogeneous', function() { | ||
it('should share tasks', function(done) { | ||
var c = clusterTest(1, 0, 10, 5, true) | ||
c.cluster.key('1').submit(5, c.slowJob('1', 1)) | ||
c.cluster.key('1').submit(5, c.fastJob('1', 2)) | ||
c.cluster.key('1').submit(5, c.fastJob('1', 3)) | ||
c.cluster.key('2').submit(5, c.fastJob('2', 4)) | ||
console.assert(c.size() === 2) | ||
c.cluster.key('1').submit(5, c.last('1', 5, { | ||
checkResults:[ | ||
{in:'2',out:null,value:4}, | ||
{in:'1',out:'2',value:2}, | ||
{in:'1',out:'2',value:3}, | ||
{in:'1',out:null,value:1}, | ||
{in:'1',out:'2',value:5} | ||
], | ||
checkDuration:800, | ||
done:done | ||
})) | ||
}) | ||
}) | ||
}) | ||
}) |
global.TEST = true | ||
global.Bottleneck = require('../lib/Bottleneck.js') | ||
global.makeTest = function (arg1, arg2, arg3) { | ||
global.Cluster = require('../lib/Cluster.js') | ||
global.makeTest = function (arg1, arg2, arg3, arg4) { | ||
var start = Date.now() | ||
var calls = [] | ||
var limiter = new Bottleneck(arg1, arg2, arg3) | ||
var limiter = new Bottleneck(arg1, arg2, arg3, arg4) | ||
var getResults = function () { | ||
@@ -54,3 +55,69 @@ return { | ||
} | ||
global.clusterTest = function(arg1, arg2, arg3, arg4, arg5) { | ||
var start = Date.now() | ||
var calls = [] | ||
var cluster = new Cluster(arg1, arg2, arg3, arg4, arg5) | ||
var getResults = function () { | ||
return { | ||
elapsed: Date.now() - start, | ||
callsDuration: calls[calls.length - 1].time, | ||
calls: calls | ||
} | ||
} | ||
var context = { | ||
cluster:cluster, | ||
size: function() { | ||
return cluster._waitingClients() | ||
}, | ||
fastJob: function(inLimiter, value) { | ||
return function(cb, outLimiter) { | ||
setTimeout(function() { | ||
calls.push({in:inLimiter,out:outLimiter,value:value,time:Date.now()-start}) | ||
cb() | ||
}, 100) | ||
} | ||
}, | ||
slowJob: function(inLimiter, value) { | ||
return function(cb, outLimiter) { | ||
setTimeout(function() { | ||
calls.push({in:inLimiter,out:outLimiter,value:value,time:Date.now()-start}) | ||
cb() | ||
}, 500) | ||
} | ||
}, | ||
last: function(inLimiter, value, check) { | ||
return function(cb, outLimiter) { | ||
setTimeout(function() { | ||
calls.push({in:inLimiter,out:outLimiter,value:value,time:Date.now()-start}) | ||
cb() | ||
console.assert(cluster._waitingClients() === 0) | ||
if(check.checkResults) | ||
context.checkResultsOrder(check.checkResults) | ||
if(check.checkDuration) | ||
context.checkDuration(check.checkDuration) | ||
check.done() | ||
}, 500) | ||
} | ||
}, | ||
checkResultsOrder: function (order) { | ||
for(var i = 0; i < order.length; i++) { | ||
console.assert(order[i].in == calls[i].in) | ||
console.assert(order[i].out == calls[i].out) | ||
console.assert(order[i].value == calls[i].value) | ||
} | ||
}, | ||
checkDuration: function (shouldBe) { | ||
var results = getResults() | ||
var min = shouldBe - 10 | ||
var max = shouldBe + 50 | ||
console.assert(results.callsDuration > min) | ||
console.assert(results.callsDuration < max) | ||
} | ||
} | ||
return context | ||
} | ||
var fs = require('fs') | ||
@@ -57,0 +124,0 @@ var files = fs.readdirSync('./test') |
17558
478