vega-dataflow
Advanced tools
Comparing version
{ | ||
"name": "vega-dataflow", | ||
"version": "1.3.2", | ||
"version": "1.4.0", | ||
"description": "Vega streaming dataflow graph.", | ||
@@ -5,0 +5,0 @@ "repository": { |
@@ -1,3 +0,4 @@ | ||
var log = require('vega-logging'), | ||
ChangeSet = require('./ChangeSet'), | ||
var dl = require('datalib'), | ||
log = require('vega-logging'), | ||
ChangeSet = require('./ChangeSet'), | ||
Collector = require('./Collector'), | ||
@@ -15,2 +16,4 @@ Tuple = require('./Tuple'), | ||
this._output = null; // Output changeset | ||
this._indexes = {}; | ||
this._indexFields = []; | ||
@@ -115,2 +118,29 @@ this._inputNode = null; | ||
prototype.synchronize = function() { | ||
// update indices | ||
var pulse = this.last(), | ||
fields = this._indexFields, | ||
i, j, f, key, index, value; | ||
for (i=0; i<fields.length; ++i) { | ||
key = fields[i]; | ||
index = this._indexes[key]; | ||
f = dl.$(key); | ||
for (j=0; j<pulse.add.length; ++j) { | ||
value = f(pulse.add[j]); | ||
Tuple.prev_init(pulse.add[j]); | ||
index[value] = (index[value] || 0) + 1; | ||
} | ||
for (j=0; j<pulse.rem.length; ++j) { | ||
value = f(pulse.rem[j]); | ||
index[value] = (index[value] || 0) - 1; | ||
} | ||
for (j=0; j<pulse.mod.length; ++j) { | ||
value = f(pulse.mod[j]._prev); | ||
index[value] = (index[value] || 0) - 1; | ||
value = f(pulse.mod[j]); | ||
index[value] = (index[value] || 0) + 1; | ||
} | ||
} | ||
this._graph.synchronize(this._pipeline); | ||
@@ -120,3 +150,21 @@ return this; | ||
prototype.listener = function() { | ||
prototype.getIndex = function(field) { | ||
var data = this.values(), | ||
f = dl.$(field), | ||
index, i, len, value; | ||
if (!this._indexes[field]) { | ||
index = {}; | ||
this._indexes[field] = index; | ||
this._indexFields.push(field); | ||
for (i=0, len=data.length; i<len; ++i) { | ||
value = f(this._data[i]); | ||
index[value] = (index[value] || 0) + 1; | ||
Tuple.prev_init(this._data[i]); | ||
} | ||
} | ||
return this._indexes[field]; | ||
}; | ||
prototype.listener = function() { | ||
return DataSourceListener(this).addListener(this._inputNode); | ||
@@ -129,3 +177,3 @@ }; | ||
} else { | ||
this._outputNode.addListener(l); | ||
this._outputNode.addListener(l); | ||
} | ||
@@ -143,3 +191,3 @@ return this; | ||
// Input node applies the datasource's delta, and propagates it to | ||
// Input node applies the datasource's delta, and propagates it to | ||
// the rest of the pipeline. It receives touches to reflow data. | ||
@@ -158,3 +206,3 @@ function DataSourceInput(ds) { | ||
var delta = ds._input, | ||
var delta = ds._input, | ||
out = ChangeSet.create(input), f; | ||
@@ -189,3 +237,3 @@ | ||
out.add = delta.add; | ||
out.add = delta.add; | ||
out.mod = delta.mod; | ||
@@ -237,3 +285,3 @@ out.rem = delta.rem; | ||
// operators that mutate, and thus would override the source data. | ||
if (ds.mutates()) { | ||
if (ds.mutates()) { | ||
var map = ds._srcMap || (ds._srcMap = {}), // to propagate tuples correctly | ||
@@ -250,3 +298,3 @@ output = ChangeSet.create(input); | ||
output.rem = input.rem.map(function(t) { | ||
output.rem = input.rem.map(function(t) { | ||
var o = map[t._id]; | ||
@@ -253,0 +301,0 @@ return (map[t._id] = null, o); |
@@ -96,3 +96,3 @@ var dl = require('datalib'), | ||
// which need to be populated during the same cycle even though propagation has | ||
// passed that part of the dataflow graph. | ||
// passed that part of the dataflow graph. | ||
prototype.propagate = function(pulse, node, stamp) { | ||
@@ -131,3 +131,3 @@ var pulses = {}, | ||
// Propagate the pulse. | ||
// Propagate the pulse. | ||
if (pulse !== this.doNotPropagate) { | ||
@@ -147,3 +147,3 @@ // Ensure reflow pulses always send reflow pulses even if skipped. | ||
// We've already queued this node. Ensure there should be at most one | ||
// pulse with tuples (add/mod/rem), and the remainder will be reflows. | ||
// pulse with tuples (add/mod/rem), and the remainder will be reflows. | ||
tpls = pulse.add.length || pulse.mod.length || pulse.rem.length; | ||
@@ -154,3 +154,3 @@ ntpls = nplse.add.length || nplse.mod.length || nplse.rem.length; | ||
// Combine reflow and tuples into a single pulse. | ||
// Combine reflow and tuples into a single pulse. | ||
pulses[next._id] = tpls ? pulse : nplse; | ||
@@ -171,3 +171,3 @@ pulses[next._id].reflow = pulse.reflow || nplse.reflow; | ||
// Perform final bookkeeping on the graph, after propagation is complete. | ||
// Perform final bookkeeping on the graph, after propagation is complete. | ||
// - For all updated datasources, synchronize their previous values. | ||
@@ -181,3 +181,3 @@ prototype.done = function(pulse) { | ||
// Process a new branch of the dataflow graph prior to connection: | ||
// (1) Insert new Collector nodes as needed. | ||
// (1) Insert new Collector nodes as needed. | ||
// (2) Track + return mutation/routing status of the branch. | ||
@@ -192,3 +192,3 @@ prototype.preprocess = function(branch) { | ||
// Batch nodes need access to a materialized dataset. | ||
// Batch nodes need access to a materialized dataset. | ||
if (node.batch() && !node._collector) { | ||
@@ -284,5 +284,5 @@ if (router || !collector) { | ||
id = (d = data[j])._id; | ||
if (ids[id]) continue; | ||
if (ids[id]) continue; | ||
Tuple.prev_update(d); | ||
ids[id] = 1; | ||
ids[id] = 1; | ||
} | ||
@@ -289,0 +289,0 @@ } |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
530259
1.58%7148
1.94%