streaming-iterables

A collection of utilities for async iterables. Designed to replace your streams.

  • 3.2.0
streaming-iterables 🏄‍♂️


A collection of utilities for async iterables. Designed to help replace your streams.

Streams were our last best hope for processing unbounded amounts of data. They've been hard to work with. But now with Node 10 they have become something greater, they've become async iterable. With async iterators you can have less code, do more work, faster.

If you still need streams with async functions, check out sister project bluestream🏄‍♀️!

Install

There are no dependencies.

npm install streaming-iterables

Example

Download a bunch of pokemon:

const { buffer, flatten, pipeline, transform } = require('streaming-iterables')
const got = require('got')

// A generator to fetch all the pokemon from the pokemon api
const pokedex = async function* () {
  let offset = 0
  while(true) {
    const url = `https://pokeapi.co/api/v2/pokemon/?offset=${offset}`
    const { body: { results: pokemon } } = await got(url, { json: true })
    if (pokemon.length === 0) {
      return
    }
    offset += pokemon.length
    yield pokemon
  }
}

// let's buffer two pages so they're ready when we want them
const bufferTwo = buffer(2)

// a transform iterator that will load the monsters two at a time and yield them as soon as they're ready
const pokeLoader = transform(2, async ({ url }) => {
  const { body } = await got(url, { json: true })
  return body
})

// string together all our functions
const pokePipe = pipeline(pokedex, bufferTwo, flatten, pokeLoader)

// let's do it, team!
const run = async () => {
  for await (const pokemon of pokePipe){
    console.log(`${pokemon.name} ${pokemon.sprites.front_default}`)
  }
}

run().then(() => console.log('caught them all!'))

Overview

Every function is curryable: you can call it with all of its arguments at once, or with only the leading arguments to get back a function that takes the rest. For example:

import { map } from 'streaming-iterables'

for await (const str of map(String, [1,2,3])) {
  console.log(str)
}
// "1", "2", "3"

const stringable = map(String)
for await (const str of stringable([1,2,3])) {
  console.log(str)
}
// "1", "2", "3"

Because the library works with async iterators, it polyfills Symbol.asyncIterator if it doesn't exist. (Not needed on Node 10 and later.)

if ((Symbol as any).asyncIterator === undefined) {
  ;(Symbol as any).asyncIterator = Symbol.for('asyncIterator')
}

API

batch

function batch<T>(size: number, iterable: AsyncIterable<T>): AsyncIterableIterator<T[]>
function batch<T>(size: number, iterable: Iterable<T>): IterableIterator<T[]>

Batch objects from iterable into arrays of size length. The final array may be shorter than size if there are not enough items. Returns a sync iterator if the iterable is sync, otherwise an async iterator.

import { batch } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'

// batch 10 pokemon while we process them
for await (const pokemons of batch(10, getPokemon())) {
  console.log(pokemons) // 10 pokemon at a time!
}
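
To make the "final array may be shorter" behavior concrete, here is a minimal sketch of the sync case. The local batch function is a stand-in written for illustration, not the library's source; with the package installed you would import batch from 'streaming-iterables' instead.

```javascript
// Stand-in sketch of batch() for sync iterables only (an assumption for illustration).
function* batch(size, iterable) {
  let current = []
  for (const item of iterable) {
    current.push(item)
    if (current.length === size) {
      yield current
      current = []
    }
  }
  // Emit whatever is left over, which may be shorter than `size`
  if (current.length > 0) {
    yield current
  }
}

console.log([...batch(2, [1, 2, 3, 4, 5])])
// [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```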

buffer

function buffer<T>(size: number, iterable: AsyncIterable<T>): AsyncIterableIterator<T>
function buffer<T>(size: number, iterable: Iterable<T>): IterableIterator<T>

Buffer keeps a number of objects in reserve available for immediate reading. This is helpful with async iterators as it will prefetch results so you don't have to wait for them to load. For sync iterables it will precompute up to size values and keep them in reserve. The internal buffer will start to be filled once .next() is called for the first time. This matches generator function behaviors.

import { buffer } from 'streaming-iterables'
import { getPokemon, trainMonster } from 'iterable-pokedex'

// load 10 monsters in the background while we process them one by one
for await (const monster of buffer(10, getPokemon())) {
  await trainMonster(monster) // got to do some pokéwork
}

collect

function collect<T>(iterable: Iterable<T>): T[]
function collect<T>(iterable: AsyncIterable<T>): Promise<T[]>

Collect all the values from an iterable into an array. Returns an array if you pass it an iterable and a promise for an array if you pass it an async iterable.

import { collect } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'

console.log(await collect(getPokemon()))
// [bulbasaur, ivysaur, venusaur, charmander, ...]

concat

function concat(...iterables: Array<Iterable<any>>): IterableIterator<any>
function concat(...iterables: Array<AnyIterable<any>>): AsyncIterableIterator<any>

Combine multiple iterators into a single iterable. Reads each iterable completely one at a time. Returns a sync iterator if all iterables are sync, otherwise it returns an async iterable.

import { concat } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'
import { getTransformers } from './util'

for await (const hero of concat(getPokemon(2), getTransformers(2))) {
  console.log(hero)
}
// charmander
// bulbasaur <- end of pokemon
// megatron
// bumblebee <- end of transformers

consume

export function consume<T>(iterator: Iterable<T>): void
export function consume<T>(iterator: AsyncIterable<T>): Promise<void>

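Drains the iterable of all data. For an async iterable it returns a promise that resolves once the iterable is exhausted; for a sync iterable it returns nothing. Useful for running a pipeline of data for its side effects.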

import { consume, map } from 'streaming-iterables'
import { getPokemon, trainMonster } from 'iterable-pokedex'

const train = map(trainMonster)
await consume(train(getPokemon())) // load all the pokemon and train them!

flatMap

function flatMap<T, B>(func: (data: T) => FlatMapValue<B>, iterable: AnyIterable<T>): AsyncIterableIterator<B>

Map func over the iterable, flatten the result and then ignore all null or undefined values. It's the transform function we've always needed. It's equivalent to:

(func, iterable) => filter(i => i !== undefined && i !== null, flatten(map(func, iterable)))

The return value for func is FlatMapValue<B>. TypeScript doesn't have recursive types but you can nest iterables as deep as you like.

The ordering of the results is guaranteed.

import { flatMap } from 'streaming-iterables'
import { getPokemon, lookupStats } from 'iterable-pokedex'

async function getDefeatedGyms(pokemon) {
  if (pokemon.gymBattlesWon > 0) {
    const stats = await lookupStats(pokemon)
    return stats.gyms
  }
}

for await (const gym of flatMap(getDefeatedGyms, getPokemon())) {
  console.log(gym.name)
}
// "Pewter Gym"
// "Cerulean Gym"
// "Vermilion Gym"

flatten

function flatten<B>(iterable: AnyIterable<B | AnyIterable<B>>): AsyncIterableIterator<B>

Returns a new iterator by pulling every item out of iterable (and all its sub iterables) and yielding them depth-first. Each value is checked for the iterable interfaces and iterated if it implements one. Strings are not iterated, as that would end in an infinite loop.

Note: TypeScript doesn't have recursive types but you can nest iterables as deep as you like.

import { flatten } from 'streaming-iterables'

for await (const item of flatten([1, 2, [3, [4, 5], 6]])) {
  console.log(item)
}
// 1
// 2
// 3
// 4
// 5
// 6

flatTransform

function flatTransform<T, R>(concurrency: number, func: (data: T) => FlatMapValue<R>, iterable: AnyIterable<T>): AsyncIterableIterator<R>

Map func over the iterable, flatten the result and then ignore all null or undefined values. Returned async iterables are flattened concurrently too. It's the transform function we've always wanted.

It's similar to:

const filterEmpty = filter(i => i !== undefined && i !== null)
(concurrency, func, iterable) => filterEmpty(flatten(transform(concurrency, func, iterable)))

The return value for func is FlatMapValue<R>. TypeScript doesn't have recursive types but you can nest iterables as deep as you like. Only directly returned async iterables are processed concurrently. (E.g., an async generator function's output will be processed concurrently; however, an async generator's output returned inside an array will not be.)

Order is determined by when async operations resolve. And it will run up to concurrency async operations at once. This includes promises and async iterables returned from func.

Promise Example:

import { flatTransform } from 'streaming-iterables'
import { getPokemon, lookupStats } from 'iterable-pokedex'

async function getDefeatedGyms(pokemon) {
  if (pokemon.gymBattlesWon > 0) {
    const stats = await lookupStats(pokemon)
    return stats.gyms
  }
}

// lookup 10 stats at a time
for await (const gym of flatTransform(10, getDefeatedGyms, getPokemon())) {
  console.log(gym.name)
}
// "Pewter Gym"
// "Cerulean Gym"
// "Vermilion Gym"

Async Generator Example

import { flatTransform } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'
import { findFriendsFB, findFriendsMySpace } from './util'


async function* findFriends (pokemon) {
  yield await findFriendsFB(pokemon.name)
  yield await findFriendsMySpace(pokemon.name)
}

for await (const pokemon of flatTransform(10, findFriends, getPokemon())) {
  console.log(pokemon.name)
}
// Pikachu
// Meowth
// Ash - FB
// Jessie - FB
// Misty - MySpace
// James - MySpace

fromStream

function fromStream<T>(stream: Readable): AsyncIterable<T>

If you are on a version of Node before Node 10, you will have to use fromStream to turn the stream into an async iterator. If the stream already has an async iterator, fromStream uses the one already present on the stream. This is recommended for backwards compatibility.

import { fromStream } from 'streaming-iterables'
import { createReadStream } from 'fs'

const pokeLog = fromStream(createReadStream('./pokedex-operating-system.log'))

for await (const pokeData of pokeLog) {
  console.log(pokeData) // Buffer(...)
}

filter

function filter<T>(filterFunc: (data: T) => boolean | Promise<boolean>, iterable: AnyIterable<T>): AsyncIterableIterator<T>

Takes a filterFunc and an iterable, and returns a new async iterator of the same type containing the members of the given iterable for which filterFunc returns true.

import { filter } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'

const filterWater = filter(pokemon => pokemon.types.includes('Water'))

for await (const pokemon of filterWater(getPokemon())) {
  console.log(pokemon)
}
// squirtle
// vaporeon
// magikarp

getIterator

function getIterator<T>(values: Iterableish<T>): Iterator<T> | AsyncIterator<T>

Get the iterator from any iterable or just return an iterator itself.
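
As a rough illustration of that description, the sketch below shows one way such a lookup could work. The local getIterator is a stand-in written for this example and the order of the checks is an assumption, not the library's source.

```javascript
// Stand-in sketch of getIterator(); the check order here is an assumption.
function getIterator(values) {
  if (typeof values.next === 'function') {
    return values // already an iterator, hand it back untouched
  }
  if (typeof values[Symbol.iterator] === 'function') {
    return values[Symbol.iterator]()
  }
  if (typeof values[Symbol.asyncIterator] === 'function') {
    return values[Symbol.asyncIterator]()
  }
  throw new TypeError('values is neither an iterator nor an iterable')
}

// An iterable (array) yields a fresh iterator
const arrayIterator = getIterator([1, 2, 3])
console.log(arrayIterator.next())
// { value: 1, done: false }

// An iterator is returned as-is
console.log(getIterator(arrayIterator) === arrayIterator)
// true
```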

map

function map<T, B>(func: (data: T) => B | Promise<B>, iterable: AnyIterable<T>): AsyncIterableIterator<B>

Map a function or async function over all the values of an iterable.

import { map } from 'streaming-iterables'
import got from 'got'

const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = map(got)

// download one at a time
for await (const page of download(urls)) {
  console.log(page)
}

merge

function merge(...iterables: Array<AnyIterable<any>>): AsyncIterableIterator<any>

Combine multiple iterators into a single iterable. Reads one item off each iterable in order repeatedly until they are all exhausted. If you care less about order and want them faster see parallelMerge().
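
The round-robin reading order can be sketched as follows. The local merge is a stand-in for illustration (not the library's source), and the pokemon and transformers sources are hypothetical.

```javascript
// Stand-in sketch of merge(): take one item from each source in turn.
async function* merge(...iterables) {
  const iterators = new Set(
    iterables.map(iterable =>
      typeof iterable[Symbol.asyncIterator] === 'function'
        ? iterable[Symbol.asyncIterator]()
        : iterable[Symbol.iterator]()
    )
  )
  while (iterators.size > 0) {
    for (const iterator of iterators) {
      const { value, done } = await iterator.next()
      if (done) {
        iterators.delete(iterator) // exhausted sources drop out of the rotation
      } else {
        yield value
      }
    }
  }
}

// Hypothetical sources of different lengths
async function* pokemon() {
  yield 'charmander'
  yield 'bulbasaur'
  yield 'pikachu'
}
const transformers = ['megatron', 'bumblebee']

async function main() {
  for await (const hero of merge(pokemon(), transformers)) {
    console.log(hero)
  }
}
main()
// charmander
// megatron
// bulbasaur
// bumblebee
// pikachu
```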

parallelMap

function parallelMap<T, R>(concurrency: number, func: (data: T) => R | Promise<R>, iterable: AnyIterable<T>): AsyncIterableIterator<R>

Map a function or async function over all the values of an iterable, running up to concurrency operations at once. Like map(), results are yielded in the order of the input. If you don't care about order, see the faster transform() function.

import { parallelMap } from 'streaming-iterables'
import got from 'got'

const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = parallelMap(2, got)

// download two at a time
for await (const page of download(urls)) {
  console.log(page)
}

parallelMerge

function parallelMerge<T>(...iterables: Array<AnyIterable<T>>): AsyncIterableIterator<T>

Combine multiple iterators into a single iterable. Reads one item off of every iterable and yields them as they resolve. This is useful for pulling items out of a collection of iterables as soon as they're available.

import { parallelMerge } from 'streaming-iterables'
import { getPokemon, getTransformer } from 'iterable-pokedex'

// pokemon are much faster to load btw
const heros = parallelMerge(getPokemon(), getTransformer())
for await (const hero of heros) {
  console.log(hero)
}
// charmander
// bulbasaur
// megatron
// pikachu
// eevee
// bumblebee
// jazz

pipeline

function pipeline(firstFn: Function, ...fns: Function[]): any;

Calls firstFn and then every function in fns with the result of the previous function.

import { pipeline, map, collect } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'
const getName = map(pokemon => pokemon.name)

// equivalent to `await collect(getName(getPokemon()))`
await pipeline(getPokemon, getName, collect)
// charmander
// bulbasaur
// MissingNo.
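
The description above can be sketched in a few lines. The local pipeline is a stand-in for illustration rather than the library's source, and getNumbers, double and sum are hypothetical helpers.

```javascript
// Stand-in sketch of pipeline(): call the first function, then feed each
// result into the next function.
function pipeline(firstFn, ...fns) {
  let previous = firstFn()
  for (const fn of fns) {
    previous = fn(previous)
  }
  return previous
}

// Hypothetical stages
const getNumbers = () => [1, 2, 3]
const double = numbers => numbers.map(n => n * 2)
const sum = numbers => numbers.reduce((total, n) => total + n, 0)

console.log(pipeline(getNumbers, double, sum)) // 12
console.log(sum(double(getNumbers()))) // 12, the same call written by hand
```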

reduce

function reduce<T, B>(func: (acc: B, value: T) => B, start: B, iterable: AnyIterable<T>): Promise<B>;

An async function that takes a reducer function, an initial value and an iterable.

Reduces an iterable to a value which is the accumulated result of running each value from the iterable through func, where each successive invocation is supplied the return value of the previous.
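
A minimal sketch of that accumulation is shown below. The local reduce is a stand-in for illustration, not the library's source, and battleScores is a hypothetical source.

```javascript
// Stand-in sketch of reduce(): thread an accumulator through every value.
async function reduce(func, start, iterable) {
  let acc = start
  for await (const value of iterable) {
    acc = func(acc, value)
  }
  return acc
}

// Hypothetical async source
async function* battleScores() {
  yield 10
  yield 25
  yield 7
}

async function main() {
  const total = await reduce((sum, score) => sum + score, 0, battleScores())
  console.log(total) // 42
}
main()
```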

take

function take<T>(count: number, iterable: AnyIterable<T>): AsyncIterableIterator<T>

Returns a new iterator that reads a specific number of items from iterable.
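
This is handy for cutting an unbounded source down to size. The sketch below uses a local stand-in for take (not the library's source) and a hypothetical infinite generator, naturals.

```javascript
// Stand-in sketch of take(): stop after `count` items.
async function* take(count, iterable) {
  if (count <= 0) {
    return
  }
  let taken = 0
  for await (const value of iterable) {
    yield value
    taken += 1
    if (taken >= count) {
      return // leaving the loop also closes the source iterator
    }
  }
}

// Hypothetical infinite source
async function* naturals() {
  let n = 1
  while (true) {
    yield n++
  }
}

async function main() {
  for await (const n of take(3, naturals())) {
    console.log(n)
  }
}
main()
// 1
// 2
// 3
```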

tap

function tap<T>(func: (data: T) => any, iterable: AnyIterable<T>): AsyncIterableIterator<T>

Returns a new iterator that yields the data it consumes unchanged, passing each item to func along the way. If you provide an async function, the iterator will wait for the promise to resolve before yielding the value. This is useful for logging, or for processing information and passing it along.
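
A minimal sketch of the pass-through behavior follows. The local tap is a stand-in for illustration, not the library's source.

```javascript
// Stand-in sketch of tap(): run a side effect, then pass the value along unchanged.
async function* tap(func, iterable) {
  for await (const value of iterable) {
    await func(value) // wait for async side effects before yielding
    yield value
  }
}

async function main() {
  const seen = []
  const record = tap(name => { seen.push(name) })
  const names = []
  for await (const name of record(['squirtle', 'vaporeon'])) {
    names.push(name)
  }
  console.log(seen) // [ 'squirtle', 'vaporeon' ]
  console.log(names) // [ 'squirtle', 'vaporeon' ]
}
```

Note that the stand-in is not curried, so `record` above would need the real library function; calling `tap(func, iterable)` with both arguments works with either.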

transform

function transform<T, R>(concurrency: number, func: (data: T) => R | Promise<R>, iterable: AnyIterable<T>): AsyncIterableIterator<R>

Map a function or async function over all the values of an iterable. It runs up to concurrency func operations at once, and results are yielded in the order in which func resolves. If you care about order, see parallelMap().

import { transform } from 'streaming-iterables'
import got from 'got'

const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = transform(1000, got)

// download all of these at the same time
for await (const page of download(urls)) {
  console.log(page)
}

writeToStream

function writeToStream(stream: Writable, iterable: AnyIterable<any>): Promise<void>

Writes the iterable to the stream respecting the stream backpressure. Resolves when the iterable is exhausted.

import { pipeline, map, writeToStream } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'
import { createWriteStream } from 'fs'

const file = createWriteStream('pokemon.ndjson')
const serialize = map(pokemon => `${JSON.stringify(pokemon)}\n`)
await pipeline(getPokemon, serialize, writeToStream(file))
file.end()
// now all the pokemon are written to the file!

Types

Iterableish

type Iterableish<T> = Iterable<T> | Iterator<T> | AsyncIterable<T> | AsyncIterator<T>

Any iterable or iterator.

AnyIterable

type AnyIterable<T> = Iterable<T> | AsyncIterable<T>

Literally any Iterable (async or regular).

FlatMapValue

type FlatMapValue<B> = B | AnyIterable<B> | undefined | null | Promise<B | AnyIterable<B> | undefined | null>

A value, an iterable of that value (such as an array), undefined, null, or a promise for any of them. Used in the flatMap and flatTransform functions as the possible return values of the mapping function.

Contributors wanted!

Writing docs and code is a lot of work! Thank you in advance for helping out.


Package last updated on 13 Dec 2018
