+38
-2
@@ -33,2 +33,3 @@ import { EventEmitter } from 'eventemitter3'; | ||
| #runningTasks = new Map(); | ||
| #queueAbortListenerCleanupFunctions = new Set(); | ||
| /** | ||
@@ -356,3 +357,6 @@ Get or set the default timeout for all tasks. Can be changed at runtime. | ||
| const taskSymbol = Symbol(`task-${options.id}`); | ||
| this.#queue.enqueue(async () => { | ||
| let cleanupQueueAbortHandler = () => undefined; | ||
| const run = async () => { | ||
| // Task is now running β remove the queued-state abort listener | ||
| cleanupQueueAbortHandler(); | ||
| this.#pending++; | ||
@@ -416,3 +420,32 @@ // Track this running task | ||
| } | ||
| }, options); | ||
| }; | ||
| this.#queue.enqueue(run, options); | ||
| const removeQueuedTask = () => { | ||
| if (this.#queue instanceof PriorityQueue) { | ||
| this.#queue.remove(run); | ||
| return; | ||
| } | ||
| this.#queue.remove?.(options.id); // Intentionally best-effort: queued abort removal is only supported for queue classes that implement `.remove()`. | ||
| }; | ||
| // Handle abort while task is waiting in the queue | ||
| if (options.signal) { | ||
| const { signal } = options; | ||
| const queueAbortHandler = () => { | ||
| cleanupQueueAbortHandler(); | ||
| removeQueuedTask(); | ||
| reject(signal.reason); | ||
| this.#tryToStartAnother(); | ||
| this.emit('next'); | ||
| }; | ||
| cleanupQueueAbortHandler = () => { | ||
| signal.removeEventListener('abort', queueAbortHandler); | ||
| this.#queueAbortListenerCleanupFunctions.delete(cleanupQueueAbortHandler); | ||
| }; | ||
| if (signal.aborted) { | ||
| queueAbortHandler(); | ||
| return; | ||
| } | ||
| signal.addEventListener('abort', queueAbortHandler, { once: true }); | ||
| this.#queueAbortListenerCleanupFunctions.add(cleanupQueueAbortHandler); | ||
| } | ||
| this.emit('add'); | ||
@@ -446,2 +479,5 @@ this.#tryToStartAnother(); | ||
| clear() { | ||
| for (const cleanupQueueAbortHandler of this.#queueAbortListenerCleanupFunctions) { | ||
| cleanupQueueAbortHandler(); | ||
| } | ||
| this.#queue = new this.#queueClass(); | ||
@@ -448,0 +484,0 @@ // Clear interval timer since queue is now empty (consistent with #tryToStartAnother) |
@@ -10,2 +10,4 @@ import { type Queue, type RunFunction } from './queue.js'; | ||
| setPriority(id: string, priority: number): void; | ||
| remove(id: string): void; | ||
| remove(run: RunFunction): void; | ||
| dequeue(): RunFunction | undefined; | ||
@@ -12,0 +14,0 @@ filter(options: Readonly<Partial<PriorityQueueOptions>>): RunFunction[]; |
@@ -26,2 +26,13 @@ import lowerBound from './lower-bound.js'; | ||
| } | ||
| remove(idOrRun) { | ||
| const index = this.#queue.findIndex((element) => { | ||
| if (typeof idOrRun === 'string') { | ||
| return element.id === idOrRun; | ||
| } | ||
| return element.run === idOrRun; | ||
| }); | ||
| if (index !== -1) { | ||
| this.#queue.splice(index, 1); | ||
| } | ||
| } | ||
| dequeue() { | ||
@@ -28,0 +39,0 @@ const item = this.#queue.shift(); |
+1
-0
@@ -8,2 +8,3 @@ export type RunFunction = () => Promise<unknown>; | ||
| setPriority: (id: string, priority: number) => void; | ||
| remove?: (id: string) => void; | ||
| }; |
+1
-1
| { | ||
| "name": "p-queue", | ||
| "version": "9.1.0", | ||
| "version": "9.1.1", | ||
| "description": "Promise queue with concurrency control", | ||
@@ -5,0 +5,0 @@ "license": "MIT", |
+25
-1
@@ -352,2 +352,5 @@ # p-queue | ||
| > [!WARNING] | ||
| > Any promises returned by `.add()` for tasks that were waiting in the queue (not yet running) will **never settle** after calling `.clear()`. This can cause "unsettled top-level await" warnings or hang your process. If you need the promises to settle, use `AbortSignal` for cancellation instead β aborting rejects the `.add()` promise cleanly. | ||
| #### .size | ||
@@ -881,4 +884,8 @@ | ||
| Use `AbortSignal` for targeted cancellation. Aborting removes a waiting task and rejects the `.add()` promise. For bulk operations, use `queue.clear()` or share one `AbortController` across tasks. | ||
| Use `AbortSignal` for targeted cancellation. When aborted, a queued task is removed and the `.add()` promise rejects. For bulk cancellation, share one `AbortController` across tasks. Avoid using `queue.clear()` alone for cancellation: it removes queued tasks but their `.add()` promises will never settle, causing dangling promises. | ||
| Note that aborting only rejects the promise returned by `.add()` β it does not automatically stop the async work inside your function. For a running task, you must handle the signal inside the function itself (see the example below). | ||
| Single-task cancellation: | ||
| ```js | ||
@@ -895,2 +902,19 @@ import PQueue from 'p-queue'; | ||
| Bulk cancellation using a shared `AbortController`: | ||
| ```js | ||
| import PQueue from 'p-queue'; | ||
| const queue = new PQueue({concurrency: 2}); | ||
| const controller = new AbortController(); | ||
| // All tasks share the same signal | ||
| queue.add(({signal}) => doWork(signal), {signal: controller.signal}).catch(() => {}); | ||
| queue.add(({signal}) => doWork(signal), {signal: controller.signal}).catch(() => {}); | ||
| queue.add(({signal}) => doWork(signal), {signal: controller.signal}).catch(() => {}); | ||
| // Cancel all queued (and signal running) tasks β promises reject cleanly | ||
| controller.abort(); | ||
| ``` | ||
| Direct removal methods are not provided as they would leak internals and risk dangling promises. | ||
@@ -897,0 +921,0 @@ |
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
80334
4.6%1172
4.46%1078
2.28%