speed-limiter
Advanced tools
Comparing version 0.2.1 to 0.2.2
@@ -0,1 +1,2 @@ | ||
const { EventEmitter } = require('events') | ||
const { Transform } = require('streamx') | ||
@@ -15,5 +16,6 @@ const { wait } = require('./utils') | ||
this.setEnabled(params.enabled || params.group.enabled) | ||
this._events = new EventEmitter() | ||
this._group = params.group | ||
this._destroyed = false | ||
this.setEnabled(params.enabled || params.group.enabled) | ||
} | ||
@@ -32,2 +34,5 @@ | ||
this._enabled = val | ||
if (this._enabled) this._events.emit('enabled') | ||
else this._events.emit('disabled') | ||
} | ||
@@ -41,3 +46,3 @@ | ||
// Stop pushing chunks if rate is zero | ||
while (this._group.getRate() === 0 && !this._destroyed) { | ||
while (this._group.getRate() === 0 && !this._destroyed && this._areBothEnabled()) { | ||
await wait(1 * 1000) // wait 1 second | ||
@@ -48,7 +53,19 @@ } | ||
async _waitForTokens (amount) { | ||
// Wait for enabled, destroyed or tokens | ||
return new Promise((resolve, reject) => { | ||
this._group.bucket.removeTokens(amount, (err) => { | ||
let done = false | ||
const self = this | ||
function isDone (err) { | ||
self._events.removeListener('disabled', isDone) | ||
self._events.removeListener('destroyed', isDone) | ||
if (done) return | ||
done = true | ||
if (err) return reject(err) | ||
resolve() | ||
}) | ||
} | ||
this._events.once('disabled', isDone) | ||
this._events.once('destroyed', isDone) | ||
// TODO: next version remove lisener in "isDone" | ||
this._group.bucket.removeTokens(amount, isDone) | ||
}) | ||
@@ -61,2 +78,12 @@ } | ||
async _throttleChunk (size) { | ||
// Stop pushing chunks if rate is zero | ||
await this._waitForPositiveRate() | ||
if (this._destroyed) return | ||
if (!this._areBothEnabled()) return | ||
// Get tokens from bucket | ||
await this._waitForTokens(size) | ||
} | ||
async _processChunk (chunk, done) { | ||
@@ -69,16 +96,11 @@ if (!this._areBothEnabled()) return done(null, chunk) | ||
while (slice.length) { | ||
try { | ||
// Check here again because we might be in the middle of a big chunk | ||
// with a lot of small slices | ||
if (this._areBothEnabled()) { | ||
// Stop pushing chunks if rate is zero | ||
await this._waitForPositiveRate() | ||
// Check here again because we might be in the middle of a big chunk | ||
// with a lot of small slices | ||
if (this._areBothEnabled()) { | ||
try { | ||
await this._throttleChunk(slice.length) | ||
if (this._destroyed) return | ||
// Get tokens from bucket | ||
await this._waitForTokens(slice.length) | ||
if (this._destroyed) return | ||
} catch (err) { | ||
return done(err) | ||
} | ||
} catch (err) { | ||
return done(err) | ||
} | ||
@@ -103,2 +125,3 @@ | ||
this._destroyed = true | ||
this._events.emit('destroyed') | ||
@@ -105,0 +128,0 @@ super.destroy(...args) |
{ | ||
"name": "speed-limiter", | ||
"version": "0.2.1", | ||
"version": "0.2.2", | ||
"description": "Throttle the speed of streams", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
16552
348