EventStream
<img src=https://secure.travis-ci.org/dominictarr/event-stream.png?branch=master>
Streams are node's best and most misunderstood idea, and
EventStream is a toolkit to make creating and working with streams easy.
Normally, streams are only used for IO,
but in event-stream we send all kinds of objects down the pipe.
If your application's input and output are streams,
shouldn't the throughput be a stream too?
The EventStream functions resemble the array functions,
because Streams are like Arrays, but laid out in time, rather than in memory.
All the `event-stream` functions return instances of `Stream`.
`event-stream` creates 0.8 streams, which are compatible with 0.10 streams.
NOTE: I shall use the term "through stream" to refer to a stream that is writable and readable.
###simple example:
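
    // pretty.js
    if(!module.parent) {
      var es = require('event-stream')
      var inspect = require('util').inspect  // pretty-prints the parsed JSON

      es.pipeline(
        process.openStdin(),                 // read from stdin
        es.split(),                          // break the input into lines
        es.map(function (data, callback) {   // parse each line, then format it
          callback(null, inspect(JSON.parse(data)))
        }),
        process.stdout                       // write the result to stdout
      )
    }
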
Run it:

    curl -sS registry.npmjs.org/event-stream | node pretty.js

node Stream documentation
##through (write?, end?)
Re-emits data synchronously. An easy way to create synchronous through streams.
Pass in optional `write` and `end` methods. They will be called in the
context of the stream. Use `this.pause()` and `this.resume()` to manage flow.
Check `this.paused` to see the current flow state. (`write` always returns `!this.paused`.)
This function is the basis for most of the synchronous streams in `event-stream`.

    es.through(function write(data) {
        this.emit('data', data)
      },
      function end () {
        this.emit('end')
      })

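To make the contract above concrete, here is a rough sketch of a synchronous through stream built on nothing but Node's core `Stream` class. `miniThrough` is a hypothetical helper written for illustration; it is not event-stream's source and leaves out details a real implementation would need (buffering while paused, `destroy`, and so on).

```javascript
var Stream = require('stream').Stream

// Hypothetical helper, for illustration only: a minimal synchronous through stream.
function miniThrough (write, end) {
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

  // Default behaviour: pass data straight through.
  write = write || function (data) { this.emit('data', data) }
  end = end || function () { this.emit('end') }

  stream.write = function (data) {
    write.call(stream, data)      // user function runs with `this` set to the stream
    return !stream.paused         // false tells the upstream to stop writing
  }
  stream.end = function (data) {
    if (arguments.length) stream.write(data)
    stream.writable = false
    end.call(stream)
  }
  stream.pause = function () { stream.paused = true }
  stream.resume = function () {
    stream.paused = false
    stream.emit('drain')          // tell the upstream it may write again
  }
  return stream
}

// Usage: double every number written to the stream.
var seen = []
var doubler = miniThrough(function (n) { this.emit('data', n * 2) })
doubler.on('data', function (n) { seen.push(n) })
var firstWrite = doubler.write(1)
doubler.write(2)
doubler.pause()
var writeWhilePaused = doubler.write(3)
doubler.end()
```

Because everything happens synchronously, `seen` is filled in before `write` returns, which is what makes this style of stream easy to reason about.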
##map (asyncFunction)
Create a through stream from an asynchronous function.

    var es = require('event-stream')

    es.map(function (data, callback) {
      // transform data, then pass it (or an error) to the callback
      callback(null, data)
    })

Each map MUST call the callback. It may call back with data, with an error, or with no arguments:
- `callback()` drops this data. This makes the map work like `filter`. Note: `callback(null, null)` is not the same, and will emit `null`.
- `callback(null, newData)` turns data into newData.
- `callback(error)` emits an error for this item.

Note: if a callback is not called, `map` will think that the item is still being processed.
Every call must be answered or the stream will not know when to end.
Also, if the callback is called more than once, every call but the first will be ignored.
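The callback rules above can be sketched with a small stand-in built on Node's core `Stream` class. `miniMap` is a hypothetical helper, not event-stream's implementation; in particular it makes no attempt to preserve ordering when callbacks complete out of order, and it treats an `undefined` second argument as "drop".

```javascript
var Stream = require('stream').Stream

// Hypothetical stand-in for es.map, for illustration only.
function miniMap (mapper) {
  var stream = new Stream()
  stream.readable = stream.writable = true
  var pending = 0     // items whose callback has not been answered yet
  var ended = false

  stream.write = function (data) {
    var called = false
    pending++
    mapper(data, function (err, mapped) {
      if (called) return                        // every call but the first is ignored
      called = true
      pending--
      if (err) stream.emit('error', err)        // callback(error)
      else if (mapped !== undefined) stream.emit('data', mapped) // callback(null, newData)
      // callback() with no arguments falls through here: the item is dropped
      if (ended && pending === 0) stream.emit('end')
    })
    return true
  }
  stream.end = function () {
    ended = true
    if (pending === 0) stream.emit('end')       // only end once every item is answered
  }
  return stream
}

var out = []
var errors = []
var finished = false
var m = miniMap(function (n, callback) {
  if (n === 0) return callback()                       // drop this item
  if (n < 0) return callback(new Error('negative'))    // report an error
  callback(null, n * 10)                               // transform
  callback(null, 'ignored')                            // second call has no effect
})
m.on('data', function (d) { out.push(d) })
m.on('error', function (e) { errors.push(e) })
m.on('end', function () { finished = true })
;[1, 0, -1, 2].forEach(function (n) { m.write(n) })
m.end()
```

The `pending` counter is the reason an unanswered callback stalls the stream: `end` is only emitted once it drops back to zero.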
##mapSync (syncFunction)
Same as `map`, but the function is called synchronously and returns the mapped value instead of passing it to a callback. Based on `es.through`.
##split (matcher)
Break up a stream and reassemble it so that each line is a chunk. `matcher` may be a `String` or a `RegExp`.
Example: read every line in a file.

    var fs = require('fs')
    var es = require('event-stream')

    es.pipeline(
      fs.createReadStream(file, {flags: 'r'}),  // file is the path to read
      es.split(),
      es.map(function (line, cb) {
        // do something with the line
        cb(null, line)
      })
    )

`split` takes the same arguments as `string.split`, except it defaults to '\n' instead of ',', and the optional `limit` parameter is ignored.
String#split
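Incoming chunks rarely end exactly on a line boundary, so a splitter has to hold on to the unfinished tail of each chunk. The sketch below shows that buffering idea with a plain object rather than a stream; `makeSplitter` and its `emit` parameter are hypothetical names, and whether a real implementation emits the trailing empty piece at the end is an implementation detail that this sketch does not try to match.

```javascript
// Hypothetical helper illustrating the buffering idea; not event-stream's source.
function makeSplitter (matcher, emit) {
  matcher = matcher || '\n'       // same default as described above
  var soFar = ''                  // text seen since the last separator
  return {
    write: function (chunk) {
      var pieces = (soFar + chunk).split(matcher)
      soFar = pieces.pop()        // the last piece may be an incomplete line
      pieces.forEach(function (piece) { emit(piece) })
    },
    end: function () {
      emit(soFar)                 // flush whatever is left over
    }
  }
}

var lines = []
var splitter = makeSplitter('\n', function (line) { lines.push(line) })
splitter.write('he')
splitter.write('llo\nwor')
splitter.write('ld\n')
splitter.end()
```

Here `'hello'` is only emitted once the chunk containing its newline arrives, even though the word itself was spread over two writes.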
##join (separator)
Create a through stream that emits `separator` between each chunk, just like Array#join.
(For legacy reasons, if you pass a callback instead of a string, join is a synonym for `es.wait`.)
##replace (from, to)
Replace all occurrences of `from` with `to`. `from` may be a `String` or a `RegExp`.
Works just like `string.split(from).join(to)`, but streaming.
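For reference, this is the plain string idiom that the description above compares `replace` to, shown with both a string and a regular expression separator. It uses only built-in string methods; the sample values are made up for illustration.

```javascript
// Replace every '-' with '+', using a String separator.
var viaString = 'a-b-c'.split('-').join('+')

// Replace every run of digits with '_', using a RegExp separator.
var viaRegExp = 'a1b22c'.split(/\d+/).join('_')
```

The streaming version has to produce the same result even when a match is spread across two chunks, which is why it is not simply a per-chunk call to this idiom.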
##parse
Convenience function for parsing JSON chunks. For newline-separated JSON,
use with `es.split`:

    fs.createReadStream(filename)
      .pipe(es.split()) // defaults to lines
      .pipe(es.parse())

##stringify
Convert JavaScript objects into lines of text. The text will have whitespace escaped and have a `\n`
appended, so it will be compatible with `es.parse`.

    objectStream
      .pipe(es.stringify())
      .pipe(fs.createWriteStream(filename))

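The reason escaping matters is that each object must occupy exactly one line for a line splitter to recover it. The round trip below sketches the format with plain `JSON.stringify` and `JSON.parse`, without any streams; the sample objects are made up, and this is only an illustration of newline-delimited JSON, not event-stream's code.

```javascript
var objects = [{ name: 'two\nlines' }, { n: 1 }]

// One JSON document per line. JSON.stringify escapes the newline inside
// the string value, so it cannot be mistaken for a record separator.
var text = objects
  .map(function (obj) { return JSON.stringify(obj) + '\n' })
  .join('')

// Reverse the process: split on newlines, skip the empty trailing piece, parse.
var parsed = text
  .split('\n')
  .filter(function (line) { return line.length > 0 })
  .map(function (line) { return JSON.parse(line) })
```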
##readable (asyncFunction)
Create a readable stream (that respects pause) from an async function.
While the stream is not paused, the function will be polled with `(count, callback)`,
and `this` will be the readable stream.

    es.readable(function (count, callback) {
      // streamHasEnded and data stand in for your own state
      if(streamHasEnded)
        return this.emit('end')
      this.emit('data', data)
      // signal that this poll is finished
      callback()
    })

You can also pass the data and the error to the callback.
You may only call the callback once;
calling the same callback more than once will have no effect.
##readArray (array)
Create a readable stream from an Array.
Just emit each item as a data event, respecting `pause` and `resume`.

    var es = require('event-stream')
      , reader = es.readArray([1,2,3])

    reader.pipe(...)

##writeArray (callback)
Create a writable stream from a callback.
All `data` events are stored in an array, which is passed to the callback when the stream ends.

    var es = require('event-stream')
      , reader = es.readArray([1, 2, 3])
      , writer = es.writeArray(function (err, array){
        // array holds every chunk that was written
      })

    reader.pipe(writer)

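As a sketch of what "collect everything, then call back" means, here is a minimal writable stream built on Node's core `Stream` class. `miniWriteArray` is a hypothetical helper for illustration and not event-stream's implementation; it ignores error handling entirely.

```javascript
var Stream = require('stream').Stream

// Hypothetical helper, for illustration only.
function miniWriteArray (done) {
  var stream = new Stream()
  var items = []
  stream.writable = true
  stream.write = function (data) {
    items.push(data)              // remember every chunk
    return true                   // never asks the source to pause
  }
  stream.end = function () {
    stream.writable = false
    done(null, items)             // hand over the collected chunks
  }
  return stream
}

var collected
var writer = miniWriteArray(function (err, array) { collected = array })
;[1, 2, 3].forEach(function (n) { writer.write(n) })
writer.end()
```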
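##pipeline (stream1,...,streamN)
Turn a pipeline into a single stream. `pipeline` returns a stream that writes to the first stream
and reads from the last stream.
Listening for 'error' will receive errors from all streams inside the pipe.
`connect` is an alias for `pipeline`.

    var es = require('event-stream')
    var inspect = require('util').inspect

    es.pipeline(                           // connect streams together with pipe
      process.openStdin(),                 // open stdin
      es.split(),                          // split stream to break on newlines
      es.map(function (data, callback) {   // turn this async function into a stream
        callback(null, inspect(JSON.parse(data)))
      }),
      process.stdout                       // pipe it to stdout
    )
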
##pause ()
A stream that buffers all chunks when paused.

    var ps = es.pause()

    ps.pause()   // chunks written from now on are buffered
    ps.resume()  // buffered chunks are released

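The buffering behaviour can be sketched in a few lines on top of Node's core `Stream` class. `miniPause` is a hypothetical helper for illustration, not event-stream's implementation; for brevity it does not handle `end` being called while chunks are still buffered.

```javascript
var Stream = require('stream').Stream

// Hypothetical helper, for illustration only.
function miniPause () {
  var stream = new Stream()
  var buffer = []
  stream.readable = stream.writable = true
  stream.paused = false

  stream.write = function (data) {
    if (stream.paused) buffer.push(data)   // hold the chunk back
    else stream.emit('data', data)         // pass it straight through
    return !stream.paused
  }
  stream.pause = function () { stream.paused = true }
  stream.resume = function () {
    stream.paused = false
    // Release buffered chunks in order; stop if a listener pauses us again.
    while (buffer.length && !stream.paused) stream.emit('data', buffer.shift())
  }
  stream.end = function () { stream.emit('end') }
  return stream
}

var got = []
var ps = miniPause()
ps.on('data', function (d) { got.push(d) })
ps.write('a')
ps.pause()
ps.write('b')
ps.write('c')
var whilePaused = got.slice()   // snapshot taken before resuming
ps.resume()
```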
##duplex (writeStream, readStream)
Takes a writable stream and a readable stream and makes them appear as a single readable, writable stream.
It is assumed that the two streams are connected to each other in some way.
(This is used by `pipeline` and `child`.)

    var cp = require('child_process')
    var grep = cp.exec('grep Stream')

    // writes go to grep's stdin, reads come from grep's stdout
    es.duplex(grep.stdin, grep.stdout)

##child (child_process)
Create a through stream from a child process.

    var cp = require('child_process')

    es.child(cp.exec('grep Stream')) // a through stream

##wait (callback)
Waits for the stream to emit 'end' and joins the chunks of the stream into a single string.
Takes an optional callback, which will be passed the
complete string when the stream receives the 'end' event.
Also emits a single 'data' event.

    readStream.pipe(es.wait(function (err, text) {
      // text is the complete contents of the stream
    }))

Other Stream Modules
These modules are not included as a part of EventStream but may be
useful when working with streams.
##stream-reduce
Like `Array.prototype.reduce` but for streams. Given a sync reduce
function and an initial value, it will return a through stream that emits
a single data event with the reduced value once the input stream ends.

    var reduce = require("stream-reduce");

    process.stdin.pipe(reduce(function(acc, data) {
      return acc + data.length;
    }, 0)).on("data", function(length) {
      console.log("stdin size:", length);
    });
