
daskqueue is a small Python library built on top of Dask and Dask Distributed that implements a very lightweight distributed task queue.
Think of this library as a simpler version of Celery built entirely on Dask. Running Celery on an HPC environment (for instance) is usually very tricky, whereas spawning a Dask cluster is a lot easier to manage, debug, and clean up.
Dask is an amazing library for parallel computing written entirely in Python. It is easy to install and offers both a high-level API wrapping common collections (arrays, bags, dataframes) and a low-level API for writing custom code (task graphs with Delayed and Futures).
For all its greatness, Dask implements a central scheduler (basically a simple Tornado event loop) involved in every decision, which can sometimes create a central bottleneck. This is a pretty serious limitation when trying to use Dask in high-throughput situations. A simple task queue is usually the best approach when distributing millions of tasks.
The daskqueue Python library leverages Dask Actors to implement distributed queues, with a simple load-balancing `QueuePool` and a `Consumer` class to consume messages from these queues.
We used Actors because:

- **Actors are stateful**: they can hold on to and mutate state, and they are allowed to update that state in place, which is very useful when spawning distributed queues!
- **No central scheduling needed**: operations on actors do not inform the central scheduler, so they do not contribute to the ~4000 task/second scheduler overhead. They also avoid an extra network hop and so have lower latencies. Actors can communicate with each other in a P2P manner, which is pretty neat when you have a huge number of queues and consumers.
Note: Dask provides a Queue implementation, but those queues are mediated by the central scheduler, so they are not ideal for sending large amounts of data (everything you send is routed through a central point), and they add additional overhead on the scheduler when putting millions of tasks.
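To make the actor pattern concrete, here is a minimal single-process sketch in plain Python, using only the standard library. This is not the Dask Actors API: it just shows a stateful object that owns a FIFO queue and serializes all access through a mailbox, with no central coordinator involved.

```python
import queue
import threading

class QueueActor:
    """A minimal actor: owns mutable state (a FIFO list) and processes
    messages from a mailbox on its own thread, so callers never need
    locks and no central scheduler mediates the calls."""

    def __init__(self):
        self._items = []                 # in-place mutable state
        self._mailbox = queue.Queue()    # incoming method calls
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            method, args, reply = self._mailbox.get()
            if method is None:           # sentinel stops the actor
                break
            reply.put(getattr(self, method)(*args))

    # --- handlers executed on the actor's own thread ---
    def _put(self, item):
        self._items.append(item)
        return len(self._items)

    def _get(self):
        return self._items.pop(0) if self._items else None

    # --- public API: send a message, wait for the reply ---
    def call(self, method, *args):
        reply = queue.Queue(maxsize=1)
        self._mailbox.put((method, args, reply))
        return reply.get()

    def stop(self):
        self._mailbox.put((None, (), None))

actor = QueueActor()
actor.call("_put", "task-1")
actor.call("_put", "task-2")
print(actor.call("_get"))  # -> task-1
actor.stop()
```

In daskqueue, each queue is a Dask actor pinned to a worker, and consumers talk to it directly over the network rather than through a local mailbox thread.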
daskqueue requires Python 3.6 or newer. You can install it with pip:

```
$ pip install daskqueue
```
This simple example shows how to process items in parallel using Dask workers and a distributed queue:
```python
from distributed import Client

from daskqueue import QueuePool, ConsumerPool
from daskqueue.utils import logger

def process_item():
    return sum(i * i for i in range(10**8))

if __name__ == "__main__":
    client = Client(
        n_workers=5,
        # task function doesn't release the GIL
        threads_per_worker=1,
        direct_to_workers=True,
    )

    ## Params
    n_queues = 1
    n_consumers = 5

    queue_pool = QueuePool(client, n_queues=n_queues)
    consumer_pool = ConsumerPool(client, queue_pool, n_consumers=n_consumers)
    consumer_pool.start()

    for i in range(5):
        queue_pool.submit(process_item)

    # Wait for all work to be done
    consumer_pool.join()

    ## Get results
    results = consumer_pool.results()
```
Take a look at the examples/ folder for more usage examples.
You should think of daskqueue as a very simple distributed version of aiomultiprocessing. We have these basic classes:

`Queue`: the daskqueue library provides two queue types. `TransientQueue` is the default queue class: submitted messages are appended to an in-memory FIFO queue. `DurableQueue` is a disk-backed queue for persisting messages; tasks are served in FIFO order. Durable queues append serialized messages to a fixed-size file called a `LogSegment`. They also append queue operations to an `IndexSegment`. The index segment is a combination of a bitcask index for segment offsets and a WAL file: an append-only file where we record the message status after each queue operation (ready, delivered, acked, failed) together with an offset to the message in one of the `LogSegment`s.
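To illustrate the idea, here is a toy sketch of an append-only log segment plus a bitcask-style index. The record layout and class names are illustrative only and do not match daskqueue's actual on-disk format:

```python
import os
import struct
import tempfile

class LogSegment:
    """Append-only file of length-prefixed records. append() returns the
    byte offset of the record, which the index can later use to read it
    back (toy format, not daskqueue's actual layout)."""

    def __init__(self, path):
        self._f = open(path, "ab+")

    def append(self, payload: bytes) -> int:
        offset = self._f.seek(0, os.SEEK_END)
        self._f.write(struct.pack(">I", len(payload)) + payload)
        self._f.flush()
        return offset

    def read(self, offset: int) -> bytes:
        self._f.seek(offset)
        (size,) = struct.unpack(">I", self._f.read(4))
        return self._f.read(size)

class IndexSegment:
    """Bitcask-style in-memory index plus an append-only status log (WAL):
    each queue operation records (msg_id, status, log offset)."""

    def __init__(self, path):
        self._wal = open(path, "a")
        self.index = {}  # msg_id -> (status, offset)

    def record(self, msg_id: int, status: str, offset: int):
        self._wal.write(f"{msg_id} {status} {offset}\n")
        self._wal.flush()
        self.index[msg_id] = (status, offset)

with tempfile.TemporaryDirectory() as d:
    log = LogSegment(os.path.join(d, "segment.log"))
    idx = IndexSegment(os.path.join(d, "segment.idx"))
    off = log.append(b"hello")
    idx.record(0, "ready", off)
    print(log.read(off))  # -> b'hello'
```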
`QueuePoolActor`: a basic pool actor. It holds references to the queues and their sizes, and interfaces between the Client and the Consumers. The QueuePool implements a round-robin batch submit.
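The round-robin dispatch can be sketched as follows (a simplified in-memory stand-in, not the actual QueuePool actor):

```python
from itertools import cycle

class RoundRobinPool:
    """Toy stand-in for QueuePool: cycles through its queues so
    submitted tasks are spread evenly across them."""

    def __init__(self, n_queues):
        self.queues = [[] for _ in range(n_queues)]
        self._next = cycle(self.queues)  # round-robin over queue objects

    def submit(self, func, *args):
        next(self._next).append((func, args))

    def batch_submit(self, calls):
        # each call is a (func, arg1, arg2, ...) tuple
        for func, *args in calls:
            self.submit(func, *args)

pool = RoundRobinPool(n_queues=3)
for i in range(7):
    pool.submit(print, i)
print([len(q) for q in pool.queues])  # -> [3, 2, 2]
```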
`ConsumerBaseClass`: an abstract class implementing all the fetching logic for your worker. Consumers have a `start()` method that runs an infinite fetch loop to pop items from the queue assigned by the QueuePool. Consumers communicate directly with the queue, providing highly scalable workflows. The consumer then gets an item from the queue and schedules `process_item` on the Dask worker's ThreadPoolExecutor, freeing the worker's event loop to communicate with the scheduler, fetch tasks asynchronously, and so on.
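The fetch-and-dispatch idea can be sketched like this: a simplified synchronous stand-in built on the standard library (the real consumer runs an async loop on the Dask worker and talks to a queue actor):

```python
import queue
from concurrent.futures import ThreadPoolExecutor

def consumer_loop(task_queue, executor, results):
    """Simplified fetch loop: pop items off the queue and hand the
    actual work to a ThreadPoolExecutor, keeping the loop itself free
    to keep fetching."""
    while True:
        item = task_queue.get()
        if item is None:          # sentinel: no more work
            break
        func, args = item
        results.append(executor.submit(func, *args))

task_queue = queue.Queue()
for i in range(4):
    task_queue.put((pow, (i, 2)))   # enqueue (func, args) pairs
task_queue.put(None)

results = []
with ThreadPoolExecutor(max_workers=2) as executor:
    consumer_loop(task_queue, executor, results)
print([f.result() for f in results])  # -> [0, 1, 4, 9]
```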
The daskqueue library is very well suited for IO-bound jobs: by running multiple consumers and queues and communicating asynchronously, we can bypass the Dask scheduler limit and process **millions of tasks** 🥰!!

As a benchmark, a file-copy job was run on a cluster of 20 consumers and 5 queues. The tasks were basic file copies between two locations (copying from an NFS filer). We copied 200,000 files (~1.1 TB) without ever breaking a sweat!

We can clearly see the network saturation, and looking at the scheduler metrics, we observe a mean of 19.3%.
You can take a look at the benchmark/ directory for various benchmarks run using daskqueue vs dask.
As for limitations, given the current implementation you should be mindful of the following (this list will be updated regularly):

- `concurrency_limit`: the maximum number of active, concurrent jobs each worker process will pick up from its queue at once.

Contributions are what make the open-source community such an amazing place to learn, inspire, and create. This project is still very, very rough! Any contributions you make will benefit everybody else and are greatly appreciated 😍 😍 😍!
Please try to create bug reports that are clear, specific, and reproducible.
Releases are published automatically when a tag is pushed to GitHub.
```
git checkout master
git pull

# Set next version number
export RELEASE=x.x.x

# Create tags
git commit --allow-empty -m "Release $RELEASE"
git tag -a $RELEASE -m "Version $RELEASE"

# Push
git push upstream --tags
```
daskqueue is copyright Amine Dirhoussi, and licensed under
the MIT license. I am providing code in this repository to you under an open
source license. This is my personal repository; the license you receive to
my code is from me and not from my employer. See the LICENSE
file for details.