༄ streamable
Stream-like manipulation of iterables
A Stream[T] decorates an Iterable[T] with a fluent interface enabling the chaining of lazy operations.
| | |
|---|---|
| 🔗 Fluent | chain methods! |
| 🇹 Typed | type-annotated and mypy-able |
| 💤 Lazy | operations are lazily evaluated at iteration time |
| 🔄 Concurrent | via threads or processes or asyncio |
| 🛡️ Robust | unit-tested for Python 3.7 to 3.14 with 100% coverage |
| 🪶 Minimalist | pip install streamable with no additional dependencies |
1. install
pip install streamable
2. import
from streamable import Stream
3. init
Instantiate a Stream[T] from an Iterable[T].
integers: Stream[int] = Stream(range(10))
4. operate
inverses: Stream[float] = (
integers
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError)
)
5. iterate
- Iterate over a Stream[T] as you would over any other Iterable[T].
- Source elements are processed on-the-fly.
collect it
>>> list(inverses)
[1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
>>> set(inverses)
{0.5, 1.0, 0.2, 0.33, 0.25, 0.17, 0.14, 0.12, 0.11}
reduce it
>>> sum(inverses)
2.82
>>> max(inverses)
1.0
>>> from functools import reduce
>>> reduce(..., inverses)
loop it
>>> for inverse in inverses:
>>> ...
next it
>>> inverses_iter = iter(inverses)
>>> next(inverses_iter)
1.0
>>> next(inverses_iter)
0.5
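The calls above iterate over inverses several times, and each iteration re-runs the chained operations from the source. A minimal, stdlib-only sketch of that lazy and re-iterable behavior follows; MiniStream and _Reiterable are hypothetical names used for illustration, not streamable's implementation.

```python
from typing import Callable, Generic, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class _Reiterable(Generic[T]):
    """Hypothetical helper: builds a fresh iterator each time it is iterated."""

    def __init__(self, make_iterator: Callable[[], Iterator[T]]) -> None:
        self._make_iterator = make_iterator

    def __iter__(self) -> Iterator[T]:
        return self._make_iterator()


class MiniStream(Generic[T]):
    """Hypothetical, minimal lazy wrapper around an iterable."""

    def __init__(self, source: Iterable[T]) -> None:
        self._source = source

    def __iter__(self) -> Iterator[T]:
        # Work only happens here, when someone actually iterates.
        return iter(self._source)

    def map(self, func: Callable[[T], U]) -> "MiniStream[U]":
        # Nothing is computed yet: we only describe how to build the iterator
        # (the `map` inside the lambda is the builtin one).
        return MiniStream(_Reiterable(lambda: map(func, self)))

    def filter(self, predicate: Callable[[T], bool]) -> "MiniStream[T]":
        return MiniStream(_Reiterable(lambda: filter(predicate, self)))
```

Because each `__iter__` call starts over from the source, a re-iterable source such as `range` can be iterated repeatedly, while a one-shot source such as a generator would be consumed by the first iteration.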
📒 Operations
.map
Applies a transformation on elements:
negative_integer_strings: Stream[str] = integers.map(lambda n: -n).map(str)
assert list(negative_integer_strings) == ['0', '-1', '-2', '-3', '-4', '-5', '-6', '-7', '-8', '-9']
thread-based concurrency
Applies the transformation concurrently, using up to concurrency threads:
import requests
pokemon_names: Stream[str] = (
Stream(range(1, 4))
.map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
.map(requests.get, concurrency=3)
.map(requests.Response.json)
.map(lambda poke: poke["name"])
)
assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']
Preserves the upstream order by default (FIFO), but you can set ordered=False for First Done First Out.
concurrency is also the size of the buffer containing not-yet-yielded results. If the buffer is full, iteration over the upstream is paused until a result is yielded from the buffer.
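The ordering and buffering behavior described above can be sketched with the standard library's ThreadPoolExecutor. A deque holds at most concurrency pending futures, and the upstream is only pulled again once the oldest result has been yielded. concurrent_map_sketch is a hypothetical name, the sketch covers only the ordered (FIFO) mode, and it is not streamable's actual implementation.

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def concurrent_map_sketch(
    func: Callable[[T], U], iterable: Iterable[T], concurrency: int
) -> Iterator[U]:
    """Hypothetical FIFO concurrent map with a buffer of `concurrency` futures."""
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        buffer: Deque["Future[U]"] = deque()
        for elem in iterable:
            buffer.append(executor.submit(func, elem))
            if len(buffer) >= concurrency:
                # Buffer full: stop pulling upstream until the oldest result is yielded.
                yield buffer.popleft().result()
        # Upstream exhausted: drain the remaining results in submission order.
        while buffer:
            yield buffer.popleft().result()
```

Yielding `buffer.popleft().result()` blocks on the oldest future, which is what keeps the output in upstream order even when later calls finish first.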
process-based concurrency
Set via="process":
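from typing import List

if __name__ == "__main__":
    state: List[int] = []
    n_integers: int = integers.map(state.append, concurrency=4, via="process").count()
    assert n_integers == 10
    # state.append ran in child processes, so the parent's list is left untouched
    assert state == []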
async-based concurrency
The sibling operation .amap applies an async function:
import httpx
import asyncio
http_async_client = httpx.AsyncClient()
pokemon_names: Stream[str] = (
Stream(range(1, 4))
.map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
.amap(http_async_client.get, concurrency=3)
.map(httpx.Response.json)
.map(lambda poke: poke["name"])
)
assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']
asyncio.get_event_loop().run_until_complete(http_async_client.aclose())
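A rough sketch of the same bounded, ordered pattern for coroutine functions, using only asyncio: tasks are scheduled on a private event loop and awaited in submission order, so up to concurrency coroutines make progress while the oldest one is awaited. amap_sketch is a hypothetical name, this is not how .amap is implemented, and the sketch assumes it is not called from inside an already-running event loop.

```python
import asyncio
from collections import deque
from typing import Any, Callable, Coroutine, Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def amap_sketch(
    afunc: Callable[[T], Coroutine[Any, Any, U]], iterable: Iterable[T], concurrency: int
) -> Iterator[U]:
    """Hypothetical ordered async map with at most `concurrency` pending tasks."""
    loop = asyncio.new_event_loop()
    pending: Deque["asyncio.Task[U]"] = deque()
    try:
        for elem in iterable:
            pending.append(loop.create_task(afunc(elem)))
            if len(pending) >= concurrency:
                # Running the loop until the oldest task is done also advances the others.
                yield loop.run_until_complete(pending.popleft())
        while pending:
            yield loop.run_until_complete(pending.popleft())
    finally:
        # If the consumer stops early, cancel leftover tasks before closing the loop.
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        loop.close()
```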
starmap
The star function decorator transforms a function that takes several positional arguments into a function that takes a tuple:
from streamable import star
zeros: Stream[int] = (
Stream(enumerate(integers))
.map(star(lambda index, integer: index - integer))
)
assert list(zeros) == [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Also convenient with .foreach, .filter, ...
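The unpacking that star performs can be sketched in a few lines. star_sketch is a hypothetical stand-in written for illustration; streamable's own star may differ, for instance in its typing. The standard library's itertools.starmap applies the same idea to a whole iterable.

```python
import itertools
from typing import Any, Callable, Tuple, TypeVar

R = TypeVar("R")


def star_sketch(func: Callable[..., R]) -> Callable[[Tuple[Any, ...]], R]:
    """Hypothetical: turn f(a, b, ...) into g((a, b, ...))."""

    def starred(args: Tuple[Any, ...]) -> R:
        # Unpack the tuple into positional arguments.
        return func(*args)

    return starred


zeros = list(map(star_sketch(lambda index, n: index - n), enumerate(range(10))))
same_zeros = list(itertools.starmap(lambda index, n: index - n, enumerate(range(10))))
```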
.foreach
Applies a side effect on elements:
self_printing_integers: Stream[int] = integers.foreach(print)
assert list(self_printing_integers) == list(integers)
thread-based concurrency
Like .map, it has an optional concurrency parameter.
process-based concurrency
As with .map, set the parameter via="process".
async-based concurrency
Like .map, it has a sibling .aforeach operation for async functions.
.filter
Keeps only the elements that satisfy a condition:
pair_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)
assert list(pair_integers) == [0, 2, 4, 6, 8]
.throttle
Limits the number of yields per_second/per_minute/per_hour:
slow_integers: Stream[int] = integers.throttle(per_second=5)
assert list(slow_integers) == list(integers)
and/or ensures that a minimum time interval separates successive yields:
from datetime import timedelta
slow_integers = integers.throttle(interval=timedelta(milliseconds=100))
assert list(slow_integers) == list(integers)
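The interval behavior can be sketched with time.monotonic and time.sleep: before every yield except the first, wait until at least the given number of seconds has elapsed since the previous yield. throttle_interval_sketch is hypothetical. It only covers the interval parameter, not per_second/per_minute/per_hour, and it is not streamable's implementation.

```python
import time
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def throttle_interval_sketch(iterable: Iterable[T], interval_seconds: float) -> Iterator[T]:
    """Hypothetical: enforce a minimum delay between successive yields."""
    last_yield: Optional[float] = None
    for elem in iterable:
        if last_yield is not None:
            # Sleep only for the part of the interval that has not elapsed yet.
            remaining = interval_seconds - (time.monotonic() - last_yield)
            if remaining > 0:
                time.sleep(remaining)
        last_yield = time.monotonic()
        yield elem
```

Since the clock is read when the consumer asks for the next element, time the consumer spends between two elements counts toward the interval.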
.group
Groups elements into Lists:
from typing import List

integers_5_by_5: Stream[List[int]] = integers.group(size=5)
assert list(integers_5_by_5) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
integers_by_parity: Stream[List[int]] = integers.group(by=lambda n: n % 2)
assert list(integers_by_parity) == [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
from datetime import timedelta
integers_within_1s: Stream[List[int]] = (
integers
.throttle(per_second=2)
.group(interval=timedelta(seconds=0.99))
)
assert list(integers_within_1s) == [[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]]
Mix size/by/interval parameters:
integers_2_by_2_by_parity: Stream[List[int]] = integers.group(by=lambda n: n % 2, size=2)
assert list(integers_2_by_2_by_parity) == [[0, 2], [1, 3], [4, 6], [5, 7], [8], [9]]
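The way size and by combine can be sketched with a dict of per-key buffers. A buffer is yielded as soon as it reaches size, and whatever remains is yielded when the upstream is exhausted, in the order the buffers were created. group_sketch is hypothetical, ignores interval, and is only meant to reproduce the outputs shown above.

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def group_sketch(
    iterable: Iterable[T],
    size: Optional[int] = None,
    by: Optional[Callable[[T], Hashable]] = None,
) -> Iterator[List[T]]:
    """Hypothetical: group by key, flushing a group when it reaches `size`."""
    groups: Dict[Hashable, List[T]] = {}
    for elem in iterable:
        # Without `by`, every element shares the same (None) key.
        key = by(elem) if by is not None else None
        group = groups.setdefault(key, [])
        group.append(elem)
        if size is not None and len(group) >= size:
            # The group is full: yield it and start a new buffer for this key later.
            yield groups.pop(key)
    # Upstream exhausted: yield the incomplete groups, oldest buffer first.
    yield from groups.values()
```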
.flatten
Ungroups elements assuming that they are Iterables.
pair_then_odd_integers: Stream[int] = integers_by_parity.flatten()
assert list(pair_then_odd_integers) == [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]
thread-based concurrency
Flattens up to concurrency iterables concurrently:
mix_of_0s_and_1s: Stream[int] = Stream([[0] * 4, [1] * 4]).flatten(concurrency=2)
assert list(mix_of_0s_and_1s) == [0, 1, 0, 1, 0, 1, 0, 1]
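The alternating output above resembles a round-robin over the inner iterables. The hypothetical round_robin_flatten below reproduces that interleaving sequentially with a deque of iterators. It uses no threads, so it only illustrates the output order, not how .flatten(concurrency=...) schedules its work.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")


def round_robin_flatten(iterables: Iterable[Iterable[T]]) -> Iterator[T]:
    """Hypothetical: take one element from each inner iterable in turn."""
    iterators: Deque[Iterator[T]] = deque(iter(inner) for inner in iterables)
    while iterators:
        iterator = iterators.popleft()
        try:
            elem = next(iterator)
        except StopIteration:
            continue  # this inner iterable is exhausted: drop it
        iterators.append(iterator)  # back of the queue for its next turn
        yield elem
```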
.catch
Catches a given type of exception and optionally yields a replacement value:
inverses: Stream[float] = (
integers
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError, replacement=float("inf"))
)
assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
You can specify an additional when condition for the catch:
import requests
from requests.exceptions import SSLError
status_codes_ignoring_resolution_errors: Stream[int] = (
Stream(["https://github.com", "https://foo.bar", "https://github.com/foo/bar"])
.map(requests.get, concurrency=2)
.catch(SSLError, when=lambda exception: "Max retries exceeded with url" in str(exception))
.map(lambda response: response.status_code)
)
assert list(status_codes_ignoring_resolution_errors) == [200, 404]
It has an optional finally_raise: bool parameter to raise the first caught exception when iteration ends.
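The catch semantics described above (an exception type, an optional when predicate, an optional replacement, and finally_raise) can be sketched as a generator wrapped around next(). catch_sketch and the _NO_REPLACEMENT sentinel are hypothetical names. The sketch assumes the upstream iterator survives an exception, as the builtin map does; a generator that raises is finished and would simply stop.

```python
from typing import Any, Callable, Iterable, Iterator, Optional, Type

_NO_REPLACEMENT: Any = object()  # hypothetical sentinel meaning "skip the element"


def catch_sketch(
    iterable: Iterable[Any],
    kind: Type[Exception],
    when: Callable[[Exception], bool] = lambda error: True,
    replacement: Any = _NO_REPLACEMENT,
    finally_raise: bool = False,
) -> Iterator[Any]:
    """Hypothetical: catch `kind` errors raised while pulling elements."""
    iterator = iter(iterable)
    first_caught: Optional[Exception] = None
    while True:
        try:
            elem = next(iterator)
        except StopIteration:
            break
        except kind as error:
            if not when(error):
                raise  # not an error we were asked to catch
            if first_caught is None:
                first_caught = error
            if replacement is _NO_REPLACEMENT:
                continue  # drop the failing element
            elem = replacement
        yield elem
    if finally_raise and first_caught is not None:
        raise first_caught
```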
.truncate
Ends iteration once a given number of elements have been yielded:
five_first_integers: Stream[int] = integers.truncate(5)
assert list(five_first_integers) == [0, 1, 2, 3, 4]
... or when a condition has become satisfied:
five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)
assert list(five_first_integers) == [0, 1, 2, 3, 4]
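Both truncation modes map onto standard itertools. islice stops after a count, and takewhile stops at the first element satisfying the condition (that element is consumed from the upstream but not yielded). truncate_sketch is a hypothetical helper for illustration; paired with an infinite source such as itertools.count, it is what lets a pipeline terminate.

```python
from itertools import count, islice, takewhile
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def truncate_sketch(
    iterable: Iterable[T],
    limit: Optional[int] = None,
    when: Optional[Callable[[T], bool]] = None,
) -> Iterator[T]:
    """Hypothetical: stop after `limit` elements and/or before the first `when` match."""
    iterator: Iterator[T] = iter(iterable)
    if limit is not None:
        iterator = islice(iterator, limit)
    if when is not None:
        condition = when  # bind a non-Optional name for the lambda below
        iterator = takewhile(lambda elem: not condition(elem), iterator)
    return iterator
```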
.observe
Logs the progress of iterations over this stream. If you iterate over:
observed_slow_integers: Stream[int] = slow_integers.observe("integers")
you will get these logs:
INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded
The number of logs never becomes overwhelming because they are produced logarithmically, e.g. the 11th log is produced when the iteration reaches the 1024th element.
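The schedule visible in the logs above (after 1, 2, 4 and 8 elements, plus a final line at 10) is consistent with logging at every power of two and once more when iteration ends. The hypothetical log_points helper below only computes that schedule under this assumption; it does not reproduce the actual logging code of .observe.

```python
from typing import List


def log_points(total: int) -> List[int]:
    """Hypothetical: element counts at which a log line would be emitted."""
    # n & (n - 1) == 0 holds exactly for powers of two (n >= 1).
    points = [n for n in range(1, total + 1) if n & (n - 1) == 0]
    if points and points[-1] != total:
        points.append(total)  # final log when the iteration ends
    return points
```

Under this schedule, iterating over N elements produces roughly log2(N) + 2 log lines.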
zip
Use the standard zip function:
from streamable import star
cubes: Stream[int] = (
Stream(zip(integers, integers, integers))
.map(star(lambda a, b, c: a * b * c))
)
assert list(cubes) == [0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
📦 Notes Box
Contribute
Please help us! Feel very welcome to:
exhaust the stream
.count iterates over the stream until exhaustion and returns the number of elements yielded.
>>> assert integers.count() == 10
Calling the stream iterates over it until exhaustion and returns it.
>>> verbose_integers: Stream[int] = integers.foreach(print)
>>> assert verbose_integers() is verbose_integers
0
1
2
3
4
5
6
7
8
9
ETL scripts (i.e. scripts fetching -> processing -> pushing data) can benefit from the expressivity of this library.
Here is an example that you can copy-paste and try (it only requires requests): it creates a CSV file containing all 67 quadrupeds from the 1st, 2nd and 3rd generations of Pokémon (kudos to PokéAPI).
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream
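# newline="" is what the csv module expects for files it writes to
with open("./quadruped_pokemons.csv", mode="w", newline="") as file:
    fields = ["id", "name", "is_legendary", "base_happiness", "capture_rate"]
    writer = csv.DictWriter(file, fields, extrasaction='ignore')
    writer.writeheader()
    (
        # Infinite stream of Pokémon ids: 1, 2, 3, ...
        Stream(itertools.count(1))
        # Limits to 16 requests per second
        .throttle(per_second=16)
        .map(lambda poke_id: f"https://pokeapi.co/api/v2/pokemon-species/{poke_id}")
        # GETs the species concurrently using 8 threads
        .map(requests.get, concurrency=8)
        .foreach(requests.Response.raise_for_status)
        .map(requests.Response.json)
        # Stops the iteration when reaching the first Pokémon of the 4th generation
        .truncate(when=lambda poke: poke["generation"]["name"] == "generation-iv")
        .observe("pokemons")
        # Keeps only quadrupeds
        .filter(lambda poke: poke["shape"]["name"] == "quadruped")
        .observe("quadruped pokemons")
        # Skips elements raising this TypeError (e.g. a Pokémon whose "shape" is null)
        .catch(
            TypeError,
            when=lambda error: str(error) == "'NoneType' object is not subscriptable"
        )
        # Batches rows over 5-second intervals before writing them
        .group(interval=timedelta(seconds=5))
        .foreach(writer.writerows)
        .flatten()
        .observe("written pokemons")
        # The first caught exception is raised once the iteration ends
        .catch(finally_raise=True)
        # Iterates until exhaustion
        .count()
    )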
logging level
import logging

# Hides the INFO-level logs emitted by .observe
logging.getLogger("streamable").setLevel(logging.WARNING)
visitor pattern
The Stream class exposes an .accept method, and you can implement a visitor by extending the streamable.visitors.Visitor abstract class:
from streamable.visitors import Visitor

class DepthVisitor(Visitor[int]):
    def visit_stream(self, stream: Stream) -> int:
        if not stream.upstream:
            return 1
        return 1 + stream.upstream.accept(self)

def depth(stream: Stream) -> int:
    return stream.accept(DepthVisitor())

assert depth(Stream(range(10)).map(str).filter()) == 3
as functions
The Stream's methods are also exposed as functions:
from typing import Iterator

from streamable.functions import catch

# 1 / n produces floats, hence Iterator[float]
inverse_integers: Iterator[float] = map(lambda n: 1 / n, range(10))
safe_inverse_integers: Iterator[float] = catch(inverse_integers, ZeroDivisionError)
compatible with free-threaded Python 3.13+
Benefits from free-threaded Python 3.13+ builds, run via python -X gil=0.
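CPython 3.13+ provides sys._is_gil_enabled() to check whether the GIL is actually disabled at runtime. Because it is a private, underscore-prefixed function, the sketch below falls back to assuming a GIL on interpreters that lack it. gil_is_enabled is a hypothetical helper name.

```python
import sys


def gil_is_enabled() -> bool:
    """Hypothetical helper: True unless running on a free-threaded build with the GIL off."""
    check = getattr(sys, "_is_gil_enabled", None)
    # CPython versions older than 3.13 have no such function and always run with a GIL.
    return True if check is None else bool(check())


print(f"GIL enabled: {gil_is_enabled()}")
```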
ty highlighters 🙏