Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement β†’
Sign In

p-queue

Package Overview
Dependencies
Maintainers
1
Versions
53
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

p-queue - npm Package Compare versions

Comparing version
9.0.1
to
9.1.0
+145
-22
dist/index.js

@@ -19,2 +19,6 @@ import { EventEmitter } from 'eventemitter3';

#timeoutId;
#strict;
// Circular buffer implementation for better performance
#strictTicks = [];
#strictTicksStartIndex = 0;
#queue;

@@ -56,2 +60,3 @@ #queueClass;

queueClass: PriorityQueue,
strict: false,
...options,

@@ -65,2 +70,8 @@ };

}
if (options.strict && options.interval === 0) {
throw new TypeError('The `strict` option requires a non-zero `interval`');
}
if (options.strict && options.intervalCap === Number.POSITIVE_INFINITY) {
throw new TypeError('The `strict` option requires a finite `intervalCap`');
}
// TODO: Remove this fallback in the next major version

@@ -72,2 +83,3 @@ // eslint-disable-next-line @typescript-eslint/no-deprecated

this.#interval = options.interval;
this.#strict = options.strict;
this.#queue = new options.queueClass();

@@ -83,4 +95,54 @@ this.#queueClass = options.queueClass;

}
#cleanupStrictTicks(now) {
// Remove ticks outside the current interval window using circular buffer approach
while (this.#strictTicksStartIndex < this.#strictTicks.length) {
const oldestTick = this.#strictTicks[this.#strictTicksStartIndex];
if (oldestTick !== undefined && now - oldestTick >= this.#interval) {
this.#strictTicksStartIndex++;
}
else {
break;
}
}
// Compact the array when it becomes inefficient or fully consumed
// Compact when: (start index is large AND more than half wasted) OR all ticks expired
const shouldCompact = (this.#strictTicksStartIndex > 100 && this.#strictTicksStartIndex > this.#strictTicks.length / 2)
|| this.#strictTicksStartIndex === this.#strictTicks.length;
if (shouldCompact) {
this.#strictTicks = this.#strictTicks.slice(this.#strictTicksStartIndex);
this.#strictTicksStartIndex = 0;
}
}
// Helper methods for interval consumption
#consumeIntervalSlot(now) {
if (this.#strict) {
this.#strictTicks.push(now);
}
else {
this.#intervalCount++;
}
}
#rollbackIntervalSlot() {
if (this.#strict) {
// Pop from the end of the actual data (not from start index)
if (this.#strictTicks.length > this.#strictTicksStartIndex) {
this.#strictTicks.pop();
}
}
else if (this.#intervalCount > 0) {
this.#intervalCount--;
}
}
#getActiveTicksCount() {
return this.#strictTicks.length - this.#strictTicksStartIndex;
}
get #doesIntervalAllowAnother() {
return this.#isIntervalIgnored || this.#intervalCount < this.#intervalCap;
if (this.#isIntervalIgnored) {
return true;
}
if (this.#strict) {
// Cleanup already done by #isIntervalPausedAt before this is called
return this.#getActiveTicksCount() < this.#intervalCap;
}
return this.#intervalCount < this.#intervalCap;
}

@@ -99,8 +161,24 @@ get #doesConcurrentAllowAnother() {

