Socket
Socket
Sign inDemoInstall

queue-promise

Package Overview
Dependencies
0
Maintainers
1
Versions
19
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.3.2 to 1.3.3

271

dist/index.js

@@ -1,270 +0,1 @@

"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = void 0;
var _events = _interopRequireDefault(require("events"));
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { var symbols = Object.getOwnPropertySymbols(object); if (enumerableOnly) symbols = symbols.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; }); keys.push.apply(keys, symbols); } return keys; }
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; if (i % 2) { ownKeys(source, true).forEach(function (key) { _defineProperty(target, key, source[key]); }); } else if (Object.getOwnPropertyDescriptors) { Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)); } else { ownKeys(source).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } } return target; }
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
/**
* A small and simple library for promise-based queues. It will resolve enqueued
* functions concurrently at a specified speed. When a task is being resolved or
* rejected, an event will be emitted.
*
* @example
* const queue = new Queue({
* concurrent: 1,
* interval: 2000
* });
*
* queue.on("resolve", data => console.log(data));
* queue.on("reject", error => console.error(error));
*
* queue.enqueue(asyncTaskA);
* queue.enqueue([asyncTaskB, asyncTaskC]);
*
* @class Queue
* @extends EventEmitter
*/
class Queue extends _events.default {
/**
* A collection to store unresolved tasks. We use a Map here because V8 uses a
* variant of hash tables that generally have O(1) complexity for retrieval
* and lookup.
*
* @see https://codereview.chromium.org/220293002/
* @type {Map}
* @access private
*/
/**
* @type {number} Used to generate unique id for each task
* @access private
*/
/**
* @type {IntervalID}
* @access private
*/
/**
* @type {number} Amount of tasks currently handled by the Queue
* @access private
*/
/**
* @type {Object} options
* @type {number} options.concurrent How many tasks should be resolved at a time
* @type {number} options.interval How often should new tasks be resolved (ms)
* @type {boolean} options.start If should resolve new tasks automatically
* @access public
*/
/**
* @type {boolean} Whether the queue has already started
* @access public
*/
/**
* @type {boolean} Whether the queue has been forced to stop
* @access public
*/
/**
* Initializes a new Queue instance with provided options.
*
* @param {Object} options
* @param {number} options.concurrent How many tasks should be resolved at a time
* @param {number} options.interval How often should new tasks be resolved (ms)
* @param {boolean} options.start If should resolve new tasks automatically
* @return {Queue}
*/
constructor(options = {}) {
super();
_defineProperty(this, "tasks", new Map());
_defineProperty(this, "uniqueId", 0);
_defineProperty(this, "intervalId", void 0);
_defineProperty(this, "currentlyHandled", 0);
_defineProperty(this, "options", {
concurrent: 5,
interval: 500,
start: true
});
_defineProperty(this, "started", false);
_defineProperty(this, "stopped", false);
this.options = _objectSpread({}, this.options, {}, options);
this.options.interval = parseInt(this.options.interval, 10);
this.options.concurrent = parseInt(this.options.concurrent, 10); // Backward compatibility:
if (options.concurrency) {
this.options.concurrent = parseInt(options.concurrency, 10);
}
}
/**
* Starts the queue if it has not been started yet.
*
* @emits start
* @return {void}
* @access public
*/
start() {
if (!this.started && !this.isEmpty) {
this.emit("start");
this.stopped = false;
this.started = true;
this.intervalId = setInterval(this.dequeue.bind(this), this.options.interval);
}
}
/**
* Stops the queue.
*
* @emits stop
* @return {void}
* @access public
*/
stop() {
this.emit("stop");
this.stopped = true;
this.started = false;
clearInterval(this.intervalId);
}
/**
* Goes to the next request and stops the loop if there is no requests left.
*
* @emits end
* @return {void}
* @access private
*/
finalize() {
this.currentlyHandled -= 1;
if (this.currentlyHandled === 0 && this.isEmpty) {
this.emit("end");
this.stop(); // Finalize doesn't force queue to stop as `Queue.stop()` does. New tasks
// should therefore be still resolved automatically if `options.start` was
// set to `true` (see `Queue.enqueue`):
this.stopped = false;
}
}
/**
* Resolves n concurrent promises from the queue.
*
* @return {Promise<any>}
* @emits resolve
* @emits reject
* @access public
*/
async dequeue() {
const promises = [];
this.tasks.forEach((promise, id) => {
// Maximum amount of parallel tasks:
if (this.currentlyHandled < this.options.concurrent) {
this.currentlyHandled++;
this.tasks.delete(id);
promises.push(Promise.resolve(promise()).then(value => {
this.emit("resolve", value);
return value;
}).catch(error => {
this.emit("reject", error);
return error;
}).finally(() => {
this.emit("dequeue");
this.finalize();
}));
}
}); // Note: Promise.all will reject if any of the concurrent promises fails,
// regardless if they are finished yet!
const output = await Promise.all(promises);
return this.options.concurrent === 1 ? output[0] : output;
}
/**
* Adds a promise to the queue.
*
* @param {Function|Array} tasks Tasks to add to the queue
* @throws {Error} When task is not a function
* @return {void}
* @access public
*/
enqueue(tasks) {
if (Array.isArray(tasks)) {
tasks.map(task => this.enqueue(task));
return;
}
if (typeof tasks !== "function") {
throw new Error(`You must provide a function, not ${typeof tasks}.`);
} // Start the queue if the queue should resolve new tasks automatically and
// hasn't been forced to stop:
if (this.options.start && !this.stopped) {
this.start();
}
this.tasks.set(this.uniqueId++, tasks);
}
/**
* @see enqueue
* @access public
*/
add(tasks) {
this.enqueue(tasks);
}
/**
* Removes all tasks from the queue.
*
* @return {void}
* @access public
*/
clear() {
this.tasks.clear();
}
/**
* Checks whether the queue is empty, i.e. there's no tasks.
*
* @type {boolean}
* @access public
*/
get isEmpty() {
return this.tasks.size === 0;
}
}
exports.default = Queue;
module.exports = exports.default;
"use strict";var _events=_interopRequireDefault(require("events"));Object.defineProperty(exports,"__esModule",{value:!0}),exports.default=void 0;function _interopRequireDefault(a){return a&&a.__esModule?a:{default:a}}function ownKeys(a,b){var c=Object.keys(a);if(Object.getOwnPropertySymbols){var d=Object.getOwnPropertySymbols(a);b&&(d=d.filter(function(b){return Object.getOwnPropertyDescriptor(a,b).enumerable})),c.push.apply(c,d)}return c}function _objectSpread(a){for(var b,c=1;c<arguments.length;c++)b=null==arguments[c]?{}:arguments[c],c%2?ownKeys(Object(b),!0).forEach(function(c){_defineProperty(a,c,b[c])}):Object.getOwnPropertyDescriptors?Object.defineProperties(a,Object.getOwnPropertyDescriptors(b)):ownKeys(Object(b)).forEach(function(c){Object.defineProperty(a,c,Object.getOwnPropertyDescriptor(b,c))});return a}function _defineProperty(a,b,c){return b in a?Object.defineProperty(a,b,{value:c,enumerable:!0,configurable:!0,writable:!0}):a[b]=c,a}class Queue extends _events.default{constructor(a={}){super(),_defineProperty(this,"tasks",new Map),_defineProperty(this,"uniqueId",0),_defineProperty(this,"intervalId",void 0),_defineProperty(this,"currentlyHandled",0),_defineProperty(this,"options",{concurrent:5,interval:500,start:!0}),_defineProperty(this,"started",!1),_defineProperty(this,"stopped",!1),this.options=_objectSpread({},this.options,{},a),this.options.interval=parseInt(this.options.interval,10),this.options.concurrent=parseInt(this.options.concurrent,10),a.concurrency&&(this.options.concurrent=parseInt(a.concurrency,10))}start(){this.started||this.isEmpty||(this.emit("start"),this.stopped=!1,this.started=!0,this.intervalId=setInterval(this.dequeue.bind(this),this.options.interval))}stop(){this.emit("stop"),this.stopped=!0,this.started=!1,clearInterval(this.intervalId)}finalize(){this.currentlyHandled-=1,0===this.currentlyHandled&&this.isEmpty&&(this.emit("end"),this.stop(),this.stopped=!1)}async dequeue(){const a=[];this.tasks.forEach((b,c)=>{this.currentlyHandled<this.options.concurrent&&(this.currentlyHandled++,this.tasks.delete(c),a.push(Promise.resolve(b()).then(a=>(this.emit("resolve",a),a)).catch(a=>(this.emit("reject",a),a)).finally(()=>{this.emit("dequeue"),this.finalize()})))});const b=await Promise.all(a);return 1===this.options.concurrent?b[0]:b}enqueue(a){if(Array.isArray(a))return void a.map(a=>this.enqueue(a));if("function"!=typeof a)throw new Error(`You must provide a function, not ${typeof a}.`);this.options.start&&!this.stopped&&this.start(),this.tasks.set(this.uniqueId++,a)}add(a){this.enqueue(a)}clear(){this.tasks.clear()}get isEmpty(){return 0===this.tasks.size}}exports.default=Queue,module.exports=exports.default;

34

package.json
{
"name": "queue-promise",
"version": "1.3.2",
"version": "1.3.3",
"keywords": [

@@ -26,19 +26,21 @@ "queue",

},
"dependencies": {},
"devDependencies": {
"@babel/cli": "^7.7.0",
"@babel/core": "^7.7.2",
"@babel/plugin-proposal-async-generator-functions": "^7.7.0",
"@babel/plugin-proposal-class-properties": "^7.7.0",
"@babel/plugin-proposal-object-rest-spread": "^7.6.2",
"@babel/preset-env": "^7.7.1",
"@babel/preset-flow": "^7.0.0",
"@babel/register": "^7.7.0",
"@babel/cli": "^7.7.5",
"@babel/core": "^7.7.5",
"@babel/plugin-proposal-async-generator-functions": "^7.7.4",
"@babel/plugin-proposal-class-properties": "^7.7.4",
"@babel/plugin-proposal-object-rest-spread": "^7.7.4",
"@babel/preset-env": "^7.7.6",
"@babel/preset-flow": "^7.7.4",
"@babel/register": "^7.7.4",
"babel-eslint": "^10.0.3",
"babel-plugin-add-module-exports": "^1.0.2",
"babel-preset-minify": "0.5.1",
"chai": "^4.2.0",
"eslint": "^6.6.0",
"eslint-config-prettier": "^6.5.0",
"eslint": "^6.7.2",
"eslint-config-prettier": "^6.7.0",
"eslint-config-standard": "^14.1.0",
"eslint-plugin-flowtype": "^4.3.0",
"eslint-plugin-import": "^2.18.2",
"eslint-plugin-flowtype": "^4.5.2",
"eslint-plugin-import": "^2.19.1",
"eslint-plugin-node": "^10.0.0",

@@ -48,3 +50,3 @@ "eslint-plugin-prettier": "^3.1.1",

"eslint-plugin-standard": "^4.0.1",
"flow-bin": "^0.111.3",
"flow-bin": "^0.113.0",
"mocha": "^6.2.2",

@@ -54,3 +56,3 @@ "prettier": "^1.19.1"

"scripts": {
"test": "npm run test:eslint && npm run test:flow && npm run test:mocha",
"test": "npm run prepare && npm run test:eslint && npm run test:flow && npm run test:mocha",
"test:flow": "flow",

@@ -62,4 +64,4 @@ "test:mocha": "mocha --require @babel/register",

"watch": "babel src -d dist -w",
"prepare": "npm run clean && npm run test && npm run build"
"prepare": "npm run clean && npm run build"
}
}

@@ -1,2 +0,2 @@

import Queue from "../../src";
import Queue from "../../dist";

@@ -3,0 +3,0 @@ export const queueFactory = (options = {}) => {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc