event-stream

construct pipes of streams of events


Package description

What is event-stream?

The event-stream npm package is a toolkit for working with Node.js streams. It provides a collection of tools to create, manipulate, and manage streams in a functional programming style. It is useful for handling data in a streaming fashion, which can be more efficient than loading all data into memory at once.

What are event-stream's main functionalities?

Map

Applies a function to each data event in a stream and pushes the result downstream.

es.map(function (data, callback) {
  callback(null, data.toString().toUpperCase());
})

ReadArray

Creates a readable stream from an array of items.

es.readArray(['one', 'two', 'three'])

WriteArray

Collects all data from a stream and passes it as an array to a callback function.

es.writeArray(function (err, array) {
  console.log(array);
})

Duplex

Combines a writable and readable stream into a duplex (readable and writable) stream.

es.duplex(writeStream, readStream)

Pipeline

Pipes streams together and destroys all of them if one of them closes.

es.pipeline(stream1, stream2, stream3)
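
Taken together, these pieces compose with ordinary `pipe` calls. A minimal sketch (assuming `event-stream` is installed and required as `es`) that reads from an array, upper-cases each item, and collects the results:

var es = require('event-stream')

// read three words, upper-case each one asynchronously,
// then collect the results into an array
es.readArray(['one', 'two', 'three'])
  .pipe(es.map(function (data, callback) {
    callback(null, data.toUpperCase())
  }))
  .pipe(es.writeArray(function (err, array) {
    console.log(array) // [ 'ONE', 'TWO', 'THREE' ]
  }))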


# EventStream

![build status](https://secure.travis-ci.org/dominictarr/event-stream.png?branch=master)

[browser status](http://ci.testling.com/dominictarr/event-stream)

Streams are node's best and most misunderstood idea, and EventStream is a toolkit to make creating and working with streams easy.

Normally, streams are only used for IO,
but in event-stream we send all kinds of objects down the pipe.
If your application's input and output are streams,
shouldn't the throughput be a stream too?

The EventStream functions resemble the array functions,
because Streams are like Arrays, but laid out in time, rather than in memory.

All the event-stream functions return instances of Stream.

event-stream creates 0.8 streams, which are compatible with 0.10 streams.

NOTE: I shall use the term "through stream" to refer to a stream that is writable and readable.

### simple example:


//pretty.js

if(!module.parent) {
  var es = require('event-stream')
  var inspect = require('util').inspect
  es.pipeline(                        //connect streams together with `pipe`
    process.openStdin(),              //open stdin
    es.split(),                       //split stream to break on newlines
    es.map(function (data, callback) {//turn this async function into a stream
      callback(null
        , inspect(JSON.parse(data)))  //render it nicely
    }),
    process.stdout                    // pipe it to stdout !
  )
}

run it ...

curl -sS registry.npmjs.org/event-stream | node pretty.js

[node Stream documentation](http://nodejs.org/api/stream.html)

## through (write?, end?)

Re-emits data synchronously. An easy way to create synchronous through streams. Pass in optional write and end methods. They will be called in the context of the stream. Use this.pause() and this.resume() to manage flow. Check this.paused to see the current flow state. (write always returns !this.paused.)

This function is the basis for most of the synchronous streams in event-stream.


es.through(function write(data) {
    this.emit('data', data)
    //this.pause() 
  },
  function end () { //optional
    this.emit('end')
  })
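
For example, a through stream built this way can sit in the middle of any pipe. A minimal sketch (an identity stream between stdin and stdout):

var es = require('event-stream')

process.stdin
  .pipe(es.through(function write (data) {
    this.emit('data', data) // pass each chunk straight through
  }))
  .pipe(process.stdout)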

## map (asyncFunction)

Create a through stream from an asynchronous function.

var es = require('event-stream')

es.map(function (data, callback) {
  //transform data
  // ...
  callback(null, data)
})

Each map MUST call the callback. It may callback with data, with an error, or with no arguments:

  • callback() drops this data.
    This makes map work like a filter;
    note: callback(null, null) is not the same, and will emit null.

  • callback(null, newData) turns data into newData.

  • callback(error) emits an error for this item.

Note: if a callback is not called, map will think that it is still being processed;
every call must be answered or the stream will not know when to end.

Also, if the callback is called more than once, every call but the first will be ignored.
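
For example, calling back with no arguments makes map behave like a filter. A minimal sketch:

var es = require('event-stream')

// keep only the even numbers
es.readArray([1, 2, 3, 4])
  .pipe(es.map(function (n, callback) {
    if (n % 2 === 0) callback(null, n) // pass the item downstream
    else callback()                    // drop the item entirely
  }))
  .pipe(es.writeArray(function (err, evens) {
    console.log(evens) // [ 2, 4 ]
  }))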

## mapSync (syncFunction)

Same as map, but the callback is called synchronously. Based on es.through.
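
A minimal sketch (in practice the synchronous function returns the mapped value directly, rather than taking a callback):

var es = require('event-stream')

// synchronous transform: just return the new value
var upper = es.mapSync(function (data) {
  return data.toString().toUpperCase()
})

process.stdin.pipe(upper).pipe(process.stdout)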

## split (matcher)

Break up a stream and reassemble it so that each line is a chunk. matcher may be a String, or a RegExp

Example, read every line in a file ...

  es.pipeline(
    fs.createReadStream(file, {flags: 'r'}),
    es.split(),
    es.map(function (line, cb) {
       //do something with the line 
       cb(null, line)
    })
  )

split takes the same arguments as String#split, except it defaults to '\n' instead of ',', and the optional limit parameter is ignored.
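
For example, any separator can be passed as the matcher. A sketch splitting on commas:

var es = require('event-stream')

es.readArray(['a,b,c'])
  .pipe(es.split(','))
  .pipe(es.writeArray(function (err, fields) {
    console.log(fields) // [ 'a', 'b', 'c' ]
  }))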

## join (separator)

Create a through stream that emits separator between each chunk, just like Array#join.

(for legacy reasons, if you pass a callback instead of a string, join is a synonym for es.wait)
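
A minimal sketch, using es.wait (documented below) to collect the joined output:

var es = require('event-stream')

es.readArray(['a', 'b', 'c'])
  .pipe(es.join('\n'))
  .pipe(es.wait(function (err, text) {
    console.log(text) // 'a\nb\nc'
  }))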

## replace (from, to)

Replace all occurrences of from with to. from may be a String or a RegExp.
Works just like string.split(from).join(to), but streaming.
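
A minimal sketch:

var es = require('event-stream')

es.readArray(['hello world'])
  .pipe(es.replace('world', 'there'))
  .pipe(es.wait(function (err, text) {
    console.log(text) // 'hello there'
  }))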

## parse

Convenience function for parsing JSON chunks. For newline-separated JSON, use with es.split.

fs.createReadStream(filename)
  .pipe(es.split()) //defaults to lines.
  .pipe(es.parse())

## stringify

Convert JavaScript objects into lines of text. The text will have whitespace escaped and a \n appended, so it will be compatible with es.parse.

objectStream
  .pipe(es.stringify())
  .pipe(fs.createWriteStream(filename))

## readable (asyncFunction)

Create a readable stream (that respects pause) from an async function.
While the stream is not paused,
the function will be polled with (count, callback),
and this will be the readable stream.


es.readable(function (count, callback) {
  if(streamHasEnded)
    return this.emit('end')
  
  //...
  
  this.emit('data', data) //use this way to emit multiple chunks per call.
      
  callback() // you MUST always call the callback eventually.
             // the function will not be called again until you do this.
})

You can also pass the data and the error to the callback.
You may only call the callback once;
calling the same callback more than once will have no effect.
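
For example, a readable stream that emits three numbers and then ends, a minimal sketch:

var es = require('event-stream')

es.readable(function (count, callback) {
  if (count > 2) return this.emit('end')
  callback(null, count) // data may also be passed through the callback
})
  .pipe(es.stringify())
  .pipe(process.stdout) // prints 0, 1 and 2, one per line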

## readArray (array)

Create a readable stream from an Array.

Just emit each item as a data event, respecting pause and resume.

  var es = require('event-stream')
    , reader = es.readArray([1,2,3])

  reader.pipe(...)

## writeArray (callback)

Create a writable stream from a callback.
All data events are stored in an array, which is passed to the callback when the stream ends.

  var es = require('event-stream')
    , reader = es.readArray([1, 2, 3])
    , writer = es.writeArray(function (err, array){
      //array deepEqual [1, 2, 3]
    })

  reader.pipe(writer)

## pipeline (stream1,...,streamN)

Turn a pipeline into a single stream. pipeline returns a stream that writes to the first stream and reads from the last stream.

Listening for 'error' will receive errors from all streams inside the pipe.

connect is an alias for pipeline.


  var es = require('event-stream')
  var inspect = require('util').inspect

  es.pipeline(                        //connect streams together with `pipe`
    process.openStdin(),              //open stdin
    es.split(),                       //split stream to break on newlines
    es.map(function (data, callback) {//turn this async function into a stream
      callback(null
        , inspect(JSON.parse(data)))  //render it nicely
    }),
    process.stdout                    // pipe it to stdout !
  )

## pause ()

A stream that buffers all chunks when paused.

  var ps = es.pause()
  ps.pause() //buffer the stream, also do not allow 'end' 
  ps.resume() //allow chunks through
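
A sketch of the buffering in action (nothing prints until resume is called):

var es = require('event-stream')

var ps = es.pause()
ps.pause() // buffer everything written while paused

es.readArray(['1\n', '2\n', '3\n']).pipe(ps).pipe(process.stdout)

setTimeout(function () {
  ps.resume() // flush the buffered chunks a second later
}, 1000)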

## duplex (writeStream, readStream)

Takes a writable stream and a readable stream and makes them appear as a readable writable stream.

It is assumed that the two streams are connected to each other in some way.

(This is used by pipeline and child.)

  var cp = require('child_process')
  var grep = cp.exec('grep Stream')

  es.duplex(grep.stdin, grep.stdout)

## child (child_process)

Create a through stream from a child process ...

  var cp = require('child_process')

  es.child(cp.exec('grep Stream')) // a through stream

## wait (callback)

Waits for the stream to emit 'end'. Joins chunks of a stream into a single string. Takes an optional callback, which will be passed the complete string when it receives the 'end' event.

Also emits a single 'data' event.


readStream.pipe(es.wait(function (err, text) {
  // have complete text here.
}))

## Other Stream Modules

These modules are not included as a part of EventStream but may be useful when working with streams.

### reduce (syncFunction, initial)

Like Array.prototype.reduce but for streams. Given a sync reduce function and an initial value it will return a through stream that emits a single data event with the reduced value once the input stream ends.

var reduce = require("stream-reduce");
process.stdin.pipe(reduce(function(acc, data) {
  return acc + data.length;
}, 0)).on("data", function(length) {
  console.log("stdin size:", length);
});
