es6-promise-pool
Advanced tools
Comparing version 0.1.2 to 0.2.0
@@ -36,59 +36,171 @@ (function(global) { | ||
var pool = function(source, concurrency, options) { | ||
options = options || {}; | ||
var onResolve = options.onresolve || function() {}; | ||
var onReject = options.onreject || function() {}; | ||
var producer = toProducer(source); | ||
var size = 0; | ||
var consumed = false; | ||
var poolPromise = new Promise(function(resolve, reject) { | ||
var failed = false; | ||
var proceed = function() { | ||
if (!consumed) { | ||
var promise; | ||
while (size < concurrency && !!(promise = producer())) { | ||
promise.then(function(result) { | ||
size--; | ||
if (!failed) { | ||
onResolve(poolPromise, promise, result); | ||
proceed(); | ||
} | ||
}, function(err) { | ||
if (!failed) { | ||
failed = true; | ||
onReject(poolPromise, promise, err); | ||
reject(err); | ||
} | ||
}); | ||
size++; | ||
} | ||
if (!promise) { | ||
consumed = true; | ||
} | ||
} | ||
if (consumed && size === 0) { | ||
resolve(); | ||
} | ||
var PromisePoolEvent = function(target, type, data) { | ||
this.target = target; | ||
this.type = type; | ||
this.data = data; | ||
}; | ||
var PromisePool = function(source, concurrency, options) { | ||
if (typeof concurrency !== 'number' || | ||
Math.floor(concurrency) !== concurrency || | ||
concurrency < 1) { | ||
throw new Error('Invalid concurrency'); | ||
} | ||
this._producer = toProducer(source); | ||
this._concurrency = concurrency; | ||
this._options = options || {}; | ||
this._listeners = {}; | ||
this._producerDone = false; | ||
this._size = 0; | ||
this._resolve = null; | ||
this._reject = null; | ||
}; | ||
PromisePool.prototype.concurrency = function(value) { | ||
if (typeof value !== 'undefined') { | ||
this._concurrency = value; | ||
} | ||
return this._concurrency; | ||
}; | ||
PromisePool.prototype.size = function() { | ||
return this._size; | ||
}; | ||
PromisePool.prototype.start = function() { | ||
var that = this; | ||
return new Promise(function(resolve, reject) { | ||
that._resolve = resolve; | ||
that._reject = reject; | ||
that._proceed(); | ||
}); | ||
}; | ||
PromisePool.prototype.addEventListener = function(type, listener) { | ||
this._listeners[type] = this._listeners[type] || []; | ||
if (this._listeners[type].indexOf(listener) < 0) { | ||
this._listeners[type].push(listener); | ||
} | ||
}; | ||
PromisePool.prototype.removeEventListener = function(type, listener) { | ||
if (this._listeners[type]) { | ||
var p = this._listeners[type].indexOf(listener); | ||
if (p >= 0) { | ||
this._listeners[type].splice(p, 1); | ||
} | ||
} | ||
}; | ||
PromisePool.prototype._fireEvent = function(type, data) { | ||
if (this._listeners[type] && this._listeners[type].length) { | ||
var evt = new PromisePoolEvent(this, type, data); | ||
var listeners = this._listeners[type].slice(); | ||
for (var i = 0, l = listeners.length; i < l; ++i) { | ||
listeners[i].call(this, evt); | ||
} | ||
} | ||
}; | ||
PromisePool.prototype._settle = function(error) { | ||
if (error) { | ||
this._reject(error); | ||
} else { | ||
this._resolve(); | ||
} | ||
this._resolve = this._reject = null; | ||
}; | ||
PromisePool.prototype._onPooledPromiseFulfilled = function(promise, result) { | ||
this._size--; | ||
if (this._resolve) { | ||
this._fireEvent('fulfilled', { | ||
promise: promise, | ||
result: result | ||
}); | ||
this._proceed(); | ||
} | ||
}; | ||
PromisePool.prototype._onPooledPromiseRejected = function(promise, error) { | ||
this._size--; | ||
if (this._reject) { | ||
this._fireEvent('rejected', { | ||
promise: promise, | ||
error: error | ||
}); | ||
this._settle(error || new Error('Unknown error')); | ||
} | ||
}; | ||
PromisePool.prototype._trackPromise = function(promise) { | ||
var that = this; | ||
promise.then(function(result) { | ||
that._onPooledPromiseFulfilled(promise, result); | ||
}, function(error) { | ||
that._onPooledPromiseRejected(promise, error); | ||
}) | ||
['catch'](function(err) { | ||
that._settle(new Error('Promise processing failed: ' + err)); | ||
}); | ||
}; | ||
PromisePool.prototype._proceed = function() { | ||
if (!this._producerDone) { | ||
var promise; | ||
while (this._size < this._concurrency && | ||
!!(promise = this._producer.call(this))) { | ||
this._size++; | ||
this._trackPromise(promise); | ||
} | ||
if (!promise) { | ||
this._producerDone = true; | ||
} | ||
} | ||
if (this._producerDone && this._size === 0) { | ||
this._settle(); | ||
} | ||
}; | ||
var modernizeOption = function(options, listeners, optKey, eventType, eventKey) { | ||
if (options[optKey]) { | ||
var cb = options[optKey]; | ||
listeners[eventType] = function(evt) { | ||
cb(evt.target, evt.data.promise, evt.data[eventKey]); | ||
}; | ||
proceed(); | ||
}); | ||
poolPromise.pool = { | ||
size: function() { | ||
return size; | ||
}, | ||
concurrency: function(value) { | ||
if (typeof value !== 'undefined') { | ||
concurrency = value; | ||
delete options[optKey]; | ||
} | ||
}; | ||
var modernizeOptions = function(options) { | ||
var listeners = {}; | ||
modernizeOption(options, listeners, 'onresolve', 'fulfilled', 'result'); | ||
modernizeOption(options, listeners, 'onreject', 'rejected', 'error'); | ||
return listeners; | ||
}; | ||
var createPool = function(source, concurrency, options) { | ||
// Legacy API: options.onresolve and options.onreject | ||
var listeners; | ||
if (options) { | ||
listeners = modernizeOptions(options); | ||
} | ||
var pool = new PromisePool(source, concurrency, options); | ||
if (listeners) { | ||
for (var type in listeners) { | ||
if (listeners.hasOwnProperty(type)) { | ||
pool.addEventListener(type, listeners[type]); | ||
} | ||
return concurrency; | ||
} | ||
}; | ||
return poolPromise; | ||
} | ||
return pool.start(); | ||
}; | ||
createPool.PromisePool = PromisePool; | ||
if (typeof module !== 'undefined' && typeof module.exports !== 'undefined') { | ||
module.exports = pool; | ||
module.exports = createPool; | ||
} else { | ||
global.promisePool = pool; | ||
global.promisePool = createPool; | ||
} | ||
})(this); |
{ | ||
"name": "es6-promise-pool", | ||
"version": "0.1.2", | ||
"version": "0.2.0", | ||
"description": "Runs Promises in a pool that limits their maximum concurrency.", | ||
@@ -5,0 +5,0 @@ "author": { |
@@ -7,4 +7,4 @@ # Promise Pool [![Build Status](https://travis-ci.org/timdp/es6-promise-pool.svg?branch=master)](https://travis-ci.org/timdp/es6-promise-pool) | ||
An ES6 `Promise` is a great way of handling asynchronous operations. The | ||
`Promise.all` function provides an easy interface to let a bunch of promises | ||
An ECMAScript 6 `Promise` is a great way of handling asynchronous operations. | ||
The `Promise.all` function provides an easy interface to let a bunch of promises | ||
settle concurrently. | ||
@@ -51,2 +51,4 @@ | ||
var PromisePool = promisePool.PromisePool; | ||
// Can also be a generator. See below. | ||
@@ -75,9 +77,11 @@ var promiseProducer = function() { | ||
// See below. | ||
var options = {}; | ||
// Create a pool. | ||
var pool = new PromisePool(promiseProducer, concurrency); | ||
// Create a pool promise and wait for it to settle. | ||
promisePool(promiseProducer, concurrency, options) | ||
.then(function() { | ||
console.log('All promises resolved'); | ||
// Start the pool. | ||
var poolPromise = pool.start(); | ||
// Wait for the pool to settle. | ||
poolPromise.then(function() { | ||
console.log('All promises fulfilled'); | ||
}, function(error) { | ||
@@ -90,3 +94,3 @@ console.log('Some promise rejected: ' + error.message); | ||
The `promisePool` function takes a `Promise`-producing function as its first | ||
The `PromisePool` constructor takes a `Promise`-producing function as its first | ||
argument. Let's first assume that we have this helper function that returns a | ||
@@ -110,5 +114,5 @@ promise for the given `value` after `time` milliseconds: | ||
Now, let's use the helper function above to create five such promises, which | ||
each resolve after a second. Because of the `concurrency` of `3`, the first | ||
three promises will resolve after a second. Then, the remaining two will be | ||
processed and resolve after another second. | ||
are each fulfilled after a second. Because of the `concurrency` of `3`, the | ||
first three promises will be fulfilled after one second. Then, the remaining two | ||
will be processed and fulfilled after another second. | ||
@@ -126,3 +130,5 @@ ```js | ||
promisePool(promiseProducer, 3) | ||
var pool = new PromisePool(promiseProducer, 3); | ||
pool.start() | ||
.then(function() { | ||
@@ -144,3 +150,5 @@ console.log('Complete'); | ||
promisePool(promiseProducer, 3) | ||
var pool = new PromisePool(promiseProducer, 3); | ||
pool.start() | ||
.then(function() { | ||
@@ -151,24 +159,33 @@ console.log('Complete'); | ||
## Options | ||
## Events | ||
The `options` object lets us provide additional callback functions to listen for | ||
promise progress. | ||
We can also ask the promise pool to notify us when an individual promise is | ||
fulfilled or rejected. The pool fires `fulfilled` and `rejected` events exactly | ||
for this purpose. | ||
When a promise settles, either `options.onresolve` or `options.onreject` will be | ||
called. Both functions receive the pool promise (as returned by `promisePool`), | ||
the promise that settled, and either the resolved value or the `Error` that | ||
caused the rejection. | ||
```js | ||
var options = {}; | ||
var pool = new PromisePool(promiseProducer, 3); | ||
options.onresolve = function(poolPromise, promise, result) { | ||
console.log('Resolved: ' + result); | ||
}; | ||
pool.addEventListener('fulfilled', function(event) { | ||
// The event contains: | ||
// - target: the PromisePool itself; | ||
// - data: | ||
// - promise: the Promise that got fulfilled; | ||
// - result: the result of that Promise. | ||
console.log('Fulfilled: ' + event.data.result); | ||
}); | ||
options.onreject = function(poolPromise, promise, error) { | ||
console.log('Rejected: ' + error.message); | ||
}; | ||
pool.addEventListener('rejected', function(event) { | ||
// The event contains: | ||
// - target: the PromisePool itself; | ||
// - data: | ||
// - promise: the Promise that got rejected; | ||
// - error: the Error for the rejection. | ||
console.log('Rejected: ' + event.data.error.message); | ||
}); | ||
promisePool(promiseProducer, concurrency, options); | ||
pool.start() | ||
.then(function() { | ||
console.log('Complete'); | ||
}); | ||
``` | ||
@@ -175,0 +192,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
13650
184
216
0