steampipes
Advanced tools
Comparing version 0.1.0 to 3.0.0
582
lib/main.js
@@ -0,4 +1,5 @@ | ||
// Generated by CoffeeScript 2.4.1 | ||
(function() { | ||
'use strict'; | ||
var $watch, CND, assign, badge, debug, declare, echo, first_of, help, info, isa, jr, last_of, misfit, remit_defaults, rpr, size_of, type_of, types, urge, validate, warn, whisper; | ||
var CND, FS, L, Multimix, Steampipes, badge, debug, echo, help, info, isa, jr, rpr, type_of, urge, validate, warn, whisper; | ||
@@ -10,3 +11,3 @@ //########################################################################################################### | ||
badge = 'STEAMPIPES/MAIN'; | ||
badge = 'STEAMPIPES/BASICS'; | ||
@@ -30,564 +31,61 @@ debug = CND.get_logger('debug', badge); | ||
assign = Object.assign; | ||
//........................................................................................................... | ||
types = require('./_types'); | ||
this.types = require('./types'); | ||
({isa, validate, declare, first_of, last_of, size_of, type_of} = types); | ||
({isa, validate, type_of} = this.types); | ||
misfit = Symbol('misfit'); | ||
Multimix = require('multimix'); | ||
//=========================================================================================================== | ||
FS = require('fs'); | ||
//----------------------------------------------------------------------------------------------------------- | ||
/* Signals are special values that, when sent down the pipeline, may alter behavior: */ | ||
this.signals = Object.freeze({ | ||
last: Symbol('last'), // Used to signal last data item | ||
end: Symbol('end') // Request stream to terminate | ||
}); | ||
Steampipes = (function() { | ||
var filename, filenames, i, len, path; | ||
//----------------------------------------------------------------------------------------------------------- | ||
/* Marks are special values that identify types, behavior of pipeline elements etc: */ | ||
this.marks = Object.freeze({ | ||
isa_source: Symbol('isa_source'), // Marks a source as such | ||
isa_through: Symbol('isa_through'), // Marks a through as such | ||
isa_sink: Symbol('isa_sink'), // Marks a sink as such | ||
isa_duct: Symbol('isa_duct'), // Marks a duct as such | ||
isa_pusher: Symbol('isa_pusher'), // Marks a push source as such | ||
send_last: Symbol('send_last') // Marks transforms expecting a certain value before EOS | ||
}); | ||
//----------------------------------------------------------------------------------------------------------- | ||
remit_defaults = Object.freeze({ | ||
first: misfit, | ||
last: misfit, | ||
between: misfit, | ||
after: misfit, | ||
before: misfit | ||
}); | ||
//=========================================================================================================== | ||
//----------------------------------------------------------------------------------------------------------- | ||
this._get_remit_settings = function(settings, method) { | ||
var arity, remit_arity; | ||
switch (remit_arity = arguments.length) { | ||
case 1: | ||
[method, settings] = [settings, null]; | ||
break; | ||
case 2: | ||
settings = {...remit_defaults, ...settings}; | ||
break; | ||
default: | ||
throw new Error(`µ19358 expected 1 or 2 arguments, got ${remit_arity}`); | ||
} | ||
//......................................................................................................... | ||
validate.function(method); | ||
if ((arity = method.length) !== 2) { | ||
throw new Error(`µ20123 method arity ${arity} not implemented`); | ||
} | ||
if (settings != null) { | ||
if (settings.leapfrog != null) { | ||
validate.function(settings.leapfrog); | ||
//----------------------------------------------------------------------------------------------------------- | ||
class Steampipes extends Multimix { | ||
//--------------------------------------------------------------------------------------------------------- | ||
constructor(settings = null) { | ||
super(); | ||
this.settings = settings; | ||
} | ||
settings._surround = (settings.first !== misfit) || (settings.last !== misfit) || (settings.between !== misfit) || (settings.after !== misfit) || (settings.before !== misfit); | ||
} | ||
//......................................................................................................... | ||
return {settings, method}; | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this.remit = this.$ = (...P) => { | ||
var ME, R, data_after, data_before, data_between, data_first, data_last, do_leapfrog, has_returned, is_first, method, on_end, self, send, send_after, send_before, send_between, send_first, send_last, settings, tsend; | ||
({settings, method} = this._get_remit_settings(...P)); | ||
has_returned = false; | ||
send = null; | ||
//......................................................................................................... | ||
tsend = (d) => { | ||
if (has_returned) { | ||
throw new Error("µ55663 illegal to call send() after method has returned"); | ||
} | ||
return send(d); | ||
}; | ||
tsend.end = function() { | ||
return send.end(); | ||
}; | ||
//......................................................................................................... | ||
if (settings == null) { | ||
/* fast track without surround features */ | ||
return (d, send_) => { | ||
send = send_; | ||
has_returned = false; | ||
method(d, tsend); | ||
has_returned = true; | ||
return null; | ||
}; | ||
} | ||
//......................................................................................................... | ||
self = null; | ||
do_leapfrog = settings.leapfrog; | ||
data_first = settings.first; | ||
data_before = settings.before; | ||
data_between = settings.between; | ||
data_after = settings.after; | ||
data_last = settings.last; | ||
send_first = data_first !== misfit; | ||
send_before = data_before !== misfit; | ||
send_between = data_between !== misfit; | ||
send_after = data_after !== misfit; | ||
send_last = data_last !== misfit; | ||
on_end = null; | ||
is_first = true; | ||
ME = this; | ||
//......................................................................................................... | ||
/* slow track with surround features */ | ||
R = (d, send_) => { | ||
// debug 'µ55641', d, d is @signals.last | ||
send = send_; | ||
has_returned = false; | ||
//....................................................................................................... | ||
if (send_last && d === this.signals.last) { | ||
method(data_last, tsend); | ||
} else { | ||
//....................................................................................................... | ||
if (is_first) { | ||
(send_first ? method(data_first, tsend) : void 0); | ||
} else { | ||
(send_between ? method(data_between, tsend) : void 0); | ||
} | ||
if (send_before) { | ||
method(data_before, tsend); | ||
} | ||
is_first = false; | ||
//..................................................................................................... | ||
// When leapfrogging is being called for, only call method if the jumper returns false: | ||
if ((!do_leapfrog) || (!settings.leapfrog(d))) { | ||
method(d, tsend); | ||
} else { | ||
send(d); | ||
} | ||
if (send_after) { | ||
//..................................................................................................... | ||
method(data_after, tsend); | ||
} | ||
} | ||
has_returned = true; | ||
return null; | ||
}; | ||
if (send_last) { | ||
//......................................................................................................... | ||
R[this.marks.send_last] = this.marks.send_last; | ||
} | ||
return R; | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this.$map = function(method) { | ||
return (d, send) => { | ||
return send(method(d)); | ||
}; | ||
}; | ||
// @extend object_with_class_properties | ||
filenames = FS.readdirSync(__dirname); | ||
this.$drain = function(on_end = null) { | ||
var ref; | ||
return {[ref = this.marks.isa_sink]: ref, on_end}; | ||
}; | ||
this.$pass = function() { | ||
return (d, send) => { | ||
return send(d); | ||
}; | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this.$show = function(settings) { | ||
var ref, title; | ||
title = ((ref = settings != null ? settings.title : void 0) != null ? ref : '-->') + ' '; | ||
return this.$((d, send) => { | ||
info(title + jr(d)); | ||
return send(d); | ||
}); | ||
}; | ||
// #----------------------------------------------------------------------------------------------------------- | ||
// @$xs = ( method ) -> ( d, send ) => | ||
// if ( d.$stamped ? false ) then return send d | ||
// method d, send | ||
//----------------------------------------------------------------------------------------------------------- | ||
$watch = function(settings, method) { | ||
/* If any `surround` feature is called for, wrap all surround values so that we can safely | ||
distinguish between them and ordinary stream values; this is necessary to prevent them from leaking | ||
into the regular stream outside the `$watch` transform: */ | ||
var arity, key, take_second, value; | ||
switch (arity = arguments.length) { | ||
case 1: | ||
method = settings; | ||
return this.$((d, send) => { | ||
method(d); | ||
return send(d); | ||
}); | ||
//....................................................................................................... | ||
case 2: | ||
if (settings == null) { | ||
return this.$watch(method); | ||
} | ||
take_second = Symbol('take-second'); | ||
settings = assign({}, settings); | ||
for (key in settings) { | ||
value = settings[key]; | ||
settings[key] = [take_second, value]; | ||
} | ||
//..................................................................................................... | ||
return this.$(settings, (d, send) => { | ||
if ((CND.isa_list(d)) && (d[0] === take_second)) { | ||
method(d[1]); | ||
} else { | ||
method(d); | ||
send(d); | ||
} | ||
return null; | ||
}); | ||
} | ||
//......................................................................................................... | ||
throw new Error(`µ18244 expected one or two arguments, got ${arity}`); | ||
}; | ||
this.$watch = $watch.bind(this); | ||
//----------------------------------------------------------------------------------------------------------- | ||
this.$as_text = function(settings) { | ||
return (d, send) => { | ||
var ref, serialize; | ||
serialize = (ref = settings != null ? settings['serialize'] : void 0) != null ? ref : JSON.stringify; | ||
return this.$map((data) => { | ||
return serialize(data); | ||
}); | ||
}; | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this._classify_transform = function(transform) { | ||
var R, name, name1, name2; | ||
R = (() => { | ||
var type; | ||
if (transform[this.marks.isa_duct] != null) { | ||
return { | ||
type: transform.type | ||
}; | ||
for (i = 0, len = filenames.length; i < len; i++) { | ||
filename = filenames[i]; | ||
if (!filename.endsWith('.js')) { | ||
continue; | ||
} | ||
if (transform[this.marks.isa_pusher] != null) { | ||
return { | ||
type: 'source', | ||
isa_pusher: true | ||
}; | ||
if (filename.startsWith('_')) { | ||
continue; | ||
} | ||
if (transform[Symbol.iterator] != null) { | ||
return { | ||
type: 'source' | ||
}; | ||
if (filename === 'main.js') { | ||
continue; | ||
} | ||
switch (type = type_of(transform)) { | ||
case 'function': | ||
return { | ||
type: 'through' | ||
}; | ||
case 'generatorfunction': | ||
return { | ||
type: 'source', | ||
must_call: true | ||
}; | ||
if (filename === 'types.js') { | ||
continue; | ||
} | ||
if (transform[this.marks.isa_sink] != null) { | ||
return { | ||
type: 'sink', | ||
on_end: transform.on_end | ||
}; | ||
} | ||
throw new Error(`µ44521 expected an iterable, a function, a generator function or a sink, got a ${type}`); | ||
})(); | ||
switch (R.type) { | ||
case 'source': | ||
if (transform[name = this.marks.isa_source] == null) { | ||
transform[name] = this.marks.isa_source; | ||
} | ||
break; | ||
case 'through': | ||
if (transform[name1 = this.marks.isa_through] == null) { | ||
transform[name1] = this.marks.isa_through; | ||
} | ||
break; | ||
case 'sink': | ||
if (transform[name2 = this.marks.isa_sink] == null) { | ||
transform[name2] = this.marks.isa_sink; | ||
} | ||
path = './' + filename; | ||
Steampipes.include(require(path)); | ||
} | ||
return R; | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this._flatten_transforms = function(transforms, R = null) { | ||
var i, j, len, len1, ref, t, transform; | ||
if (R == null) { | ||
R = []; | ||
} | ||
for (i = 0, len = transforms.length; i < len; i++) { | ||
transform = transforms[i]; | ||
if (transform[this.marks.isa_duct] != null) { | ||
ref = transform.transforms; | ||
for (j = 0, len1 = ref.length; j < len1; j++) { | ||
t = ref[j]; | ||
/* TAINT necessary to do this recursively? */ | ||
R.push(t); | ||
} | ||
} else { | ||
R.push(transform); | ||
} | ||
} | ||
return R; | ||
}; | ||
return Steampipes; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this._new_duct = function(transforms) { | ||
var R, b, blurbs, i, idx, key, ref, ref1, transform; | ||
transforms = this._flatten_transforms(transforms); | ||
R = {[ref = this.marks.isa_duct]: ref, transforms}; | ||
blurbs = (function() { | ||
var i, len, results; | ||
results = []; | ||
for (i = 0, len = transforms.length; i < len; i++) { | ||
transform = transforms[i]; | ||
results.push(this._classify_transform(transform)); | ||
} | ||
return results; | ||
}).call(this); | ||
if (transforms.length === 0) { | ||
return { | ||
...R, | ||
is_empty: true | ||
}; | ||
} | ||
//......................................................................................................... | ||
R.first = blurbs[0]; | ||
if (transforms.length === 1) { | ||
R.is_single = true; | ||
R.last = R.first; | ||
R.type = R.first.type; | ||
} else { | ||
R.last = blurbs[transforms.length - 1]; | ||
switch (key = `${R.first.type}/${R.last.type}`) { | ||
case 'source/through': | ||
R.type = 'source'; | ||
break; | ||
case 'through/sink': | ||
R.type = 'sink'; | ||
break; | ||
case 'through/through': | ||
R.type = 'through'; | ||
break; | ||
case 'source/sink': | ||
R.type = 'circuit'; | ||
break; | ||
default: | ||
throw new Error(`µ44521 illegal duct configuration ${rpr(key)}`); | ||
} | ||
for (idx = i = 1, ref1 = blurbs.length - 1; i < ref1; idx = i += +1) { | ||
if ((b = blurbs[idx]).type !== 'through') { | ||
throw new Error(`µ44522 illegal duct configuration at transform index ${idx}: ${rpr(b)}`); | ||
} | ||
} | ||
} | ||
return R; | ||
}; | ||
}).call(this); | ||
//----------------------------------------------------------------------------------------------------------- | ||
this._pull = function(...transforms) { | ||
var _, duct, exhaust_pipeline, last, local_sink, local_source, mem_source, mem_sources, original_source, send; | ||
duct = this._new_duct(transforms); | ||
({transforms} = duct); | ||
original_source = null; | ||
if (duct.last.type === 'source') { | ||
throw new Error("µ77764 source as last transform not yet supported"); | ||
} | ||
if (duct.first.type === 'sink') { | ||
throw new Error("µ77765 sink as first transform not yet supported"); | ||
} | ||
//......................................................................................................... | ||
if (duct.first.type === 'source') { | ||
original_source = transforms.shift(); | ||
if (duct.first.must_call) { | ||
original_source = original_source(); | ||
} | ||
} | ||
//......................................................................................................... | ||
if (duct.last.type === 'sink') { | ||
transforms.pop(); | ||
} | ||
if (duct.type !== 'circuit') { | ||
//......................................................................................................... | ||
return duct; | ||
} | ||
//......................................................................................................... | ||
duct.original_source = original_source; | ||
duct.mem_source = mem_source = []; | ||
duct.mem_sources = mem_sources = [ | ||
mem_source, | ||
...((function() { | ||
var i, | ||
ref, | ||
results; | ||
results = []; | ||
for (_ = i = 0, ref = transforms.length; (0 <= ref ? i < ref : i > ref); _ = 0 <= ref ? ++i : --i) { | ||
results.push([]); | ||
} | ||
return results; | ||
})()) | ||
]; | ||
duct.has_ended = false; | ||
local_sink = null; | ||
local_source = null; | ||
last = this.signals.last; | ||
//......................................................................................................... | ||
send = (d) => { | ||
if (d === this.signals.end) { | ||
return duct.has_ended = true; | ||
} | ||
return local_sink.push(d); | ||
}; | ||
send.end = () => { | ||
return duct.has_ended = true; | ||
}; | ||
//......................................................................................................... | ||
exhaust_pipeline = () => { | ||
var d, has_data, i, idx, len, transform; | ||
while (true) { | ||
has_data = false; | ||
for (idx = i = 0, len = transforms.length; i < len; idx = ++i) { | ||
transform = transforms[idx]; | ||
if ((local_source = mem_sources[idx]).length === 0) { | ||
continue; | ||
} | ||
has_data = true; | ||
local_sink = mem_sources[idx + 1]; | ||
d = local_source.shift(); | ||
if (d === last) { | ||
if (transform[this.marks.send_last] != null) { | ||
transform(d, send); | ||
} | ||
send(last); | ||
} else { | ||
transform(d, send); | ||
} | ||
} | ||
if (!has_data) { | ||
break; | ||
} | ||
} | ||
return null; | ||
}; | ||
//......................................................................................................... | ||
duct.send = send; | ||
duct.exhaust_pipeline = exhaust_pipeline; | ||
//......................................................................................................... | ||
return duct; | ||
}; | ||
// @specs = {} | ||
// @isa = Multimix.get_keymethod_proxy @, isa | ||
// # @validate = Multimix.get_keymethod_proxy @, validate | ||
// declarations.declare_types.apply @ | ||
//----------------------------------------------------------------------------------------------------------- | ||
this.pull = function(...transforms) { | ||
var d, duct, ref; | ||
duct = this._pull(...transforms); | ||
if (duct.type !== 'circuit') { | ||
return duct; | ||
} | ||
if (duct.original_source[this.marks.isa_pusher] != null) { | ||
return this._push(duct); | ||
} | ||
ref = duct.original_source; | ||
//......................................................................................................... | ||
for (d of ref) { | ||
if (duct.has_ended) { | ||
break; | ||
} | ||
// continue if d is @signals.discard | ||
duct.mem_source.push(d); | ||
duct.exhaust_pipeline(); | ||
} | ||
//......................................................................................................... | ||
duct.mem_source.push(this.signals.last); | ||
duct.exhaust_pipeline(); | ||
if (duct.last.on_end != null) { | ||
duct.last.on_end(); | ||
} | ||
return null; | ||
}; | ||
//########################################################################################################### | ||
module.exports = L = new Steampipes(); | ||
//----------------------------------------------------------------------------------------------------------- | ||
this._push = function(duct) { | ||
/* Make `duct` available from the POV of the push source: */ | ||
duct.original_source.duct = duct; | ||
/* copy buffered data (from before when `pull()` was called) to `original_source`: */ | ||
duct.mem_source.splice(duct.mem_source.length, 0, ...duct.original_source.buffer); | ||
/* Process any data as may have accumulated at this point: */ | ||
duct.exhaust_pipeline(); | ||
return null; | ||
}; | ||
L.Steampipes = Steampipes; | ||
//=========================================================================================================== | ||
// SOURCES | ||
//----------------------------------------------------------------------------------------------------------- | ||
this.new_value_source = function*(x) { | ||
return (yield* x); | ||
}; | ||
}).call(this); | ||
//----------------------------------------------------------------------------------------------------------- | ||
this.new_push_source = function() { | ||
var R, end, ref, send; | ||
send = (d) => { | ||
if (R.duct == null) { | ||
return R.buffer.push(d); | ||
} | ||
R.buffer = null; | ||
if (d === this.signals.end) { | ||
return end(); | ||
} | ||
R.duct.mem_source.push(d); | ||
R.duct.exhaust_pipeline(); | ||
return null; | ||
}; | ||
end = () => { | ||
R.duct.mem_source.push(this.signals.last); | ||
R.duct.exhaust_pipeline(); | ||
if (R.duct.last.on_end != null) { | ||
R.duct.last.on_end(); | ||
} | ||
return R.duct = null; | ||
}; | ||
R = { | ||
[ref = this.marks.isa_pusher]: ref, | ||
send, | ||
end, | ||
buffer: [], | ||
duct: null | ||
}; | ||
return R; | ||
}; | ||
//=========================================================================================================== | ||
//----------------------------------------------------------------------------------------------------------- | ||
this.$collect = function(settings) { | ||
var collector, last, ref; | ||
collector = (ref = settings != null ? settings.collector : void 0) != null ? ref : []; | ||
last = Symbol('last'); | ||
return this.$({last}, (d, send) => { | ||
if (d === last) { | ||
return send(collector); | ||
} | ||
collector.push(d); | ||
return null; | ||
}); | ||
}; | ||
}).call(this); | ||
//# sourceMappingURL=main.js.map |
(function() { | ||
'use strict'; | ||
var $, $async, $send_three, CND, SP, after, badge, debug, echo, help, info, jr, rpr, test, urge, warn, whisper; | ||
var $, $async, $send_three, $show, $watch, CND, SP, after, badge, debug, defer, echo, help, info, jr, rpr, test, urge, warn, whisper; | ||
@@ -10,3 +10,3 @@ //########################################################################################################### | ||
badge = 'STEAMPIPES/TESTS/ASYNC-MAP'; | ||
badge = 'STEAMPIPES/TESTS/ASYNC'; | ||
@@ -35,4 +35,6 @@ debug = CND.get_logger('debug', badge); | ||
({$, $async} = SP); | ||
({$, $async, $watch, $show} = SP.export()); | ||
defer = setImmediate; | ||
//----------------------------------------------------------------------------------------------------------- | ||
@@ -44,32 +46,55 @@ after = function(dts, f) { | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["async 1"] = function(T, done) { | ||
var matcher, ok, pipeline, probe; | ||
this["async 0"] = async function(T, done) { | ||
var i, len, matcher, ok, probe, ref, use_async; | ||
ok = false; | ||
[probe, matcher] = ["abcdef", "1a-2a-1b-2b-1c-2c-1d-2d-1e-2e-1f-2f"]; | ||
pipeline = []; | ||
pipeline.push(SP.new_value_source(Array.from(probe))); | ||
pipeline.push($async(function(d, send, done) { | ||
send(`1${d}`); | ||
send(`2${d}`); | ||
return done(); | ||
})); | ||
pipeline.push(SP.$surround({ | ||
between: '-' | ||
})); | ||
pipeline.push(SP.$join()); | ||
//......................................................................................................... | ||
pipeline.push(SP.$watch(function(result) { | ||
echo(CND.gold(jr([probe, result]))); | ||
T.eq(result, matcher); | ||
return ok = true; | ||
})); | ||
//......................................................................................................... | ||
pipeline.push(SP.$drain(function() { | ||
if (!ok) { | ||
T.fail("failed to pass test"); | ||
} | ||
return done(); | ||
})); | ||
//......................................................................................................... | ||
SP.pull(...pipeline); | ||
ref = [true, false]; | ||
for (i = 0, len = ref.length; i < len; i++) { | ||
use_async = ref[i]; | ||
await (() => { | ||
return new Promise((resolve) => { | ||
var pipeline; | ||
pipeline = []; | ||
pipeline.push(probe); | ||
// pipeline.push $watch ( d ) -> info 'µ1', jr d | ||
if (use_async) { | ||
pipeline.push($async(function(d, send, done) { | ||
defer(function() { | ||
return send(`1${d}`); | ||
}); | ||
return after(0.1, function() { | ||
send(`2${d}`); | ||
return done(); | ||
}); | ||
})); | ||
} else { | ||
pipeline.push($(function(d, send) { | ||
send(`1${d}`); | ||
return send(`2${d}`); | ||
})); | ||
} | ||
// pipeline.push $watch ( d ) -> urge 'µ2', jr d | ||
pipeline.push(SP.$surround({ | ||
between: '-' | ||
})); | ||
pipeline.push(SP.$join()); | ||
//......................................................................................................... | ||
pipeline.push(SP.$watch(function(result) { | ||
echo(CND.gold(jr([probe, result]))); | ||
T.eq(result, matcher); | ||
return ok = true; | ||
})); | ||
//......................................................................................................... | ||
pipeline.push(SP.$drain(function() { | ||
if (!ok) { | ||
T.fail("failed to pass test"); | ||
} | ||
return resolve(); | ||
})); | ||
//......................................................................................................... | ||
return SP.pull(...pipeline); | ||
}); | ||
})(); | ||
} | ||
done(); | ||
return null; | ||
@@ -80,3 +105,3 @@ }; | ||
$send_three = function() { | ||
return SP.$async(function(d, send, done) { | ||
return $async(function(d, send, done) { | ||
var count, i, nr; | ||
@@ -86,3 +111,5 @@ count = 0; | ||
(function(d, nr) { | ||
return after(Math.random() / 5, function() { | ||
var dt; | ||
dt = Math.random() / 10; | ||
return after(dt, function() { | ||
count += 1; | ||
@@ -104,7 +131,8 @@ send(`(${d}:${nr})`); | ||
ok = false; | ||
probe = "abcdef"; | ||
probe = "fdcabe"; | ||
matcher = "(a:1)(a:2)(a:3)(b:1)(b:2)(b:3)(c:1)(c:2)(c:3)(d:1)(d:2)(d:3)(e:1)(e:2)(e:3)(f:1)(f:2)(f:3)"; | ||
pipeline = []; | ||
pipeline.push(SP.new_value_source(Array.from(probe))); | ||
pipeline.push(Array.from(probe)); | ||
pipeline.push($send_three()); | ||
// pipeline.push $show { title: '2', } | ||
pipeline.push(SP.$sort()); | ||
@@ -128,2 +156,3 @@ pipeline.push(SP.$join()); | ||
//......................................................................................................... | ||
// T.throws /contains asynchronous transform/, -> SP.pull pipeline... | ||
SP.pull(...pipeline); | ||
@@ -135,8 +164,10 @@ return null; | ||
if (module.parent == null) { | ||
test(this); | ||
test(this, { | ||
timeout: 10000 | ||
}); | ||
} | ||
// test @[ "async 1" ] | ||
// test @[ "async 2" ] | ||
// test @[ "async 0" ], { timeout: 10000, } | ||
// test @[ "async 2" ], { timeout: 10000, } | ||
}).call(this); |
@@ -0,1 +1,2 @@ | ||
// Generated by CoffeeScript 2.4.1 | ||
(function() { | ||
@@ -41,3 +42,3 @@ //########################################################################################################### | ||
({$, $async, $watch, $show} = SP); | ||
({$, $async, $watch, $show} = SP.export()); | ||
@@ -215,6 +216,104 @@ //........................................................................................................... | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["remit"] = function(T, done) { | ||
this["remit 1"] = function(T, done) { | ||
var pipeline, result; | ||
result = []; | ||
pipeline = []; | ||
pipeline.push([1, 2, 3]); | ||
pipeline.push($(function(d, send) { | ||
info('µ1', d); | ||
return send(d + 10); | ||
})); | ||
pipeline.push($(function(d, send) { | ||
info('µ2', d); | ||
send(d); | ||
return send(d + 10); | ||
})); | ||
pipeline.push($(function(d, send) { | ||
info('µ3', d); | ||
result.push(d); | ||
return send(d); | ||
})); | ||
pipeline.push(SP.$drain(function() { | ||
// debug 'µ11121', jr result | ||
T.eq(result, [11, 21, 12, 22, 13, 23]); | ||
return done(); | ||
})); | ||
return SP.pull(...pipeline); | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["drain with result"] = function(T, done) { | ||
var duct, pipeline; | ||
pipeline = []; | ||
pipeline.push([1, 2, 3]); | ||
pipeline.push($(function(d, send) { | ||
info('µ1', d); | ||
return send(d + 10); | ||
})); | ||
pipeline.push($(function(d, send) { | ||
info('µ2', d); | ||
send(d); | ||
return send(d + 10); | ||
})); | ||
pipeline.push(SP.$drain(function(result) { | ||
// debug 'µ1112-1', duct | ||
// debug 'µ1112-2', jr result | ||
T.eq(result, [11, 21, 12, 22, 13, 23]); | ||
return done(); | ||
})); | ||
return duct = SP.pull(...pipeline); | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["drain with sink 1"] = function(T, done) { | ||
var duct, pipeline, sink; | ||
sink = []; | ||
pipeline = []; | ||
pipeline.push([1, 2, 3]); | ||
pipeline.push($(function(d, send) { | ||
info('µ1', d); | ||
return send(d + 10); | ||
})); | ||
pipeline.push($(function(d, send) { | ||
info('µ2', d); | ||
send(d); | ||
return send(d + 10); | ||
})); | ||
pipeline.push(SP.$drain({sink}, function(result) { | ||
// debug 'µ1112-1', duct | ||
// debug 'µ1112-2', jr result | ||
T.ok(result === sink); | ||
T.eq(result, [11, 21, 12, 22, 13, 23]); | ||
return done(); | ||
})); | ||
return duct = SP.pull(...pipeline); | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["drain with sink 2"] = function(T, done) { | ||
var duct, pipeline, sink; | ||
sink = []; | ||
pipeline = []; | ||
pipeline.push([1, 2, 3]); | ||
pipeline.push($(function(d, send) { | ||
info('µ1', d); | ||
return send(d + 10); | ||
})); | ||
pipeline.push($(function(d, send) { | ||
info('µ2', d); | ||
send(d); | ||
return send(d + 10); | ||
})); | ||
pipeline.push(SP.$drain({sink}, function() { | ||
T.eq(sink, [11, 21, 12, 22, 13, 23]); | ||
return done(); | ||
})); | ||
return duct = SP.pull(...pipeline); | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["remit 2"] = function(T, done) { | ||
var pipeline, result; | ||
result = []; | ||
pipeline = []; | ||
pipeline.push(Array.from('abcd')); | ||
@@ -425,16 +524,17 @@ pipeline.push(SP.$map(function(d) { | ||
T.eq(r.type, 'source'); | ||
T.eq((ref1 = r.first.isa_pusher) != null ? ref1 : false, true); | ||
return T.eq(r.transforms[0][SP.marks.isa_source], SP.marks.isa_source); | ||
return T.eq((ref1 = r.first.isa_pusher) != null ? ref1 : false, true); | ||
})(); | ||
(() => { //......................................................................................................... | ||
var on_end, r, sink; | ||
var drain, on_end, r, sink; | ||
r = SP._new_duct([sink = SP.$drain(on_end = (function() {}))]); | ||
drain = r.transforms[r.transforms.length - 1]; | ||
T.eq(r.first, r.last); | ||
T.eq(r.is_single, true); | ||
T.eq(r.first.type, 'sink'); | ||
T.eq(r.type, 'sink'); | ||
T.eq(r.last.on_end, on_end); | ||
return T.eq(r.transforms[0][SP.marks.isa_sink], SP.marks.isa_sink); | ||
return T.eq(r.type, 'sink'); | ||
})(); | ||
(() => { //......................................................................................................... | ||
(() => { // T.eq r.last.on_end, on_end | ||
// T.eq r.transforms[ 0 ][ SP.marks.steampipes ], SP.marks.steampipes | ||
// T.eq r.transforms[ 0 ].type, 'sink' | ||
//......................................................................................................... | ||
var r, through; | ||
@@ -446,4 +546,3 @@ r = SP._new_duct([through = SP.$((function(d, send) {}))]); | ||
T.eq(r.type, 'through'); | ||
T.eq(r.transforms[0], through); | ||
return T.eq(r.transforms[0][SP.marks.isa_through], SP.marks.isa_through); | ||
return T.eq(r.transforms[0], through); | ||
})(); | ||
@@ -456,4 +555,3 @@ (() => { //......................................................................................................... | ||
T.eq(r.first.type, 'source'); | ||
T.eq(r.type, 'source'); | ||
return T.eq(r.transforms[0][SP.marks.isa_source], SP.marks.isa_source); | ||
return T.eq(r.type, 'source'); | ||
})(); | ||
@@ -478,3 +576,3 @@ (() => { //......................................................................................................... | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["composability"] = async function(T, done) { | ||
this["composability (through)"] = async function(T, done) { | ||
var error, matcher, probe; | ||
@@ -484,3 +582,3 @@ [probe, matcher, error] = [["what", "a", "lot", "of", "little", "bottles"], ["what", "a", "lot", "of", "little", "bottles"], null]; | ||
return new Promise(function(resolve, reject) { | ||
var R, duct_A, pipeline_A, pipeline_B, source; | ||
var R, duct_A, duct_B, length_of_A, length_of_B, pipeline_A, pipeline_B, source; | ||
R = []; | ||
@@ -496,4 +594,5 @@ source = probe; | ||
})); | ||
length_of_A = pipeline_A.length; | ||
duct_A = SP.pull(...pipeline_A); | ||
T.eq(duct_A.transforms.length, 2); | ||
T.eq(duct_A.transforms.length, length_of_A); | ||
T.eq(duct_A.type, 'through'); | ||
@@ -508,6 +607,9 @@ //....................................................................................................... | ||
pipeline_B.push(SP.$drain(function() { | ||
help('ok'); | ||
return resolve(R); | ||
return help('ok'); | ||
})); | ||
SP.pull(...pipeline_B); | ||
length_of_B = pipeline_B.length - 1 + length_of_A; | ||
duct_B = SP.pull(...pipeline_B); | ||
T.eq(duct_B.transforms.length, length_of_B); | ||
T.eq(duct_B.type, 'circuit'); | ||
resolve(R); | ||
return null; | ||
@@ -521,2 +623,91 @@ }); | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["composability (source)"] = async function(T, done) { | ||
var error, matcher, probe; | ||
[probe, matcher, error] = ["𦇻𦑛𦖵𦩮𦫦𧞈", Array.from('𦇻𦑛𦖵𦩮𦫦𧞈'), null]; | ||
await T.perform(probe, matcher, error, function() { | ||
return new Promise(function(resolve, reject) { | ||
var R, duct_A, duct_B, length_of_A, length_of_B, pipeline_A, pipeline_B, source; | ||
R = []; | ||
source = probe; | ||
//....................................................................................................... | ||
pipeline_A = []; | ||
pipeline_A.push(source); | ||
pipeline_A.push(SP.$watch(function(d) { | ||
return info(xrpr(d)); | ||
})); | ||
pipeline_A.push(SP.$collect({ | ||
collector: R | ||
})); | ||
length_of_A = pipeline_A.length; | ||
duct_A = SP.pull(...pipeline_A); | ||
T.eq(duct_A.transforms.length, length_of_A); | ||
T.eq(duct_A.type, 'source'); | ||
//....................................................................................................... | ||
pipeline_B = []; | ||
pipeline_B.push(duct_A); | ||
pipeline_B.push(SP.$watch(function(d) { | ||
return info(xrpr(d)); | ||
})); | ||
pipeline_B.push(SP.$drain(function() { | ||
return help('ok'); | ||
})); | ||
length_of_B = pipeline_B.length - 1 + length_of_A; | ||
duct_B = SP.pull(...pipeline_B); | ||
T.eq(duct_B.transforms.length, length_of_B); | ||
T.eq(duct_B.type, 'circuit'); | ||
resolve(R); | ||
return null; | ||
}); | ||
}); | ||
//......................................................................................................... | ||
done(); | ||
return null; | ||
}; | ||
//----------------------------------------------------------------------------------------------------------- | ||
this["composability (sink)"] = async function(T, done) { | ||
var error, matcher, probe; | ||
[probe, matcher, error] = ["𦇻𦑛𦖵𦩮𦫦𧞈", Array.from('𦇻𦑛𦖵𦩮𦫦𧞈'), null]; | ||
await T.perform(probe, matcher, error, function() { | ||
return new Promise(function(resolve, reject) { | ||
var R, duct_A, duct_B, length_of_A, length_of_B, pipeline_A, pipeline_B, source; | ||
R = []; | ||
source = probe; | ||
//....................................................................................................... | ||
pipeline_A = []; | ||
pipeline_A.push(SP.$watch(function(d) { | ||
return info(xrpr(d)); | ||
})); | ||
pipeline_A.push(SP.$collect({ | ||
collector: R | ||
})); | ||
pipeline_A.push(SP.$drain(function() { | ||
return help('ok'); | ||
})); | ||
length_of_A = pipeline_A.length; | ||
duct_A = SP.pull(...pipeline_A); | ||
T.eq(duct_A.transforms.length, length_of_A); | ||
T.eq(duct_A.type, 'sink'); | ||
//....................................................................................................... | ||
pipeline_B = []; | ||
pipeline_B.push(source); | ||
pipeline_B.push(SP.$watch(function(d) { | ||
return info(xrpr(d)); | ||
})); | ||
pipeline_B.push(duct_A); | ||
length_of_B = pipeline_B.length - 1 + length_of_A; | ||
duct_B = SP.pull(...pipeline_B); | ||
T.eq(duct_B.transforms.length, length_of_B); | ||
T.eq(duct_B.type, 'circuit'); | ||
// debug 'µ11124', duct_B | ||
resolve(R); | ||
return null; | ||
}); | ||
}); | ||
//......................................................................................................... | ||
done(); | ||
return null; | ||
}; | ||
/* | ||
@@ -819,52 +1010,2 @@ | ||
#----------------------------------------------------------------------------------------------------------- | ||
@[ "leapfrog 1" ] = ( T, done ) -> | ||
* through = require 'pull-through' | ||
probes_and_matchers = [ | ||
[[[ 1 .. 10], ( ( d ) -> d %% 2 isnt 0 ), ],[1,102,3,104,5,106,7,108,9,110],null] | ||
] | ||
#......................................................................................................... | ||
collector = [] | ||
for [ probe, matcher, error, ] in probes_and_matchers | ||
await T.perform probe, matcher, error, -> new Promise ( resolve ) -> | ||
[ values | ||
jumper ] = probe | ||
#..................................................................................................... | ||
source = SP.new_value_source values | ||
collector = [] | ||
pipeline = [] | ||
pipeline.push source | ||
pipeline.push SP.$ { leapfrog: jumper, }, ( d, send ) -> send 100 + d | ||
pipeline.push SP.$collect { collector, } | ||
pipeline.push SP.$drain -> resolve collector | ||
pull pipeline... | ||
#......................................................................................................... | ||
done() | ||
return null | ||
#----------------------------------------------------------------------------------------------------------- | ||
@[ "leapfrog 2" ] = ( T, done ) -> | ||
* through = require 'pull-through' | ||
probes_and_matchers = [ | ||
[[[ 1 .. 10], ( ( d ) -> d %% 2 isnt 0 ), ],[1,102,3,104,5,106,7,108,9,110],null] | ||
] | ||
#......................................................................................................... | ||
collector = [] | ||
for [ probe, matcher, error, ] in probes_and_matchers | ||
await T.perform probe, matcher, error, -> new Promise ( resolve ) -> | ||
[ values | ||
jumper ] = probe | ||
#..................................................................................................... | ||
source = SP.new_value_source values | ||
collector = [] | ||
pipeline = [] | ||
pipeline.push source | ||
pipeline.push SP.$ { leapfrog: jumper, }, ( d, send ) -> send 100 + d | ||
pipeline.push SP.$collect { collector, } | ||
pipeline.push SP.$drain -> resolve collector | ||
pull pipeline... | ||
#......................................................................................................... | ||
done() | ||
return null | ||
#----------------------------------------------------------------------------------------------------------- | ||
@[ "$scramble" ] = ( T, done ) -> | ||
@@ -908,6 +1049,10 @@ probes_and_matchers = [ | ||
// test @[ "remit" ] | ||
// test @[ "remit 1" ] | ||
// test @[ "drain with result" ] | ||
// test @[ "remit 2" ] | ||
// test @[ "remit with end detection 1" ] | ||
// test @[ "duct_from_transforms" ] | ||
// test @[ "composability" ] | ||
// test @[ "composability (through)" ] | ||
// test @[ "composability (source)" ] | ||
// test @[ "composability (sink)" ] | ||
// test @[ "remit with end detection 2" ] | ||
@@ -934,1 +1079,3 @@ // test @[ "remit with surrounds" ] | ||
}).call(this); | ||
//# sourceMappingURL=basic.test.js.map |
@@ -0,4 +1,5 @@ | ||
// Generated by CoffeeScript 2.4.1 | ||
(function() { | ||
//########################################################################################################### | ||
var CND, FS, L, OS, PATH, alert, badge, debug, echo, glob, help, info, log, rpr, test, urge, warn, whisper; | ||
var CND, FS, L, PATH, alert, badge, debug, echo, help, info, log, rpr, test, urge, warn, whisper; | ||
@@ -9,3 +10,3 @@ CND = require('cnd'); | ||
badge = 'STEAMPIPES/TESTS/TEE'; | ||
badge = 'STEAMPIPES/TESTS/MAIN'; | ||
@@ -35,8 +36,4 @@ log = CND.get_logger('plain', badge); | ||
OS = require('os'); | ||
test = require('guy-test'); | ||
glob = require('globby'); | ||
//########################################################################################################### | ||
@@ -46,7 +43,13 @@ L = this; | ||
(function() { | ||
var i, key, len, module, path, paths, value; | ||
paths = glob.sync(PATH.join(__dirname, '*.test.js')); | ||
for (i = 0, len = paths.length; i < len; i++) { | ||
path = paths[i]; | ||
// debug '39838', path | ||
var filename, filenames, i, key, len, module, path, value; | ||
filenames = FS.readdirSync(__dirname); | ||
for (i = 0, len = filenames.length; i < len; i++) { | ||
filename = filenames[i]; | ||
if (!filename.endsWith('.test.js')) { | ||
continue; | ||
} | ||
if (filename.startsWith('_')) { | ||
continue; | ||
} | ||
path = PATH.join(__dirname, filename); | ||
module = require(path); | ||
@@ -58,3 +61,2 @@ for (key in module) { | ||
} | ||
debug('39838', path, key); | ||
if (L[key] != null) { | ||
@@ -66,8 +68,9 @@ throw new Error(`duplicate key ${rpr(key)}`); | ||
} | ||
test(L, { | ||
return test(L, { | ||
timeout: 5000 | ||
}); | ||
return help("tested:"); | ||
})(); | ||
}).call(this); | ||
//# sourceMappingURL=main.js.map |
{ | ||
"name": "steampipes", | ||
"version": "0.1.0", | ||
"version": "3.0.0", | ||
"description": "Fast, simple data pipelines", | ||
@@ -31,3 +31,5 @@ "main": "lib/main.js", | ||
"intertype": "^2.5.0", | ||
"letsfreezethat": "^1.1.1" | ||
"letsfreezethat": "^1.1.1", | ||
"multimix": "^2.0.0", | ||
"pipedreams": "^12.0.0" | ||
}, | ||
@@ -34,0 +36,0 @@ "devDependencies": { |
@@ -34,2 +34,55 @@ <!-- START doctoc generated TOC please keep comment here to allow auto update --> | ||
### How to Construct Transforms | ||
#### Sinks | ||
Arbitrary objects can act as sinks provided they have a `sink` property; this property must be either set to | ||
`true` for a generic sink or else be an object that has `push()` method (such as a list). A sink may, | ||
furthermore, also have an `on_end()` method which, if set, must be a function that takes zero or one | ||
argument. | ||
If the `sink` property is a list, then it will receive all data items that arrive through the pipeline (the | ||
resultant data of the pipeline); if it is `true`, then those data items will be discarded. | ||
The `on_end()` method will be called when streaming has terminated (since the source was exhausted or a | ||
transform called `aend.end()`); if it takes one argument, then that will be the list of resultant data. If | ||
both the `sink` property has been set to a list and `on_end()` takes an argument, then that value will be | ||
the `sink` property (you probably only want the one or the other in most cases). | ||
```coffee | ||
{ sink: true, } | ||
{ sink: true, on_end: ( -> do_something() ), } | ||
{ sink: true, on_end: ( ( result ) -> do_something result ), } | ||
{ sink: x, on_end: ( ( result ) -> do_something result ### NB result is x ### ), } | ||
``` | ||
The only SteamPipes method that produces a sink is `$drain()` (it should really be called `sink()` but for | ||
compatibility with PipeStreams the name has been kept as a holdover from `pull-stream`). `$drain()` takes | ||
zero, one or two arguments: | ||
```coffee | ||
$drain() is equiv. to { sink: true, } | ||
$drain -> ... is equiv. to { sink: true, on_end: ( -> ... ), } | ||
$drain ( x ) -> ... is equiv. to { sink: true, on_end: ( ( x ) -> ... ), } | ||
$drain { sink: x, }, -> ... is equiv. to { sink: x, on_end: ( -> ... ), } | ||
$drain { sink: x, }, ( x ) -> ... is equiv. to { sink: x, on_end: ( ( x ) -> ... ), } | ||
``` | ||
### Asynchronous Sources and Transforms | ||
Asynchronous transforms can be constructed using the 'asynchronous remit' method, `$async()`. The method | ||
passed into `$async()` must accept three arguments, namely `d` (the data item coming down the pipeline), | ||
`send` (the method to send data down the pipeline), and, in addition to synchronous transforms, `done`, | ||
which is a callback function used to signal completion (it is analogous to the `resulve` argument of | ||
promises, `new Promise ( resulve, reject ) ->` and indeed implemented as such). An example: | ||
```coffee | ||
X███████████████ | ||
X███████████████ | ||
X███████████████ | ||
X███████████████ | ||
``` | ||
### Ducts | ||
@@ -102,5 +155,5 @@ | ||
There's no API to *abort* a stream—i.e. make the stream and all transforms cease and desist immediately—but | ||
you can always wrap the `pull pipeline...` invocation into a `try`/`catch` clause and throw a custom | ||
symbolic value: | ||
There's no API to abort a stream—i.e. make the stream and all transforms stop processing immediately—but one | ||
can always wrap the `pull pipeline...` invocation into a `try`/`catch` clause and throw a custom symbolic | ||
value: | ||
@@ -112,3 +165,3 @@ ```coffee | ||
... | ||
throw 'OHNOES!' | ||
throw 'abort' | ||
... | ||
@@ -119,5 +172,5 @@ ... | ||
catch error | ||
throw error if error isnt 'OHNOES!' | ||
throw error if error isnt 'abort' | ||
warn "the stream was aborted" | ||
... | ||
``` |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
713723
78
5530
0
173
5
12
+ Addedmultimix@^2.0.0
+ Addedpipedreams@^12.0.0
+ Addeddata-uri-to-buffer@2.0.2(transitive)
+ Addedemittery@0.6.0(transitive)
+ Addedget-source@1.0.42(transitive)
+ Addedletsfreezethat@2.2.5(transitive)
+ Addedlodash.flattendeep@4.4.0(transitive)
+ Addedlooper@3.0.04.0.0(transitive)
+ Addedpipedreams@12.1.1(transitive)
+ Addedpipestreams@8.0.1(transitive)
+ Addedpull-cont@0.1.1(transitive)
+ Addedpull-many@1.0.9(transitive)
+ Addedpull-map-last@1.0.0(transitive)
+ Addedpull-merge@1.0.4(transitive)
+ Addedpull-mux@0.1.3(transitive)
+ Addedpull-pair@1.1.0(transitive)
+ Addedpull-paramap@1.2.2(transitive)
+ Addedpull-pause@0.0.2(transitive)
+ Addedpull-pushable@2.2.0(transitive)
+ Addedpull-split@0.2.1(transitive)
+ Addedpull-stream@3.7.0(transitive)
+ Addedpull-stream-to-stream@2.0.0(transitive)
+ Addedpull-tee@2.0.1(transitive)
+ Addedpull-through@1.0.18(transitive)
+ Addedpull-utf8-decoder@1.0.2(transitive)
+ Addedsource-map@0.6.1(transitive)
+ Addedstack-trace@0.0.10(transitive)
+ Addedstream-to-pull-stream@1.7.3(transitive)
+ Addedtimsort@0.3.0(transitive)