Socket
Socket
Sign inDemoInstall

vega-dataflow

Package Overview
Dependencies
2
Maintainers
2
Versions
98
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.0.2 to 1.0.3

2

package.json
{
"name": "vega-dataflow",
"version": "1.0.2",
"version": "1.0.3",
"description": "Vega streaming dataflow graph.",

@@ -5,0 +5,0 @@ "repository": {

@@ -106,22 +106,2 @@ var util = require('datalib/src/util'),

var schedule = function(a, b) {
if (a.rank !== b.rank) {
// Topological sort
return a.rank - b.rank;
} else {
// If queueing multiple pulses to the same node, then there will be
// at most one pulse with a changeset (add/mod/rem), and the remainder
// will be reflows. Combine the changeset and reflows into a single pulse
// and queue that first. Subsequent reflow-only pulses will be pruned.
var pa = a.pulse, pb = b.pulse,
paCS = pa.add.length || pa.mod.length || pa.rem.length,
pbCS = pb.add.length || pb.mod.length || pb.rem.length;
pa.reflow = pb.reflow = pa.reflow || pb.reflow;
if (paCS && pbCS) throw Error('Both pulses have changesets.');
return paCS ? -1 : 1;
}
};
// Stamp should be specified with caution. It is necessary for inline datasources,

@@ -131,3 +111,4 @@ // which need to be populated during the same cycle even though propagation has

prototype.propagate = function(pulse, node, stamp) {
var v, l, n, p, r, i, len, reflowed;
var pulses = {},
listeners, next, nplse, tpls, ntpls, i, len;

@@ -137,3 +118,6 @@ // new PQ with each propagation cycle so that we can pulse branches

// a new inline datasource).
var pq = new Heap(schedule);
var pq = new Heap(function(a, b) {
// Topological sort on qrank as rank may change during propagation.
return a.qrank() - b.qrank();
});

@@ -143,30 +127,47 @@ if (pulse.stamp) throw Error('Pulse already has a non-zero stamp.');

pulse.stamp = stamp || ++this._stamp;
pq.push({node: node, pulse: pulse, rank: node.rank()});
pulses[node._id] = pulse;
pq.push(node.qrank(true));
while (pq.size() > 0) {
v = pq.peek();
n = v.node;
p = v.pulse;
reflowed = p.reflow && n.last() >= p.stamp;
node = pq.peek();
pulse = pulses[node._id];
if (reflowed) {
// Don't needlessly reflow ops.
pq.pop();
} else if (v.rank !== (r = n.rank())) {
if (node.rank() !== node.qrank()) {
// A node's rank might change during a propagation. Re-queue if so.
v.rank = r;
pq.replace(v);
pq.replace(node.qrank(true));
} else {
// Evaluate node and propagate pulse.
pq.pop();
l = n._listeners;
p = this.evaluate(p, n);
pulses[node._id] = null;
listeners = node._listeners;
pulse = this.evaluate(pulse, node);
// Propagate the pulse.
if (p !== this.doNotPropagate) {
if (!p.reflow && n.reflows()) { // If skipped eval of reflows node
p = ChangeSet.create(p, true);
if (pulse !== this.doNotPropagate) {
// Ensure reflow pulses always send reflow pulses even if skipped.
if (!pulse.reflow && node.reflows()) {
pulse = ChangeSet.create(pulse, true);
}
for (i=0, len=l.length; i<len; ++i) {
pq.push({node: l[i], pulse: p, rank: l[i]._rank});
for (i=0, len=listeners.length; i<len; ++i) {
next = listeners[i];
if ((nplse = pulses[next._id]) !== undefined) {
if (nplse === null) throw Error('Already propagated to node.');
// We've already queued this node. Ensure there should be at most one
// pulse with tuples (add/mod/rem), and the remainder will be reflows.
tpls = pulse.add.length || pulse.mod.length || pulse.rem.length;
ntpls = nplse.add.length || nplse.mod.length || nplse.rem.length;
if (tpls && ntpls) throw Error('Multiple changeset pulses to same node');
// Combine reflow and tuples into a single pulse.
pulses[next._id] = tpls ? pulse : nplse;
pulses[next._id].reflow = pulse.reflow || nplse.reflow;
} else {
// First time we're seeing this node, queue it for propagation.
pq.push(next.qrank(true));
pulses[next._id] = pulse;
}
}

@@ -252,4 +253,4 @@ }

prototype.reevaluate = function(pulse, node) {
var reflowed = !pulse.reflow || (pulse.reflow && node.last() >= pulse.stamp),
run = !!pulse.add.length || !!pulse.rem.length || node.router();
var reflowed = pulse.reflow && node.last() >= pulse.stamp,
run = node.router() || pulse.add.length || pulse.rem.length;

@@ -256,0 +257,0 @@ return run || !reflowed || node.reevaluate(pulse);

var DEPS = require('./Dependencies').ALL,
nodeID = 1;
nodeID = 0;

@@ -19,9 +19,10 @@ function Node(graph) {

prototype.init = function(graph) {
this._id = nodeID++;
this._id = ++nodeID;
this._graph = graph;
this._rank = graph.rank(); // For topologial sort
this._stamp = 0; // Last stamp seen
this._rank = graph.rank(); // Topological sort by rank
this._qrank = null; // Rank when enqueued for propagation
this._stamp = 0; // Last stamp seen
this._listeners = [];
this._registered = {}; // To prevent duplicate listeners
this._listeners._ids = {}; // To prevent duplicate listeners

@@ -44,6 +45,10 @@ // Initialize dependencies.

prototype.qrank = function(/* set */) {
if (!arguments.length) return this._qrank;
return (this._qrank = this._rank, this);
};
prototype.last = function(stamp) {
if (!arguments.length) return this._stamp;
this._stamp = stamp;
return this;
return (this._stamp = stamp, this);
};

@@ -84,4 +89,6 @@

prototype.dependency = function(type, deps) {
var d = this._deps[type];
var d = this._deps[type],
n = d._names || (d._names = {}); // To prevent dupe deps
// Get dependencies of the given type
if (arguments.length === 1) {

@@ -94,8 +101,15 @@ return d;

d.splice(0, d.length);
d._names = {};
} else if (!Array.isArray(deps)) {
if (d.indexOf(deps) < 0) { d.push(deps); }
// Separate this case to avoid cost of array creation
if (n[deps]) return this;
d.push(deps);
n[deps] = 1;
} else {
// TODO: singleton case checks for inclusion already
// Should this be done here as well?
d.push.apply(d, deps);
for (var i=0, len=deps.length, dep; i<len; ++i) {
dep = deps[i];
if (n[dep]) continue;
d.push(dep);
n[dep] = 1;
}
}

@@ -114,6 +128,6 @@

}
if (this._registered[l._id]) return this;
if (this._listeners._ids[l._id]) return this;
this._listeners.push(l);
this._registered[l._id] = 1;
this._listeners._ids[l._id] = 1;
if (this._rank > l._rank) {

@@ -138,3 +152,3 @@ var q = [l],

this._listeners.splice(idx, 1);
this._registered[l._id] = null;
this._listeners._ids[l._id] = null;
}

@@ -146,3 +160,3 @@ return b;

this._listeners = [];
this._registered = {};
this._listeners._ids = {};
};

@@ -172,2 +186,4 @@

Node.reset = function() { nodeID = 0; };
module.exports = Node;
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc