Socket
Socket
Sign inDemoInstall

p-queue

Package Overview
Dependencies
Maintainers
2
Versions
44
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

p-queue - npm Package Compare versions

Comparing version 4.0.0 to 5.0.0

192

index.d.ts
/// <reference types="node"/>
import {EventEmitter} from 'events';
export interface QueueAddOptions {
[key: string]: unknown;
}
declare namespace PQueue {
interface QueueAddOptions {
[key: string]: unknown;
}
export interface QueueClass<EnqueueOptionsType extends QueueAddOptions> {
size: number;
interface QueueClass<EnqueueOptionsType extends QueueAddOptions> {
size: number;
enqueue(run: () => void, options?: EnqueueOptionsType): void;
enqueue(run: () => void, options?: EnqueueOptionsType): void;
dequeue(): (() => void) | undefined;
}
dequeue(): (() => void) | undefined;
}
export interface QueueClassConstructor<EnqueueOptionsType extends QueueAddOptions> {
new(): QueueClass<EnqueueOptionsType>;
}
interface QueueClassConstructor<EnqueueOptionsType extends QueueAddOptions> {
new (): QueueClass<EnqueueOptionsType>;
}
export interface Options<EnqueueOptionsType extends QueueAddOptions> {
/**
* Concurrency limit. Minimum: `1`.
*
* @default Infinity
*/
concurrency?: number;
interface Options<EnqueueOptionsType extends QueueAddOptions> {
/**
Concurrency limit. Minimum: `1`.
/**
* Whether queue tasks within concurrency limit, are auto-executed as soon as they're added.
*
* @default true
*/
autoStart?: boolean;
@default Infinity
*/
concurrency?: number;
/**
* Class with a `enqueue` and `dequeue` method, and a `size` getter. See the [Custom QueueClass](https://github.com/sindresorhus/p-queue#custom-queueclass) section.
*/
queueClass?: QueueClassConstructor<EnqueueOptionsType>;
/**
Whether queue tasks within concurrency limit, are auto-executed as soon as they're added.
/**
* The max number of runs in the given interval of time. Minimum: `1`.
*
* @default Infinity
*/
intervalCap?: number;
@default true
*/
autoStart?: boolean;
/**
* The length of time in milliseconds before the interval count resets. Must be finite. Minimum: `0`.
*
* @default 0
*/
interval?: number;
/**
Class with a `enqueue` and `dequeue` method, and a `size` getter. See the [Custom QueueClass](https://github.com/sindresorhus/p-queue#custom-queueclass) section.
*/
queueClass?: QueueClassConstructor<EnqueueOptionsType>;
/**
* Whether the task must finish in the given interval or will be carried over into the next interval count.
*
* @default false
*/
carryoverConcurrencyCount?: boolean;
}
/**
The max number of runs in the given interval of time. Minimum: `1`.
export interface DefaultAddOptions {
/**
* Priority of operation. Operations with greater priority will be scheduled first.
*
* @default 0
*/
priority?: number;
@default Infinity
*/
intervalCap?: number;
/**
The length of time in milliseconds before the interval count resets. Must be finite. Minimum: `0`.
@default 0
*/
interval?: number;
/**
Whether the task must finish in the given interval or will be carried over into the next interval count.
@default false
*/
carryoverConcurrencyCount?: boolean;
}
interface DefaultAddOptions {
/**
Priority of operation. Operations with greater priority will be scheduled first.
@default 0
*/
priority?: number;
}
type Task<TaskResultType> =
| (() => PromiseLike<TaskResultType>)
| (() => TaskResultType);
}
export type Task<TaskResultType> =
| (() => PromiseLike<TaskResultType>)
| (() => TaskResultType);
/**
* Promise queue with concurrency control.
*/
export default class PQueue<
EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions
Promise queue with concurrency control.
*/
declare class PQueue<
EnqueueOptionsType extends PQueue.QueueAddOptions = PQueue.DefaultAddOptions
> extends EventEmitter {
/**
* Size of the queue.
*/
Size of the queue.
*/
readonly size: number;
/**
* Number of pending promises.
*/
Number of pending promises.
*/
readonly pending: number;
/**
* Whether the queue is currently paused.
*/
Whether the queue is currently paused.
*/
readonly isPaused: boolean;
constructor(options?: Options<EnqueueOptionsType>);
constructor(options?: PQueue.Options<EnqueueOptionsType>);
/**
* Adds a sync or async task to the queue. Always returns a promise.
*
* @param fn - Promise-returning/async function.
*/
Adds a sync or async task to the queue. Always returns a promise.
@param fn - Promise-returning/async function.
*/
add<TaskResultType>(
fn: Task<TaskResultType>,
fn: PQueue.Task<TaskResultType>,
options?: EnqueueOptionsType

@@ -109,9 +111,9 @@ ): Promise<TaskResultType>;

/**
* Same as `.add()`, but accepts an array of sync or async functions.
*
* @param fn - Array of Promise-returning/async functions.
* @returns A promise that resolves when all functions are resolved.
*/
Same as `.add()`, but accepts an array of sync or async functions.
@param fn - Array of Promise-returning/async functions.
@returns A promise that resolves when all functions are resolved.
*/
addAll<TaskResultsType>(
fns: Task<TaskResultsType>[],
fns: PQueue.Task<TaskResultsType>[],
options?: EnqueueOptionsType

@@ -121,28 +123,28 @@ ): Promise<TaskResultsType[]>;

/**
* Can be called multiple times. Useful if you for example add additional items at a later time.
*
* @returns A promise that settles when the queue becomes empty.
*/
Can be called multiple times. Useful if you for example add additional items at a later time.
@returns A promise that settles when the queue becomes empty.
*/
onEmpty(): Promise<void>;
/**
* The difference with `.onEmpty` is that `.onIdle` guarantees that all work from the queue has finished. `.onEmpty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
*
* @returns A promise that settles when the queue becomes empty, and all promises have completed; `queue.size === 0 && queue.pending === 0`.
*/
The difference with `.onEmpty` is that `.onIdle` guarantees that all work from the queue has finished. `.onEmpty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
@returns A promise that settles when the queue becomes empty, and all promises have completed; `queue.size === 0 && queue.pending === 0`.
*/
onIdle(): Promise<void>;
/**
* Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via `options.autoStart = false` or by `.pause()` method.)
*/
Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via `options.autoStart = false` or by `.pause()` method.)
*/
start(): void;
/**
* Clear the queue.
*/
Clear the queue.
*/
clear(): void;
/**
* Put queue execution on hold.
*/
Put queue execution on hold.
*/
pause(): void;

@@ -164,1 +166,3 @@

}
export = PQueue;
'use strict';
const EventEmitter = require('eventemitter3');

@@ -32,7 +31,11 @@

enqueue(run, options) {
options = Object.assign({
priority: 0
}, options);
options = {
priority: 0,
...options
};
const element = {priority: options.priority, run};
const element = {
priority: options.priority,
run
};

@@ -57,7 +60,7 @@ if (this.size && this._queue[this.size - 1].priority >= options.priority) {

class PQueue extends EventEmitter {
module.exports = class PQueue extends EventEmitter {
constructor(options) {
super();
options = Object.assign({
options = {
carryoverConcurrencyCount: false,

@@ -68,4 +71,5 @@ intervalCap: Infinity,

autoStart: true,
queueClass: PriorityQueue
}, options);
queueClass: PriorityQueue,
...options
};

@@ -144,3 +148,5 @@ if (!(typeof options.concurrency === 'number' && options.concurrency >= 1)) {

if (this._timeoutId === null) {
this._timeoutId = setTimeout(() => this._onResumeInterval(), delay);
this._timeoutId = setTimeout(() => {
this._onResumeInterval();
}, delay);
}

@@ -202,5 +208,5 @@

add(fn, options) {
async add(fn, options) {
return new Promise((resolve, reject) => {
const run = () => {
const run = async () => {
this._pendingCount++;

@@ -210,16 +216,8 @@ this._intervalCount++;

try {
Promise.resolve(fn()).then(
val => {
resolve(val);
this._next();
},
err => {
reject(err);
this._next();
}
);
resolve(await fn());
} catch (error) {
reject(error);
this._next();
}
this._next();
};

@@ -232,3 +230,3 @@

addAll(fns, options) {
async addAll(fns, options) {
return Promise.all(fns.map(fn => this.add(fn, options)));

@@ -254,6 +252,6 @@ }

onEmpty() {
async onEmpty() {
// Instantly resolve if the queue is empty
if (this.queue.size === 0) {
return Promise.resolve();
return;
}

@@ -270,6 +268,6 @@

onIdle() {
async onIdle() {
// Instantly resolve if none pending and if nothing else is queued
if (this._pendingCount === 0 && this.queue.size === 0) {
return Promise.resolve();
return;
}

@@ -297,5 +295,2 @@

}
}
module.exports = PQueue;
module.exports.default = PQueue;
};
{
"name": "p-queue",
"version": "4.0.0",
"version": "5.0.0",
"description": "Promise queue with concurrency control",

@@ -8,6 +8,6 @@ "license": "MIT",

"engines": {
"node": ">=6"
"node": ">=8"
},
"scripts": {
"test": "xo && nyc ava && tsd-check",
"test": "xo && nyc ava && tsd",
"bench": "node bench.js"

@@ -45,14 +45,14 @@ },

"devDependencies": {
"@types/node": "^11.9.6",
"ava": "^1.2.1",
"benchmark": "^2.1.2",
"codecov": "^3.1.0",
"@types/node": "^11.13.0",
"ava": "^1.4.1",
"benchmark": "^2.1.4",
"codecov": "^3.3.0",
"delay": "^4.1.0",
"in-range": "^1.0.0",
"nyc": "^13.0.1",
"nyc": "^13.3.0",
"random-int": "^1.0.0",
"time-span": "^2.0.0",
"tsd-check": "^0.3.0",
"tsd": "^0.7.2",
"xo": "^0.24.0"
}
}

@@ -25,13 +25,17 @@ # p-queue [![Build Status](https://travis-ci.org/sindresorhus/p-queue.svg?branch=master)](https://travis-ci.org/sindresorhus/p-queue) [![codecov](https://codecov.io/gh/sindresorhus/p-queue/branch/master/graph/badge.svg)](https://codecov.io/gh/sindresorhus/p-queue)

queue.add(() => got('sindresorhus.com')).then(() => {
(async () => {
await queue.add(() => got('sindresorhus.com'));
console.log('Done: sindresorhus.com');
});
})();
queue.add(() => got('ava.li')).then(() => {
(async () => {
await queue.add(() => got('ava.li'));
console.log('Done: ava.li');
});
})();
getUnicornTask().then(task => queue.add(task)).then(() => {
(async () => {
const task = await getUnicornTask();
await queue.add(task);
console.log('Done: Unicorn task');
});
})();
```

@@ -195,7 +199,12 @@

delay(200).then(() => {
(async () => {
await delay(200);
console.log(`8. Pending promises: ${queue.pending}`);
//=> '8. Pending promises: 0'
queue.add(() => Promise.resolve('πŸ™')).then(console.log.bind(null, '11. Resolved'));
(async () => {
await queue.add(async () => 'πŸ™');
console.log('11. Resolved')
})();

@@ -207,19 +216,26 @@ console.log('9. Added πŸ™');

queue.onIdle().then(() => {
console.log('12. All work is done');
});
});
await queue.onIdle();
console.log('12. All work is done');
})();
queue.add(() => Promise.resolve('πŸ¦„')).then(console.log.bind(null, '5. Resolved'));
(async () => {
await queue.add(async () => 'πŸ¦„');
console.log('5. Resolved')
})();
console.log('1. Added πŸ¦„');
queue.add(() => Promise.resolve('🐴')).then(console.log.bind(null, '6. Resolved'));
(async () => {
await queue.add(async () => '🐴');
console.log('6. Resolved')
})();
console.log('2. Added 🐴');
queue.onEmpty().then(() => {
(async () => {
await queue.onEmpty();
console.log('7. Queue is empty');
});
})();
console.log(`3. Queue size: ${queue.size}`);
//=> '3. Queue size: 1`
console.log(`4. Pending promises: ${queue.pending}`);

@@ -226,0 +242,0 @@ //=> '4. Pending promises: 1'

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚑️ by Socket Inc