steampipes
Advanced tools
Comparing version 6.2.0 to 6.3.0
@@ -190,2 +190,47 @@ (function() { | ||
//----------------------------------------------------------------------------------------------------------- | ||
this.$chunkify_keep = function(filter, postprocess = null) { | ||
return this._$chunkify(filter, postprocess, true); | ||
}; | ||
this.$chunkify_toss = function(filter, postprocess = null) { | ||
return this._$chunkify(filter, postprocess, false); | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this._$chunkify = function(filter, postprocess, keep) { | ||
var collector, last; | ||
if (postprocess == null) { | ||
postprocess = function(x) { | ||
return x; | ||
}; | ||
} | ||
validate.function(filter); | ||
validate.function(postprocess); | ||
collector = null; | ||
last = Symbol('last'); | ||
//......................................................................................................... | ||
return this.$({last}, function(d, send) { | ||
if (d === last) { | ||
if (collector != null) { | ||
send(postprocess(collector)); | ||
collector = null; | ||
} | ||
return null; | ||
} | ||
if (filter(d)) { | ||
if (keep) { | ||
(collector != null ? collector : collector = []).push(d); | ||
} | ||
if (collector != null) { | ||
send(postprocess(collector)); | ||
collector = null; | ||
} | ||
return null; | ||
} | ||
(collector != null ? collector : collector = []).push(d); | ||
return null; | ||
}); | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
/* Given a `settings` object, add values to the stream as `$ settings, ( d, send ) -> send d` would do, | ||
@@ -192,0 +237,0 @@ e.g. `$surround { first: 'first!', between: 'to appear in-between two values', }`. */ |
@@ -1,2 +0,1 @@ | ||
// Generated by CoffeeScript 2.5.1 | ||
(function() { | ||
@@ -778,2 +777,126 @@ //########################################################################################################### | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["$chunkify_keep no postprocessing"] = async function(T, done) { | ||
var error, i, len, matcher, probe, probes_and_matchers; | ||
probes_and_matchers = [[[], [], null], ['abcdefg', [['a', 'b', 'c', 'd', 'e', 'f', 'g']], null], ['ab(cdefg)', [['a', 'b', '('], ['c', 'd', 'e', 'f', 'g', ')']], null]]; | ||
//......................................................................................................... | ||
for (i = 0, len = probes_and_matchers.length; i < len; i++) { | ||
[probe, matcher, error] = probes_and_matchers[i]; | ||
//....................................................................................................... | ||
await T.perform(probe, matcher, error, function() { | ||
return new Promise(function(resolve, reject) { | ||
var pipeline; | ||
pipeline = []; | ||
pipeline.push(probe); | ||
pipeline.push(SP.$chunkify_keep(function(d) { | ||
return d === '(' || d === ')'; | ||
})); | ||
pipeline.push(SP.$drain(function(collector) { | ||
return resolve(collector); | ||
})); | ||
SP.pull(...pipeline); | ||
//..................................................................................................... | ||
return null; | ||
}); | ||
}); | ||
} | ||
//......................................................................................................... | ||
done(); | ||
return null; | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["$chunkify_toss no postprocessing"] = async function(T, done) { | ||
var error, i, len, matcher, probe, probes_and_matchers; | ||
probes_and_matchers = [[[], [], null], ['abcdefg', [['a', 'b', 'c', 'd', 'e', 'f', 'g']], null], ['ab(cdefg)', [['a', 'b'], ['c', 'd', 'e', 'f', 'g']], null]]; | ||
//......................................................................................................... | ||
for (i = 0, len = probes_and_matchers.length; i < len; i++) { | ||
[probe, matcher, error] = probes_and_matchers[i]; | ||
//....................................................................................................... | ||
await T.perform(probe, matcher, error, function() { | ||
return new Promise(function(resolve, reject) { | ||
var pipeline; | ||
pipeline = []; | ||
pipeline.push(probe); | ||
pipeline.push(SP.$chunkify_toss(function(d) { | ||
return d === '(' || d === ')'; | ||
})); | ||
pipeline.push(SP.$drain(function(collector) { | ||
return resolve(collector); | ||
})); | ||
SP.pull(...pipeline); | ||
//..................................................................................................... | ||
return null; | ||
}); | ||
}); | ||
} | ||
//......................................................................................................... | ||
done(); | ||
return null; | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["$chunkify_toss with postprocessing"] = async function(T, done) { | ||
var error, i, len, matcher, probe, probes_and_matchers; | ||
probes_and_matchers = [[[], [], null], ['abcdefg', ['a|b|c|d|e|f|g'], null], ['ab(cdefg)', ['a|b', 'c|d|e|f|g'], null]]; | ||
//......................................................................................................... | ||
for (i = 0, len = probes_and_matchers.length; i < len; i++) { | ||
[probe, matcher, error] = probes_and_matchers[i]; | ||
//....................................................................................................... | ||
await T.perform(probe, matcher, error, function() { | ||
return new Promise(function(resolve, reject) { | ||
var pipeline; | ||
pipeline = []; | ||
pipeline.push(probe); | ||
pipeline.push(SP.$chunkify_toss((function(d) { | ||
return d === '(' || d === ')'; | ||
}), function(chunk) { | ||
return chunk.join('|'); | ||
})); | ||
pipeline.push(SP.$drain(function(collector) { | ||
return resolve(collector); | ||
})); | ||
SP.pull(...pipeline); | ||
//..................................................................................................... | ||
return null; | ||
}); | ||
}); | ||
} | ||
//......................................................................................................... | ||
done(); | ||
return null; | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["$chunkify_keep with postprocessing"] = async function(T, done) { | ||
var error, i, len, matcher, probe, probes_and_matchers; | ||
probes_and_matchers = [[[], [], null], ['abcdefg', ['a|b|c|d|e|f|g'], null], ['ab(cdefg)', ['a|b|(', 'c|d|e|f|g|)'], null]]; | ||
//......................................................................................................... | ||
for (i = 0, len = probes_and_matchers.length; i < len; i++) { | ||
[probe, matcher, error] = probes_and_matchers[i]; | ||
//....................................................................................................... | ||
await T.perform(probe, matcher, error, function() { | ||
return new Promise(function(resolve, reject) { | ||
var pipeline; | ||
pipeline = []; | ||
pipeline.push(probe); | ||
pipeline.push(SP.$chunkify_keep((function(d) { | ||
return d === '(' || d === ')'; | ||
}), function(chunk) { | ||
return chunk.join('|'); | ||
})); | ||
pipeline.push(SP.$drain(function(collector) { | ||
return resolve(collector); | ||
})); | ||
SP.pull(...pipeline); | ||
//..................................................................................................... | ||
return null; | ||
}); | ||
}); | ||
} | ||
//......................................................................................................... | ||
done(); | ||
return null; | ||
}; | ||
/* | ||
@@ -1086,14 +1209,19 @@ | ||
if (module.parent == null) { | ||
// test @, 'timeout': 30000 | ||
// test @[ "$filter" ] | ||
// test @[ "end push source (2)" ] | ||
// test @[ "remit 1" ] | ||
// test @[ "drain with result" ] | ||
// test @[ "remit 2" ] | ||
// test @[ "remit with end detection 1" ] | ||
// test @[ "duct_from_transforms" ] | ||
test(this["composability (through)"]); | ||
// test @[ "$chunkify 1" ] | ||
// test @[ "$chunkify_keep no postprocessing" ] | ||
// test @[ "$chunkify_toss no postprocessing" ] | ||
// test @[ "$chunkify_toss with postprocessing" ] | ||
test(this["$chunkify_keep with postprocessing"]); | ||
} | ||
// test @[ "composability (source)" ] | ||
// test @, 'timeout': 30000 | ||
// test @[ "$filter" ] | ||
// test @[ "end push source (2)" ] | ||
// test @[ "remit 1" ] | ||
// test @[ "drain with result" ] | ||
// test @[ "remit 2" ] | ||
// test @[ "remit with end detection 1" ] | ||
// test @[ "duct_from_transforms" ] | ||
// test @[ "composability (through)" ] | ||
// test @[ "composability (source)" ] | ||
// test @[ "composability (sink)" ] | ||
@@ -1120,3 +1248,1 @@ // test @[ "remit with end detection 2" ] | ||
}).call(this); | ||
//# sourceMappingURL=basic.test.js.map |
{ | ||
"name": "steampipes", | ||
"version": "6.2.0", | ||
"version": "6.3.0", | ||
"description": "Fast, simple data pipelines", | ||
@@ -5,0 +5,0 @@ "main": "lib/main.js", |
@@ -97,2 +97,39 @@ | ||
* **`@chunkify_*()`**—cut stream by observing boundaries. Depending on whether to keep or toss datoms | ||
recognbized by the `filter` function as boundaries, use either | ||
* **`@$chunkify_keep = ( filter, postprocess = null ) ->`** or | ||
* **`@$chunkify_toss = ( filter, postprocess = null ) ->`** | ||
The second, optional `postprocess` argument must be a function when given; it will receive a list of | ||
datoms and may return any value which will then be sent on. Sample application: | ||
```coffee | ||
filter = ( d ) -> d in [ '(', ')', ] | ||
postprocess = ( chunk ) -> chunk.join '|' | ||
pipeline = [] | ||
pipeline.push 'ab(cdefg)' | ||
pipeline.push SP.$chunkify_keep filter, postprocess | ||
pipeline.push SP.$show() | ||
pipeline.push SP.$drain -> resolve() | ||
SP.pull pipeline... | ||
``` | ||
will print | ||
``` | ||
'a|b|(' | ||
'c|d|e|f|g|)' | ||
``` | ||
Had we used `SP.$chunkify_toss filter, postprocess` instead, the output would have been | ||
``` | ||
'a|b' | ||
'c|d|e|f|g' | ||
``` | ||
So if one just wanted to collect all stream items into a single list, one would use either `SP.$collect()` | ||
or else an argument to the drain transform, as in `SP.$drain ( collector ) -> resolve collector`; if one | ||
wanted to collect all stream items into multiple lists, then `SP.$chunkify_{keep|toss} filter, ...` is the | ||
way to go. | ||
### Modifiers and `$before_first()`, `$after_last()` | ||
@@ -312,3 +349,3 @@ | ||
pipeline at large and the transform? | ||
* [ ] obscure bug: when a push source is used with a stream that comes from another instance of the | ||
* [X] obscure bug: when a push source is used with a stream that comes from another instance of the | ||
SteamPipes library (as in, `( require 'pathA/steampipes' ).new_push_source()` is used in a pipeline that | ||
@@ -318,5 +355,9 @@ is activated by) `( require 'pathB/steampipes' ).pull pipeline...`) and error with message | ||
object` results. The message should at least hint point at the probable error cause or be avoided at all. | ||
**FIXED in v6.2**: replaced `Symbol 'xy'` with `Symbol.from 'steampipes/xy'` in `SP.marks`. | ||
* [ ] bug: async functions passed into `$drain()`, attached to `push_source.start` and possibly other places | ||
are not called or not called with `await`, thus causing silent failures. Must always reject loudly where | ||
detected or be handled appropriately. | ||
* [X] implement `$chunkify()` (as configurable variant of `$collect()`?); see code comment in standard | ||
transforms | ||
### Future: JS Pipeline Operator | ||
@@ -323,0 +364,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1144673
8182
431