threadx - Create elegant data transformation pipelines.
It lets you thread a value through a sequence of operations in a clear, linear style. It revolves around two key elements:
- thread: Passes the result of each step as the input to the next.
- x: A smart placeholder that knows exactly where to inject the previous result, whether in a method call, item lookup, or even unpacking.
Here’s what it looks like in action:
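import json
from threadx import thread, x

# read_file is not defined in this snippet; it is assumed to be a helper
# that takes a path and returns the file's contents as a string.
thread('./data.log',
read_file,
x.splitlines,
(map, x.strip, x),
(map, json.loads, x),
(map, x['time'], x),
sum)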
What’s happening here? The file content is read, split into lines, stripped, parsed as JSON, and the execution times are summed, all in a linear and readable way. No intermediate variables, no nesting, just the data flowing from one step to the next.
The data.log file (generated by inspector) contains entries like this:
{"time": 12000, "fn": "foo", ...}
{"time": 12345, "fn": "bar", ...}
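For comparison, here is a rough sketch of the same computation in plain Python, without threadx. The sample text is hypothetical: it keeps only the time and fn fields from the entries above and reads from an in-memory string rather than ./data.log. The name total_time is also just an illustration.

```python
import json

# Hypothetical sample data: two entries reduced to the "time" and "fn" fields.
SAMPLE_LOG = '''
{"time": 12000, "fn": "foo"}
{"time": 12345, "fn": "bar"}
'''.strip()


def total_time(text):
    """Sum the "time" field over newline-delimited JSON entries."""
    lines = text.splitlines()
    stripped = map(str.strip, lines)
    entries = map(json.loads, stripped)
    times = map(lambda entry: entry['time'], entries)
    return sum(times)


print(total_time(SAMPLE_LOG))  # 24345
```

Every step here needs its own intermediate name, which is the noise the threaded version avoids.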
What Makes threadx Interesting?
- Readable Flow: Instead of diving into layers of nested calls, you write each transformation as a clear, sequential step.
- The x Factor: x acts as a placeholder for where the output of the previous step goes. It’s surprisingly flexible, supporting method calls, attribute/item lookups, and more.
- No Extra Variables: Avoid the noise of intermediate variables or lambda functions. Your transformations stay clean and minimal.
Install
pip install threadx
Usage
Import
from threadx import thread, x, stop
Pass result as first argument
thread automatically passes the result of the previous step as the first argument to the next function:
thread([1, 2, 3],
sum,
str)
Or, be explicit about it:
thread([1, 2, 3],
(sum, x),
(str, x))
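To make the idea concrete, here is a minimal sketch of thread-first behaviour for bare callables, written with functools.reduce. The name thread_first is hypothetical and this is not how threadx is implemented; it ignores tuple steps and the x placeholder entirely.

```python
from functools import reduce


def thread_first(value, *steps):
    """Feed value through steps, passing each result to the next callable.

    Hypothetical helper for illustration only: it handles bare callables,
    not tuples or the x placeholder.
    """
    return reduce(lambda acc, step: step(acc), steps, value)


result = thread_first([1, 2, 3], sum, str)
print(result)  # prints 6 (the string '6')
```

With no steps at all, the sketch simply returns the starting value unchanged.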
Pass x as nth argument
Want to pass the result into a different argument position? No problem:
thread(10,
(range, x, 20, 3),
list)
thread(20,
(range, 10, x, 3),
list)
thread(3,
(range, 10, 20, x),
list)
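Assuming x is simply replaced by the threaded value, as described above, the three pipelines correspond to the plain-Python functions below. The function names are illustrative and nothing here uses threadx.

```python
def as_start(value):
    """Threaded value fills the first (start) argument of range."""
    return list(range(value, 20, 3))


def as_stop(value):
    """Threaded value fills the second (stop) argument of range."""
    return list(range(10, value, 3))


def as_step(value):
    """Threaded value fills the third (step) argument of range."""
    return list(range(10, 20, value))


print(as_start(10))  # [10, 13, 16, 19]
print(as_stop(20))   # [10, 13, 16, 19]
print(as_step(3))    # [10, 13, 16, 19]
```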
Unpacking arguments
Unpacking works as usual:
thread([10, 20],
(range, *x, 3),
list)
Method call
Use x.method_name for method calls, just like magic.
thread(['a', 'b'],
(x.index, 'a'))
thread(['a', 'b'],
(x.count, 'b'))
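For readers who know the standard library, operator.methodcaller builds a comparable "call this method later" object. The sketch below only shows what the two pipelines above evaluate to; it does not use threadx, and the variable names are illustrative.

```python
from operator import methodcaller

letters = ['a', 'b']

index_of_a = methodcaller('index', 'a')  # later calls obj.index('a')
count_of_b = methodcaller('count', 'b')  # later calls obj.count('b')

print(index_of_a(letters))  # 0
print(count_of_b(letters))  # 1
```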
Attribute lookup
Use x.attribute_name to look up class and instance attributes.
thread({'a': 1, 'b': 2},
x.keys,
list)
Getting Item And Slicing
data = {'a': {'b': [1, 2, 3, 4]}}
thread(data,
x['a'],
x['b'][0])
thread(data,
x['a']['b'][:2])
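Assuming each x[...] step is applied to the threaded value, the two pipelines above reduce to ordinary indexing and slicing. A plain-Python check of the expected values, without threadx:

```python
data = {'a': {'b': [1, 2, 3, 4]}}

# Two chained lookups, then the first element of the inner list.
first_item = data['a']['b'][0]

# The same lookups followed by a slice of the first two elements.
first_two = data['a']['b'][:2]

print(first_item)  # 1
print(first_two)   # [1, 2]
```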
Debugging
Easily inspect intermediate results using stop. Useful for debugging.
thread(data,
x['a'],
x['b'],
stop,
sum,
str)
Fewer lambdas
Replace verbose lambdas with x in simple cases. Each pair below shows the lambda version first, followed by the x version.
data = [[1, 2, 3, 4], [10, 20, 30, 40]]
thread(data,
(map, lambda i: i[0], x),
list)
thread(data,
(map, x[0], x),
list)
thread(range(12),
(filter, lambda i: i % 2 == 0, x),
list)
thread(range(12),
(filter, x % 2 == 0, x),
list)
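One way an expression such as x[0] or x % 2 == 0 can stand in for a lambda is operator overloading: each operator returns a new callable object instead of a value. The Placeholder class and the name P below are hypothetical and are not threadx's implementation; the sketch covers only the "build a callable" half and does not model x as the slot for the threaded value.

```python
class Placeholder:
    """Tiny illustrative placeholder; NOT threadx's actual implementation.

    Each operator returns a new Placeholder wrapping a function, so an
    expression such as P[0] or P % 2 == 0 becomes a one-argument callable.
    """

    def __init__(self, fn=lambda value: value):
        self._fn = fn

    def __call__(self, value):
        return self._fn(value)

    def __getitem__(self, key):
        return Placeholder(lambda value: self._fn(value)[key])

    def __mod__(self, other):
        return Placeholder(lambda value: self._fn(value) % other)

    def __eq__(self, other):
        return Placeholder(lambda value: self._fn(value) == other)

    # Defining __eq__ removes the default __hash__; restore identity hashing.
    __hash__ = object.__hash__


P = Placeholder()

data = [[1, 2, 3, 4], [10, 20, 30, 40]]
print(list(map(P[0], data)))                # [1, 10]
print(list(filter(P % 2 == 0, range(12))))  # [0, 2, 4, 6, 8, 10]
```

The trade-off of this design is that == no longer returns a bool for such objects, which is why the sketch keeps the placeholder separate from ordinary values.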
Build data transformation pipeline
pipeline = (read_file,
x.splitlines,
(map, x.strip, x),
(map, json.loads, x),
(map, x['time'], x),
sum)
thread('./data.log', *pipeline)
Why I Built This
After spending a few years working with Clojure, I found myself missing its threading macros when I returned to Python (for a side project). Sure, Python has some tools for chaining operations, but nothing quite as elegant or powerful as what I was used to.