Socket
Socket
Sign inDemoInstall

steampipes

Package Overview
Dependencies
Maintainers
1
Versions
30
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

steampipes - npm Package Compare versions

Comparing version 3.1.1 to 3.2.0

3

lib/njs-streams-and-files.js

@@ -0,1 +1,2 @@

// Generated by CoffeeScript 2.4.1
(function() {

@@ -241,1 +242,3 @@ 'use strict';

}).call(this);
//# sourceMappingURL=njs-streams-and-files.js.map

@@ -1,2 +0,1 @@

// Generated by CoffeeScript 2.4.1
(function() {

@@ -162,2 +161,4 @@ 'use strict';

transform = transforms[i];
/* TAINT how can `undefined` end up in `transforms`??? */
// continue unless transform?
if (transform[this.marks.isa_duct] != null) {

@@ -378,6 +379,14 @@ ref = transform.transforms;

//-----------------------------------------------------------------------------------------------------------
this.pull = function(...transforms) {
this.pull = async function(...transforms) {
var d, drain, duct, first_bucket, on_end, ref;
duct = this._pull(...transforms);
//.........................................................................................................
/* TAINT `await` makes this an async method; is that a problem? */
if (isa.function(duct.transforms[0].start)) {
duct.transforms[0].start();
} else if (isa.asyncfunction(duct.transforms[0].start)) {
await duct.transforms[0].start();
}
if (duct.type !== 'circuit') {
//.........................................................................................................
return duct;

@@ -452,3 +461,3 @@ }

/* Make `duct` available from the POV of the push source: */
var first_bucket, source;
var drain, first_bucket, on_end, source;
source = duct.transforms[0];

@@ -464,2 +473,16 @@ source.duct = duct;

}
// debug '^333121^', 'duct', duct
// debug '^333121^', 'duct.has_ended', duct.has_ended
// debug '^45899^', 'source.has_ended', duct.has_ended or source.has_ended
/* TAINT code duplication */
if (duct.has_ended || source.has_ended) {
drain = duct.transforms[duct.transforms.length - 1];
if ((on_end = drain.on_end) != null) {
if (drain.call_with_datoms) {
drain.on_end(drain.sink);
} else {
drain.on_end();
}
}
}
return null;

@@ -469,3 +492,1 @@ };

}).call(this);
//# sourceMappingURL=pull-remit.js.map

9

lib/sources.js

@@ -57,3 +57,7 @@ // Generated by CoffeeScript 2.4.1

var drain, on_end;
R.duct.buckets[0].push(this.signals.last);
R.has_ended = true;
if (R.duct == null) {
return;
}
/* NOTE: ensuring that multiple calls to `end()` will be OK */ R.duct.buckets[0].push(this.signals.last);
R.duct.exhaust_pipeline();

@@ -75,3 +79,4 @@ drain = R.duct.transforms[R.duct.transforms.length - 1];

buffer: [],
duct: null
duct: null,
has_ended: false
};

@@ -78,0 +83,0 @@ return R;

@@ -506,2 +506,42 @@ // Generated by CoffeeScript 2.4.1

//-----------------------------------------------------------------------------------------------------------
this["end push source (2)"] = async function(T, done) {
var error, matcher, probe;
[probe, matcher, error] = [["what", "a", "lot", "of", "little", "bottles"], ["what", "a", "lot", "of", "little", "bottles"], null];
await T.perform(probe, matcher, error, function() {
return new Promise(function(resolve, reject) {
var R, i, len, pipeline, source, word;
R = [];
source = SP.new_push_source();
//.......................................................................................................
for (i = 0, len = probe.length; i < len; i++) {
word = probe[i];
source.send(word);
}
source.end();
//.......................................................................................................
pipeline = [];
pipeline.push(source);
pipeline.push(SP.$watch(function(d) {
return info(xrpr(d));
}));
pipeline.push(SP.$collect({
collector: R
}));
pipeline.push(SP.$watch(function(d) {
return info(xrpr(d));
}));
pipeline.push(SP.$drain(function() {
help('ok');
return resolve(R);
}));
SP.pull(...pipeline);
return null;
});
});
//.........................................................................................................
done();
return null;
};
//-----------------------------------------------------------------------------------------------------------
this["duct_from_transforms"] = function(T, done) {

@@ -742,25 +782,2 @@ (() => {

#-----------------------------------------------------------------------------------------------------------
@[ "end push source (2)" ] = ( T, done ) ->
* The proper way to end a push source is to call `source.end()`.
[ probe, matcher, error, ] = [["what","a","lot","of","little","bottles","stop"],["what","a","lot","of","little","bottles"],null]
await T.perform probe, matcher, error, -> return new Promise ( resolve, reject ) ->
R = []
drainer = -> help 'ok'; resolve R
source = SP.new_push_source()
pipeline = []
pipeline.push source
pipeline.push SP.$watch ( d ) -> info xrpr d
pipeline.push $ ( d, send ) -> if d is 'stop' then source.end() else send d
pipeline.push SP.$collect { collector: R, }
pipeline.push SP.$watch ( d ) -> info xrpr d
pipeline.push SP.$drain drainer
pull pipeline...
for word in probe
source.send word
return null
#.........................................................................................................
done()
return null
#-----------------------------------------------------------------------------------------------------------
@[ "end push source (3)" ] = ( T, done ) ->

@@ -1071,3 +1088,4 @@ * The proper way to end a push source is to call `source.end()`; `send.end()` is largely equivalent.

// test @, 'timeout': 30000
test(this["$filter"]);
// test @[ "$filter" ]
test(this["end push source (2)"]);
}

@@ -1088,3 +1106,2 @@

// test @[ "end push source (1)" ]
// test @[ "end push source (2)" ]
// test @[ "end push source (3)" ]

@@ -1091,0 +1108,0 @@ // test @[ "end push source (4)" ]

{
"name": "steampipes",
"version": "3.1.1",
"version": "3.2.0",
"description": "Fast, simple data pipelines",

@@ -5,0 +5,0 @@ "main": "lib/main.js",

@@ -192,1 +192,39 @@ <!-- START doctoc generated TOC please keep comment here to allow auto update -->

```
## Updates
* If source has a method `start()`, it will be called when `SP.pull pipeline...` is called; this enables
push sources to delay issuing data until the pipeline is ready to consume it
## To Do
* [ ] cf `### TAINT how can `undefined` end up in `transforms`??? ###` in `pull-remit.coffee`: Fix bug
* [ ] somehow notify sources (especiall push sources) that pipeline has been pulled (so data may start to
flow); otherwise, if ultimate source is e.g. NodeJS connected via event handlers, those underlying sources
will start on definition, not on pipeline completion, and will spill arbitrary amounts of data into
SteamPipe buffers.
* [ ] compare:
```coffee
source = SP.new_push_source()
source.send 1
source.send 2
pipeline = []
pipeline.push source
pipeline.push SP.$show()
pipeline.push $drain ->
urge '^2262^', "demo_stream ended"
resolve()
source.end() # (1)
SP.pull pipeline...
# source.end() # (2)
```
With `(1)`, the drain condition never triggers; only `(2)` works as intended; i.o.w. `source.end()` must
not be called before `SP.pull()`. This is not acceptable.
* [ ] consider whether `$drain()` should allow to appear mid-stream (it would then pull data from upstream,
downstream must rely on own `$drain()` to obtain data).

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc