Comparing version 2.0.0 to 2.1.0
@@ -7,4 +7,15 @@ # Changelog | ||
## v2.1.0 | ||
* #33 want filter, filterLimit, and filterSeries | ||
* #35 pipeline does not pass rv-object to final callback | ||
## v2.0.0 | ||
** WARNING | ||
Do not use this version (v2.0.0), as it has broken pipeline and forEachPipeline | ||
functions. | ||
**Breaking Changes:** | ||
@@ -11,0 +22,0 @@ |
@@ -18,2 +18,5 @@ /* | ||
exports.forEachPipeline = forEachPipeline; | ||
exports.filter = filter; | ||
exports.filterLimit = filterLimit; | ||
exports.filterSeries = filterSeries; | ||
exports.queue = queue; | ||
@@ -163,3 +166,3 @@ exports.queuev = queuev; | ||
'stop_when': 'error', | ||
'res_type': 'values' | ||
'res_type': 'rv' | ||
}; | ||
@@ -214,3 +217,121 @@ return (waterfall_impl(opts)); | ||
/* | ||
* async.js compatible filter, filterLimit, and filterSeries. Takes an input | ||
* array, optionally a limit, and a single function to filter an array and will | ||
* callback with a new filtered array. This is effectively an asynchronous | ||
* version of Array.prototype.filter. | ||
*/ | ||
function filter(inputs, filterFunc, callback) { | ||
return (filterLimit(inputs, Infinity, filterFunc, callback)); | ||
} | ||
function filterSeries(inputs, filterFunc, callback) { | ||
return (filterLimit(inputs, 1, filterFunc, callback)); | ||
} | ||
function filterLimit(inputs, limit, filterFunc, callback) { | ||
mod_assert.ok(Array.isArray(inputs), | ||
'"inputs" must be specified and must be an array'); | ||
mod_assert.equal(typeof (limit), 'number', | ||
'"limit" must be a number'); | ||
mod_assert.equal(isNaN(limit), false, | ||
'"limit" must be a number'); | ||
mod_assert.equal(typeof (filterFunc), 'function', | ||
'"filterFunc" must be specified and must be a function'); | ||
mod_assert.equal(typeof (callback), 'function', | ||
'"callback" argument must be specified as a function'); | ||
var errors = []; | ||
var q = queue(processInput, limit); | ||
var results = []; | ||
function processInput(input, cb) { | ||
/* | ||
* If the errors array has any members, an error was | ||
* encountered in a previous invocation of filterFunc, so all | ||
* future filtering will be skipped. | ||
*/ | ||
if (errors.length > 0) { | ||
cb(); | ||
return; | ||
} | ||
filterFunc(input.elem, function inputFiltered(err, ans) { | ||
/* | ||
* We ensure here that a filterFunc callback is only | ||
* ever invoked once. | ||
*/ | ||
if (results.hasOwnProperty(input.idx)) { | ||
throw (new mod_verror.VError( | ||
'vasync.filter*: filterFunc idx %d ' + | ||
'invoked its callback twice', input.idx)); | ||
} | ||
/* | ||
* The original element, as well as the answer "ans" | ||
* (truth value) is stored to later be filtered when | ||
* all outstanding jobs are finished. | ||
*/ | ||
results[input.idx] = { | ||
elem: input.elem, | ||
ans: !!ans | ||
}; | ||
/* | ||
* Any error encountered while filtering will result in | ||
* all future operations being skipped, and the error | ||
* object being returned in the users callback. | ||
*/ | ||
if (err) { | ||
errors.push(err); | ||
cb(); | ||
return; | ||
} | ||
cb(); | ||
}); | ||
} | ||
q.once('end', function queueDrained() { | ||
if (errors.length > 0) { | ||
callback(mod_verror.errorFromList(errors)); | ||
return; | ||
} | ||
/* | ||
* results is now an array of objects in the same order of the | ||
* inputs array, where each object looks like: | ||
* | ||
* { | ||
* "ans": <true|false>, | ||
* "elem": <original input element> | ||
* } | ||
* | ||
* we filter out elements that have a false "ans" value, and | ||
* then map the array to contain only the input elements. | ||
*/ | ||
results = results.filter(function filterFalseInputs(input) { | ||
return (input.ans); | ||
}).map(function mapInputElements(input) { | ||
return (input.elem); | ||
}); | ||
callback(null, results); | ||
}); | ||
inputs.forEach(function iterateInput(elem, idx) { | ||
/* | ||
* We retain the array index to ensure that order is | ||
* maintained. | ||
*/ | ||
q.push({ | ||
elem: elem, | ||
idx: idx | ||
}); | ||
}); | ||
q.close(); | ||
return (q); | ||
} | ||
/* | ||
@@ -514,2 +635,36 @@ * async-compatible "queue" function. | ||
/* | ||
* This function is used to implement vasync-functions that need to execute a | ||
* list of functions in a sequence, but differ in how they make use of the | ||
* intermediate callbacks and finall callback, as well as under what conditions | ||
* they stop executing the functions in the list. Examples of such functions | ||
* are `pipeline`, `waterfall`, and `tryEach`. See the documentation for those | ||
* functions to see how they operate. | ||
* | ||
* This function's behavior is influenced via the `opts` object that we pass | ||
* in. This object has the following layout: | ||
* | ||
* { | ||
* 'funcs': array of functions | ||
* 'callback': the final callback | ||
* 'args': { | ||
* 'impl': 'pipeline' or 'tryEach' or 'waterfall' | ||
* 'uarg': the arg passed to each func for 'pipeline' | ||
* } | ||
* 'stop_when': 'error' or 'success' | ||
* 'res_type': 'values' or 'arrays' or 'rv' | ||
* } | ||
* | ||
* In the object, 'res_type' is used to indicate what the type of the result | ||
* values(s) is that we pass to the final callback. We secondarily use | ||
* 'args.impl' to adjust this behavior in an implementation-specific way. For | ||
* example, 'tryEach' only returns an array if it has more than 1 result passed | ||
* to the final callback. Otherwise, it passes a solitary value to the final | ||
* callback. | ||
* | ||
* In case it's not clear, 'rv' in the `res_type` member, is just the | ||
* result-value that we also return. This is the convention in functions that | ||
* originated in `vasync` (pipeline), but not in functions that originated in | ||
* `async` (waterfall, tryEach). | ||
*/ | ||
function waterfall_impl(opts) | ||
@@ -527,4 +682,4 @@ { | ||
mod_assert.ok(opts.res_type === 'values' || | ||
opts.res_type === 'array', | ||
'"opts.res_type" must either be "value" or "array"'); | ||
opts.res_type === 'array' || opts.res_type == 'rv', | ||
'"opts.res_type" must either be "values", "array", or "rv"'); | ||
mod_assert.ok(opts.stop_when === 'error' || | ||
@@ -565,3 +720,9 @@ opts.stop_when === 'success', | ||
next = function (idx, err) { | ||
var res_key, args, entry, nextentry; | ||
/* | ||
* Note that nfunc_args contains the args we will pass to the | ||
* next func in the func-list the user gave us. Except for | ||
* 'tryEach', which passes cb's. However, it will pass | ||
* 'nfunc_args' to its final callback -- see below. | ||
*/ | ||
var res_key, nfunc_args, entry, nextentry; | ||
@@ -583,7 +744,9 @@ if (err === undefined) | ||
opts.args.impl === 'waterfall') { | ||
args = Array.prototype.slice.call(arguments, 2); | ||
nfunc_args = Array.prototype.slice.call(arguments, 2); | ||
res_key = 'results'; | ||
entry['results'] = nfunc_args; | ||
} else if (opts.args.impl === 'pipeline') { | ||
args = [ opts.args.uarg ]; | ||
nfunc_args = [ opts.args.uarg ]; | ||
res_key = 'result'; | ||
entry['result'] = arguments[2]; | ||
} | ||
@@ -595,8 +758,8 @@ | ||
entry['err'] = err; | ||
entry[res_key] = args; | ||
if (err) | ||
if (err) { | ||
rv['nerrors']++; | ||
else | ||
rv['successes'].push(args); | ||
} else { | ||
rv['successes'].push(entry[res_key]); | ||
} | ||
@@ -610,7 +773,9 @@ if ((opts.stop_when === 'error' && err) || | ||
(opts.res_type === 'array' && | ||
args.length <= 1)) { | ||
args.unshift(err); | ||
callback.apply(null, args); | ||
nfunc_args.length <= 1)) { | ||
nfunc_args.unshift(err); | ||
callback.apply(null, nfunc_args); | ||
} else if (opts.res_type === 'array') { | ||
callback(err, args); | ||
callback(err, nfunc_args); | ||
} else if (opts.res_type === 'rv') { | ||
callback(err, rv); | ||
} | ||
@@ -622,3 +787,3 @@ } | ||
current++; | ||
args.push(next.bind(null, current)); | ||
nfunc_args.push(next.bind(null, current)); | ||
setImmediate(function () { | ||
@@ -633,3 +798,3 @@ var nfunc = nextentry['func']; | ||
* pushed `next.bind(null, current)` to the | ||
* `args` array), before we call | ||
* `nfunc_args` array), before we call | ||
* `setImmediate()`. However, this is not the | ||
@@ -642,10 +807,10 @@ * case, because the interface exposed by | ||
* itself _can_ be called with one or more | ||
* results, which we collect into `args` using | ||
* the aformentioned `opts.args.impl` branch | ||
* above, and which we pass to the callback via | ||
* the `opts.res_type` branch above (where | ||
* res_type is set to 'array'). | ||
* results, which we collect into `nfunc_args` | ||
* using the aformentioned `opts.args.impl` | ||
* branch above, and which we pass to the | ||
* callback via the `opts.res_type` branch | ||
* above (where res_type is set to 'array'). | ||
*/ | ||
if (opts.args.impl !== 'tryEach') { | ||
nfunc.apply(null, args); | ||
nfunc.apply(null, nfunc_args); | ||
} else { | ||
@@ -652,0 +817,0 @@ nfunc(next.bind(null, current)); |
{ | ||
"name": "vasync", | ||
"version": "2.0.0", | ||
"version": "2.1.0", | ||
"description": "utilities for observable asynchronous control flow", | ||
@@ -18,3 +18,3 @@ "main": "./lib/vasync.js", | ||
"dependencies": { | ||
"verror": "1.6.0" | ||
"verror": "1.10.0" | ||
}, | ||
@@ -21,0 +21,0 @@ "engines": [ |
@@ -48,2 +48,4 @@ # vasync: observable asynchronous control flow | ||
invoke the same function on N inputs in series (and stop on failure) | ||
* [filter/filterSeries/filterLimit](#filterfilterlimitfilterseries-filter-n-inputs-serially-or-concurrently): | ||
filter N inputs serially or concurrently | ||
* [waterfall](#waterfall-invoke-n-functions-in-series-stop-on-failure-and-propagate-results): | ||
@@ -387,2 +389,43 @@ like pipeline, but propagating results between stages | ||
### filter/filterLimit/filterSeries: filter N inputs serially or concurrently | ||
Synopsis: `filter(inputs, filterFunc, callback)` | ||
Synopsis: `filterSeries(inputs, filterFunc, callback)` | ||
Synopsis: `filterLimit(inputs, limit, filterFunc, callback)` | ||
These functions take an array (of anything) and a function to call on each | ||
element of the array. The function must callback with a true or false value as | ||
the second argument or an error object as the first argument. False values | ||
will result in the element being filtered out of the results array. An error | ||
object passed as the first argument will cause the filter function to stop | ||
processing new elements and callback to the caller with the error immediately. | ||
Original input array order is maintained. | ||
`filter` and `filterSeries` are analogous to calling `filterLimit` with | ||
a limit of `Infinity` and `1` respectively. | ||
```js | ||
var inputs = [ | ||
'joyent.com', | ||
'github.com', | ||
'asdfaqsdfj.com' | ||
]; | ||
function filterFunc(input, cb) { | ||
mod_dns.resolve(input, function (err, results) { | ||
if (err) { | ||
cb(null, false); | ||
} else { | ||
cb(null, true); | ||
} | ||
} | ||
} | ||
mod_vasync.filter(inputs, filterFunc, function (err, results) { | ||
// err => undefined | ||
// results => ['joyent.com', 'github.com'] | ||
}); | ||
``` | ||
### barrier: coordinate multiple concurrent operations | ||
@@ -389,0 +432,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
97730
28
1960
729
+ Addedassert-plus@1.0.0(transitive)
+ Addedcore-util-is@1.0.2(transitive)
+ Addedextsprintf@1.4.1(transitive)
+ Addedverror@1.10.0(transitive)
- Removedextsprintf@1.2.0(transitive)
- Removedverror@1.6.0(transitive)
Updatedverror@1.10.0