vega-dataflow
Advanced tools
Comparing version 1.3.2 to 1.4.0
{ | ||
"name": "vega-dataflow", | ||
"version": "1.3.2", | ||
"version": "1.4.0", | ||
"description": "Vega streaming dataflow graph.", | ||
@@ -5,0 +5,0 @@ "repository": { |
@@ -1,3 +0,4 @@ | ||
var log = require('vega-logging'), | ||
ChangeSet = require('./ChangeSet'), | ||
var dl = require('datalib'), | ||
log = require('vega-logging'), | ||
ChangeSet = require('./ChangeSet'), | ||
Collector = require('./Collector'), | ||
@@ -15,2 +16,4 @@ Tuple = require('./Tuple'), | ||
this._output = null; // Output changeset | ||
this._indexes = {}; | ||
this._indexFields = []; | ||
@@ -115,2 +118,29 @@ this._inputNode = null; | ||
prototype.synchronize = function() { | ||
// update indices | ||
var pulse = this.last(), | ||
fields = this._indexFields, | ||
i, j, f, key, index, value; | ||
for (i=0; i<fields.length; ++i) { | ||
key = fields[i]; | ||
index = this._indexes[key]; | ||
f = dl.$(key); | ||
for (j=0; j<pulse.add.length; ++j) { | ||
value = f(pulse.add[j]); | ||
Tuple.prev_init(pulse.add[j]); | ||
index[value] = (index[value] || 0) + 1; | ||
} | ||
for (j=0; j<pulse.rem.length; ++j) { | ||
value = f(pulse.rem[j]); | ||
index[value] = (index[value] || 0) - 1; | ||
} | ||
for (j=0; j<pulse.mod.length; ++j) { | ||
value = f(pulse.mod[j]._prev); | ||
index[value] = (index[value] || 0) - 1; | ||
value = f(pulse.mod[j]); | ||
index[value] = (index[value] || 0) + 1; | ||
} | ||
} | ||
this._graph.synchronize(this._pipeline); | ||
@@ -120,3 +150,21 @@ return this; | ||
prototype.listener = function() { | ||
prototype.getIndex = function(field) { | ||
var data = this.values(), | ||
f = dl.$(field), | ||
index, i, len, value; | ||
if (!this._indexes[field]) { | ||
index = {}; | ||
this._indexes[field] = index; | ||
this._indexFields.push(field); | ||
for (i=0, len=data.length; i<len; ++i) { | ||
value = f(this._data[i]); | ||
index[value] = (index[value] || 0) + 1; | ||
Tuple.prev_init(this._data[i]); | ||
} | ||
} | ||
return this._indexes[field]; | ||
}; | ||
prototype.listener = function() { | ||
return DataSourceListener(this).addListener(this._inputNode); | ||
@@ -129,3 +177,3 @@ }; | ||
} else { | ||
this._outputNode.addListener(l); | ||
this._outputNode.addListener(l); | ||
} | ||
@@ -143,3 +191,3 @@ return this; | ||
// Input node applies the datasource's delta, and propagates it to | ||
// Input node applies the datasource's delta, and propagates it to | ||
// the rest of the pipeline. It receives touches to reflow data. | ||
@@ -158,3 +206,3 @@ function DataSourceInput(ds) { | ||
var delta = ds._input, | ||
var delta = ds._input, | ||
out = ChangeSet.create(input), f; | ||
@@ -189,3 +237,3 @@ | ||
out.add = delta.add; | ||
out.add = delta.add; | ||
out.mod = delta.mod; | ||
@@ -237,3 +285,3 @@ out.rem = delta.rem; | ||
// operators that mutate, and thus would override the source data. | ||
if (ds.mutates()) { | ||
if (ds.mutates()) { | ||
var map = ds._srcMap || (ds._srcMap = {}), // to propagate tuples correctly | ||
@@ -250,3 +298,3 @@ output = ChangeSet.create(input); | ||
output.rem = input.rem.map(function(t) { | ||
output.rem = input.rem.map(function(t) { | ||
var o = map[t._id]; | ||
@@ -253,0 +301,0 @@ return (map[t._id] = null, o); |
@@ -96,3 +96,3 @@ var dl = require('datalib'), | ||
// which need to be populated during the same cycle even though propagation has | ||
// passed that part of the dataflow graph. | ||
// passed that part of the dataflow graph. | ||
prototype.propagate = function(pulse, node, stamp) { | ||
@@ -131,3 +131,3 @@ var pulses = {}, | ||
// Propagate the pulse. | ||
// Propagate the pulse. | ||
if (pulse !== this.doNotPropagate) { | ||
@@ -147,3 +147,3 @@ // Ensure reflow pulses always send reflow pulses even if skipped. | ||
// We've already queued this node. Ensure there should be at most one | ||
// pulse with tuples (add/mod/rem), and the remainder will be reflows. | ||
// pulse with tuples (add/mod/rem), and the remainder will be reflows. | ||
tpls = pulse.add.length || pulse.mod.length || pulse.rem.length; | ||
@@ -154,3 +154,3 @@ ntpls = nplse.add.length || nplse.mod.length || nplse.rem.length; | ||
// Combine reflow and tuples into a single pulse. | ||
// Combine reflow and tuples into a single pulse. | ||
pulses[next._id] = tpls ? pulse : nplse; | ||
@@ -171,3 +171,3 @@ pulses[next._id].reflow = pulse.reflow || nplse.reflow; | ||
// Perform final bookkeeping on the graph, after propagation is complete. | ||
// Perform final bookkeeping on the graph, after propagation is complete. | ||
// - For all updated datasources, synchronize their previous values. | ||
@@ -181,3 +181,3 @@ prototype.done = function(pulse) { | ||
// Process a new branch of the dataflow graph prior to connection: | ||
// (1) Insert new Collector nodes as needed. | ||
// (1) Insert new Collector nodes as needed. | ||
// (2) Track + return mutation/routing status of the branch. | ||
@@ -192,3 +192,3 @@ prototype.preprocess = function(branch) { | ||
// Batch nodes need access to a materialized dataset. | ||
// Batch nodes need access to a materialized dataset. | ||
if (node.batch() && !node._collector) { | ||
@@ -284,5 +284,5 @@ if (router || !collector) { | ||
id = (d = data[j])._id; | ||
if (ids[id]) continue; | ||
if (ids[id]) continue; | ||
Tuple.prev_update(d); | ||
ids[id] = 1; | ||
ids[id] = 1; | ||
} | ||
@@ -289,0 +289,0 @@ } |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
530259
7148