Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

vega-dataflow

Package Overview
Dependencies
Maintainers
2
Versions
98
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

vega-dataflow - npm Package Compare versions

Comparing version 1.3.2 to 1.4.0

2

package.json
{
"name": "vega-dataflow",
"version": "1.3.2",
"version": "1.4.0",
"description": "Vega streaming dataflow graph.",

@@ -5,0 +5,0 @@ "repository": {

@@ -1,3 +0,4 @@

var log = require('vega-logging'),
ChangeSet = require('./ChangeSet'),
var dl = require('datalib'),
log = require('vega-logging'),
ChangeSet = require('./ChangeSet'),
Collector = require('./Collector'),

@@ -15,2 +16,4 @@ Tuple = require('./Tuple'),

this._output = null; // Output changeset
this._indexes = {};
this._indexFields = [];

@@ -115,2 +118,29 @@ this._inputNode = null;

prototype.synchronize = function() {
// update indices
var pulse = this.last(),
fields = this._indexFields,
i, j, f, key, index, value;
for (i=0; i<fields.length; ++i) {
key = fields[i];
index = this._indexes[key];
f = dl.$(key);
for (j=0; j<pulse.add.length; ++j) {
value = f(pulse.add[j]);
Tuple.prev_init(pulse.add[j]);
index[value] = (index[value] || 0) + 1;
}
for (j=0; j<pulse.rem.length; ++j) {
value = f(pulse.rem[j]);
index[value] = (index[value] || 0) - 1;
}
for (j=0; j<pulse.mod.length; ++j) {
value = f(pulse.mod[j]._prev);
index[value] = (index[value] || 0) - 1;
value = f(pulse.mod[j]);
index[value] = (index[value] || 0) + 1;
}
}
this._graph.synchronize(this._pipeline);

@@ -120,3 +150,21 @@ return this;

prototype.listener = function() {
prototype.getIndex = function(field) {
var data = this.values(),
f = dl.$(field),
index, i, len, value;
if (!this._indexes[field]) {
index = {};
this._indexes[field] = index;
this._indexFields.push(field);
for (i=0, len=data.length; i<len; ++i) {
value = f(this._data[i]);
index[value] = (index[value] || 0) + 1;
Tuple.prev_init(this._data[i]);
}
}
return this._indexes[field];
};
prototype.listener = function() {
return DataSourceListener(this).addListener(this._inputNode);

@@ -129,3 +177,3 @@ };

} else {
this._outputNode.addListener(l);
this._outputNode.addListener(l);
}

@@ -143,3 +191,3 @@ return this;

// Input node applies the datasource's delta, and propagates it to
// Input node applies the datasource's delta, and propagates it to
// the rest of the pipeline. It receives touches to reflow data.

@@ -158,3 +206,3 @@ function DataSourceInput(ds) {

var delta = ds._input,
var delta = ds._input,
out = ChangeSet.create(input), f;

@@ -189,3 +237,3 @@

out.add = delta.add;
out.add = delta.add;
out.mod = delta.mod;

@@ -237,3 +285,3 @@ out.rem = delta.rem;

// operators that mutate, and thus would override the source data.
if (ds.mutates()) {
if (ds.mutates()) {
var map = ds._srcMap || (ds._srcMap = {}), // to propagate tuples correctly

@@ -250,3 +298,3 @@ output = ChangeSet.create(input);

output.rem = input.rem.map(function(t) {
output.rem = input.rem.map(function(t) {
var o = map[t._id];

@@ -253,0 +301,0 @@ return (map[t._id] = null, o);

@@ -96,3 +96,3 @@ var dl = require('datalib'),

// which need to be populated during the same cycle even though propagation has
// passed that part of the dataflow graph.
// passed that part of the dataflow graph.
prototype.propagate = function(pulse, node, stamp) {

@@ -131,3 +131,3 @@ var pulses = {},

// Propagate the pulse.
// Propagate the pulse.
if (pulse !== this.doNotPropagate) {

@@ -147,3 +147,3 @@ // Ensure reflow pulses always send reflow pulses even if skipped.

// We've already queued this node. Ensure there should be at most one
// pulse with tuples (add/mod/rem), and the remainder will be reflows.
// pulse with tuples (add/mod/rem), and the remainder will be reflows.
tpls = pulse.add.length || pulse.mod.length || pulse.rem.length;

@@ -154,3 +154,3 @@ ntpls = nplse.add.length || nplse.mod.length || nplse.rem.length;

// Combine reflow and tuples into a single pulse.
// Combine reflow and tuples into a single pulse.
pulses[next._id] = tpls ? pulse : nplse;

@@ -171,3 +171,3 @@ pulses[next._id].reflow = pulse.reflow || nplse.reflow;

// Perform final bookkeeping on the graph, after propagation is complete.
// Perform final bookkeeping on the graph, after propagation is complete.
// - For all updated datasources, synchronize their previous values.

@@ -181,3 +181,3 @@ prototype.done = function(pulse) {

// Process a new branch of the dataflow graph prior to connection:
// (1) Insert new Collector nodes as needed.
// (1) Insert new Collector nodes as needed.
// (2) Track + return mutation/routing status of the branch.

@@ -192,3 +192,3 @@ prototype.preprocess = function(branch) {

// Batch nodes need access to a materialized dataset.
// Batch nodes need access to a materialized dataset.
if (node.batch() && !node._collector) {

@@ -284,5 +284,5 @@ if (router || !collector) {

id = (d = data[j])._id;
if (ids[id]) continue;
if (ids[id]) continue;
Tuple.prev_update(d);
ids[id] = 1;
ids[id] = 1;
}

@@ -289,0 +289,0 @@ }

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc