maximize-iterator
Advanced tools
+9
-1
@@ -7,2 +7,3 @@ var nextCallback = require('iterator-next-callback'); | ||
| var DEFAULT_CONCURRENCY = 4096; | ||
| var DEFAULT_LIMIT = Infinity; | ||
| var MAXIMUM_BATCH = 10; | ||
@@ -20,5 +21,12 @@ | ||
| options = { | ||
| each: fn, | ||
| concurrency: options.concurrency || DEFAULT_CONCURRENCY, | ||
| limit: options.limit || DEFAULT_LIMIT, | ||
| batch: options.batch || MAXIMUM_BATCH, | ||
| each: fn, | ||
| error: | ||
| options.error || | ||
| function () { | ||
| return true; // default is exit on error | ||
| }, | ||
| total: 0, | ||
| counter: 0, | ||
@@ -25,0 +33,0 @@ }; |
+2
-2
@@ -8,3 +8,3 @@ var isPromise = require('./isPromise'); | ||
| .then(function (resolvedKeep) { | ||
| callback(null, getResult(resolvedKeep)); | ||
| callback(null, getResult(resolvedKeep, true)); | ||
| }) | ||
@@ -15,4 +15,4 @@ .catch(function (err) { | ||
| } else { | ||
| callback(null, getResult(keep)); | ||
| callback(null, getResult(keep, true)); | ||
| } | ||
| }; |
+2
-2
@@ -1,3 +0,3 @@ | ||
| module.exports = function (result) { | ||
| return result === undefined ? true : result; | ||
| module.exports = function (result, defaultValue) { | ||
| return result === undefined ? defaultValue : result; | ||
| }; |
+57
-31
@@ -0,38 +1,64 @@ | ||
| var nextTick = require('next-tick'); | ||
| var getKeep = require('./getKeep'); | ||
| var getResult = require('./getResult'); | ||
| module.exports = function maximizeNext(next, options, callback) { | ||
| var counter = 0; | ||
| while (options.counter < options.concurrency) { | ||
| if (options.done || counter++ > options.batch) return; | ||
| options.counter++; | ||
| function callDone(options, callback) { | ||
| if (!options.done || options.counter > 0) return false; | ||
| callback(options.err); | ||
| return true; | ||
| } | ||
| next(function (err, value) { | ||
| if (value === null) { | ||
| options.counter--; | ||
| options.done = true; | ||
| if (options.counter <= 0) return callback(options.err); | ||
| return; | ||
| } | ||
| function processDone(options, callback) { | ||
| options.done = true; | ||
| return callDone(options, callback); | ||
| } | ||
| try { | ||
| getKeep(options.each(err, value), function (err1, keep) { | ||
| function processError(err, options, callback) { | ||
| if (getResult(options.error(err), false)) { | ||
| options.err = err; | ||
| options.done = true; | ||
| } | ||
| return callDone(options, callback); | ||
| } | ||
| function processAvailable(next, options, callback) { | ||
| var isProcessing = false; | ||
| return function waiter() { | ||
| if (isProcessing) return; | ||
| isProcessing = true; | ||
| var counter = 0; | ||
| while (options.counter < options.concurrency) { | ||
| if (options.done || counter++ > options.batch) break; | ||
| if (options.total >= options.limit) return processDone(options, callback); | ||
| options.total++; | ||
| options.counter++; | ||
| next(function (err, value) { | ||
| if (err || value === null) { | ||
| options.counter--; | ||
| if (err1) { | ||
| options.err = err1; | ||
| options.done = true; | ||
| } else if (!keep) { | ||
| options.done = true; | ||
| } | ||
| if (options.done && options.counter <= 0) return callback(options.err); | ||
| return err ? processError(err, options, callback) : processDone(options, callback); | ||
| } | ||
| return maximizeNext(next, options, callback); | ||
| }); | ||
| } catch (err) { | ||
| options.counter--; | ||
| options.err = err; | ||
| options.done = true; | ||
| if (options.counter <= 0) return callback(options.err); | ||
| } | ||
| }); | ||
| } | ||
| try { | ||
| getKeep(options.each(value), function (err, keep) { | ||
| options.counter--; | ||
| if (err) return processError(err, options, callback); | ||
| else if (!keep) return processDone(options, callback); | ||
| if (!callDone(options, callback)) waiter(); | ||
| }); | ||
| } catch (err) { | ||
| options.counter--; | ||
| return processError(err, options, callback); | ||
| } | ||
| }); | ||
| } | ||
| isProcessing = false; | ||
| }; | ||
| } | ||
| module.exports = function maximizeNext(next, options, callback) { | ||
| processAvailable(next, options, callback)(); | ||
| }; |
+1
-1
| { | ||
| "name": "maximize-iterator", | ||
| "version": "2.0.0", | ||
| "version": "2.1.0", | ||
| "description": "Maximize the parallel calls of an iterator supporting asyncIterator interface", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
+2
-2
@@ -11,3 +11,3 @@ ## maximize-iterator | ||
| var iterator = // create it somehow with a next method returing {done: value: } | ||
| await maximize(iterator, (err, value) => { /* do something including false stop */ }, { concurrency: 1024 }); | ||
| await maximize(iterator, (value) => { /* do something including false stop */ }, { concurrency: 1024, limit: Infinity, error: (err) => { return true; /* filter errors */ } }); | ||
| })(); | ||
@@ -17,5 +17,5 @@ | ||
| var iterator = // create it somehow with a next method returing {done: value: } | ||
| maximize(iterator, (err, value) => { /* do something including false stop */ }, { concurrency: 1024 }, (err) => { | ||
| maximize(iterator, (value) => { /* do something including false stop */ }, { concurrency: 1024, limit: Infinity, error: (err) => { return true; /* filter errors */ } }, (err) => { | ||
| /* done */ | ||
| }); | ||
| ``` |
6709
19.29%116
31.82%