New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

object-streaming-tools

Package Overview
Dependencies
Maintainers
1
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

object-streaming-tools

Helper functions to simplify creating and concatenating object streams in NodeJs

  • 1.4.0
  • latest
  • Source
  • npm
  • Socket score

Version published
Maintainers
1
Created
Source

Object Streaming Tools

Helper functions to simplify creating and concatenating object streams in NodeJS

Motivation

Writing NodeJs streams can be quite challenging. While I myself have create snippets in my IDE to make my life easier when creating them, frequently I found myself writing the same code over and over again. Since I am lazy, I don't really like working this way. And frankly, the German in me simply wanted - nay demanded! - more efficient, DRYer code.

Looking for a solution early 2016, I first explored RxJs3 . While I very much appreciated the beauty of that project's approach, it seemed overkill for what I needed. And when I noticed the significant differences between version 3 and the then up-and-coming version 4, I decided to take another path. I also looked at Highland.js, which is very similar in its approach to our goals here, but was not quit there yet, when we started this.

For the enterprise level application I was designing for and working on with a team at Copperleaf Technologies, I cooked up my first few helper tools that we then continued to develop as a team throughout the year.

At version 1.0, this library was at the state we shipped it with that application in May 2017. Copperleaf Technologies has graciously allowed me to take ownership of the project, so here it is.

I hope some of you will find it useful.

Basics


start with anything

const just = require( 'object-streaming-tools/lib/just' );

just( 'Hello World!' )
  .on( 'data', console.log );

// output:
// Hello World!

iterate

const fromArray = require( 'object-streaming-tools/lib/fromArray' );

fromArray( [ 1, 2, 3 ] )
  .on( 'data', console.log );

// output:
// 1
// 2
// 3

iterate with the spread (...) operator

const just = require( 'object-streaming-tools/lib/just' );

just( ...[ 1, 2, 3 ] )
  .on( 'data', console.log );

// output:
// 1
// 2
// 3

streamify non-streams

when using async functions
const just  = require( 'object-streaming-tools/lib/just' );
const apply = require( 'object-streaming-tools/lib/apply' );

function log( s, next ){

  console.log( s );
  this.emit( 'bar', 'THIS IS SPARTA!' ); // 'this' context is the apply stream
  setImmediate( next, null, s );

}

just( 'foo' )
  .pipe( apply( log ) )
  .on( 'bar', console.log )
  .resume();

// output:
// foo
// THIS IS SPARTA!
when using synchronous functions

Note: Just demonstrating here a technique to achieve this using asyncify from the async library, which is also used internally.

const range    = require( 'object-streaming-tools/lib/range' );
const apply    = require( 'object-streaming-tools/lib/apply' );
const asyncify = require( 'async/asyncify' );

range( 1, 3 )
  .pipe( apply( asyncify( console.log ) ) )
  .resume();

// output:
// 1
// 2
// 3

filter

const just     = require( 'object-streaming-tools/lib/just' );
const filter   = require( 'object-streaming-tools/lib/filter' );
const asyncify = require( 'async/asyncify' );

just( ...[ 0, 1, 2, 3 ] )
  .pipe( filter( asyncify( ( x )=>x >= 2 ) ) )
  .on( 'data', console.log );

// output:
// 2
// 3

// Get rejected items
just( ...[ 0, 1, 2, 3 ] )
  .pipe( filter( asyncify( ( x )=>x >= 2 ) ) )
  .on( filter.RejectedEventKey, console.log )
  .resume();

// output:
// 0
// 1


get a range of numbers

const range = require( 'object-streaming-tools/lib/range' );

range( 1, 3 )
  .on( 'data', console.log );

// output:
// 1
// 2
// 3

iterate over object properties

const just  = require( 'object-streaming-tools/lib/just' );
const forIn = require( 'object-streaming-tools/lib/forIn' );

just( { foo: 'bar' } )
  .pipe( forIn() )
  .on( 'data', ( { key, value } )=>console.log( key, value )  );

// output:
// foo bar

group items in a list by a key's values

