vega-dataflow
Advanced tools
Comparing version 1.2.3 to 1.2.4
{ | ||
"name": "vega-dataflow", | ||
"version": "1.2.3", | ||
"version": "1.2.4", | ||
"description": "Vega streaming dataflow graph.", | ||
@@ -5,0 +5,0 @@ "repository": { |
@@ -95,34 +95,17 @@ var log = require('vega-logging'), | ||
this._inputNode = DataSourceInput(this); | ||
this._outputNode = DataSourceOutput(this); | ||
var graph = this._graph, | ||
mutates = 0, | ||
collector = this._inputNode, | ||
i, node, router, collects; | ||
status; | ||
for (i=0; i<pipeline.length; ++i) { | ||
node = pipeline[i]; | ||
pipeline.unshift(this._inputNode = DataSourceInput(this)); | ||
status = graph.preprocess(pipeline); | ||
if (!node._collector && node.batch()) { | ||
if (router) { | ||
node = new Collector(graph); | ||
pipeline.splice(i, 0, node); | ||
router = false; | ||
} else { | ||
node._collector = collector; | ||
} | ||
} | ||
if ((collects = node.collector())) collector = node; | ||
router = router || node.router() && !collects; | ||
mutates = mutates || node.mutates(); | ||
if (status.router) { | ||
pipeline.push(status.collector = new Collector(graph)); | ||
} | ||
if (router) pipeline.push(collector = new Collector(graph)); | ||
pipeline.unshift(this._inputNode); | ||
pipeline.push(this._outputNode); | ||
this._collector = collector; | ||
this._mutates = !!mutates; | ||
this._graph.connect(this._pipeline = pipeline); | ||
pipeline.push(this._outputNode = DataSourceOutput(this)); | ||
this._collector = status.collector; | ||
this._mutates = !!status.mutates; | ||
graph.connect(this._pipeline = pipeline); | ||
return this; | ||
@@ -132,15 +115,3 @@ }; | ||
prototype.synchronize = function() { | ||
var data = this._data, i, n; | ||
for (i=0, n=data.length; i<n; ++i) { | ||
Tuple.prev_update(data[i]); | ||
} | ||
if (this._inputNode !== this._collector) { | ||
data = this._collector.data(); | ||
for (i=0, n=data.length; i<n; ++i) { | ||
Tuple.prev_update(data[i]); | ||
} | ||
} | ||
this._graph.synchronize(this._pipeline); | ||
return this; | ||
@@ -147,0 +118,0 @@ }; |
@@ -5,2 +5,4 @@ var dl = require('datalib'), | ||
DataSource = require('./DataSource'), | ||
Collector = require('./Collector'), | ||
Tuple = require('./Tuple'), | ||
Signal = require('./Signal'), | ||
@@ -162,2 +164,39 @@ Deps = require('./Dependencies'); | ||
// Process a new branch of the dataflow graph prior to connection: | ||
// (1) Insert new Collector nodes as needed. | ||
// (2) Track + return mutation/routing status of the branch. | ||
prototype.preprocess = function(branch) { | ||
var graph = this, | ||
mutates = 0, | ||
node, router, collector, collects; | ||
for (var i=0; i<branch.length; ++i) { | ||
node = branch[i]; | ||
// Batch nodes need access to a materialized dataset. | ||
if (node.batch() && !node._collector) { | ||
if (router || !collector) { | ||
node = new Collector(graph); | ||
branch.splice(i, 0, node); | ||
router = false; | ||
} else { | ||
node._collector = collector; | ||
} | ||
} | ||
if ((collects = node.collector())) collector = node; | ||
router = router || node.router() && !collects; | ||
mutates = mutates || node.mutates(); | ||
// A collector needs to be inserted after tuple-producing | ||
// nodes for correct previous value tracking. | ||
if (node.produces()) { | ||
branch.splice(i+1, 0, new Collector(graph)); | ||
router = false; | ||
} | ||
} | ||
return {router: router, collector: collector, mutates: mutates}; | ||
}; | ||
prototype.connect = function(branch) { | ||
@@ -210,2 +249,21 @@ var collector, node, data, signals, i, n, j, m; | ||
prototype.synchronize = function(branch) { | ||
var ids = {}, | ||
node, data, i, n, j, m, d, id; | ||
for (i=0, n=branch.length; i<n; ++i) { | ||
node = branch[i]; | ||
if (!node.collector()) continue; | ||
for (j=0, data=node.data(), m=data.length; j<m; ++j) { | ||
id = (d = data[j])._id; | ||
if (ids[id]) continue; | ||
Tuple.prev_update(d); | ||
ids[id] = 1; | ||
} | ||
} | ||
return this; | ||
}; | ||
prototype.reevaluate = function(pulse, node) { | ||
@@ -212,0 +270,0 @@ var reflowed = pulse.reflow && node.last() >= pulse.stamp, |
@@ -11,5 +11,6 @@ var DEPS = require('./Dependencies').ALL, | ||
Collector: 0x02, // Holds a materialized dataset, pulse node to reflow. | ||
Mutates: 0x04, // Sets properties of incoming tuples. | ||
Reflows: 0x08, // Forwards a reflow pulse. | ||
Batch: 0x10 // Performs batch data processing, needs collector. | ||
Produces: 0x04, // Produces new tuples. | ||
Mutates: 0x08, // Sets properties of incoming tuples. | ||
Reflows: 0x10, // Forwards a reflow pulse. | ||
Batch: 0x20 // Performs batch data processing, needs collector. | ||
}; | ||
@@ -72,2 +73,7 @@ | ||
prototype.produces = function(state) { | ||
if (!arguments.length) return (this._flags & Flags.Produces); | ||
return this._setf(Flags.Produces, state); | ||
}; | ||
prototype.mutates = function(state) { | ||
@@ -74,0 +80,0 @@ if (!arguments.length) return (this._flags & Flags.Mutates); |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
520758
6978