speed-limiter
Advanced tools
Comparing version 0.1.5 to 0.2.0
@@ -13,2 +13,6 @@ const { TokenBucket } = require('limiter') | ||
getEnabled () { | ||
return this._enabled | ||
} | ||
getRate () { | ||
@@ -27,5 +31,5 @@ if (!this.bucket) return null | ||
this.enabled = val | ||
this._enabled = val | ||
for (const throttle of this.throttles) { | ||
throttle.enabled = val | ||
throttle.setEnabled(val) | ||
} | ||
@@ -32,0 +36,0 @@ } |
@@ -1,2 +0,2 @@ | ||
const { Transform } = require('stream') | ||
const { Transform } = require('streamx') | ||
const { wait } = require('./utils') | ||
@@ -15,37 +15,60 @@ | ||
this.enabled = params.enabled || params.group.enabled | ||
this.group = params.group | ||
this.setEnabled(params.enabled || params.group.enabled) | ||
this._group = params.group | ||
this._destroyed = false | ||
} | ||
setEnabled (val) { | ||
this.enabled = val | ||
getEnabled () { | ||
return this._enabled | ||
} | ||
_transform (chunk, _, done) { | ||
if (!this.enabled || !this.group.enabled) return done(null, chunk) | ||
getGroup () { | ||
return this._group | ||
} | ||
setEnabled (val = true) { | ||
if (typeof val !== 'boolean') throw new Error('Enabled must be a boolean') | ||
this._enabled = val | ||
} | ||
_transform (chunk, done) { | ||
this._processChunk(chunk, done) | ||
} | ||
async _waitForPositiveRate () { | ||
// Stop pushing chunks if rate is zero | ||
while (this._group.getRate() === 0 && !this._destroyed) { | ||
await wait(1 * 1000) // wait 1 second | ||
} | ||
} | ||
async _waitForTokens (amount) { | ||
return new Promise((resolve, reject) => { | ||
this._group.bucket.removeTokens(amount, (err) => { | ||
if (err) return reject(err) | ||
resolve() | ||
}) | ||
}) | ||
} | ||
async _processChunk (chunk, done) { | ||
if (!this._enabled || !this._group.getEnabled()) return done(null, chunk) | ||
// Stop pushing chunks if rate is zero | ||
await this._waitForPositiveRate() | ||
if (this._destroyed) return | ||
let pos = 0 | ||
let slice = chunk.slice(pos, pos + this.group.chunksize) | ||
const chunksize = this._group.getChunksize() | ||
let slice = chunk.slice(pos, pos + chunksize) | ||
while (slice.length) { | ||
try { | ||
// Check here again because we might be in the middle of a big chunk | ||
if (this.enabled && this.group.enabled) { | ||
if (this._enabled && this._group.getEnabled()) { | ||
// Stop pushing chunks if rate is zero | ||
while (this.group.getRate() === 0) { | ||
await wait(1 * 1000) // wait 1 second | ||
if (this._destroyed) return | ||
} | ||
await this._waitForPositiveRate() | ||
if (this._destroyed) return | ||
await new Promise((resolve, reject) => { | ||
this.group.bucket.removeTokens(slice.length, (err) => { | ||
if (err) return reject(err) | ||
resolve() | ||
}) | ||
}) | ||
// Get tokens from bucket | ||
await this._waitForTokens(slice.length) | ||
if (this._destroyed) return | ||
@@ -58,4 +81,4 @@ } | ||
this.push(slice) | ||
pos += this.group.chunksize | ||
slice = chunk.slice(pos, pos + this.group.chunksize) | ||
pos += chunksize | ||
slice = chunk.slice(pos, pos + chunksize) | ||
} | ||
@@ -67,3 +90,3 @@ | ||
destroy (...args) { | ||
this.group._removeThrottle(this) | ||
this._group._removeThrottle(this) | ||
@@ -70,0 +93,0 @@ this._destroyed = true |
{ | ||
"name": "speed-limiter", | ||
"version": "0.1.5", | ||
"version": "0.2.0", | ||
"description": "Throttle the speed of streams", | ||
@@ -12,3 +12,4 @@ "main": "index.js", | ||
"dependencies": { | ||
"limiter": "^1.1.5" | ||
"limiter": "^1.1.5", | ||
"streamx": "^2.10.3" | ||
}, | ||
@@ -15,0 +16,0 @@ "devDependencies": { |
# speed-limiter | ||
[![NPM Version](https://img.shields.io/npm/v/speed-limiter.svg)](https://www.npmjs.com/package/speed-limiter) | ||
[![Build Status](https://img.shields.io/github/workflow/status/alxhotel/speed-limiter/ci/master)](https://github.com/alxhotel/speed-limiter/actions) | ||
[![Standard - Javascript Style Guide](https://img.shields.io/badge/code_style-standard-brightgreen.svg)](https://standardjs.com) | ||
[![Build Status](https://img.shields.io/github/workflow/status/alxhotel/speed-limiter/ci/main)](https://github.com/alxhotel/speed-limiter/actions) | ||
@@ -22,2 +21,17 @@ Throttle the speed of streams in NodeJS | ||
const throttleGroup = new ThrottleGroup({ rate }) | ||
// Create a new throttle | ||
const throttle = throttleGroup.throttle() | ||
// Use it throttle as any other Transform | ||
let dataReceived = '' | ||
const dataToSend = 'hello' | ||
throttle.on('data', (data) => { | ||
dataReceived += data.toString() | ||
}) | ||
throttle.on('end', () => { | ||
console.log('Ended') | ||
}) | ||
throttle.write(dataToSend) | ||
throttle.end() | ||
``` | ||
@@ -27,14 +41,86 @@ | ||
#### `const throttle = new Throttle()` | ||
#### `const throttleGroup = new ThrottleGroup(opts)` | ||
Initialize the throttle group. | ||
The param `opts` can have these parameters: | ||
```js | ||
{ | ||
enabled: Boolean, // Enables/disables the throttling (defaul=true) | ||
rate: Number, // Sets the max. rate (in bytes/sec) | ||
chunksize: Number, // Sets the chunk size used (deault=rate/10) | ||
} | ||
``` | ||
Note: the `rate` parameter is required | ||
#### `throttleGroup.getEnabled()` | ||
Returns a `boolean`. | ||
If true, the throttling is enabled for the whole `throttleGroup`, otherwise not. | ||
However, if a specific `throttle` in the group has the throttling disabled, then only | ||
that throttle will block the data. | ||
#### `throttleGroup.getRate()` | ||
Returns a `number`. | ||
Gets the bytes/sec rate at which the throttle group rate is set. | ||
#### `throttleGroup.getChunksize()` | ||
Returns a `number`. | ||
Gets the chunk size used in the rate limiter. | ||
#### `throttleGroup.setEnabled(enabled)` | ||
Used to disable or enabling the throttling of all the throttles of `throttleGroup`. | ||
#### `throttleGroup.setRate(rate)` | ||
Sets the maxium rate (in bytes/sec) at which the whole group of throttles can pass data. | ||
#### `throttleGroup.setChunksize(chunksize)` | ||
Sets the chunk size used in the rate limiter. | ||
#### `const throttle = new Throttle(opts)` | ||
Initialize the throttle instance. | ||
#### `const throttleGroup = new ThrottleGroup()` | ||
The param `opts` can have these parameters: | ||
Initialize the throttle group. | ||
```js | ||
{ | ||
enabled: Boolean, // Enables/disables the throttling for that throttle (default=true) | ||
rate: Number, // Sets the max. rate (in bytes/sec) | ||
chunksize: Number, // Sets the chunk size used (default=rate/10) | ||
group: ThrottleGroup, // Sets the throttle group for that throttle (default=null) | ||
} | ||
``` | ||
TODO: add rest of methods | ||
If the `group` parameter is null, then a new `ThrottleGroup` will be created. | ||
Note: the `rate` parameter is required | ||
#### `throttle.getEnabled()` | ||
Returns a `boolean`. | ||
If true, the throttling is enabled for `throttle`, otherwise not. | ||
#### `throttle.getGroup()` | ||
Returns the `ThrottleGroup` of `throttle`. | ||
#### `throttle.setEnabled(enabled)` | ||
Used to disable or enabling the throttling of `throttle`. | ||
## License | ||
MIT. Copyright (c) [Alex](https://github.com/alxhotel) |
@@ -5,10 +5,3 @@ const async = require('async') | ||
const dataToSend = (() => { | ||
let str = '' | ||
const s = '0123456789xyzXYZ' | ||
for (let i = 0; i < 1000; i++) { | ||
str += s | ||
} | ||
return str | ||
})() | ||
const { getRandomData, testThrottle } = require('./common') | ||
@@ -19,27 +12,10 @@ const opts = { | ||
function testThrottle (throttle, cb) { | ||
let dataReceived = '' | ||
throttle.on('data', (chunk) => { | ||
dataReceived += chunk.toString() | ||
}) | ||
throttle.on('end', () => { | ||
const isEqual = (dataToSend === dataReceived) | ||
const after = Date.now() | ||
const speed = dataToSend.length / ((after - before) / 1000) | ||
cb(isEqual, speed) | ||
}) | ||
const before = Date.now() | ||
throttle.write(dataToSend, () => { | ||
throttle.end() | ||
}) | ||
} | ||
test('Throttle must send the data', (t) => { | ||
t.plan(1) | ||
const dataToSend = getRandomData() | ||
const throttle = new Throttle(opts) | ||
testThrottle(throttle, (ok) => { | ||
t.ok(ok, 'received string should equal sent string') | ||
testThrottle(dataToSend, throttle, (data) => { | ||
t.ok(dataToSend.equals(data), 'data received should equal data sent') | ||
@@ -53,2 +29,3 @@ throttle.destroy() | ||
const dataToSend = getRandomData() | ||
const group = new ThrottleGroup(opts) | ||
@@ -59,4 +36,4 @@ | ||
testThrottle(group.throttle(), (ok) => { | ||
t.ok(ok, 'received string should equal sent string') | ||
testThrottle(dataToSend, group.throttle(), (data) => { | ||
t.ok(dataToSend.equals(data), 'data received should equal data sent') | ||
@@ -69,63 +46,1 @@ throttle.destroy() | ||
}) | ||
test('Speed must be lower than the rate set', (t) => { | ||
t.plan(1) | ||
const throttle = new Throttle(opts) | ||
testThrottle(throttle, (_, speed) => { | ||
const lessThanRate = (speed <= opts.rate) | ||
t.ok(lessThanRate, 'speed should be less or equal to rate') | ||
throttle.destroy() | ||
}) | ||
}) | ||
test('Speed must be higher than 10 KB/s, when disabled and rate = 0', (t) => { | ||
t.plan(1) | ||
const throttle = new Throttle({ | ||
rate: 0, | ||
enabled: false | ||
}) | ||
testThrottle(throttle, (_, speed) => { | ||
const lessThanRate = (speed >= 10 * 1000) | ||
t.ok(lessThanRate, 'speed should be more than 10 KB/s') | ||
throttle.destroy() | ||
}) | ||
}) | ||
test('Speed must be higher than 10 KB/s, when disabled and rate is > 0', (t) => { | ||
t.plan(1) | ||
const throttle = new Throttle({ | ||
rate: 100, | ||
enabled: false | ||
}) | ||
testThrottle(throttle, (_, speed) => { | ||
const lessThanRate = (speed >= 10 * 1000) | ||
t.ok(lessThanRate, 'speed should be more than 10 KB/s') | ||
throttle.destroy() | ||
}) | ||
}) | ||
test('Throttle should block everything if rate is zero', (t) => { | ||
const throttle = new Throttle({ | ||
rate: 0, | ||
enabled: true | ||
}) | ||
testThrottle(throttle, () => { | ||
t.fail('throttle has not block all chunks') | ||
}) | ||
setTimeout(() => { | ||
throttle.destroy() | ||
t.pass('throttle has block all chunks') | ||
t.end() | ||
}, 2000) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
14090
11
274
125
2
+ Addedstreamx@^2.10.3
+ Addedb4a@1.6.6(transitive)
+ Addedbare-events@2.4.2(transitive)
+ Addedfast-fifo@1.3.2(transitive)
+ Addedqueue-tick@1.0.1(transitive)
+ Addedstreamx@2.20.1(transitive)
+ Addedtext-decoder@1.2.0(transitive)