readable-stream
The readable-stream package is a userland stream module, compatible with the built-in stream module provided by Node.js. It offers the same interface and functionality as the native module, but with additional updates and bug fixes. It is particularly useful for ensuring consistent stream behavior across different Node.js versions.
Creating a readable stream
This feature allows you to create a readable stream that you can pipe to other streams or consume manually. The 'read' method is called when the stream wants to pull more data.
const { Readable } = require('readable-stream');
const myReadableStream = new Readable({
  read(size) {
    this.push('some data');
    this.push(null); // No more data
  }
});
myReadableStream.on('data', (chunk) => {
  console.log(chunk.toString());
});
Creating a writable stream
This feature allows you to create a writable stream where you can write data. The 'write' method is called when the stream receives data to write.
const { Writable } = require('readable-stream');
const myWritableStream = new Writable({
  write(chunk, encoding, callback) {
    process.stdout.write(chunk);
    callback();
  }
});
process.stdin.pipe(myWritableStream);
Creating a transform stream
This feature allows you to create a transform stream that can modify data as it is read from a readable stream before it is written to a writable stream.
const { Transform } = require('readable-stream');
const myTransformStream = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});
process.stdin.pipe(myTransformStream).pipe(process.stdout);
Creating a duplex stream
This feature allows you to create a duplex stream that is both readable and writable. It can be used to read data from one source and write to another.
const { Duplex } = require('readable-stream');
const myDuplexStream = new Duplex({
  read(size) {
    this.push('data from read method');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});
myDuplexStream.on('data', (chunk) => {
  console.log(chunk.toString());
});
myDuplexStream.write('data for write method');
Through2 is a tiny wrapper around Node.js's stream.Transform that makes it easier to create transform streams. It is similar to readable-stream's Transform, but with a simpler API for most common use cases, as sketched below.
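For instance, here is a minimal sketch of the same uppercasing transform shown earlier, written in through2's callback style (assuming through2 is installed; a regular function is used so that this refers to the stream):
const through2 = require('through2');
// Same uppercasing transform as the readable-stream example above.
const upper = through2(function (chunk, enc, callback) {
  this.push(chunk.toString().toUpperCase());
  callback();
});
process.stdin.pipe(upper).pipe(process.stdout);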
Highland.js manages synchronous and asynchronous code easily, using nothing more than standard JavaScript and Node-like streams. It is more functional in nature compared to readable-stream and provides a higher level abstraction for handling streams.
Stream-browserify is a browser-compatible version of Node.js' core stream module, similar to readable-stream. It allows the use of Node.js-style streams in the browser, but it is specifically designed to polyfill the native Node.js stream module for browser use.
Buffer List (bl) is a storage object for collections of Node Buffers, which can be used with streams. Unlike readable-stream, it focuses on buffering and manipulating binary data rather than providing the stream API itself.
Stability: 1 - Experimental
A new kind of readable streams for Node.js
This is an abstract class designed to be extended. It also provides a wrap method that you can use to provide the simpler readable API for streams that have the "readable stream" interface of Node 0.8 and before.
Note that Duplex, Transform, Writable, and PassThrough streams are also provided as base classes. See the full API details below.
Writable streams in node are relatively straightforward to use and extend. The write method either returns false if you would like the user to back off a bit, in which case a drain event at some point in the future will let them continue writing, or anything other than false if the bytes could be completely handled and another write should be performed. The end() method lets the user indicate that no more bytes will be written. That's pretty much the entire required interface for writing.
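To make that contract concrete, here is a minimal sketch of a producer that backs off when write() returns false and resumes on drain (the file name and the loop are invented for the example):
var fs = require('fs');

var out = fs.createWriteStream('output.txt'); // example destination
var i = 0;

function writeMany() {
  while (i < 1000000) {
    var ok = out.write('line ' + i + '\n');
    i++;
    if (ok === false) {
      // Buffer is full: back off until 'drain' says we can continue.
      out.once('drain', writeMany);
      return;
    }
  }
  // No more bytes will be written.
  out.end();
}

writeMany();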
However, readable streams in Node 0.8 and before are rather complicated:

1. data events start coming right away, no matter what. There is no way to do other actions before consuming data, without handling buffering yourself.
2. To stop the flow, you have to use the pause() and resume() methods, and take care of buffering yourself.
3. pause() was purely advisory, so even while paused, you still have to be careful that you might get some data. This caused a lot of subtle bugs.

So, while writers only have to implement write(), end(), and drain, readers have to implement (at minimum):

- a pause() method
- a resume() method
- a data event
- an end event

And read consumers had to always be prepared for their backpressure advice to simply be ignored.
If you are using a readable stream, and want to just get the first 10 bytes, make a decision, and then pass the rest off to somewhere else, then you have to handle buffering, pausing, slicing, and so on. This is all rather brittle and easy to get wrong for all but the most trivial use cases.
Additionally, this all made the reader.pipe(writer) method unnecessarily complicated and difficult to extend without breaking something. Backpressure and error handling are especially challenging and brittle.
The reader does not have pause/resume methods. If you want to consume the bytes, you call read(). If bytes are not being consumed, then effectively the stream is in a paused state. It exerts backpressure on upstream connections, doesn't read from files, etc. Any data that was already in the process of being read will be placed in a buffer.

If read() returns null, then a future readable event will be fired when there are more bytes ready to be consumed.

This is simpler and conceptually closer to the underlying mechanisms. The resulting pipe() method is much shorter and simpler. The problems of data events happening while paused are alleviated.
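For example, the "first 10 bytes" scenario above becomes a short sketch under this interface (r is any stream with the new interface; decide() and somewhereElse are invented names):
function onHeader() {
  var header = r.read(10);
  if (header === null) {
    // Fewer than 10 bytes buffered so far; wait for more.
    return r.once('readable', onHeader);
  }
  decide(header);        // make a decision on the first 10 bytes
  r.pipe(somewhereElse); // then pass the rest off
}
r.once('readable', onHeader);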
It's not particularly difficult to wrap old-style streams in this new interface, or to wrap this type of stream in the old-style interface.
The Readable class provides a wrap(oldStream) method that takes an argument which is an old-style stream with data events and pause() and resume() methods, and uses that as the data source. For example:
var r = new Readable();
r.wrap(oldReadableStream);
// now you can use r.read(), and it will emit 'readable' events
// but the data is based on whatever oldReadableStream spits out of
// its 'data' events.
In order to work with programs that use the old interface, some magic is unfortunately required. At some point in the future, this magic will be removed.
The Readable class will automatically convert into an old-style data-emitting stream if any listeners are added to the data event. So, this works fine, though you of course lose a lot of the benefits of the new interface:
var r = new ReadableThing();

r.on('data', function(chunk) {
  // ...
  // magic is happening! oh no! the animals are walking upright!
  // the brooms are sweeping the floors all by themselves!
});

// this will also turn on magic-mode:
r.pause();
// now pause, resume, etc. are patched into place, and r will
// continually call read() until it returns null, emitting the
// returned chunks in 'data' events.

r.on('end', function() {
  // ...
});
A base class for implementing Readable streams. Override the _read(n, cb) method to fetch data asynchronously and take advantage of the buffering built into the Readable class. Extend the Readable class, and provide a _read(n, cb) implementation:
var Readable = require('readable-stream');
var util = require('util');

util.inherits(MyReadable, Readable);

function MyReadable(options) {
  Readable.call(this, options);
}

MyReadable.prototype._read = function(n, cb) {
  // your magic goes here.
  // call the cb at some time in the future with either up to n bytes,
  // or an error, like cb(err, resultData)
  //
  // The code in the Readable class will call this to keep an internal
  // buffer at a healthy level, as the user calls var chunk = stream.read(n)
  // to consume chunks.
};

var r = new MyReadable();

r.on('end', function() {
  // no more bytes will be provided.
});

r.on('readable', function() {
  // now is the time to call read() again.
});

// to get some bytes out of it:
var data = r.read(optionalLengthArgument);
// now data is either null, or a buffer of optionalLengthArgument
// length. If you don't provide an argument, then it returns whatever
// it has.

// typically you'd just r.pipe() into some writable stream, but you
// can of course do stuff like this, as well:
function flow() {
  var chunk;
  while (null !== (chunk = r.read())) {
    doSomethingWithData(chunk);
  }
  r.once('readable', flow);
}
flow();
options {Object}
- lowWaterMark {Number} The minimum number of bytes before the stream is considered 'readable'. Default = 0
- bufferSize {Number} The number of bytes to try to read from the underlying _read function. Default = 16 * 1024
Make sure to call the Readable
constructor in your extension
classes, or else the stream will not be properly initialized.
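For instance, the subclass above might be constructed with these options (values chosen arbitrarily for illustration):
var r = new MyReadable({
  lowWaterMark: 0,       // 'readable' fires as soon as any bytes are buffered
  bufferSize: 64 * 1024  // ask _read for up to 64 KB at a time
});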
size {Number} Optional number of bytes to read. If not provided, then return however many bytes are available.

Pulls the requested number of bytes out of the internal buffer. If that many bytes are not available, then it returns null.
size {Number} Number of bytes to read from the underlying asynchronous data source.
callback {Function} Callback function
- error {Error Object}
- data {Buffer | null}

Note: This function is not implemented in the Readable base class. Rather, it is up to you to implement _read in your extension classes.
_read should fetch the specified number of bytes, and call the provided callback with cb(error, data), where error is any error encountered, and data is the returned data.
This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user programs. However, you are expected to override this method in your own extension classes.
The size argument is purely advisory. You may call the callback with more or fewer bytes. However, if you call the callback with null, or an empty buffer, then it will assume that the end of the data was reached.
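As an illustration of that contract, here is a minimal sketch of a _read implementation serving slices of an in-memory buffer (the source buffer and the setTimeout delay are invented to stand in for a real asynchronous data source):
var Readable = require('readable-stream');
var util = require('util');

util.inherits(BufferReadable, Readable);

function BufferReadable(source, options) {
  Readable.call(this, options);
  this._source = source; // a Buffer to serve from (example only)
  this._offset = 0;
}

BufferReadable.prototype._read = function(n, cb) {
  var self = this;
  setTimeout(function() {
    if (self._offset >= self._source.length) {
      // null tells the Readable machinery the end of the data was reached.
      return cb(null, null);
    }
    // n is advisory: hand back up to n bytes.
    var chunk = self._source.slice(self._offset, self._offset + n);
    self._offset += chunk.length;
    cb(null, chunk);
  }, 10);
};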
destination {Writable Stream object}

Continually read() data out of the readable stream, and write() it into the writable stream. When the writable.write(chunk) call returns false, then it will back off until the next drain event, to do backpressure.

Piping to multiple destinations is supported. The slowest destination stream will limit the speed of the pipe() flow.
Note that this puts the readable stream into a state where not very much can be done with it. You can no longer read() from the stream in other code, without upsetting the pipe() process. However, since multiple pipe destinations are supported, you can always create a PassThrough stream, and pipe the reader to that. For example:
var r = new ReadableWhatever();
var pt = new PassThrough();
r.pipe(someWritableThing);
r.pipe(pt);
// now I can call pt.read() to my heart's content.
// note that if I *don't* call pt.read(), then it'll back up and
// prevent the pipe() from flowing!
destination {Writable Stream object} Optional

Remove the provided destination stream from the pipe flow. If no argument is provided, then it will unhook all piped destinations.
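Continuing the example above, a brief sketch:
// stop piping into one destination:
r.unpipe(someWritableThing);
// or, with no argument, unhook every piped destination:
r.unpipe();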
An event that signals more data is now available to be read from the stream. Emitted when more data arrives, after previously calling read() and getting a null result.
An event that signals that no more data will ever be available on this stream. It's over.
An object that tracks the state of the stream. Buffer information, whether or not it has reached the end of the underlying data source, etc., are all tracked on this object.
You are strongly encouraged not to modify this in any way, but it is often useful to read from.
A base class for creating Writable streams. Similar to Readable, you can create child classes by overriding the asynchronous _write(chunk, cb) method, and it will take care of buffering, backpressure, and so on.
options {Object}
- highWaterMark {Number} The number of bytes to store up before it starts returning false from write() calls. Default = 16 * 1024
- lowWaterMark {Number} The number of bytes that the buffer must get down to before it emits drain. Default = 1024
Make sure to call the Writable
constructor in your extension
classes, or else the stream will not be properly initialized.
chunk {Buffer | String}
encoding {String} An encoding argument to turn the string chunk into a buffer. Only relevant if chunk is a string. Default = 'utf8'.
Returns false if you should not write until the next drain event, or some other value otherwise.

The basic write function.
chunk {Buffer}
callback {Function}
- error {Error | null} Call with an error object as the first argument to indicate that the write() failed for unfixable reasons.

Note: This function is not implemented in the Writable base class. Rather, it is up to you to implement _write in your extension classes.
_write should do whatever has to be done in this specific Writable class, to handle the bytes being written. Write to a file, send along a socket, encrypt as an mp3, whatever needs to be done. Do your I/O asynchronously, and call the callback when it's complete.
This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user programs. However, you are expected to override this method in your own extension classes.
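Putting those pieces together, here is a minimal sketch of a Writable subclass under this old _write(chunk, cb) API (the class name and the logging behavior are invented; the Writable export location is assumed to match later versions of the module):
var Readable = require('readable-stream');
var util = require('util');
var Writable = Readable.Writable; // assumed export location

util.inherits(MyWritable, Writable);

function MyWritable(options) {
  Writable.call(this, options);
}

MyWritable.prototype._write = function(chunk, cb) {
  // Do the real I/O for this class; here we just log the byte count.
  console.log('got %d bytes', chunk.length);
  cb(null); // pass an Error instead to signal an unfixable write failure
};

var w = new MyWritable({ highWaterMark: 32 * 1024 });
w.on('finish', function() {
  // end() was called and all buffered data has been consumed.
  console.log('all done');
});
w.write('hello');
w.end('goodbye');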
chunk {Buffer | String}
encoding {String}

If a chunk (and, optionally, an encoding) are provided, then that chunk is first passed to this.write(chunk, encoding).
This method is a way to signal to the writable stream that you will not be writing any more data. It should be called exactly once for every writable stream.
Calling write() after calling end() will trigger an error.
Emitted when calling source.pipe(writable). See above for the description of the readable.pipe() method.

Emitted when calling source.unpipe(writable). See above for the description of the readable.unpipe() method.
If a call to writable.write() returns false, then at some point in the future, this event will tell you to start writing again.

When the stream has been ended, and all the data in its internal buffer has been consumed, then it emits a finish event to let you know that it's completely done.
This is particularly handy if you want to know when it is safe to shut down a socket or close a file descriptor. At this time, the writable stream may be safely disposed. Its mission in life has been accomplished.
A base class for Duplex streams (ie, streams that are both readable and writable).
Since JS doesn't have multiple prototypal inheritance, this class prototypally inherits from Readable, and then parasitically from Writable. It is thus up to the user to implement both the low-level _read(n, cb) method as well as the low-level _write(chunk, cb) method on extension duplex classes.

For cases where the written data is transformed into the output, it may be simpler to use the Transform class instead.
options {Object} Passed to both the Writable and Readable constructors.

Make sure to call the Duplex constructor in your extension classes, or else the stream will not be properly initialized.

If options.allowHalfOpen is set to the value false, then the stream will automatically end the readable side when the writable side ends, and vice versa.
{Boolean} Default = true

Set this flag to either true or false to determine whether or not to automatically close the writable side when the readable side ends, and vice versa.
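As a sketch, a Duplex subclass under this old API might look like the following (the class name and data are invented; the Duplex export location is assumed to match later versions of the module):
var Readable = require('readable-stream');
var util = require('util');
var Duplex = Readable.Duplex; // assumed export location

util.inherits(MyDuplex, Duplex);

function MyDuplex(options) {
  Duplex.call(this, options);
  this._sent = false;
}

MyDuplex.prototype._read = function(n, cb) {
  if (this._sent) return cb(null, null); // end of the readable side
  this._sent = true;
  cb(null, new Buffer('data from the read side'));
};

MyDuplex.prototype._write = function(chunk, cb) {
  // Consume data written to the writable side.
  console.log('wrote: ' + chunk.toString());
  cb(null);
};

var d = new MyDuplex({ allowHalfOpen: false }); // end both sides together
d.on('data', function(chunk) {
  console.log('read: ' + chunk.toString());
});
d.write('data for the write side');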
A duplex (ie, both readable and writable) stream that is designed to make it easy to implement transform operations such as encryption, decryption, compression, and so on.
Transform streams are instanceof Readable, but they have all of the methods and properties of both Readable and Writable streams. See above for the list of events and methods that Transform inherits from Writable and Readable.

Override the _transform(chunk, outputFn, callback) method in your implementation classes to take advantage of it.
options {Object} Passed to both the Writable and Readable constructors.

Make sure to call the Transform constructor in your extension classes, or else the stream will not be properly initialized.
chunk {Buffer} The chunk to be transformed.
outputFn {Function} Call this function with any output data to be passed to the readable interface.
callback {Function} Call this function (optionally with an error argument) when you are done processing the supplied chunk.

Note: This function is not implemented in the Transform base class. Rather, it is up to you to implement _transform in your extension classes.
_transform should do whatever has to be done in this specific Transform class, to handle the bytes being written, and pass them off to the readable portion of the interface. Do asynchronous I/O, process things, and so on.

Call the callback function only when the current chunk is completely consumed. Note that this may mean that you call the outputFn zero or more times, depending on how much data you want to output as a result of this chunk.
This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user programs. However, you are expected to override this method in your own extension classes.
outputFn {Function} Call this function with any output data to be passed to the readable interface.
callback {Function} Call this function (optionally with an error argument) when you are done flushing any remaining data.

Note: This function is not implemented in the Transform base class. Rather, it is up to you to implement _flush in your extension classes, optionally, if it applies to your use case.
In some cases, your transform operation may need to emit a bit more data at the end of the stream. For example, a Zlib compression stream will store up some internal state so that it can optimally compress the output. At the end, however, it needs to do the best it can with what is left, so that the data will be complete.

In those cases, you can implement a _flush method, which will be called at the very end, after all the written data is consumed, but before emitting end to signal the end of the readable side. Just like with _transform, call outputFn zero or more times, as appropriate, and call callback when the flush operation is complete.
This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user programs. However, you are expected to override this method in your own extension classes.
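To make the division of labor concrete, here is a minimal sketch of a Transform subclass under this old outputFn-style API (the class name and behavior are invented; the Transform export location is assumed to match later versions of the module):
var Readable = require('readable-stream');
var util = require('util');
var Transform = Readable.Transform; // assumed export location

util.inherits(LineCounter, Transform);

function LineCounter(options) {
  Transform.call(this, options);
  this._count = 0;
}

LineCounter.prototype._transform = function(chunk, outputFn, callback) {
  // Count newlines and pass the chunk through unchanged.
  this._count += chunk.toString().split('\n').length - 1;
  outputFn(chunk); // zero or more outputFn calls per chunk
  callback();      // call once the chunk is fully consumed
};

LineCounter.prototype._flush = function(outputFn, callback) {
  // Emit one final summary chunk before 'end' fires on the readable side.
  outputFn(new Buffer('total lines: ' + this._count + '\n'));
  callback();
};

process.stdin.pipe(new LineCounter()).pipe(process.stdout);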
This is a trivial implementation of a Transform stream that simply passes the input bytes across to the output. Its purpose is mainly for examples and testing, but there are occasionally use cases where it can come in handy.
FAQs
Node.js Streams, a user-land copy of the stream library from Node.js
The npm package readable-stream receives a total of 106,430,549 weekly downloads. As such, readable-stream is classified as a popular package. We found that readable-stream demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 3 open source maintainers collaborating on the project.