maximize-iterator
Advanced tools
| var compat = require('async-compatibilty'); | ||
| function callDone(options, callback) { | ||
| if (!options.done || options.counter > 0) return false; | ||
| callback(options.err); | ||
| return true; | ||
| } | ||
| function processDone(options, callback) { | ||
| options.done = true; | ||
| return callDone(options, callback); | ||
| } | ||
| function processError(err, options, callback) { | ||
| if (compat.defaultValue(options.error(err), false)) { | ||
| options.err = err; | ||
| options.done = true; | ||
| } | ||
| return callDone(options, callback); | ||
| } | ||
| module.exports = function createProcessor(next, options, callback) { | ||
| var isProcessing = false; | ||
| return function processor(done) { | ||
| if (done && processDone(options, callback)) return; | ||
| if (isProcessing) return; | ||
| isProcessing = true; | ||
| var counter = 0; | ||
| while (options.counter < options.concurrency) { | ||
| if (options.done || options.stop(counter++)) break; | ||
| if (options.total >= options.limit) return processDone(options, callback); | ||
| options.total++; | ||
| options.counter++; | ||
| next(function (err, value) { | ||
| if (err || value === null) { | ||
| options.counter--; | ||
| if (err ? processError(err, options, callback) : processDone(options, callback)) return; | ||
| return processor(); | ||
| } | ||
| try { | ||
| compat.asyncFunction(options.each, options.async, value, function (err, keep) { | ||
| options.counter--; | ||
| if (err) return processError(err, options, callback); | ||
| else if (!compat.defaultValue(keep, true)) return processDone(options, callback); | ||
| if (!callDone(options, callback)) processor(); | ||
| }); | ||
| } catch (err) { | ||
| options.counter--; | ||
| return processError(err, options, callback); | ||
| } | ||
| }); | ||
| } | ||
| isProcessing = false; | ||
| }; | ||
| }; |
+9
-10
| var nextCallback = require('iterator-next-callback'); | ||
| var callOnce = require('call-once-next-tick'); | ||
| var maximizeNext = require('./lib/maximizeNext'); | ||
| var createProcessor = require('./lib/createProcessor'); | ||
@@ -21,2 +21,3 @@ var DEFAULT_CONCURRENCY = 4096; | ||
| each: fn, | ||
| async: options.async, | ||
| concurrency: options.concurrency || DEFAULT_CONCURRENCY, | ||
@@ -32,17 +33,15 @@ limit: options.limit || DEFAULT_LIMIT, | ||
| counter: 0, | ||
| stop: function (counter) { | ||
| return counter > options.batch; | ||
| }, | ||
| }; | ||
| maximizeNext(nextCallback(iterator), options, callback); | ||
| createProcessor(nextCallback(iterator), options, callOnce(callback))(); | ||
| } else { | ||
| return new Promise(function (resolve, reject) { | ||
| maximizeIterator( | ||
| iterator, | ||
| fn, | ||
| options, | ||
| callOnce(function (err) { | ||
| err ? reject(err) : resolve(); | ||
| }) | ||
| ); | ||
| maximizeIterator(iterator, fn, options, function (err) { | ||
| err ? reject(err) : resolve(); | ||
| }); | ||
| }); | ||
| } | ||
| }; |
+3
-1
| { | ||
| "name": "maximize-iterator", | ||
| "version": "2.1.0", | ||
| "version": "2.2.0", | ||
| "description": "Maximize the parallel calls of an iterator supporting asyncIterator interface", | ||
@@ -31,3 +31,5 @@ "main": "index.js", | ||
| "dependencies": { | ||
| "async-compatibilty": "^1.1.0", | ||
| "call-once-next-tick": "^1.0.0", | ||
| "is-promise": "^2.1.0", | ||
| "iterator-next-callback": "^1.1.0" | ||
@@ -34,0 +36,0 @@ }, |
| var isPromise = require('./isPromise'); | ||
| var getResult = require('./getResult'); | ||
| module.exports = function (keep, callback) { | ||
| if (isPromise(keep)) { | ||
| keep | ||
| .then(function (resolvedKeep) { | ||
| callback(null, getResult(resolvedKeep, true)); | ||
| }) | ||
| .catch(function (err) { | ||
| callback(err); | ||
| }); | ||
| } else { | ||
| callback(null, getResult(keep, true)); | ||
| } | ||
| }; |
| module.exports = function (result, defaultValue) { | ||
| return result === undefined ? defaultValue : result; | ||
| }; |
| module.exports = function (obj) { | ||
| return !!obj && typeof obj === 'object' && typeof obj.then === 'function'; | ||
| }; |
| var nextTick = require('next-tick'); | ||
| var getKeep = require('./getKeep'); | ||
| var getResult = require('./getResult'); | ||
| function callDone(options, callback) { | ||
| if (!options.done || options.counter > 0) return false; | ||
| callback(options.err); | ||
| return true; | ||
| } | ||
| function processDone(options, callback) { | ||
| options.done = true; | ||
| return callDone(options, callback); | ||
| } | ||
| function processError(err, options, callback) { | ||
| if (getResult(options.error(err), false)) { | ||
| options.err = err; | ||
| options.done = true; | ||
| } | ||
| return callDone(options, callback); | ||
| } | ||
| function processAvailable(next, options, callback) { | ||
| var isProcessing = false; | ||
| return function waiter() { | ||
| if (isProcessing) return; | ||
| isProcessing = true; | ||
| var counter = 0; | ||
| while (options.counter < options.concurrency) { | ||
| if (options.done || counter++ > options.batch) break; | ||
| if (options.total >= options.limit) return processDone(options, callback); | ||
| options.total++; | ||
| options.counter++; | ||
| next(function (err, value) { | ||
| if (err || value === null) { | ||
| options.counter--; | ||
| return err ? processError(err, options, callback) : processDone(options, callback); | ||
| } | ||
| try { | ||
| getKeep(options.each(value), function (err, keep) { | ||
| options.counter--; | ||
| if (err) return processError(err, options, callback); | ||
| else if (!keep) return processDone(options, callback); | ||
| if (!callDone(options, callback)) waiter(); | ||
| }); | ||
| } catch (err) { | ||
| options.counter--; | ||
| return processError(err, options, callback); | ||
| } | ||
| }); | ||
| } | ||
| isProcessing = false; | ||
| }; | ||
| } | ||
| module.exports = function maximizeNext(next, options, callback) { | ||
| processAvailable(next, options, callback)(); | ||
| }; |
6236
-7.05%4
100%5
-37.5%91
-21.55%+ Added
+ Added
+ Added
+ Added
+ Added
+ Added