vega-dataflow
Advanced tools
Comparing version 1.4.0 to 1.4.1
{ | ||
"name": "vega-dataflow", | ||
"version": "1.4.0", | ||
"version": "1.4.1", | ||
"description": "Vega streaming dataflow graph.", | ||
@@ -5,0 +5,0 @@ "repository": { |
@@ -117,29 +117,2 @@ var dl = require('datalib'), | ||
prototype.synchronize = function() { | ||
// update indices | ||
var pulse = this.last(), | ||
fields = this._indexFields, | ||
i, j, f, key, index, value; | ||
for (i=0; i<fields.length; ++i) { | ||
key = fields[i]; | ||
index = this._indexes[key]; | ||
f = dl.$(key); | ||
for (j=0; j<pulse.add.length; ++j) { | ||
value = f(pulse.add[j]); | ||
Tuple.prev_init(pulse.add[j]); | ||
index[value] = (index[value] || 0) + 1; | ||
} | ||
for (j=0; j<pulse.rem.length; ++j) { | ||
value = f(pulse.rem[j]); | ||
index[value] = (index[value] || 0) - 1; | ||
} | ||
for (j=0; j<pulse.mod.length; ++j) { | ||
value = f(pulse.mod[j]._prev); | ||
index[value] = (index[value] || 0) - 1; | ||
value = f(pulse.mod[j]); | ||
index[value] = (index[value] || 0) + 1; | ||
} | ||
} | ||
this._graph.synchronize(this._pipeline); | ||
@@ -159,5 +132,5 @@ return this; | ||
for (i=0, len=data.length; i<len; ++i) { | ||
value = f(this._data[i]); | ||
value = f(data[i]); | ||
index[value] = (index[value] || 0) + 1; | ||
Tuple.prev_init(this._data[i]); | ||
Tuple.prev_init(data[i]); | ||
} | ||
@@ -252,2 +225,29 @@ } | ||
function updateIndices(pulse) { | ||
var fields = ds._indexFields, | ||
i, j, f, key, index, value; | ||
for (i=0; i<fields.length; ++i) { | ||
key = fields[i]; | ||
index = ds._indexes[key]; | ||
f = dl.$(key); | ||
for (j=0; j<pulse.add.length; ++j) { | ||
value = f(pulse.add[j]); | ||
Tuple.prev_init(pulse.add[j]); | ||
index[value] = (index[value] || 0) + 1; | ||
} | ||
for (j=0; j<pulse.rem.length; ++j) { | ||
value = f(pulse.rem[j]); | ||
index[value] = (index[value] || 0) - 1; | ||
} | ||
for (j=0; j<pulse.mod.length; ++j) { | ||
value = f(pulse.mod[j]._prev); | ||
index[value] = (index[value] || 0) - 1; | ||
value = f(pulse.mod[j]); | ||
index[value] = (index[value] || 0) + 1; | ||
} | ||
} | ||
} | ||
output.data = function() { | ||
@@ -260,2 +260,3 @@ return ds._collector ? ds._collector.data() : ds._data; | ||
updateIndices(input); | ||
var out = ChangeSet.create(input, true); | ||
@@ -262,0 +263,0 @@ |
@@ -96,6 +96,9 @@ var dl = require('datalib'), | ||
// which need to be populated during the same cycle even though propagation has | ||
// passed that part of the dataflow graph. | ||
prototype.propagate = function(pulse, node, stamp) { | ||
// passed that part of the dataflow graph. | ||
// If skipSignals is true, Signal nodes do not get reevaluated but their listeners | ||
// are queued for propagation. This is useful when setting signal values in batch | ||
// (e.g., time travel to the initial state). | ||
prototype.propagate = function(pulse, node, stamp, skipSignals) { | ||
var pulses = {}, | ||
listeners, next, nplse, tpls, ntpls, i, len; | ||
listeners, next, nplse, tpls, ntpls, i, len, isSg; | ||
@@ -119,2 +122,3 @@ // new PQ with each propagation cycle so that we can pulse branches | ||
node = pq.peek(); | ||
isSg = node instanceof Signal; | ||
pulse = pulses[node._id]; | ||
@@ -130,4 +134,7 @@ | ||
listeners = node._listeners; | ||
pulse = this.evaluate(pulse, node); | ||
if (!isSg || (isSg && !skipSignals)) { | ||
pulse = this.evaluate(pulse, node); | ||
} | ||
// Propagate the pulse. | ||
@@ -134,0 +141,0 @@ if (pulse !== this.doNotPropagate) { |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
523613
7062