Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

leaky-bucket

Package Overview
Dependencies
Maintainers
1
Versions
36
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

leaky-bucket - npm Package Compare versions

Comparing version 0.1.8 to 0.1.9

localTest.js

112

lib/LeakyBucket.js

@@ -6,2 +6,3 @@ !function() {

, log = require('ee-log')
, Promise = (Promise || require('es6-promise').Promise)
, debug = require('ee-argv').has('debug-leaky-bucket') || process.env['debug-leaky-bucket'];

@@ -38,13 +39,34 @@

// how long can a item wait before it gets a timeout
// defaults to 5 minutes, 300 seconds
, maxWaitingTime: 300
// indicates how long the next item has to
// wait until its executed, stored as reserved cost,
// not time
, waitTime: 0
/**
* class constructor
*
* @apram <integer> bucket capacity per minute
* @param <integer> bucket capacity per minute, or options object
* @param <integer> what time can it take to execute the capacity
* @param <integer> items should not wait longer then n seconds,
* if they do, abort them
*/
, init: function(capacity, slotSize) {
// optional settings
if (type.number(capacity)) this.capacity = capacity;
if (type.number(slotSize)) this.slotSize = slotSize;
, init: function(capacity, iterval, maxWaitingTime) {
if (type.object(capacity) && capacity != null) {
if (type.number(capacity.capacity)) this.capacity = capacity.capacity;
if (type.number(capacity.iterval)) this.slotSize = capacity.iterval;
if (type.number(capacity.maxWaitingTime)) this.maxWaitingTime = capacity.maxWaitingTime;
}
else {
// optional settings
if (type.number(capacity)) this.capacity = capacity;
if (type.number(iterval)) this.slotSize = iterval;
if (type.number(maxWaitingTime)) this.maxWaitingTime = maxWaitingTime;
}

@@ -63,12 +85,9 @@ // compute the refillrate in tokens / second

/**
* throtthle a function
* throttle a function
*
* @param <integer|function> the cost of the operation (defaults to 1) or callabck
* @param <function> optional callback
* @param <boolean> true if the items source is the queue (internal use only)
*/
, throttle: function(cost, cb, fromQueue) {
var now = Date.now()
, item;
, throttle: function(cost, cb) {
// check the input

@@ -79,3 +98,31 @@ if (type.function(cost)) {

}
else if (!type.number(cost)) cost = 1;
// working with callbacks or promises?
if (type.function(cb)) this._throttle(cost, cb, false);
else {
return new Promise(function(resolve, reject) {
this._throttle(cost, function(err) {
if (err) reject(err);
else resolve();
}.bind(this), false);
}.bind(this));
}
}
/**
* private throttle method, works only with callbacks.
* the public interface works with promises too.
*
* @param <integer> the cost of the operation
* @param <function> callback
* @param <boolean> true if the items source is the queue
*/
, _throttle: function(cost, cb, fromQueue) {
var now = Date.now()
, waitTime
, item;
// refill

@@ -89,3 +136,3 @@ this.left += Math.min((now-this.last)/1000/this.refillRate, this.capacity);

// do we have enough capacity?
// do we have enough capacity to execute now?
if (this.left >= cost) {

@@ -97,2 +144,5 @@ if (debug) log.debug('Executing item with a cost of %s item(s) ...', cost);

// remove from wait time if the item has come from the queue
if (fromQueue) this.waitTime -= cost;
// execute

@@ -102,13 +152,29 @@ cb();

else {
if (debug) log.debug('Adding item with a cost of %s item(s) to the queue ...', cost);
// we need to compute the time until this item will be executed,
// if the timeout time is exceeded don't queue bit return an errror
waitTime = (this.waitTime*(1000/this.refillRate)-(now-this.last))/1000;
// add to queue, to the beginning if
// the source was the queue itself
item = {
cb: cb
, cost: cost
};
if (waitTime > this.maxWaitingTime) {
if (debug) log.debug('Rejectin gitme because its waiting time %s exceeds the max waiting time of %s ...', waitTime, this.maxWaitingTime);
if (fromQueue) this.queue.unshift(item);
else this.queue.push(item);
cb(new Error('Timeout exceeded, too many waiting requests! Would take '+waitTime+' seconds to complete, the max waiting time is '+this.maxWaitingTime+'!'));
}
else {
if (debug) log.debug('Adding item with a cost of %s item(s) to the queue ...', cost);
// add to queue, to the beginning if
// the source was the queue itself
item = {
cb: cb
, cost: cost
};
// increase the wait time variable so we can copute the exact time
// that it will takr to execute the last added item
this.waitTime += cost;
// queu ad the end if the item was added by the user
if (fromQueue) this.queue.unshift(item);
else this.queue.push(item);
}
}

@@ -126,3 +192,3 @@

this.timer = null;
this.throttle(queuedItem.cost, queuedItem.cb, true);
this._throttle(queuedItem.cost, queuedItem.cb, true);
}.bind(this), Math.round((1-this.left)*this.refillRate*1000));

@@ -129,0 +195,0 @@ }

{
"name" : "leaky-bucket"
, "description" : "A fast and efficient leaky bucket implementation"
, "version" : "0.1.8"
, "version" : "0.1.9"
, "homepage" : "https://github.com/eventEmitter/leaky-bucket"

@@ -23,2 +23,3 @@ , "author" : "Michael van der Weg <michael@joinbox.com> (http://joinbox.com/)"

, "ee-argv" : "0.1.x"
, "es6-promise" : "2.0.x"
}

@@ -25,0 +26,0 @@ , "devDependencies": {

@@ -23,5 +23,5 @@ # leaky-bucket

The constructor accpets two parameter, both are optional
The constructor accepts three parameters, all are optional
var instance = new LeakyBucket([ItemsPerInterval = 60][, Interval = 60]);
var instance = new LeakyBucket([capacity = 60][, Interval = 60][, maxWaitingTime = 300]);

@@ -39,7 +39,27 @@

Create a new leaky bucket which is allowed to execute 200 items every 30 seconds with a maxWaitingTime of 60 seconds
var bucket = new LeakyBucket(200, 30, 60);
You may also use an options object instead of the eparameters
var bucket = new LeakyBucket({
capacity: 200 // items per interval, defaults to 60
, iterval: 30 // seconds, defaults to 60
, maxWaitingTime: 60 // seconds, defaults to 300
});
### Throttling
The throttle accepts two parameters, of which the first is optional
The throttle accepts two parameters, of which both are optional
- The first parameter can either be a callback function or the cost of the opertion
- The seocnd parameter can be the callback function
If you do not pass a callabck a promise is returned. The first argument of the callback is an error object (or the promise fails) if the item could not be executed becuas the max waiting time was exceeded.
bucktet.throttle([cost], callback);

@@ -52,3 +72,3 @@

bucket.throttle(function() {
bucket.throttle(function(err) {
// do something

@@ -60,3 +80,3 @@ });

bucket.throttle(10, function() {
bucket.throttle(10, function(err) {
// do something

@@ -66,2 +86,11 @@ });

Throttle an using Promises
bucket.throttle().then(function() {
// ok, do your stuff ...
}).catch(function(err) {
// max waiting time exceeded, dont execute anythig
});
### Flags

@@ -76,3 +105,3 @@

Rate limit API calls, allowed are no more than 60 requests per second
Rate limit API calls on the client side, allowed are no more than 60 requests per minute

@@ -98,2 +127,23 @@ var LeakyBucket = require('leaky-bucket')

});
});
Rate limit API calls on the server side, allowed are no more than 60 requests per minute
var LeakyBucket = require('leaky-bucket')
, request = require('request')
, bucket;
// create bucket instance, 60 request per minute, max waiting time = 0
bucket = new LeakyBucket(60, 60, 0);
// this let pass all request that are within the liimt and fail all that
// exceed it
bucket.throttle(function(err) {
if (err) response.send(429, 'too many requests!');
else response.send(200, '{id:4, ...}')
});

@@ -166,3 +166,84 @@

});
it('should abort items that exceed the max waiting time', function(done) {
var start = Date.now()
, executed = 0
, maxTime = 11500
, minTime = 10500
, capacity = 60
, items = 100
, iterator = items
, cb, bucket;
// wait fo rthe bucket
this.timeout(15000);
cb = function() {
var duration;
if (++executed === items) {
duration = Date.now()-start;
assert(duration>=minTime, 'The leaky bucket finished too soon ('+duration+' < '+minTime+') ...');
assert(duration<maxTime, 'The leaky bucket finished too late ('+duration+' > '+maxTime+') ...');
done();
}
}
bucket = new LeakyBucket(capacity, 60, 10);
while(iterator--) bucket.throttle(cb);
});
it('should work using promises', function(done) {
var start = Date.now()
, executed = 0
, maxTime = 11500
, minTime = 10500
, capacity = 60
, items = 100
, errCount = 0
, expectedErrCount = 29
, iterator = items
, cb, bucket;
// wait fo rthe bucket
this.timeout(15000);
cb = function(err) {
var duration;
if (err) errCount++;
if (++executed === items) {
duration = Date.now()-start;
assert(duration>=minTime, 'The leaky bucket finished too soon ('+duration+' < '+minTime+') ...');
assert(duration<maxTime, 'The leaky bucket finished too late ('+duration+' > '+maxTime+') ...');
assert(errCount===expectedErrCount, 'The leaky bucket should have emitted '+errCount+' errros, it emitted '+expectedErrCount+' errors...');
done();
}
}
bucket = new LeakyBucket(capacity, 60, 10);
while(iterator--) bucket.throttle().then(cb).catch(cb);
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc