Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

es6-promise-pool

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

es6-promise-pool - npm Package Compare versions

Comparing version 0.1.2 to 0.2.0

210

es6-promise-pool.js

@@ -36,59 +36,171 @@ (function(global) {

var pool = function(source, concurrency, options) {
options = options || {};
var onResolve = options.onresolve || function() {};
var onReject = options.onreject || function() {};
var producer = toProducer(source);
var size = 0;
var consumed = false;
var poolPromise = new Promise(function(resolve, reject) {
var failed = false;
var proceed = function() {
if (!consumed) {
var promise;
while (size < concurrency && !!(promise = producer())) {
promise.then(function(result) {
size--;
if (!failed) {
onResolve(poolPromise, promise, result);
proceed();
}
}, function(err) {
if (!failed) {
failed = true;
onReject(poolPromise, promise, err);
reject(err);
}
});
size++;
}
if (!promise) {
consumed = true;
}
}
if (consumed && size === 0) {
resolve();
}
var PromisePoolEvent = function(target, type, data) {
this.target = target;
this.type = type;
this.data = data;
};
var PromisePool = function(source, concurrency, options) {
if (typeof concurrency !== 'number' ||
Math.floor(concurrency) !== concurrency ||
concurrency < 1) {
throw new Error('Invalid concurrency');
}
this._producer = toProducer(source);
this._concurrency = concurrency;
this._options = options || {};
this._listeners = {};
this._producerDone = false;
this._size = 0;
this._resolve = null;
this._reject = null;
};
PromisePool.prototype.concurrency = function(value) {
if (typeof value !== 'undefined') {
this._concurrency = value;
}
return this._concurrency;
};
PromisePool.prototype.size = function() {
return this._size;
};
PromisePool.prototype.start = function() {
var that = this;
return new Promise(function(resolve, reject) {
that._resolve = resolve;
that._reject = reject;
that._proceed();
});
};
PromisePool.prototype.addEventListener = function(type, listener) {
this._listeners[type] = this._listeners[type] || [];
if (this._listeners[type].indexOf(listener) < 0) {
this._listeners[type].push(listener);
}
};
PromisePool.prototype.removeEventListener = function(type, listener) {
if (this._listeners[type]) {
var p = this._listeners[type].indexOf(listener);
if (p >= 0) {
this._listeners[type].splice(p, 1);
}
}
};
PromisePool.prototype._fireEvent = function(type, data) {
if (this._listeners[type] && this._listeners[type].length) {
var evt = new PromisePoolEvent(this, type, data);
var listeners = this._listeners[type].slice();
for (var i = 0, l = listeners.length; i < l; ++i) {
listeners[i].call(this, evt);
}
}
};
PromisePool.prototype._settle = function(error) {
if (error) {
this._reject(error);
} else {
this._resolve();
}
this._resolve = this._reject = null;
};
PromisePool.prototype._onPooledPromiseFulfilled = function(promise, result) {
this._size--;
if (this._resolve) {
this._fireEvent('fulfilled', {
promise: promise,
result: result
});
this._proceed();
}
};
PromisePool.prototype._onPooledPromiseRejected = function(promise, error) {
this._size--;
if (this._reject) {
this._fireEvent('rejected', {
promise: promise,
error: error
});
this._settle(error || new Error('Unknown error'));
}
};
PromisePool.prototype._trackPromise = function(promise) {
var that = this;
promise.then(function(result) {
that._onPooledPromiseFulfilled(promise, result);
}, function(error) {
that._onPooledPromiseRejected(promise, error);
})
['catch'](function(err) {
that._settle(new Error('Promise processing failed: ' + err));
});
};
PromisePool.prototype._proceed = function() {
if (!this._producerDone) {
var promise;
while (this._size < this._concurrency &&
!!(promise = this._producer.call(this))) {
this._size++;
this._trackPromise(promise);
}
if (!promise) {
this._producerDone = true;
}
}
if (this._producerDone && this._size === 0) {
this._settle();
}
};
var modernizeOption = function(options, listeners, optKey, eventType, eventKey) {
if (options[optKey]) {
var cb = options[optKey];
listeners[eventType] = function(evt) {
cb(evt.target, evt.data.promise, evt.data[eventKey]);
};
proceed();
});
poolPromise.pool = {
size: function() {
return size;
},
concurrency: function(value) {
if (typeof value !== 'undefined') {
concurrency = value;
delete options[optKey];
}
};
var modernizeOptions = function(options) {
var listeners = {};
modernizeOption(options, listeners, 'onresolve', 'fulfilled', 'result');
modernizeOption(options, listeners, 'onreject', 'rejected', 'error');
return listeners;
};
var createPool = function(source, concurrency, options) {
// Legacy API: options.onresolve and options.onreject
var listeners;
if (options) {
listeners = modernizeOptions(options);
}
var pool = new PromisePool(source, concurrency, options);
if (listeners) {
for (var type in listeners) {
if (listeners.hasOwnProperty(type)) {
pool.addEventListener(type, listeners[type]);
}
return concurrency;
}
};
return poolPromise;
}
return pool.start();
};
createPool.PromisePool = PromisePool;
if (typeof module !== 'undefined' && typeof module.exports !== 'undefined') {
module.exports = pool;
module.exports = createPool;
} else {
global.promisePool = pool;
global.promisePool = createPool;
}
})(this);
{
"name": "es6-promise-pool",
"version": "0.1.2",
"version": "0.2.0",
"description": "Runs Promises in a pool that limits their maximum concurrency.",

@@ -5,0 +5,0 @@ "author": {

@@ -7,4 +7,4 @@ # Promise Pool [![Build Status](https://travis-ci.org/timdp/es6-promise-pool.svg?branch=master)](https://travis-ci.org/timdp/es6-promise-pool)

An ES6 `Promise` is a great way of handling asynchronous operations. The
`Promise.all` function provides an easy interface to let a bunch of promises
An ECMAScript 6 `Promise` is a great way of handling asynchronous operations.
The `Promise.all` function provides an easy interface to let a bunch of promises
settle concurrently.

@@ -51,2 +51,4 @@

var PromisePool = promisePool.PromisePool;
// Can also be a generator. See below.

@@ -75,9 +77,11 @@ var promiseProducer = function() {

// See below.
var options = {};
// Create a pool.
var pool = new PromisePool(promiseProducer, concurrency);
// Create a pool promise and wait for it to settle.
promisePool(promiseProducer, concurrency, options)
.then(function() {
console.log('All promises resolved');
// Start the pool.
var poolPromise = pool.start();
// Wait for the pool to settle.
poolPromise.then(function() {
console.log('All promises fulfilled');
}, function(error) {

@@ -90,3 +94,3 @@ console.log('Some promise rejected: ' + error.message);

The `promisePool` function takes a `Promise`-producing function as its first
The `PromisePool` constructor takes a `Promise`-producing function as its first
argument. Let's first assume that we have this helper function that returns a

@@ -110,5 +114,5 @@ promise for the given `value` after `time` milliseconds:

Now, let's use the helper function above to create five such promises, which
each resolve after a second. Because of the `concurrency` of `3`, the first
three promises will resolve after a second. Then, the remaining two will be
processed and resolve after another second.
are each fulfilled after a second. Because of the `concurrency` of `3`, the
first three promises will be fulfilled after one second. Then, the remaining two
will be processed and fulfilled after another second.

@@ -126,3 +130,5 @@ ```js

promisePool(promiseProducer, 3)
var pool = new PromisePool(promiseProducer, 3);
pool.start()
.then(function() {

@@ -144,3 +150,5 @@ console.log('Complete');

promisePool(promiseProducer, 3)
var pool = new PromisePool(promiseProducer, 3);
pool.start()
.then(function() {

@@ -151,24 +159,33 @@ console.log('Complete');

## Options
## Events
The `options` object lets us provide additional callback functions to listen for
promise progress.
We can also ask the promise pool to notify us when an individual promise is
fulfilled or rejected. The pool fires `fulfilled` and `rejected` events exactly
for this purpose.
When a promise settles, either `options.onresolve` or `options.onreject` will be
called. Both functions receive the pool promise (as returned by `promisePool`),
the promise that settled, and either the resolved value or the `Error` that
caused the rejection.
```js
var options = {};
var pool = new PromisePool(promiseProducer, 3);
options.onresolve = function(poolPromise, promise, result) {
console.log('Resolved: ' + result);
};
pool.addEventListener('fulfilled', function(event) {
// The event contains:
// - target: the PromisePool itself;
// - data:
// - promise: the Promise that got fulfilled;
// - result: the result of that Promise.
console.log('Fulfilled: ' + event.data.result);
});
options.onreject = function(poolPromise, promise, error) {
console.log('Rejected: ' + error.message);
};
pool.addEventListener('rejected', function(event) {
// The event contains:
// - target: the PromisePool itself;
// - data:
// - promise: the Promise that got rejected;
// - error: the Error for the rejection.
console.log('Rejected: ' + event.data.error.message);
});
promisePool(promiseProducer, concurrency, options);
pool.start()
.then(function() {
console.log('Complete');
});
```

@@ -175,0 +192,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc