Socket
Socket
Sign inDemoInstall

vasync

Package Overview
Dependencies
Maintainers
1
Versions
24
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

vasync - npm Package Compare versions

Comparing version 2.0.0 to 2.1.0

tests/filter.js

11

CHANGES.md

@@ -7,4 +7,15 @@ # Changelog

## v2.1.0
* #33 want filter, filterLimit, and filterSeries
* #35 pipeline does not pass rv-object to final callback
## v2.0.0
** WARNING
Do not use this version (v2.0.0), as it has broken pipeline and forEachPipeline
functions.
**Breaking Changes:**

@@ -11,0 +22,0 @@

209

lib/vasync.js

@@ -18,2 +18,5 @@ /*

exports.forEachPipeline = forEachPipeline;
exports.filter = filter;
exports.filterLimit = filterLimit;
exports.filterSeries = filterSeries;
exports.queue = queue;

@@ -163,3 +166,3 @@ exports.queuev = queuev;

'stop_when': 'error',
'res_type': 'values'
'res_type': 'rv'
};

@@ -214,3 +217,121 @@ return (waterfall_impl(opts));

/*
* async.js compatible filter, filterLimit, and filterSeries. Takes an input
* array, optionally a limit, and a single function to filter an array and will
* callback with a new filtered array. This is effectively an asynchronous
* version of Array.prototype.filter.
*/
function filter(inputs, filterFunc, callback) {
return (filterLimit(inputs, Infinity, filterFunc, callback));
}
function filterSeries(inputs, filterFunc, callback) {
return (filterLimit(inputs, 1, filterFunc, callback));
}
function filterLimit(inputs, limit, filterFunc, callback) {
mod_assert.ok(Array.isArray(inputs),
'"inputs" must be specified and must be an array');
mod_assert.equal(typeof (limit), 'number',
'"limit" must be a number');
mod_assert.equal(isNaN(limit), false,
'"limit" must be a number');
mod_assert.equal(typeof (filterFunc), 'function',
'"filterFunc" must be specified and must be a function');
mod_assert.equal(typeof (callback), 'function',
'"callback" argument must be specified as a function');
var errors = [];
var q = queue(processInput, limit);
var results = [];
function processInput(input, cb) {
/*
* If the errors array has any members, an error was
* encountered in a previous invocation of filterFunc, so all
* future filtering will be skipped.
*/
if (errors.length > 0) {
cb();
return;
}
filterFunc(input.elem, function inputFiltered(err, ans) {
/*
* We ensure here that a filterFunc callback is only
* ever invoked once.
*/
if (results.hasOwnProperty(input.idx)) {
throw (new mod_verror.VError(
'vasync.filter*: filterFunc idx %d ' +
'invoked its callback twice', input.idx));
}
/*
* The original element, as well as the answer "ans"
* (truth value) is stored to later be filtered when
* all outstanding jobs are finished.
*/
results[input.idx] = {
elem: input.elem,
ans: !!ans
};
/*
* Any error encountered while filtering will result in
* all future operations being skipped, and the error
* object being returned in the users callback.
*/
if (err) {
errors.push(err);
cb();
return;
}
cb();
});
}
q.once('end', function queueDrained() {
if (errors.length > 0) {
callback(mod_verror.errorFromList(errors));
return;
}
/*
* results is now an array of objects in the same order of the
* inputs array, where each object looks like:
*
* {
* "ans": <true|false>,
* "elem": <original input element>
* }
*
* we filter out elements that have a false "ans" value, and
* then map the array to contain only the input elements.
*/
results = results.filter(function filterFalseInputs(input) {
return (input.ans);
}).map(function mapInputElements(input) {
return (input.elem);
});
callback(null, results);
});
inputs.forEach(function iterateInput(elem, idx) {
/*
* We retain the array index to ensure that order is
* maintained.
*/
q.push({
elem: elem,
idx: idx
});
});
q.close();
return (q);
}
/*

@@ -514,2 +635,36 @@ * async-compatible "queue" function.

/*
* This function is used to implement vasync-functions that need to execute a
* list of functions in a sequence, but differ in how they make use of the
* intermediate callbacks and finall callback, as well as under what conditions
* they stop executing the functions in the list. Examples of such functions
* are `pipeline`, `waterfall`, and `tryEach`. See the documentation for those
* functions to see how they operate.
*
* This function's behavior is influenced via the `opts` object that we pass
* in. This object has the following layout:
*
* {
* 'funcs': array of functions
* 'callback': the final callback
* 'args': {
* 'impl': 'pipeline' or 'tryEach' or 'waterfall'
* 'uarg': the arg passed to each func for 'pipeline'
* }
* 'stop_when': 'error' or 'success'
* 'res_type': 'values' or 'arrays' or 'rv'
* }
*
* In the object, 'res_type' is used to indicate what the type of the result
* values(s) is that we pass to the final callback. We secondarily use
* 'args.impl' to adjust this behavior in an implementation-specific way. For
* example, 'tryEach' only returns an array if it has more than 1 result passed
* to the final callback. Otherwise, it passes a solitary value to the final
* callback.
*
* In case it's not clear, 'rv' in the `res_type` member, is just the
* result-value that we also return. This is the convention in functions that
* originated in `vasync` (pipeline), but not in functions that originated in
* `async` (waterfall, tryEach).
*/
function waterfall_impl(opts)

@@ -527,4 +682,4 @@ {

mod_assert.ok(opts.res_type === 'values' ||
opts.res_type === 'array',
'"opts.res_type" must either be "value" or "array"');
opts.res_type === 'array' || opts.res_type == 'rv',
'"opts.res_type" must either be "values", "array", or "rv"');
mod_assert.ok(opts.stop_when === 'error' ||

@@ -565,3 +720,9 @@ opts.stop_when === 'success',

next = function (idx, err) {
var res_key, args, entry, nextentry;
/*
* Note that nfunc_args contains the args we will pass to the
* next func in the func-list the user gave us. Except for
* 'tryEach', which passes cb's. However, it will pass
* 'nfunc_args' to its final callback -- see below.
*/
var res_key, nfunc_args, entry, nextentry;

@@ -583,7 +744,9 @@ if (err === undefined)

opts.args.impl === 'waterfall') {
args = Array.prototype.slice.call(arguments, 2);
nfunc_args = Array.prototype.slice.call(arguments, 2);
res_key = 'results';
entry['results'] = nfunc_args;
} else if (opts.args.impl === 'pipeline') {
args = [ opts.args.uarg ];
nfunc_args = [ opts.args.uarg ];
res_key = 'result';
entry['result'] = arguments[2];
}

@@ -595,8 +758,8 @@

entry['err'] = err;
entry[res_key] = args;
if (err)
if (err) {
rv['nerrors']++;
else
rv['successes'].push(args);
} else {
rv['successes'].push(entry[res_key]);
}

@@ -610,7 +773,9 @@ if ((opts.stop_when === 'error' && err) ||

(opts.res_type === 'array' &&
args.length <= 1)) {
args.unshift(err);
callback.apply(null, args);
nfunc_args.length <= 1)) {
nfunc_args.unshift(err);
callback.apply(null, nfunc_args);
} else if (opts.res_type === 'array') {
callback(err, args);
callback(err, nfunc_args);
} else if (opts.res_type === 'rv') {
callback(err, rv);
}

@@ -622,3 +787,3 @@ }

current++;
args.push(next.bind(null, current));
nfunc_args.push(next.bind(null, current));
setImmediate(function () {

@@ -633,3 +798,3 @@ var nfunc = nextentry['func'];

* pushed `next.bind(null, current)` to the
* `args` array), before we call
* `nfunc_args` array), before we call
* `setImmediate()`. However, this is not the

@@ -642,10 +807,10 @@ * case, because the interface exposed by

* itself _can_ be called with one or more
* results, which we collect into `args` using
* the aformentioned `opts.args.impl` branch
* above, and which we pass to the callback via
* the `opts.res_type` branch above (where
* res_type is set to 'array').
* results, which we collect into `nfunc_args`
* using the aformentioned `opts.args.impl`
* branch above, and which we pass to the
* callback via the `opts.res_type` branch
* above (where res_type is set to 'array').
*/
if (opts.args.impl !== 'tryEach') {
nfunc.apply(null, args);
nfunc.apply(null, nfunc_args);
} else {

@@ -652,0 +817,0 @@ nfunc(next.bind(null, current));

4

package.json
{
"name": "vasync",
"version": "2.0.0",
"version": "2.1.0",
"description": "utilities for observable asynchronous control flow",

@@ -18,3 +18,3 @@ "main": "./lib/vasync.js",

"dependencies": {
"verror": "1.6.0"
"verror": "1.10.0"
},

@@ -21,0 +21,0 @@ "engines": [

@@ -48,2 +48,4 @@ # vasync: observable asynchronous control flow

invoke the same function on N inputs in series (and stop on failure)
* [filter/filterSeries/filterLimit](#filterfilterlimitfilterseries-filter-n-inputs-serially-or-concurrently):
filter N inputs serially or concurrently
* [waterfall](#waterfall-invoke-n-functions-in-series-stop-on-failure-and-propagate-results):

@@ -387,2 +389,43 @@ like pipeline, but propagating results between stages

### filter/filterLimit/filterSeries: filter N inputs serially or concurrently
Synopsis: `filter(inputs, filterFunc, callback)`
Synopsis: `filterSeries(inputs, filterFunc, callback)`
Synopsis: `filterLimit(inputs, limit, filterFunc, callback)`
These functions take an array (of anything) and a function to call on each
element of the array. The function must callback with a true or false value as
the second argument or an error object as the first argument. False values
will result in the element being filtered out of the results array. An error
object passed as the first argument will cause the filter function to stop
processing new elements and callback to the caller with the error immediately.
Original input array order is maintained.
`filter` and `filterSeries` are analogous to calling `filterLimit` with
a limit of `Infinity` and `1` respectively.
```js
var inputs = [
'joyent.com',
'github.com',
'asdfaqsdfj.com'
];
function filterFunc(input, cb) {
mod_dns.resolve(input, function (err, results) {
if (err) {
cb(null, false);
} else {
cb(null, true);
}
}
}
mod_vasync.filter(inputs, filterFunc, function (err, results) {
// err => undefined
// results => ['joyent.com', 'github.com']
});
```
### barrier: coordinate multiple concurrent operations

@@ -389,0 +432,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc