bandwidth-throttle-stream
Advanced tools
Comparing version 1.0.1 to 1.1.0
@@ -0,1 +1,5 @@ | ||
# 1.1.0 | ||
- Fixes issues when passing requests through an unbounded throttle (`bytesPerSecond` = `Infinity`). | ||
- Adds public `onThroughputMetrics` metrics callback to the `BandwidthThrottleGroup`. | ||
# 1.0.1 | ||
@@ -2,0 +6,0 @@ |
@@ -66,4 +66,6 @@ import Config from './Config'; | ||
* pushes it out to a piped writable stream. | ||
* | ||
* @returns The number of bytes processed through the throttle | ||
*/ | ||
process(maxBytesToProcess?: number): void; | ||
process(maxBytesToProcess?: number): number; | ||
/** | ||
@@ -70,0 +72,0 @@ * Informs the parent group that the throttle is no longer needed and can |
@@ -84,2 +84,4 @@ "use strict"; | ||
* pushes it out to a piped writable stream. | ||
* | ||
* @returns The number of bytes processed through the throttle | ||
*/ | ||
@@ -98,7 +100,8 @@ process(maxBytesToProcess = Infinity) { | ||
} | ||
// If there is more data to be processed, stop here | ||
if (this.pendingBytesReadIndex < this.pendingBytesCount) | ||
return; | ||
// If there are no other promises at this point | ||
// we can consider the request inactive. | ||
// If there is more data to be processed, or there is no pending data but we are | ||
// unthrottled, stop here | ||
if (this.pendingBytesReadIndex < this.pendingBytesCount || | ||
!this.config.isThrottled) | ||
return bytesToPushLength; | ||
// End the request | ||
this.done.resolve(); | ||
@@ -108,2 +111,3 @@ this.handleRequestStop(this); | ||
this.isInFlight = false; | ||
return bytesToPushLength; | ||
} | ||
@@ -146,4 +150,15 @@ /** | ||
return __awaiter(this, void 0, void 0, function* () { | ||
if (this.pendingBytesCount > 0) | ||
return this.done; | ||
// If an empty request was passed through the throttle, end immediately | ||
if (this.pendingBytesCount === 0) | ||
return; | ||
if (!this.config.isThrottled) { | ||
// If the throttle is unbounded, then all data has been | ||
// processed and request can be completed | ||
this.handleRequestStop(this); | ||
this.destroy(); | ||
this.isInFlight = false; | ||
return; | ||
} | ||
// Else, wait for the processing cycle to compelte the request | ||
return this.done; | ||
}); | ||
@@ -150,0 +165,0 @@ } |
import BandwidthThrottle from './BandwidthThrottle'; | ||
import Config from './Config'; | ||
import IConfig from './Interfaces/IConfig'; | ||
import IThroughputData from './Interfaces/IThroughputData'; | ||
/** | ||
@@ -16,8 +17,15 @@ * A class used to configure and bridge between one or more | ||
config: Readonly<Config>; | ||
/** | ||
* An optional callback providing the consumer with metrics pertaining to the throttle | ||
* group's average throughput and utlization percentage. | ||
*/ | ||
onThroughputMetrics: ((throughputData: IThroughputData) => void) | null; | ||
private inFlightRequests; | ||
private bandwidthThrottles; | ||
private clockIntervalId; | ||
private pollThroughputIntervalId; | ||
private lastTickTime; | ||
private tickIndex; | ||
private secondIndex; | ||
private totalBytesProcessed; | ||
private get hasTicked(); | ||
@@ -76,3 +84,4 @@ private get isTicking(); | ||
private processInFlightRequests; | ||
private pollThroughput; | ||
} | ||
export default BandwidthThrottleGroup; |
@@ -24,8 +24,15 @@ "use strict"; | ||
this.config = new Config_1.default(); | ||
/** | ||
* An optional callback providing the consumer with metrics pertaining to the throttle | ||
* group's average throughput and utlization percentage. | ||
*/ | ||
this.onThroughputMetrics = null; | ||
this.inFlightRequests = []; | ||
this.bandwidthThrottles = []; | ||
this.clockIntervalId = null; | ||
this.pollThroughputIntervalId = this.pollThroughput(); | ||
this.lastTickTime = -1; | ||
this.tickIndex = 0; | ||
this.secondIndex = 0; | ||
this.totalBytesProcessed = 0; | ||
Object.assign(this.config, options); | ||
@@ -64,2 +71,3 @@ this.createBandwidthThrottle = this.createBandwidthThrottle.bind(this); | ||
destroy() { | ||
clearInterval(this.pollThroughputIntervalId); | ||
while (this.bandwidthThrottles.length) | ||
@@ -146,3 +154,4 @@ this.bandwidthThrottles.pop().destroy(); | ||
const bytesPerRequestPerTick = getPartitionedIntegerPartAtIndex_1.default(bytesPerRequestPerSecond, this.config.ticksPerSecond, this.tickIndex); | ||
bandwidthThrottle.process(bytesPerRequestPerTick * delayMultiplier); | ||
const bytesProcessed = bandwidthThrottle.process(bytesPerRequestPerTick * delayMultiplier); | ||
this.totalBytesProcessed += bytesProcessed; | ||
if (this.inFlightRequests.length < currentInFlightRequestsCount) { | ||
@@ -168,4 +177,27 @@ i--; | ||
} | ||
pollThroughput() { | ||
const bytesPerSecondSamples = []; | ||
const perSecondMultipler = 1000 / this.config.throughputSampleIntervalMs; | ||
let lastHeapRead = 0; | ||
return _Platform_1.setInterval(() => { | ||
const bytesSinceLastSample = Math.max(0, this.totalBytesProcessed - lastHeapRead); | ||
lastHeapRead = this.totalBytesProcessed; | ||
bytesPerSecondSamples.push(bytesSinceLastSample); | ||
if (bytesSinceLastSample === 0) | ||
this.totalBytesProcessed = 0; | ||
if (bytesPerSecondSamples.length > this.config.throughputSampleSize) | ||
bytesPerSecondSamples.shift(); | ||
const averageBytesPerSecond = (bytesPerSecondSamples.reduce((sum, sample) => sum + sample, 0) / | ||
bytesPerSecondSamples.length) * | ||
perSecondMultipler; | ||
if (typeof this.onThroughputMetrics === 'function') { | ||
this.onThroughputMetrics({ | ||
averageBytesPerSecond, | ||
utilization: Math.min(1, averageBytesPerSecond / this.config.bytesPerSecond) | ||
}); | ||
} | ||
}, this.config.throughputSampleIntervalMs); | ||
} | ||
} | ||
exports.default = BandwidthThrottleGroup; | ||
//# sourceMappingURL=BandwidthThrottleGroup.js.map |
@@ -5,2 +5,4 @@ import IConfig from './Interfaces/IConfig'; | ||
ticksPerSecond: number; | ||
throughputSampleIntervalMs: number; | ||
throughputSampleSize: number; | ||
get isThrottled(): boolean; | ||
@@ -7,0 +9,0 @@ get tickDurationMs(): number; |
@@ -7,2 +7,4 @@ "use strict"; | ||
this.ticksPerSecond = 40; | ||
this.throughputSampleIntervalMs = 1000; | ||
this.throughputSampleSize = 4; | ||
} | ||
@@ -9,0 +11,0 @@ get isThrottled() { |
@@ -22,3 +22,16 @@ interface IConfig { | ||
ticksPerSecond?: number; | ||
/** | ||
* The frequency of samples used to determine the `averageBytesPerSecond` metric. | ||
* | ||
* @default 1000 | ||
*/ | ||
throughputSampleIntervalMs?: number; | ||
/** | ||
* The maximum number of samples that should contribute to the | ||
* `averageBytesPerSecond` metric rolling average. | ||
* | ||
* @default 4 | ||
*/ | ||
throughputSampleSize?: number; | ||
} | ||
export default IConfig; |
{ | ||
"name": "bandwidth-throttle-stream", | ||
"version": "1.0.1", | ||
"version": "1.1.0", | ||
"description": "A Node.js and Deno transform stream for throttling bandwidth", | ||
@@ -5,0 +5,0 @@ "author": "KunkaLabs Limited", |
@@ -43,3 +43,3 @@ ![CI](https://github.com/patrickkunka/bandwidth-throttle-stream/workflows/CI/badge.svg) [![Coverage Status](https://coveralls.io/repos/github/patrickkunka/bandwidth-throttle-stream/badge.svg?branch=master)](https://coveralls.io/github/patrickkunka/bandwidth-throttle-stream?branch=master) | ||
In Deno, all modules are imported from URLs as ES modules. Versioned releases of `bandwidth_throttle_stream` are available from [deno.land/x](https://deno.land/x). Note that as per Deno convention, the package name is delineated with underscores (`_`). | ||
In Deno, all modules are imported from URLs as ES modules. Versioned [releases](https://github.com/patrickkunka/bandwidth-throttle-stream/releases) of `bandwidth_throttle_stream` are available from [deno.land/x](https://deno.land/x). Note that as per Deno convention, the package name is delineated with underscores (`_`). | ||
@@ -46,0 +46,0 @@ ```js |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
70592
60
972