Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

vega-dataflow

Package Overview
Dependencies
Maintainers
2
Versions
98
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

vega-dataflow - npm Package Compare versions

Comparing version 1.2.3 to 1.2.4

2

package.json
{
"name": "vega-dataflow",
"version": "1.2.3",
"version": "1.2.4",
"description": "Vega streaming dataflow graph.",

@@ -5,0 +5,0 @@ "repository": {

@@ -95,34 +95,17 @@ var log = require('vega-logging'),

this._inputNode = DataSourceInput(this);
this._outputNode = DataSourceOutput(this);
var graph = this._graph,
mutates = 0,
collector = this._inputNode,
i, node, router, collects;
status;
for (i=0; i<pipeline.length; ++i) {
node = pipeline[i];
pipeline.unshift(this._inputNode = DataSourceInput(this));
status = graph.preprocess(pipeline);
if (!node._collector && node.batch()) {
if (router) {
node = new Collector(graph);
pipeline.splice(i, 0, node);
router = false;
} else {
node._collector = collector;
}
}
if ((collects = node.collector())) collector = node;
router = router || node.router() && !collects;
mutates = mutates || node.mutates();
if (status.router) {
pipeline.push(status.collector = new Collector(graph));
}
if (router) pipeline.push(collector = new Collector(graph));
pipeline.unshift(this._inputNode);
pipeline.push(this._outputNode);
this._collector = collector;
this._mutates = !!mutates;
this._graph.connect(this._pipeline = pipeline);
pipeline.push(this._outputNode = DataSourceOutput(this));
this._collector = status.collector;
this._mutates = !!status.mutates;
graph.connect(this._pipeline = pipeline);
return this;

@@ -132,15 +115,3 @@ };

prototype.synchronize = function() {
var data = this._data, i, n;
for (i=0, n=data.length; i<n; ++i) {
Tuple.prev_update(data[i]);
}
if (this._inputNode !== this._collector) {
data = this._collector.data();
for (i=0, n=data.length; i<n; ++i) {
Tuple.prev_update(data[i]);
}
}
this._graph.synchronize(this._pipeline);
return this;

@@ -147,0 +118,0 @@ };

@@ -5,2 +5,4 @@ var dl = require('datalib'),

DataSource = require('./DataSource'),
Collector = require('./Collector'),
Tuple = require('./Tuple'),
Signal = require('./Signal'),

@@ -162,2 +164,39 @@ Deps = require('./Dependencies');

// Process a new branch of the dataflow graph prior to connection:
// (1) Insert new Collector nodes as needed.
// (2) Track + return mutation/routing status of the branch.
prototype.preprocess = function(branch) {
var graph = this,
mutates = 0,
node, router, collector, collects;
for (var i=0; i<branch.length; ++i) {
node = branch[i];
// Batch nodes need access to a materialized dataset.
if (node.batch() && !node._collector) {
if (router || !collector) {
node = new Collector(graph);
branch.splice(i, 0, node);
router = false;
} else {
node._collector = collector;
}
}
if ((collects = node.collector())) collector = node;
router = router || node.router() && !collects;
mutates = mutates || node.mutates();
// A collector needs to be inserted after tuple-producing
// nodes for correct previous value tracking.
if (node.produces()) {
branch.splice(i+1, 0, new Collector(graph));
router = false;
}
}
return {router: router, collector: collector, mutates: mutates};
};
prototype.connect = function(branch) {

@@ -210,2 +249,21 @@ var collector, node, data, signals, i, n, j, m;

prototype.synchronize = function(branch) {
var ids = {},
node, data, i, n, j, m, d, id;
for (i=0, n=branch.length; i<n; ++i) {
node = branch[i];
if (!node.collector()) continue;
for (j=0, data=node.data(), m=data.length; j<m; ++j) {
id = (d = data[j])._id;
if (ids[id]) continue;
Tuple.prev_update(d);
ids[id] = 1;
}
}
return this;
};
prototype.reevaluate = function(pulse, node) {

@@ -212,0 +270,0 @@ var reflowed = pulse.reflow && node.last() >= pulse.stamp,

@@ -11,5 +11,6 @@ var DEPS = require('./Dependencies').ALL,

Collector: 0x02, // Holds a materialized dataset, pulse node to reflow.
Mutates: 0x04, // Sets properties of incoming tuples.
Reflows: 0x08, // Forwards a reflow pulse.
Batch: 0x10 // Performs batch data processing, needs collector.
Produces: 0x04, // Produces new tuples.
Mutates: 0x08, // Sets properties of incoming tuples.
Reflows: 0x10, // Forwards a reflow pulse.
Batch: 0x20 // Performs batch data processing, needs collector.
};

@@ -72,2 +73,7 @@

prototype.produces = function(state) {
if (!arguments.length) return (this._flags & Flags.Produces);
return this._setf(Flags.Produces, state);
};
prototype.mutates = function(state) {

@@ -74,0 +80,0 @@ if (!arguments.length) return (this._flags & Flags.Mutates);

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc