streaming-iterables 🏄‍♂️
A collection of utilities for async iterables. Designed to help replace your streams.
Streams were our last best hope for processing unbounded amounts of data. They've been hard to work with. But now, with Node 10, they have become something greater: they've become async iterable. With async iterators you can write less code and do more work, faster.
If you still need streams with async functions, check out the sister project bluestream 🏄‍♀️!
Install
There are no dependencies
npm install streaming-iterables
Example
Download a bunch of pokemon (try it here!)
const { buffer, flatten, pipeline, transform } = require('streaming-iterables')
const got = require('got')
const pokedex = async function* () {
  let offset = 0
  while (true) {
    const url = `https://pokeapi.co/api/v2/pokemon/?offset=${offset}`
    const { body: { results: pokemon } } = await got(url, { json: true })
    if (pokemon.length === 0) {
      return
    }
    offset += pokemon.length
    yield pokemon
  }
}
const bufferTwo = buffer(2)
const pokeLoader = transform(2, async ({ url }) => {
  const { body } = await got(url, { json: true })
  return body
})
const pokePipe = pipeline(pokedex, bufferTwo, flatten, pokeLoader)
const run = async () => {
  for await (const pokemon of pokePipe) {
    console.log(`${pokemon.name} ${pokemon.sprites.front_default}`)
  }
}
run().then(() => console.log('caught them all!'))
Overview
Every function is curryable: you can call it with all of its arguments at once, or with some of them now and the rest later. For example:
import { map } from 'streaming-iterables'
for await (const str of map(String, [1, 2, 3])) {
  console.log(str)
}
const stringable = map(String)
for await (const str of stringable([1, 2, 3])) {
  console.log(str)
}
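To picture how the curried calling style can work, here is a minimal sketch. It is not the library's code: `curry` and `mapSketch` are hypothetical names, and the helper assumes each utility has a fixed number of arguments.

```javascript
// Hypothetical helper: keeps collecting arguments until `fn.length` have arrived.
function curry(fn) {
  return function curried(...args) {
    if (args.length >= fn.length) {
      return fn(...args)
    }
    return (...more) => curried(...args, ...more)
  }
}

// Stand-in for a two-argument utility shaped like map(func, iterable).
const mapSketch = curry(async function* (func, iterable) {
  for await (const value of iterable) {
    yield await func(value)
  }
})

// Both call styles produce the same async iterable:
//   mapSketch(String, [1, 2, 3])
//   mapSketch(String)([1, 2, 3])
```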
Since this works with async iterators, it polyfills Symbol.asyncIterator if it doesn't exist. (Not needed on Node 10 or later.)
if ((Symbol as any).asyncIterator === undefined) {
  ;(Symbol as any).asyncIterator = Symbol.for('asyncIterator')
}
API
batch
function batch<T>(size: number, iterable: AnyIterable<T>): AsyncIterableIterator<T[]>
Batch objects from iterable into arrays of size length. The final array may be shorter than size if there are not enough items.
import { batch } from 'streaming-iterables'
import { getPokemon } from './util'
for await (const pokemons of batch(10, getPokemon())) {
  console.log(pokemons)
}
buffer
function buffer<T>(size: number, iterable: AnyIterable<T>): AsyncIterableIterator<T>
Buffer keeps a number of objects in reserve available for immediate reading. This is helpful with async iterators as it will prefetch results so you don't have to wait for them to load.
import { buffer } from 'streaming-iterables'
import { getPokemon, trainMonster } from './util'
for await (const monster of buffer(10, getPokemon())) {
  await trainMonster(monster)
}
collect
function collect<T>(iterable: AnyIterable<T>): Promise<T[]>
Collect all the values from an iterable into an array.
import { collect } from 'streaming-iterables'
import { getPokemon } from './util'
console.log(await collect(getPokemon()))
concat
function concat(...iterables: Array<AnyIterable<any>>): AsyncIterableIterator<any>
Combine multiple iterators into a single iterable. Reads each iterable one at a time.
import { concat } from 'streaming-iterables'
import { getPokemon, getTransformers } from './util'
for await (const hero of concat(getPokemon(2), getTransformers(2))) {
  console.log(hero)
}
consume
function consume<T>(iterator: AnyIterable<T>): Promise<void>
A promise that resolves after the function drains the iterable of all data. Useful for processing a pipeline of data.
import { consume, map } from 'streaming-iterables'
import { getPokemon, trainMonster } from './util'
const train = map(trainMonster)
await consume(train(getPokemon()))
flatMap
function flatMap<T, B>(func: (data: T) => FlatMapValue<B>, iterable: AnyIterable<T>): AsyncIterableIterator<B>
Map func over the iterable, flatten the result and then ignore all null or undefined values. It's the transform function we've always needed. It's equivalent to:
(func, iterable) => filter(i => i !== undefined && i !== null, flatten(map(func, iterable)))
The return value for func is FlatMapValue<B>. TypeScript doesn't have recursive types but you can nest iterables as deep as you like.
The ordering of the results is guaranteed.
import { flatMap } from 'streaming-iterables'
import { getPokemon, lookupStats } from './util'
async function getDefeatedGyms(pokemon) {
  if (pokemon.gymBattlesWon > 0) {
    const stats = await lookupStats(pokemon)
    return stats.gyms
  }
}
for await (const gym of flatMap(getDefeatedGyms, getPokemon())) {
  console.log(gym.name)
}
flatten
function flatten<B>(iterable: AnyIterable<B | AnyIterable<B>>): AsyncIterableIterator<B>
Returns a new iterator by pulling every item out of iterable (and all its sub-iterables) and yielding them depth-first. Each value is checked for the iterable interfaces and is iterated if it has one. Strings are not iterated, as that would end up in an infinite loop.
Note: TypeScript doesn't have recursive types but you can nest iterables as deep as you like.
import { flatten } from 'streaming-iterables'
for await (const item of flatten([1, 2, [3, [4, 5], 6]])) {
  console.log(item)
}
flatTransform
function flatTransform<T, R>(concurrency: number, func: (data: T) => FlatMapValue<R>, iterable: AnyIterable<T>): AsyncIterableIterator<R>
Map func over the iterable, flatten the result and then ignore all null or undefined values. It's the transform function we've always needed. It's equivalent to:
(concurrency, func, iterable) => filter(i => i !== undefined && i !== null, flatten(transform(concurrency, func, iterable)))
The return value for func is FlatMapValue<R>. TypeScript doesn't have recursive types but you can nest iterables as deep as you like.
Order is determined by when func resolves, and it will run up to concurrency async func operations at once.
import { flatTransform } from 'streaming-iterables'
import { getPokemon, lookupStats } from './util'
async function getDefeatedGyms(pokemon) {
  if (pokemon.gymBattlesWon > 0) {
    const stats = await lookupStats(pokemon)
    return stats.gyms
  }
}
for await (const gym of flatTransform(10, getDefeatedGyms, getPokemon())) {
  console.log(gym.name)
}
fromStream
function fromStream<T>(stream: Readable): AsyncIterable<T>
If you are on a version of Node before Node 10, you will have to use fromStream to turn the stream into an async iterator. If this function is used and the stream already has an async iterator, the one already present on the stream is used. This is recommended for backwards compatibility.
import { fromStream } from 'streaming-iterables'
import { createReadStream } from 'fs'
const pokeLog = fromStream(createReadStream('./pokedex-operating-system.log'))
for await (const pokeData of pokeLog) {
  console.log(pokeData)
}
filter
function filter<T>(filterFunc: (data: T) => boolean | Promise<boolean>, iterable: AnyIterable<T>): AsyncIterableIterator<T>
Takes a filterFunc and an iterable, and returns a new async iterator of the same type containing the members of the given iterable which cause the filterFunc to return true.
import { filter } from 'streaming-iterables'
import { getPokemon } from './util'
const filterWater = filter(pokemon => pokemon.elements.includes('water'))
for await (const pokemon of filterWater(getPokemon())) {
  console.log(pokemon)
}
getIterator
function getIterator<T>(values: Iterableish<T>): Iterator<T> | AsyncIterator<T>
Get the iterator from any iterable or just return an iterator itself.
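As a rough illustration of what "iterable or iterator" means here, the sketch below accepts either and always hands back an iterator. `getIteratorSketch` is a hypothetical name, and the order of the checks is an assumption rather than the library's documented behavior.

```javascript
// Hypothetical sketch: accepts an iterable or an iterator, returns an iterator.
function getIteratorSketch(values) {
  if (typeof values[Symbol.asyncIterator] === 'function') {
    return values[Symbol.asyncIterator]() // async iterable
  }
  if (typeof values[Symbol.iterator] === 'function') {
    return values[Symbol.iterator]() // sync iterable (arrays, generators, ...)
  }
  if (typeof values.next === 'function') {
    return values // already an iterator, hand it back untouched
  }
  throw new TypeError('expected an iterable or an iterator')
}

// An array yields a fresh iterator; a bare { next() {} } object is returned as-is.
const arrayIterator = getIteratorSketch([1, 2])
```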
map
function map<T, B>(func: (data: T) => B | Promise<B>, iterable: AnyIterable<T>): AsyncIterableIterator<B>
Map a function or async function over all the values of an iterable.
import { map } from 'streaming-iterables'
import got from 'got'
const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = map(got)
for await (const page of download(urls)) {
  console.log(page)
}
merge
function merge(...iterables: Array<AnyIterable<any>>): AsyncIterableIterator<any>
Combine multiple iterators into a single iterable. Reads one item off each iterable in order repeatedly until they are all exhausted. If you care less about order and want them faster see parallelMerge().
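The round-robin order described above can be sketched in plain JavaScript. This is an illustration of the behavior, not the library's implementation, and `mergeSketch` is a hypothetical name.

```javascript
// Hypothetical sketch: take one item from each source in turn until all are done.
async function* mergeSketch(...iterables) {
  // Turn every source into an iterator up front.
  const iterators = iterables.map(source =>
    source[Symbol.asyncIterator]
      ? source[Symbol.asyncIterator]()
      : source[Symbol.iterator]()
  )
  const active = new Set(iterators)
  while (active.size > 0) {
    for (const iterator of active) {
      const { done, value } = await iterator.next()
      if (done) {
        active.delete(iterator) // exhausted sources drop out of the rotation
      } else {
        yield value
      }
    }
  }
}

// mergeSketch([1, 2, 3], ['a'], [10, 20]) yields 1, 'a', 10, 2, 20, 3
```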
parallelMap
function parallelMap<T, R>(concurrency: number, func: (data: T) => R | Promise<R>, iterable: AnyIterable<T>): AsyncIterableIterator<R>
Map a function or async function over all the values of an iterable and do them concurrently. Otherwise it behaves just like map(), so results come out in source order. If you don't care about order, see the faster transform() function.
import { parallelMap } from 'streaming-iterables'
import got from 'got'
const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = parallelMap(2, got)
for await (const page of download(urls)) {
  console.log(page)
}
parallelMerge
function parallelMerge<T>(...iterables: Array<AnyIterable<T>>): AsyncIterableIterator<T>
Combine multiple iterators into a single iterable. Reads one item off of every iterable and yields them as they resolve. This is useful for pulling items out of a collection of iterables as soon as they're available.
import { parallelMerge } from 'streaming-iterables'
import { getPokemon, getTransformer } from './util'
const heros = parallelMerge(getPokemon(), getTransformer())
for await (const hero of heros) {
  console.log(hero)
}
pipeline
function pipeline(firstFn: Function, ...fns: Function[]): any
Calls firstFn and then every function in fns with the result of the previous function.
import { pipeline, map, collect } from 'streaming-iterables'
import { getPokemon } from './util'
const getName = map(pokemon => pokemon.name)
await pipeline(getPokemon, getName, collect)
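The calling convention described for pipeline is small enough to sketch directly. `pipelineSketch` is a hypothetical name and this is only an illustration of the described behavior, not the library's source.

```javascript
// Hypothetical sketch: call firstFn with no arguments, then feed each result onward.
function pipelineSketch(firstFn, ...fns) {
  let previous = firstFn()
  for (const fn of fns) {
    previous = fn(previous)
  }
  return previous
}

// Works with any functions, sync or async, because it only passes values along.
const result = pipelineSketch(
  () => [1, 2, 3],
  numbers => numbers.map(n => n * 2),
  doubled => doubled.reduce((sum, n) => sum + n, 0)
)
```

Because the last function's return value is passed straight back, ending a pipeline with something that returns a promise (such as collect) gives you a promise to await.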
reduce
function reduce<T, B>(func: (acc: B, value: T) => B, start: B, iterable: AnyIterable<T>): Promise<B>
An async function that takes a reducer function, an initial value and an iterable.
Reduces an iterable to a value which is the accumulated result of running each value from the iterable through func, where each successive invocation is supplied the return value of the previous.
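A minimal sketch of that accumulation, assuming a synchronous reducer as in the signature above. `reduceSketch` is a hypothetical name, not the library's implementation.

```javascript
// Hypothetical sketch: fold every value of a sync or async iterable into one result.
async function reduceSketch(func, start, iterable) {
  let acc = start
  for await (const value of iterable) {
    acc = func(acc, value) // each call receives the previous call's return value
  }
  return acc
}

// Example: summing numbers resolves to 10.
const total = reduceSketch((sum, n) => sum + n, 0, [1, 2, 3, 4])
```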
take
function take<T>(count: number, iterable: AnyIterable<T>): AsyncIterableIterator<T>
A passthrough iterator that reads a specific number of items from an iterator.
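To show why this is handy with unbounded sources, here is a minimal sketch of the behavior. `takeSketch` and `counter` are hypothetical names, and the early-exit details are an assumption rather than the library's exact implementation.

```javascript
// Hypothetical sketch: pass through at most `count` items, then stop reading.
async function* takeSketch(count, iterable) {
  if (count <= 0) return
  let taken = 0
  for await (const value of iterable) {
    yield value
    taken++
    if (taken >= count) return // leaving the loop also closes the source iterator
  }
}

// An endless source that would never finish on its own.
async function* counter() {
  let n = 0
  while (true) yield n++
}

// takeSketch(3, counter()) yields 0, 1, 2 and then ends.
```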
tap
function tap<T>(func: (data: T) => any, iterable: AnyIterable<T>): AsyncIterableIterator<T>
A passthrough iterator that yields the data it consumes, passing the data through to a function first. If you provide an async function, the iterator will wait for the promise to resolve before yielding the value. This is useful for logging, or processing information and passing it along.
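The described behavior fits in a few lines. This is a sketch under the assumption that the function's return value is ignored; `tapSketch` is a hypothetical name, not the library's code.

```javascript
// Hypothetical sketch: run a side effect for each value, then pass the value along unchanged.
async function* tapSketch(func, iterable) {
  for await (const value of iterable) {
    await func(value) // waits for async side effects before yielding
    yield value
  }
}

// Example: record every value that flows through.
const seen = []
const logged = tapSketch(async value => { seen.push(value) }, ['pikachu', 'eevee'])
```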
transform
function transform<T, R>(concurrency: number, func: (data: T) => R | Promise<R>, iterable: AnyIterable<T>): AsyncIterableIterator<R>
Map a function or async function over all the values of an iterable. Order is determined by when func resolves, and it will run up to concurrency async func operations at once. If you care about order see parallelMap().
import { transform } from 'streaming-iterables'
import got from 'got'
const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = transform(1000, got)
for await (const page of download(urls)) {
  console.log(page)
}
writeToStream
function writeToStream(stream: Writable, iterable: AnyIterable<any>): Promise<void>
Writes the iterable to the stream respecting the stream backpressure. Resolves when the iterable is exhausted.
import { pipeline, map, writeToStream } from 'streaming-iterables'
import { getPokemon } from './util'
import { createWriteStream } from 'fs'
const file = createWriteStream('pokemon.ndjson')
const serialize = map(pokemon => `${JSON.stringify(pokemon)}\n`)
await pipeline(getPokemon, serialize, writeToStream(file))
file.end()
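For readers wondering what "respecting the stream backpressure" involves, here is a minimal sketch using only Node's built-in `events.once` and the standard `write()`/`'drain'` contract. `writeAllSketch` is a hypothetical name; the library's real implementation may differ, for example in error handling.

```javascript
const { once } = require('events')

// Hypothetical sketch: write each chunk and pause whenever the stream asks us to.
async function writeAllSketch(stream, iterable) {
  for await (const chunk of iterable) {
    // write() returns false when the stream's internal buffer is full.
    if (!stream.write(chunk)) {
      await once(stream, 'drain') // resume only after the buffer has emptied
    }
  }
}
```

Like the example above, this sketch leaves the stream open, so the caller still decides when to call end().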
Types
Iterableish
type Iterableish<T> = Iterable<T> | Iterator<T> | AsyncIterable<T> | AsyncIterator<T>
Any iterable or iterator.
AnyIterable
type AnyIterable<T> = Iterable<T> | AsyncIterable<T>
Literally any Iterable (async or regular).
FlatMapValue
type FlatMapValue<B> = B | AnyIterable<B> | undefined | null | Promise<B | AnyIterable<B> | undefined | null>
A value, an iterable (such as an array) of that value, undefined, null or a promise for any of them. Used in the flatMap and flatTransform functions as possible return values of the mapping function.
Contributors wanted!
Writing docs and code is a lot of work! Thank you in advance for helping out.