vega-dataflow
Advanced tools
Comparing version 1.1.1 to 1.2.0
{ | ||
"name": "vega-dataflow", | ||
"version": "1.1.1", | ||
"version": "1.2.0", | ||
"description": "Vega streaming dataflow graph.", | ||
@@ -19,3 +19,3 @@ "repository": { | ||
"dependencies": { | ||
"datalib": "^1.3.0", | ||
"datalib": "^1.4.5", | ||
"vega-logging": "^1.0" | ||
@@ -26,2 +26,3 @@ }, | ||
"browserify-shim": "^3.8.9", | ||
"browserify-versionify": "^1.0.4", | ||
"chai": "^3.0.0", | ||
@@ -28,0 +29,0 @@ "istanbul": "latest", |
@@ -34,4 +34,4 @@ var log = require('vega-logging'), | ||
if (input.reflow) { | ||
input.mod = input.mod.concat(Tuple.idFilter(this._data, | ||
input.add, input.mod, input.rem)); | ||
input.mod = input.mod.concat( | ||
Tuple.idFilter(this._data, input.add, input.mod, input.rem)); | ||
input.reflow = false; | ||
@@ -38,0 +38,0 @@ } |
@@ -5,4 +5,3 @@ var log = require('vega-logging'), | ||
Tuple = require('./Tuple'), | ||
Node = require('./Node'), // jshint ignore:line | ||
SENTINEL = require('./Sentinel'); | ||
Node = require('./Node'); // jshint ignore:line | ||
@@ -14,9 +13,10 @@ function DataSource(graph, name, facet) { | ||
this._source = null; | ||
this._facet = facet; | ||
this._input = ChangeSet.create(); | ||
this._facet = facet; | ||
this._input = ChangeSet.create(); | ||
this._output = null; // Output changeset | ||
this._inputNode = null; | ||
this._outputNode = null; | ||
this._pipeline = null; // Pipeline of transformations. | ||
this._collector = null; // Collector to materialize output of pipeline | ||
this._revises = false; // Does any pipeline operator need to track prev? | ||
this._collector = null; // Collector to materialize output of pipeline. | ||
this._mutates = false; // Does any pipeline operator mutate tuples? | ||
@@ -38,8 +38,3 @@ } | ||
prototype.insert = function(tuples) { | ||
var prev = this._revises ? null : undefined; | ||
var insert = tuples.map(function(d) { | ||
return Tuple.ingest(d, prev); | ||
}); | ||
this._input.add = this._input.add.concat(insert); | ||
this._input.add = this._input.add.concat(tuples.map(Tuple.ingest)); | ||
return this; | ||
@@ -86,22 +81,2 @@ }; | ||
function set_prev(d) { | ||
if (d._prev === undefined) d._prev = SENTINEL; | ||
} | ||
prototype.revises = function(p) { | ||
if (!arguments.length) return this._revises; | ||
// If we've not needed prev in the past, but a new dataflow node needs it now | ||
// ensure existing tuples have prev set. | ||
if (!this._revises && p) { | ||
this._data.forEach(set_prev); | ||
// New tuples that haven't yet been merged into _data | ||
this._input.add.forEach(set_prev); | ||
} | ||
this._revises = this._revises || p; | ||
return this; | ||
}; | ||
prototype.mutates = function(m) { | ||
@@ -126,23 +101,85 @@ if (!arguments.length) return this._mutates; | ||
var ds = this; | ||
this._inputNode = DataSourceInput(this); | ||
this._outputNode = DataSourceOutput(this); | ||
// Add a collector to materialize the output of pipeline operators. | ||
if (pipeline.length) { | ||
ds._collector = new Collector(this._graph); | ||
pipeline.push(ds._collector); | ||
ds._revises = pipeline.some(function(p) { return p.revises(); }); | ||
ds._mutates = pipeline.some(function(p) { return p.mutates(); }); | ||
var graph = this._graph, | ||
mutates = 0, | ||
collector = this._inputNode, | ||
i, node, router, collects; | ||
for (i=0; i<pipeline.length; ++i) { | ||
node = pipeline[i]; | ||
if (!node._collector && node.batch()) { | ||
if (router) { | ||
node = new Collector(graph); | ||
pipeline.splice(i, 0, node); | ||
router = false; | ||
} else { | ||
node._collector = collector; | ||
} | ||
} | ||
if ((collects = node.collector())) collector = node; | ||
router = router || node.router() && !collects; | ||
mutates = mutates || node.mutates(); | ||
} | ||
if (router) pipeline.push(collector = new Collector(graph)); | ||
// Input/output nodes masquerade as collector nodes, so they need to | ||
// have a `data` function. dsData is used if a collector isn't available. | ||
function dsData() { return ds._data; } | ||
pipeline.unshift(this._inputNode); | ||
pipeline.push(this._outputNode); | ||
this._collector = collector; | ||
this._mutates = !!mutates; | ||
this._graph.connect(this._pipeline = pipeline); | ||
return this; | ||
}; | ||
// Input node applies the datasource's delta, and propagates it to | ||
// the rest of the pipeline. It receives touches to reflow data. | ||
var input = new Node(this._graph) | ||
prototype.synchronize = function() { | ||
var data = this._data, i, n; | ||
for (i=0, n=data.length; i<n; ++i) { | ||
Tuple.prev_update(data[i]); | ||
} | ||
if (this._inputNode !== this._collector) { | ||
data = this._collector.data(); | ||
for (i=0, n=data.length; i<n; ++i) { | ||
Tuple.prev_update(data[i]); | ||
} | ||
} | ||
return this; | ||
}; | ||
prototype.listener = function() { | ||
return DataSourceListener(this).addListener(this._inputNode); | ||
}; | ||
prototype.addListener = function(l) { | ||
if (l instanceof DataSource) { | ||
this._collector.addListener(l.listener()); | ||
} else { | ||
this._outputNode.addListener(l); | ||
} | ||
return this; | ||
}; | ||
prototype.removeListener = function(l) { | ||
this._outputNode.removeListener(l); | ||
}; | ||
prototype.listeners = function(ds) { | ||
return (ds ? this._collector : this._outputNode).listeners(); | ||
}; | ||
// Input node applies the datasource's delta, and propagates it to | ||
// the rest of the pipeline. It receives touches to reflow data. | ||
function DataSourceInput(ds) { | ||
var input = new Node(ds._graph) | ||
.router(true) | ||
.collector(true); | ||
input.data = dsData; | ||
input.data = function() { | ||
return ds._data; | ||
}; | ||
@@ -171,4 +208,4 @@ input.evaluate = function(input) { | ||
if (input.reflow) { | ||
delta.mod = delta.mod.concat(Tuple.idFilter(ds._data, | ||
delta.add, delta.mod, delta.rem)); | ||
delta.mod = delta.mod.concat( | ||
Tuple.idFilter(ds._data, delta.add, delta.mod, delta.rem)); | ||
} | ||
@@ -186,8 +223,10 @@ | ||
pipeline.unshift(input); | ||
return input; | ||
} | ||
// Output node captures the last changeset seen by this datasource | ||
// (needed for joins and builds) and materializes any nested data. | ||
// If this datasource is faceted, materializes the values in the facet. | ||
var output = new Node(this._graph) | ||
// Output node captures the last changeset seen by this datasource | ||
// (needed for joins and builds) and materializes any nested data. | ||
// If this datasource is faceted, materializes the values in the facet. | ||
function DataSourceOutput(ds) { | ||
var output = new Node(ds._graph) | ||
.router(true) | ||
@@ -197,5 +236,5 @@ .reflows(true) | ||
output.data = ds._collector ? | ||
ds._collector.data.bind(ds._collector) : | ||
dsData; | ||
output.data = function() { | ||
return ds._collector ? ds._collector.data() : ds._data; | ||
}; | ||
@@ -205,3 +244,3 @@ output.evaluate = function(input) { | ||
var output = ChangeSet.create(input, true); | ||
var out = ChangeSet.create(input, true); | ||
@@ -214,44 +253,25 @@ if (ds._facet) { | ||
ds._output = input; | ||
output.data[ds._name] = 1; | ||
return output; | ||
out.data[ds._name] = 1; | ||
return out; | ||
}; | ||
pipeline.push(output); | ||
return output; | ||
} | ||
this._pipeline = pipeline; | ||
this._graph.connect(ds._pipeline); | ||
return this; | ||
}; | ||
function DataSourceListener(ds) { | ||
var l = new Node(ds._graph).router(true); | ||
prototype.finalize = function() { | ||
if (!this._revises) return; | ||
for (var i=0, n=this._data.length; i<n; ++i) { | ||
var x = this._data[i]; | ||
x._prev = (x._prev === undefined) ? undefined : SENTINEL; | ||
} | ||
}; | ||
prototype.listener = function() { | ||
var l = new Node(this._graph).router(true), | ||
dest = this, | ||
prev = this._revises ? null : undefined; | ||
l.evaluate = function(input) { | ||
if (dest.mutates()) { | ||
// Tuple derivation is expensive. Only do so if dest datasource has | ||
// operators that mutate, and thus would pollute the source data. | ||
dest._srcMap = dest._srcMap || {}; // to propagate tuples correctly | ||
var map = dest._srcMap, | ||
output = ChangeSet.create(input); | ||
// Tuple derivation carries a cost. So only derive if the pipeline has | ||
// operators that mutate, and thus would override the source data. | ||
if (ds.mutates()) { | ||
var map = ds._srcMap || (ds._srcMap = {}), // to propagate tuples correctly | ||
output = ChangeSet.create(input); | ||
output.add = input.add.map(function(t) { | ||
var d = dest._mutates ? | ||
Tuple.derive(t, t._prev !== undefined ? t._prev : prev) : | ||
t; | ||
return (map[t._id] = d); | ||
return (map[t._id] = Tuple.derive(t)); | ||
}); | ||
output.mod = input.mod.map(function(t) { | ||
var o = map[t._id]; | ||
return (o._prev = t._prev, o); | ||
return Tuple.rederive(t, map[t._id]); | ||
}); | ||
@@ -261,44 +281,14 @@ | ||
var o = map[t._id]; | ||
map[t._id] = null; | ||
return (o._prev = t._prev, o); | ||
return (map[t._id] = null, o); | ||
}); | ||
return (dest._input = output); | ||
return (ds._input = output); | ||
} else { | ||
return (dest._input = input); | ||
return (ds._input = input); | ||
} | ||
}; | ||
l.addListener(this._pipeline[0]); | ||
return l; | ||
}; | ||
} | ||
prototype.addListener = function(l) { | ||
if (l instanceof DataSource) { | ||
if (this._collector) { | ||
this._collector.addListener(l.listener()); | ||
} else { | ||
this._pipeline[0].addListener(l.listener()); | ||
} | ||
} else { | ||
this._pipeline[this._pipeline.length-1].addListener(l); | ||
} | ||
return this; | ||
}; | ||
prototype.removeListener = function(l) { | ||
this._pipeline[this._pipeline.length-1].removeListener(l); | ||
}; | ||
prototype.listeners = function(ds) { | ||
if (ds) { | ||
return this._collector ? | ||
this._collector.listeners() : | ||
this._pipeline[0].listeners(); | ||
} else { | ||
return this._pipeline[this._pipeline.length-1].listeners(); | ||
} | ||
}; | ||
module.exports = DataSource; |
@@ -1,6 +0,5 @@ | ||
var util = require('datalib/src/util'), | ||
var dl = require('datalib'), | ||
Heap = require('./Heap'), | ||
ChangeSet = require('./ChangeSet'), | ||
DataSource = require('./DataSource'), | ||
Collector = require('./Collector'), | ||
Signal = require('./Signal'), | ||
@@ -94,3 +93,3 @@ Deps = require('./Dependencies'); | ||
if (!Array.isArray(ref)) { | ||
ref = util.field(ref); | ||
ref = dl.field(ref); | ||
} | ||
@@ -176,51 +175,22 @@ | ||
// Connect a branch of dataflow nodes. | ||
// Dependencies are wired to the nearest collector. | ||
function forEachNode(branch, fn) { | ||
var node, collector, router, i; | ||
prototype.connect = function(branch) { | ||
var collector, node, data, signals, i, n, j, m; | ||
for (i=0; i<branch.length; ++i) { | ||
// connect the pipeline | ||
for (i=0, n=branch.length; i<n; ++i) { | ||
node = branch[i]; | ||
// Share collectors between batch transforms. We can reuse an | ||
// existing collector unless a router node has come after it, | ||
// in which case, we splice in a new collector. | ||
if (!node.data && node.batch()) { | ||
if (router) { | ||
branch.splice(i, 0, (node = new Collector(this))); | ||
router = false; | ||
} else { | ||
node.data = collector.data.bind(collector); | ||
} | ||
} | ||
if (node.collector()) collector = node; | ||
router = router || node.router() && !node.collector(); | ||
fn(node, collector, i); | ||
} | ||
} | ||
prototype.connect = function(branch) { | ||
var graph = this; | ||
forEachNode.call(this, branch, function(n, c, i) { | ||
var data = n.dependency(Deps.DATA), | ||
signals = n.dependency(Deps.SIGNALS); | ||
if (data.length > 0) { | ||
data.forEach(function(d) { | ||
graph.data(d) | ||
.revises(n.revises()) | ||
.addListener(c); | ||
}); | ||
data = node.dependency(Deps.DATA); | ||
for (j=0, m=data.length; j<m; ++j) { | ||
this.data(data[j]).addListener(collector); | ||
} | ||
if (signals.length > 0) { | ||
signals.forEach(function(s) { graph.signal(s).addListener(c); }); | ||
signals = node.dependency(Deps.SIGNALS); | ||
for (j=0, m=signals.length; j<m; ++j) { | ||
this.signal(signals[j]).addListener(collector); | ||
} | ||
if (i > 0) { | ||
branch[i-1].addListener(branch[i]); | ||
} | ||
}); | ||
if (i > 0) branch[i-1].addListener(node); | ||
} | ||
@@ -231,18 +201,20 @@ return branch; | ||
prototype.disconnect = function(branch) { | ||
var graph = this; | ||
var collector, node, data, signals, i, n, j, m; | ||
forEachNode.call(this, branch, function(n, c) { | ||
var data = n.dependency(Deps.DATA), | ||
signals = n.dependency(Deps.SIGNALS); | ||
for (i=0, n=branch.length; i<n; ++i) { | ||
node = branch[i]; | ||
if (node.collector()) collector = node; | ||
if (data.length > 0) { | ||
data.forEach(function(d) { graph.data(d).removeListener(c); }); | ||
data = node.dependency(Deps.DATA); | ||
for (j=0, m=data.length; j<m; ++j) { | ||
this.data(data[j]).removeListener(collector); | ||
} | ||
if (signals.length > 0) { | ||
signals.forEach(function(s) { graph.signal(s).removeListener(c); }); | ||
signals = node.dependency(Deps.SIGNALS); | ||
for (j=0, m=signals.length; j<m; ++j) { | ||
this.signal(signals[j]).removeListener(collector); | ||
} | ||
n.disconnect(); | ||
}); | ||
node.disconnect(); | ||
} | ||
@@ -249,0 +221,0 @@ return branch; |
@@ -8,3 +8,2 @@ module.exports = { | ||
Node: require('./Node'), | ||
Sentinel: require('./Sentinel'), | ||
Signal: require('./Signal'), | ||
@@ -11,0 +10,0 @@ Tuple: require('./Tuple'), |
@@ -11,6 +11,5 @@ var DEPS = require('./Dependencies').ALL, | ||
Collector: 0x02, // Holds a materialized dataset, pulse node to reflow. | ||
Mutates: 0x04, // Sets properties of incoming tuples, | ||
Revises: 0x08, // Requires tuple previous values. | ||
Reflows: 0x10, // Forwards a reflow pulse. | ||
Batch: 0x20 // Performs batch data processing, needs collector. | ||
Mutates: 0x04, // Sets properties of incoming tuples. | ||
Reflows: 0x08, // Forwards a reflow pulse. | ||
Batch: 0x10 // Performs batch data processing, needs collector. | ||
}; | ||
@@ -78,7 +77,2 @@ | ||
prototype.revises = function(state) { | ||
if (!arguments.length) return (this._flags & Flags.Revises); | ||
return this._setf(Flags.Revises, state); | ||
}; | ||
prototype.reflows = function(state) { | ||
@@ -85,0 +79,0 @@ if (!arguments.length) return (this._flags & Flags.Reflows); |
101
src/Tuple.js
@@ -1,67 +0,68 @@ | ||
var util = require('datalib/src/util'), | ||
SENTINEL = require('./Sentinel'), | ||
tupleID = 0; | ||
var tupleID = 0; | ||
// Object.create is expensive. So, when ingesting, trust that the | ||
// datum is an object that has been appropriately sandboxed from | ||
// the outside environment. | ||
function ingest(datum, prev) { | ||
function ingest(datum) { | ||
datum = (datum === Object(datum)) ? datum : {data: datum}; | ||
datum._id = ++tupleID; | ||
datum._prev = (prev !== undefined) ? (prev || SENTINEL) : undefined; | ||
if (datum._prev) datum._prev = null; | ||
return datum; | ||
} | ||
function derive(datum, prev) { | ||
return ingest(Object.create(datum), prev); | ||
function idMap(a, ids) { | ||
ids = ids || {}; | ||
for (var i=0, n=a.length; i<n; ++i) { | ||
ids[a[i]._id] = 1; | ||
} | ||
return ids; | ||
} | ||
// WARNING: operators should only call this once per timestamp! | ||
function set(t, k, v) { | ||
var prev = t[k]; | ||
if (prev === v) return false; | ||
set_prev(t, k); | ||
t[k] = v; | ||
return true; | ||
function copy(t, c) { | ||
c = c || {}; | ||
for (var k in t) { | ||
if (k !== '_prev' && k !== '_id') c[k] = t[k]; | ||
} | ||
return c; | ||
} | ||
function set_prev(t, k) { | ||
if (t._prev === undefined) return; | ||
t._prev = (t._prev === SENTINEL) ? {} : t._prev; | ||
t._prev[k] = t[k]; | ||
} | ||
module.exports = { | ||
ingest: ingest, | ||
idMap: idMap, | ||
function has_prev(t) { | ||
return t._prev && t._prev !== SENTINEL; | ||
} | ||
derive: function(d) { | ||
return ingest(copy(d)); | ||
}, | ||
function reset() { | ||
tupleID = 0; | ||
} | ||
rederive: function(d, t) { | ||
return copy(d, t); | ||
}, | ||
function idMap(a) { | ||
for (var ids={}, i=0, n=a.length; i<n; ++i) { | ||
ids[a[i]._id] = 1; | ||
} | ||
return ids; | ||
} | ||
set: function(t, k, v) { | ||
return t[k] === v ? 0 : (t[k] = v, 1); | ||
}, | ||
function idFilter(data) { | ||
var ids = {}; | ||
for (var i=1, len=arguments.length; i<len; ++i) { | ||
util.extend(ids, idMap(arguments[i])); | ||
} | ||
prev: function(t) { | ||
return t._prev || t; | ||
}, | ||
return data.filter(function(x) { return !ids[x._id]; }); | ||
} | ||
prev_init: function(t) { | ||
if (!t._prev) { t._prev = {_id: t._id}; } | ||
}, | ||
module.exports = { | ||
ingest: ingest, | ||
derive: derive, | ||
set: set, | ||
set_prev: set_prev, | ||
has_prev: has_prev, | ||
reset: reset, | ||
idMap: idMap, | ||
idFilter: idFilter | ||
prev_update: function(t) { | ||
var p = t._prev, k, v; | ||
if (p) for (k in t) { | ||
if (k !== '_prev' && k !== '_id') { | ||
p[k] = ((v=t[k]) instanceof Object && v._prev) ? v._prev : v; | ||
} | ||
} | ||
}, | ||
reset: function() { tupleID = 0; }, | ||
idFilter: function(data) { | ||
var ids = {}; | ||
for (var i=arguments.length; --i>0;) { | ||
idMap(arguments[i], ids); | ||
} | ||
return data.filter(function(x) { return !ids[x._id]; }); | ||
} | ||
}; |
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain risk: The package has recently undergone a major refactor. It may be unstable, or the refactor may indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Uses eval
Supply chain risk: The package uses dynamic code execution (e.g., eval()), which is a dangerous practice. This can prevent the code from running in certain environments and increases the risk that the code may contain exploits or malicious behavior.
Found 1 instance in 1 package
Dynamic require
Supply chain risk: Dynamic require can indicate that the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
Filesystem access
Supply chain risk: The package accesses the file system and could potentially read sensitive data.
Found 1 instance in 1 package
Minified code
Quality: This package contains minified code. This may be harmless in some cases where minified code is included in packaged libraries; however, packages on npm should not minify code.
Found 1 instance in 1 package
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
517096
16
6940
7
2
5
13
Updated: datalib@^1.4.5