Socket
Socket
Sign inDemoInstall

p-queue

Package Overview
Dependencies
1
Maintainers
2
Versions
44
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

p-queue


Version published
Maintainers
2
Created

Package description

What is p-queue?

The p-queue package is a promise-based queue for Node.js that enables the execution of tasks in a controlled concurrency environment. It allows for rate-limiting, pausing, and resuming of tasks, and ensures that tasks are executed in a predictable manner.

What are p-queue's main functionalities?

Concurrency Control

This feature allows you to control the number of tasks that are run concurrently. In this example, the concurrency is set to 2, meaning that only two tasks will be processed at the same time.

const PQueue = require('p-queue').default;
const queue = new PQueue({concurrency: 2});

async function task(input) {
  // Task implementation
}

queue.add(() => task('task 1'));
queue.add(() => task('task 2'));
queue.add(() => task('task 3'));

Rate Limiting

This feature allows you to limit the rate at which tasks are executed. In this example, the queue is configured to process a maximum of 2 tasks per 1000 milliseconds.

const PQueue = require('p-queue').default;
const queue = new PQueue({
  interval: 1000,
  intervalCap: 2
});

async function task(input) {
  // Task implementation
}

for (let i = 0; i < 6; i++) {
  queue.add(() => task(`task ${i + 1}`));
}

Pausing and Resuming

This feature allows you to pause the processing of tasks and resume it later. In this example, the queue is paused immediately after creation, tasks are added, and then the queue is resumed after a timeout.

const PQueue = require('p-queue').default;
const queue = new PQueue();

queue.pause();

async function task(input) {
  // Task implementation
}

queue.add(() => task('task 1'));
queue.add(() => task('task 2'));

setTimeout(() => {
  queue.start(); // Resume the queue after 2000ms
}, 2000);

Priority Queueing

This feature allows you to add tasks with a priority level. Tasks with a lower priority number are executed first. In this example, the 'high priority' task will be executed before the 'low priority' task.

const PQueue = require('p-queue').default;
const queue = new PQueue();

async function task(input) {
  // Task implementation
}

queue.add(() => task('low priority'), {priority: 1});
queue.add(() => task('high priority'), {priority: 0});

Other packages similar to p-queue

Readme

Source

p-queue Build Status codecov

Promise queue with concurrency control

Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.

Install

$ npm install p-queue

Usage

Here we run only one promise at the time. For example, set concurrency to 4 to run four promises at the same time.

const PQueue = require('p-queue');
const got = require('got');

const queue = new PQueue({concurrency: 1});

queue.add(() => got('sindresorhus.com')).then(() => {
	console.log('Done: sindresorhus.com');
});

queue.add(() => got('ava.li')).then(() => {
	console.log('Done: ava.li');
});

getUnicornTask().then(task => queue.add(task)).then(() => {
	console.log('Done: Unicorn task');
});

API

PQueue([options])

Returns a new queue instance, which is an EventEmitter3 subclass.

options

Type: Object

concurrency

Type: number
Default: Infinity
Minimum: 1

Concurrency limit.

autoStart

Type: boolean
Default: true

Whether queue tasks within concurrency limit, are auto-executed as soon as they're added.

queueClass

Type: Function

Class with a enqueue and dequeue method, and a size getter. See the Custom QueueClass section.

intervalCap

Type: number
Default: Infinity
Minimum: 1

The max number of runs in the given interval of time.

interval

Type: number
Default: 0
Minimum: 0

The length of time in milliseconds before the interval count resets. Must be finite.

carryoverConcurrencyCount

Type: boolean
Default: false

Whether the task must finish in the given interval or will be carried over into the next interval count.

queue

PQueue instance.

.add(fn, [options])

Adds a sync or async task to the queue. Always returns a promise.

fn

Type: Function

Promise-returning/async function.

options

Type: Object

priority

Type: number
Default: 0

Priority of operation. Operations with greater priority will be scheduled first.

.addAll(fns, [options])

Same as .add(), but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved.

.pause()

Put queue execution on hold.

.start()

Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via options.autoStart = false or by .pause() method.)

.onEmpty()

Returns a promise that settles when the queue becomes empty.

Can be called multiple times. Useful if you for example add additional items at a later time.

.onIdle()

Returns a promise that settles when the queue becomes empty, and all promises have completed; queue.size === 0 && queue.pending === 0.

The difference with .onEmpty is that .onIdle guarantees that all work from the queue has finished. .onEmpty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

.clear()

Clear the queue.

.size

Size of the queue.

.pending

Number of pending promises.

.isPaused

Whether the queue is currently paused.

Events

active

Emitted as each item is processed in the queue for the purpose of tracking progress.

const delay = require('delay');
const PQueue = require('p-queue');

const queue = new PQueue({concurrency: 2});

let count = 0;
queue.on('active', () => {
	console.log(`Working on item #${++count}.  Size: ${queue.size}  Pending: ${queue.pending}`);
});

queue.add(() => Promise.resolve());
queue.add(() => delay(2000));
queue.add(() => Promise.resolve());
queue.add(() => Promise.resolve());
queue.add(() => delay(500));

Advanced example

A more advanced example to help you understand the flow.

const delay = require('delay');
const PQueue = require('p-queue');

const queue = new PQueue({concurrency: 1});

delay(200).then(() => {
	console.log(`8. Pending promises: ${queue.pending}`);
	//=> '8. Pending promises: 0'

	queue.add(() => Promise.resolve('🐙')).then(console.log.bind(null, '11. Resolved'));

	console.log('9. Added 🐙');

	console.log(`10. Pending promises: ${queue.pending}`);
	//=> '10. Pending promises: 1'

	queue.onIdle().then(() => {
		console.log('12. All work is done');
	});
});

queue.add(() => Promise.resolve('🦄')).then(console.log.bind(null, '5. Resolved'));
console.log('1. Added 🦄');

queue.add(() => Promise.resolve('🐴')).then(console.log.bind(null, '6. Resolved'));
console.log('2. Added 🐴');

queue.onEmpty().then(() => {
	console.log('7. Queue is empty');
});

console.log(`3. Queue size: ${queue.size}`);
//=> '3. Queue size: 1`
console.log(`4. Pending promises: ${queue.pending}`);
//=> '4. Pending promises: 1'
$ node example.js
1. Added 🦄
2. Added 🐴
3. Queue size: 1
4. Pending promises: 1
5. Resolved 🦄
6. Resolved 🐴
7. Queue is empty
8. Pending promises: 0
9. Added 🐙
10. Pending promises: 1
11. Resolved 🐙
12. All work is done

Custom QueueClass

For implementing more complex scheduling policies, you can provide a QueueClass in the options:

class QueueClass {
	constructor() {
		this._queue = [];
	}
	enqueue(run, options) {
		this._queue.push(run);
	}
	dequeue() {
		return this._queue.shift();
	}
	get size() {
		return this._queue.length;
	}
}

p-queue will call corresponding methods to put and get operations from this queue.

  • p-limit - Run multiple promise-returning & async functions with limited concurrency
  • p-throttle - Throttle promise-returning & async functions
  • p-debounce - Debounce promise-returning & async functions
  • p-all - Run promise-returning & async functions concurrently with optional limited concurrency
  • More…

License

MIT

Keywords

FAQs

Last updated on 08 Mar 2019

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc