Socket
Socket
Sign inDemoInstall

queue-promise

Package Overview
Dependencies
Maintainers
1
Versions
19
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

queue-promise - npm Package Compare versions

Comparing version 1.0.0 to 1.1.0

.eslintrc.json

342

dist/index.js
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
value: true
});

@@ -26,191 +26,235 @@

/**
* A simple and small library for promise-based queues.
* A small and simple library for promise-based queues. It will resolve enqueued
* functions concurrently at a specified speed. When a task is being resolved or
* rejected, an event will be emitted.
*
* @class Queue
* @extends EventEmitter
*/
var Queue = function (_EventEmitter) {
_inherits(Queue, _EventEmitter);
_inherits(Queue, _EventEmitter);
/**
* Initializes a new Queue instance with provided options.
*
* @param {object} options
* @param {number} options.concurrency how many promises can be
* handled at the same time
* @param {number} options.interval how often should new promises be
* handled (in ms)
*/
/**
* Initializes a new Queue instance with provided options.
*
* @param {Object} options
* @param {number} options.concurrent
* @param {number} options.interval
*/
/**
* Whenever the queue has started.
*
* @type {boolean}
*/
/**
* @type {boolean}
*/
/**
* Amount of promises currently handled.
*
* @type {number}
*/
/**
* Amount of tasks currently handled by the Queue.
*
* @type {number}
*/
/**
* A collection to store unresolved promises in.
*
* @type {Map}
*/
function Queue() {
var options = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : {};
/**
* A stack to store unresolved tasks.
*
* @type {Map}
*/
function Queue() {
var options = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : {};
_classCallCheck(this, Queue);
_classCallCheck(this, Queue);
// Default options:
var _this = _possibleConstructorReturn(this, (Queue.__proto__ || Object.getPrototypeOf(Queue)).call(this));
// Default options:
var _this = _possibleConstructorReturn(this, (Queue.__proto__ || Object.getPrototypeOf(Queue)).call(this));
_this.collection = new Map();
_this.unique = 0;
_this.current = 0;
_this.options = {};
_this.started = false;
_this.interval = 0;
_this.options = _extends({
concurrency: 5,
interval: 500
}, options);
return _this;
_this.stack = new Map();
_this.unique = 0;
_this.current = 0;
_this.options = {};
_this.started = false;
_this.options = _extends({
concurrent: 5,
interval: 500
}, options);
// Backward compatibility:
if (options.concurrency) {
_this.options.concurrent = options.concurrency;
}
return _this;
}
/**
* Starts the queue if it has not been started yet.
*
* @emits start
* @emits tick
* @emits request
* @emits error
*/
/**
* Starts the queue if it has not been started yet.
*
* @emits start
* @emits tick
* @emits resolve
* @emits reject
* @return {void}
* @access public
*/
/**
* Queue interval.
*
* @type {number}
*/
/**
* @type {IntervalID}
*/
/**
* Queue config.
*
* @type {Object}
*/
/**
* Queue config.
*
* @type {Object}
*/
/**
* Used to generate unique id for each promise.
*
* @type {number}
*/
/**
* Used to generate unique id for each task.
*
* @type {number}
*/
_createClass(Queue, [{
key: "start",
value: function start() {
var _this2 = this;
_createClass(Queue, [{
key: "start",
value: function start() {
var _this2 = this;
if (this.started) {
return;
if (this.started) {
return;
}
this.emit("start");
this.started = true;
this.interval = setInterval(function () {
_this2.emit("tick");
_this2.stack.forEach(function (promise, id) {
// Maximum amount of parallel concurrencies:
if (_this2.current + 1 > _this2.options.concurrent) {
return;
}
_this2.current++;
_this2.remove(id);
Promise.resolve(promise()).then(function () {
for (var _len = arguments.length, output = Array(_len), _key = 0; _key < _len; _key++) {
output[_key] = arguments[_key];
}
this.emit("start");
_this2.emit.apply(_this2, ["resolve"].concat(output));
}).catch(function (error) {
_this2.emit("reject", error);
}).then(function () {
_this2.next();
});
});
}, parseInt(this.options.interval));
}
this.started = true;
this.interval = setInterval(function () {
_this2.emit("tick");
/**
* Stops the queue.
*
* @emits stop
* @return {void}
* @access public
*/
_this2.collection.forEach(function (promise, id) {
// Maximum amount of parallel concurrencies:
if (_this2.current + 1 > _this2.options.concurrency) {
return;
}
}, {
key: "stop",
value: function stop() {
this.emit("stop");
_this2.current++;
_this2.remove(id);
clearInterval(this.interval);
promise().then(function () {
for (var _len = arguments.length, output = Array(_len), _key = 0; _key < _len; _key++) {
output[_key] = arguments[_key];
}
this.started = false;
}
_this2.emit.apply(_this2, ["resolve"].concat(output));
}).catch(function (error) {
_this2.emit("reject", error);
}).then(function () {
_this2.next();
});
});
}, parseInt(this.options.interval));
}
/**
* Goes to the next request and stops the loop if there is no requests left.
*
* @emits end
* @return {void}
* @access private
*/
/**
* Stops the queue.
*
* @emits stop
*/
}, {
key: "next",
value: function next() {
if (--this.current === 0 && this.stack.size === 0) {
this.emit("end");
this.stop();
}
}
}, {
key: "stop",
value: function stop() {
this.emit("stop");
/**
* Adds a promise to the queue.
*
* @param {Function} promise Promise to add to the queue
* @throws {Error} When promise is not a function
* @return {void}
* @access public
*/
clearInterval(this.interval);
}, {
key: "add",
value: function add(promise) {
if (typeof promise !== "function") {
throw new Error("You must provide a function, not " + (typeof promise === "undefined" ? "undefined" : _typeof(promise)) + ".");
}
this.started = false;
this.interval = 0;
}
this.stack.set(this.unique++, promise);
}
/**
* Goes to the next request and stops the loop if there is no requests left.
*
* @emits end
*/
/**
* Removes a task from the queue.
*
* @param {number} key Promise id
* @return {boolean}
* @access private
*/
}, {
key: "next",
value: function next() {
if (--this.current === 0 && this.collection.size === 0) {
this.emit("end");
this.stop();
}
}
}, {
key: "remove",
value: function remove(key) {
return this.stack.delete(key);
}
/**
* Adds a promise to the queue.
*
* @param {Promise} promise Promise to add to the queue
* @throws {Error} when the promise is not a function
*/
/**
* @see add
* @access public
*/
}, {
key: "add",
value: function add(promise) {
if (Promise.resolve(promise) == promise) {
throw new Error("You must provide a valid Promise, not " + (typeof promise === "undefined" ? "undefined" : _typeof(promise)) + ".");
}
}, {
key: "push",
value: function push(promise) {
this.add(promise);
}
this.collection.set(this.unique++, promise);
}
/**
* @see remove
* @access private
*/
/**
* Removes a promise from the queue.
*
* @param {number} key Promise id
* @return {boolean}
*/
}, {
key: "pop",
value: function pop(key) {
return this.remove(key);
}
}, {
key: "remove",
value: function remove(key) {
return this.collection.delete(key);
}
}]);
/**
* @see remove
* @access private
*/
return Queue;
}, {
key: "shift",
value: function shift(key) {
return this.remove(key);
}
}]);
return Queue;
}(_events2.default);

@@ -217,0 +261,0 @@

{
"name": "queue-promise",
"version": "1.0.0",
"keywords": [
"queue",
"queue-tasks",
"queue-request",
"promise",
"promise-chain",
"promise-queue"
],
"description": "A simple and small library for promise-based queues",
"author": "Łaniewski Bartosz <laniewski.bartozzz@gmail.com> (http://laniewski.me/)",
"license": "MIT",
"main": "dist/index.js",
"repository": {
"type": "git",
"url": "https://github.com/Bartozzz/queue-promise.git"
},
"bugs": {
"url": "https://github.com/Bartozzz/queue-promise/issues"
},
"dependencies": {
"babel-runtime": "^6.26.0"
},
"devDependencies": {
"flow-bin": "^0.56.0",
"mocha": "^4.0.0",
"babel": "^6.23.0",
"babel-cli": "^6.26.0",
"babel-eslint": "^8.0.1",
"babel-preset-flow": "^6.23.0",
"babel-preset-es2015": "^6.24.1",
"babel-plugin-add-module-exports": "^0.2.1",
"babel-plugin-transform-class-properties": "^6.24.1",
"babel-plugin-transform-object-rest-spread": "^6.26.0",
"eslint": "^4.8.0",
"eslint-config-google": "^0.9.1",
"eslint-plugin-flowtype": "^2.35.0",
"eslint-plugin-import": "^2.7.0"
},
"scripts": {
"flow": "flow",
"clean": "rm -rf dist",
"build": "babel src -d dist",
"watch": "babel src -d dist -w",
"eslint": "node_modules/.bin/eslint --fix src",
"prepare": "npm run clean && npm run eslint && npm run flow && npm run build"
}
"name": "queue-promise",
"version": "1.1.0",
"keywords": [
"queue",
"queue-tasks",
"queue-promise",
"promise",
"promise-chain",
"promise-queue"
],
"description": "A simple, dependency-free library for concurrent promise-based queues",
"author": "Łaniewski Bartosz <laniewski.bartozzz@gmail.com> (http://laniewski.me/)",
"license": "MIT",
"main": "dist/index.js",
"repository": {
"type": "git",
"url": "https://github.com/Bartozzz/queue-promise.git"
},
"bugs": {
"url": "https://github.com/Bartozzz/queue-promise/issues"
},
"devDependencies": {
"babel-cli": "^6.26.0",
"babel-core": "^6.26.0",
"babel-eslint": "^8.2.2",
"babel-plugin-transform-object-rest-spread": "^6.26.0",
"babel-plugin-transform-class-properties": "^6.24.1",
"babel-plugin-add-module-exports": "^0.2.1",
"babel-preset-env": "^1.6.1",
"babel-preset-flow": "^6.23.0",
"chai": "^4.1.2",
"eslint": "^4.19.1",
"eslint-config-prettier": "^2.9.0",
"eslint-config-standard": "^11.0.0",
"eslint-plugin-flowtype": "^2.46.1",
"eslint-plugin-import": "^2.9.0",
"eslint-plugin-node": "^6.0.1",
"eslint-plugin-prettier": "^2.6.0",
"eslint-plugin-promise": "^3.7.0",
"eslint-plugin-standard": "^3.0.1",
"flow-bin": "^0.67.1",
"mocha": "^5.0.5",
"prettier": "^1.11.1"
},
"scripts": {
"test": "npm run test:eslint && npm run test:flow && npm run test:mocha",
"test:flow": "flow",
"test:mocha": "mocha --require babel-core/register",
"test:eslint": "eslint --fix src",
"clean": "rm -rf dist",
"build": "babel src -d dist",
"watch": "babel src -d dist -w",
"prepare": "npm run clean && npm run test && npm run build"
}
}

@@ -8,5 +8,6 @@ <div align="center">

[![npm downloads](https://img.shields.io/npm/dt/queue-promise.svg)](https://www.npmjs.com/package/queue-promise)
<br>
<br>
A simple and small library for promise-based queues.
`queue-promise` is a small, dependency-free library for promise-based queues. It will resolve enqueued functions concurrently at a given speed. When a task is being resolved or rejected, an event will be emitted.
</div>

@@ -25,15 +26,15 @@

const q = new Queue( {
concurrency : 1,
interval : 2000
} );
const queue = new Queue({
concurrent: 1, // resolve 1 task at a time
interval: 2000 // resolve a new task once in 2000ms
});
q.on( "resolve", data => console.log( data ) );
q.on( "reject", error => console.error( error ) );
queue.on("resolve", data => console.log(data));
queue.on("reject", error => console.error(error));
q.add( asyncTaskA ); // resolved/rejected after 0s
q.add( asyncTaskB ); // resolved/rejected after 2s
q.add( asyncTaskC ); // resolved/rejected after 4s
q.add( asyncTaskD ); // resolved/rejected after 6s
q.start();
queue.push(asyncTaskA); // resolved/rejected after 0s
queue.push(asyncTaskB); // resolved/rejected after 2s
queue.push(asyncTaskC); // resolved/rejected after 4s
queue.push(asyncTaskD); // resolved/rejected after 6s
queue.start();
```

@@ -43,19 +44,23 @@

#### `new Queue( options )`
#### `new Queue(options)`
Create a new `Queue` instance with optionally injected options.
Create a new `Queue` instance.
| Option | Default | Description |
|:------------|:--------|:--------------------------------------------------|
| concurrency | 5 | How many promises can be handled at the same time |
| interval | 500 | How often should new promises be handled (in ms) |
| Option | Default | Description |
| :--------- | :------ | :--------------------------------------------- |
| concurrent | 5 | How many tasks can be handled at the same time |
| interval | 500 | How often should new tasks be handled (in ms) |
#### **public** `.add( promise )`
#### **public** `.add(task)`/`.push(task)`
Adds a promise to the queue.
Puts a new task on the stack. Throws an error if the provided `task` is not a valid function.
#### **public** `.start()`
Starts the queue.
Starts the queue. `Queue` will use global [Promises](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise).
#### **public** `.started`
Whether the queue has been started or not.
#### **public** `.stop()`

@@ -65,2 +70,6 @@

#### **public** `.on(event, callback)`
Sets a `callback` for an `event`. You can set callback for those events: `start`, `stop`, `tick`, `resolve`, `reject`, `end`.
#### **private** `.next()`

@@ -70,4 +79,10 @@

#### **public** `.on( event, callback )`
#### **private** `.remove(key)`/`.pop(key)`/`.shift(key)`
Sets a `callback` for an `event`. You can set callback for those events: `start`, `stop`, `tick`, `resolve`, `reject`, `end`.
Removes a task from the queue.
## Tests
```bash
$ npm test
```
// @flow
import EventEmitter from "events";
/**
* A simple and small library for promise-based queues.
* A small and simple library for promise-based queues. It will resolve enqueued
* functions concurrently at a specified speed. When a task is being resolved or
* rejected, an event will be emitted.
*
* @class Queue
* @extends EventEmitter
*/
export default class Queue extends EventEmitter {
/**
* A collection to store unresolved promises in.
*
* @type {Map}
*/
collection: Map<number, () => Promise<*>> = new Map;
/**
* A stack to store unresolved tasks.
*
* @type {Map}
*/
stack: Map<number, Function> = new Map();
/**
* Used to generate unique id for each promise.
*
* @type {number}
*/
unique: number = 0;
/**
* Used to generate unique id for each task.
*
* @type {number}
*/
unique: number = 0;
/**
* Amount of promises currently handled.
*
* @type {number}
*/
current: number = 0;
/**
* Amount of tasks currently handled by the Queue.
*
* @type {number}
*/
current: number = 0;
/**
* Queue config.
*
* @type {Object}
*/
options: Object = {};
/**
* Queue config.
*
* @type {Object}
*/
options: Object = {};
/**
* Whenever the queue has started.
*
* @type {boolean}
*/
started: boolean = false;
/**
* @type {boolean}
*/
started: boolean = false;
/**
* Queue interval.
*
* @type {number}
*/
interval: number = 0;
/**
* @type {IntervalID}
*/
interval: IntervalID;
/**
* Initializes a new Queue instance with provided options.
*
* @param {object} options
* @param {number} options.concurrency how many promises can be
* handled at the same time
* @param {number} options.interval how often should new promises be
* handled (in ms)
*/
constructor(options: Object = {}): void {
super();
/**
* Initializes a new Queue instance with provided options.
*
* @param {Object} options
* @param {number} options.concurrent
* @param {number} options.interval
*/
constructor(options: Object = {}): void {
super();
// Default options:
this.options = {
concurrency: 5,
interval: 500,
...options,
};
// Default options:
this.options = {
concurrent: 5,
interval: 500,
...options
};
// Backward compatibility:
if (options.concurrency) {
this.options.concurrent = options.concurrency;
}
}
/**
* Starts the queue if it has not been started yet.
*
* @emits start
* @emits tick
* @emits request
* @emits error
*/
start(): void {
if (this.started) {
return;
}
/**
* Starts the queue if it has not been started yet.
*
* @emits start
* @emits tick
* @emits resolve
* @emits reject
* @return {void}
* @access public
*/
start(): void {
if (this.started) {
return;
}
this.emit("start");
this.emit("start");
this.started = true;
this.interval = setInterval(() => {
this.emit("tick");
this.started = true;
this.interval = setInterval(() => {
this.emit("tick");
this.collection.forEach((promise, id) => {
// Maximum amount of parallel concurrencies:
if (this.current + 1 > this.options.concurrency) {
return;
}
this.stack.forEach((promise, id) => {
// Maximum amount of parallel concurrencies:
if (this.current + 1 > this.options.concurrent) {
return;
}
this.current++;
this.remove(id);
this.current++;
this.remove(id);
promise()
.then((...output) => {
this.emit("resolve", ...output);
})
.catch((error) => {
this.emit("reject", error);
})
.then(() => {
this.next();
});
});
}, parseInt(this.options.interval));
}
Promise.resolve(promise())
.then((...output) => {
this.emit("resolve", ...output);
})
.catch(error => {
this.emit("reject", error);
})
.then(() => {
this.next();
});
});
}, parseInt(this.options.interval));
}
/**
* Stops the queue.
*
* @emits stop
*/
stop(): void {
this.emit("stop");
/**
* Stops the queue.
*
* @emits stop
* @return {void}
* @access public
*/
stop(): void {
this.emit("stop");
clearInterval(this.interval);
clearInterval(this.interval);
this.started = false;
this.interval = 0;
this.started = false;
}
/**
* Goes to the next request and stops the loop if there is no requests left.
*
* @emits end
* @return {void}
* @access private
*/
next(): void {
if (--this.current === 0 && this.stack.size === 0) {
this.emit("end");
this.stop();
}
}
/**
* Goes to the next request and stops the loop if there is no requests left.
*
* @emits end
*/
next(): void {
if (--this.current === 0 && this.collection.size === 0) {
this.emit("end");
this.stop();
}
/**
* Adds a promise to the queue.
*
* @param {Function} promise Promise to add to the queue
* @throws {Error} When promise is not a function
* @return {void}
* @access public
*/
add(promise: Function): void {
if (typeof promise !== "function") {
throw new Error(`You must provide a function, not ${typeof promise}.`);
}
/**
* Adds a promise to the queue.
*
* @param {Promise} promise Promise to add to the queue
* @throws {Error} when the promise is not a function
*/
add(promise: () => Promise<*>): void {
if (Promise.resolve(promise) == promise) {
throw new Error(
`You must provide a valid Promise, not ${typeof promise}.`
);
}
this.stack.set(this.unique++, promise);
}
this.collection.set(this.unique++, promise);
}
/**
* Removes a task from the queue.
*
* @param {number} key Promise id
* @return {boolean}
* @access private
*/
remove(key: number): boolean {
return this.stack.delete(key);
}
/**
* Removes a promise from the queue.
*
* @param {number} key Promise id
* @return {boolean}
*/
remove(key: number): boolean {
return this.collection.delete(key);
}
/**
* @see add
* @access public
*/
push(promise: Function): void {
this.add(promise);
}
/**
* @see remove
* @access private
*/
pop(key: number): boolean {
return this.remove(key);
}
/**
* @see remove
* @access private
*/
shift(key: number): boolean {
return this.remove(key);
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc