+145
-22
@@ -19,2 +19,6 @@ import { EventEmitter } from 'eventemitter3'; | ||
| #timeoutId; | ||
| #strict; | ||
| // Circular buffer implementation for better performance | ||
| #strictTicks = []; | ||
| #strictTicksStartIndex = 0; | ||
| #queue; | ||
@@ -56,2 +60,3 @@ #queueClass; | ||
| queueClass: PriorityQueue, | ||
| strict: false, | ||
| ...options, | ||
@@ -65,2 +70,8 @@ }; | ||
| } | ||
| if (options.strict && options.interval === 0) { | ||
| throw new TypeError('The `strict` option requires a non-zero `interval`'); | ||
| } | ||
| if (options.strict && options.intervalCap === Number.POSITIVE_INFINITY) { | ||
| throw new TypeError('The `strict` option requires a finite `intervalCap`'); | ||
| } | ||
| // TODO: Remove this fallback in the next major version | ||
@@ -72,2 +83,3 @@ // eslint-disable-next-line @typescript-eslint/no-deprecated | ||
| this.#interval = options.interval; | ||
| this.#strict = options.strict; | ||
| this.#queue = new options.queueClass(); | ||
@@ -83,4 +95,54 @@ this.#queueClass = options.queueClass; | ||
| } | ||
| #cleanupStrictTicks(now) { | ||
| // Remove ticks outside the current interval window using circular buffer approach | ||
| while (this.#strictTicksStartIndex < this.#strictTicks.length) { | ||
| const oldestTick = this.#strictTicks[this.#strictTicksStartIndex]; | ||
| if (oldestTick !== undefined && now - oldestTick >= this.#interval) { | ||
| this.#strictTicksStartIndex++; | ||
| } | ||
| else { | ||
| break; | ||
| } | ||
| } | ||
| // Compact the array when it becomes inefficient or fully consumed | ||
| // Compact when: (start index is large AND more than half wasted) OR all ticks expired | ||
| const shouldCompact = (this.#strictTicksStartIndex > 100 && this.#strictTicksStartIndex > this.#strictTicks.length / 2) | ||
| || this.#strictTicksStartIndex === this.#strictTicks.length; | ||
| if (shouldCompact) { | ||
| this.#strictTicks = this.#strictTicks.slice(this.#strictTicksStartIndex); | ||
| this.#strictTicksStartIndex = 0; | ||
| } | ||
| } | ||
| // Helper methods for interval consumption | ||
| #consumeIntervalSlot(now) { | ||
| if (this.#strict) { | ||
| this.#strictTicks.push(now); | ||
| } | ||
| else { | ||
| this.#intervalCount++; | ||
| } | ||
| } | ||
| #rollbackIntervalSlot() { | ||
| if (this.#strict) { | ||
| // Pop from the end of the actual data (not from start index) | ||
| if (this.#strictTicks.length > this.#strictTicksStartIndex) { | ||
| this.#strictTicks.pop(); | ||
| } | ||
| } | ||
| else if (this.#intervalCount > 0) { | ||
| this.#intervalCount--; | ||
| } | ||
| } | ||
| #getActiveTicksCount() { | ||
| return this.#strictTicks.length - this.#strictTicksStartIndex; | ||
| } | ||
| get #doesIntervalAllowAnother() { | ||
| return this.#isIntervalIgnored || this.#intervalCount < this.#intervalCap; | ||
| if (this.#isIntervalIgnored) { | ||
| return true; | ||
| } | ||
| if (this.#strict) { | ||
| // Cleanup already done by #isIntervalPausedAt before this is called | ||
| return this.#getActiveTicksCount() < this.#intervalCap; | ||
| } | ||
| return this.#intervalCount < this.#intervalCap; | ||
| } | ||
@@ -99,8 +161,24 @@ get #doesConcurrentAllowAnother() { | ||
| #onResumeInterval() { | ||
| this.#onInterval(); // Already schedules update | ||
| // Clear timeout ID before processing to prevent race condition | ||
| // Must clear before #onInterval to allow new timeouts to be scheduled | ||
| this.#timeoutId = undefined; | ||
| this.#onInterval(); | ||
| this.#initializeIntervalIfNeeded(); | ||
| this.#timeoutId = undefined; | ||
| } | ||
| get #isIntervalPaused() { | ||
| const now = Date.now(); | ||
| #isIntervalPausedAt(now) { | ||
| // Strict mode: check if we need to wait for oldest tick to age out | ||
| if (this.#strict) { | ||
| this.#cleanupStrictTicks(now); | ||
| // If at capacity, need to wait for oldest tick to age out | ||
| const activeTicksCount = this.#getActiveTicksCount(); | ||
| if (activeTicksCount >= this.#intervalCap) { | ||
| const oldestTick = this.#strictTicks[this.#strictTicksStartIndex]; | ||
| // After cleanup, remaining ticks are within interval, so delay is always > 0 | ||
| const delay = this.#interval - (now - oldestTick); | ||
| this.#createIntervalTimeout(delay); | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| // Fixed window mode (original logic) | ||
| if (this.#intervalId === undefined) { | ||
@@ -160,2 +238,7 @@ const delay = this.#intervalEnd - now; | ||
| this.#clearTimeoutTimer(); | ||
| // Compact strict ticks when idle to free memory | ||
| if (this.#strict && this.#strictTicksStartIndex > 0) { | ||
| const now = Date.now(); | ||
| this.#cleanupStrictTicks(now); | ||
| } | ||
| this.emit('idle'); | ||
@@ -167,12 +250,11 @@ } | ||
| if (!this.#isPaused) { | ||
| const canInitializeInterval = !this.#isIntervalPaused; | ||
| const now = Date.now(); | ||
| const canInitializeInterval = !this.#isIntervalPausedAt(now); | ||
| if (this.#doesIntervalAllowAnother && this.#doesConcurrentAllowAnother) { | ||
| const job = this.#queue.dequeue(); | ||
| // Increment interval count immediately to prevent race conditions | ||
| if (!this.#isIntervalIgnored) { | ||
| this.#intervalCount++; | ||
| this.#consumeIntervalSlot(now); | ||
| this.#scheduleRateLimitUpdate(); | ||
| } | ||
| this.emit('active'); | ||
| this.#lastExecutionTime = Date.now(); | ||
| job(); | ||
@@ -191,2 +273,6 @@ if (canInitializeInterval) { | ||
| } | ||
| // Strict mode uses timeouts instead of interval timers | ||
| if (this.#strict) { | ||
| return; | ||
| } | ||
| this.#intervalId = setInterval(() => { | ||
@@ -198,6 +284,9 @@ this.#onInterval(); | ||
| #onInterval() { | ||
| if (this.#intervalCount === 0 && this.#pending === 0 && this.#intervalId) { | ||
| this.#clearIntervalTimer(); | ||
| // Non-strict mode uses interval timers and intervalCount | ||
| if (!this.#strict) { | ||
| if (this.#intervalCount === 0 && this.#pending === 0 && this.#intervalId) { | ||
| this.#clearIntervalTimer(); | ||
| } | ||
| this.#intervalCount = this.#carryoverIntervalCount ? this.#pending : 0; | ||
| } | ||
| this.#intervalCount = this.#carryoverIntervalCount ? this.#pending : 0; | ||
| this.#processQueue(); | ||
@@ -266,7 +355,8 @@ this.#scheduleRateLimitUpdate(); | ||
| async add(function_, options = {}) { | ||
| // In case `id` is not defined. | ||
| options.id ??= (this.#idAssigner++).toString(); | ||
| // Create a copy to avoid mutating the original options object | ||
| options = { | ||
| timeout: this.timeout, | ||
| ...options, | ||
| // Assign unique ID if not provided | ||
| id: options.id ?? (this.#idAssigner++).toString(), | ||
| }; | ||
@@ -293,6 +383,3 @@ return new Promise((resolve, reject) => { | ||
| catch (error) { | ||
| // Decrement the counter that was already incremented | ||
| if (!this.#isIntervalIgnored) { | ||
| this.#intervalCount--; | ||
| } | ||
| this.#rollbackIntervalConsumption(); | ||
| // Clean up tracking before throwing | ||
@@ -302,2 +389,3 @@ this.#runningTasks.delete(taskSymbol); | ||
| } | ||
| this.#lastExecutionTime = Date.now(); | ||
| let operation = function_({ signal: options.signal }); | ||
@@ -369,2 +457,7 @@ if (options.timeout) { | ||
| this.#queue = new this.#queueClass(); | ||
| // Clear interval timer since queue is now empty (consistent with #tryToStartAnother) | ||
| this.#clearIntervalTimer(); | ||
| // Note: We preserve strict mode rate-limiting state (ticks and timeout) | ||
| // because clear() only clears queued tasks, not rate limit history. | ||
| // This ensures that rate limits are still enforced after clearing the queue. | ||
| // Note: We don't clear #runningTasks as those tasks are still running | ||
@@ -374,2 +467,9 @@ // They will be removed when they complete in the finally block | ||
| this.#updateRateLimitState(); | ||
| // Emit events so waiters (onEmpty, onIdle, onSizeLessThan) can resolve | ||
| this.emit('empty'); | ||
| if (this.#pending === 0) { | ||
| this.#clearTimeoutTimer(); | ||
| this.emit('idle'); | ||
| } | ||
| this.emit('next'); | ||
| } | ||
@@ -473,3 +573,3 @@ /** | ||
| // eslint-disable-next-line @typescript-eslint/promise-function-async | ||
| async onError() { | ||
| onError() { | ||
| return new Promise((_resolve, reject) => { | ||
@@ -549,7 +649,30 @@ const handleError = (error) => { | ||
| } | ||
| #rollbackIntervalConsumption() { | ||
| if (this.#isIntervalIgnored) { | ||
| return; | ||
| } | ||
| this.#rollbackIntervalSlot(); | ||
| this.#scheduleRateLimitUpdate(); | ||
| } | ||
| #updateRateLimitState() { | ||
| const previous = this.#rateLimitedInInterval; | ||
| const shouldBeRateLimited = !this.#isIntervalIgnored | ||
| && this.#intervalCount >= this.#intervalCap | ||
| && this.#queue.size > 0; | ||
| // Early exit if rate limiting is disabled or queue is empty | ||
| if (this.#isIntervalIgnored || this.#queue.size === 0) { | ||
| if (previous) { | ||
| this.#rateLimitedInInterval = false; | ||
| this.emit('rateLimitCleared'); | ||
| } | ||
| return; | ||
| } | ||
| // Get the current count based on mode | ||
| let count; | ||
| if (this.#strict) { | ||
| const now = Date.now(); | ||
| this.#cleanupStrictTicks(now); | ||
| count = this.#getActiveTicksCount(); | ||
| } | ||
| else { | ||
| count = this.#intervalCount; | ||
| } | ||
| const shouldBeRateLimited = count >= this.#intervalCap; | ||
| if (shouldBeRateLimited !== previous) { | ||
@@ -556,0 +679,0 @@ this.#rateLimitedInInterval = shouldBeRateLimited; |
+17
-1
@@ -70,2 +70,18 @@ import { type Queue, type RunFunction } from './queue.js'; | ||
| readonly carryoverConcurrencyCount?: boolean; | ||
| /** | ||
| Whether to use strict mode for rate limiting (sliding window algorithm). | ||
| When enabled, ensures that no more than `intervalCap` tasks execute in any rolling `interval` window, rather than resetting the count at fixed intervals. This provides more predictable and evenly distributed execution. | ||
| @default false | ||
| For example, with `intervalCap: 2` and `interval: 1000`: | ||
| - __Default mode (fixed window)__: Tasks can burst at window boundaries. You could execute 2 tasks at 999ms and 2 more at 1000ms, resulting in 4 tasks within 1ms. | ||
| - __Strict mode (sliding window)__: Enforces that no more than 2 tasks execute in any 1000ms rolling window, preventing bursts. | ||
| Strict mode is more resource-intensive as it tracks individual execution timestamps. Use it when you need guaranteed rate-limit compliance, such as when interacting with APIs that enforce strict rate limits. | ||
| The `carryoverIntervalCount` option has no effect when `strict` mode is enabled, as strict mode tracks actual execution timestamps rather than counting pending tasks. | ||
| */ | ||
| readonly strict?: boolean; | ||
| } & TimeoutOptions; | ||
@@ -120,4 +136,4 @@ export type QueueAddOptions = { | ||
| */ | ||
| readonly signal?: AbortSignal; | ||
| readonly signal?: AbortSignal | undefined; | ||
| }; | ||
| export {}; |
+1
-1
| { | ||
| "name": "p-queue", | ||
| "version": "9.0.1", | ||
| "version": "9.1.0", | ||
| "description": "Promise queue with concurrency control", | ||
@@ -5,0 +5,0 @@ "license": "MIT", |
+32
-1
@@ -143,2 +143,21 @@ # p-queue | ||
| ##### strict | ||
| Type: `boolean`\ | ||
| Default: `false` | ||
| Whether to use strict mode for rate limiting (sliding window algorithm). | ||
| When enabled, ensures that no more than `intervalCap` tasks execute in any rolling `interval` window, rather than resetting the count at fixed intervals. This provides more predictable and evenly distributed execution. | ||
| For example, with `intervalCap: 2` and `interval: 1000`: | ||
| - **Default mode (fixed window)**: Tasks can burst at window boundaries. You could execute 2 tasks at 999ms and 2 more at 1000ms, resulting in 4 tasks within 1ms. | ||
| - **Strict mode (sliding window)**: Enforces that no more than 2 tasks execute in any 1000ms rolling window, preventing bursts. | ||
| > [!NOTE] | ||
| > Strict mode is more resource-intensive as it tracks individual execution timestamps. Use it when you need guaranteed rate-limit compliance, such as when interacting with APIs that enforce strict rate limits. | ||
| > [!NOTE] | ||
| > The `carryoverIntervalCount` option has no effect when `strict` mode is enabled, as strict mode tracks actual execution timestamps rather than counting pending tasks. | ||
| ### queue | ||
@@ -559,3 +578,3 @@ | ||
| Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`. | ||
| Emitted whenever the queue becomes idle: both empty and with zero running tasks (`size === 0 && pending === 0`). If no tasks are ever added, it never fires. | ||
@@ -833,2 +852,14 @@ The difference with `empty` is that `idle` guarantees that all work from the queue has finished. `empty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet. | ||
| #### When should I use `strict` mode for rate limiting? | ||
| Use `strict: true` when: | ||
| - You're interacting with APIs that enforce strict rate limits and will throttle or block you if you exceed them, even briefly | ||
| - You've experienced issues with the default fixed window mode (such as [#126](https://github.com/sindresorhus/p-queue/issues/126)) | ||
| - You need guaranteed compliance with rate limits for any rolling time window | ||
| Use the default fixed window mode when: | ||
| - You don't have strict rate limit requirements | ||
| - Performance is more important than perfect rate limit distribution | ||
| - You're rate limiting for backpressure management rather than external API constraints | ||
| #### How do I implement backpressure? | ||
@@ -835,0 +866,0 @@ |
| import { type Queue, type RunFunction } from './queue.js'; | ||
| type TimeoutOptions = { | ||
| /** | ||
| Per-operation timeout in milliseconds. Operations will throw a `TimeoutError` if they don't complete within the specified time. | ||
| The timeout begins when the operation is dequeued and starts execution, not while it's waiting in the queue. | ||
| @default undefined | ||
| Can be overridden per task using the `timeout` option in `.add()`: | ||
| @example | ||
| ``` | ||
| const queue = new PQueue({timeout: 5000}); | ||
| // This task uses the global 5s timeout | ||
| await queue.add(() => fetchData()); | ||
| // This task has a 10s timeout | ||
| await queue.add(() => slowTask(), {timeout: 10000}); | ||
| ``` | ||
| */ | ||
| timeout?: number; | ||
| }; | ||
| export type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = { | ||
| /** | ||
| Concurrency limit. | ||
| Minimum: `1`. | ||
| @default Infinity | ||
| */ | ||
| readonly concurrency?: number; | ||
| /** | ||
| Whether queue tasks within concurrency limit, are auto-executed as soon as they're added. | ||
| @default true | ||
| */ | ||
| readonly autoStart?: boolean; | ||
| /** | ||
| Class with a `enqueue` and `dequeue` method, and a `size` getter. See the [Custom QueueClass](https://github.com/sindresorhus/p-queue#custom-queueclass) section. | ||
| */ | ||
| readonly queueClass?: new () => QueueType; | ||
| /** | ||
| The max number of runs in the given interval of time. | ||
| Minimum: `1`. | ||
| @default Infinity | ||
| */ | ||
| readonly intervalCap?: number; | ||
| /** | ||
| The length of time in milliseconds before the interval count resets. Must be finite. | ||
| Minimum: `0`. | ||
| @default 0 | ||
| */ | ||
| readonly interval?: number; | ||
| /** | ||
| Whether the task must finish in the given interval or will be carried over into the next interval count. | ||
| @default false | ||
| */ | ||
| readonly carryoverIntervalCount?: boolean; | ||
| /** | ||
| @deprecated Renamed to `carryoverIntervalCount`. | ||
| */ | ||
| readonly carryoverConcurrencyCount?: boolean; | ||
| } & TimeoutOptions; | ||
| export type QueueAddOptions = { | ||
| /** | ||
| Priority of operation. Operations with greater priority will be scheduled first. | ||
| @default 0 | ||
| */ | ||
| readonly priority?: number; | ||
| /** | ||
| Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`. | ||
| */ | ||
| id?: string; | ||
| } & TaskOptions & TimeoutOptions; | ||
| export type TaskOptions = { | ||
| /** | ||
| [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) for cancellation of the operation. When aborted, it will be removed from the queue and the `queue.add()` call will reject with an `AbortError`. If the operation is already running, the signal will need to be handled by the operation itself. | ||
| @example | ||
| ``` | ||
| import PQueue, {AbortError} from 'p-queue'; | ||
| import got, {CancelError} from 'got'; | ||
| const queue = new PQueue(); | ||
| const controller = new AbortController(); | ||
| try { | ||
| await queue.add(({signal}) => { | ||
| const request = got('https://sindresorhus.com'); | ||
| signal.addEventListener('abort', () => { | ||
| request.cancel(); | ||
| }); | ||
| try { | ||
| return await request; | ||
| } catch (error) { | ||
| if (!(error instanceof CancelError)) { | ||
| throw error; | ||
| } | ||
| } | ||
| }, {signal: controller.signal}); | ||
| } catch (error) { | ||
| if (!(error instanceof AbortError)) { | ||
| throw error; | ||
| } | ||
| } | ||
| ``` | ||
| */ | ||
| readonly signal?: AbortSignal; | ||
| }; | ||
| export {}; |
| export {}; |
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
76804
6.05%1122
2.94%1054
3.03%4
-20%13
-13.33%