queue-promise
Advanced tools
Comparing version 1.2.1 to 1.3.0
@@ -6,49 +6,12 @@ "use strict"; | ||
}); | ||
exports.default = void 0; | ||
var _typeof2 = require("babel-runtime/helpers/typeof"); | ||
var _events = _interopRequireDefault(require("events")); | ||
var _typeof3 = _interopRequireDefault(_typeof2); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
var _getIterator2 = require("babel-runtime/core-js/get-iterator"); | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; var ownKeys = Object.keys(source); if (typeof Object.getOwnPropertySymbols === 'function') { ownKeys = ownKeys.concat(Object.getOwnPropertySymbols(source).filter(function (sym) { return Object.getOwnPropertyDescriptor(source, sym).enumerable; })); } ownKeys.forEach(function (key) { _defineProperty(target, key, source[key]); }); } return target; } | ||
var _getIterator3 = _interopRequireDefault(_getIterator2); | ||
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
var _promise = require("babel-runtime/core-js/promise"); | ||
var _promise2 = _interopRequireDefault(_promise); | ||
var _extends2 = require("babel-runtime/helpers/extends"); | ||
var _extends3 = _interopRequireDefault(_extends2); | ||
var _map = require("babel-runtime/core-js/map"); | ||
var _map2 = _interopRequireDefault(_map); | ||
var _getPrototypeOf = require("babel-runtime/core-js/object/get-prototype-of"); | ||
var _getPrototypeOf2 = _interopRequireDefault(_getPrototypeOf); | ||
var _classCallCheck2 = require("babel-runtime/helpers/classCallCheck"); | ||
var _classCallCheck3 = _interopRequireDefault(_classCallCheck2); | ||
var _createClass2 = require("babel-runtime/helpers/createClass"); | ||
var _createClass3 = _interopRequireDefault(_createClass2); | ||
var _possibleConstructorReturn2 = require("babel-runtime/helpers/possibleConstructorReturn"); | ||
var _possibleConstructorReturn3 = _interopRequireDefault(_possibleConstructorReturn2); | ||
var _inherits2 = require("babel-runtime/helpers/inherits"); | ||
var _inherits3 = _interopRequireDefault(_inherits2); | ||
var _events = require("events"); | ||
var _events2 = _interopRequireDefault(_events); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
/** | ||
@@ -59,66 +22,99 @@ * A small and simple library for promise-based queues. It will resolve enqueued | ||
* | ||
* @example | ||
* const queue = new Queue({ | ||
* concurrent: 1, | ||
* interval: 2000 | ||
* }); | ||
* | ||
* queue.on("resolve", data => console.log(data)); | ||
* queue.on("reject", error => console.error(error)); | ||
* | ||
* queue.enqueue(asyncTaskA); | ||
* queue.enqueue([asyncTaskB, asyncTaskC]); | ||
* | ||
* @class Queue | ||
* @extends EventEmitter | ||
*/ | ||
var Queue = function (_EventEmitter) { | ||
(0, _inherits3.default)(Queue, _EventEmitter); | ||
class Queue extends _events.default { | ||
/** | ||
* Initializes a new Queue instance with provided options. | ||
* A collection to store unresolved tasks. We use a Map here because V8 uses a | ||
* variant of hash tables that generally have O(1) complexity for retrieval | ||
* and lookup. | ||
* | ||
* @param {Object} options | ||
* @param {number} options.concurrent | ||
* @param {number} options.interval | ||
* @param {boolean} options.start | ||
* @see https://codereview.chromium.org/220293002/ | ||
* @type {Map} | ||
* @access private | ||
*/ | ||
/** | ||
* @type {number} Used to generate unique id for each task | ||
* @access private | ||
*/ | ||
/** | ||
* @type {boolean} | ||
* @type {IntervalID} | ||
* @access private | ||
*/ | ||
/** | ||
* @type {number} Amount of tasks currently handled by the Queue | ||
* @access private | ||
*/ | ||
/** | ||
* Amount of tasks currently handled by the Queue. | ||
* | ||
* @type {number} | ||
* @type {Object} options | ||
* @type {number} options.concurrent How many tasks should be resolved at a time | ||
* @type {number} options.interval How often should new tasks be resolved (ms) | ||
* @type {boolean} options.start If should resolve new tasks automatically | ||
* @access public | ||
*/ | ||
/** | ||
* A collection to store unresolved tasks. We use a Map here because V8 uses a | ||
* variant of hash tables that generally have O(1) complexity for retrieval | ||
* and lookup. | ||
* @type {boolean} Whether the queue has already started | ||
* @access public | ||
*/ | ||
/** | ||
* @type {boolean} Whether the queue has been forced to stop | ||
* @access public | ||
*/ | ||
/** | ||
* Initializes a new Queue instance with provided options. | ||
* | ||
* @see https://codereview.chromium.org/220293002/ | ||
* @type {Map} | ||
* @param {Object} options | ||
* @param {number} options.concurrent How many tasks should be resolved at a time | ||
* @param {number} options.interval How often should new tasks be resolved (ms) | ||
* @param {boolean} options.start If should resolve new tasks automatically | ||
* @return {Queue} | ||
*/ | ||
function Queue() { | ||
var options = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : {}; | ||
(0, _classCallCheck3.default)(this, Queue); | ||
constructor(options = {}) { | ||
super(); | ||
var _this = (0, _possibleConstructorReturn3.default)(this, (Queue.__proto__ || (0, _getPrototypeOf2.default)(Queue)).call(this)); | ||
_defineProperty(this, "tasks", new Map()); | ||
_this.tasks = new _map2.default(); | ||
_this.unique = 0; | ||
_this.current = 0; | ||
_this.options = {}; | ||
_this.started = false; | ||
_defineProperty(this, "uniqueId", 0); | ||
_defineProperty(this, "intervalId", void 0); | ||
_this.options = (0, _extends3.default)({ | ||
_defineProperty(this, "currentlyHandled", 0); | ||
_defineProperty(this, "options", { | ||
concurrent: 5, | ||
interval: 500, | ||
start: true | ||
}, options); | ||
}); | ||
_this.options.interval = parseInt(_this.options.interval); | ||
_this.options.concurrent = parseInt(_this.options.concurrent); | ||
_defineProperty(this, "started", false); | ||
// Backward compatibility: | ||
_defineProperty(this, "stopped", false); | ||
this.options = _objectSpread({}, this.options, options); | ||
this.options.interval = parseInt(this.options.interval, 10); | ||
this.options.concurrent = parseInt(this.options.concurrent, 10); // Backward compatibility: | ||
if (options.concurrency) { | ||
_this.options.concurrent = options.concurrency; | ||
this.options.concurrent = parseInt(options.concurrency, 10); | ||
} | ||
return _this; | ||
} | ||
/** | ||
@@ -133,206 +129,141 @@ * Starts the queue if it has not been started yet. | ||
start() { | ||
if (!this.started) { | ||
this.emit("start"); | ||
this.stopped = false; | ||
this.started = true; | ||
this.intervalId = setInterval(this.dequeue.bind(this), this.options.interval); | ||
} | ||
} | ||
/** | ||
* @type {IntervalID} | ||
* Stops the queue. | ||
* | ||
* @emits stop | ||
* @return {void} | ||
* @access public | ||
*/ | ||
stop() { | ||
this.emit("stop"); | ||
this.stopped = true; | ||
this.started = false; | ||
clearInterval(this.intervalId); | ||
} | ||
/** | ||
* @type {Object} | ||
* Goes to the next request and stops the loop if there is no requests left. | ||
* | ||
* @emits end | ||
* @return {void} | ||
* @access private | ||
*/ | ||
finalize() { | ||
if (--this.currentlyHandled === 0 && this.isEmpty) { | ||
this.emit("end"); | ||
this.stop(); // Finalize doesn't force queue to stop as `Queue.stop()` does. New tasks | ||
// should therefore be still resolved automatically if `options.start` was | ||
// set to `true` (see `Queue.enqueue`): | ||
this.stopped = false; | ||
} | ||
} | ||
/** | ||
* Used to generate unique id for each task. | ||
* Resolves n concurrent promises from the queue. | ||
* | ||
* @type {number} | ||
* @return {Promise<any>} | ||
* @emits resolve | ||
* @emits reject | ||
* @access public | ||
*/ | ||
(0, _createClass3.default)(Queue, [{ | ||
key: "start", | ||
value: function start() { | ||
var _this2 = this; | ||
if (!this.started) { | ||
this.emit("start"); | ||
this.started = true; | ||
this.interval = setInterval(function () { | ||
return _this2.dequeue(); | ||
}, this.options.interval); | ||
dequeue() { | ||
const promises = []; | ||
this.tasks.forEach((promise, id) => { | ||
// Maximum amount of parallel concurrencies: | ||
if (this.currentlyHandled >= this.options.concurrent) { | ||
return; | ||
} | ||
} | ||
/** | ||
* Stops the queue. | ||
* | ||
* @emits stop | ||
* @return {void} | ||
* @access public | ||
*/ | ||
this.currentlyHandled++; | ||
this.tasks.delete(id); | ||
promises.push(Promise.resolve(promise())); | ||
}); | ||
return Promise.all(promises).then(values => { | ||
for (let output of values) this.emit("resolve", output); | ||
}, { | ||
key: "stop", | ||
value: function stop() { | ||
this.emit("stop"); | ||
return values; | ||
}).catch(error => { | ||
this.emit("reject", error); | ||
return error; | ||
}).then(output => { | ||
this.finalize(); | ||
return output; | ||
}); | ||
} | ||
/** | ||
* Adds a promise to the queue. | ||
* | ||
* @param {Function|Array} tasks Tasks to add to the queue | ||
* @throws {Error} When task is not a function | ||
* @return {void} | ||
* @access public | ||
*/ | ||
this.started = false; | ||
clearInterval(this.interval); | ||
} | ||
/** | ||
* Goes to the next request and stops the loop if there is no requests left. | ||
* | ||
* @emits end | ||
* @return {void} | ||
* @access private | ||
*/ | ||
}, { | ||
key: "finalize", | ||
value: function finalize() { | ||
if (--this.current === 0 && this.isEmpty) { | ||
this.emit("end"); | ||
this.stop(); | ||
} | ||
enqueue(tasks) { | ||
if (Array.isArray(tasks)) { | ||
tasks.map(task => this.enqueue(task)); | ||
return; | ||
} | ||
/** | ||
* Resolves n concurrent promises from the queue. | ||
* | ||
* @return {Promise<*>} | ||
* @emits resolve | ||
* @emits reject | ||
* @access public | ||
*/ | ||
if (typeof tasks !== "function") { | ||
throw new Error(`You must provide a function, not ${typeof tasks}.`); | ||
} // Start the queue if the queue should resolve new tasks automatically and | ||
// the queue hasn't been forced to stop: | ||
}, { | ||
key: "dequeue", | ||
value: function dequeue() { | ||
var _this3 = this; | ||
var promises = []; | ||
this.tasks.forEach(function (promise, id) { | ||
// Maximum amount of parallel concurrencies: | ||
if (_this3.current + 1 > _this3.options.concurrent) { | ||
return; | ||
} | ||
_this3.current++; | ||
_this3.tasks.delete(id); | ||
promises.push(_promise2.default.resolve(promise())); | ||
}); | ||
return _promise2.default.all(promises).then(function (values) { | ||
var _iteratorNormalCompletion = true; | ||
var _didIteratorError = false; | ||
var _iteratorError = undefined; | ||
try { | ||
for (var _iterator = (0, _getIterator3.default)(values), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) { | ||
var output = _step.value; | ||
_this3.emit("resolve", output); | ||
} | ||
} catch (err) { | ||
_didIteratorError = true; | ||
_iteratorError = err; | ||
} finally { | ||
try { | ||
if (!_iteratorNormalCompletion && _iterator.return) { | ||
_iterator.return(); | ||
} | ||
} finally { | ||
if (_didIteratorError) { | ||
throw _iteratorError; | ||
} | ||
} | ||
} | ||
return values; | ||
}).catch(function (error) { | ||
_this3.emit("reject", error); | ||
return error; | ||
}).then(function (output) { | ||
_this3.finalize(); | ||
return output; | ||
}); | ||
if (this.options.start && !this.stopped) { | ||
this.start(); | ||
} | ||
/** | ||
* Adds a promise to the queue. | ||
* | ||
* @param {Function|Array} promise Promise to add to the queue | ||
* @throws {Error} When promise is not a function | ||
* @return {void} | ||
* @access public | ||
*/ | ||
this.tasks.set(this.uniqueId++, tasks); | ||
} | ||
/** | ||
* @see enqueue | ||
* @access public | ||
*/ | ||
}, { | ||
key: "enqueue", | ||
value: function enqueue(promise) { | ||
var _this4 = this; | ||
if (Array.isArray(promise)) { | ||
promise.map(function (p) { | ||
return _this4.enqueue(p); | ||
}); | ||
return; | ||
} | ||
add(tasks) { | ||
this.enqueue(tasks); | ||
} | ||
/** | ||
* Removes all tasks from the queue. | ||
* | ||
* @return {void} | ||
* @access public | ||
*/ | ||
if (typeof promise !== "function") { | ||
throw new Error("You must provide a function, not " + (typeof promise === "undefined" ? "undefined" : (0, _typeof3.default)(promise)) + "."); | ||
} | ||
// (Re)start the queue if new tasks are being added and the queue has been | ||
// automatically started before: | ||
if (this.options.start) { | ||
this.start(); | ||
} | ||
clear() { | ||
this.tasks.clear(); | ||
} | ||
/** | ||
* Checks whether the queue is empty, i.e. there's no tasks. | ||
* | ||
* @type {boolean} | ||
* @access public | ||
*/ | ||
this.tasks.set(this.unique++, promise); | ||
} | ||
/** | ||
* @see enqueue | ||
* @access public | ||
*/ | ||
get isEmpty() { | ||
return this.tasks.size === 0; | ||
} | ||
}, { | ||
key: "add", | ||
value: function add(promise) { | ||
this.enqueue(promise); | ||
} | ||
} | ||
/** | ||
* Removes all tasks from the queue. | ||
* | ||
* @return {void} | ||
* @access public | ||
*/ | ||
}, { | ||
key: "clear", | ||
value: function clear() { | ||
this.tasks.clear(); | ||
} | ||
/** | ||
* Checks whether the queue is empty, i.e. there's no tasks. | ||
* | ||
* @type {boolean} | ||
* @access public | ||
*/ | ||
}, { | ||
key: "isEmpty", | ||
get: function get() { | ||
return this.tasks.size === 0; | ||
} | ||
}]); | ||
return Queue; | ||
}(_events2.default); | ||
exports.default = Queue; | ||
module.exports = exports["default"]; | ||
module.exports = exports.default; |
{ | ||
"name": "queue-promise", | ||
"version": "1.2.1", | ||
"version": "1.3.0", | ||
"keywords": [ | ||
@@ -12,4 +12,7 @@ "queue", | ||
], | ||
"description": "A simple, dependency-free library for concurrent promise-based queues", | ||
"author": "Łaniewski Bartosz <laniewski.bartozzz@gmail.com> (http://laniewski.me/)", | ||
"engines": { | ||
"node": ">=8.12.0" | ||
}, | ||
"description": "A simple, dependency-free library for concurrent promise-based queues. Comes with with concurrency and timeout control.", | ||
"author": "Łaniewski Bartosz <bartosz@laniewski.me> (https://laniewski.me/)", | ||
"license": "MIT", | ||
@@ -25,25 +28,25 @@ "main": "dist/index.js", | ||
"devDependencies": { | ||
"babel-cli": "^6.26.0", | ||
"babel-core": "^6.26.0", | ||
"babel-eslint": "^8.2.2", | ||
"babel-plugin-add-module-exports": "^0.2.1", | ||
"babel-plugin-transform-async-to-generator": "6.24.1", | ||
"babel-plugin-transform-class-properties": "^6.24.1", | ||
"babel-plugin-transform-object-rest-spread": "^6.26.0", | ||
"babel-plugin-transform-runtime": "6.23.0", | ||
"babel-preset-env": "^1.6.1", | ||
"babel-preset-flow": "^6.23.0", | ||
"chai": "^4.1.2", | ||
"eslint": "^4.19.1", | ||
"eslint-config-prettier": "^2.9.0", | ||
"eslint-config-standard": "^11.0.0", | ||
"eslint-plugin-flowtype": "^2.46.1", | ||
"eslint-plugin-import": "^2.9.0", | ||
"eslint-plugin-node": "^6.0.1", | ||
"eslint-plugin-prettier": "^2.6.0", | ||
"eslint-plugin-promise": "^3.7.0", | ||
"eslint-plugin-standard": "^3.0.1", | ||
"flow-bin": "^0.67.1", | ||
"mocha": "^5.0.5", | ||
"prettier": "^1.11.1" | ||
"@babel/cli": "^7.0.0", | ||
"@babel/core": "^7.0.0", | ||
"@babel/plugin-proposal-async-generator-functions": "^7.0.0", | ||
"@babel/plugin-proposal-class-properties": "^7.0.0", | ||
"@babel/plugin-proposal-object-rest-spread": "^7.0.0", | ||
"@babel/preset-env": "^7.0.0", | ||
"@babel/preset-flow": "^7.0.0", | ||
"@babel/register": "7.0.0", | ||
"babel-eslint": "^10.0.1", | ||
"babel-plugin-add-module-exports": "^1.0.0", | ||
"chai": "^4.2.0", | ||
"eslint": "^5.6.0", | ||
"eslint-config-prettier": "^3.1.0", | ||
"eslint-config-standard": "^12.0.0", | ||
"eslint-plugin-flowtype": "^2.50.3", | ||
"eslint-plugin-import": "2.14.0", | ||
"eslint-plugin-node": "^7.0.1", | ||
"eslint-plugin-prettier": "^2.7.0", | ||
"eslint-plugin-promise": "^4.0.0", | ||
"eslint-plugin-standard": "^4.0.0", | ||
"flow-bin": "^0.81.0", | ||
"mocha": "^5.2.0", | ||
"prettier": "^1.14.3" | ||
}, | ||
@@ -53,3 +56,3 @@ "scripts": { | ||
"test:flow": "flow", | ||
"test:mocha": "mocha --require babel-core/register", | ||
"test:mocha": "mocha --require @babel/register", | ||
"test:eslint": "eslint --fix src", | ||
@@ -56,0 +59,0 @@ "clean": "rm -rf dist", |
@@ -26,5 +26,8 @@ <div align="center"> | ||
const queue = new Queue({ | ||
concurrent: 1, // resolve 1 task at a time | ||
interval: 2000, // resolve new tasks each 2000ms, | ||
start: true, // automatically resolve new tasks when they are added | ||
// How many tasks should be resolved at a time (defaults to `5`): | ||
concurrent: 1, | ||
// How often should new tasks be resolved (in ms – defaults to `500`): | ||
interval: 2000, | ||
// If should resolve new tasks automatically when they are added (defaults to `true`): | ||
start: true | ||
}); | ||
@@ -47,11 +50,11 @@ | ||
| Option | Default | Description | | ||
| :----------- | :------ | :--------------------------------------------------------------------------- | | ||
| `concurrent` | `5` | How many tasks can be handled at the same time | | ||
| `interval` | `500` | How often should new tasks be handled (in ms) | | ||
| `start` | `true` | Whether we should automatically resolve new tasks as soon as they are added | | ||
| Option | Default | Description | | ||
| :----------- | :------ | :-------------------------------------------------------------------------- | | ||
| `concurrent` | `5` | How many tasks can be handled at the same time | | ||
| `interval` | `500` | How often should new tasks be handled (in ms) | | ||
| `start` | `true` | Whether we should automatically resolve new tasks as soon as they are added | | ||
#### **public** `.enqueue(task)`/`.add(task)` | ||
#### **public** `.enqueue(tasks)`/`.add(tasks)` | ||
Puts a new task on the stack. Tasks should be an async function or return a promise. Throws an error if the provided `task` is not a valid function. | ||
Puts a new task on the stack. A task should be an async function (ES2017) or return a [Promise](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise). Throws an error if the provided `task` is not a valid function. | ||
@@ -63,3 +66,3 @@ **Example:** | ||
return await github.getRepos(user); | ||
}; | ||
} | ||
@@ -70,6 +73,3 @@ queue.enqueue(getRepos("userA")); | ||
// …equivalent to: | ||
queue.enqueue([ | ||
getRepos("userA"), | ||
getRepos("userB"), | ||
]); | ||
queue.enqueue([getRepos("userA"), getRepos("userB")]); | ||
``` | ||
@@ -79,3 +79,3 @@ | ||
Resolves _n_ concurrent promises from the queue. Uses global [Promises](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise). | ||
Manually resolves _n_ concurrent (based od `options.concurrent`) promises from the queue. Uses global [Promises](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise). Is called automatically if `options.start` is set to `true`. Emits `resolve` and `reject` events. | ||
@@ -93,3 +93,3 @@ **Example:** | ||
// If "concurrent" is set to 2, two promises are resolved concurrently: | ||
const users = await queue.dequeue(); | ||
const [userA, userB] = await queue.dequeue(); | ||
``` | ||
@@ -106,4 +106,4 @@ | ||
queue.on("resolve", output => …); | ||
queue.on("reject", output => …); | ||
queue.on("resolve", data => …); | ||
queue.on("reject", error => …); | ||
queue.on("end", () => …); | ||
@@ -116,5 +116,17 @@ ``` | ||
```javascript | ||
queue.enqueue(getRepos("userA")); | ||
queue.enqueue(getRepos("userB")); | ||
queue.enqueue(getRepos("userC")); | ||
queue.enqueue(getRepos("userD")); | ||
queue.start(); | ||
// No need to call `dequeue` – you can just listen for events: | ||
queue.on("resolve", data => …); | ||
queue.on("reject", error => …); | ||
``` | ||
#### **public** `.stop()` | ||
Stops the queue. Emits `stop` event. | ||
Forces the queue to stop. New tasks will not be resolved automatically even if `options.start` was set to `true`. Emits `stop` event. | ||
@@ -129,2 +141,6 @@ #### **public** `.clear()` | ||
#### **public** `.stopped` | ||
Whether the queue has been forced to stop. | ||
#### **public** `.isEmpty` | ||
@@ -131,0 +147,0 @@ |
142
src/index.js
@@ -9,2 +9,14 @@ // @flow | ||
* | ||
* @example | ||
* const queue = new Queue({ | ||
* concurrent: 1, | ||
* interval: 2000 | ||
* }); | ||
* | ||
* queue.on("resolve", data => console.log(data)); | ||
* queue.on("reject", error => console.error(error)); | ||
* | ||
* queue.enqueue(asyncTaskA); | ||
* queue.enqueue([asyncTaskB, asyncTaskC]); | ||
* | ||
* @class Queue | ||
@@ -21,2 +33,3 @@ * @extends EventEmitter | ||
* @type {Map} | ||
* @access private | ||
*/ | ||
@@ -26,54 +39,63 @@ tasks: Map<number, Function> = new Map(); | ||
/** | ||
* Used to generate unique id for each task. | ||
* | ||
* @type {number} | ||
* @type {number} Used to generate unique id for each task | ||
* @access private | ||
*/ | ||
unique: number = 0; | ||
uniqueId = 0; | ||
/** | ||
* Amount of tasks currently handled by the Queue. | ||
* | ||
* @type {number} | ||
* @type {IntervalID} | ||
* @access private | ||
*/ | ||
current: number = 0; | ||
intervalId: IntervalID; | ||
/** | ||
* @type {Object} | ||
* @type {number} Amount of tasks currently handled by the Queue | ||
* @access private | ||
*/ | ||
options: Object = {}; | ||
currentlyHandled = 0; | ||
/** | ||
* @type {boolean} | ||
* @type {Object} options | ||
* @type {number} options.concurrent How many tasks should be resolved at a time | ||
* @type {number} options.interval How often should new tasks be resolved (ms) | ||
* @type {boolean} options.start If should resolve new tasks automatically | ||
* @access public | ||
*/ | ||
started: boolean = false; | ||
options = { | ||
concurrent: 5, | ||
interval: 500, | ||
start: true | ||
}; | ||
/** | ||
* @type {IntervalID} | ||
* @type {boolean} Whether the queue has already started | ||
* @access public | ||
*/ | ||
interval: IntervalID; | ||
started = false; | ||
/** | ||
* @type {boolean} Whether the queue has been forced to stop | ||
* @access public | ||
*/ | ||
stopped = false; | ||
/** | ||
* Initializes a new Queue instance with provided options. | ||
* | ||
* @param {Object} options | ||
* @param {number} options.concurrent | ||
* @param {number} options.interval | ||
* @param {boolean} options.start | ||
* @param {number} options.concurrent How many tasks should be resolved at a time | ||
* @param {number} options.interval How often should new tasks be resolved (ms) | ||
* @param {boolean} options.start If should resolve new tasks automatically | ||
* @return {Queue} | ||
*/ | ||
constructor(options: Object = {}): void { | ||
constructor(options: Object = {}) { | ||
super(); | ||
this.options = { | ||
concurrent: 5, | ||
interval: 500, | ||
start: true, | ||
...options | ||
}; | ||
this.options = { ...this.options, ...options }; | ||
this.options.interval = parseInt(this.options.interval, 10); | ||
this.options.concurrent = parseInt(this.options.concurrent, 10); | ||
this.options.interval = parseInt(this.options.interval); | ||
this.options.concurrent = parseInt(this.options.concurrent); | ||
// Backward compatibility: | ||
if (options.concurrency) { | ||
this.options.concurrent = options.concurrency; | ||
this.options.concurrent = parseInt(options.concurrency, 10); | ||
} | ||
@@ -89,8 +111,13 @@ } | ||
*/ | ||
start(): void { | ||
start() { | ||
if (!this.started) { | ||
this.emit("start"); | ||
this.stopped = false; | ||
this.started = true; | ||
this.interval = setInterval(() => this.dequeue(), this.options.interval); | ||
this.intervalId = setInterval( | ||
this.dequeue.bind(this), | ||
this.options.interval | ||
); | ||
} | ||
@@ -106,7 +133,9 @@ } | ||
*/ | ||
stop(): void { | ||
stop() { | ||
this.emit("stop"); | ||
this.stopped = true; | ||
this.started = false; | ||
clearInterval(this.interval); | ||
clearInterval(this.intervalId); | ||
} | ||
@@ -121,6 +150,11 @@ | ||
*/ | ||
finalize(): void { | ||
if (--this.current === 0 && this.isEmpty) { | ||
finalize() { | ||
if (--this.currentlyHandled === 0 && this.isEmpty) { | ||
this.emit("end"); | ||
this.stop(); | ||
// Finalize doesn't force queue to stop as `Queue.stop()` does. New tasks | ||
// should therefore be still resolved automatically if `options.start` was | ||
// set to `true` (see `Queue.enqueue`): | ||
this.stopped = false; | ||
} | ||
@@ -132,3 +166,3 @@ } | ||
* | ||
* @return {Promise<*>} | ||
* @return {Promise<any>} | ||
* @emits resolve | ||
@@ -138,3 +172,3 @@ * @emits reject | ||
*/ | ||
dequeue(): Promise<*> { | ||
dequeue() { | ||
const promises = []; | ||
@@ -144,7 +178,7 @@ | ||
// Maximum amount of parallel concurrencies: | ||
if (this.current + 1 > this.options.concurrent) { | ||
if (this.currentlyHandled >= this.options.concurrent) { | ||
return; | ||
} | ||
this.current++; | ||
this.currentlyHandled++; | ||
this.tasks.delete(id); | ||
@@ -173,24 +207,24 @@ | ||
* | ||
* @param {Function|Array} promise Promise to add to the queue | ||
* @throws {Error} When promise is not a function | ||
* @param {Function|Array} tasks Tasks to add to the queue | ||
* @throws {Error} When task is not a function | ||
* @return {void} | ||
* @access public | ||
*/ | ||
enqueue(promise: Function | Array<Function>): void { | ||
if (Array.isArray(promise)) { | ||
promise.map(p => this.enqueue(p)); | ||
enqueue(tasks: Function | Array<Function>) { | ||
if (Array.isArray(tasks)) { | ||
tasks.map(task => this.enqueue(task)); | ||
return; | ||
} | ||
if (typeof promise !== "function") { | ||
throw new Error(`You must provide a function, not ${typeof promise}.`); | ||
if (typeof tasks !== "function") { | ||
throw new Error(`You must provide a function, not ${typeof tasks}.`); | ||
} | ||
// (Re)start the queue if new tasks are being added and the queue has been | ||
// automatically started before: | ||
if (this.options.start) { | ||
// Start the queue if the queue should resolve new tasks automatically and | ||
// the queue hasn't been forced to stop: | ||
if (this.options.start && !this.stopped) { | ||
this.start(); | ||
} | ||
this.tasks.set(this.unique++, promise); | ||
this.tasks.set(this.uniqueId++, tasks); | ||
} | ||
@@ -202,4 +236,4 @@ | ||
*/ | ||
add(promise: Function): void { | ||
this.enqueue(promise); | ||
add(tasks: Function | Array<Function>) { | ||
this.enqueue(tasks); | ||
} | ||
@@ -213,3 +247,3 @@ | ||
*/ | ||
clear(): void { | ||
clear() { | ||
this.tasks.clear(); | ||
@@ -221,8 +255,8 @@ } | ||
* | ||
* @type {boolean} | ||
* @type {boolean} | ||
* @access public | ||
*/ | ||
get isEmpty(): boolean { | ||
get isEmpty() { | ||
return this.tasks.size === 0; | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
23912
146
533