Socket
Socket
Sign inDemoInstall

vega-dataflow

Package Overview
Dependencies
Maintainers
2
Versions
98
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

vega-dataflow - npm Package Compare versions

Comparing version 1.1.1 to 1.2.0

vega-dataflow.js

5

package.json
{
"name": "vega-dataflow",
"version": "1.1.1",
"version": "1.2.0",
"description": "Vega streaming dataflow graph.",

@@ -19,3 +19,3 @@ "repository": {

"dependencies": {
"datalib": "^1.3.0",
"datalib": "^1.4.5",
"vega-logging": "^1.0"

@@ -26,2 +26,3 @@ },

"browserify-shim": "^3.8.9",
"browserify-versionify": "^1.0.4",
"chai": "^3.0.0",

@@ -28,0 +29,0 @@ "istanbul": "latest",

4

src/Collector.js

@@ -34,4 +34,4 @@ var log = require('vega-logging'),

if (input.reflow) {
input.mod = input.mod.concat(Tuple.idFilter(this._data,
input.add, input.mod, input.rem));
input.mod = input.mod.concat(
Tuple.idFilter(this._data, input.add, input.mod, input.rem));
input.reflow = false;

@@ -38,0 +38,0 @@ }

@@ -5,4 +5,3 @@ var log = require('vega-logging'),

Tuple = require('./Tuple'),
Node = require('./Node'), // jshint ignore:line
SENTINEL = require('./Sentinel');
Node = require('./Node'); // jshint ignore:line

@@ -14,9 +13,10 @@ function DataSource(graph, name, facet) {

this._source = null;
this._facet = facet;
this._input = ChangeSet.create();
this._facet = facet;
this._input = ChangeSet.create();
this._output = null; // Output changeset
this._inputNode = null;
this._outputNode = null;
this._pipeline = null; // Pipeline of transformations.
this._collector = null; // Collector to materialize output of pipeline
this._revises = false; // Does any pipeline operator need to track prev?
this._collector = null; // Collector to materialize output of pipeline.
this._mutates = false; // Does any pipeline operator mutate tuples?

@@ -38,8 +38,3 @@ }

prototype.insert = function(tuples) {
var prev = this._revises ? null : undefined;
var insert = tuples.map(function(d) {
return Tuple.ingest(d, prev);
});
this._input.add = this._input.add.concat(insert);
this._input.add = this._input.add.concat(tuples.map(Tuple.ingest));
return this;

@@ -86,22 +81,2 @@ };

function set_prev(d) {
if (d._prev === undefined) d._prev = SENTINEL;
}
prototype.revises = function(p) {
if (!arguments.length) return this._revises;
// If we've not needed prev in the past, but a new dataflow node needs it now
// ensure existing tuples have prev set.
if (!this._revises && p) {
this._data.forEach(set_prev);
// New tuples that haven't yet been merged into _data
this._input.add.forEach(set_prev);
}
this._revises = this._revises || p;
return this;
};
prototype.mutates = function(m) {

@@ -126,23 +101,85 @@ if (!arguments.length) return this._mutates;

var ds = this;
this._inputNode = DataSourceInput(this);
this._outputNode = DataSourceOutput(this);
// Add a collector to materialize the output of pipeline operators.
if (pipeline.length) {
ds._collector = new Collector(this._graph);
pipeline.push(ds._collector);
ds._revises = pipeline.some(function(p) { return p.revises(); });
ds._mutates = pipeline.some(function(p) { return p.mutates(); });
var graph = this._graph,
mutates = 0,
collector = this._inputNode,
i, node, router, collects;
for (i=0; i<pipeline.length; ++i) {
node = pipeline[i];
if (!node._collector && node.batch()) {
if (router) {
node = new Collector(graph);
pipeline.splice(i, 0, node);
router = false;
} else {
node._collector = collector;
}
}
if ((collects = node.collector())) collector = node;
router = router || node.router() && !collects;
mutates = mutates || node.mutates();
}
if (router) pipeline.push(collector = new Collector(graph));
// Input/output nodes masquerade as collector nodes, so they need to
// have a `data` function. dsData is used if a collector isn't available.
function dsData() { return ds._data; }
pipeline.unshift(this._inputNode);
pipeline.push(this._outputNode);
this._collector = collector;
this._mutates = !!mutates;
this._graph.connect(this._pipeline = pipeline);
return this;
};
// Input node applies the datasource's delta, and propagates it to
// the rest of the pipeline. It receives touches to reflow data.
var input = new Node(this._graph)
prototype.synchronize = function() {
var data = this._data, i, n;
for (i=0, n=data.length; i<n; ++i) {
Tuple.prev_update(data[i]);
}
if (this._inputNode !== this._collector) {
data = this._collector.data();
for (i=0, n=data.length; i<n; ++i) {
Tuple.prev_update(data[i]);
}
}
return this;
};
prototype.listener = function() {
return DataSourceListener(this).addListener(this._inputNode);
};
prototype.addListener = function(l) {
if (l instanceof DataSource) {
this._collector.addListener(l.listener());
} else {
this._outputNode.addListener(l);
}
return this;
};
prototype.removeListener = function(l) {
this._outputNode.removeListener(l);
};
prototype.listeners = function(ds) {
return (ds ? this._collector : this._outputNode).listeners();
};
// Input node applies the datasource's delta, and propagates it to
// the rest of the pipeline. It receives touches to reflow data.
function DataSourceInput(ds) {
var input = new Node(ds._graph)
.router(true)
.collector(true);
input.data = dsData;
input.data = function() {
return ds._data;
};

@@ -171,4 +208,4 @@ input.evaluate = function(input) {

if (input.reflow) {
delta.mod = delta.mod.concat(Tuple.idFilter(ds._data,
delta.add, delta.mod, delta.rem));
delta.mod = delta.mod.concat(
Tuple.idFilter(ds._data, delta.add, delta.mod, delta.rem));
}

@@ -186,8 +223,10 @@

pipeline.unshift(input);
return input;
}
// Output node captures the last changeset seen by this datasource
// (needed for joins and builds) and materializes any nested data.
// If this datasource is faceted, materializes the values in the facet.
var output = new Node(this._graph)
// Output node captures the last changeset seen by this datasource
// (needed for joins and builds) and materializes any nested data.
// If this datasource is faceted, materializes the values in the facet.
function DataSourceOutput(ds) {
var output = new Node(ds._graph)
.router(true)

@@ -197,5 +236,5 @@ .reflows(true)

output.data = ds._collector ?
ds._collector.data.bind(ds._collector) :
dsData;
output.data = function() {
return ds._collector ? ds._collector.data() : ds._data;
};

@@ -205,3 +244,3 @@ output.evaluate = function(input) {

var output = ChangeSet.create(input, true);
var out = ChangeSet.create(input, true);

@@ -214,44 +253,25 @@ if (ds._facet) {

ds._output = input;
output.data[ds._name] = 1;
return output;
out.data[ds._name] = 1;
return out;
};
pipeline.push(output);
return output;
}
this._pipeline = pipeline;
this._graph.connect(ds._pipeline);
return this;
};
function DataSourceListener(ds) {
var l = new Node(ds._graph).router(true);
prototype.finalize = function() {
if (!this._revises) return;
for (var i=0, n=this._data.length; i<n; ++i) {
var x = this._data[i];
x._prev = (x._prev === undefined) ? undefined : SENTINEL;
}
};
prototype.listener = function() {
var l = new Node(this._graph).router(true),
dest = this,
prev = this._revises ? null : undefined;
l.evaluate = function(input) {
if (dest.mutates()) {
// Tuple derivation is expensive. Only do so if dest datasource has
// operators that mutate, and thus would pollute the source data.
dest._srcMap = dest._srcMap || {}; // to propagate tuples correctly
var map = dest._srcMap,
output = ChangeSet.create(input);
// Tuple derivation carries a cost. So only derive if the pipeline has
// operators that mutate, and thus would override the source data.
if (ds.mutates()) {
var map = ds._srcMap || (ds._srcMap = {}), // to propagate tuples correctly
output = ChangeSet.create(input);
output.add = input.add.map(function(t) {
var d = dest._mutates ?
Tuple.derive(t, t._prev !== undefined ? t._prev : prev) :
t;
return (map[t._id] = d);
return (map[t._id] = Tuple.derive(t));
});
output.mod = input.mod.map(function(t) {
var o = map[t._id];
return (o._prev = t._prev, o);
return Tuple.rederive(t, map[t._id]);
});

@@ -261,44 +281,14 @@

var o = map[t._id];
map[t._id] = null;
return (o._prev = t._prev, o);
return (map[t._id] = null, o);
});
return (dest._input = output);
return (ds._input = output);
} else {
return (dest._input = input);
return (ds._input = input);
}
};
l.addListener(this._pipeline[0]);
return l;
};
}
prototype.addListener = function(l) {
if (l instanceof DataSource) {
if (this._collector) {
this._collector.addListener(l.listener());
} else {
this._pipeline[0].addListener(l.listener());
}
} else {
this._pipeline[this._pipeline.length-1].addListener(l);
}
return this;
};
prototype.removeListener = function(l) {
this._pipeline[this._pipeline.length-1].removeListener(l);
};
prototype.listeners = function(ds) {
if (ds) {
return this._collector ?
this._collector.listeners() :
this._pipeline[0].listeners();
} else {
return this._pipeline[this._pipeline.length-1].listeners();
}
};
module.exports = DataSource;

@@ -1,6 +0,5 @@

var util = require('datalib/src/util'),
var dl = require('datalib'),
Heap = require('./Heap'),
ChangeSet = require('./ChangeSet'),
DataSource = require('./DataSource'),
Collector = require('./Collector'),
Signal = require('./Signal'),

@@ -94,3 +93,3 @@ Deps = require('./Dependencies');

if (!Array.isArray(ref)) {
ref = util.field(ref);
ref = dl.field(ref);
}

@@ -176,51 +175,22 @@

// Connect a branch of dataflow nodes.
// Dependencies are wired to the nearest collector.
function forEachNode(branch, fn) {
var node, collector, router, i;
prototype.connect = function(branch) {
var collector, node, data, signals, i, n, j, m;
for (i=0; i<branch.length; ++i) {
// connect the pipeline
for (i=0, n=branch.length; i<n; ++i) {
node = branch[i];
// Share collectors between batch transforms. We can reuse an
// existing collector unless a router node has come after it,
// in which case, we splice in a new collector.
if (!node.data && node.batch()) {
if (router) {
branch.splice(i, 0, (node = new Collector(this)));
router = false;
} else {
node.data = collector.data.bind(collector);
}
}
if (node.collector()) collector = node;
router = router || node.router() && !node.collector();
fn(node, collector, i);
}
}
prototype.connect = function(branch) {
var graph = this;
forEachNode.call(this, branch, function(n, c, i) {
var data = n.dependency(Deps.DATA),
signals = n.dependency(Deps.SIGNALS);
if (data.length > 0) {
data.forEach(function(d) {
graph.data(d)
.revises(n.revises())
.addListener(c);
});
data = node.dependency(Deps.DATA);
for (j=0, m=data.length; j<m; ++j) {
this.data(data[j]).addListener(collector);
}
if (signals.length > 0) {
signals.forEach(function(s) { graph.signal(s).addListener(c); });
signals = node.dependency(Deps.SIGNALS);
for (j=0, m=signals.length; j<m; ++j) {
this.signal(signals[j]).addListener(collector);
}
if (i > 0) {
branch[i-1].addListener(branch[i]);
}
});
if (i > 0) branch[i-1].addListener(node);
}

@@ -231,18 +201,20 @@ return branch;

prototype.disconnect = function(branch) {
var graph = this;
var collector, node, data, signals, i, n, j, m;
forEachNode.call(this, branch, function(n, c) {
var data = n.dependency(Deps.DATA),
signals = n.dependency(Deps.SIGNALS);
for (i=0, n=branch.length; i<n; ++i) {
node = branch[i];
if (node.collector()) collector = node;
if (data.length > 0) {
data.forEach(function(d) { graph.data(d).removeListener(c); });
data = node.dependency(Deps.DATA);
for (j=0, m=data.length; j<m; ++j) {
this.data(data[j]).removeListener(collector);
}
if (signals.length > 0) {
signals.forEach(function(s) { graph.signal(s).removeListener(c); });
signals = node.dependency(Deps.SIGNALS);
for (j=0, m=signals.length; j<m; ++j) {
this.signal(signals[j]).removeListener(collector);
}
n.disconnect();
});
node.disconnect();
}

@@ -249,0 +221,0 @@ return branch;

@@ -8,3 +8,2 @@ module.exports = {

Node: require('./Node'),
Sentinel: require('./Sentinel'),
Signal: require('./Signal'),

@@ -11,0 +10,0 @@ Tuple: require('./Tuple'),

@@ -11,6 +11,5 @@ var DEPS = require('./Dependencies').ALL,

Collector: 0x02, // Holds a materialized dataset, pulse node to reflow.
Mutates: 0x04, // Sets properties of incoming tuples,
Revises: 0x08, // Requires tuple previous values.
Reflows: 0x10, // Forwards a reflow pulse.
Batch: 0x20 // Performs batch data processing, needs collector.
Mutates: 0x04, // Sets properties of incoming tuples.
Reflows: 0x08, // Forwards a reflow pulse.
Batch: 0x10 // Performs batch data processing, needs collector.
};

@@ -78,7 +77,2 @@

prototype.revises = function(state) {
if (!arguments.length) return (this._flags & Flags.Revises);
return this._setf(Flags.Revises, state);
};
prototype.reflows = function(state) {

@@ -85,0 +79,0 @@ if (!arguments.length) return (this._flags & Flags.Reflows);

@@ -1,67 +0,68 @@

var util = require('datalib/src/util'),
SENTINEL = require('./Sentinel'),
tupleID = 0;
var tupleID = 0;
// Object.create is expensive. So, when ingesting, trust that the
// datum is an object that has been appropriately sandboxed from
// the outside environment.
function ingest(datum, prev) {
function ingest(datum) {
datum = (datum === Object(datum)) ? datum : {data: datum};
datum._id = ++tupleID;
datum._prev = (prev !== undefined) ? (prev || SENTINEL) : undefined;
if (datum._prev) datum._prev = null;
return datum;
}
function derive(datum, prev) {
return ingest(Object.create(datum), prev);
function idMap(a, ids) {
ids = ids || {};
for (var i=0, n=a.length; i<n; ++i) {
ids[a[i]._id] = 1;
}
return ids;
}
// WARNING: operators should only call this once per timestamp!
function set(t, k, v) {
var prev = t[k];
if (prev === v) return false;
set_prev(t, k);
t[k] = v;
return true;
function copy(t, c) {
c = c || {};
for (var k in t) {
if (k !== '_prev' && k !== '_id') c[k] = t[k];
}
return c;
}
function set_prev(t, k) {
if (t._prev === undefined) return;
t._prev = (t._prev === SENTINEL) ? {} : t._prev;
t._prev[k] = t[k];
}
module.exports = {
ingest: ingest,
idMap: idMap,
function has_prev(t) {
return t._prev && t._prev !== SENTINEL;
}
derive: function(d) {
return ingest(copy(d));
},
function reset() {
tupleID = 0;
}
rederive: function(d, t) {
return copy(d, t);
},
function idMap(a) {
for (var ids={}, i=0, n=a.length; i<n; ++i) {
ids[a[i]._id] = 1;
}
return ids;
}
set: function(t, k, v) {
return t[k] === v ? 0 : (t[k] = v, 1);
},
function idFilter(data) {
var ids = {};
for (var i=1, len=arguments.length; i<len; ++i) {
util.extend(ids, idMap(arguments[i]));
}
prev: function(t) {
return t._prev || t;
},
return data.filter(function(x) { return !ids[x._id]; });
}
prev_init: function(t) {
if (!t._prev) { t._prev = {_id: t._id}; }
},
module.exports = {
ingest: ingest,
derive: derive,
set: set,
set_prev: set_prev,
has_prev: has_prev,
reset: reset,
idMap: idMap,
idFilter: idFilter
prev_update: function(t) {
var p = t._prev, k, v;
if (p) for (k in t) {
if (k !== '_prev' && k !== '_id') {
p[k] = ((v=t[k]) instanceof Object && v._prev) ? v._prev : v;
}
}
},
reset: function() { tupleID = 0; },
idFilter: function(data) {
var ids = {};
for (var i=arguments.length; --i>0;) {
idMap(arguments[i], ids);
}
return data.filter(function(x) { return !ids[x._id]; });
}
};
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc