What is p-queue?
The p-queue package is a promise-based queue for Node.js that runs tasks with a configurable concurrency limit. It supports rate limiting, pausing, and resuming, so tasks start in a predictable order and at a predictable pace.
What are p-queue's main functionalities?
Concurrency Control
This feature allows you to control the number of tasks that are run concurrently. In this example, the concurrency is set to 2, meaning that only two tasks will be processed at the same time.
const PQueue = require('p-queue').default;
const queue = new PQueue({concurrency: 2});
async function task(input) {
	// Task implementation
}
queue.add(() => task('task 1'));
queue.add(() => task('task 2'));
queue.add(() => task('task 3'));
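To make the concurrency limit concrete, here is a minimal sketch of the idea in plain JavaScript. It is not p-queue's implementation: createLimiter, runDemo, and the 10 ms delay are hypothetical names and values chosen for illustration. The demo records the highest number of tasks that were in flight at once.

```javascript
// Hypothetical helper (not part of p-queue): runs at most `concurrency`
// promise-returning functions at the same time.
function createLimiter(concurrency) {
	const waiting = []; // tasks that have not started yet
	let active = 0; // tasks currently running

	const next = () => {
		if (active >= concurrency || waiting.length === 0) {
			return;
		}

		const {fn, resolve, reject} = waiting.shift();
		active++;
		Promise.resolve()
			.then(fn)
			.then(resolve, reject)
			.finally(() => {
				active--;
				next(); // a slot is free, so start the next waiting task
			});
	};

	return fn => new Promise((resolve, reject) => {
		waiting.push({fn, resolve, reject});
		next();
	});
}

// Demo: three tasks with a limit of two. Tracks the peak number of running tasks.
async function runDemo() {
	const add = createLimiter(2);
	let running = 0;
	let peak = 0;

	const task = async name => {
		running++;
		peak = Math.max(peak, running);
		await new Promise(resolve => setTimeout(resolve, 10));
		running--;
		return name;
	};

	const results = await Promise.all([
		add(() => task('task 1')),
		add(() => task('task 2')),
		add(() => task('task 3'))
	]);

	return {results, peak};
}

runDemo().then(({results, peak}) => console.log(results, peak));
```

With a limit of 2, the third task only starts once one of the first two has finished, so the recorded peak never exceeds the limit.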
Rate Limiting
This feature allows you to limit the rate at which tasks are executed. In this example, the queue is configured to process a maximum of 2 tasks per 1000 milliseconds.
const PQueue = require('p-queue').default;
const queue = new PQueue({
	interval: 1000,
	intervalCap: 2
});
async function task(input) {
	// Task implementation
}
for (let i = 0; i < 6; i++) {
	queue.add(() => task(`task ${i + 1}`));
}
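As a worked illustration of what interval and intervalCap imply, the sketch below computes the earliest window in which each task could start. It assumes all six tasks are added at once and that each finishes well within one interval. earliestStartTimes is a hypothetical helper, not a p-queue function, and real start times also depend on task duration and timer accuracy.

```javascript
// Hypothetical helper: idealized earliest start time (in ms) for each of
// `taskCount` tasks when at most `intervalCap` tasks may start per `interval`.
function earliestStartTimes(taskCount, interval, intervalCap) {
	const times = [];
	for (let i = 0; i < taskCount; i++) {
		// The first `intervalCap` tasks fall in window 0, the next ones in window 1, ...
		const windowIndex = Math.floor(i / intervalCap);
		times.push(windowIndex * interval);
	}
	return times;
}

// Value for the configuration above: [0, 0, 1000, 1000, 2000, 2000]
console.log(earliestStartTimes(6, 1000, 2));
```

Under these assumptions, six tasks with a cap of 2 per 1000 ms need three windows, so the last pair starts no earlier than 2000 ms after the first.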
Pausing and Resuming
This feature allows you to pause the processing of tasks and resume it later. In this example, the queue is paused immediately after creation, tasks are added, and then the queue is resumed after a timeout.
const PQueue = require('p-queue').default;
const queue = new PQueue();
queue.pause();
async function task(input) {
	// Task implementation
}
queue.add(() => task('task 1'));
queue.add(() => task('task 2'));
setTimeout(() => {
	queue.start(); // Resume the queue after 2000ms
}, 2000);
Priority Queueing
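This feature allows you to add tasks with a priority level. Tasks with a greater priority number are scheduled first. Priority only orders tasks that are still waiting, so this example limits concurrency to 1 and pauses the queue before adding tasks; once the queue is started, the 'high priority' task runs before the 'low priority' task.
const PQueue = require('p-queue').default;
const queue = new PQueue({concurrency: 1});
queue.pause(); // Hold tasks in the queue so that priority decides the order
async function task(input) {
	// Task implementation
}
queue.add(() => task('low priority'), {priority: 0});
queue.add(() => task('high priority'), {priority: 1});
queue.start();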
Other packages similar to p-queue
async
The 'async' package provides a variety of functions for working with asynchronous JavaScript, including queue management. It is similar to p-queue but offers a broader set of utilities for asynchronous control flow.
bottleneck
Bottleneck is a lightweight and powerful rate limiting library for Node.js. It is similar to p-queue in its ability to rate limit tasks but also provides features like clustering support for distributed rate limiting.
bull
Bull is a Redis-backed queue package for handling distributed jobs and messages in Node.js. It offers functionality similar to p-queue with additional features like repeatable jobs, delayed jobs, and job event listeners.
p-queue
Promise queue with concurrency control
Useful for rate-limiting async operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.
Install
$ npm install p-queue
Usage
Here we run only one promise at a time. For example, set concurrency to 4 to run four promises at a time.
const PQueue = require('p-queue');
const got = require('got');
const queue = new PQueue({concurrency: 1});
queue.add(() => got('sindresorhus.com')).then(() => {
	console.log('Done: sindresorhus.com');
});
queue.add(() => got('ava.li')).then(() => {
	console.log('Done: ava.li');
});
getUnicornTask().then(task => queue.add(task)).then(() => {
	console.log('Done: Unicorn task');
});
API
PQueue([options])
Returns a new queue instance.
options
Type: Object
concurrency
Type: number
Default: Infinity
Minimum: 1
Concurrency limit.
queueClass
Type: Function
Class with enqueue and dequeue methods and a size getter. See the Custom QueueClass section.
queue
PQueue instance.
.add(fn, [options])
Returns the promise returned by calling fn.
fn
Type: Function
Promise-returning/async function.
options
Type: Object
priority
Type: number
Default: 0
Priority of operation. Operations with greater priority will be scheduled first.
.addAll(fns, [options])
Same as .add(), but accepts an array of async functions and returns a promise that resolves when all async functions are resolved.
.onEmpty()
Returns a promise that settles when the queue becomes empty.
Can be called multiple times. Useful if, for example, you add more items at a later time.
.onIdle()
Returns a promise that settles when the queue becomes empty and all promises have completed; queue.size === 0 && queue.pending === 0.
The difference with .onEmpty is that .onIdle guarantees that all work from the queue has finished. .onEmpty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
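The distinction comes down to two counters, which the following sketch models in plain JavaScript. CounterModel and its methods are hypothetical names for illustration only, not p-queue code; the sketch assumes size counts tasks that are still waiting and pending counts tasks that have started but not finished, as described above.

```javascript
// Hypothetical model of the two counters (not p-queue code).
class CounterModel {
	constructor() {
		this.size = 0; // tasks waiting to start
		this.pending = 0; // tasks started but not yet finished
	}

	enqueue() {
		this.size++;
	}

	start() {
		this.size--;
		this.pending++;
	}

	finish() {
		this.pending--;
	}

	// The condition .onEmpty() waits for
	get isEmpty() {
		return this.size === 0;
	}

	// The condition .onIdle() waits for
	get isIdle() {
		return this.size === 0 && this.pending === 0;
	}
}

const model = new CounterModel();
model.enqueue();
model.start(); // the only task has left the queue but is still running
console.log(model.isEmpty, model.isIdle); // prints: true false
model.finish();
console.log(model.isEmpty, model.isIdle); // prints: true true
```

The first log line is the state in which an .onEmpty() promise could already have settled while an .onIdle() promise would still be waiting.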
.clear()
Clear the queue.
.size
Size of the queue.
.pending
Number of pending promises.
Advanced example
A more advanced example to help you understand the flow.
const delay = require('delay');
const PQueue = require('p-queue');
const queue = new PQueue({concurrency: 1});
delay(200).then(() => {
	console.log(`8. Pending promises: ${queue.pending}`);
	queue.add(() => Promise.resolve('🐙')).then(console.log.bind(null, '11. Resolved'));
	console.log('9. Added 🐙');
	console.log(`10. Pending promises: ${queue.pending}`);
	queue.onIdle().then(() => {
		console.log('12. All work is done');
	});
});
queue.add(() => Promise.resolve('🦄')).then(console.log.bind(null, '5. Resolved'));
console.log('1. Added 🦄');
queue.add(() => Promise.resolve('🐴')).then(console.log.bind(null, '6. Resolved'));
console.log('2. Added 🐴');
queue.onEmpty().then(() => {
	console.log('7. Queue is empty');
});
console.log(`3. Queue size: ${queue.size}`);
console.log(`4. Pending promises: ${queue.pending}`);
$ node example.js
1. Added 🦄
2. Added 🐴
3. Queue size: 1
4. Pending promises: 1
5. Resolved 🦄
6. Resolved 🐴
7. Queue is empty
8. Pending promises: 0
9. Added 🐙
10. Pending promises: 1
11. Resolved 🐙
12. All work is done
Custom QueueClass
For implementing more complex scheduling policies, you can provide a QueueClass in the options:
class QueueClass {
	constructor() {
		this._queue = [];
	}
	enqueue(run, options) {
		this._queue.push(run);
	}
	dequeue() {
		return this._queue.shift();
	}
	get size() {
		return this._queue.length;
	}
}
p-queue will call corresponding methods to put and get operations from this queue.
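As an example of a scheduling policy that goes beyond plain FIFO, the sketch below keeps entries ordered by priority, with greater values dequeued first and insertion order preserved among equal priorities. PriorityQueueClass is a hypothetical name, and the sketch assumes that enqueue receives the same options object that was passed to .add(). It is exercised on its own here, with strings standing in for the run functions.

```javascript
// Sketch of a custom queue class that honors `options.priority`.
// Assumption: `options` is the object passed to .add(); priority defaults to 0.
class PriorityQueueClass {
	constructor() {
		this._queue = [];
	}

	enqueue(run, options) {
		const priority = (options && options.priority) || 0;
		const element = {priority, run};
		// Insert before the first entry with a strictly lower priority, so
		// entries with equal priority keep their insertion (FIFO) order.
		const index = this._queue.findIndex(item => item.priority < priority);
		if (index === -1) {
			this._queue.push(element);
		} else {
			this._queue.splice(index, 0, element);
		}
	}

	dequeue() {
		const element = this._queue.shift();
		return element ? element.run : undefined;
	}

	get size() {
		return this._queue.length;
	}
}

// Standalone usage, without p-queue.
const demoQueue = new PriorityQueueClass();
demoQueue.enqueue('low', {priority: 0});
demoQueue.enqueue('high', {priority: 1});
demoQueue.enqueue('also low', {priority: 0});
console.log(demoQueue.dequeue(), demoQueue.dequeue(), demoQueue.dequeue(), demoQueue.size);
// prints: high low also low 0
```

Linear insertion keeps the sketch short; for large queues a binary search or a heap would be the more usual choice.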
Related
- p-limit - Run multiple promise-returning & async functions with limited concurrency
- p-throttle - Throttle promise-returning & async functions
- p-debounce - Debounce promise-returning & async functions
- p-all - Run promise-returning & async functions concurrently with optional limited concurrency
- More…
License
MIT © Sindre Sorhus