@thi.ng/transducers
Lightweight transducer implementations for ES6 / TypeScript (~24KB minified, full lib).
About
This library provides altogether 90+ transducers, reducers and sequence
generators (iterators) for composing data transformation pipelines.
The overall concept and many of the core functions offered here are directly
inspired by the original Clojure implementation by Rich Hickey, though the
implementation does differ (also in contrast to some other JS based
implementations) and dozens of less common, but generally highly useful operators
have been added. See full list below.
The @thi.ng/rstream & @thi.ng/csp partner modules provide related
functionality, supplementing features of this library and depending on it.
Since 0.8.0 this project largely supersedes the @thi.ng/iterators library for
most use cases and offers a more powerful API and potentially faster execution
of composed transformations (due to lack of ES generator overheads).
Installation
yarn add @thi.ng/transducers
Usage examples
There are several standalone example projects using this library in the
/examples directory.
Almost all functions can be imported selectively, but for development purposes
full module re-exports are defined.
import * as tx from "@thi.ng/transducers";
import { transduce } from "@thi.ng/transducers/transduce";
import { push } from "@thi.ng/transducers/rfn/push";
import { map } from "@thi.ng/transducers/xforms/map";
Basic usage patterns
xform = tx.comp(
tx.filter(x => (x & 1) > 0),
tx.distinct(),
tx.map(x=> x * 3)
);
tx.transduce(xform, tx.push(), [1, 2, 3, 4, 5, 4, 3, 2, 1]);
tx.transduce(xform, tx.conj(), [1, 2, 3, 4, 5, 4, 3, 2, 1]);
[...tx.iterator(xform, [1, 2, 3, 4, 5])]
f = tx.step(xform);
f(1)
f(2)
f(3)
f(4)
f = tx.step(tx.take(2)) // step() accepts any transducer; the argument 2 is only illustrative
Histogram generation & result grouping
tx.transduce(tx.map(x => x.toUpperCase()), tx.frequencies(), "hello world")
tx.reduce(tx.frequencies(), [1, 1, 1, 2, 3, 4, 4])
tx.reduce(
tx.frequencies(x => x.length),
"my camel is collapsing and needs some water".split(" ")
)
tx.reduce(
tx.groupByMap(x => x.length),
"my camel is collapsing and needs some water".split(" ")
)
[...tx.iterator(tx.page(0, 5), tx.range(12))]
[...tx.iterator(tx.comp(tx.page(1, 5), tx.map(x => x * 10)), tx.range(12))]
[...tx.iterator(tx.comp(tx.page(2, 5), tx.padLast(5, "n/a")), tx.range(12))]
[...tx.iterator(tx.page(3, 5), tx.range(12))]
Multiplexing / parallel transducer application
multiplex and multiplexObj can be used to transform values in parallel using
the provided transducers (which can be composed as usual). Each input value
results in a tuple or keyed object, respectively.
tx.transduce(
tx.multiplex(
tx.map(x => x.charAt(0)),
tx.map(x => x.toUpperCase()),
tx.map(x => x.length)
),
tx.push(),
["Alice", "Bob", "Charlie"]
)
tx.transduce(
tx.multiplexObj({
initial: tx.map(x => x.charAt(0)),
name: tx.map(x => x.toUpperCase()),
len: tx.map(x => x.length)
}),
tx.push(),
["Alice", "Bob", "Charlie"]
)
Moving average using sliding window
tx.transduce(
tx.comp(
tx.partition(5, 1),
tx.map(x => tx.reduce(tx.mean(), x))
),
tx.push(),
[1, 2, 3, 3, 4, 5, 5, 6, 7, 8, 8, 9, 10]
);
tx.transduce(
tx.movingAverage(5),
tx.push(),
[1, 2, 3, 3, 4, 5, 5, 6, 7, 8, 8, 9, 10]
);
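As a rough, library-independent sketch of what a sliding-window mean with window size 5 and step 1 computes, the following plain TypeScript function may help. The name `slidingMean` is hypothetical and not part of this library, and the sketch assumes that only full windows produce a result.

```typescript
// Hypothetical helper (not part of @thi.ng/transducers):
// emits the mean of each full window of `n` consecutive values.
function slidingMean(n: number, xs: Iterable<number>): number[] {
    const win: number[] = [];
    const out: number[] = [];
    let sum = 0;
    for (const x of xs) {
        win.push(x);
        sum += x;
        // drop the oldest value once the window exceeds its size
        if (win.length > n) sum -= win.shift()!;
        // only full windows contribute a result
        if (win.length === n) out.push(sum / n);
    }
    return out;
}

const means = slidingMean(5, [1, 2, 3, 3, 4, 5, 5, 6, 7, 8, 8, 9, 10]);
// 13 inputs and a window of 5 yield 9 means, the first being 13 / 5 = 2.6
```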
Benchmark function execution time
fn = () => { let x; for (let i = 0; i < 1e6; i++) { x = Math.cos(i); } return x; };
tx.transduce(
tx.comp(tx.benchmark(), tx.take(100)),
tx.mean(),
tx.repeatedly(fn)
);
Apply inspectors to debug transducer pipeline
tx.transduce(
tx.comp(
tx.inspect("orig"),
tx.map(x => x + 1),
tx.inspect("mapped"),
tx.filter(x => (x & 1) > 0)
),
tx.push(),
[1, 2, 3, 4]
);
Stream parsing / structuring
The struct transducer is simply a composition of:
partitionOf -> partition -> rename -> mapKeys.
See code here.
[...tx.iterator(
tx.struct([["id", 1, (id) => id[0]], ["pos", 2], ["vel", 2], ["color", 4]]),
[0, 100, 200, -1, 0, 1, 0.5, 0, 1, 1, 0, 0, 5, 4, 0, 0, 1, 1]) ]
CSV parsing
tx.transduce(
tx.comp(
tx.mapcat(x => x.split("\n")),
tx.map(x => x.split(",")),
tx.rename({ id: 0, name: 1, alias: 2, num: "length" })
),
tx.push(),
["100,typescript\n101,clojure,clj\n110,rust,rs"]
);
Early termination
tx.transduce(
tx.comp(tx.flatten(), tx.take(7)),
tx.push(),
[1, [2, [3, 4, [5, 6, [7, 8], 9, [10]]]]]
)
Scan operator
xform = tx.comp(
tx.scan(tx.count()),
tx.map(x => [...tx.repeat(x,x)]),
tx.scan(tx.pushCopy())
);
tx.transduce(xform, tx.push(), [1, 1, 1, 1]);
tx.transduce(tx.comp(tx.scan(tx.count()), tx.scan(tx.pushCopy())), tx.push(), [1,1,1,1])
Streaming hexdump
This is a higher-order transducer, purely composed from other transducers.
See code here.
src = [65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 33, 48, 49, 50, 51, 126, 122, 121, 120]
[...tx.iterator(tx.hexDump(8, 0x400), src)]
Bitstream
[...tx.iterator(tx.bits(8), [ 0xf0, 0xaa ])]
[...tx.iterator(
tx.comp(
tx.bits(8),
tx.map(x=> x ? "#" : "."),
tx.partition(8),
tx.map(x=>x.join(""))
),
[ 0x00, 0x18, 0x3c, 0x66, 0x66, 0x7e, 0x66, 0x00 ])]
Base64 & UTF-8 en/decoding
enc = tx.transduce(
tx.comp(
tx.map(x => x + 0x80),
tx.base64Encode()
),
tx.str(),
tx.range(-8, 8)
);
[...tx.iterator(
tx.comp(
tx.base64Decode(),
tx.map(x => x - 0x80),
tx.takeWhile(x=> x < 0)
),
enc)]
buf = tx.transduce(
tx.comp(tx.utf8Encode(), tx.base64Encode()),
tx.str(),
"beer (🍺) or hot beverage (☕︎)"
);
tx.transduce(tx.comp(tx.base64Decode(), tx.utf8Decode()), tx.str(), buf)
Weighted random choices
tx.transduce(tx.take(10), tx.push(), tx.choices("abcd", [1, 0.5, 0.25, 0.125]))
tx.transduce(tx.take(1000), tx.frequencies(), tx.choices("abcd", [1, 0.5, 0.25, 0.125]))
API
Documentation is slowly forthcoming in the form of doc comments (incl. code
examples) for a growing number of the functions listed below. Please see source
code for now.
Types
Apart from type aliases, the only real types defined are:
Reducer
Reducers are the core building blocks of transducers. Unlike other
implementations using OOP approaches, a Reducer in this lib is a simple
3-element array of functions, each addressing a separate processing step:
initialization, completion and reduction, in that order.
Since v0.6.0 the bundled reducers are all wrapped in functions to provide a
uniform API (and some of them can be preconfigured and/or are stateful
closures). However, it's fine to define stateless reducers as constant arrays.
interface Reducer<A, B> extends Array<any> {
[0]: () => A,
[1]: (acc: A) => A,
[2]: (acc: A, x: B) => A | Reduced<A>,
}
const push: Reducer<any[], any> = [
() => [],
(acc) => acc,
(acc, x) => (acc.push(x), acc),
];
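To illustrate when each of the three functions is called, here is a minimal sketch of a reduction loop. The `Reducer` type is a simplified local copy, `sum` and `reduceAll` are hypothetical names, and early termination is deliberately left out, so this is not the library's actual `reduce()`.

```typescript
// Simplified local copy of the reducer shape, for illustration only.
type Reducer<A, B> = [() => A, (acc: A) => A, (acc: A, x: B) => A];

// A stateless reducer defined as a constant array: sums numbers.
const sum: Reducer<number, number> = [
    () => 0,             // [0] init: produces the initial accumulator
    (acc) => acc,        // [1] completion: finalizes the result
    (acc, x) => acc + x, // [2] step: folds one value into the accumulator
];

// Hypothetical driver (no early termination support).
function reduceAll<A, B>(rfn: Reducer<A, B>, xs: Iterable<B>): A {
    const [init, complete, step] = rfn;
    let acc = init();
    for (const x of xs) {
        acc = step(acc, x);
    }
    return complete(acc);
}

const total = reduceAll(sum, [1, 2, 3, 4]); // 10
```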
partition, partitionBy, streamSort, streamShuffle are (examples of)
transducers making use of their 1-arity completing function.
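The role of the completing function can be sketched with a simplified chunking transducer, which buffers values and must flush any leftover buffer when the input ends. `chunk` is a hypothetical name and this is not the library's `partition` implementation (which also supports a step size and an `all` flag); the types are simplified local copies.

```typescript
type Reducer<A, B> = [() => A, (acc: A) => A, (acc: A, x: B) => A];
type Transducer<A, B> = (rfn: Reducer<any, B>) => Reducer<any, A>;

// Hypothetical, simplified chunking transducer.
function chunk<T>(size: number): Transducer<T, T[]> {
    return (rfn) => {
        let buf: T[] = [];
        return [
            () => rfn[0](),
            (acc) => {
                // completion: emit the incomplete last chunk, if any,
                // before delegating to the downstream completion
                if (buf.length > 0) {
                    acc = rfn[2](acc, buf);
                    buf = [];
                }
                return rfn[1](acc);
            },
            (acc, x) => {
                buf.push(x);
                if (buf.length === size) {
                    const out = buf;
                    buf = [];
                    return rfn[2](acc, out);
                }
                return acc;
            },
        ];
    };
}

const push: Reducer<any[], any> = [() => [], (acc) => acc, (acc, x) => (acc.push(x), acc)];

const [init, complete, step] = chunk<number>(2)(push);
let acc = init();
for (const x of [1, 2, 3, 4, 5]) acc = step(acc, x);
const chunks = complete(acc); // [[1, 2], [3, 4], [5]]
```

Without the flush in the completion function, the trailing `[5]` would be silently dropped.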
Reduced
class Reduced<T> implements IDeref<T> {
protected value: T;
constructor(val: T);
deref(): T;
}
Simple type wrapper to identify early termination of a reducer. Does not modify
wrapped value by injecting magic properties. Instances can be created via
reduced(x) and handled via these helper functions:
reduced(x: any): any
isReduced(x: any): boolean
ensureReduced(x: any): Reduced<any>
unreduced(x: any): any
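The following sketch shows how such a wrapper could signal early termination to a reduction loop. The `Reduced` class and `reduced` helper here are minimal stand-ins written for this example, and `untilNegative` and `reduceWithTermination` are hypothetical names, so the library's own implementations may differ.

```typescript
// Minimal stand-ins for the wrapper and its factory, for illustration only.
class Reduced<T> {
    protected value: T;
    constructor(val: T) {
        this.value = val;
    }
    deref(): T {
        return this.value;
    }
}
const reduced = <T>(x: T) => new Reduced<T>(x);

type Reducer<A, B> = [() => A, (acc: A) => A, (acc: A, x: B) => A | Reduced<A>];

// Collects values until a negative number is seen, then terminates early.
const untilNegative: Reducer<number[], number> = [
    () => [],
    (acc) => acc,
    (acc, x) => (x < 0 ? reduced(acc) : (acc.push(x), acc)),
];

// Hypothetical driver: stops consuming input once a step returns a Reduced.
function reduceWithTermination<A, B>(rfn: Reducer<A, B>, xs: Iterable<B>): A {
    let acc = rfn[0]();
    for (const x of xs) {
        const res = rfn[2](acc, x);
        if (res instanceof Reduced) {
            acc = res.deref(); // unwrap and stop
            break;
        }
        acc = res;
    }
    return rfn[1](acc);
}

const collected = reduceWithTermination(untilNegative, [1, 2, -1, 3]); // [1, 2]
```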
Transducer
From Rich Hickey's original definition:
A transducer is a transformation from one reducing function to another
As shown in the examples above, transducers can be dynamically composed (using
comp()) to form arbitrary data transformation pipelines without causing large
overheads for intermediate collections.
type Transducer<A, B> = (rfn: Reducer<any, B>) => Reducer<any, A>;
function map<A, B>(fn: (x: A) => B): Transducer<A, B> {
return (rfn: Reducer<any, B>) => {
return [
() => rfn[0](),
(acc) => rfn[1](acc),
(acc, x: A) => rfn[2](acc, fn(x))
];
};
}
function dedupe<T>(): Transducer<T, T> {
return (rfn: Reducer<any, T>) => {
let prev = {};
return [
() => rfn[0](),
(acc) => rfn[1](acc),
(acc, x) => {
acc = prev === x ? acc : rfn[2](acc, x);
prev = x;
return acc;
}
];
};
}
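Following the same pattern, a filtering transducer and a bare-bones driver could look roughly like this. `filter` below is a re-implementation for illustration, `transduceSketch` is a hypothetical name, and the types are simplified (no early termination), so none of this should be read as the library's actual code.

```typescript
type Reducer<A, B> = [() => A, (acc: A) => A, (acc: A, x: B) => A];
type Transducer<A, B> = (rfn: Reducer<any, B>) => Reducer<any, A>;

// Only passes values satisfying `pred` on to the wrapped reducer.
function filter<T>(pred: (x: T) => boolean): Transducer<T, T> {
    return (rfn) => [
        () => rfn[0](),
        (acc) => rfn[1](acc),
        (acc, x) => (pred(x) ? rfn[2](acc, x) : acc),
    ];
}

// Hypothetical driver: wraps the reducer with the transducer, then reduces.
function transduceSketch<A, B, C>(
    tx: Transducer<A, B>,
    rfn: Reducer<C, B>,
    xs: Iterable<A>
): C {
    const [init, complete, step] = tx(rfn);
    let acc = init();
    for (const x of xs) acc = step(acc, x);
    return complete(acc);
}

const push: Reducer<number[], number> = [
    () => [],
    (acc) => acc,
    (acc, x) => (acc.push(x), acc),
];

const odds = transduceSketch(filter<number>((x) => (x & 1) > 0), push, [1, 2, 3, 4, 5]);
// odds is [1, 3, 5]
```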
Transformations
comp(f1, f2, ...)
Returns new transducer composed from given transducers. Data flow is from left
to right. Offers fast paths for up to 10 args. If more are given, composition
is done dynamically via for loop.
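The left-to-right data flow can be sketched as follows: the reducer is wrapped by the transducers from right to left, so that each value passes through them from left to right. `compSketch` is a hypothetical, loosely typed stand-in and does not reflect the library's fast paths; `map` and `filter` are re-implemented here to keep the example self-contained.

```typescript
type Reducer<A, B> = [() => A, (acc: A) => A, (acc: A, x: B) => A];
type Transducer<A, B> = (rfn: Reducer<any, B>) => Reducer<any, A>;

function map<A, B>(fn: (x: A) => B): Transducer<A, B> {
    return (rfn) => [() => rfn[0](), (acc) => rfn[1](acc), (acc, x) => rfn[2](acc, fn(x))];
}

function filter<T>(pred: (x: T) => boolean): Transducer<T, T> {
    return (rfn) => [
        () => rfn[0](),
        (acc) => rfn[1](acc),
        (acc, x) => (pred(x) ? rfn[2](acc, x) : acc),
    ];
}

// Hypothetical variadic composition (loosely typed for brevity).
function compSketch(...fns: Transducer<any, any>[]): Transducer<any, any> {
    // The last transducer wraps the final reducer first, the first one wraps last,
    // hence the first transducer is the first to see each input value.
    return (rfn) => fns.reduceRight<Reducer<any, any>>((r, f) => f(r), rfn);
}

const push: Reducer<any[], any> = [() => [], (acc) => acc, (acc, x) => (acc.push(x), acc)];

const [init, complete, step] = compSketch(
    filter((x: number) => (x & 1) > 0), // applied first
    map((x: number) => x * 3)           // applied second
)(push);

let acc = init();
for (const x of [1, 2, 3, 4, 5]) acc = step(acc, x);
const tripledOdds = complete(acc); // [3, 9, 15]
```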
compR(rfn: Reducer<any, any>, fn: (acc, x) => any): Reducer<any, any>
Helper function to compose reducers.
iterator<A, B>(tx: Transducer<A, B>, xs: Iterable<A>): IterableIterator<B>
Similar to transduce(), but emits results as ES6 iterator (and hence doesn't
use a reduction function).
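One way to picture this lazy behavior is a generator which feeds inputs through the transducer one at a time and yields whatever each step produced. `iteratorSketch` and `naturals` are hypothetical names, the sketch ignores early termination, and it is not meant to mirror the library's internals.

```typescript
type Reducer<A, B> = [() => A, (acc: A) => A, (acc: A, x: B) => A];
type Transducer<A, B> = (rfn: Reducer<any, B>) => Reducer<any, A>;

function map<A, B>(fn: (x: A) => B): Transducer<A, B> {
    return (rfn) => [() => rfn[0](), (acc) => rfn[1](acc), (acc, x) => rfn[2](acc, fn(x))];
}

// Hypothetical generator-based sketch (no early termination handling).
function* iteratorSketch<A, B>(tx: Transducer<A, B>, xs: Iterable<A>): IterableIterator<B> {
    // Internal reducer which merely collects the results of each step.
    const collect: Reducer<B[], B> = [
        () => [],
        (acc) => acc,
        (acc, x) => (acc.push(x), acc),
    ];
    const [init, complete, step] = tx(collect);
    let buf: B[] = init();
    for (const x of xs) {
        buf = step(buf, x);
        // emit the zero, one or many values produced for this input
        yield* buf.splice(0);
    }
    // completion may flush values still buffered inside the transducer
    yield* complete(buf);
}

// Infinite source: values are only pulled on demand.
function* naturals(): IterableIterator<number> {
    let i = 0;
    while (true) yield i++;
}

const it = iteratorSketch(map((x: number) => x * 10), naturals());
const firstThree = [it.next().value, it.next().value, it.next().value]; // [0, 10, 20]
```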
reduce<A, B>(rfn: Reducer<A, B>, acc: A, xs: Iterable<B>): A
Reduces iterable using given reducer and optional initial accumulator/result.
If no accumulator is given, the reducer's init function supplies it.
transduce<A, B, C>(tx: Transducer<A, B>, rfn: Reducer<C, B>, acc: C, xs: Iterable<A>): C
Transforms iterable using given transducer and combines results with given
reducer and optional initial accumulator/result.
Transducers
base64Decode(): Transducer<string, number>
base64Encode(urlSafe?: boolean, bufSize?: number): Transducer<number, string>
benchmark(): Transducer<any, number>
bits(wordSize?: number, msbFirst?: boolean): Transducer<number, number>
cat<T>(): Transducer<T[], T>
convolve2d(src: number[], width: number, height: number, weights: number[], kwidth: number, kheight: number, wrap?: boolean): Transducer<number[], number>
dedupe<T>(equiv?: (a: T, b: T) => boolean): Transducer<T, T>
delayed<T>(t: number): Transducer<T, Promise<T>>
distinct<T>(mapfn?: (x: T) => any): Transducer<T, T>
drop<T>(n: number): Transducer<T, T>
dropNth<T>(n: number): Transducer<T, T>
dropWhile<T>(pred: Predicate<T>): Transducer<T, T>
duplicate<T>(n?: number): Transducer<T, T>
filter<T>(pred: Predicate<T>): Transducer<T, T>
flatten<T>(): Transducer<T | Iterable<T>, T>
flattenWith<T>(fn: (x: T) => Iterable<T>): Transducer<T | Iterable<T>, T>
hexDump(cols?: number, addr?: number): Transducer<number, string>
indexed<T>(): Transducer<T, [number, T]>
inspect<T>(prefix?: string): Transducer<T, T>
interleave<A, B>(sep: B | (() => B)): Transducer<A, A | B>
interpose<A, B>(sep: B | (() => B)): Transducer<A, A | B>
keep<T>(f?: ((x: T) => any)): Transducer<T, T>
labeled<L, T>(id: L | ((x: T) => L)): Transducer<T, [L, T]>
map<A, B>(fn: (x: A) => B): Transducer<A, B>
mapcat<A, B>(fn: (x: A) => Iterable<B>): Transducer<A, B>
mapDeep(spec: TransformSpec): Transducer<any, any>
mapIndexed<A, B>(fn: (i: number, x: A) => B, offset = 0): Transducer<A, B>
mapKeys(keys: IObjectOf<(x: any) => any>, copy?: boolean): Transducer<any, any>
mapNth<A, B>(n: number, offset: number, fn: (x: A) => B): Transducer<A, B>
mapVals<A, B>(fn: (v: A) => B, copy = true): Transducer<IObjectOf<A>, IObjectOf<B>>
movingAverage(n: number): Transducer<number, number>
movingMedian<A, B>(n: number, key?: ((x: A) => B), cmp?: Comparator<B>): Transducer<A, A>
multiplex<T, A, B>(a: Transducer<T, A>, b: Transducer<T, B>...): Transducer<T, [A, B...]>
multiplexObj<A, B>(xforms: IObjectOf<Transducer<A, any>>, rfn?: Reducer<B, [PropertyKey, any]>): Transducer<A, B>
noop<T>(): Transducer<T, T>
padLast<T>(n: number, fill: T): Transducer<T, T>
page<T>(page: number, pageLen = 10): Transducer<T, T>
partition<T>(size: number, step?: number, all?: boolean): Transducer<T, T[]>
partitionBy<T>(fn: (x: T) => any): Transducer<T, T[]>
partitionOf<T>(sizes: number[]): Transducer<T, T[]>
partitionSort<A, B>(n: number, key?: ((x: A) => B), cmp?: Comparator<B>): Transducer<A, A>
partitionSync<T>(keys: PropertyKey[] | Set<PropertyKey>, keyfn: (x: T) => PropertyKey, reset = true, all = true): Transducer<T, IObjectOf<T>>
pluck<A, B>(key: PropertyKey): Transducer<A, B>
rename<A, B>(kmap: IObjectOf<PropertyKey>, rfn?: Reducer<B, [PropertyKey, A]>): Transducer<A[], B>
sample<T>(prob: number): Transducer<T, T>
scan<A, B>(rfn: Reducer<B, A>, acc?: B): Transducer<A, B>
selectKeys(...keys: PropertyKey[]): Transducer<any, any>
sideEffect<T>(fn: (x: T) => void): Transducer<T, T>
streamShuffle<T>(n: number, maxSwaps?: number): Transducer<T, T>
streamSort<A, B>(n: number, key?: ((x: A) => B), cmp?: Comparator<B>): Transducer<A, A>
struct<T>(fields: StructField[]): Transducer<any, T>
swizzle<T>(order: PropertyKey[]): Transducer<T, any>
take<T>(n: number): Transducer<T, T>
takeLast<T>(n: number): Transducer<T, T>
takeNth<T>(n: number): Transducer<T, T>
takeWhile<T>(pred: Predicate<T>): Transducer<T, T>
throttle<T>(delay: number): Transducer<T, T>
throttleTime<T>(delay: number): Transducer<T, T>
utf8Decode(): Transducer<number, string>
utf8Encode(): Transducer<string, number>
Reducers
add(): Reducer<number, number>
assocMap<A, B>(): Reducer<Map<A, B>, [A, B]>
assocObj<T>(): Reducer<IObjectOf<T>, [PropertyKey, T]>
conj<T>(): Reducer<Set<T>, T>
count(offset?: number, step?: number): Reducer<number, any>
every<T>(pred?: Predicate<T>): Reducer<boolean, T>
frequencies<A, B>(key?: (x: A) => B): Reducer<Map<B, number>, A>
groupBinary<T>(bits: number, key: (x: T) => number, branch?: () => IObjectOf<T[]>, leaf?: Reducer<any, T>, left?: PropertyKey, right?: PropertyKey): Reducer<any, T>
groupByMap<A, B, C>(key: (x: A) => B, rfn?: Reducer<C, A>): Reducer<Map<B, C>, A>
groupByObj<A, C>(key: (x: A) => PropertyKey, rfn?: Reducer<C, A>, init?: () => IObjectOf<C>): Reducer<IObjectOf<C>, A>
last<T>(): Reducer<T, T>
max(): Reducer<number, number>
maxCompare<T>(ident: () => T, cmp: Comparator<T> = compare): Reducer<T, T>
mean(): Reducer<number, number>
min(): Reducer<number, number>
minCompare<T>(ident: () => T, cmp: Comparator<T> = compare): Reducer<T, T>
mul(): Reducer<number, number>
push<T>(): Reducer<T[], T>
pushCopy<T>(): Reducer<T[], T>
reductions<A, B>(rfn: Reducer<A, B>): Reducer<A[], B>
some<T>(pred?: Predicate<T>): Reducer<boolean, T>
str(sep = ""): Reducer<string, any>
Generators / Iterators
choices<T>(choices: T[], weights?: number[])
concat<T>(...xs: Iterable<T>[]): IterableIterator<T>
cycle<T>(input: Iterable<T>): IterableIterator<T>
iterate<T>(fn: (x: T) => T, seed: T): IterableIterator<T>
keys(x: any): IterableIterator<string>
pairs(x: any): IterableIterator<[string, any]>
range(from?: number, to?: number, step?: number): IterableIterator<number>
range2d(x1: number, x2: number, y1: number, y2: number, stepx?: number, stepy?: number): IterableIterator<number>
range3d(x1: number, x2: number, y1: number, y2: number, z1: number, z2: number, stepx?: number, stepy?: number, stepz?: number): IterableIterator<number>
repeat<T>(x: T, n?: number): IterableIterator<T>
repeatedly<T>(fn: () => T, n?: number): IterableIterator<T>
reverse<T>(input: Iterable<T>): IterableIterator<any>
tuples(...src: Iterable<any>[]): IterableIterator<any[]>
vals<T>(x: IObjectOf<T>): IterableIterator<T>
Authors
License
© 2016-2018 Karsten Schmidt // Apache Software License 2.0