vega-dataflow
Advanced tools
Comparing version 1.0.2 to 1.0.3
{ | ||
"name": "vega-dataflow", | ||
"version": "1.0.2", | ||
"version": "1.0.3", | ||
"description": "Vega streaming dataflow graph.", | ||
@@ -5,0 +5,0 @@ "repository": { |
@@ -106,22 +106,2 @@ var util = require('datalib/src/util'), | ||
var schedule = function(a, b) { | ||
if (a.rank !== b.rank) { | ||
// Topological sort | ||
return a.rank - b.rank; | ||
} else { | ||
// If queueing multiple pulses to the same node, then there will be | ||
// at most one pulse with a changeset (add/mod/rem), and the remainder | ||
// will be reflows. Combine the changeset and reflows into a single pulse | ||
// and queue that first. Subsequent reflow-only pulses will be pruned. | ||
var pa = a.pulse, pb = b.pulse, | ||
paCS = pa.add.length || pa.mod.length || pa.rem.length, | ||
pbCS = pb.add.length || pb.mod.length || pb.rem.length; | ||
pa.reflow = pb.reflow = pa.reflow || pb.reflow; | ||
if (paCS && pbCS) throw Error('Both pulses have changesets.'); | ||
return paCS ? -1 : 1; | ||
} | ||
}; | ||
// Stamp should be specified with caution. It is necessary for inline datasources, | ||
@@ -131,3 +111,4 @@ // which need to be populated during the same cycle even though propagation has | ||
prototype.propagate = function(pulse, node, stamp) { | ||
var v, l, n, p, r, i, len, reflowed; | ||
var pulses = {}, | ||
listeners, next, nplse, tpls, ntpls, i, len; | ||
@@ -137,3 +118,6 @@ // new PQ with each propagation cycle so that we can pulse branches | ||
// a new inline datasource). | ||
var pq = new Heap(schedule); | ||
var pq = new Heap(function(a, b) { | ||
// Topological sort on qrank as rank may change during propagation. | ||
return a.qrank() - b.qrank(); | ||
}); | ||
@@ -143,30 +127,47 @@ if (pulse.stamp) throw Error('Pulse already has a non-zero stamp.'); | ||
pulse.stamp = stamp || ++this._stamp; | ||
pq.push({node: node, pulse: pulse, rank: node.rank()}); | ||
pulses[node._id] = pulse; | ||
pq.push(node.qrank(true)); | ||
while (pq.size() > 0) { | ||
v = pq.peek(); | ||
n = v.node; | ||
p = v.pulse; | ||
reflowed = p.reflow && n.last() >= p.stamp; | ||
node = pq.peek(); | ||
pulse = pulses[node._id]; | ||
if (reflowed) { | ||
// Don't needlessly reflow ops. | ||
pq.pop(); | ||
} else if (v.rank !== (r = n.rank())) { | ||
if (node.rank() !== node.qrank()) { | ||
// A node's rank might change during a propagation. Re-queue if so. | ||
v.rank = r; | ||
pq.replace(v); | ||
pq.replace(node.qrank(true)); | ||
} else { | ||
// Evaluate node and propagate pulse. | ||
pq.pop(); | ||
l = n._listeners; | ||
p = this.evaluate(p, n); | ||
pulses[node._id] = null; | ||
listeners = node._listeners; | ||
pulse = this.evaluate(pulse, node); | ||
// Propagate the pulse. | ||
if (p !== this.doNotPropagate) { | ||
if (!p.reflow && n.reflows()) { // If skipped eval of reflows node | ||
p = ChangeSet.create(p, true); | ||
if (pulse !== this.doNotPropagate) { | ||
// Ensure reflow pulses always send reflow pulses even if skipped. | ||
if (!pulse.reflow && node.reflows()) { | ||
pulse = ChangeSet.create(pulse, true); | ||
} | ||
for (i=0, len=l.length; i<len; ++i) { | ||
pq.push({node: l[i], pulse: p, rank: l[i]._rank}); | ||
for (i=0, len=listeners.length; i<len; ++i) { | ||
next = listeners[i]; | ||
if ((nplse = pulses[next._id]) !== undefined) { | ||
if (nplse === null) throw Error('Already propagated to node.'); | ||
// We've already queued this node. Ensure there should be at most one | ||
// pulse with tuples (add/mod/rem), and the remainder will be reflows. | ||
tpls = pulse.add.length || pulse.mod.length || pulse.rem.length; | ||
ntpls = nplse.add.length || nplse.mod.length || nplse.rem.length; | ||
if (tpls && ntpls) throw Error('Multiple changeset pulses to same node'); | ||
// Combine reflow and tuples into a single pulse. | ||
pulses[next._id] = tpls ? pulse : nplse; | ||
pulses[next._id].reflow = pulse.reflow || nplse.reflow; | ||
} else { | ||
// First time we're seeing this node, queue it for propagation. | ||
pq.push(next.qrank(true)); | ||
pulses[next._id] = pulse; | ||
} | ||
} | ||
@@ -252,4 +253,4 @@ } | ||
prototype.reevaluate = function(pulse, node) { | ||
var reflowed = !pulse.reflow || (pulse.reflow && node.last() >= pulse.stamp), | ||
run = !!pulse.add.length || !!pulse.rem.length || node.router(); | ||
var reflowed = pulse.reflow && node.last() >= pulse.stamp, | ||
run = node.router() || pulse.add.length || pulse.rem.length; | ||
@@ -256,0 +257,0 @@ return run || !reflowed || node.reevaluate(pulse); |
var DEPS = require('./Dependencies').ALL, | ||
nodeID = 1; | ||
nodeID = 0; | ||
@@ -19,9 +19,10 @@ function Node(graph) { | ||
prototype.init = function(graph) { | ||
this._id = nodeID++; | ||
this._id = ++nodeID; | ||
this._graph = graph; | ||
this._rank = graph.rank(); // For topologial sort | ||
this._stamp = 0; // Last stamp seen | ||
this._rank = graph.rank(); // Topological sort by rank | ||
this._qrank = null; // Rank when enqueued for propagation | ||
this._stamp = 0; // Last stamp seen | ||
this._listeners = []; | ||
this._registered = {}; // To prevent duplicate listeners | ||
this._listeners._ids = {}; // To prevent duplicate listeners | ||
@@ -44,6 +45,10 @@ // Initialize dependencies. | ||
prototype.qrank = function(/* set */) { | ||
if (!arguments.length) return this._qrank; | ||
return (this._qrank = this._rank, this); | ||
}; | ||
prototype.last = function(stamp) { | ||
if (!arguments.length) return this._stamp; | ||
this._stamp = stamp; | ||
return this; | ||
return (this._stamp = stamp, this); | ||
}; | ||
@@ -84,4 +89,6 @@ | ||
prototype.dependency = function(type, deps) { | ||
var d = this._deps[type]; | ||
var d = this._deps[type], | ||
n = d._names || (d._names = {}); // To prevent dupe deps | ||
// Get dependencies of the given type | ||
if (arguments.length === 1) { | ||
@@ -94,8 +101,15 @@ return d; | ||
d.splice(0, d.length); | ||
d._names = {}; | ||
} else if (!Array.isArray(deps)) { | ||
if (d.indexOf(deps) < 0) { d.push(deps); } | ||
// Separate this case to avoid cost of array creation | ||
if (n[deps]) return this; | ||
d.push(deps); | ||
n[deps] = 1; | ||
} else { | ||
// TODO: singleton case checks for inclusion already | ||
// Should this be done here as well? | ||
d.push.apply(d, deps); | ||
for (var i=0, len=deps.length, dep; i<len; ++i) { | ||
dep = deps[i]; | ||
if (n[dep]) continue; | ||
d.push(dep); | ||
n[dep] = 1; | ||
} | ||
} | ||
@@ -114,6 +128,6 @@ | ||
} | ||
if (this._registered[l._id]) return this; | ||
if (this._listeners._ids[l._id]) return this; | ||
this._listeners.push(l); | ||
this._registered[l._id] = 1; | ||
this._listeners._ids[l._id] = 1; | ||
if (this._rank > l._rank) { | ||
@@ -138,3 +152,3 @@ var q = [l], | ||
this._listeners.splice(idx, 1); | ||
this._registered[l._id] = null; | ||
this._listeners._ids[l._id] = null; | ||
} | ||
@@ -146,3 +160,3 @@ return b; | ||
this._listeners = []; | ||
this._registered = {}; | ||
this._listeners._ids = {}; | ||
}; | ||
@@ -172,2 +186,4 @@ | ||
Node.reset = function() { nodeID = 0; }; | ||
module.exports = Node; |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
27199
849
0