Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

steampipes

Package Overview
Dependencies
Maintainers
1
Versions
30
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

steampipes - npm Package Compare versions

Comparing version 0.1.0 to 3.0.0

lib/experiments/tests/circular-pipelines.test.js

582

lib/main.js

@@ -0,4 +1,5 @@

// Generated by CoffeeScript 2.4.1
(function() {
'use strict';
var $watch, CND, assign, badge, debug, declare, echo, first_of, help, info, isa, jr, last_of, misfit, remit_defaults, rpr, size_of, type_of, types, urge, validate, warn, whisper;
var CND, FS, L, Multimix, Steampipes, badge, debug, echo, help, info, isa, jr, rpr, type_of, urge, validate, warn, whisper;

@@ -10,3 +11,3 @@ //###########################################################################################################

badge = 'STEAMPIPES/MAIN';
badge = 'STEAMPIPES/BASICS';

@@ -30,564 +31,61 @@ debug = CND.get_logger('debug', badge);

assign = Object.assign;
//...........................................................................................................
types = require('./_types');
this.types = require('./types');
({isa, validate, declare, first_of, last_of, size_of, type_of} = types);
({isa, validate, type_of} = this.types);
misfit = Symbol('misfit');
Multimix = require('multimix');
//===========================================================================================================
FS = require('fs');
//-----------------------------------------------------------------------------------------------------------
/* Signals are special values that, when sent down the pipeline, may alter behavior: */
this.signals = Object.freeze({
last: Symbol('last'), // Used to signal last data item
end: Symbol('end') // Request stream to terminate
});
Steampipes = (function() {
var filename, filenames, i, len, path;
//-----------------------------------------------------------------------------------------------------------
/* Marks are special values that identify types, behavior of pipeline elements etc: */
this.marks = Object.freeze({
isa_source: Symbol('isa_source'), // Marks a source as such
isa_through: Symbol('isa_through'), // Marks a through as such
isa_sink: Symbol('isa_sink'), // Marks a sink as such
isa_duct: Symbol('isa_duct'), // Marks a duct as such
isa_pusher: Symbol('isa_pusher'), // Marks a push source as such
send_last: Symbol('send_last') // Marks transforms expecting a certain value before EOS
});
//-----------------------------------------------------------------------------------------------------------
remit_defaults = Object.freeze({
first: misfit,
last: misfit,
between: misfit,
after: misfit,
before: misfit
});
//===========================================================================================================
//-----------------------------------------------------------------------------------------------------------
this._get_remit_settings = function(settings, method) {
var arity, remit_arity;
switch (remit_arity = arguments.length) {
case 1:
[method, settings] = [settings, null];
break;
case 2:
settings = {...remit_defaults, ...settings};
break;
default:
throw new Error(`µ19358 expected 1 or 2 arguments, got ${remit_arity}`);
}
//.........................................................................................................
validate.function(method);
if ((arity = method.length) !== 2) {
throw new Error(`µ20123 method arity ${arity} not implemented`);
}
if (settings != null) {
if (settings.leapfrog != null) {
validate.function(settings.leapfrog);
//-----------------------------------------------------------------------------------------------------------
class Steampipes extends Multimix {
//---------------------------------------------------------------------------------------------------------
constructor(settings = null) {
super();
this.settings = settings;
}
settings._surround = (settings.first !== misfit) || (settings.last !== misfit) || (settings.between !== misfit) || (settings.after !== misfit) || (settings.before !== misfit);
}
//.........................................................................................................
return {settings, method};
};
//-----------------------------------------------------------------------------------------------------------
this.remit = this.$ = (...P) => {
var ME, R, data_after, data_before, data_between, data_first, data_last, do_leapfrog, has_returned, is_first, method, on_end, self, send, send_after, send_before, send_between, send_first, send_last, settings, tsend;
({settings, method} = this._get_remit_settings(...P));
has_returned = false;
send = null;
//.........................................................................................................
tsend = (d) => {
if (has_returned) {
throw new Error("µ55663 illegal to call send() after method has returned");
}
return send(d);
};
tsend.end = function() {
return send.end();
};
//.........................................................................................................
if (settings == null) {
/* fast track without surround features */
return (d, send_) => {
send = send_;
has_returned = false;
method(d, tsend);
has_returned = true;
return null;
};
}
//.........................................................................................................
self = null;
do_leapfrog = settings.leapfrog;
data_first = settings.first;
data_before = settings.before;
data_between = settings.between;
data_after = settings.after;
data_last = settings.last;
send_first = data_first !== misfit;
send_before = data_before !== misfit;
send_between = data_between !== misfit;
send_after = data_after !== misfit;
send_last = data_last !== misfit;
on_end = null;
is_first = true;
ME = this;
//.........................................................................................................
/* slow track with surround features */
R = (d, send_) => {
// debug 'µ55641', d, d is @signals.last
send = send_;
has_returned = false;
//.......................................................................................................
if (send_last && d === this.signals.last) {
method(data_last, tsend);
} else {
//.......................................................................................................
if (is_first) {
(send_first ? method(data_first, tsend) : void 0);
} else {
(send_between ? method(data_between, tsend) : void 0);
}
if (send_before) {
method(data_before, tsend);
}
is_first = false;
//.....................................................................................................
// When leapfrogging is being called for, only call method if the jumper returns false:
if ((!do_leapfrog) || (!settings.leapfrog(d))) {
method(d, tsend);
} else {
send(d);
}
if (send_after) {
//.....................................................................................................
method(data_after, tsend);
}
}
has_returned = true;
return null;
};
if (send_last) {
//.........................................................................................................
R[this.marks.send_last] = this.marks.send_last;
}
return R;
};
//-----------------------------------------------------------------------------------------------------------
this.$map = function(method) {
return (d, send) => {
return send(method(d));
};
};
// @extend object_with_class_properties
filenames = FS.readdirSync(__dirname);
this.$drain = function(on_end = null) {
var ref;
return {[ref = this.marks.isa_sink]: ref, on_end};
};
this.$pass = function() {
return (d, send) => {
return send(d);
};
};
//-----------------------------------------------------------------------------------------------------------
this.$show = function(settings) {
var ref, title;
title = ((ref = settings != null ? settings.title : void 0) != null ? ref : '-->') + ' ';
return this.$((d, send) => {
info(title + jr(d));
return send(d);
});
};
// #-----------------------------------------------------------------------------------------------------------
// @$xs = ( method ) -> ( d, send ) =>
// if ( d.$stamped ? false ) then return send d
// method d, send
//-----------------------------------------------------------------------------------------------------------
$watch = function(settings, method) {
/* If any `surround` feature is called for, wrap all surround values so that we can safely
distinguish between them and ordinary stream values; this is necessary to prevent them from leaking
into the regular stream outside the `$watch` transform: */
var arity, key, take_second, value;
switch (arity = arguments.length) {
case 1:
method = settings;
return this.$((d, send) => {
method(d);
return send(d);
});
//.......................................................................................................
case 2:
if (settings == null) {
return this.$watch(method);
}
take_second = Symbol('take-second');
settings = assign({}, settings);
for (key in settings) {
value = settings[key];
settings[key] = [take_second, value];
}
//.....................................................................................................
return this.$(settings, (d, send) => {
if ((CND.isa_list(d)) && (d[0] === take_second)) {
method(d[1]);
} else {
method(d);
send(d);
}
return null;
});
}
//.........................................................................................................
throw new Error(`µ18244 expected one or two arguments, got ${arity}`);
};
this.$watch = $watch.bind(this);
//-----------------------------------------------------------------------------------------------------------
this.$as_text = function(settings) {
return (d, send) => {
var ref, serialize;
serialize = (ref = settings != null ? settings['serialize'] : void 0) != null ? ref : JSON.stringify;
return this.$map((data) => {
return serialize(data);
});
};
};
//-----------------------------------------------------------------------------------------------------------
this._classify_transform = function(transform) {
var R, name, name1, name2;
R = (() => {
var type;
if (transform[this.marks.isa_duct] != null) {
return {
type: transform.type
};
for (i = 0, len = filenames.length; i < len; i++) {
filename = filenames[i];
if (!filename.endsWith('.js')) {
continue;
}
if (transform[this.marks.isa_pusher] != null) {
return {
type: 'source',
isa_pusher: true
};
if (filename.startsWith('_')) {
continue;
}
if (transform[Symbol.iterator] != null) {
return {
type: 'source'
};
if (filename === 'main.js') {
continue;
}
switch (type = type_of(transform)) {
case 'function':
return {
type: 'through'
};
case 'generatorfunction':
return {
type: 'source',
must_call: true
};
if (filename === 'types.js') {
continue;
}
if (transform[this.marks.isa_sink] != null) {
return {
type: 'sink',
on_end: transform.on_end
};
}
throw new Error(`µ44521 expected an iterable, a function, a generator function or a sink, got a ${type}`);
})();
switch (R.type) {
case 'source':
if (transform[name = this.marks.isa_source] == null) {
transform[name] = this.marks.isa_source;
}
break;
case 'through':
if (transform[name1 = this.marks.isa_through] == null) {
transform[name1] = this.marks.isa_through;
}
break;
case 'sink':
if (transform[name2 = this.marks.isa_sink] == null) {
transform[name2] = this.marks.isa_sink;
}
path = './' + filename;
Steampipes.include(require(path));
}
return R;
};
//-----------------------------------------------------------------------------------------------------------
this._flatten_transforms = function(transforms, R = null) {
var i, j, len, len1, ref, t, transform;
if (R == null) {
R = [];
}
for (i = 0, len = transforms.length; i < len; i++) {
transform = transforms[i];
if (transform[this.marks.isa_duct] != null) {
ref = transform.transforms;
for (j = 0, len1 = ref.length; j < len1; j++) {
t = ref[j];
/* TAINT necessary to do this recursively? */
R.push(t);
}
} else {
R.push(transform);
}
}
return R;
};
return Steampipes;
//-----------------------------------------------------------------------------------------------------------
this._new_duct = function(transforms) {
var R, b, blurbs, i, idx, key, ref, ref1, transform;
transforms = this._flatten_transforms(transforms);
R = {[ref = this.marks.isa_duct]: ref, transforms};
blurbs = (function() {
var i, len, results;
results = [];
for (i = 0, len = transforms.length; i < len; i++) {
transform = transforms[i];
results.push(this._classify_transform(transform));
}
return results;
}).call(this);
if (transforms.length === 0) {
return {
...R,
is_empty: true
};
}
//.........................................................................................................
R.first = blurbs[0];
if (transforms.length === 1) {
R.is_single = true;
R.last = R.first;
R.type = R.first.type;
} else {
R.last = blurbs[transforms.length - 1];
switch (key = `${R.first.type}/${R.last.type}`) {
case 'source/through':
R.type = 'source';
break;
case 'through/sink':
R.type = 'sink';
break;
case 'through/through':
R.type = 'through';
break;
case 'source/sink':
R.type = 'circuit';
break;
default:
throw new Error(`µ44521 illegal duct configuration ${rpr(key)}`);
}
for (idx = i = 1, ref1 = blurbs.length - 1; i < ref1; idx = i += +1) {
if ((b = blurbs[idx]).type !== 'through') {
throw new Error(`µ44522 illegal duct configuration at transform index ${idx}: ${rpr(b)}`);
}
}
}
return R;
};
}).call(this);
//-----------------------------------------------------------------------------------------------------------
this._pull = function(...transforms) {
var _, duct, exhaust_pipeline, last, local_sink, local_source, mem_source, mem_sources, original_source, send;
duct = this._new_duct(transforms);
({transforms} = duct);
original_source = null;
if (duct.last.type === 'source') {
throw new Error("µ77764 source as last transform not yet supported");
}
if (duct.first.type === 'sink') {
throw new Error("µ77765 sink as first transform not yet supported");
}
//.........................................................................................................
if (duct.first.type === 'source') {
original_source = transforms.shift();
if (duct.first.must_call) {
original_source = original_source();
}
}
//.........................................................................................................
if (duct.last.type === 'sink') {
transforms.pop();
}
if (duct.type !== 'circuit') {
//.........................................................................................................
return duct;
}
//.........................................................................................................
duct.original_source = original_source;
duct.mem_source = mem_source = [];
duct.mem_sources = mem_sources = [
mem_source,
...((function() {
var i,
ref,
results;
results = [];
for (_ = i = 0, ref = transforms.length; (0 <= ref ? i < ref : i > ref); _ = 0 <= ref ? ++i : --i) {
results.push([]);
}
return results;
})())
];
duct.has_ended = false;
local_sink = null;
local_source = null;
last = this.signals.last;
//.........................................................................................................
send = (d) => {
if (d === this.signals.end) {
return duct.has_ended = true;
}
return local_sink.push(d);
};
send.end = () => {
return duct.has_ended = true;
};
//.........................................................................................................
exhaust_pipeline = () => {
var d, has_data, i, idx, len, transform;
while (true) {
has_data = false;
for (idx = i = 0, len = transforms.length; i < len; idx = ++i) {
transform = transforms[idx];
if ((local_source = mem_sources[idx]).length === 0) {
continue;
}
has_data = true;
local_sink = mem_sources[idx + 1];
d = local_source.shift();
if (d === last) {
if (transform[this.marks.send_last] != null) {
transform(d, send);
}
send(last);
} else {
transform(d, send);
}
}
if (!has_data) {
break;
}
}
return null;
};
//.........................................................................................................
duct.send = send;
duct.exhaust_pipeline = exhaust_pipeline;
//.........................................................................................................
return duct;
};
// @specs = {}
// @isa = Multimix.get_keymethod_proxy @, isa
// # @validate = Multimix.get_keymethod_proxy @, validate
// declarations.declare_types.apply @
//-----------------------------------------------------------------------------------------------------------
this.pull = function(...transforms) {
var d, duct, ref;
duct = this._pull(...transforms);
if (duct.type !== 'circuit') {
return duct;
}
if (duct.original_source[this.marks.isa_pusher] != null) {
return this._push(duct);
}
ref = duct.original_source;
//.........................................................................................................
for (d of ref) {
if (duct.has_ended) {
break;
}
// continue if d is @signals.discard
duct.mem_source.push(d);
duct.exhaust_pipeline();
}
//.........................................................................................................
duct.mem_source.push(this.signals.last);
duct.exhaust_pipeline();
if (duct.last.on_end != null) {
duct.last.on_end();
}
return null;
};
//###########################################################################################################
module.exports = L = new Steampipes();
//-----------------------------------------------------------------------------------------------------------
this._push = function(duct) {
/* Make `duct` available from the POV of the push source: */
duct.original_source.duct = duct;
/* copy buffered data (from before when `pull()` was called) to `original_source`: */
duct.mem_source.splice(duct.mem_source.length, 0, ...duct.original_source.buffer);
/* Process any data as may have accumulated at this point: */
duct.exhaust_pipeline();
return null;
};
L.Steampipes = Steampipes;
//===========================================================================================================
// SOURCES
//-----------------------------------------------------------------------------------------------------------
this.new_value_source = function*(x) {
return (yield* x);
};
}).call(this);
//-----------------------------------------------------------------------------------------------------------
this.new_push_source = function() {
var R, end, ref, send;
send = (d) => {
if (R.duct == null) {
return R.buffer.push(d);
}
R.buffer = null;
if (d === this.signals.end) {
return end();
}
R.duct.mem_source.push(d);
R.duct.exhaust_pipeline();
return null;
};
end = () => {
R.duct.mem_source.push(this.signals.last);
R.duct.exhaust_pipeline();
if (R.duct.last.on_end != null) {
R.duct.last.on_end();
}
return R.duct = null;
};
R = {
[ref = this.marks.isa_pusher]: ref,
send,
end,
buffer: [],
duct: null
};
return R;
};
//===========================================================================================================
//-----------------------------------------------------------------------------------------------------------
this.$collect = function(settings) {
var collector, last, ref;
collector = (ref = settings != null ? settings.collector : void 0) != null ? ref : [];
last = Symbol('last');
return this.$({last}, (d, send) => {
if (d === last) {
return send(collector);
}
collector.push(d);
return null;
});
};
}).call(this);
//# sourceMappingURL=main.js.map
(function() {
'use strict';
var $, $async, $send_three, CND, SP, after, badge, debug, echo, help, info, jr, rpr, test, urge, warn, whisper;
var $, $async, $send_three, $show, $watch, CND, SP, after, badge, debug, defer, echo, help, info, jr, rpr, test, urge, warn, whisper;

@@ -10,3 +10,3 @@ //###########################################################################################################

badge = 'STEAMPIPES/TESTS/ASYNC-MAP';
badge = 'STEAMPIPES/TESTS/ASYNC';

@@ -35,4 +35,6 @@ debug = CND.get_logger('debug', badge);

({$, $async} = SP);
({$, $async, $watch, $show} = SP.export());
defer = setImmediate;
//-----------------------------------------------------------------------------------------------------------

@@ -44,32 +46,55 @@ after = function(dts, f) {

//-----------------------------------------------------------------------------------------------------------
this["async 1"] = function(T, done) {
var matcher, ok, pipeline, probe;
this["async 0"] = async function(T, done) {
var i, len, matcher, ok, probe, ref, use_async;
ok = false;
[probe, matcher] = ["abcdef", "1a-2a-1b-2b-1c-2c-1d-2d-1e-2e-1f-2f"];
pipeline = [];
pipeline.push(SP.new_value_source(Array.from(probe)));
pipeline.push($async(function(d, send, done) {
send(`1${d}`);
send(`2${d}`);
return done();
}));
pipeline.push(SP.$surround({
between: '-'
}));
pipeline.push(SP.$join());
//.........................................................................................................
pipeline.push(SP.$watch(function(result) {
echo(CND.gold(jr([probe, result])));
T.eq(result, matcher);
return ok = true;
}));
//.........................................................................................................
pipeline.push(SP.$drain(function() {
if (!ok) {
T.fail("failed to pass test");
}
return done();
}));
//.........................................................................................................
SP.pull(...pipeline);
ref = [true, false];
for (i = 0, len = ref.length; i < len; i++) {
use_async = ref[i];
await (() => {
return new Promise((resolve) => {
var pipeline;
pipeline = [];
pipeline.push(probe);
// pipeline.push $watch ( d ) -> info 'µ1', jr d
if (use_async) {
pipeline.push($async(function(d, send, done) {
defer(function() {
return send(`1${d}`);
});
return after(0.1, function() {
send(`2${d}`);
return done();
});
}));
} else {
pipeline.push($(function(d, send) {
send(`1${d}`);
return send(`2${d}`);
}));
}
// pipeline.push $watch ( d ) -> urge 'µ2', jr d
pipeline.push(SP.$surround({
between: '-'
}));
pipeline.push(SP.$join());
//.........................................................................................................
pipeline.push(SP.$watch(function(result) {
echo(CND.gold(jr([probe, result])));
T.eq(result, matcher);
return ok = true;
}));
//.........................................................................................................
pipeline.push(SP.$drain(function() {
if (!ok) {
T.fail("failed to pass test");
}
return resolve();
}));
//.........................................................................................................
return SP.pull(...pipeline);
});
})();
}
done();
return null;

@@ -80,3 +105,3 @@ };

$send_three = function() {
return SP.$async(function(d, send, done) {
return $async(function(d, send, done) {
var count, i, nr;

@@ -86,3 +111,5 @@ count = 0;

(function(d, nr) {
return after(Math.random() / 5, function() {
var dt;
dt = Math.random() / 10;
return after(dt, function() {
count += 1;

@@ -104,7 +131,8 @@ send(`(${d}:${nr})`);

ok = false;
probe = "abcdef";
probe = "fdcabe";
matcher = "(a:1)(a:2)(a:3)(b:1)(b:2)(b:3)(c:1)(c:2)(c:3)(d:1)(d:2)(d:3)(e:1)(e:2)(e:3)(f:1)(f:2)(f:3)";
pipeline = [];
pipeline.push(SP.new_value_source(Array.from(probe)));
pipeline.push(Array.from(probe));
pipeline.push($send_three());
// pipeline.push $show { title: '2', }
pipeline.push(SP.$sort());

@@ -128,2 +156,3 @@ pipeline.push(SP.$join());

//.........................................................................................................
// T.throws /contains asynchronous transform/, -> SP.pull pipeline...
SP.pull(...pipeline);

@@ -135,8 +164,10 @@ return null;

if (module.parent == null) {
test(this);
test(this, {
timeout: 10000
});
}
// test @[ "async 1" ]
// test @[ "async 2" ]
// test @[ "async 0" ], { timeout: 10000, }
// test @[ "async 2" ], { timeout: 10000, }
}).call(this);

@@ -0,1 +1,2 @@

// Generated by CoffeeScript 2.4.1
(function() {

@@ -41,3 +42,3 @@ //###########################################################################################################

({$, $async, $watch, $show} = SP);
({$, $async, $watch, $show} = SP.export());

@@ -215,6 +216,104 @@ //...........................................................................................................

//-----------------------------------------------------------------------------------------------------------
this["remit"] = function(T, done) {
this["remit 1"] = function(T, done) {
var pipeline, result;
result = [];
pipeline = [];
pipeline.push([1, 2, 3]);
pipeline.push($(function(d, send) {
info('µ1', d);
return send(d + 10);
}));
pipeline.push($(function(d, send) {
info('µ2', d);
send(d);
return send(d + 10);
}));
pipeline.push($(function(d, send) {
info('µ3', d);
result.push(d);
return send(d);
}));
pipeline.push(SP.$drain(function() {
// debug 'µ11121', jr result
T.eq(result, [11, 21, 12, 22, 13, 23]);
return done();
}));
return SP.pull(...pipeline);
};
//-----------------------------------------------------------------------------------------------------------
this["drain with result"] = function(T, done) {
var duct, pipeline;
pipeline = [];
pipeline.push([1, 2, 3]);
pipeline.push($(function(d, send) {
info('µ1', d);
return send(d + 10);
}));
pipeline.push($(function(d, send) {
info('µ2', d);
send(d);
return send(d + 10);
}));
pipeline.push(SP.$drain(function(result) {
// debug 'µ1112-1', duct
// debug 'µ1112-2', jr result
T.eq(result, [11, 21, 12, 22, 13, 23]);
return done();
}));
return duct = SP.pull(...pipeline);
};
//-----------------------------------------------------------------------------------------------------------
this["drain with sink 1"] = function(T, done) {
var duct, pipeline, sink;
sink = [];
pipeline = [];
pipeline.push([1, 2, 3]);
pipeline.push($(function(d, send) {
info('µ1', d);
return send(d + 10);
}));
pipeline.push($(function(d, send) {
info('µ2', d);
send(d);
return send(d + 10);
}));
pipeline.push(SP.$drain({sink}, function(result) {
// debug 'µ1112-1', duct
// debug 'µ1112-2', jr result
T.ok(result === sink);
T.eq(result, [11, 21, 12, 22, 13, 23]);
return done();
}));
return duct = SP.pull(...pipeline);
};
//-----------------------------------------------------------------------------------------------------------
this["drain with sink 2"] = function(T, done) {
var duct, pipeline, sink;
sink = [];
pipeline = [];
pipeline.push([1, 2, 3]);
pipeline.push($(function(d, send) {
info('µ1', d);
return send(d + 10);
}));
pipeline.push($(function(d, send) {
info('µ2', d);
send(d);
return send(d + 10);
}));
pipeline.push(SP.$drain({sink}, function() {
T.eq(sink, [11, 21, 12, 22, 13, 23]);
return done();
}));
return duct = SP.pull(...pipeline);
};
//-----------------------------------------------------------------------------------------------------------
this["remit 2"] = function(T, done) {
var pipeline, result;
result = [];
pipeline = [];
pipeline.push(Array.from('abcd'));

@@ -425,16 +524,17 @@ pipeline.push(SP.$map(function(d) {

T.eq(r.type, 'source');
T.eq((ref1 = r.first.isa_pusher) != null ? ref1 : false, true);
return T.eq(r.transforms[0][SP.marks.isa_source], SP.marks.isa_source);
return T.eq((ref1 = r.first.isa_pusher) != null ? ref1 : false, true);
})();
(() => { //.........................................................................................................
var on_end, r, sink;
var drain, on_end, r, sink;
r = SP._new_duct([sink = SP.$drain(on_end = (function() {}))]);
drain = r.transforms[r.transforms.length - 1];
T.eq(r.first, r.last);
T.eq(r.is_single, true);
T.eq(r.first.type, 'sink');
T.eq(r.type, 'sink');
T.eq(r.last.on_end, on_end);
return T.eq(r.transforms[0][SP.marks.isa_sink], SP.marks.isa_sink);
return T.eq(r.type, 'sink');
})();
(() => { //.........................................................................................................
(() => { // T.eq r.last.on_end, on_end
// T.eq r.transforms[ 0 ][ SP.marks.steampipes ], SP.marks.steampipes
// T.eq r.transforms[ 0 ].type, 'sink'
//.........................................................................................................
var r, through;

@@ -446,4 +546,3 @@ r = SP._new_duct([through = SP.$((function(d, send) {}))]);

T.eq(r.type, 'through');
T.eq(r.transforms[0], through);
return T.eq(r.transforms[0][SP.marks.isa_through], SP.marks.isa_through);
return T.eq(r.transforms[0], through);
})();

@@ -456,4 +555,3 @@ (() => { //.........................................................................................................

T.eq(r.first.type, 'source');
T.eq(r.type, 'source');
return T.eq(r.transforms[0][SP.marks.isa_source], SP.marks.isa_source);
return T.eq(r.type, 'source');
})();

@@ -478,3 +576,3 @@ (() => { //.........................................................................................................

//-----------------------------------------------------------------------------------------------------------
this["composability"] = async function(T, done) {
this["composability (through)"] = async function(T, done) {
var error, matcher, probe;

@@ -484,3 +582,3 @@ [probe, matcher, error] = [["what", "a", "lot", "of", "little", "bottles"], ["what", "a", "lot", "of", "little", "bottles"], null];

return new Promise(function(resolve, reject) {
var R, duct_A, pipeline_A, pipeline_B, source;
var R, duct_A, duct_B, length_of_A, length_of_B, pipeline_A, pipeline_B, source;
R = [];

@@ -496,4 +594,5 @@ source = probe;

}));
length_of_A = pipeline_A.length;
duct_A = SP.pull(...pipeline_A);
T.eq(duct_A.transforms.length, 2);
T.eq(duct_A.transforms.length, length_of_A);
T.eq(duct_A.type, 'through');

@@ -508,6 +607,9 @@ //.......................................................................................................

pipeline_B.push(SP.$drain(function() {
help('ok');
return resolve(R);
return help('ok');
}));
SP.pull(...pipeline_B);
length_of_B = pipeline_B.length - 1 + length_of_A;
duct_B = SP.pull(...pipeline_B);
T.eq(duct_B.transforms.length, length_of_B);
T.eq(duct_B.type, 'circuit');
resolve(R);
return null;

@@ -521,2 +623,91 @@ });

//-----------------------------------------------------------------------------------------------------------
this["composability (source)"] = async function(T, done) {
var error, matcher, probe;
[probe, matcher, error] = ["𦇻𦑛𦖵𦩮𦫦𧞈", Array.from('𦇻𦑛𦖵𦩮𦫦𧞈'), null];
await T.perform(probe, matcher, error, function() {
return new Promise(function(resolve, reject) {
var R, duct_A, duct_B, length_of_A, length_of_B, pipeline_A, pipeline_B, source;
R = [];
source = probe;
//.......................................................................................................
pipeline_A = [];
pipeline_A.push(source);
pipeline_A.push(SP.$watch(function(d) {
return info(xrpr(d));
}));
pipeline_A.push(SP.$collect({
collector: R
}));
length_of_A = pipeline_A.length;
duct_A = SP.pull(...pipeline_A);
T.eq(duct_A.transforms.length, length_of_A);
T.eq(duct_A.type, 'source');
//.......................................................................................................
pipeline_B = [];
pipeline_B.push(duct_A);
pipeline_B.push(SP.$watch(function(d) {
return info(xrpr(d));
}));
pipeline_B.push(SP.$drain(function() {
return help('ok');
}));
length_of_B = pipeline_B.length - 1 + length_of_A;
duct_B = SP.pull(...pipeline_B);
T.eq(duct_B.transforms.length, length_of_B);
T.eq(duct_B.type, 'circuit');
resolve(R);
return null;
});
});
//.........................................................................................................
done();
return null;
};
//-----------------------------------------------------------------------------------------------------------
this["composability (sink)"] = async function(T, done) {
var error, matcher, probe;
[probe, matcher, error] = ["𦇻𦑛𦖵𦩮𦫦𧞈", Array.from('𦇻𦑛𦖵𦩮𦫦𧞈'), null];
await T.perform(probe, matcher, error, function() {
return new Promise(function(resolve, reject) {
var R, duct_A, duct_B, length_of_A, length_of_B, pipeline_A, pipeline_B, source;
R = [];
source = probe;
//.......................................................................................................
pipeline_A = [];
pipeline_A.push(SP.$watch(function(d) {
return info(xrpr(d));
}));
pipeline_A.push(SP.$collect({
collector: R
}));
pipeline_A.push(SP.$drain(function() {
return help('ok');
}));
length_of_A = pipeline_A.length;
duct_A = SP.pull(...pipeline_A);
T.eq(duct_A.transforms.length, length_of_A);
T.eq(duct_A.type, 'sink');
//.......................................................................................................
pipeline_B = [];
pipeline_B.push(source);
pipeline_B.push(SP.$watch(function(d) {
return info(xrpr(d));
}));
pipeline_B.push(duct_A);
length_of_B = pipeline_B.length - 1 + length_of_A;
duct_B = SP.pull(...pipeline_B);
T.eq(duct_B.transforms.length, length_of_B);
T.eq(duct_B.type, 'circuit');
// debug 'µ11124', duct_B
resolve(R);
return null;
});
});
//.........................................................................................................
done();
return null;
};
/*

@@ -819,52 +1010,2 @@

#-----------------------------------------------------------------------------------------------------------
@[ "leapfrog 1" ] = ( T, done ) ->
* through = require 'pull-through'
probes_and_matchers = [
[[[ 1 .. 10], ( ( d ) -> d %% 2 isnt 0 ), ],[1,102,3,104,5,106,7,108,9,110],null]
]
#.........................................................................................................
collector = []
for [ probe, matcher, error, ] in probes_and_matchers
await T.perform probe, matcher, error, -> new Promise ( resolve ) ->
[ values
jumper ] = probe
#.....................................................................................................
source = SP.new_value_source values
collector = []
pipeline = []
pipeline.push source
pipeline.push SP.$ { leapfrog: jumper, }, ( d, send ) -> send 100 + d
pipeline.push SP.$collect { collector, }
pipeline.push SP.$drain -> resolve collector
pull pipeline...
#.........................................................................................................
done()
return null
#-----------------------------------------------------------------------------------------------------------
@[ "leapfrog 2" ] = ( T, done ) ->
* through = require 'pull-through'
probes_and_matchers = [
[[[ 1 .. 10], ( ( d ) -> d %% 2 isnt 0 ), ],[1,102,3,104,5,106,7,108,9,110],null]
]
#.........................................................................................................
collector = []
for [ probe, matcher, error, ] in probes_and_matchers
await T.perform probe, matcher, error, -> new Promise ( resolve ) ->
[ values
jumper ] = probe
#.....................................................................................................
source = SP.new_value_source values
collector = []
pipeline = []
pipeline.push source
pipeline.push SP.$ { leapfrog: jumper, }, ( d, send ) -> send 100 + d
pipeline.push SP.$collect { collector, }
pipeline.push SP.$drain -> resolve collector
pull pipeline...
#.........................................................................................................
done()
return null
#-----------------------------------------------------------------------------------------------------------
@[ "$scramble" ] = ( T, done ) ->

@@ -908,6 +1049,10 @@ probes_and_matchers = [

// test @[ "remit" ]
// test @[ "remit 1" ]
// test @[ "drain with result" ]
// test @[ "remit 2" ]
// test @[ "remit with end detection 1" ]
// test @[ "duct_from_transforms" ]
// test @[ "composability" ]
// test @[ "composability (through)" ]
// test @[ "composability (source)" ]
// test @[ "composability (sink)" ]
// test @[ "remit with end detection 2" ]

@@ -934,1 +1079,3 @@ // test @[ "remit with surrounds" ]

}).call(this);
//# sourceMappingURL=basic.test.js.map

@@ -0,4 +1,5 @@

// Generated by CoffeeScript 2.4.1
(function() {
//###########################################################################################################
var CND, FS, L, OS, PATH, alert, badge, debug, echo, glob, help, info, log, rpr, test, urge, warn, whisper;
var CND, FS, L, PATH, alert, badge, debug, echo, help, info, log, rpr, test, urge, warn, whisper;

@@ -9,3 +10,3 @@ CND = require('cnd');

badge = 'STEAMPIPES/TESTS/TEE';
badge = 'STEAMPIPES/TESTS/MAIN';

@@ -35,8 +36,4 @@ log = CND.get_logger('plain', badge);

OS = require('os');
test = require('guy-test');
glob = require('globby');
//###########################################################################################################

@@ -46,7 +43,13 @@ L = this;

(function() {
var i, key, len, module, path, paths, value;
paths = glob.sync(PATH.join(__dirname, '*.test.js'));
for (i = 0, len = paths.length; i < len; i++) {
path = paths[i];
// debug '39838', path
var filename, filenames, i, key, len, module, path, value;
filenames = FS.readdirSync(__dirname);
for (i = 0, len = filenames.length; i < len; i++) {
filename = filenames[i];
if (!filename.endsWith('.test.js')) {
continue;
}
if (filename.startsWith('_')) {
continue;
}
path = PATH.join(__dirname, filename);
module = require(path);

@@ -58,3 +61,2 @@ for (key in module) {

}
debug('39838', path, key);
if (L[key] != null) {

@@ -66,8 +68,9 @@ throw new Error(`duplicate key ${rpr(key)}`);

}
test(L, {
return test(L, {
timeout: 5000
});
return help("tested:");
})();
}).call(this);
//# sourceMappingURL=main.js.map
{
"name": "steampipes",
"version": "0.1.0",
"version": "3.0.0",
"description": "Fast, simple data pipelines",

@@ -31,3 +31,5 @@ "main": "lib/main.js",

"intertype": "^2.5.0",
"letsfreezethat": "^1.1.1"
"letsfreezethat": "^1.1.1",
"multimix": "^2.0.0",
"pipedreams": "^12.0.0"
},

@@ -34,0 +36,0 @@ "devDependencies": {

@@ -34,2 +34,55 @@ <!-- START doctoc generated TOC please keep comment here to allow auto update -->

### How to Construct Transforms
#### Sinks
Arbitrary objects can act as sinks provided they have a `sink` property; this property must be either set to
`true` for a generic sink or else be an object that has `push()` method (such as a list). A sink may,
furthermore, also have an `on_end()` method which, if set, must be a function that takes zero or one
argument.
If the `sink` property is a list, then it will receive all data items that arrive through the pipeline (the
resultant data of the pipeline); if it is `true`, then those data items will be discarded.
The `on_end()` method will be called when streaming has terminated (since the source was exhausted or a
transform called `aend.end()`); if it takes one argument, then that will be the list of resultant data. If
both the `sink` property has been set to a list and `on_end()` takes an argument, then that value will be
the `sink` property (you probably only want the one or the other in most cases).
```coffee
{ sink: true, }
{ sink: true, on_end: ( -> do_something() ), }
{ sink: true, on_end: ( ( result ) -> do_something result ), }
{ sink: x, on_end: ( ( result ) -> do_something result ### NB result is x ### ), }
```
The only SteamPipes method that produces a sink is `$drain()` (it should really be called `sink()` but for
compatibility with PipeStreams the name has been kept as a holdover from `pull-stream`). `$drain()` takes
zero, one or two arguments:
```coffee
$drain() is equiv. to { sink: true, }
$drain -> ... is equiv. to { sink: true, on_end: ( -> ... ), }
$drain ( x ) -> ... is equiv. to { sink: true, on_end: ( ( x ) -> ... ), }
$drain { sink: x, }, -> ... is equiv. to { sink: x, on_end: ( -> ... ), }
$drain { sink: x, }, ( x ) -> ... is equiv. to { sink: x, on_end: ( ( x ) -> ... ), }
```
### Asynchronous Sources and Transforms
Asynchronous transforms can be constructed using the 'asynchronous remit' method, `$async()`. The method
passed into `$async()` must accept three arguments, namely `d` (the data item coming down the pipeline),
`send` (the method to send data down the pipeline), and, in addition to synchronous transforms, `done`,
which is a callback function used to signal completion (it is analogous to the `resulve` argument of
promises, `new Promise ( resulve, reject ) ->` and indeed implemented as such). An example:
```coffee
X███████████████
X███████████████
X███████████████
X███████████████
```
### Ducts

@@ -102,5 +155,5 @@

There's no API to *abort* a stream—i.e. make the stream and all transforms cease and desist immediately—but
you can always wrap the `pull pipeline...` invocation into a `try`/`catch` clause and throw a custom
symbolic value:
There's no API to abort a stream—i.e. make the stream and all transforms stop processing immediately—but one
can always wrap the `pull pipeline...` invocation into a `try`/`catch` clause and throw a custom symbolic
value:

@@ -112,3 +165,3 @@ ```coffee

...
throw 'OHNOES!'
throw 'abort'
...

@@ -119,5 +172,5 @@ ...

catch error
throw error if error isnt 'OHNOES!'
throw error if error isnt 'abort'
warn "the stream was aborted"
...
```

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc