Research
Security News
Threat Actor Exposes Playbook for Exploiting npm to Build Blockchain-Powered Botnets
A threat actor's playbook for exploiting the npm ecosystem was exposed on the dark web, detailing how to build a blockchain-powered botnet.
event-stream
Advanced tools
The event-stream npm package is a toolkit for working with Node.js streams. It provides a collection of tools to create, manipulate, and manage streams in a functional programming style. It is useful for handling data in a streaming fashion, which can be more efficient than loading all data into memory at once.
Map
Applies a function to each data event in a stream and pushes the result downstream.
es.map(function (data, callback) {
callback(null, data.toString().toUpperCase());
})
ReadArray
Creates a readable stream from an array of items.
es.readArray(['one', 'two', 'three'])
WriteArray
Collects all data from a stream and passes it as an array to a callback function.
es.writeArray(function (err, array) {
console.log(array);
})
Duplex
Combines a writable and readable stream into a duplex (readable and writable) stream.
es.duplex(writeStream, readStream)
Pipeline
Pipes streams together and destroys all of them if one of them closes.
es.pipeline(stream1, stream2, stream3)
A tiny wrapper around Node.js streams.Transform that makes it easier to create transform streams. It is similar to event-stream's map feature but focuses specifically on transform streams.
Highland.js manages synchronous and asynchronous code easily, using nothing more than standard JavaScript and Node-like streams. It is more functional and provides more utilities for stream manipulation compared to event-stream.
This module turns a pipeline of streams into a single duplex stream. It is similar to event-stream's pipeline feature but with a different API and error handling approach.
Combines an array of streams into a single duplex stream using pump and duplexify. It is similar to event-stream's duplex and pipeline features but uses pump for better error handling.
Streams are nodes best and most misunderstood idea, and EventStream is a toolkit to make creating and working with streams easy.
Normally, streams are only used of IO,
but in event stream we send all kinds of objects down the pipe.
If your application's input and output are streams,
shouldn't the throughput be a stream too?
The EventStream functions resemble the array functions,
because Streams are like Arrays, but laid out in time, rather than in memory.
All the event-stream
functions return instances of Stream
.
Stream API docs: nodejs.org/api/streams
NOTE: I shall use the term "through stream" to refer to a stream that is writable and readable.
###simple example:
//pretty.js
if(!module.parent) {
var es = require('event-stream')
es.connect( //connect streams together with `pipe`
process.openStdin(), //open stdin
es.split(), //split stream to break on newlines
es.map(function (data, callback) {//turn this async function into a stream
callback(null
, inspect(JSON.parse(data))) //render it nicely
}),
process.stdout // pipe it to stdout !
)
}
run it ...
curl -sS registry.npmjs.org/event-stream | node pretty.js
test are in event-stream_tests
##map (asyncFunction)
Create a through stream from an asyncronous function.
var es = require('event-stream')
es.map(function (data, callback) {
//transform data
// ...
callback(null, data)
})
Each map MUST call the callback. It may callback with data, with an error or with no arguments,
callback()
drop this data.
this makes the map work like filter
,
note:callback(null,null)
is not the same, and will emit null
callback(null, newData)
turn data into newData
callback(error)
emit an error for this item.
Note: if a callback is not called,
map
will think that it is still being processed,
every call must be answered or the stream will not know when to end.Also, if the callback is called more than once, every call but the first will be ignored.
##readable (asyncFunction)
create a readable stream (that respects pause) from an async function.
while the stream is not paused,
the function will be polled with (count, callback)
,
and this
will be the readable stream.
es.readable(function (count, callback) {
if(streamHasEnded)
return this.emit('end')
//...
this.emit('data', data) //use this way to emit multiple chunks per call.
callback() // you MUST always call the callback eventually.
// the function will not be called again until you do this.
})
you can also pass the data and the error to the callback.
you may only call the callback once.
calling the same callback more than once will have no effect.
##readArray (array)
Create a readable stream from an Array.
Just emit each item as a data event, respecting pause
and resume
.
var es = require('event-stream')
, reader = es.readArray([1,2,3])
reader.pipe(...)
create a writeable stream from a callback,
all data
events are stored in an array, which is passed to the callback when the stream ends.
var es = require('event-stream')
, reader = es.readArray([1, 2, 3])
, writer = es.writeArray(function (err, array){
//array deepEqual [1, 2, 3]
})
reader.pipe(writer)
Break up a stream and reassemble it so that each line is a chunk.
Example, read every line in a file ...
es.connect(
fs.createReadStream(file, {flags: 'r'}),
es.split(),
es.map(function (line, cb) {
//do something with the line
cb(null, line)
})
)
Connect multiple Streams together into one stream.
connect
will return a Stream. This stream will write to the first stream,
and will emit data from the last stream.
Listening for 'error' will recieve errors from all streams inside the pipe.
es.connect( //connect streams together with `pipe`
process.openStdin(), //open stdin
es.split(), //split stream to break on newlines
es.map(function (data, callback) {//turn this async function into a stream
callback(null
, inspect(JSON.parse(data))) //render it nicely
}),
process.stdout // pipe it to stdout !
)
If the gate is shut
, buffer the stream.
All calls to write will return false (pause upstream),
and end will not be sent downstream.
If the gate is open, let the stream through.
Named shut
instead of close, because close is already kinda meaningful with streams.
Gate is useful for holding off processing a stream until some resource (i.e. a database, or network connection) is ready.
var gate = es.gate()
gate.open() //allow the gate to stream
gate.close() //buffer the stream, also do not allow 'end'
Takes a writable stream and a readable stream and makes them appear as a readable writable stream.
It is assumed that the two streams are connected to each other in some way.
(This is used by connect
and child
.)
var grep = cp.exec('grep Stream')
es.duplex(grep.stdin, grep.stdout)
Create a through stream from a child process ...
var cp = require('child_process')
es.child(cp.exec('grep Stream')) // a through stream
The arguments to pipable must be functions that return
instances of Stream or async functions.
(If a function is returned, it will be turned into a Stream
with es.map
.)
Here is the first example rewritten to use pipeable
.
//examples/pretty_pipeable.js
var inspect = require('util').inspect
if(!module.parent)
require('event-stream').pipeable(function () {
return function (data, callback) {
try {
data = JSON.parse(data)
} catch (err) {} //pass non JSON straight through!
callback(null, inspect(data))
}
})
})
curl -sS registry.npmjs.org/event-stream | node pipeable_pretty.js
## or, turn the pipe into a server!
node pipeable_pretty.js --port 4646
curl -sS registry.npmjs.org/event-stream | curl -sSNT- localhost:4646
https://github.com/felixge/node-growing-file
stream changes on file that is being appended to. just like tail -f
https://github.com/isaacs/sax-js
streaming xml parser
https://github.com/mikeal/request
make http requests. request() returns a through stream!
https://github.com/TooTallNate/node-throttle
throttle streams on a bytes per second basis (binary streams only, of course)
https://github.com/mikeal/morestreams
buffer input until connected to a pipe.
https://github.com/TooTallNate/node-gzip-stack
compress and decompress raw streams.
https://github.com/Floby/node-json-streams
parse json without buffering it first
https://github.com/floby/node-tokenizer
tokenizer
https://github.com/floby/node-parser
general mechanisms for custom parsers
https://github.com/dodo/node-bufferstream
buffer streams until you say (written in C)
https://github.com/tim-smart/node-filter
filter
pipeable string.replace
https://github.com/fictorial/json-line-protocol/issues/1
line reader
https://github.com/jahewson/node-byline/issues/1
line reader
https://github.com/AvianFlu/ntwitter/issues/3
twitter client
https://github.com/swdyh/node-chirpstream/issues/1
twitter client
https://github.com/polotek/evented-twitter/issues/22
twitter client
FAQs
construct pipes of streams of events
We found that event-stream demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
A threat actor's playbook for exploiting the npm ecosystem was exposed on the dark web, detailing how to build a blockchain-powered botnet.
Security News
NVD’s backlog surpasses 20,000 CVEs as analysis slows and NIST announces new system updates to address ongoing delays.
Security News
Research
A malicious npm package disguised as a WhatsApp client is exploiting authentication flows with a remote kill switch to exfiltrate data and destroy files.