Scramjet core
This is the minimal, dependency-free version of scramjet, used as of Scramjet version 3.0.0 as the base for scramjet and scramjet plugins.
Unless you are sure you need only the core, you are better off using the main repo and module.
It is built upon the logic behind three well-known JavaScript array operations - namely map, filter and reduce. This means that if you've ever performed operations on an Array in JavaScript - you already know Scramjet like the back of your hand.
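For a quick taste, here's a minimal sketch (not from the original docs) of the same operation written with an Array and with a DataStream; DataStream.fromArray and toArray are covered in the API reference below:
const { DataStream } = require("scramjet");

// Plain Array - synchronous and in-memory.
const fromArray = [1, 2, 3, 4].filter(x => x % 2 === 0).map(x => x * 2);

// DataStream - the same logic, but items flow through a stream
// and the final aggregation resolves asynchronously.
DataStream.fromArray([1, 2, 3, 4])
    .filter(x => x % 2 === 0)
    .map(x => x * 2)
    .toArray()
    .then(fromStream => console.log(fromArray, fromStream)); // [4, 8] [4, 8]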
Usage
Scramjet uses functional programming to run transformations on your data streams, in a fashion very similar to the well-known event-stream node module. Most transformations are done by passing a transform function.
It's all about chaining, really - you develop your flow based on a chain of calls, each returning the stream so you can call the next method, like this:
scramjet.from(someReadableStream)
    .map(someMapper)
    .map(someAsyncAPICall)
    .filter(asynchronousFilterOperation)
    .catch(errorHandler)
    .until(doneCondition)
    .toArray();
You can write your transforms in three ways:
- Synchronous
Example: a simple stream transform that outputs a stream of objects of the same id property and the length of the value string.
datastream.map(
    (item) => ({id: item.id, length: item.value.length})
)
- Asynchronous using ES2017 async/await
Example: A simple stream that uses the Fetch API to get the contents of all entries in the stream (a variant that extracts the response body is sketched after these examples).
datastream.map(
    async (item) => fetch(item)
)
- Asynchronous using Promises
Example: A simple stream that fetches a URL mentioned in the incoming object
datastream.map(
    (item) => new Promise((resolve, reject) => {
        request(item.url, (err, res, data) => {
            if (err)
                reject(err);
            else
                resolve(data);
        });
    })
)
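Note that in the async/await example above, fetch resolves to a Response object rather than its body; a hedged variant (assuming each item is a URL and each response body is JSON) would await the body explicitly:
datastream.map(
    // Assumes each item is a URL and the response body is JSON;
    // use response.text() instead for plain-text endpoints.
    async (item) => {
        const response = await fetch(item);
        return response.json();
    }
)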
The actual logic of this transform function is as if you passed your function to the then method of a Promise resolved with the data from the input stream.
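In other words, a rough conceptual sketch of what happens to each chunk (not scramjet's actual internals; the names below are placeholders) would be:
// Conceptual sketch only - "chunk", "yourTransformFunction" and
// "pushResultDownstream" are placeholders, not scramjet APIs.
Promise.resolve(chunk)
    .then(yourTransformFunction)
    .then(pushResultDownstream);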
API Docs
Here's the list of the exposed classes and methods; please review the specific documentation for details:
Note that:
- Most of the methods take a callback argument that operates on the stream items.
- The callback, unless it's stated otherwise, will receive an argument with the next chunk.
- You can use async functions or return Promises wherever you like.
- Methods usually return the same class, so they are chainable ↺ or are asynchronous ⇄ (see the short sketch below).
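For instance, here's a minimal sketch (with made-up data) of how chainable and asynchronous methods combine:
const { DataStream } = require("scramjet");

// map() and filter() are chainable (↺): each returns a stream of the same class.
// toArray() is asynchronous (⇄): it returns a Promise with the aggregated result.
DataStream
    .fromArray([{value: 1}, {value: 2}, {value: 3}])
    .map(async ({value}) => ({value: value * 10}))   // async callback
    .filter(({value}) => value > 10)                 // synchronous callback
    .toArray()
    .then(result => console.log(result));            // [{value: 20}, {value: 30}]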
The quick reference of the exposed classes:
:BufferStream
A facilitation stream created for easy splitting or parsing buffers.
Useful for working on built-in Node.js streams from files, parsing binary formats etc.
A simple use case would be:
fs.createReadStream('pixels.rgba')
    .pipe(new BufferStream)
    .breakup(4)
    .parse(buffer => [
        buffer.readInt8(0),
        buffer.readInt8(1),
        buffer.readInt8(2),
        buffer.readInt8(3)
    ]);
Detailed :BufferStream docs here
Most popular methods:
:DataStream
DataStream is the primary stream type for Scramjet. When you parse your stream, just pipe it into a DataStream and you can then perform calculations on the data objects streamed through your flow.
Use as:
const { DataStream } = require('scramjet');
await (DataStream.from(aStream)
    .map(findInFiles)
    .map(sendToAPI)
    .run());
Detailed :DataStream docs here
Most popular methods:
- new DataStream([opts]) - Create the DataStream.
- dataStream.map(func, [ClassType]) ↺ - Transforms stream objects into new ones, just like Array.prototype.map.
- dataStream.filter(func) ↺ - Filters objects based on the function outcome, just like Array.prototype.filter.
- dataStream.reduce(func, into) ⇄ - Reduces the stream into a given accumulator.
- dataStream.do(func) ↺ - Performs an asynchronous operation without changing or resuming the stream.
- dataStream.all(functions) ↺ - Processes a number of functions in parallel, returns a stream of arrays of results.
- dataStream.race(functions) ↺ - Processes a number of functions in parallel, returns the first resolved.
- dataStream.unorder(func) - Allows processing items without keeping order.
- dataStream.into(func, into) ↺ - Allows own implementation of stream chaining.
- dataStream.use(func) ↺ - Calls the passed method in place with the stream as first argument, returns result.
- dataStream.run() ⇄ - Consumes all stream items doing nothing. Resolves when the stream is ended.
- dataStream.tap() ↺ - Stops merging transform functions at the current place in the command chain.
- dataStream.whenRead() ⇄ - Reads a chunk from the stream and resolves the promise when read.
- dataStream.whenWrote(chunk) ⇄ - Writes a chunk to the stream and returns a Promise resolved when more chunks can be written.
- dataStream.whenEnd() ⇄ - Resolves when the stream ends - rejects on uncaught error.
- dataStream.whenDrained() ⇄ - Returns a promise that resolves when the stream is drained.
- dataStream.whenError() ⇄ - Returns a promise that resolves (!) when the stream errors.
- dataStream.setOptions(options) ↺ - Allows resetting stream options.
- dataStream.copy(func) ↺ - Returns a copy of the stream.
- dataStream.tee(func) ↺ - Duplicates the stream.
- dataStream.each(func) ↺ - Performs an operation on every chunk, without changing the stream.
- dataStream.while(func) ↺ - Reads the stream while the function outcome is truthy.
- dataStream.until(func) ↺ - Reads the stream until the function outcome is truthy.
- dataStream.catch(callback) ↺ - Provides a way to catch errors in chained streams.
- dataStream.raise(err) ⇄ - Executes all error handlers and, if none resolves, emits an error.
- dataStream.bufferify(serializer) : BufferStream ↺ - Creates a BufferStream.
- dataStream.stringify([serializer]) : StringStream ↺ - Creates a StringStream.
- dataStream.toArray([initial]) : Array.<any> ⇄ - Aggregates the stream into a single Array.
- dataStream.toGenerator() : Generator.<Promise.<any>> - Returns an async generator.
- dataStream.toBufferStream(serializer) : BufferStream ↺ - Creates a BufferStream.
- dataStream.toStringStream([serializer]) : StringStream ↺ - Creates a StringStream.
- DataStream:from(input, [options]) : DataStream - Returns a DataStream from pretty much anything sensibly possible.
- DataStream:pipeline(readable) : DataStream - Creates a pipeline of streams and returns a scramjet stream.
- DataStream:fromArray(array, [options]) : DataStream - Creates a DataStream from an Array.
- DataStream:fromIterator(iterator, [options]) : DataStream - Creates a DataStream from an Iterator.
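To show a few of the methods above working together, here's a minimal sketch with made-up data (the item shape is purely illustrative):
const { DataStream } = require("scramjet");

// Combines fromArray, filter, map and reduce from the list above.
DataStream
    .fromArray([
        {name: "pen", price: 2},
        {name: "gift", price: 0},
        {name: "book", price: 15}
    ])
    .filter(item => item.price > 0)               // drop free items
    .map(item => item.price)                      // keep only the prices
    .reduce((sum, price) => sum + price, 0)       // ⇄ resolves with the total
    .then(total => console.log("total:", total)); // total: 17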
:MultiStream
An object consisting of multiple streams that can be refined or muxed.
The idea behind a MultiStream is being able to mux and demux streams when needed.
Usage:
new MultiStream([...streams])
    .mux();

new MultiStream(function*(){ yield* streams; })
    .map(stream => stream.filter(myFilter))
    .mux();
Detailed :MultiStream docs here
Most popular methods:
:StringStream
A stream of string objects for further transformation on top of DataStream.
Example:
StringStream.from(async () => (await fetch('https://example.com/data/article.txt')).text())
    .lines()
    .append("\r\n")
    .pipe(fs.createWriteStream('./path/to/file.txt'))
Detailed :StringStream docs here
Most popular methods:
CLI
Check out the command line interface for simplified scramjet usage with scramjet-cli
$ sjr -i http://datasource.org/file.csv ./transform-module-1 ./transform-module-1 | gzip > logs.gz
License and contributions
As of version 2.0 Scramjet is MIT Licensed.
Help wanted
The project needs your help! There's lots of work to do - transforming and muxing, joining and splitting, browserifying, modularizing, documenting and issuing those issues.
If you want to help and be part of the Scramjet team, please reach out to us - scramjetorg on GitHub - or email us at opensource@scramjet.org.