![Scramjet Logo](https://signicode.com/scramjet-logo-light.svg)
Version 4
![Known Vulnerabilities](https://snyk.io/test/github/signicode/scramjet/badge.svg)
What does it do?
Scramjet is a fast and simple functional stream programming framework written on top of node.js object streams. It
exposes a standards-inspired JavaScript API and is written fully in native ES6. Thanks to some built-in optimizations,
Scramjet is much faster and much simpler than similar frameworks when using asynchronous operations.
It is built upon the logic behind three well-known JavaScript array operations, namely map, filter and reduce. This
means that if you've ever performed operations on an Array in JavaScript, you already know Scramjet like the back of
your hand.
The main advantage of Scramjet is running asynchronous operations on your data streams. It lets you perform
transformations both synchronously and asynchronously through the same API, so you can "map" your stream from any
source and call any number of APIs consecutively.
The benchmarks are published in the scramjet-benchmark repo.
Example
How about a CSV parser for all the parking lots in the city of Wrocław, using data from http://www.wroclaw.pl/open-data/...
```js
const request = require("request");
const StringStream = require("scramjet").StringStream;

let columns = null;
request.get("http://www.wroclaw.pl/open-data/opendata/its/parkingi/parkingi.csv")
    .pipe(new StringStream())
    .split("\n")                                  // one chunk per CSV line
    .parse((line) => line.split(";"))             // each line becomes an array of fields
    .shift(1, (items) => (columns = items[0]))    // take the header row off the stream
    .map((data) => columns.reduce((acc, id, i) => (acc[id] = data[i], acc), {}))
    .on("data", console.log.bind(console));
```
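The `.map` step relies on a compact `reduce` (using the comma operator) to pair each header name with the field at the same index. Pulled out as a standalone function, it looks roughly like this; the name `rowToObject` and the sample values are hypothetical, used only for illustration:

```javascript
// Hypothetical helper mirroring the .map step above: turns one CSV row
// (an array of fields) into an object keyed by the header names.
function rowToObject(columns, data) {
    return columns.reduce((acc, id, i) => {
        acc[id] = data[i]; // header at index i gets the field at index i
        return acc;
    }, {});
}

console.log(rowToObject(["name", "spaces"], ["Example lot", "42"]));
// { name: 'Example lot', spaces: '42' }
```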
Usage
Scramjet uses functional programming to run transformations on your data streams in a fashion very similar to the well
known event-stream node module. Most transformations are done by passing a transform function. You can write your
function in three ways:
- Synchronous
Example: a simple stream transform that outputs a stream of objects with the same id property and the length of the value string.
```js
datastream.map(
    (item) => ({id: item.id, length: item.value.length})
)
```
- Asynchronous using ES2017 async/await
Example: a simple stream that uses the Fetch API to get the contents of every entry (a URL) in the stream
```js
datastream.map(
    async (item) => (await fetch(item)).text()    // resolve each URL to its response body
)
```
- Asynchronous using Promises
Example: a simple stream that fetches the URL given in the incoming object's url property
```js
const request = require("request");

datastream.map(
    (item) => new Promise((resolve, reject) => {
        // request's callback receives (error, response, body)
        request(item.url, (err, res, data) => {
            if (err)
                reject(err);
            else
                resolve(data);
        });
    })
)
```
The actual logic of this transform function is as if you passed your function to the `then` method of a Promise resolved with the data from the input stream.
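A minimal plain-JavaScript sketch of that rule, with no Scramjet involved: the hypothetical helper `applyTransform` wraps a chunk in a resolved Promise and passes the transform function to `then`. Under this model a synchronous function, an `async` function and a Promise-returning function all yield the same result. It illustrates the behavior described above and is not Scramjet's internal code.

```javascript
// Hypothetical helper: hands a chunk to a transform the way `then` would.
function applyTransform(chunk, func) {
    return Promise.resolve(chunk).then(func);
}

const item = { id: 1, value: "scramjet" };

// The same transform written in the three styles listed above.
const syncFn = (x) => ({ id: x.id, length: x.value.length });
const asyncFn = async (x) => ({ id: x.id, length: x.value.length });
const promiseFn = (x) => new Promise((resolve) =>
    setTimeout(() => resolve({ id: x.id, length: x.value.length }), 10));

Promise.all([syncFn, asyncFn, promiseFn].map((fn) => applyTransform(item, fn)))
    .then((results) => console.log(results)); // each result is { id: 1, length: 8 }
```

Because `then` also turns an exception thrown inside the callback into a rejected Promise, errors from synchronous and asynchronous transforms surface the same way under this model.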
API Docs
Here's the list of the exposed classes and methods. Please review the specific documentation for details.
Note that:
- Most of the methods take a callback argument that operates on the stream items.
- The callback, unless it's stated otherwise, will receive an argument with the next chunk.
- If you want to perform your operations asynchronously, return a Promise, otherwise just return the right value.
The quick reference of the exposed classes:
BufferStream ⇐ DataStream
A facilitation stream created for easy splitting or parsing of buffers
Detailed BufferStream docs here
Method | Description | Example |
---|---|---|
new BufferStream(opts) | Creates the BufferStream | |
bufferStream.shift(chars, func) ⇒ BufferStream | Shift given number of bytes from the original stream | shift example |
bufferStream.split(splitter) ⇒ BufferStream | Splits the buffer stream into buffer objects | split example |
bufferStream.breakup(number) ⇒ BufferStream | Breaks up a stream apart into chunks of the specified length | breakup example |
bufferStream.stringify(encoding) ⇒ StringStream | Creates a string stream from the given buffer stream | stringify example |
bufferStream.parse(parser) ⇒ DataStream | Parses every buffer to object | parse example |
DataStream ⇐ stream.PassThrough
Detailed DataStream docs here
Method | Description | Example |
---|---|---|
new DataStream(opts) | Create the DataStream. | |
dataStream.use(func) ⇒ * | Calls the passed method in place with the stream as first argument and returns the result. | use example |
dataStream.tee(func) ⇒ DataStream | Duplicate the stream | tee example |
dataStream.reduce(func, into) ⇒ Promise | Reduces the stream into a given accumulator | reduce example |
dataStream.each(func) ↩︎ | Performs an operation on every chunk, without changing the stream | |
dataStream.map(func, Clazz) ⇒ DataStream | Transforms stream objects into new ones, just like Array.prototype.map | map example |
dataStream.filter(func) ⇒ DataStream | Filters objects based on the function outcome, just like Array.prototype.filter | filter example |
dataStream.while(func) ⇒ DataStream | Reads the stream while the function outcome is truthy. | |
dataStream.until(func) ⇒ DataStream | Reads the stream until the function outcome is truthy. | |
dataStream.pipe(to, options) ⇒ Writable | Override of node.js Readable pipe. | |
dataStream.toBufferStream(serializer) ⇒ BufferStream | Creates a BufferStream | toBufferStream example |
dataStream.stringify(serializer) ⇒ StringStream | Creates a StringStream | stringify example |
dataStream.toArray(initial) ⇒ Promise | Aggregates the stream into a single Array | |
dataStream.debug(func) ⇒ DataStream | Injects a debugger statement when called. | debug example |
dataStream.cluster(hashFunc, count, stringify, parse) ⇒ ClusteredDataStream | [NYI] Distributes processing to multiple forked subprocesses. | |
dataStream.separate(func, createOptions) ⇒ DataStream | Separates execution to multiple streams using the hashes returned by the passed callback. | separate example |
dataStream.slice(start, end, func) ⇒ DataStream | Gets a slice of the stream to the callback function. | slice example |
dataStream.accumulate(func, into) ⇒ Promise | Accumulates data into the object. | accumulate example |
dataStream.reduceNow(func, into) ⇒ * | Reduces the stream into the given object, returning it immediately. | reduceNow example |
dataStream.remap(func, Clazz) ⇒ DataStream | Remaps the stream into a new stream. | remap example |
dataStream.flatMap(func, Clazz) ⇒ DataStream | Takes any method that returns any iterable and flattens the result. | flatMap example |
dataStream.unshift(item) ↩︎ | Pushes any data at call time | |
dataStream.flatten() ⇒ DataStream | A shorthand for streams of Arrays to flatten them. | |
dataStream.batch(count) ⇒ DataStream | Aggregates chunks into arrays of the given number of items. | batch example |
dataStream.timeBatch(ms, count) ⇒ DataStream | Aggregates chunks to arrays not delaying output by more than the given number of ms. | timeBatch example |
dataStream.assign(func) ⇒ DataStream | Transforms stream objects by assigning the properties from the returned object | assign example |
dataStream.shift(count, func) ⇒ DataStream | Shifts the first n items from the stream and pipes the rest | shift example |
DataStream.fromArray(arr) ⇒ DataStream | Create a DataStream from an Array | fromArray example |
DataStream.fromIterator(iter) ⇒ DataStream | Create a DataStream from an Iterator | fromIterator example |
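To make the `batch(count)` row concrete, here is a sketch of the same grouping idea over a plain Node.js object stream, using an async generator and `for await` (Node.js readable streams are async iterables). The function name `batch` here is a hypothetical stand-in, not Scramjet's implementation, and emitting a final, shorter batch is an assumption about the edge case.

```javascript
const { Readable } = require("stream");

// Hypothetical sketch of batch(count) semantics: groups items from any
// (async) iterable, such as an object-mode stream, into arrays of `count`.
async function* batch(source, count) {
    let current = [];
    for await (const item of source) {
        current.push(item);
        if (current.length === count) {
            yield current;
            current = [];
        }
    }
    // Assumption: the incomplete tail is emitted rather than dropped.
    if (current.length > 0) yield current;
}

(async () => {
    for await (const chunk of batch(Readable.from([1, 2, 3, 4, 5]), 2)) {
        console.log(chunk); // [ 1, 2 ], then [ 3, 4 ], then [ 5 ]
    }
})();
```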
StringStream ⇐ DataStream
A stream of string objects for further transformation on top of DataStream.
Detailed StringStream docs here
Method | Description | Example |
---|---|---|
new StringStream(encoding) | Constructs the stream with the given encoding | |
stringStream.shift(bytes, func) ⇒ StringStream | Shifts given length of chars from the original stream | shift example |
stringStream.split(splitter) ⇒ StringStream | Splits the string stream by the specified regexp or string | split example |
stringStream.match(splitter) ⇒ StringStream | Finds matches in the string stream and streams the match results | match example |
stringStream.toBufferStream() ⇒ BufferStream | Transforms the StringStream to BufferStream | toBufferStream example |
stringStream.parse(parser) ⇒ DataStream | Parses every string to object | parse example |
StringStream.SPLIT_LINE | A handy split-by-line regex to quickly get a line-by-line stream | |
StringStream.fromString(str, encoding) ⇒ StringStream | Creates a StringStream and writes a specific string. | |
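Splitting a stream is trickier than splitting a single string, because a separator can fall anywhere, including across chunk boundaries. The sketch below shows the usual technique with a plain Node.js `Transform`: keep the unfinished tail of each chunk and prepend it to the next one. `LineSplitter` is a hypothetical class for illustration, not Scramjet's implementation of `split`; for brevity it assumes string input (a `Buffer` cut in the middle of a multi-byte character would need a `StringDecoder`).

```javascript
const { Transform, Readable } = require("stream");

// Hypothetical splitter: emits one string per separator-delimited piece,
// holding back the incomplete tail until more data (or the end) arrives.
class LineSplitter extends Transform {
    constructor(separator = "\n") {
        super({ readableObjectMode: true, decodeStrings: false });
        this.separator = separator;
        this.remainder = "";
    }

    _transform(chunk, encoding, callback) {
        const parts = (this.remainder + chunk.toString()).split(this.separator);
        this.remainder = parts.pop(); // the last piece may continue in the next chunk
        for (const part of parts) this.push(part);
        callback();
    }

    _flush(callback) {
        if (this.remainder !== "") this.push(this.remainder); // emit the final piece
        callback();
    }
}

const lines = [];
Readable.from(["id;name\n1;Al", "pha\n2;Beta"])  // "1;Alpha" spans two chunks
    .pipe(new LineSplitter())
    .on("data", (line) => lines.push(line))
    .on("end", () => console.log(lines)); // [ 'id;name', '1;Alpha', '2;Beta' ]
```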
MultiStream
An object consisting of multiple streams that can be refined or muxed.
Detailed MultiStream docs here
Method | Description | Example |
---|---|---|
new MultiStream(streams, options) | Creates an instance of MultiStream with the specified stream list | |
multiStream.streams : Array | Array of all streams | |
multiStream.length ⇒ number | Returns the current number of streams | |
multiStream.map(aFunc) ⇒ MultiStream | Returns a new MultiStream with the streams returned by the transform. | map example |
multiStream.find(...args) ⇒ DataStream | Calls Array.prototype.find on the streams | |
multiStream.filter(func) ⇒ MultiStream | Filters the stream list and returns a new MultiStream with only the streams for which the function returns a truthy value | filter example |
multiStream.mux(cmp) ⇒ DataStream | Muxes the streams into a single one | mux example |
multiStream.add(stream) | Adds a stream to the MultiStream | add example |
multiStream.remove(stream) | Removes a stream from the MultiStream | remove example |
Browserifying
Scramjet works in the browser too. There's a self-contained sample in samples/browser; just run it:
```shell
git clone https://github.com/signicode/scramjet.git
cd scramjet
npm install .
cd samples/browser
npm start
```
If you need your own browser build of Scramjet, grab browserify and run:
```shell
browserify lib/index --standalone scramjet -o /path/to/your/browserified-scramjet.js
```
With this you can run your transformations in the browser and use websockets to send data back and forth. If you try this and it fails for some reason, please report an issue: no single person can test all the use cases, and I am but one person.
License and contributions
As of version 2.0 Scramjet is MIT Licensed.
Help wanted
The project needs your help! There's lots of work to do: transforming and muxing, joining and splitting, browserifying, modularizing, documenting, and reporting issues.
If you want to help and be part of the Scramjet team, please reach out to me (signicode on GitHub) or email me at scramjet@signicode.com.