Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

steampipes

Package Overview
Dependencies
Maintainers
1
Versions
30
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

steampipes - npm Package Compare versions

Comparing version 6.2.0 to 6.3.0

45

lib/standard-transforms.js

@@ -190,2 +190,47 @@ (function() {

//-----------------------------------------------------------------------------------------------------------
this.$chunkify_keep = function(filter, postprocess = null) {
return this._$chunkify(filter, postprocess, true);
};
this.$chunkify_toss = function(filter, postprocess = null) {
return this._$chunkify(filter, postprocess, false);
};
//-----------------------------------------------------------------------------------------------------------
this._$chunkify = function(filter, postprocess, keep) {
var collector, last;
if (postprocess == null) {
postprocess = function(x) {
return x;
};
}
validate.function(filter);
validate.function(postprocess);
collector = null;
last = Symbol('last');
//.........................................................................................................
return this.$({last}, function(d, send) {
if (d === last) {
if (collector != null) {
send(postprocess(collector));
collector = null;
}
return null;
}
if (filter(d)) {
if (keep) {
(collector != null ? collector : collector = []).push(d);
}
if (collector != null) {
send(postprocess(collector));
collector = null;
}
return null;
}
(collector != null ? collector : collector = []).push(d);
return null;
});
};
//-----------------------------------------------------------------------------------------------------------
/* Given a `settings` object, add values to the stream as `$ settings, ( d, send ) -> send d` would do,

@@ -192,0 +237,0 @@ e.g. `$surround { first: 'first!', between: 'to appear in-between two values', }`. */

152

lib/tests/basic.test.js

@@ -1,2 +0,1 @@

// Generated by CoffeeScript 2.5.1
(function() {

@@ -778,2 +777,126 @@ //###########################################################################################################

//-----------------------------------------------------------------------------------------------------------
this["$chunkify_keep no postprocessing"] = async function(T, done) {
var error, i, len, matcher, probe, probes_and_matchers;
probes_and_matchers = [[[], [], null], ['abcdefg', [['a', 'b', 'c', 'd', 'e', 'f', 'g']], null], ['ab(cdefg)', [['a', 'b', '('], ['c', 'd', 'e', 'f', 'g', ')']], null]];
//.........................................................................................................
for (i = 0, len = probes_and_matchers.length; i < len; i++) {
[probe, matcher, error] = probes_and_matchers[i];
//.......................................................................................................
await T.perform(probe, matcher, error, function() {
return new Promise(function(resolve, reject) {
var pipeline;
pipeline = [];
pipeline.push(probe);
pipeline.push(SP.$chunkify_keep(function(d) {
return d === '(' || d === ')';
}));
pipeline.push(SP.$drain(function(collector) {
return resolve(collector);
}));
SP.pull(...pipeline);
//.....................................................................................................
return null;
});
});
}
//.........................................................................................................
done();
return null;
};
//-----------------------------------------------------------------------------------------------------------
this["$chunkify_toss no postprocessing"] = async function(T, done) {
var error, i, len, matcher, probe, probes_and_matchers;
probes_and_matchers = [[[], [], null], ['abcdefg', [['a', 'b', 'c', 'd', 'e', 'f', 'g']], null], ['ab(cdefg)', [['a', 'b'], ['c', 'd', 'e', 'f', 'g']], null]];
//.........................................................................................................
for (i = 0, len = probes_and_matchers.length; i < len; i++) {
[probe, matcher, error] = probes_and_matchers[i];
//.......................................................................................................
await T.perform(probe, matcher, error, function() {
return new Promise(function(resolve, reject) {
var pipeline;
pipeline = [];
pipeline.push(probe);
pipeline.push(SP.$chunkify_toss(function(d) {
return d === '(' || d === ')';
}));
pipeline.push(SP.$drain(function(collector) {
return resolve(collector);
}));
SP.pull(...pipeline);
//.....................................................................................................
return null;
});
});
}
//.........................................................................................................
done();
return null;
};
//-----------------------------------------------------------------------------------------------------------
this["$chunkify_toss with postprocessing"] = async function(T, done) {
var error, i, len, matcher, probe, probes_and_matchers;
probes_and_matchers = [[[], [], null], ['abcdefg', ['a|b|c|d|e|f|g'], null], ['ab(cdefg)', ['a|b', 'c|d|e|f|g'], null]];
//.........................................................................................................
for (i = 0, len = probes_and_matchers.length; i < len; i++) {
[probe, matcher, error] = probes_and_matchers[i];
//.......................................................................................................
await T.perform(probe, matcher, error, function() {
return new Promise(function(resolve, reject) {
var pipeline;
pipeline = [];
pipeline.push(probe);
pipeline.push(SP.$chunkify_toss((function(d) {
return d === '(' || d === ')';
}), function(chunk) {
return chunk.join('|');
}));
pipeline.push(SP.$drain(function(collector) {
return resolve(collector);
}));
SP.pull(...pipeline);
//.....................................................................................................
return null;
});
});
}
//.........................................................................................................
done();
return null;
};
//-----------------------------------------------------------------------------------------------------------
this["$chunkify_keep with postprocessing"] = async function(T, done) {
var error, i, len, matcher, probe, probes_and_matchers;
probes_and_matchers = [[[], [], null], ['abcdefg', ['a|b|c|d|e|f|g'], null], ['ab(cdefg)', ['a|b|(', 'c|d|e|f|g|)'], null]];
//.........................................................................................................
for (i = 0, len = probes_and_matchers.length; i < len; i++) {
[probe, matcher, error] = probes_and_matchers[i];
//.......................................................................................................
await T.perform(probe, matcher, error, function() {
return new Promise(function(resolve, reject) {
var pipeline;
pipeline = [];
pipeline.push(probe);
pipeline.push(SP.$chunkify_keep((function(d) {
return d === '(' || d === ')';
}), function(chunk) {
return chunk.join('|');
}));
pipeline.push(SP.$drain(function(collector) {
return resolve(collector);
}));
SP.pull(...pipeline);
//.....................................................................................................
return null;
});
});
}
//.........................................................................................................
done();
return null;
};
/*

@@ -1086,14 +1209,19 @@

if (module.parent == null) {
// test @, 'timeout': 30000
// test @[ "$filter" ]
// test @[ "end push source (2)" ]
// test @[ "remit 1" ]
// test @[ "drain with result" ]
// test @[ "remit 2" ]
// test @[ "remit with end detection 1" ]
// test @[ "duct_from_transforms" ]
test(this["composability (through)"]);
// test @[ "$chunkify 1" ]
// test @[ "$chunkify_keep no postprocessing" ]
// test @[ "$chunkify_toss no postprocessing" ]
// test @[ "$chunkify_toss with postprocessing" ]
test(this["$chunkify_keep with postprocessing"]);
}
// test @[ "composability (source)" ]
// test @, 'timeout': 30000
// test @[ "$filter" ]
// test @[ "end push source (2)" ]
// test @[ "remit 1" ]
// test @[ "drain with result" ]
// test @[ "remit 2" ]
// test @[ "remit with end detection 1" ]
// test @[ "duct_from_transforms" ]
// test @[ "composability (through)" ]
// test @[ "composability (source)" ]
// test @[ "composability (sink)" ]

@@ -1120,3 +1248,1 @@ // test @[ "remit with end detection 2" ]

}).call(this);
//# sourceMappingURL=basic.test.js.map

2

package.json
{
"name": "steampipes",
"version": "6.2.0",
"version": "6.3.0",
"description": "Fast, simple data pipelines",

@@ -5,0 +5,0 @@ "main": "lib/main.js",

@@ -97,2 +97,39 @@

* **`@chunkify_*()`**—cut stream by observing boundaries. Depending on whether to keep or toss datoms
recognbized by the `filter` function as boundaries, use either
* **`@$chunkify_keep = ( filter, postprocess = null ) ->`** or
* **`@$chunkify_toss = ( filter, postprocess = null ) ->`**
The second, optional `postprocess` argument must be a function when given; it will receive a list of
datoms and may return any value which will then be sent on. Sample application:
```coffee
filter = ( d ) -> d in [ '(', ')', ]
postprocess = ( chunk ) -> chunk.join '|'
pipeline = []
pipeline.push 'ab(cdefg)'
pipeline.push SP.$chunkify_keep filter, postprocess
pipeline.push SP.$show()
pipeline.push SP.$drain -> resolve()
SP.pull pipeline...
```
will print
```
'a|b|('
'c|d|e|f|g|)'
```
Had we used `SP.$chunkify_toss filter, postprocess` instead, the output would have been
```
'a|b'
'c|d|e|f|g'
```
So if one just wanted to collect all stream items into a single list, one would use either `SP.$collect()`
or else an argument to the drain transform, as in `SP.$drain ( collector ) -> resolve collector`; if one
wanted to collect all stream items into multiple lists, then `SP.$chunkify_{keep|toss} filter, ...` is the
way to go.
### Modifiers and `$before_first()`, `$after_last()`

@@ -312,3 +349,3 @@

pipeline at large and the transform?
* [ ] obscure bug: when a push source is used with a stream that comes from another instance of the
* [X] obscure bug: when a push source is used with a stream that comes from another instance of the
SteamPipes library (as in, `( require 'pathA/steampipes' ).new_push_source()` is used in a pipeline that

@@ -318,5 +355,9 @@ is activated by) `( require 'pathB/steampipes' ).pull pipeline...`) and error with message

object` results. The message should at least hint point at the probable error cause or be avoided at all.
**FIXED in v6.2**: replaced `Symbol 'xy'` with `Symbol.from 'steampipes/xy'` in `SP.marks`.
* [ ] bug: async functions passed into `$drain()`, attached to `push_source.start` and possibly other places
are not called or not called with `await`, thus causing silent failures. Must always reject loudly where
detected or be handled appropriately.
* [X] implement `$chunkify()` (as configurable variant of `$collect()`?); see code comment in standard
transforms
### Future: JS Pipeline Operator

@@ -323,0 +364,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc