# Grain

A scheduler for resource-aware parallel external computing on clusters.
## Install

```console
pip install grain-scheduler
```
## Overview

Grain provides Dask-like, async-native delayed objects that let you run jobs in an arbitrary mix of parallel and sequential fashion.
```python
import trio

from grain import GVAR  # context-local variable carrying the job's resources
from grain.delayed import delayed
from grain.resource import Cores

@delayed
async def identity(x):
    # GVAR.res is this job's dedicated allocation (see "Resource-awareness" below)
    n_cpu, cpus = GVAR.res.N, ','.join(map(str, GVAR.res.c))
    await trio.run_process(
        f'mpirun -np {n_cpu} --bind-to cpulist:ordered --cpu-set {cpus} sleep 1'.split()
    )
    return x

@delayed
async def weighted_sum():
    r_ = (identity @ Cores(1))(0)  # `@` attaches a resource request to a call
    r_ += (identity @ Cores(1))(1) * 1 + (identity @ Cores(1))(2) * 2
    return await r_

print(await (weighted_sum() + weighted_sum()))  # top-level await: run inside grain's async entry point
```
Check out the tutorial for complete demos and how to set up workers.
## Resource-awareness
Every job in the job queue carries a resource request along with the function to run. Before the executor runs each job, it inspects each worker for resource availability. If resources are insufficient, the job queue is suspended until completed jobs return their resources. Resources can be CPU cores, virtual memory, both, or anything user-defined following the interface `grain.resource.Resource`.
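This dispatch rule can be pictured with a minimal, self-contained sketch. This is an illustration of the idea only, not grain's internals, and every name in it (`CorePool`, `dispatch`, ...) is hypothetical:

```python
class CorePool:
    """Toy stand-in for one worker's pool of CPU cores (illustration only)."""
    def __init__(self, cores):
        self.free = set(cores)
    def alloc(self, n):
        """Take n cores out of the pool; None if not enough are free."""
        if n > len(self.free):
            return None
        return {self.free.pop() for _ in range(n)}
    def dealloc(self, cores):
        """A completed job returns its cores to the pool."""
        self.free |= cores

def dispatch(pool, queue):
    """Start queued jobs in order while their requests fit the pool;
    otherwise leave the queue suspended until dealloc() frees resources."""
    while queue:
        n_req, job = queue[0]
        res = pool.alloc(n_req)
        if res is None:
            return        # insufficient resources: queue stays suspended
        queue.popleft()
        job(res)          # the job sees its dedicated allocation
```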
Every time a job function runs, it has access to `GVAR.res`, a context-local variable describing the specific resources dedicated to that job. (e.g. if a job is submitted with `Cores(3)`, asking for 3 CPU cores, it might receive an allocation like `Cores([6,7,9])`.)
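For example, reusing the imports from the overview snippet, a job could pin an external program to exactly the cores it was granted (the `taskset` invocation and the `./compute` binary here are illustrative, not part of grain):

```python
@delayed
async def pinned(x):
    # For a request like Cores(3), GVAR.res.c might be e.g. [6, 7, 9]
    cpus = ','.join(map(str, GVAR.res.c))
    await trio.run_process(f'taskset --cpu-list {cpus} ./compute'.split())
    return x

result = await (pinned @ Cores(3))(42)  # ask for 3 cores for this call
```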
## Ergonomic user interface
The async-native API introduces minimal changes to serial code, while enabling access to the entire Python async ecosystem to accommodate complex workflows.
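For instance, because delayed calls are awaitable, they compose with ordinary async tools. The sketch below wraps the overview's `weighted_sum` in a plain trio timeout (assuming it runs under grain's delayed runtime, as in the overview; the timeout handling is standard trio, not a grain feature):

```python
import trio

async def capped():
    with trio.move_on_after(60):  # give the whole batch one minute
        return await (weighted_sum() + weighted_sum())
    return None                   # reached here only if the 60 s cap fired
```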
Minimal configuration is needed to migrate to a new supercomputing cluster (see the sample configs in the `example/` dir). At runtime, the `grain` command-line helper provides easy access to the dashboard and worker scaling.