by using the key's identity
const just     = require( 'object-streaming-tools/lib/just' );
const apply    = require( 'object-streaming-tools/lib/apply' );
const asyncify = require( 'async/asyncify' );
const keyBy    = require( 'object-streaming-tools/lib/keyBy' );

just( ...[ { foo: 'bar' }, { foo: 'baz' } ] )
  .pipe( keyBy( 'foo' ) )
  .pipe( apply( asyncify( JSON.stringify ) ) )
  .pipe( apply( asyncify( console.log ) ) )
  .resume();

// output:
// { "bar": { "foo": "bar" }, "baz": { "foo": "baz" } }
by using a function to calculate the key
const just     = require( 'object-streaming-tools/lib/just' );
const keyBy    = require( 'object-streaming-tools/lib/keyBy' );

just( ...[ { foo: 'bar'  }, { foo: 'baz' } ] )
  .pipe( keyBy( ( { foo } )=>foo ) )
  .on( 'data' , ( result )=>console.log( JSON.stringify( result ) ) );

// output:
// { "bar": { "foo": "bar" }, "baz": { "foo": "baz" } }

Note: Both approaches above demonstrate various techniques to achieve the same result. Neither technique is meant to be prescriptive.


emit values of an object

const just  = require( 'object-streaming-tools/lib/just' );
const values = require( 'object-streaming-tools/lib/forIn' );

just( { foo: 'bar' } )
  .pipe( values() )
  .on( 'data', console.log );

// output:
// bar

switch things up

const range    = require( 'object-streaming-tools/lib/range' );
const switchBy = require( 'object-streaming-tools/lib/switchBy' );
const asyncify = require( 'async/asyncify' );

const lookup = [ 'one', 'three', 'five' ];

const forTrue  = { ifMatches: true,  thenDo: asyncify( x=>lookup[ x ] ) };
const forFalse = { ifMatches: false, thenDo: asyncify( x=>x ) };

range( 1, 5 )
  .pipe( switchBy( asyncify( x=>!!x % 2 ), [ forTrue, forFalse ] ) )
  .on( 'data', console.log );

// output:
// one
// 2
// three
// 4
// five

start from a callback

const fs           = require( 'fs-extra' );
const fromCallback = require( 'object-streaming-tools/lib/fromCallback' );

fromCallback( fs.readJson.bind( null, 'list.json') )
  .pipe( flatten() )
  .on( 'data', console.log;

// output, given the file contents of 'list.json' => [ "foo", "bar", "baz" ]:
// foo bar baz

emit arrays of a specified length

const items = [1, 2, 3, 4, 5, 6];

just(...items)
  .pipe( asLengthLimitedArrays( 4 ) )
  .on( 'data', console.log );

// output:
// [ 1, 2, 3, 4 ]
// [ 5, 6 ]

creates a slice of the stream starting from start index and up to, but not including, end index

end defaults to Infinity
const items = ['val1', 'val2', 'val3', 'val4', 'val5', 'val6'];
const start = 2;
just(...items)
  .pipe( emitRange( start ) )
  .on( 'data', console.log );

// output:
// val3
// val4
// val5
// val6
from start to end
const items = ['val1', 'val2', 'val3', 'val4', 'val5', 'val6'];
const start = 2;
const end = 5;
just(...items)
  .pipe( emitRange( start, end ) )
  .on( 'data', console.log );

// output:
// val3
// val4

emit only unique items in a stream

unique
const items = [1, 2, 3, 4, 4, 5];
just(...items)
  .pipe( unique() )
  .on( 'data', console.log );

// output:
// 1
// 2
// 3
// 4
// 5
uniqueBy
via a string iteratee
const items = [{id: 'foo'}, {id: 'bar'}, {id: 'foo'}];
const attributeName = 'id'
just(...items)
  .pipe( uniqueBy( attributeName ) )
  .on( 'data', console.log );

// output:
// {id: 'foo'}
// {id: 'bar'}
via an iteratee function
const items = [2.1, 1.2, 2.3];
just(...items)
  .pipe( uniqueBy( Math.floor ) )
  .on( 'data', console.log );

// output:
// 2.1
// 1.2

TBC

Keywords

FAQs

Package last updated on 31 Mar 2020

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc