stream-json
stream-json
is a collection of node.js stream components for creating custom standard-compliant JSON processors, which requires a minimal memory footprint. It can parse JSON files far exceeding available memory. Even individual primitive data items (keys, strings, and numbers) can be streamed piece-wise. Streaming SAX-inspired event-based API is included as well.
Available components:
- Streaming JSON parsers:
- Streaming JSON
Parser
is manually implemented based on RegExp
. - Streaming JSON
AltParser
implemented manually without regular expressions. - Streaming JSON
ClassicParser
based on parser-toolkit.
Streamer
, which converts tokens into SAX-like event stream.Packer
, which can assemble numbers, strings, and object keys from individual chunks. It is useful, when user knows that individual data items can fit the available memory. Overall, it makes the API simpler.Combo
, which actually packs Parser
, Streamer
, and Packer
together. Its advantage over individual components is speed.Filter
, which is a flexible tool to select only important sub-objects using either a regular expression, or a function.Emitter
, which converts an event stream into events by bridging stream.Writable
with EventEmitter
.Source
, which is a helper that connects streams using pipe()
and converts an event stream on the end of pipe into events, similar to Emitter
.- Various utilities:
Assembler
to assemble full objects from an event stream.StreamArray
handles a frequent use case: a huge array of relatively small objects similar to Django-produced database dumps. It streams array components individually taking care of assembling them automatically.StreamFilteredArray
is a companion for StreamArray
. The difference is that it allows to filter out unneeded objects in an efficient way without assembling them fully.FilterObjects
filters complete objects and primitives.
Additionally a helper function is available in the main file, which creates a Source
object with a default set of stream components.
This toolkit is distributed under New BSD license.
See the full documentation below.
Introduction
The simplest example (streaming from a file):
var makeSource = require("stream-json");
var source = makeSource();
var fs = require("fs");
var objectCounter = 0;
source.on("startObject", function(){ ++objectCounter; });
source.on("end", function(){
console.log("Found ", objectCounter, " objects.");
});
fs.createReadStream("sample.json").pipe(source.input);
Installation
npm install stream-json
Documentation
Parser
This is the workhorse of the package. It is a Transform stream, which consumes text, and produces a stream of tokens. It is always the first in a pipe chain being directly fed with a text from a file, a socket, the standard input, or any other text stream. Its Writeable
part operates in a buffer mode, while its Readable
part operates in an objectMode.
var Parser = require("stream-json/Parser");
var parser = new Parser(options);
var next = fs.createReadStream(fname).pipe(parser);
options
can contain some technical parameters, and it is rarely needs to be specified. You can find it thoroughly documented in node.js' Stream documentation.
The test files for Parser
: tests/test_parser.js
, tests\manual\test_parser.js
. Actually all test files in tests/
use Parser
.
If you want to catch parsing errors, attach an error listener directly to a parser component — unlike data errors do not travel through stream pipes.
Streamer
Streamer
is a Transform stream, which consumes a stream of tokens, and produces a stream of events. It is always the second in a pipe chain after the Parser
. It knows JSON semantics and produces actionable events. It operates in an objectMode.
var Streamer = require("stream-json/Streamer");
var streamer = new Streamer(options);
var next = fs.createReadStream(fname).
pipe(parser).pipe(streamer);
options
can contain some technical parameters, and it is rarely needs to be specified. You can find it thoroughly documented in node.js' Stream documentation.
Following is a list of all event objects produced by Streamer
:
{name: "startObject"};
{name: "endObject"};
{name: "startArray"};
{name: "endArray"};
{name: "startKey"};
{name: "stringChunk", value: "actual string value"};
{name: "endKey"};
{name: "startString"};
{name: "stringChunk", value: "actual string value"};
{name: "endString"};
{name: "startNumber"};
{name: "numberChunk", value: "actual string value"};
{name: "endNumber"};
{name: "nullValue", value: null};
{name: "trueValue", value: true};
{name: "falseValue", value: false};
The event stream is well-formed:
- All
startXXX
are balanced with endXXX
. - Between
startKey
and endKey
can be zero or more stringChunk
events. No other event are allowed. - Between
startString
and endString
can be zero or more stringChunk
events. No other event are allowed. - Between
startNumber
and endNumber
can be one or more numberChunk
events. No other event are allowed.
- All number chunks combined constitute a valid number value.
- Number chunk values are strings, not numbers!
- After
startObject
optional key-value pairs emitted in a strict pattern: a key-related events, a value, and this cycle can be continued until all key-value pairs are streamed.
The test files for Streamer
: tests/test_streamer.js
and tests/manual/test_streamer.js
.
Packer
Packer
is a Transform stream, which passes through a stream of events, optionally assembles keys, strings, and/or numbers from chunks, and adds new events with assembled values. It is a companion for Streamer
, which frees users from implementing the assembling logic, when it is known that keys, strings, and/or numbers will fit in the available memory. It operates in an objectMode.
var Packer = require("stream-json/Packer");
var packer = new Packer(options);
var next = fs.createReadStream(fname).
pipe(parser).pipe(streamer).pipe(packer);
options
contains some important parameters, and should be specified. It can contain some technical properties thoroughly documented in node.js' Stream documentation. Additionally it recognizes following properties:
-
packKeys
can be true
or false
(the default). If true
, a key value is returned as a new event:
{name: "keyValue", value: "assembled key value"}
keyValue
event always follows endKey
.
-
packStrings
can be true
or false
(the default). If true
, a string value is returned as a new event:
{name: "stringValue", value: "assembled string value"}
stringValue
event always follows endString
.
-
packNumbers
can be true
or false
(the default). If true
, a number value is returned as a new event:
{name: "numberValue", value: "assembled number value"}
numberValue
event always follows endNumber
.
value
of this event is a string, not a number. If user wants to convert it to a number, they can do it themselves. The simplest way to do it (assuming your platform and JavaScript can handle it), is to force it to a number:
var n = +event.value;
The test files for Packer
: tests/test_packer.js
and tests/manual/test_packer.js
.
Combo
Combo
is a Transform stream, which combines Parser
, Streamer
, and Packer
. It accepts the same extra options as Packer
, and produces a stream of objects expected from Streamer
, and augmented by Packer
. Please refer to the documentation of those three components for more details.
While logically Combo
is a combination of three existing components, it has an important advantage: speed. The codes for Parser
, Streamer
, and Packer
are merged together in one component avoiding overhead of node.js streams completely, which is significant. It is recommended to use Combo
over a chain of Parser
+ Streamer
, or Parser
+ Streamer
+ Packer
.
The test file for Combo
: tests/test_combo.js
.
Emitter
Emitter
is a Writeable stream, which consumes a stream of events, and emits them on itself (all streams are instances of EventEmitter). The standard finish
event is used to indicate the end of a stream. It operates in an objectMode.
var Emitter = require("stream-json/Emitter");
var emitter = new Emitter(options);
emitter.on("startArray", function(){
console.log("array!");
});
emitter.on("numberValue", function(value){
console.log("number:", value);
});
emitter.on("finish", function(){
console.log("done");
});
fs.createReadStream(fname).
pipe(parser).pipe(streamer).pipe(packer).pipe(emitter);
options
can contain some technical parameters, and it is rarely needs to be specified. You can find it thoroughly documented in node.js' Stream documentation.
The test file for Emitter
: tests/test_emitter.js
.
Filter
Filter
is a Transform stream, which is an advance selector for sub-objects from a stream of events. It operates in an objectMode.
var Filter = require("stream-json/Filter");
var filter = new Filter(options);
var next = fs.createReadStream(fname).
pipe(parser).pipe(streamer).pipe(filter);
options
contains some important parameters, and should be specified. It can contain some technical properties thoroughly documented in node.js' Stream documentation. Additionally it recognizes following properties:
separator
is a string to use to separate key and index values forming a path in a current object. By default it is .
(a dot).filter
can be a regular expression, or a function. By default it allows all events.
- If it is a function, this function is called in a context of a
Filter
object with two parameters:
path
, which is an array of current key and index values. All keys are represented as strings, while all array indices are represented as numbers. It can be used to understand what kind of object we are dealing with.event
is an event object described above.
The function should return a Boolean value, with true
indicating that we are interested in this event, and it should be passed through.
- If it is a regular expression, then a current
path
is joined be a separator
and tested against the regular expression. If a match was found, it indicates that the event should be passed through. Otherwise it will be rejected.
Filter
produces a well-formed event stream.
The test files for Filter
: tests/test_filter.js
and tests/manual/test_filter.js
.
Path examples
Given a JSON object:
{"a": [true, false, 0, null]}
The path of false
as an array:
["a", 1]
The same path converted to a string joined by a default separator .
:
"a.1"
Source
Source
is a convenience object. It connects individual streams with pipes, and attaches itself to the end emitting all events on itself (just like Emitter
). The standard end
event is used to indicate the end of a stream. It is based on EventEmitter.
var Source = require("stream-json/Source");
var source = new Source([parser, streamer, packer]);
source.on("startArray", function(){
console.log("array!");
});
source.on("numberValue", function(value){
console.log("number:", value);
});
fs.createReadStream(fname).pipe(source.input);
The constructor of Source
accepts one mandatory parameter:
streams
should be a non-empty array of pipeable streams. At the end the last stream should produce a stream of events.
Source
exposes three public properties:
streams
— an array of streams so you can inspect them individually, if needed. They are connected sequentially in the array order.input
— the beginning of a pipeline, which should be used as an input for a JSON stream.output
— the end of a pipeline, which can be used to pipe the resulting stream of objects for futher processing.
The test files for Source
: tests/test_source.js
and tests/manual/test_source.js
.
main: makeSource()
The main file contains a helper function, which creates a commonly used configuration of streams, and returns a Source
object.
var makeSource = require("stream-json");
var source = makeSource(options);
source.on("startArray", function(){
console.log("array!");
});
source.on("numberValue", function(value){
console.log("number:", value);
});
fs.createReadStream(fname).pipe(source.input);
options
can contain some technical parameters, and it is completely optional. You can find it thoroughly documented in node.js' Stream documentation, and here. It is directly passed to Combo
, so it will use its custom parameters.
Algorithm:
makeSource()
checks if either of packKeys
, packStrings
, or packNumbers
are specified in options.- If any of them are
true
, a Combo
instance is created with options
. - If all of them are unspecified, all pack flags are assumed to be
true
, and a Combo
is created. - A newly created instance of
Combo
is used to create a Source
instance.
The most common use case is to call makeSource()
without parametrs. This scenario assumes that all key, string, and/or number values can be kept in memory, so user can use simplified events keyValue
, stringValue
, and numberValue
.
The test files for makeSource()
are tests/test_source.js
, tests/manual/test_main.js
, and tests/manual/test_chunk.js
.
ClassicParser
It is a drop-in replacement for Parser
, but it can emit whitespace, and token position information, yet it is slower than the main parser. It was the main parser for 0.1.x versions.
The test file for ClassicParser
: tests/test_classic.js
.
AltParser
It is another drop-in replacement for Parser
. Just like ClassicParser
it can emit whitespace, and token position information, but can be slower than the current main parser on platforms with optimized RegExp
implementation. It is faster than Parser
on node.js 0.10.
In general, its speed depends heavily on the implementation of regular expressions by node.js. When node.js has switched from an interpreted regular expressions, to the JIT compiled ones, both ClassicParser
, and Parser
got a nice performance boost. Yet, even the latest (as of 0.12) JIT compiler uses a simple yet non-linear algorithm to implement regular expressions instead of NFA and/or DFA. Future enhancements to node.js would make RegExp
-based parsers faster, potentially overtaking manually written JavaScript-only implementations.
The test file for AltParser
: tests/test_alternative.js
.
utils/Assembler
A helper class to convert a JSON stream to a fully assembled JS object. It can be used to assemble sub-objects.
var makeSource = require("stream-json");
var Assembler = require("stream-json/utils/Assembler");
var source = makeSource(options),
assembler = new Assembler();
source.output.on("data", function(chunk){
assembler[chunk.name] && assembler[chunk.name](chunk.value);
});
source.output.on("end", function(){
console.log(assembler.current);
});
fs.createReadStream(fname).pipe(source.input);
Assembler
is a simple state machine with an explicit stack. It exposes three properties:
current
— an object we are working with at the moment. It can be either an object or an array.
- Initial value is
null
. - If top-level object is a primitive value (
null
, true
, false
, a number, or a string), it will be placed in current
too.
key
— is a key value (a string) for a currently processed value, or null
, if not expected.
- If
current
is an object, a primitive value will be added directly to it using a current value of key
.
- After use
key
is assigned null
to prevent memory leaks.
- If
current
is an array, a primitive value will be added directly to it by push()
.
stack
— an array of parent objects.
stack
always grows/shrinks by two items: a value of current
and a value of key
.- When an object or an array is closed, it is added to its parent, which is removed from the stack to become a current object again.
- While adding to a parent a saved key is used if needed. Otherwise the second value is ignored.
- When an object or an array is started, the
current
object and key
are saved to stack
.
Obviously Assembler
should be used only when you are sure that the result will fit into memory. It automatically means that all primitive values (strings or numbers) are small enough to fit in memory too. As such Assembler
is meant to be used after Packer
, which reconstructs keys, strings, and numbers from possible chunks.
On the other hand, we use stream-json
when JSON streams are big, and JSON.parse()
is not an option. But we use Assembler
to assemble sub-objects. One way to do it is to start directing calls to Assembler
when we already selected a sub-object with Filter
. Another way is shown in StreamArray
.
The test file for Assembler
: tests/test_assembler.js
.
utils/StreamArray
This utility deals with a frequent use case: our JSON is an array of various sub-objects. The assumption is that while individual array items fit in memory, the array itself does not. Such files are frequently produced by various database dump utilities, e.g., Django's dumpdata.
It is a Transform stream, which opertes in an objectMode.
StreamArray
produces a stream of objects in following format:
{index, value}
Where index
is a numeric index in the array starting from 0, and value
is a corresponding value. All objects are produced strictly sequentially.
var StreamArray = require("stream-json/utils/StreamArray");
var stream = StreamArray.make();
stream.output.on("data", function(object){
console.log(object.index, object.value);
});
stream.output.on("end", function(){
console.log("done");
});
fs.createReadStream(fname).pipe(stream.input);
StreamArray
is a constructor, which optionally takes one object: options
. options
can contain some technical parameters, and it is rarely needs to be specified. You can find it thoroughly documented in node.js' Stream documentation.
Directly on StreamArray
there is a class-level helper function make()
, which helps to construct a proper pipeline. It is similar to makeSource()
and takes the same argument options
. Internally it creates and connects Combo
and StreamArray
, and returns an object with three properties:
streams
— an array of streams so you can inspect them individually, if needed. They are connected sequentially in the array order.input
— the beginning of a pipeline, which should be used as an input for a JSON stream.output
— the end of a pipeline, which can be used for events, or to pipe the resulting stream of objects for futher processing.
The test file for StreamArray
: tests/test_array.js
.
utils/StreamObject
Similar to StreamArray
, except that instead of breaking an array into its elements it breaks an object into key/value pairs. Each pair has two properties: key
and value
.
Like StreamArray
, StreamObject
is both a constructor and has a static make()
function for common use cases.
var StreamObject = require("stream-json/utils/StreamObject");
var stream = StreamObject.make();
stream.output.on("data", function(object){
console.log(object.key, object.value);
});
stream.output.on("end", function(){
console.log("done");
});
fs.createReadStream(fname).pipe(stream.input);
See the StreamArray
documentation for more information.
utils/StreamFilteredArray
This utility handles the same use case as StreamArray
, but in addition it allows to check the objects as they are being built to reject, or accept them. Rejected objects are not assembled, and filtered out.
It is a Transform stream, which opertes in an objectMode.
Just like StreamArray
, StreamFilteredArray
produces a stream of objects in following format:
{index, value}
Where index
is a numeric index in the array starting from 0, and value
is a corresponding value. All objects are produced strictly sequentially.
var StreamFilteredArray = require("stream-json/utils/StreamFilteredArray");
function f(assembler){
if(assembler.stack.length == 2 && assembler.key === null){
if(assembler.current.hasOwnProperty("active")){
return assembler.current.active;
}
}
}
var stream = StreamFilteredArray.make({objectFilter: f});
stream.output.on("data", function(object){
console.log(object.index, object.value);
});
stream.output.on("end", function(){
console.log("done");
});
fs.createReadStream(fname).pipe(stream.input);
StreamFilteredArray
is a constructor, which optionally takes one object: options
. options
can contain some technical parameters, which are rarely needs to be specified. You can find it thoroughly documented in node.js' Stream documentation. But additionally it recognizes the following property:
objectFilter
is a function, which takes an Assembler
instance as its only argument, and may return following values to indicate its decision:
- any truthy value indicates that we are interested in this object.
StreamFilteredArray
will stop polling our filter function and will assemble the object for future use. false
(the exact value) indicates that we should skip this object. StreamFilteredArray
will stop polling our filter function, and will stop assembling the object, discarding it completely.- any other falsy value indicates that we have not enough information (most likely because the object was not assembled yet to make a decision).
StreamFilteredArray
will poll our filter function next time the object changes.
The default for objectFilter
allows passing all objects.
In general objectFilter
is called on incomplete objects. It means that if a decision is based on a value of a certain properties, those properties could be unprocessed at that moment. In such case it is reasonable to delay a decision by returning a falsy (but not false
) value, like undefined
.
Complete objects are not submitted to a filter function and accepted automatically. It means that all primitive values: booleans, numbers, strings, null
objects are streamed, and not consulted with objectFilter
.
If you want to filter out complete objects, including primitive values, use FilterObjects
.
StreamFilteredArray
instances expose one property:
objectFilter
is a function, which us called for every top-level streamable object. It can be replaced with another function at any time. Usually it is replaced between objects after an accept/reject decision is made.
Directly on StreamFilteredArray
there is a class-level helper function make()
, which is an exact clone of StreamArray.make()
.
The test file for StreamFilteredArray
: tests/test_filtered_array.js
.
utils/FilterObjects
This utility filters out complete objects (and primitive values) working with a stream in the same format as StreamArray
and StreamFilteredArray
:
{index, value}
Where index
is a numeric index in the array starting from 0, and value
is a corresponding value. All objects are produced strictly sequentially.
It is a Transform stream, which opertes in an objectMode.
var StreamArray = require("stream-json/utils/StreamArray");
var FilterObjects = require("stream-json/utils/FilterObjects");
function f(item){
if(item.index % 2 && item.value &&
typeof item.value == "object" &&
!(item.value instanceof Array)){
return true;
}
return false;
}
var stream = StreamArray.make(),
filter = new FilterObjects({itemFilter: f});
filter.on("data", function(object){
console.log(object.index, object.value);
});
filter.on("end", function(){
console.log("done");
});
fs.createReadStream(fname).pipe(stream.input).pipe(filter);
FilterObjects
is a constructor, which optionally takes one object: options
. options
can contain some technical parameters, which are rarely needs to be specified. You can find it thoroughly documented in node.js' Stream documentation. But additionally it recognizes the following property:
itemFilter
is a function, which takes a {index, value}
object as its only argument, and may return following values to indicate its decision:
- any truthy value to accept the object.
- any falsy value to reject the object.
The default for itemFilter
accepts all objects.
FilterObjects
instances expose one property:
itemFilter
is a function, which us called for every top-level streamable object. It can be replaced with another function at any time.
The test file for FilterObjects
: tests/test_filter_objects.js
.
Advanced use
The whole library is organized as a set of small components, which can be combined to produce the most effective pipeline. All components are based on node.js streams, and events. They implement all required standard APIs. It is easy to add your own components to solve your unique tasks.
The code of all components are compact and simple. Please take a look at their source code to see how things are implemented, so you can produce your own components in no time.
Obviously, if a bug is found, or a way to simplify existing components, or new generic components are created, which can be reused in a variety of projects, don't hesitate to open a ticket, and/or create a pull request.
Credits
The test file tests/sample.json.gz
is a combination of several publicly available datasets merged and compressed with gzip:
Apendix A: tokens
Parser
, AltParser
, and ClassicParser
produce a stream of tokens cortesy of parser-toolkit. While normally user should use Streamer
to convert them to a much simpler JSON-aware event stream, or use Combo
directly, in some cases it can be advantageous to deal with raw tokens.
Each token is an object with following properties:
id
is a string, which uniquely identifies a token.value
is a string, which corresponds to this token, and was actually matched.line
is a line number, where this token was found. All lines are counted from 1.pos
is a position number inside a line (in characters, so \t
is one character). Position is counted from 1.
Warning: Parser
does not incliude line
and pos
in its tokens.
JSON grammar is defined in Grammar.js
. It is taken almost verbatim from JSON.org.
Following tokens are produced (listed by id
):
ws
: white spaces, usually ignored. (Not produced by Parser
.)-
: a unary negation used in a negative number either to start a number, or as an exponent sign.+
: used as an exponent sign.0
: zero, as is - '0'.nonZero
: non-zero digit - /[1-9]/
..
: a decimal point used in a number.exponent
: 'e' or 'E' as an exponent symbol in a number written in scientific notation.numericChunk
: a string of digits."
: a double quote, used to open and close a string.plainChunk
: a string of non-escaped characters, used inside a string.escapedChars
: an escaped character, used inside a string.true
: represents a literal true
.false
: represents a literal false
.null
: represents a literal null
.{
: starts an object literal.}
: closes an object literal.[
: starts an array literal.]
: closes an array literal.,
: separates components of an array, or an object.:
: separates a key and its value in an object literal.
Release History
- 0.4.2 refreshed dependencies.
- 0.4.1 added
StreamObject
by Sam Noedel - 0.4.0 new high-performant Combo component, switched to the previous parser.
- 0.3.0 new even faster parser, bug fixes.
- 0.2.2 refreshed dependencies.
- 0.2.1 added utilities to filter objects on the fly.
- 0.2.0 new faster parser, formal unit tests, added utilities to assemble objects on the fly.
- 0.1.0 bug fixes, more documentation.
- 0.0.5 bug fixes.
- 0.0.4 improved grammar.
- 0.0.3 the technical release.
- 0.0.2 bug fixes.
- 0.0.1 the initial release.