scramjet


Simple yet powerful live or real-time data computation framework like event-stream, but written in ES6 and using streams2

Version 2.10.0. Weekly downloads: 2.6K (decreased by 49.33%). Maintainers: 1.

What does it do?

Scramjet is a powerful yet simple framework written on top of Node.js object streams, somewhat similar to the well-known event-stream or highland modules, but with a much simpler API and written fully in ES6. It is built upon the logic behind three well-known JavaScript array operations, namely map, filter and reduce. This means that if you've ever performed operations on an Array in JavaScript, you already know Scramjet like the back of your hand.

The main advantage of Scramjet is running asynchronous operations on your data streams. It lets you perform transformations both synchronously and asynchronously using the same API, so you can "map" your stream from whatever source and call any number of APIs consecutively.

The benchmarks are published in the scramjet-benchmark repo.

Example

How about a CSV parser of all the parkings in the city of Wrocław from http://www.wroclaw.pl/open-data/...

const request = require("request");
const StringStream = require("scramjet").StringStream;

let columns = null;
request.get("http://www.wroclaw.pl/open-data/opendata/its/parkingi/parkingi.csv")
    .pipe(new StringStream())
    .split("\n")
    .parse((line) => line.split(";"))
    .pop(1, (data) => columns = data)
    .map((data) => columns.reduce((acc, id, i) => (acc[id] = data[i], acc), {}))
    .on("data", console.log.bind(console))
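The row-to-object step in the pipeline above can be tried without a network connection. The following is a minimal sketch in plain JavaScript, with no Scramjet involved: `csvToObjects` is a hypothetical helper and `sample` is made-up data, so the real file's column names will differ.

```javascript
// Sketch only: plain arrays instead of streams, invented sample data.
const sample = "name;free;total\nCentrum;12;100\nRynek;0;50";

function csvToObjects(text, separator = ";") {
    // One array of fields per line, like the .split() and .parse() steps.
    const rows = text.split("\n").map((line) => line.split(separator));
    // The first row holds the column names.
    const columns = rows.shift();
    // Pair every value with its column name, using the same reduce as above.
    return rows.map((data) =>
        columns.reduce((acc, id, i) => (acc[id] = data[i], acc), {})
    );
}

const parkings = csvToObjects(sample);
console.log(parkings);
```

Note that every value stays a string; converting numbers would be an extra map step.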

API Docs

Here's the list of the exposed classes and methods. Please review the specific documentation for details.

Note that:

  • Most of the methods take a callback argument that operates on the stream items.
  • The callback, unless it's stated otherwise, will receive an argument with the next chunk.
  • If you want to perform your operations asynchronously, return a Promise, otherwise just return the right value.

The quick reference of the exposed classes:

DataStream ⇐ stream.PassThrough

DataStream is the primary stream type for Scramjet. When you parse your stream, just pipe it, and you can then perform calculations on the data objects streamed through your flow.

Detailed DataStream docs here

Method | Description | Example
new DataStream(opts) | Create the DataStream. | DataStream example
dataStream.TimeSource : Object | Source of time - must implement the interface of Date. |
dataStream.setTimeout : function | setTimeout method |
dataStream.clearTimeout : function | clearTimeout method |
dataStream.debug(func) ⇒ DataStream | Injects a debugger statement when called. | debug example
dataStream.use(func) ⇒ * | Calls the passed function in place with the stream as first argument, returns result. | use example
dataStream.cluster(hashFunc, count, stringify, parse) ⇒ ClusteredDataStream | [NYI] Distributes processing to multiple forked subprocesses. |
dataStream.group(func) ⇒ DataStream | Separates execution to multiple streams using the hashes returned by the passed callback | group example
dataStream.tee(func) ⇒ DataStream | Duplicate the stream | tee example
dataStream.slice(start, end, func) ⇒ DataStream | Gets a slice of the stream to the callback function. | slice example
dataStream.accumulate(func, into) ⇒ Promise | Accumulates data into the object. | accumulate example
dataStream.reduce(func, into) ⇒ Promise | Reduces the stream into a given accumulator | reduce example
dataStream.reduceNow(func, into) ⇒ * | Reduces the stream into the given object, returning it immediately. | reduceNow example
dataStream.remap(func, Clazz) ⇒ DataStream | Remaps the stream into a new stream. | remap example
dataStream.flatMap(func, Clazz) ⇒ DataStream | Takes any method that returns any iterable and flattens the result. | flatMap example
dataStream.unshift(item) ↩︎ | Pushes any data at call time |
dataStream.flatten() ⇒ DataStream | A shorthand for streams of Arrays to flatten them. |
dataStream.batch(count) ⇒ DataStream | Aggregates chunks in arrays of the given number of items. | batch example
dataStream.timeBatch(ms, count) ⇒ DataStream | Aggregates chunks to arrays not delaying output by more than the given number of ms. | timeBatch example
dataStream.each(func) ↩︎ | Performs an operation on every chunk, without changing the stream |
dataStream.map(func, Clazz) ⇒ DataStream | Transforms stream objects into new ones, just like Array.prototype.map | map example
dataStream.assign(func) ⇒ DataStream | Transforms stream objects by assigning the properties from the returned | assign example
dataStream.filter(func) ⇒ DataStream | Filters object based on the function outcome, just like | filter example
dataStream.shift(count, func) ⇒ DataStream | Shifts the first n items from the stream and pipes the other | shift example
dataStream.separate() ⇒ MultiStream | Splits the stream two ways | separate example
dataStream.toBufferStream(serializer) ⇒ BufferStream | Creates a BufferStream | toBufferStream example
dataStream.stringify(serializer) ⇒ StringStream | Creates a StringStream | stringify example
dataStream.toArray(initial) ⇒ Promise | Aggregates the stream into a single Array |
DataStream.fromArray(arr) ⇒ DataStream | Create a DataStream from an Array | fromArray example
DataStream.fromIterator(iter) ⇒ DataStream | Create a DataStream from an Iterator | fromIterator example
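To make the idea behind a method such as batch(count) concrete, here is a minimal sketch built only on Node's own stream module. It is not Scramjet's implementation and does not use its API; the `batch` helper below is hypothetical and only illustrates grouping items into arrays of a fixed size.

```javascript
const { Transform } = require("stream");

// Hypothetical helper: groups incoming objects into arrays of `count` items.
function batch(count) {
    let buffer = [];
    return new Transform({
        objectMode: true,
        transform(chunk, encoding, callback) {
            buffer.push(chunk);
            if (buffer.length >= count) {
                this.push(buffer); // emit a full batch
                buffer = [];
            }
            callback();
        },
        flush(callback) {
            // Emit whatever is left when the input ends.
            if (buffer.length > 0) this.push(buffer);
            callback();
        }
    });
}

const batches = [];
const batched = batch(2);
batched.on("data", (items) => batches.push(items));
batched.on("end", () => console.log(batches)); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
[1, 2, 3, 4, 5].forEach((n) => batched.write(n));
batched.end();
```

The last batch may be shorter than `count`, because the flush step emits the remainder instead of dropping it.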

StringStream ⇐ DataStream

A stream of string objects for further transformation on top of DataStream.

Detailed StringStream docs here

Method | Description | Example
new StringStream(encoding) | Constructs the stream with the given encoding | StringStream example
stringStream.shift(bytes, func) ⇒ StringStream | Shifts given length of chars from the original stream | shift example
stringStream.split(splitter) ⇒ StringStream | Splits the string stream by the specified regexp or string | split example
stringStream.append(arg) ⇒ StringStream | Appends given argument to all the items. | append example
stringStream.prepend(arg) ⇒ StringStream | Prepends given argument to all the items. | prepend example
stringStream.match(splitter) ⇒ StringStream | Finds matches in the string stream and streams the match results | match example
stringStream.toBufferStream() ⇒ StringStream | Transforms the StringStream to BufferStream | toBufferStream example
stringStream.parse(parser) ⇒ DataStream | Parses every string to object | parse example
StringStream.SPLIT_LINE | A handy split by line regex to quickly get a line-by-line stream |
StringStream.fromString(str, encoding) ⇒ StringStream | Creates a StringStream and writes a specific string. |
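Splitting a text stream into lines is less trivial than splitting a string, because a line can be cut in half by a chunk boundary. The sketch below shows one way to handle that with Node's built-in modules only. It is an illustration under that assumption, not how Scramjet does it, and `splitLines` is a hypothetical name.

```javascript
const { Transform } = require("stream");
const { StringDecoder } = require("string_decoder");

// Hypothetical helper: re-chunks incoming text so that every emitted
// item is one complete line.
function splitLines(splitter = /\r?\n/) {
    const decoder = new StringDecoder("utf8"); // keeps multi-byte characters whole
    let rest = "";
    return new Transform({
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            const parts = (rest + decoder.write(chunk)).split(splitter);
            rest = parts.pop(); // the last part may be an incomplete line
            parts.forEach((line) => this.push(line));
            callback();
        },
        flush(callback) {
            rest += decoder.end();
            if (rest.length > 0) this.push(rest);
            callback();
        }
    });
}

const lines = [];
const lineStream = splitLines();
lineStream.on("data", (line) => lines.push(line));
lineStream.on("end", () => console.log(lines));
// The chunk boundaries deliberately fall in the middle of lines.
lineStream.write("id;na");
lineStream.write("me\n1;Centrum\n2;Ry");
lineStream.end("nek");
```

Holding back the last part of every chunk is the key design choice: it is only emitted once a later chunk or the end of the input shows that the line is complete.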

BufferStream ⇐ DataStream

A facilitation stream created for easy splitting or parsing of buffers.

Detailed BufferStream docs here

Method | Description | Example
new BufferStream(opts) | Creates the BufferStream | BufferStream example
bufferStream.shift(chars, func) ⇒ BufferStream | Shift given number of bytes from the original stream | shift example
bufferStream.split(splitter) ⇒ BufferStream | Splits the buffer stream into buffer objects | split example
bufferStream.breakup(number) ⇒ BufferStream | Breaks up a stream apart into chunks of the specified length | breakup example
bufferStream.toStringStream(encoding) ⇒ StringStream | Creates a string stream from the given buffer stream | toStringStream example
bufferStream.parse(parser) ⇒ DataStream | [Parallel] Parses every buffer to object | parse example
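As an illustration of what breaking a byte stream into fixed-length chunks involves, here is a small sketch using only Node's stream module. It is not Scramjet's code; the `breakup` function below is a hypothetical stand-in that emits each piece as a discrete item.

```javascript
const { Transform } = require("stream");

// Hypothetical helper: re-chunks incoming buffers into pieces of `size`
// bytes, whatever the size of the incoming chunks.
function breakup(size) {
    if (!(size >= 1)) throw new RangeError("size must be at least 1");
    let pending = Buffer.alloc(0);
    return new Transform({
        readableObjectMode: true, // keep every piece as a separate item
        transform(chunk, encoding, callback) {
            pending = Buffer.concat([pending, chunk]);
            while (pending.length >= size) {
                this.push(pending.subarray(0, size));
                pending = pending.subarray(size);
            }
            callback();
        },
        flush(callback) {
            // The final piece may be shorter than `size`.
            if (pending.length > 0) this.push(pending);
            callback();
        }
    });
}

const pieces = [];
const broken = breakup(3);
broken.on("data", (piece) => pieces.push(piece.toString()));
broken.on("end", () => console.log(pieces)); // [ 'scr', 'amj', 'et' ]
broken.write(Buffer.from("scra"));
broken.write(Buffer.from("mj"));
broken.end(Buffer.from("et"));
```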

MultiStream

An object consisting of multiple streams that can be refined or muxed.

Detailed MultiStream docs here

Method | Description | Example
new MultiStream(streams, options) | Creates an instance of MultiStream with the specified stream list | MultiStream example
multiStream.streams : Array | Array of all streams |
multiStream.map(aFunc) ⇒ MultiStream | Returns new MultiStream with the streams returned by the transform. | map example
multiStream.filter(func) ⇒ MultiStream | Filters the stream list and returns a new MultiStream with only the | filter example
multiStream.dedupe(cmp) ⇒ DataStream | Removes duplicate items from stream using the given hash function | dedupe example
multiStream.mux(cmp) ⇒ DataStream | Muxes the streams into a single one | mux example
multiStream.add(stream) | Adds a stream to the MultiStream | add example
multiStream.remove(stream) | Removes a stream from the MultiStream | remove example
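Muxing means merging several streams into one. A rough sketch of the idea with Node's built-in PassThrough stream follows. It is only an illustration: the `mux` function here is hypothetical, takes no comparator, forwards items in arrival order, and does not forward errors from the sources.

```javascript
const { PassThrough } = require("stream");

// Hypothetical helper: forwards items from every source into one output.
function mux(sources) {
    const output = new PassThrough({ objectMode: true });
    let open = sources.length;
    if (open === 0) output.end();
    for (const source of sources) {
        // end: false keeps the output open until all sources are done.
        source.pipe(output, { end: false });
        source.once("end", () => {
            if (--open === 0) output.end();
        });
    }
    return output;
}

const first = new PassThrough({ objectMode: true });
const second = new PassThrough({ objectMode: true });
const merged = [];
const muxed = mux([first, second]);
muxed.on("data", (item) => merged.push(item));
// The interleaving of the two sources is not guaranteed, so sort for display.
muxed.on("end", () => console.log(merged.slice().sort()));
first.write("a1");
second.write("b1");
first.end("a2");
second.end("b2");
```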

Browserifying

Scramjet works in the browser too. There's a nice, self-contained sample in the repository. Just run it:

    git clone https://github.com/signicode/scramjet.git
    cd scramjet
    npm install .
    cd samples/browser
    npm start # point your browser to http://localhost:30035 and open console

If you need your scramjet version for the browser, grab browserify and just run:

    browserify lib/index --standalone scramjet -o /path/to/your/browserified-scramjet.js

With this you can run your transformations in the browser and use websockets to send data back and forth. If you try this and it fails for some reason, please remember to file an issue, as no one person can test all the use cases and I am but one person.

Usage

Scramjet uses functional programming to run transformations on your data streams in a fashion very similar to the well known event-stream node module. Most transformations are done by passing a transform function. You can write your function in two ways:

  1. Synchronous

Example: a simple stream transform that outputs a stream of objects with the same id property and the length of the value string.

   datastream.map(
       (item) => ({id: item.id, length: item.value.length})
   )

  2. Asynchronous (using Promises)

Example: a simple stream that fetches a URL mentioned in the incoming object.

   datastream.map(
       (item) => new Promise((resolve, reject) => {
           request(item.url, (err, res, data) => {
               if (err)
                   reject(err); // will emit an "error" event on the stream
               else
                   resolve(data);
           });
       })
   )

The actual logic of this transform function is as if you passed your function to the then method of a Promise resolved with the data from the input stream.
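The sentence above can be turned into a small sketch. Assuming nothing but Node's built-in stream module, the hypothetical `mapStream` helper below wraps each chunk in a resolved Promise and passes the callback to then, so a function returning a plain value and one returning a Promise are handled by the same code path. This illustrates the principle and is not Scramjet's actual source.

```javascript
const { Transform } = require("stream");

// Hypothetical helper: applies `func` to every chunk, whether it returns
// a value or a Promise.
function mapStream(func) {
    return new Transform({
        objectMode: true,
        transform(chunk, encoding, callback) {
            Promise.resolve(chunk)
                .then(func)
                .then(
                    (result) => callback(null, result),
                    (err) => callback(err) // surfaces as an "error" event
                );
        }
    });
}

// A synchronous transform...
const lengths = mapStream((item) => ({ id: item.id, length: item.value.length }));
// ...and an asynchronous one, chained with the same helper.
const doubled = mapStream((item) => new Promise((resolve) => {
    setTimeout(() => resolve(item.length * 2), 10);
}));

const results = [];
lengths.pipe(doubled);
doubled.on("data", (value) => results.push(value));
doubled.on("end", () => console.log(results)); // [ 6, 10 ]
lengths.write({ id: 1, value: "abc" });
lengths.end({ id: 2, value: "hello" });
```

Because a Transform handles one chunk at a time, the output order matches the input order even when the callback is asynchronous.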

License and contributions

As of version 2.0 Scramjet is MIT Licensed.

Help wanted

The project needs your help! There's lots of work to do - transforming and muxing, joining and splitting, browserifying, modularizing, documenting and filing those issues.

If you want to help and be part of the Scramjet team, please reach out to me, signicode on GitHub, or email me: scramjet@signicode.com.

Package last updated on 18 May 2017
