Huge News! Announcing our $40M Series B led by Abstract Ventures. Learn More
Socket
Sign in · Demo · Install
Socket

node-stream

Package Overview
Dependencies
Maintainers
1
Versions
16
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-stream - npm Package Compare versions

Comparing version 1.3.1 to 1.4.0

lib/_utils/makeAsync.js

8

lib/index.js

@@ -25,2 +25,6 @@ var selectVersion = require('./consumers/selectVersion.js');

var pluck = require('./modifiers/pluck.js');
var find = require('./modifiers/find.js');
var findWhere = require('./modifiers/findWhere.js');
var drop = require('./modifiers/drop.js');
var take = require('./modifiers/take.js');

@@ -46,2 +50,6 @@ // Consumers

module.exports.pluck = pluck;
module.exports.find = find;
module.exports.findWhere = findWhere;
module.exports.drop = drop;
module.exports.take = take;

@@ -48,0 +56,0 @@ /**

78

lib/modifiers/filter.js
var through = require('through2');
var makeAsync = require('../_utils/makeAsync.js');
/**
* Creates a new stream with all elements that pass the test implemented by the
* provided function.
* provided function. Similar to Array.filter... but on a stream.
*
* Applies the function `condition` to each item in the stream. The `condition`
* is called with two arguments:
* - The stream `item` and
* - A `callback`.
* If the `condition` passes an error to its `callback`, the stream emits an "error" event.
*
* @static

@@ -17,38 +13,62 @@ * @since 1.0.0

*
* @param {Function} condition - Function that filters elements on the stream. The
* function is passed a `callback(err, keep)` which
* must be called once it's completed.
* @returns {Stream} - Transform stream.
* @param {Function} condition - Function that filters elements on the stream.
* Takes one argument, the value of the item at
* this position in the stream.
* @returns {Stream} - A transform stream with the filtered values.
*
* @example
*
* // remove random chunks of data because data loss is fun
* fs.createReadStream('example.txt')
* .pipe(nodeStream.filter((value, next) => {
* const rnd = Math.random() > 0.6;
* next(null, rnd);
* }))
* // => please don't ever do this
* // If you wanted to create a new stream whose values all passed a certain criteria,
* // you could do something like the following. Assuming "test-scores.txt" is a file
* // containing the following data:
* // Sally...90
* // Tommy...94
* // Jimmy...12
* // Sarah...82
* // Jonny...64
*
* // We can write a function that returns the students who are failing:
* fs.createReadStream('test-scores.txt')
* .pipe(nodeStream.split()) // split on new lines
* .pipe(nodeStream.filter(value => {
* const [student, testScore] = value.toString().split('...');
*
* // return a filtered object stream
* objStream
* return Number(testScore) < 70;
* }));
*
* // The resulting stream would have the following data:
* // Jimmy...12
* // Jonny...64
*
*
* // It is also possible to filter a stream asynchronously for more complex actions.
* // Note: The signature of the function that you pass as the callback is important. It
* // MUST have *two* parameters.
*
* // Assuming "filenames.txt" is a newline-separated list of file names, you could
* // create a new stream with only valid names by doing something like the following:
* fs.createReadStream('filenames.txt')
* .pipe(nodeStream.split()) // split on new lines
* .pipe(nodeStream.filter((value, next) => {
* fs.stat(value, (err, stats) => {
*
* if (value.author === 'stezu') {
* return next(null, true);
* }
* // Error the stream since this file is not valid
* if (err) {
* return next(err);
* }
*
* if (typeof value.author === 'undefined') {
* return next(new Error('unknown data')); // emit an error on the stream
* }
* next(null, stats.isFile());
* });
* }));
*
* return next(null, false);
* }));
* // The resulting stream will contain the filenames that passed the test. Note: If `next`
* // is called with an error as the first argument, the stream will error. This is typical
* // behavior for node callbacks.
*/
function filter(condition) {
var cb = makeAsync(condition, 2);
return through.obj(function (chunk, enc, next) {
condition(chunk, function (err, keep) {
cb(chunk, function (err, keep) {

@@ -55,0 +75,0 @@ if (err) {

var through = require('through2');
var makeAsync = require('../_utils/makeAsync.js');
/**
* Creates a new stream with the results of calling the provided function on
* every element in the stream.
* every item in the stream. Similar to Array.map... but on a stream.
*
* Applies the function `transform` to each item in the stream. The `transform`
* is called with two arguments:
* - The stream `item` and
* - A `callback`.
* If the `transform` passes an error to its `callback`, the stream emits an "error" event.
*
* @static

@@ -17,37 +13,39 @@ * @since 1.0.0

*
* @param {Function} transform - Function that produces a new element on the stream.
* The function is passed a `callback(err, value)` which
* must be called once it's completed.
* @returns {Stream} - Transform stream.
* @param {Function} transform - Function that returns a new element on the
* stream. Takes one argument, the value of the
* item at this position in the stream.
* @returns {Stream} - A transform stream with the modified values.
*
* @example
*
* // replace every period with a comma to create a run-on sentence
* fs.createReadStream('example.txt') // the text has periods. because, english.
* .pipe(nodeStream.map((value, next) => {
* const str = value.toString()
* next(null, str.replace('.', ','));
* }))
* // => the text has periods, because, english,
* // For a simple find/replace, you could do something like the following. Assuming
* // "example.txt" is a file with the text "the text has periods. because, english.",
* // you could replace each period with a comma like so:
* fs.createReadStream('example.txt')
* .pipe(nodeStream.map(value => value.toString().replace('.', ',')));
*
* // The resulting stream will have the value "the text has periods, because, english,".
*
* // parse a newline-separated JSON file
* fs.createReadStream('example.log')
* .pipe(nodeStream.split())
* .pipe(nodeStream.map((value, next) => {
* let parsed;
*
* try {
* parsed = JSON.parse(value);
* } catch(e) {
* return next(e); // failed to parse, emit an error on the stream
* }
* // It is also possible to transform a stream asynchronously for more complex actions.
* // Note: The signature of the function that you pass as the callback is important. It
* // MUST have *two* parameters.
*
* next(null, parsed); // emit a parsed object on the stream
* // Assuming "filenames.txt" is a newline-separated list of file names, you could
* // create a new stream with their contents by doing something like the following:
* fs.createReadStream('filenames.txt')
* .pipe(nodeStream.split()) // split on new lines
* .pipe(nodeStream.map((value, next) => {
* fs.readFile(value, next);
* }));
*
* // The resulting stream will contain the text of each file. Note: If `next` is called
* // with an error as the first argument, the stream will error. This is typical behavior
* // for node callbacks.
*/
function map(transform) {
var cb = makeAsync(transform, 2);
return through.obj(function (value, enc, next) {
transform(value, next);
cb(value, next);
});

@@ -54,0 +52,0 @@ }

var through = require('through2');
var makeAsync = require('../_utils/makeAsync.js');
/**
* Creates a new stream with a single value that's produced by calling a reducer
* with each element of the original array.
* Creates a new stream with a single item that's produced by calling a reducer with
* each item of the original stream. Similar to Array.reduce... but on a stream.
*
* Applies the function `reducer` to each item in the stream. The `reducer`
* is called with three arguments:
* - The value previously returned in the last invocation of the callback, or
* initialValue, if supplied.
* - The stream `item` and
* - A `callback`.
* If the `reducer` passes an error to its `callback`, the stream emits an "error" event.
*
* @static

@@ -19,38 +13,59 @@ * @since 1.0.0

*
* @param {Function} reducer - Function that reduces elements on the stream.
* The function is passed a `callback(err, result)`
* which must be called once it's completed.
* @param {Function} reducer - Function that reduces items in the stream. Takes
* two arguments: the current value of the reduction,
* and the value of the item at this position in the
* stream.
* @param {*} [initialValue] - Value to use as the first argument to the first
* call of the `reducer`.
* @returns {Stream} - Transform stream.
* @returns {Stream} - A transform stream that results from the reduction.
*
* @example
*
* // determine the content length of the given stream
* // If you wanted to determine the content-length of a stream, you could do something like
 * // the following. Assuming "example.txt" is a large file, you could determine its length
* // by doing the following:
* fs.createReadStream('example.txt')
* .pipe(nodeStream.reduce((size, value, next) => {
* next(null, size + value.length);
* }, 0))
 *   .pipe(nodeStream.reduce((length, value) => length + value.length, 0));
*
* // The resulting stream will have an integer value representing the length of "example.txt".
*
* // find the most popular authors in an object stream
* objStream
* .pipe(nodeStream.reduce((authors, value, next) => {
*
* if (typeof value.author === 'undefined') {
* return next(new Error('unknown data')); // emit an error on the stream
* }
* // It is also possible to reduce a stream asynchronously for more complex actions.
* // Note: The signature of the function that you pass as the callback is important. It
* // MUST have *three* parameters.
*
* if (!authors[value.author]) {
* authors[value.author] = 0;
* }
* // Assuming "twitterers.txt" is a newline-separated list of your favorite tweeters, you
* // could identify which is the most recently active by using the Twitter API:
* fs.createReadStream('twitterers.txt')
* .pipe(nodeStream.split()) // split on new lines
* .pipe(nodeStream.reduce((memo, user, next) => {
* twit.get('search/tweets', { q: `from:${user}`, count: 1 }, (err, data) => {
*
* authors[value.author] += 1;
* // Error the stream since this request failed
* if (err) {
* return next(err);
* }
*
* return next(null, authors);
* }, {}));
* // => { 'paul': 4, 'lisa': 12, 'mary': 1 }
* // This is the first iteration of the reduction, so we automatically save the tweet
* if (!memo) {
* return next(null, data);
* }
*
* // This tweet is the most recent so far, save it for later
* if (new Date(data.statuses.created_at) > new Date(memo.statuses.created_at)) {
* return next(null, data);
* }
*
* // The tweet we have saved is still the most recent
* next(null, memo);
* });
* }));
*
* // The resulting stream will contain the most recent tweet of the users in the list.
* // Note: If `next` is called with an error as the first argument, the stream will error.
* // This is typical behavior for node callbacks.
*/
function reduce(reducer, initialValue) {
var accumulator = initialValue;
var cb = makeAsync(reducer, 3);

@@ -60,3 +75,3 @@ return through.obj(

reducer(accumulator, chunk, function (err, result) {
cb(accumulator, chunk, function (err, result) {
accumulator = result;

@@ -63,0 +78,0 @@ next(err);

{
"name": "node-stream",
"version": "1.3.1",
"version": "1.4.0",
"description": "Utilities for consuming, creating and manipulating node streams.",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

Socket — SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc