
threaded_map_reduce
A Python library that implements map, unordered map, and map-reduce using threads.
This library is intended to be performant for CPU-bound tasks. The map implementation has been tested to be much more performant than the map method of the concurrent.futures.ThreadPoolExecutor class of the standard library.
It provides three functions: map, map_unordered, and map_reduce.
pip install threaded-map-reduce
(Or, if you use uv:)
uv pip install threaded-map-reduce
# Aliased on import so the built-in map is not shadowed.
from threaded_map_reduce import map as threaded_map

def square(x):
    return x * x

nums = range(1000)
result = list(threaded_map(square, nums, num_computing_threads=4, chunk_size=100))
print(result[-10:])
Faster, but order is not preserved:
from threaded_map_reduce import map_unordered
nums = range(1000)
result = list(map_unordered(square, nums, num_computing_threads=4, chunk_size=100))
print(sorted(result))
print(result[-10:])
Useful for reductions such as sums, counts, or any associative operation.
from operator import add
from threaded_map_reduce import map_reduce

def square(x):
    return x * x

nums = range(0, 1000)
result = map_reduce(square, add, nums,
                    num_computing_threads=4,
                    chunk_size=100)
print(result)
threaded_map(map_fn, items: Iterable, num_computing_threads: int, chunk_size: int = 100)
Runs map_fn over every item in parallel and yields results keeping input order.
map_unordered(map_fn, items: Iterable, num_computing_threads: int, chunk_size: int = 100)
Same as above, but yields results in any order.
map_reduce(map_fn, reduce_fn, iterable: Iterable, num_computing_threads: int, chunk_size: int = 100)
Maps items in parallel, reduces mapped chunks using the provided reducer function, and returns a single result.
The map_reduce performance was tested with a benchmark that used the same is_prime implementation for all tests (see the code at the end of this section).
To study the performance of the map function implemented in this library we ran a similar benchmark, again with the same is_prime implementation for all tests.
The performance of this library's map implementation improves with the number of threads. For this task there is no appreciable difference between the ordered and unordered maps.
Compared with the ideal scaling, for two and four threads our implementation is 1.19 and 1.54 times slower respectively. In an ideal scenario, the runtime with N threads to perform the task should be the time with just one thread (using the standard non-threaded map implementation) divided by N.
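The relation between ideal scaling and the reported slowdown factors can be made concrete with a little arithmetic. The sketch below assumes a hypothetical single-thread runtime of 10 seconds (an illustrative value, not a measurement from the benchmark) and applies the 1.19 and 1.54 factors quoted above. The helper names are made up for this example.

```python
def ideal_runtime(single_thread_time: float, num_threads: int) -> float:
    """Runtime under perfect scaling: T(1) / N."""
    return single_thread_time / num_threads


def slowdown_vs_ideal(measured_time: float, single_thread_time: float, num_threads: int) -> float:
    """How many times slower a measured runtime is than the ideal runtime."""
    return measured_time / ideal_runtime(single_thread_time, num_threads)


# Hypothetical baseline: 10 s with the standard non-threaded map (assumption).
baseline = 10.0
# Slowdown factors relative to ideal scaling, as quoted in the text.
reported = {2: 1.19, 4: 1.54}

for n, factor in reported.items():
    ideal = ideal_runtime(baseline, n)
    implied = ideal * factor  # runtime implied by the reported factor
    print(f"{n} threads: ideal {ideal:.2f} s, implied {implied:.2f} s, "
          f"speedup over one thread {baseline / implied:.2f}x")
```

Read this way, a factor of 1.54 at four threads still corresponds to a speedup of roughly 4 / 1.54, or about 2.6 times, over the single-threaded run.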
The chunk size is a critical parameter for performance. Both map and map_reduce process items in chunks, and the parallelization overhead depends strongly on how many items are grouped into each chunk.
Using very small chunks (e.g. one item per chunk) usually produces poor performance, because the cost of thread scheduling and queue operations dominates the useful work. Using very large chunks eventually stops giving additional speedups, and may increase memory usage.
We used the same benchmark as in the previous map section. A chunk size of 1 is 6.7× slower than a chunk size of 500, and increasing the chunk size beyond 500 does not reduce the runtime further. The optimal chunk size depends on the specific workload.
Note that each chunk is materialized as a list before being processed, so larger chunk sizes increase memory usage.
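To illustrate how the chunk size changes the amount of coordination work, here is a minimal sketch of chunking with itertools.islice. The iter_chunks helper is hypothetical and is not part of the library's API; it only shows how many chunks (and therefore how many hand-offs to threads) a given chunk size produces, and that each chunk is held in memory as a list.

```python
import itertools
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_chunks(items: Iterable[T], chunk_size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most chunk_size items (hypothetical helper)."""
    it = iter(items)
    while True:
        # Each chunk is materialized as a list, so memory grows with chunk_size.
        chunk = list(itertools.islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


nums = range(1000)
for chunk_size in (1, 100, 500):
    num_chunks = sum(1 for _ in iter_chunks(nums, chunk_size))
    print(f"chunk_size={chunk_size}: {num_chunks} chunks to distribute")
```

With one item per chunk, every item costs one hand-off; with larger chunks the same work is split into far fewer hand-offs.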
Time used to carry out the task for different chunk sizes:
Using the same mapping benchmark, we compared the performance of this library's map implementation with the map method of ThreadPoolExecutor, found in the concurrent.futures module of the standard library.
In this benchmark, ThreadPoolExecutor.map is 4.11 times slower than the threaded_map implementation when using a single thread, and the gap widens as the number of threads increases (e.g. 5.99 times slower with 4 threads).
The is_prime function used was:
from math import sqrt


def is_prime(n):
    if n == 1:
        return False
    elif n == 2 or n == 3:
        return True
    elif n % 2 == 0:
        return False
    elif n < 9:
        # For positive n, only 5 and 7 reach this branch.
        return True
    elif n % 3 == 0:
        return False
    r = int(sqrt(n))
    # Remaining candidate factors have the form 6k - 1 and 6k + 1.
    for f in range(5, r + 1, 6):
        if n % f == 0:
            return False
        elif n % (f + 2) == 0:
            return False
    return True
map-reduce alternative implementations
The map-reduce function was implemented using different architectures to check which one was the most performant.
Architecture:
Future objects are used by the reduce function.
The result of this approach was poor: the execution time was around 10 times longer than with a naive non-threaded approach, and it did not improve with the number of threads.
This is the code for this approach.
from concurrent.futures import ThreadPoolExecutor


def map_reduce_with_executor_naive(
    map_fn,
    reduce_fn,
    items,
    max_workers,
    initial_reduce_value=None,
):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        reduced_result = initial_reduce_value
        for item in items:
            # Submit one task per item.
            future = executor.submit(map_fn, item)
            futures.append(future)
            if len(futures) > max_workers:
                # Keep the number of pending futures bounded.
                future = futures.pop()
                result = future.result()
                reduced_result = reduce_fn(result, reduced_result)
        # Reduce the results of the futures that are still pending.
        for future in futures:
            reduced_result = reduce_fn(future.result(), reduced_result)
    return reduced_result
Another attempt was made with ThreadPoolExecutor, but instead of mapping every item individually, the items were packed into chunks, and those chunks were then mapped and reduced.
The time in this case was similar to the non-threaded one, but it did not improve with the number of threads.
Iterators are not thread safe in Python, but we can use a lock to solve the issue.
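import threading
from collections.abc import Iterator


class ThreadSafeIterator(Iterator):
    def __init__(self, it):
        self._it = iter(it)
        self._lock = threading.Lock()

    def __next__(self):
        # Only one thread at a time may advance the underlying iterator.
        with self._lock:
            return next(self._it)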
Now that we have a thread-safe iterator, we can spawn a set of computing threads that each map and reduce the items they receive and put their partial result in a queue. A final step then reduces the results of all computing threads.
With 1 thread, the time in this implementation is similar to the non-threaded map-reduce. The time decreased when 2 or 3 computing threads were used, but it did not improve with more threads.
In the implementation based on the thread-safe iterator the lock used to yield each item might be the bottleneck, so more computing threads might not improve the performance because they will be waiting to get an item to process.
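The per-item design described above could be sketched roughly as follows. This is an illustration under assumptions, not the code that was benchmarked: the names _worker and map_reduce_per_item and the _EMPTY sentinel are hypothetical, and error handling is omitted.

```python
import functools
import queue
import threading
from collections.abc import Iterator


class ThreadSafeIterator(Iterator):
    def __init__(self, it):
        self._it = iter(it)
        self._lock = threading.Lock()

    def __next__(self):
        with self._lock:
            return next(self._it)


_EMPTY = object()  # sentinel: the thread has not reduced anything yet


def _worker(map_fn, reduce_fn, items, results):
    acc = _EMPTY
    for item in items:  # every item fetched requires taking the shared lock
        mapped = map_fn(item)
        acc = mapped if acc is _EMPTY else reduce_fn(acc, mapped)
    if acc is not _EMPTY:
        results.put(acc)


def map_reduce_per_item(map_fn, reduce_fn, items, num_computing_threads):
    safe_items = ThreadSafeIterator(items)
    results = queue.Queue()
    threads = [
        threading.Thread(target=_worker, args=(map_fn, reduce_fn, safe_items, results))
        for _ in range(num_computing_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Final step: reduce the partial result of each computing thread.
    partials = []
    while not results.empty():
        partials.append(results.get_nowait())
    return functools.reduce(reduce_fn, partials)  # TypeError if input was empty


print(map_reduce_per_item(lambda x: x * x, lambda a, b: a + b, range(1000), 3))
```

Because the lock is taken once per item, threads spend more time contending for the iterator as their number grows, which matches the bottleneck suggested above.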
The approach used by this library is similar to the previous one, but instead of processing individual items, it processes chunks of items. The class in charge of delivering the chunks to be mapped and reduced makes chunk yielding thread-safe by using a lock.
import itertools
import threading
from collections.abc import Iterator


class _ChunkDispenser(Iterator):
    def __init__(self, it, chunk_size):
        self._it = iter(it)
        self._lock = threading.Lock()
        self._chunk_size = chunk_size

    def __next__(self):
        # The lock is taken once per chunk rather than once per item.
        with self._lock:
            chunk = list(itertools.islice(self._it, self._chunk_size))
        if not chunk:
            raise StopIteration
        else:
            return chunk
The chunks are then mapped and reduced by a pool of computing threads that put their final reduced result into a queue that the main thread reduces when their computing is done.
This implementation gave the best result.
Other, more complex approaches were also tried, but none worked as well as the previous one.
MIT License.