#onResumeInterval() {
this.#onInterval(); // Already schedules update
// Clear timeout ID before processing to prevent race condition
// Must clear before #onInterval to allow new timeouts to be scheduled
this.#timeoutId = undefined;
this.#onInterval();
this.#initializeIntervalIfNeeded();
this.#timeoutId = undefined;
}
get #isIntervalPaused() {
const now = Date.now();
#isIntervalPausedAt(now) {
// Strict mode: check if we need to wait for oldest tick to age out
if (this.#strict) {
this.#cleanupStrictTicks(now);
// If at capacity, need to wait for oldest tick to age out
const activeTicksCount = this.#getActiveTicksCount();
if (activeTicksCount >= this.#intervalCap) {
const oldestTick = this.#strictTicks[this.#strictTicksStartIndex];
// After cleanup, remaining ticks are within interval, so delay is always > 0
const delay = this.#interval - (now - oldestTick);
this.#createIntervalTimeout(delay);
return true;
}
return false;
}
// Fixed window mode (original logic)
if (this.#intervalId === undefined) {

@@ -160,2 +238,7 @@ const delay = this.#intervalEnd - now;

this.#clearTimeoutTimer();
// Compact strict ticks when idle to free memory
if (this.#strict && this.#strictTicksStartIndex > 0) {
const now = Date.now();
this.#cleanupStrictTicks(now);
}
this.emit('idle');

@@ -167,12 +250,11 @@ }

