
checkpipe brings functional programming data pipelines with robust validation to Python and mypy.
One problem is expressing Python functions as dataflows. Think of a function that progresses in stages like the following:
source_input -> filter -> transform -> filter -> sum
Dataflows are more naturally represented with infix notation, where each stage leads to the next through chaining. But in Python we would find ourselves writing
sum(filter(transform(filter(source_input))))
which is not very handy: it reads inside-out, the reverse of the order in which the stages run. Another approach would be creating placeholder variables to store each stage, but this introduces unnecessary state. If this state is mutable, it goes against the principle of purity in functional programming, and the function no longer clearly denotes a mathematical function.
In data analysis and ETL contexts, we may have to build large dataflows so a better approach is necessary.
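To make the trade-off concrete, here is a small plain-Python sketch of the two approaches described above. The stage functions (is_even, square, is_large) and the input values are made up for illustration; they are not part of checkpipe.

```python
# Plain-Python version of: source_input -> filter -> transform -> filter -> sum
source_input = [1, 2, 3, 4, 5, 6]


def is_even(n: int) -> bool:
    return n % 2 == 0


def square(n: int) -> int:
    return n * n


def is_large(n: int) -> bool:
    return n > 10


# Nested calls: the first stage to run is the innermost one,
# so the expression reads in the reverse of execution order.
nested_total = sum(filter(is_large, map(square, filter(is_even, source_input))))

# Placeholder variables: reads top-down, but adds three extra names
# that exist only to carry a value to the next line.
evens = filter(is_even, source_input)
squares = map(square, evens)
large_squares = filter(is_large, squares)
staged_total = sum(large_squares)

print(nested_total, staged_total)
```

Both forms compute the same value; the difference is only in how easy the stages are to read and how much intermediate state they expose.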
A major inspiration for this project, and a project which solves the above problem, is pipe by Julien Palard. It allows for infix notation and provides a simple @Pipe decorator to extend this to any functionality the user needs.
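Infix chaining of this kind can be built on Python's reflected-operator protocol. The following is a rough sketch of the idea only, not the code of pipe or checkpipe: the Stage class and the helper functions keep_odd, double and total are hypothetical names invented for this example.

```python
from typing import Callable, Generic, Iterable, Iterator, TypeVar

A = TypeVar("A")
B = TypeVar("B")


class Stage(Generic[A, B]):
    """Hypothetical wrapper that lets a one-argument function sit to the right of `|`."""

    def __init__(self, func: Callable[[A], B]) -> None:
        self.func = func

    def __ror__(self, source: A) -> B:
        # For `source | stage`, Python falls back to stage.__ror__(source)
        # when the type of `source` does not handle `|` for this operand.
        return self.func(source)


def keep_odd() -> Stage[Iterable[int], Iterator[int]]:
    return Stage(lambda xs: (x for x in xs if x % 2 == 1))


def double() -> Stage[Iterable[int], Iterator[int]]:
    return Stage(lambda xs: (x * 2 for x in xs))


def total() -> Stage[Iterable[int], int]:
    return Stage(sum)


# Stages now read left to right, in the order they run.
result = [1, 2, 3, 4, 5] | keep_odd() | double() | total()
print(result)
```

Because each stage returns a lazy generator, nothing is computed until the final stage consumes the chain.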
This project aims to build on Julien Palard's project, but with new design considerations:
The project aims to make it easier to write pure Python functions with robust error-checking and all the benefits of static analysis tools like mypy.
pip install checkpipe
import checkpipe as pipe
print(
[1, 2, 3]
| pipe.OfIter[int]
.map(lambda n: n * 2)
| pipe.OfIter[int]
.filter(lambda n: n != 4)
| pipe.OfIter[int]
.to_list()
)
[2, 6]
The above example takes a source input [1, 2, 3] and transforms it by multiplying each value by 2, then keeps only the results that aren't 4, and finally consumes this lazy iterator chain into a list result.
When using checkpipe, we rely on specifying the type of the source in order for our lambdas to be typed. [1, 2, 3] is a List[int] and can also be iterated through as an Iterable[int]. Working with this type of source, we use pipe.OfIter[int]. This makes use of generics to give us expectations on the signature of the higher-order functions passed to functions like .map and .filter. These expectations can be checked automatically by mypy, and VS Code is able to know that n is an integer in the lambdas.
Let's say we want to sum over our source input [1, 2, 3] and fold it into a single int. Here's an example to implement that:
import checkpipe as pipe
print(
[1, 2, 3]
| pipe.OfIter[int]
.fold(0, lambda acc, n: acc + n)
)
6
Maybe we want to stop iterating before we finish consuming the list. We can use pipe.stop_iter as in the following:
import checkpipe as pipe
print(
[1, 2, 3, 4, 5]
| pipe.OfIter[int]
.fold(0, lambda acc, n:
acc + n if n <= 3 else pipe.stop_iter(acc)
)
)
6
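For readers who want to see what the early-stopping fold above computes, here is a library-free sketch. Stop and fold_until are hypothetical names for this illustration and say nothing about how pipe.stop_iter is implemented; the sketch only mirrors the behaviour shown in the example.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, List, TypeVar, Union

A = TypeVar("A")
T = TypeVar("T")


@dataclass(frozen=True)
class Stop(Generic[A]):
    """Hypothetical marker: end the fold now and return `value`."""
    value: A


def fold_until(
    source: Iterable[T],
    initial: A,
    step: Callable[[A, T], Union[A, Stop[A]]],
) -> A:
    acc = initial
    for item in source:
        outcome = step(acc, item)
        if isinstance(outcome, Stop):
            # Early exit: the rest of `source` is never read.
            return outcome.value
        acc = outcome
    return acc


consumed: List[int] = []


def tracked() -> Iterator[int]:
    # Records which items were actually pulled from the source.
    for n in [1, 2, 3, 4, 5]:
        consumed.append(n)
        yield n


full_sum = fold_until([1, 2, 3], 0, lambda acc, n: acc + n)
early_sum = fold_until(
    tracked(), 0, lambda acc, n: acc + n if n <= 3 else Stop(acc)
)
print(full_sum, early_sum, consumed)
```

In this sketch the element 4 triggers the stop, so 5 is never pulled from the source.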
import checkpipe as pipe
print(
3
| pipe.Of[int]
.map(lambda n: n+1)
)
4
checkpipe does not only work with iterators. It also works directly on a single value of a given type and allows transformations of the source object itself. In this case, no consumption of an iterator is necessary: .map(...) returns the transformed source directly.
import checkpipe as pipe
from result import Result
print(
[1, 2, 3]
| pipe.OfIter[int]
.map(lambda n: n * 2)
| pipe.OfIter[int]
.check(lambda n: n != 4)
| pipe.OfIter[Result[int, int]]
.to_list()
)
[Ok(2), Err(4), Ok(6)]
Here, we use .OfIter[int].check to apply a tag to every value in the source: Ok[int] when it passes the check n != 4, otherwise Err[int]. This allows us to propagate errors and handle them in the pipeline itself. Note that when we consume the iterator pipeline with .to_list(), we refer to a new source type, Iterator[Result[int, int]], to reflect the Ok/Err tagging.
We can now proceed to perform more computations on the Ok[int] results only:
import checkpipe as pipe
from result import Result
print(
[1, 2, 3]
| pipe.OfIter[int]
.map(lambda n: n * 2)
| pipe.OfIter[int]
.check(lambda n: n != 4)
| pipe.OfResultIter[int, int]
.map_ok(lambda n: n + 1)
| pipe.OfIter[Result[int, int]]
.to_list()
)
[Ok(3), Err(4), Ok(7)]
Here, .OfResultIter[int, int] works with an iterable of Results as a source, and performs the computation n + 1 only when it detects an Ok. So we can see that Ok(2) became Ok(3) and Ok(6) became Ok(7), but Err(4) remains untouched.
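The tagging and Ok-only mapping described above can be sketched in plain Python. This is an illustration under stated assumptions: the Ok and Err dataclasses are simplified stand-ins for the result package's types, and check and map_ok are hypothetical free functions, not checkpipe's implementation.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, TypeVar, Union

T = TypeVar("T")
U = TypeVar("U")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E


def check(
    source: Iterable[T], predicate: Callable[[T], bool]
) -> Iterator[Union[Ok[T], Err[T]]]:
    # Tag each value: Ok if it passes the predicate, Err otherwise.
    for item in source:
        if predicate(item):
            yield Ok(item)
        else:
            yield Err(item)


def map_ok(
    source: Iterable[Union[Ok[T], Err[E]]], func: Callable[[T], U]
) -> Iterator[Union[Ok[U], Err[E]]]:
    # Transform Ok values; pass Err values through unchanged.
    for tagged in source:
        if isinstance(tagged, Ok):
            yield Ok(func(tagged.value))
        else:
            yield tagged


doubled = (n * 2 for n in [1, 2, 3])
tagged = list(check(doubled, lambda n: n != 4))
incremented = list(map_ok(tagged, lambda n: n + 1))
print(tagged)
print(incremented)
```

The key design point is that an Err travels through later stages untouched, so a failure recorded early is still visible when the pipeline is consumed.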
We can also use a different type for the error:
import checkpipe as pipe
from result import Result
print(
[1, 2, 3, 4]
| pipe.OfIter[int]
.map(lambda n: n + 2)
| pipe.OfResultIter[int, str]
.check(
lambda n: n % 2 != 0,
lambda n: f'Evens like {n} are not allowed!')
| pipe.OfIter[Result[int, str]]
.to_list()
)
[Ok(3), Err('Evens like 4 are not allowed!'), Ok(5), Err('Evens like 6 are not allowed!')]
Here OfResultIter[int, str] specifies that errors are of type str and Ok values are of type int. Its .check takes two functions: a predicate that checks whether the int is okay, and a function that maps that int to an error message. We can then continue processing just the Ok[int] results with .map_ok(...), just like before:
import checkpipe as pipe
from result import Result
print(
[1, 2, 3, 4]
| pipe.OfIter[int]
.map(lambda n: n + 2)
| pipe.OfResultIter[int, str]
.check(
lambda n: n % 2 != 0,
lambda n: f'Evens like {n} are not allowed!')
| pipe.OfResultIter[int, str]
.map_ok(lambda n: n * 10)
| pipe.OfIter[Result[int, str]]
.to_list()
)
[Ok(30), Err('Evens like 4 are not allowed!'), Ok(50), Err('Evens like 6 are not allowed!')]
We can also chain multiple checks in a row, keeping in mind that checks on Result[T, E] use the then_check variants while checks on T use check.
import checkpipe as pipe
from result import Result
print(
[1, 2, 3, 4]
| pipe.OfIter[int]
.map(lambda n: n + 2)
| pipe.OfResultIter[int, str]
.check(
lambda n: n % 2 != 0,
lambda n: f'Evens like {n} are not allowed!')
| pipe.OfResultIter[int, str]
.then_check(
lambda n: n != 3,
lambda _: 'The number 3 is specifically not welcome!')
| pipe.OfResultIter[int, str]
.map_ok(lambda n: n * 10)
| pipe.OfIter[Result[int, str]]
.to_list()
)
[Err('The number 3 is specifically not welcome!'), Err('Evens like 4 are not allowed!'), Ok(50), Err('Evens like 6 are not allowed!')]
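The difference between a first check on plain values and a follow-up check on already-tagged values can be sketched without the library. As before, Ok and Err are simplified stand-ins for the result package's types, and check and then_check are hypothetical free functions written only to mirror the behaviour of the example above.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E


def check(
    source: Iterable[T],
    predicate: Callable[[T], bool],
    to_error: Callable[[T], E],
) -> Iterator[Union[Ok[T], Err[E]]]:
    # First check: the source holds plain values, so every item gets a tag.
    for item in source:
        yield Ok(item) if predicate(item) else Err(to_error(item))


def then_check(
    source: Iterable[Union[Ok[T], Err[E]]],
    predicate: Callable[[T], bool],
    to_error: Callable[[T], E],
) -> Iterator[Union[Ok[T], Err[E]]]:
    # Follow-up check: only Ok values are re-examined; an earlier Err wins.
    for tagged in source:
        if isinstance(tagged, Ok) and not predicate(tagged.value):
            yield Err(to_error(tagged.value))
        else:
            yield tagged


values = [n + 2 for n in [1, 2, 3, 4]]
first_pass = check(values, lambda n: n % 2 != 0, lambda n: f"even: {n}")
second_pass = list(
    then_check(first_pass, lambda n: n != 3, lambda _: "three is rejected")
)
print(second_pass)
```

Each value ends up carrying at most one error, the one from the first check it failed.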
Sometimes doing a check requires finding a problematic aspect of the source object. For this, we use the check_using functions, which take a finder callback. If the finder returns None, it found nothing problematic and the source is tagged Ok. If it does find something problematic, the problematic object it returns is used to create an Err object.
import checkpipe as pipe
from result import Result
from typing import Optional

def find_capitalized_word(s: str) -> Optional[str]:
    # Return the first all-uppercase word, or None if there is none.
    words = s.split(' ')
    for word in words:
        if str.isupper(word):
            return word
    return None
print(
[
'this string contains no CAPITALIZED words!',
'this one is all good!'
]
| pipe.OfResultIter[str, str]
.check_using(
find_capitalized_word,
lambda cap_word: f'Bad! You used a capitalized word: {cap_word}')
| pipe.OfIter[Result[str, str]]
.to_list()
)
[Err('Bad! You used a capitalized word: CAPITALIZED'), Ok('this one is all good!')]
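The finder-based check can also be sketched in plain Python. The names here are assumptions made for illustration: Ok and Err are simplified stand-ins for the result package's types, check_using is a hypothetical free function, and first_negative is an invented finder.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, List, Optional, TypeVar, Union

T = TypeVar("T")
P = TypeVar("P")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E


def check_using(
    source: Iterable[T],
    finder: Callable[[T], Optional[P]],
    to_error: Callable[[P], E],
) -> Iterator[Union[Ok[T], Err[E]]]:
    for item in source:
        problem = finder(item)
        if problem is None:
            # Nothing problematic found: keep the whole item as Ok.
            yield Ok(item)
        else:
            # Build the error from the problematic part, not the whole item.
            yield Err(to_error(problem))


def first_negative(numbers: List[int]) -> Optional[int]:
    for n in numbers:
        if n < 0:
            return n
    return None


rows = [[1, 2, 3], [4, -5, 6]]
tagged_rows = list(
    check_using(rows, first_negative, lambda n: f"negative value: {n}")
)
print(tagged_rows)
```

Comparing against None explicitly (rather than testing truthiness) matters here, since a finder may legitimately return a falsy problem such as 0 or an empty string.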
Often an error might occur during mapping, so when we consume the iterator we end up with a type like List[Result[T, E]]. We can flatten the results by short-circuiting on the first error, turning it into a Result[List[T], E]. Let's say we're interested in checking that a tuple (n, m, sub_eq) satisfies the property n - m = sub_eq:
from result import Result
import checkpipe as pipe
from typing import Tuple
print(
[(4, 1, 3), (3, 2, 1), (10, 5, 5), (1, 3, 0)]
| pipe.OfIter[Tuple[int, int, int]]
.check(pipe.tup3_unpack(lambda n, m, sub_eq:
n - m == sub_eq
))
| pipe.OfIter[Result[Tuple[int, int, int], Tuple[int, int, int]]]
.to_list()
)
[Ok((4, 1, 3)), Ok((3, 2, 1)), Ok((10, 5, 5)), Err((1, 3, 0))]
We are able to compute what elements of our iterable satisfy the property, but what if we expect that they all must satisfy it? Then we can flatten the error:
import checkpipe as pipe
from typing import Tuple
print(
[(4, 1, 3), (3, 2, 1), (10, 5, 5), (1, 3, 0)]
| pipe.OfIter[Tuple[int, int, int]]
.check(pipe.tup3_unpack(lambda n, m, sub_eq:
n - m == sub_eq
))
| pipe.OfResultIter[Tuple[int, int, int], Tuple[int, int, int]]
.flatten()
)
Err((1, 3, 0))
As expected, we get the error variant, communicating that the entire list did not satisfy the property for all of its elements. Here's what we get if they do satisfy:
import checkpipe as pipe
from typing import Tuple
print(
[(4, 1, 3), (3, 2, 1), (10, 5, 5), (3, 1, 2)]
| pipe.OfIter[Tuple[int, int, int]]
.check(pipe.tup3_unpack(lambda n, m, sub_eq:
n - m == sub_eq
))
| pipe.OfResultIter[Tuple[int, int, int], Tuple[int, int, int]]
.flatten()
)
Ok([(4, 1, 3), (3, 2, 1), (10, 5, 5), (3, 1, 2)])
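The short-circuiting flatten can be sketched as a small loop. This is an illustration, not checkpipe's code: Ok and Err are simplified stand-ins for the result package's types and flatten is a hypothetical free function.

```python
from dataclasses import dataclass
from typing import Generic, Iterable, List, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E


def flatten(source: Iterable[Union[Ok[T], Err[E]]]) -> Union[Ok[List[T]], Err[E]]:
    values: List[T] = []
    for tagged in source:
        if isinstance(tagged, Err):
            # Short-circuit: return the first error and stop reading the source.
            return tagged
        values.append(tagged.value)
    return Ok(values)


all_good = flatten([Ok(1), Ok(2), Ok(3)])

results = iter([Ok(1), Err("bad"), Ok(3), Err("worse")])
first_failure = flatten(results)
unread = list(results)  # items the flatten never looked at

print(all_good, first_failure, unread)
```

Because the loop returns on the first Err, later items (including later errors) are never inspected.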
Similarly to flattening Results, sometimes we map an element in an iterator to None, and we want the final consumed result to be either a list with no Nones in it or None as a whole. Here is an example where one element maps to None:
import checkpipe as pipe
from typing import Tuple
print(
[(4, 1, 3), (3, 2, 1), (10, 5, 5), (1, 3, 2)]
| pipe.OfIter[Tuple[int, int, int]]
.map(pipe.tup3_unpack(lambda n, m, sub_eq:
(n, m, sub_eq) if n - m == sub_eq else None
))
| pipe.OfOptionalIter[Tuple[int, int, int]]
.flatten()
)
None
And the valid example:
import checkpipe as pipe
from typing import Tuple
print(
[(4, 1, 3), (3, 2, 1), (10, 5, 5), (3, 1, 2)]
| pipe.OfIter[Tuple[int, int, int]]
.map(pipe.tup3_unpack(lambda n, m, sub_eq:
(n, m, sub_eq) if n - m == sub_eq else None
))
| pipe.OfOptionalIter[Tuple[int, int, int]]
.flatten()
)
[(4, 1, 3), (3, 2, 1), (10, 5, 5), (3, 1, 2)]
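The all-or-nothing behaviour for optional values can be sketched the same way. flatten_optional is a hypothetical name for this illustration and is not checkpipe's implementation.

```python
from typing import Iterable, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def flatten_optional(source: Iterable[Optional[T]]) -> Optional[List[T]]:
    values: List[T] = []
    for item in source:
        if item is None:
            # A single None makes the whole result None.
            return None
        values.append(item)
    return values


def keep_if_consistent(t: Tuple[int, int, int]) -> Optional[Tuple[int, int, int]]:
    n, m, sub_eq = t
    return t if n - m == sub_eq else None


with_bad_row = flatten_optional(
    map(keep_if_consistent, [(4, 1, 3), (1, 3, 2)])
)
all_consistent = flatten_optional(
    map(keep_if_consistent, [(4, 1, 3), (3, 1, 2)])
)
print(with_bad_row, all_consistent)
```

The return type Optional[List[T]] tells a type checker that a successful result contains no None elements.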
checkpipe comes with support for unpacking tuples of limited size while specifying the types of each element:
import checkpipe as pipe
from typing import Tuple
print(
(4, 2, 'Hello ')
| pipe.Of[Tuple[int, int, str]]
.map(pipe.tup3_unpack(lambda num_underscores, repeat, text:
'"' + ('_' * num_underscores) + (repeat * text) + '"'
))
)
"____Hello Hello "
You can also use the pipe.tupN_unpack functions within a pipe.OfIter[T].map, for instance:
import checkpipe as pipe
from typing import Tuple
print(
[(4, 1, 3), (3, 2, 1), (10, 5, 5), (1, 3, 0)]
| pipe.OfIter[Tuple[int, int, int]]
.map(pipe.tup3_unpack(lambda n, m, sub_eq:
n - m == sub_eq
))
| pipe.OfIter[bool]
.to_list()
)
[True, True, True, False]
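A typed tuple-unpacking helper of this kind could look roughly like the following. The name unpack3 is hypothetical and the code is a sketch of the idea, not the source of pipe.tup3_unpack.

```python
from typing import Callable, Tuple, TypeVar

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")
R = TypeVar("R")


def unpack3(func: Callable[[A, B, C], R]) -> Callable[[Tuple[A, B, C]], R]:
    """Adapt a three-argument function so it accepts a single 3-tuple."""

    def unpacked(args: Tuple[A, B, C]) -> R:
        a, b, c = args
        return func(a, b, c)

    return unpacked


rows = [(4, 1, 3), (3, 2, 1), (10, 5, 5), (1, 3, 0)]
matches = list(map(unpack3(lambda n, m, sub_eq: n - m == sub_eq), rows))
print(matches)
```

Giving each tuple position its own type variable is what lets a type checker know the type of every lambda parameter, which a plain `lambda t: f(*t)` would not express.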
We often want to tag an index alongside our data as we iterate. Here is an example:
import checkpipe as pipe
from typing import Tuple
print(
['a', 'b', 'c']
| pipe.OfIter[str]
.enumerate()
| pipe.OfIter[Tuple[int, str]]
.map(pipe.tup2_unpack(lambda i, c:
'X' if i == 1 else c
))
| pipe.OfIter[str]
.to_list()
)
['a', 'X', 'c']
In some cases, our source already has tuple elements and we want to enumerate it, and so our tuples get nested. We can flatten our tuple types.
import checkpipe as pipe
from typing import Tuple
print(
[('a', 'aa', 'aaa'), ('b', 'bb', 'bbb'), ('c', 'cc', 'ccc')]
| pipe.OfIter[Tuple[str, str, str]]
.enumerate()
| pipe.OfIter[Tuple[int, Tuple[str, str, str]]]
.map(pipe.tup2_right_tup3_flatten)
| pipe.OfIter[Tuple[int, str, str, str]]
.map(pipe.tup4_unpack(lambda i, c, cc, ccc:
('d', 'dd', 'ddd') if i == 1 else (c, cc, ccc)
))
| pipe.OfIter[Tuple[str, str, str]]
.to_list()
)
[('a', 'aa', 'aaa'), ('d', 'dd', 'ddd'), ('c', 'cc', 'ccc')]
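The tuple flattening step used above can be pictured as a small typed function. flatten_right3 is a hypothetical name used for this sketch; it only mirrors what the example shows pipe.tup2_right_tup3_flatten doing to the output of enumerate.

```python
from typing import Tuple, TypeVar

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")
D = TypeVar("D")


def flatten_right3(pair: Tuple[A, Tuple[B, C, D]]) -> Tuple[A, B, C, D]:
    """Turn (a, (b, c, d)) into (a, b, c, d)."""
    left, (x, y, z) = pair
    return (left, x, y, z)


source = [("a", "aa", "aaa"), ("b", "bb", "bbb")]
flat = [flatten_right3(pair) for pair in enumerate(source)]
print(flat)
```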
import checkpipe as pipe
from checkpipe import Pipe
from typing import Callable, Iterable
@Pipe
def multiply_by_num(num: int) -> Callable[[Iterable[int]], Iterable[int]]:
    def inner(source: Iterable[int]) -> Iterable[int]:
        return map(lambda n: n * num, source)
    return inner
print(
[1, 2, 3]
| multiply_by_num(3)
| pipe.OfIter[int]
.to_list()
)
[3, 6, 9]
Here we create a new function, multiply_by_num, that can be used with the pipe operator |. It defines an inner function which takes a source, Iterable[int], and maps it to another Iterable[int] via the builtin map function.
If we want to utilize generics to create a more type-general pipe function, we could use TypeVars to infer types from the arguments passed into the function. If we want to inform the function about a more generic source type, we can wrap it in a class and then inform it of the expected source type through the class, like this:
import checkpipe as pipe
from checkpipe import Pipe
from typing import Generic, TypeVar, Callable, Iterable
T = TypeVar('T')
class Repeat(Generic[T]):
    @Pipe
    @staticmethod
    def repeat(n: int) -> Callable[[Iterable[T]], Iterable[T]]:
        def inner(source: Iterable[T]) -> Iterable[T]:
            for item in source:
                for _ in range(n):
                    yield item
        return inner
print(
['a', 'b', 'c']
| Repeat[str]
.repeat(3)
| pipe.OfIter[str]
.to_list()
)
['a', 'a', 'a', 'b', 'b', 'b', 'c', 'c', 'c']
The pipes are type-safe and can be checked by mypy. checkpipe cannot automatically infer the source type from the left of the |. By specifying Repeat[str], mypy knows that when the source ['a', 'b', 'c'] is piped to Repeat, it must be an Iterable[str], or mypy will report an error.
If this project brings value to you, please consider supporting me with a monthly sponsorship or buying me a coffee.
All contributions are welcome! I would appreciate feedback on improving the library and optimizing for use cases I haven't thought of yet! Please feel free to contact me by opening an issue ticket or emailing lanhikarixx@gmail.com if you want to chat.
This theme is licensed under the MIT license © Mohammed Alzakariya.