leaky-bucket
Advanced tools
Comparing version 0.1.8 to 0.1.9
@@ -6,2 +6,3 @@ !function() { | ||
, log = require('ee-log') | ||
, Promise = (Promise || require('es6-promise').Promise) | ||
, debug = require('ee-argv').has('debug-leaky-bucket') || process.env['debug-leaky-bucket']; | ||
@@ -38,13 +39,34 @@ | ||
// how long can a item wait before it gets a timeout | ||
// defaults to 5 minutes, 300 seconds | ||
, maxWaitingTime: 300 | ||
// indicates how long the next item has to | ||
// wait until its executed, stored as reserved cost, | ||
// not time | ||
, waitTime: 0 | ||
/** | ||
* class constructor | ||
* | ||
* @apram <integer> bucket capacity per minute | ||
* @param <integer> bucket capacity per minute, or options object | ||
* @param <integer> what time can it take to execute the capacity | ||
* @param <integer> items should not wait longer then n seconds, | ||
* if they do, abort them | ||
*/ | ||
, init: function(capacity, slotSize) { | ||
// optional settings | ||
if (type.number(capacity)) this.capacity = capacity; | ||
if (type.number(slotSize)) this.slotSize = slotSize; | ||
, init: function(capacity, iterval, maxWaitingTime) { | ||
if (type.object(capacity) && capacity != null) { | ||
if (type.number(capacity.capacity)) this.capacity = capacity.capacity; | ||
if (type.number(capacity.iterval)) this.slotSize = capacity.iterval; | ||
if (type.number(capacity.maxWaitingTime)) this.maxWaitingTime = capacity.maxWaitingTime; | ||
} | ||
else { | ||
// optional settings | ||
if (type.number(capacity)) this.capacity = capacity; | ||
if (type.number(iterval)) this.slotSize = iterval; | ||
if (type.number(maxWaitingTime)) this.maxWaitingTime = maxWaitingTime; | ||
} | ||
@@ -63,12 +85,9 @@ // compute the refillrate in tokens / second | ||
/** | ||
* throtthle a function | ||
* throttle a function | ||
* | ||
* @param <integer|function> the cost of the operation (defaults to 1) or callabck | ||
* @param <function> optional callback | ||
* @param <boolean> true if the items source is the queue (internal use only) | ||
*/ | ||
, throttle: function(cost, cb, fromQueue) { | ||
var now = Date.now() | ||
, item; | ||
, throttle: function(cost, cb) { | ||
// check the input | ||
@@ -79,3 +98,31 @@ if (type.function(cost)) { | ||
} | ||
else if (!type.number(cost)) cost = 1; | ||
// working with callbacks or promises? | ||
if (type.function(cb)) this._throttle(cost, cb, false); | ||
else { | ||
return new Promise(function(resolve, reject) { | ||
this._throttle(cost, function(err) { | ||
if (err) reject(err); | ||
else resolve(); | ||
}.bind(this), false); | ||
}.bind(this)); | ||
} | ||
} | ||
/** | ||
* private throttle method, works only with callbacks. | ||
* the public interface works with promises too. | ||
* | ||
* @param <integer> the cost of the operation | ||
* @param <function> callback | ||
* @param <boolean> true if the items source is the queue | ||
*/ | ||
, _throttle: function(cost, cb, fromQueue) { | ||
var now = Date.now() | ||
, waitTime | ||
, item; | ||
// refill | ||
@@ -89,3 +136,3 @@ this.left += Math.min((now-this.last)/1000/this.refillRate, this.capacity); | ||
// do we have enough capacity? | ||
// do we have enough capacity to execute now? | ||
if (this.left >= cost) { | ||
@@ -97,2 +144,5 @@ if (debug) log.debug('Executing item with a cost of %s item(s) ...', cost); | ||
// remove from wait time if the item has come from the queue | ||
if (fromQueue) this.waitTime -= cost; | ||
// execute | ||
@@ -102,13 +152,29 @@ cb(); | ||
else { | ||
if (debug) log.debug('Adding item with a cost of %s item(s) to the queue ...', cost); | ||
// we need to compute the time until this item will be executed, | ||
// if the timeout time is exceeded don't queue bit return an errror | ||
waitTime = (this.waitTime*(1000/this.refillRate)-(now-this.last))/1000; | ||
// add to queue, to the beginning if | ||
// the source was the queue itself | ||
item = { | ||
cb: cb | ||
, cost: cost | ||
}; | ||
if (waitTime > this.maxWaitingTime) { | ||
if (debug) log.debug('Rejectin gitme because its waiting time %s exceeds the max waiting time of %s ...', waitTime, this.maxWaitingTime); | ||
if (fromQueue) this.queue.unshift(item); | ||
else this.queue.push(item); | ||
cb(new Error('Timeout exceeded, too many waiting requests! Would take '+waitTime+' seconds to complete, the max waiting time is '+this.maxWaitingTime+'!')); | ||
} | ||
else { | ||
if (debug) log.debug('Adding item with a cost of %s item(s) to the queue ...', cost); | ||
// add to queue, to the beginning if | ||
// the source was the queue itself | ||
item = { | ||
cb: cb | ||
, cost: cost | ||
}; | ||
// increase the wait time variable so we can copute the exact time | ||
// that it will takr to execute the last added item | ||
this.waitTime += cost; | ||
// queu ad the end if the item was added by the user | ||
if (fromQueue) this.queue.unshift(item); | ||
else this.queue.push(item); | ||
} | ||
} | ||
@@ -126,3 +192,3 @@ | ||
this.timer = null; | ||
this.throttle(queuedItem.cost, queuedItem.cb, true); | ||
this._throttle(queuedItem.cost, queuedItem.cb, true); | ||
}.bind(this), Math.round((1-this.left)*this.refillRate*1000)); | ||
@@ -129,0 +195,0 @@ } |
{ | ||
"name" : "leaky-bucket" | ||
, "description" : "A fast and efficient leaky bucket implementation" | ||
, "version" : "0.1.8" | ||
, "version" : "0.1.9" | ||
, "homepage" : "https://github.com/eventEmitter/leaky-bucket" | ||
@@ -23,2 +23,3 @@ , "author" : "Michael van der Weg <michael@joinbox.com> (http://joinbox.com/)" | ||
, "ee-argv" : "0.1.x" | ||
, "es6-promise" : "2.0.x" | ||
} | ||
@@ -25,0 +26,0 @@ , "devDependencies": { |
@@ -23,5 +23,5 @@ # leaky-bucket | ||
The constructor accpets two parameter, both are optional | ||
The constructor accepts three parameters, all are optional | ||
var instance = new LeakyBucket([ItemsPerInterval = 60][, Interval = 60]); | ||
var instance = new LeakyBucket([capacity = 60][, Interval = 60][, maxWaitingTime = 300]); | ||
@@ -39,7 +39,27 @@ | ||
Create a new leaky bucket which is allowed to execute 200 items every 30 seconds with a maxWaitingTime of 60 seconds | ||
var bucket = new LeakyBucket(200, 30, 60); | ||
You may also use an options object instead of the eparameters | ||
var bucket = new LeakyBucket({ | ||
capacity: 200 // items per interval, defaults to 60 | ||
, iterval: 30 // seconds, defaults to 60 | ||
, maxWaitingTime: 60 // seconds, defaults to 300 | ||
}); | ||
### Throttling | ||
The throttle accepts two parameters, of which the first is optional | ||
The throttle accepts two parameters, of which both are optional | ||
- The first parameter can either be a callback function or the cost of the opertion | ||
- The seocnd parameter can be the callback function | ||
If you do not pass a callabck a promise is returned. The first argument of the callback is an error object (or the promise fails) if the item could not be executed becuas the max waiting time was exceeded. | ||
bucktet.throttle([cost], callback); | ||
@@ -52,3 +72,3 @@ | ||
bucket.throttle(function() { | ||
bucket.throttle(function(err) { | ||
// do something | ||
@@ -60,3 +80,3 @@ }); | ||
bucket.throttle(10, function() { | ||
bucket.throttle(10, function(err) { | ||
// do something | ||
@@ -66,2 +86,11 @@ }); | ||
Throttle an using Promises | ||
bucket.throttle().then(function() { | ||
// ok, do your stuff ... | ||
}).catch(function(err) { | ||
// max waiting time exceeded, dont execute anythig | ||
}); | ||
### Flags | ||
@@ -76,3 +105,3 @@ | ||
Rate limit API calls, allowed are no more than 60 requests per second | ||
Rate limit API calls on the client side, allowed are no more than 60 requests per minute | ||
@@ -98,2 +127,23 @@ var LeakyBucket = require('leaky-bucket') | ||
}); | ||
}); | ||
Rate limit API calls on the server side, allowed are no more than 60 requests per minute | ||
var LeakyBucket = require('leaky-bucket') | ||
, request = require('request') | ||
, bucket; | ||
// create bucket instance, 60 request per minute, max waiting time = 0 | ||
bucket = new LeakyBucket(60, 60, 0); | ||
// this let pass all request that are within the liimt and fail all that | ||
// exceed it | ||
bucket.throttle(function(err) { | ||
if (err) response.send(429, 'too many requests!'); | ||
else response.send(200, '{id:4, ...}') | ||
}); |
@@ -166,3 +166,84 @@ | ||
}); | ||
it('should abort items that exceed the max waiting time', function(done) { | ||
var start = Date.now() | ||
, executed = 0 | ||
, maxTime = 11500 | ||
, minTime = 10500 | ||
, capacity = 60 | ||
, items = 100 | ||
, iterator = items | ||
, cb, bucket; | ||
// wait fo rthe bucket | ||
this.timeout(15000); | ||
cb = function() { | ||
var duration; | ||
if (++executed === items) { | ||
duration = Date.now()-start; | ||
assert(duration>=minTime, 'The leaky bucket finished too soon ('+duration+' < '+minTime+') ...'); | ||
assert(duration<maxTime, 'The leaky bucket finished too late ('+duration+' > '+maxTime+') ...'); | ||
done(); | ||
} | ||
} | ||
bucket = new LeakyBucket(capacity, 60, 10); | ||
while(iterator--) bucket.throttle(cb); | ||
}); | ||
it('should work using promises', function(done) { | ||
var start = Date.now() | ||
, executed = 0 | ||
, maxTime = 11500 | ||
, minTime = 10500 | ||
, capacity = 60 | ||
, items = 100 | ||
, errCount = 0 | ||
, expectedErrCount = 29 | ||
, iterator = items | ||
, cb, bucket; | ||
// wait fo rthe bucket | ||
this.timeout(15000); | ||
cb = function(err) { | ||
var duration; | ||
if (err) errCount++; | ||
if (++executed === items) { | ||
duration = Date.now()-start; | ||
assert(duration>=minTime, 'The leaky bucket finished too soon ('+duration+' < '+minTime+') ...'); | ||
assert(duration<maxTime, 'The leaky bucket finished too late ('+duration+' > '+maxTime+') ...'); | ||
assert(errCount===expectedErrCount, 'The leaky bucket should have emitted '+errCount+' errros, it emitted '+expectedErrCount+' errors...'); | ||
done(); | ||
} | ||
} | ||
bucket = new LeakyBucket(capacity, 60, 10); | ||
while(iterator--) bucket.throttle().then(cb).catch(cb); | ||
}); | ||
}); | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
20130
9
305
142
5
+ Addedes6-promise@2.0.x
+ Addedes6-promise@2.0.1(transitive)