if (!this.#isPaused) {
const canInitializeInterval = !this.#isIntervalPaused;
const now = Date.now();
const canInitializeInterval = !this.#isIntervalPausedAt(now);
if (this.#doesIntervalAllowAnother && this.#doesConcurrentAllowAnother) {
const job = this.#queue.dequeue();
// Increment interval count immediately to prevent race conditions
if (!this.#isIntervalIgnored) {
this.#intervalCount++;
this.#consumeIntervalSlot(now);
this.#scheduleRateLimitUpdate();
}
this.emit('active');
this.#lastExecutionTime = Date.now();
job();

@@ -191,2 +273,6 @@ if (canInitializeInterval) {

}
// Strict mode uses timeouts instead of interval timers
if (this.#strict) {
return;
}
this.#intervalId = setInterval(() => {

@@ -198,6 +284,9 @@ this.#onInterval();

#onInterval() {
if (this.#intervalCount === 0 && this.#pending === 0 && this.#intervalId) {
this.#clearIntervalTimer();
// Non-strict mode uses interval timers and intervalCount
if (!this.#strict) {
if (this.#intervalCount === 0 && this.#pending === 0 && this.#intervalId) {
this.#clearIntervalTimer();
}
this.#intervalCount = this.#carryoverIntervalCount ? this.#pending : 0;
}
this.#intervalCount = this.#carryoverIntervalCount ? this.#pending : 0;
this.#processQueue();

@@ -266,7 +355,8 @@ this.#scheduleRateLimitUpdate();

async add(function_, options = {}) {
// In case `id` is not defined.
options.id ??= (this.#idAssigner++).toString();
// Create a copy to avoid mutating the original options object
options = {
timeout: this.timeout,
...options,
// Assign unique ID if not provided
id: options.id ?? (this.#idAssigner++).toString(),
};

@@ -293,6 +383,3 @@ return new Promise((resolve, reject) => {

catch (error) {
// Decrement the counter that was already incremented
if (!this.#isIntervalIgnored) {
this.#intervalCount--;
}
this.#rollbackIntervalConsumption();
// Clean up tracking before throwing

@@ -302,2 +389,3 @@ this.#runningTasks.delete(taskSymbol);

}
this.#lastExecutionTime = Date.now();
let operation = function_({ signal: options.signal });

@@ -369,2 +457,7 @@ if (options.timeout) {

this.#queue = new this.#queueClass();
// Clear interval timer since queue is now empty (consistent with #tryToStartAnother)
this.#clearIntervalTimer();
// Note: We preserve strict mode rate-limiting state (ticks and timeout)
// because clear() only clears queued tasks, not rate limit history.
// This ensures that rate limits are still enforced after clearing the queue.
// Note: We don't clear #runningTasks as those tasks are still running

@@ -374,2 +467,9 @@ // They will be removed when they complete in the finally block

this.#updateRateLimitState();
// Emit events so waiters (onEmpty, onIdle, onSizeLessThan) can resolve
this.emit('empty');
if (this.#pending === 0) {
this.#clearTimeoutTimer();
this.emit('idle');
}
this.emit('next');
}

@@ -473,3 +573,3 @@ /**

// eslint-disable-next-line @typescript-eslint/promise-function-async
async onError() {
onError() {
return new Promise((_resolve, reject) => {

@@ -549,7 +649,30 @@ const handleError = (error) => {

}
#rollbackIntervalConsumption() {
if (this.#isIntervalIgnored) {
return;
}
this.#rollbackIntervalSlot();
this.#scheduleRateLimitUpdate();
}
#updateRateLimitState() {
const previous = this.#rateLimitedInInterval;
const shouldBeRateLimited = !this.#isIntervalIgnored
&& this.#intervalCount >= this.#intervalCap
&& this.#queue.size > 0;
// Early exit if rate limiting is disabled or queue is empty
if (this.#isIntervalIgnored || this.#queue.size === 0) {
if (previous) {
this.#rateLimitedInInterval = false;
this.emit('rateLimitCleared');
}
return;
}
// Get the current count based on mode
let count;
if (this.#strict) {
const now = Date.now();
this.#cleanupStrictTicks(now);
count = this.#getActiveTicksCount();
}
else {
count = this.#intervalCount;
}
const shouldBeRateLimited = count >= this.#intervalCap;
if (shouldBeRateLimited !== previous) {

@@ -556,0 +679,0 @@ this.#rateLimitedInInterval = shouldBeRateLimited;

@@ -70,2 +70,18 @@ import { type Queue, type RunFunction } from './queue.js';

readonly carryoverConcurrencyCount?: boolean;
/**
Whether to use strict mode for rate limiting (sliding window algorithm).
When enabled, ensures that no more than `intervalCap` tasks execute in any rolling `interval` window, rather than resetting the count at fixed intervals. This provides more predictable and evenly distributed execution.
@default false
For example, with `intervalCap: 2` and `interval: 1000`:
- __Default mode (fixed window)__: Tasks can burst at window boundaries. You could execute 2 tasks at 999ms and 2 more at 1000ms, resulting in 4 tasks within 1ms.
- __Strict mode (sliding window)__: Enforces that no more than 2 tasks execute in any 1000ms rolling window, preventing bursts.
Strict mode is more resource-intensive as it tracks individual execution timestamps. Use it when you need guaranteed rate-limit compliance, such as when interacting with APIs that enforce strict rate limits.
The `carryoverIntervalCount` option has no effect when `strict` mode is enabled, as strict mode tracks actual execution timestamps rather than counting pending tasks.
*/
readonly strict?: boolean;
} & TimeoutOptions;

@@ -120,4 +136,4 @@ export type QueueAddOptions = {

*/
readonly signal?: AbortSignal;
readonly signal?: AbortSignal | undefined;
};
export {};
+1
-1
{
"name": "p-queue",
"version": "9.0.1",
"version": "9.1.0",
"description": "Promise queue with concurrency control",

@@ -5,0 +5,0 @@ "license": "MIT",

@@ -143,2 +143,21 @@ # p-queue

##### strict
Type: `boolean`\
Default: `false`
Whether to use strict mode for rate limiting (sliding window algorithm).
When enabled, ensures that no more than `intervalCap` tasks execute in any rolling `interval` window, rather than resetting the count at fixed intervals. This provides more predictable and evenly distributed execution.
For example, with `intervalCap: 2` and `interval: 1000`:
- **Default mode (fixed window)**: Tasks can burst at window boundaries. You could execute 2 tasks at 999ms and 2 more at 1000ms, resulting in 4 tasks within 1ms.
- **Strict mode (sliding window)**: Enforces that no more than 2 tasks execute in any 1000ms rolling window, preventing bursts.
> [!NOTE]
> Strict mode is more resource-intensive as it tracks individual execution timestamps. Use it when you need guaranteed rate-limit compliance, such as when interacting with APIs that enforce strict rate limits.
> [!NOTE]
> The `carryoverIntervalCount` option has no effect when `strict` mode is enabled, as strict mode tracks actual execution timestamps rather than counting pending tasks.
### queue

@@ -559,3 +578,3 @@

Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`.
Emitted whenever the queue becomes idle: both empty and with zero running tasks (`size === 0 && pending === 0`). If no tasks are ever added, it never fires.

@@ -833,2 +852,14 @@ The difference with `empty` is that `idle` guarantees that all work from the queue has finished. `empty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

#### When should I use `strict` mode for rate limiting?
Use `strict: true` when:
- You're interacting with APIs that enforce strict rate limits and will throttle or block you if you exceed them, even briefly
- You've experienced issues with the default fixed window mode (such as [#126](https://github.com/sindresorhus/p-queue/issues/126))
- You need guaranteed compliance with rate limits for any rolling time window
Use the default fixed window mode when:
- You don't have strict rate limit requirements
- Performance is more important than perfect rate limit distribution
- You're rate limiting for backpressure management rather than external API constraints
#### How do I implement backpressure?

@@ -835,0 +866,0 @@

import { type Queue, type RunFunction } from './queue.js';
type TimeoutOptions = {
/**
Per-operation timeout in milliseconds. Operations will throw a `TimeoutError` if they don't complete within the specified time.
The timeout begins when the operation is dequeued and starts execution, not while it's waiting in the queue.
@default undefined
Can be overridden per task using the `timeout` option in `.add()`:
@example
```
const queue = new PQueue({timeout: 5000});
// This task uses the global 5s timeout
await queue.add(() => fetchData());
// This task has a 10s timeout
await queue.add(() => slowTask(), {timeout: 10000});
```
*/
timeout?: number;
};
export type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = {
/**
Concurrency limit.
Minimum: `1`.
@default Infinity
*/
readonly concurrency?: number;
/**
Whether queue tasks within concurrency limit, are auto-executed as soon as they're added.
@default true
*/
readonly autoStart?: boolean;
/**
Class with a `enqueue` and `dequeue` method, and a `size` getter. See the [Custom QueueClass](https://github.com/sindresorhus/p-queue#custom-queueclass) section.
*/
readonly queueClass?: new () => QueueType;
/**
The max number of runs in the given interval of time.
Minimum: `1`.
@default Infinity
*/
readonly intervalCap?: number;
/**
The length of time in milliseconds before the interval count resets. Must be finite.
Minimum: `0`.
@default 0
*/
readonly interval?: number;
/**
Whether the task must finish in the given interval or will be carried over into the next interval count.
@default false
*/
readonly carryoverIntervalCount?: boolean;
/**
@deprecated Renamed to `carryoverIntervalCount`.
*/
readonly carryoverConcurrencyCount?: boolean;
} & TimeoutOptions;
export type QueueAddOptions = {
/**
Priority of operation. Operations with greater priority will be scheduled first.
@default 0
*/
readonly priority?: number;
/**
Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`.
*/
id?: string;
} & TaskOptions & TimeoutOptions;
export type TaskOptions = {
/**
[`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) for cancellation of the operation. When aborted, it will be removed from the queue and the `queue.add()` call will reject with an `AbortError`. If the operation is already running, the signal will need to be handled by the operation itself.
@example
```
import PQueue, {AbortError} from 'p-queue';
import got, {CancelError} from 'got';
const queue = new PQueue();
const controller = new AbortController();
try {
await queue.add(({signal}) => {
const request = got('https://sindresorhus.com');
signal.addEventListener('abort', () => {
request.cancel();
});
try {
return await request;
} catch (error) {
if (!(error instanceof CancelError)) {
throw error;
}
}
}, {signal: controller.signal});
} catch (error) {
if (!(error instanceof AbortError)) {
throw error;
}
}
```
*/
readonly signal?: AbortSignal;
};
export {};