steampipes
Advanced tools
Comparing version 3.1.1 to 3.2.0
@@ -0,1 +1,2 @@ | ||
// Generated by CoffeeScript 2.4.1 | ||
(function() { | ||
@@ -241,1 +242,3 @@ 'use strict'; | ||
}).call(this); | ||
//# sourceMappingURL=njs-streams-and-files.js.map |
@@ -1,2 +0,1 @@ | ||
// Generated by CoffeeScript 2.4.1 | ||
(function() { | ||
@@ -162,2 +161,4 @@ 'use strict'; | ||
transform = transforms[i]; | ||
/* TAINT how can `undefined` end up in `transforms`??? */ | ||
// continue unless transform? | ||
if (transform[this.marks.isa_duct] != null) { | ||
@@ -378,6 +379,14 @@ ref = transform.transforms; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this.pull = function(...transforms) { | ||
this.pull = async function(...transforms) { | ||
var d, drain, duct, first_bucket, on_end, ref; | ||
duct = this._pull(...transforms); | ||
//......................................................................................................... | ||
/* TAINT `await` makes this an async method; is that a problem? */ | ||
if (isa.function(duct.transforms[0].start)) { | ||
duct.transforms[0].start(); | ||
} else if (isa.asyncfunction(duct.transforms[0].start)) { | ||
await duct.transforms[0].start(); | ||
} | ||
if (duct.type !== 'circuit') { | ||
//......................................................................................................... | ||
return duct; | ||
@@ -452,3 +461,3 @@ } | ||
/* Make `duct` available from the POV of the push source: */ | ||
var first_bucket, source; | ||
var drain, first_bucket, on_end, source; | ||
source = duct.transforms[0]; | ||
@@ -464,2 +473,16 @@ source.duct = duct; | ||
} | ||
// debug '^333121^', 'duct', duct | ||
// debug '^333121^', 'duct.has_ended', duct.has_ended | ||
// debug '^45899^', 'source.has_ended', duct.has_ended or source.has_ended | ||
/* TAINT code duplication */ | ||
if (duct.has_ended || source.has_ended) { | ||
drain = duct.transforms[duct.transforms.length - 1]; | ||
if ((on_end = drain.on_end) != null) { | ||
if (drain.call_with_datoms) { | ||
drain.on_end(drain.sink); | ||
} else { | ||
drain.on_end(); | ||
} | ||
} | ||
} | ||
return null; | ||
@@ -469,3 +492,1 @@ }; | ||
}).call(this); | ||
//# sourceMappingURL=pull-remit.js.map |
@@ -57,3 +57,7 @@ // Generated by CoffeeScript 2.4.1 | ||
var drain, on_end; | ||
R.duct.buckets[0].push(this.signals.last); | ||
R.has_ended = true; | ||
if (R.duct == null) { | ||
return; | ||
} | ||
/* NOTE: ensuring that multiple calls to `end()` will be OK */ R.duct.buckets[0].push(this.signals.last); | ||
R.duct.exhaust_pipeline(); | ||
@@ -75,3 +79,4 @@ drain = R.duct.transforms[R.duct.transforms.length - 1]; | ||
buffer: [], | ||
duct: null | ||
duct: null, | ||
has_ended: false | ||
}; | ||
@@ -78,0 +83,0 @@ return R; |
@@ -506,2 +506,42 @@ // Generated by CoffeeScript 2.4.1 | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["end push source (2)"] = async function(T, done) { | ||
var error, matcher, probe; | ||
[probe, matcher, error] = [["what", "a", "lot", "of", "little", "bottles"], ["what", "a", "lot", "of", "little", "bottles"], null]; | ||
await T.perform(probe, matcher, error, function() { | ||
return new Promise(function(resolve, reject) { | ||
var R, i, len, pipeline, source, word; | ||
R = []; | ||
source = SP.new_push_source(); | ||
//....................................................................................................... | ||
for (i = 0, len = probe.length; i < len; i++) { | ||
word = probe[i]; | ||
source.send(word); | ||
} | ||
source.end(); | ||
//....................................................................................................... | ||
pipeline = []; | ||
pipeline.push(source); | ||
pipeline.push(SP.$watch(function(d) { | ||
return info(xrpr(d)); | ||
})); | ||
pipeline.push(SP.$collect({ | ||
collector: R | ||
})); | ||
pipeline.push(SP.$watch(function(d) { | ||
return info(xrpr(d)); | ||
})); | ||
pipeline.push(SP.$drain(function() { | ||
help('ok'); | ||
return resolve(R); | ||
})); | ||
SP.pull(...pipeline); | ||
return null; | ||
}); | ||
}); | ||
//......................................................................................................... | ||
done(); | ||
return null; | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["duct_from_transforms"] = function(T, done) { | ||
@@ -742,25 +782,2 @@ (() => { | ||
#----------------------------------------------------------------------------------------------------------- | ||
@[ "end push source (2)" ] = ( T, done ) -> | ||
* The proper way to end a push source is to call `source.end()`. | ||
[ probe, matcher, error, ] = [["what","a","lot","of","little","bottles","stop"],["what","a","lot","of","little","bottles"],null] | ||
await T.perform probe, matcher, error, -> return new Promise ( resolve, reject ) -> | ||
R = [] | ||
drainer = -> help 'ok'; resolve R | ||
source = SP.new_push_source() | ||
pipeline = [] | ||
pipeline.push source | ||
pipeline.push SP.$watch ( d ) -> info xrpr d | ||
pipeline.push $ ( d, send ) -> if d is 'stop' then source.end() else send d | ||
pipeline.push SP.$collect { collector: R, } | ||
pipeline.push SP.$watch ( d ) -> info xrpr d | ||
pipeline.push SP.$drain drainer | ||
pull pipeline... | ||
for word in probe | ||
source.send word | ||
return null | ||
#......................................................................................................... | ||
done() | ||
return null | ||
#----------------------------------------------------------------------------------------------------------- | ||
@[ "end push source (3)" ] = ( T, done ) -> | ||
@@ -1071,3 +1088,4 @@ * The proper way to end a push source is to call `source.end()`; `send.end()` is largely equivalent. | ||
// test @, 'timeout': 30000 | ||
test(this["$filter"]); | ||
// test @[ "$filter" ] | ||
test(this["end push source (2)"]); | ||
} | ||
@@ -1088,3 +1106,2 @@ | ||
// test @[ "end push source (1)" ] | ||
// test @[ "end push source (2)" ] | ||
// test @[ "end push source (3)" ] | ||
@@ -1091,0 +1108,0 @@ // test @[ "end push source (4)" ] |
{ | ||
"name": "steampipes", | ||
"version": "3.1.1", | ||
"version": "3.2.0", | ||
"description": "Fast, simple data pipelines", | ||
@@ -5,0 +5,0 @@ "main": "lib/main.js", |
@@ -192,1 +192,39 @@ <!-- START doctoc generated TOC please keep comment here to allow auto update --> | ||
``` | ||
## Updates | ||
* If source has a method `start()`, it will be called when `SP.pull pipeline...` is called; this enables | ||
push sources to delay issuing data until the pipeline is ready to consume it | ||
## To Do | ||
* [ ] cf `### TAINT how can `undefined` end up in `transforms`??? ###` in `pull-remit.coffee`: Fix bug | ||
* [ ] somehow notify sources (especiall push sources) that pipeline has been pulled (so data may start to | ||
flow); otherwise, if ultimate source is e.g. NodeJS connected via event handlers, those underlying sources | ||
will start on definition, not on pipeline completion, and will spill arbitrary amounts of data into | ||
SteamPipe buffers. | ||
* [ ] compare: | ||
```coffee | ||
source = SP.new_push_source() | ||
source.send 1 | ||
source.send 2 | ||
pipeline = [] | ||
pipeline.push source | ||
pipeline.push SP.$show() | ||
pipeline.push $drain -> | ||
urge '^2262^', "demo_stream ended" | ||
resolve() | ||
source.end() # (1) | ||
SP.pull pipeline... | ||
# source.end() # (2) | ||
``` | ||
With `(1)`, the drain condition never triggers; only `(2)` works as intended; i.o.w. `source.end()` must | ||
not be called before `SP.pull()`. This is not acceptable. | ||
* [ ] consider whether `$drain()` should allow to appear mid-stream (it would then pull data from upstream, | ||
downstream must rely on own `$drain()` to obtain data). | ||
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
6407
230
884225