node-stream
Advanced tools
Comparing version 1.3.1 to 1.4.0
@@ -25,2 +25,6 @@ var selectVersion = require('./consumers/selectVersion.js'); | ||
var pluck = require('./modifiers/pluck.js'); | ||
var find = require('./modifiers/find.js'); | ||
var findWhere = require('./modifiers/findWhere.js'); | ||
var drop = require('./modifiers/drop.js'); | ||
var take = require('./modifiers/take.js'); | ||
@@ -46,2 +50,6 @@ // Consumers | ||
module.exports.pluck = pluck; | ||
module.exports.find = find; | ||
module.exports.findWhere = findWhere; | ||
module.exports.drop = drop; | ||
module.exports.take = take; | ||
@@ -48,0 +56,0 @@ /** |
var through = require('through2'); | ||
var makeAsync = require('../_utils/makeAsync.js'); | ||
/** | ||
* Creates a new stream with all elements that pass the test implemented by the | ||
* provided function. | ||
* provided function. Similar to Array.filter... but on a stream. | ||
* | ||
* Applies the function `condition` to each item in the stream. The `condition` | ||
* is called with two arguments: | ||
* - The stream `item` and | ||
* - A `callback`. | ||
* If the `condition` passes an error to its `callback`, the stream emits an "error" event. | ||
* | ||
* @static | ||
@@ -17,38 +13,62 @@ * @since 1.0.0 | ||
* | ||
* @param {Function} condition - Function that filters elements on the stream. The | ||
* function is passed a `callback(err, keep)` which | ||
* must be called once it's completed. | ||
* @returns {Stream} - Transform stream. | ||
* @param {Function} condition - Function that filters elements on the stream. | ||
* Takes one argument, the value of the item at | ||
* this position in the stream. | ||
* @returns {Stream} - A transform stream with the filtered values. | ||
* | ||
* @example | ||
* | ||
* // remove random chunks of data because data loss is fun | ||
* fs.createReadStream('example.txt') | ||
* .pipe(nodeStream.filter((value, next) => { | ||
* const rnd = Math.random() > 0.6; | ||
* next(null, rnd); | ||
* })) | ||
* // => please don't ever do this | ||
* // If you wanted to create a new stream whose values all passed a certain criteria, | ||
* // you could do something like the following. Assuming "test-scores.txt" is a file | ||
* // containing the following data: | ||
* // Sally...90 | ||
* // Tommy...94 | ||
* // Jimmy...12 | ||
* // Sarah...82 | ||
* // Jonny...64 | ||
* | ||
* // We can write a function that returns the students who are failing: | ||
* fs.createReadStream('test-scores.txt') | ||
* .pipe(nodeStream.split()) // split on new lines | ||
* .pipe(nodeStream.filter(value => { | ||
* const [student, testScore] = value.toString().split('...'); | ||
* | ||
* // return a filtered object stream | ||
* objStream | ||
* return Number(testScore) < 70; | ||
* })); | ||
* | ||
* // The resulting stream would have the following data: | ||
* // Jimmy...12 | ||
* // Jonny...64 | ||
* | ||
* | ||
* // It is also possible to filter a stream asynchronously for more complex actions. | ||
* // Note: The signature of the function that you pass as the callback is important. It | ||
* // MUST have *two* parameters. | ||
* | ||
* // Assuming "filenames.txt" is a newline-separated list of file names, you could | ||
* // create a new stream with only valid names by doing something like the following: | ||
* fs.createReadStream('filenames.txt') | ||
* .pipe(nodeStream.split()) // split on new lines | ||
* .pipe(nodeStream.filter((value, next) => { | ||
* fs.stat(value, (err, stats) => { | ||
* | ||
* if (value.author === 'stezu') { | ||
* return next(null, true); | ||
* } | ||
* // Error the stream since this file is not valid | ||
* if (err) { | ||
* return next(err); | ||
* } | ||
* | ||
* if (typeof value.author === 'undefined') { | ||
* return next(new Error('unknown data')); // emit an error on the stream | ||
* } | ||
* next(null, stats.isFile()); | ||
* }); | ||
* })); | ||
* | ||
* return next(null, false); | ||
* })); | ||
* // The resulting stream will contain the filenames that passed the test. Note: If `next` | ||
* // is called with an error as the first argument, the stream will error. This is typical | ||
* // behavior for node callbacks. | ||
*/ | ||
function filter(condition) { | ||
var cb = makeAsync(condition, 2); | ||
return through.obj(function (chunk, enc, next) { | ||
condition(chunk, function (err, keep) { | ||
cb(chunk, function (err, keep) { | ||
@@ -55,0 +75,0 @@ if (err) { |
var through = require('through2'); | ||
var makeAsync = require('../_utils/makeAsync.js'); | ||
/** | ||
* Creates a new stream with the results of calling the provided function on | ||
* every element in the stream. | ||
* every item in the stream. Similar to Array.map... but on a stream. | ||
* | ||
* Applies the function `transform` to each item in the stream. The `transform` | ||
* is called with two arguments: | ||
* - The stream `item` and | ||
* - A `callback`. | ||
* If the `transform` passes an error to its `callback`, the stream emits an "error" event. | ||
* | ||
* @static | ||
@@ -17,37 +13,39 @@ * @since 1.0.0 | ||
* | ||
* @param {Function} transform - Function that produces a new element on the stream. | ||
* The function is passed a `callback(err, value)` which | ||
* must be called once it's completed. | ||
* @returns {Stream} - Transform stream. | ||
* @param {Function} transform - Function that returns a new element on the | ||
* stream. Takes one argument, the value of the | ||
* item at this position in the stream. | ||
* @returns {Stream} - A transform stream with the modified values. | ||
* | ||
* @example | ||
* | ||
* // replace every period with a comma to create a run-on sentence | ||
* fs.createReadStream('example.txt') // the text has periods. because, english. | ||
* .pipe(nodeStream.map((value, next) => { | ||
* const str = value.toString() | ||
* next(null, str.replace('.', ',')); | ||
* })) | ||
* // => the text has periods, because, english, | ||
* // For a simple find/replace, you could do something like the following. Assuming | ||
* // "example.txt" is a file with the text "the text has periods. because, english.", | ||
* // you could replace each period with a comma like so: | ||
* fs.createReadStream('example.txt') | ||
* .pipe(nodeStream.map(value => value.toString().replace('.', ','))); | ||
* | ||
* // The resulting stream will have the value "the text has periods, because, english,". | ||
* | ||
* // parse a newline-separated JSON file | ||
* fs.createReadStream('example.log') | ||
* .pipe(nodeStream.split()) | ||
* .pipe(nodeStream.map((value, next) => { | ||
* let parsed; | ||
* | ||
* try { | ||
* parsed = JSON.parse(value); | ||
* } catch(e) { | ||
* return next(e); // failed to parse, emit an error on the stream | ||
* } | ||
* // It is also possible to transform a stream asynchronously for more complex actions. | ||
* // Note: The signature of the function that you pass as the callback is important. It | ||
* // MUST have *two* parameters. | ||
* | ||
* next(null, parsed); // emit a parsed object on the stream | ||
* // Assuming "filenames.txt" is a newline-separated list of file names, you could | ||
* // create a new stream with their contents by doing something like the following: | ||
* fs.createReadStream('filenames.txt') | ||
* .pipe(nodeStream.split()) // split on new lines | ||
* .pipe(nodeStream.map((value, next) => { | ||
* fs.readFile(value, next); | ||
* })); | ||
* | ||
* // The resulting stream will contain the text of each file. Note: If `next` is called | ||
* // with an error as the first argument, the stream will error. This is typical behavior | ||
* // for node callbacks. | ||
*/ | ||
function map(transform) { | ||
var cb = makeAsync(transform, 2); | ||
return through.obj(function (value, enc, next) { | ||
transform(value, next); | ||
cb(value, next); | ||
}); | ||
@@ -54,0 +52,0 @@ } |
var through = require('through2'); | ||
var makeAsync = require('../_utils/makeAsync.js'); | ||
/** | ||
* Creates a new stream with a single value that's produced by calling a reducer | ||
* with each element of the original array. | ||
* Creates a new stream with a single item that's produced by calling a reducer with | ||
* each item of the original stream. Similar to Array.reduce... but on a stream. | ||
* | ||
* Applies the function `reducer` to each item in the stream. The `reducer` | ||
* is called with three arguments: | ||
* - The value previously returned in the last invocation of the callback, or | ||
* initialValue, if supplied. | ||
* - The stream `item` and | ||
* - A `callback`. | ||
* If the `reducer` passes an error to its `callback`, the stream emits an "error" event. | ||
* | ||
* @static | ||
@@ -19,38 +13,59 @@ * @since 1.0.0 | ||
* | ||
* @param {Function} reducer - Function that reduces elements on the stream. | ||
* The function is passed a `callback(err, result)` | ||
* which must be called once it's completed. | ||
* @param {Function} reducer - Function that reduces items in the stream. Takes | ||
* two arguments: the current value of the reduction, | ||
* and the value of the item at this position in the | ||
* stream. | ||
* @param {*} [initialValue] - Value to use as the first argument to the first | ||
* call of the `reducer`. | ||
* @returns {Stream} - Transform stream. | ||
* @returns {Stream} - A transform stream that results from the reduction. | ||
* | ||
* @example | ||
* | ||
* // determine the content length of the given stream | ||
* // If you wanted to determine the content-length of a stream, you could do something like | ||
* // the following. Assuming "example.txt" is a large file, you could determine it's length | ||
* // by doing the following: | ||
* fs.createReadStream('example.txt') | ||
* .pipe(nodeStream.reduce((size, value, next) => { | ||
* next(null, size + value.length); | ||
* }, 0)) | ||
* .pipe(nodeStream.reduce((length, value) => length + value.length), 0); | ||
* | ||
* // The resulting stream will have an integer value representing the length of "example.txt". | ||
* | ||
* // find the most popular authors in an object stream | ||
* objStream | ||
* .pipe(nodeStream.reduce((authors, value, next) => { | ||
* | ||
* if (typeof value.author === 'undefined') { | ||
* return next(new Error('unknown data')); // emit an error on the stream | ||
* } | ||
* // It is also possible to reduce a stream asynchronously for more complex actions. | ||
* // Note: The signature of the function that you pass as the callback is important. It | ||
* // MUST have *three* parameters. | ||
* | ||
* if (!authors[value.author]) { | ||
* authors[value.author] = 0; | ||
* } | ||
* // Assuming "twitterers.txt" is a newline-separated list of your favorite tweeters, you | ||
* // could identify which is the most recently active by using the Twitter API: | ||
* fs.createReadStream('twitterers.txt') | ||
* .pipe(nodeStream.split()) // split on new lines | ||
* .pipe(nodeStream.reduce((memo, user, next) => { | ||
* twit.get('search/tweets', { q: `from:${user}`, count: 1 }, (err, data) => { | ||
* | ||
* authors[value.author] += 1; | ||
* // Error the stream since this request failed | ||
* if (err) { | ||
* return next(err); | ||
* } | ||
* | ||
* return next(null, authors); | ||
* }, {})); | ||
* // => { 'paul': 4, 'lisa': 12, 'mary': 1 } | ||
* // This is the first iteration of the reduction, so we automatically save the tweet | ||
* if (!memo) { | ||
* return next(null, data); | ||
* } | ||
* | ||
* // This tweet is the most recent so far, save it for later | ||
* if (new Date(data.statuses.created_at) > new Date(memo.statuses.created_at)) { | ||
* return next(null, data); | ||
* } | ||
* | ||
* // The tweet we have saved is still the most recent | ||
* next(null, memo); | ||
* }); | ||
* })); | ||
* | ||
* // The resulting stream will contain the most recent tweet of the users in the list. | ||
* // Note: If `next` is called with an error as the first argument, the stream will error. | ||
* // This is typical behavior for node callbacks. | ||
*/ | ||
function reduce(reducer, initialValue) { | ||
var accumulator = initialValue; | ||
var cb = makeAsync(reducer, 3); | ||
@@ -60,3 +75,3 @@ return through.obj( | ||
reducer(accumulator, chunk, function (err, result) { | ||
cb(accumulator, chunk, function (err, result) { | ||
accumulator = result; | ||
@@ -63,0 +78,0 @@ next(err); |
{ | ||
"name": "node-stream", | ||
"version": "1.3.1", | ||
"version": "1.4.0", | ||
"description": "Utilities for consuming, creating and manipulating node streams.", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
42749
27
1272