You're Invited: Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26. RSVP
Socket
Book a DemoSign in
Socket

unidist

Package Overview
Dependencies
Maintainers
2
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

unidist - pypi Package Compare versions

Comparing version
0.2.2
to
0.3.0
+69
unidist/core/backends/mpi/core/async_operations.py
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
import unidist.core.backends.mpi.core.common as common
logger = common.get_logger("async_operations", "async_operations.log")
class AsyncOperations:
    """
    Class that stores MPI async communication handlers.

    Class holds a reference to sending data to prolong data lifetime
    during send operation.
    """

    __instance = None

    def __init__(self):
        # Pairs of (I-prefixed MPI call handler, reference to the data being
        # sent). Keeping the data reference alive prevents the send buffer
        # from being garbage-collected before the transfer completes.
        self._send_async_handlers = []

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``AsyncOperations``.

        Returns
        -------
        AsyncOperations
        """
        if cls.__instance is None:
            cls.__instance = AsyncOperations()
        return cls.__instance

    def extend(self, handlers_list):
        """
        Extend internal list with `handlers_list`.

        Parameters
        ----------
        handlers_list : list
            A list of pairs with handler and data reference.
        """
        self._send_async_handlers.extend(handlers_list)

    def check(self):
        """Check all MPI async send requests readiness and remove a reference to sending data."""

        def is_ready(handler):
            # ``Test()`` is non-blocking: it reports completion without waiting.
            is_ready = handler.Test()
            if is_ready:
                logger.debug("CHECK ASYNC HANDLER {} - ready".format(handler))
            else:
                logger.debug("CHECK ASYNC HANDLER {} - not ready".format(handler))
            return is_ready

        # tup[0] - mpi async send handler object.
        # Slice assignment rewrites the list in place so any other holder of
        # the list reference observes the cleanup as well.
        self._send_async_handlers[:] = [
            tup for tup in self._send_async_handlers if not is_ready(tup[0])
        ]

    def finish(self):
        """Cancel all MPI async send requests."""
        for handler, data in self._send_async_handlers:
            logger.debug("WAIT ASYNC HANDLER {}".format(handler))
            handler.Cancel()
            handler.Wait()
        self._send_async_handlers.clear()
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Python Multiprocessing backend functionality."""
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Actor specific functionality using Python Multiprocessing backend."""
import unidist.core.backends.pymp.core as pymp
from unidist.core.base.actor import Actor, ActorMethod
from unidist.core.base.object_ref import ObjectRef
class PyMpActorMethod(ActorMethod):
    """
    ``ActorMethod`` interface implementation for the Python Multiprocessing backend.

    Parameters
    ----------
    cls : unidist.core.backends.pymp.core.Actor
        An actor class from which method `method_name` will be called.
    method_name : str
        The name of the method to be called.
    """

    def __init__(self, cls, method_name):
        self._cls = cls
        self._method_name = method_name
        self._num_returns = 1

    def _remote(self, *args, num_returns=None, **kwargs):
        """
        Execute `self._method_name` in a worker process.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the method.
        num_returns : int, optional
            Number of results to be returned. If it isn't
            provided, `self._num_returns` will be used.
        **kwargs : dict
            Keyword arguments to be passed in the method.

        Returns
        -------
        ObjectRef, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``ObjectRef`` will be returned.
            * if `num_returns > 1`, list of ``ObjectRef``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        effective_returns = self._num_returns if num_returns is None else num_returns
        bound_method = getattr(self._cls, self._method_name)
        result_ids = bound_method.submit(*args, num_returns=effective_returns, **kwargs)
        if effective_returns == 1:
            return ObjectRef(result_ids)
        elif effective_returns > 1:
            return [ObjectRef(result_id) for result_id in result_ids]
        elif effective_returns == 0:
            return None
class PyMpActor(Actor):
    """
    ``Actor`` interface implementation for the Python Multiprocessing backend.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    num_cpus : int
        The number of CPUs to reserve for the lifetime of the actor.
    resources : dict
        Custom resources to reserve for the lifetime of the actor.
    """

    def __init__(self, cls, num_cpus, resources):
        self._cls = cls
        self._num_cpus = num_cpus
        self._resources = resources
        self._actor_handle = None

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls` class.

        This method creates the ``PyMpActorMethod`` object that is responsible
        for calling method `name` of the `self._cls` class remotely.

        Parameters
        ----------
        name : str
            Name of the method to be called remotely.

        Returns
        -------
        PyMpActorMethod
        """
        return PyMpActorMethod(self._actor_handle, name)

    def _remote(self, *args, num_cpus=None, resources=None, **kwargs):
        """
        Create actor class, specific for Python Multiprocessing backend, from `self._cls`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in `self._cls` class constructor.
        num_cpus : int, optional
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict, optional
            Custom resources to reserve for the lifetime of the actor.
        **kwargs : dict
            Keyword arguments to be passed in `self._cls` class constructor.

        Returns
        -------
        PyMpActor
        """
        # Reject CPU/resource reservations: the pymp backend has no scheduler
        # capable of honoring them.
        cpus_requested = num_cpus is not None or self._num_cpus is not None
        if cpus_requested:
            raise NotImplementedError(
                "'num_cpus' is not supported yet by Python Multiprocessing backend."
            )
        resources_requested = resources is not None or self._resources is not None
        if resources_requested:
            raise NotImplementedError(
                "'resources' is not supported yet by Python Multiprocessing backend."
            )
        self._actor_handle = pymp.Actor(self._cls, *args, **kwargs)
        return self
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""An implementation of ``Backend`` interface using Python Multiprocessing backend."""
import socket
from unidist.config import CpuCount
import unidist.core.backends.pymp.core as mp
from unidist.core.backends.pymp.actor import PyMpActor
from unidist.core.backends.pymp.remote_function import (
PyMpRemoteFunction,
)
from unidist.core.base.backend import Backend
class PyMpBackend(Backend):
    """The class that implements the interface in ``Backend`` using Python Multiprocessing backend."""

    @staticmethod
    def make_remote_function(function, num_cpus, num_returns, resources):
        """
        Define a remote function.

        Parameters
        ----------
        function : callable
            Function to be a remote function.
        num_cpus : int
            The number of CPUs to reserve for the remote function.
        num_returns : int
            The number of ``ObjectRef``-s returned by the remote function invocation.
        resources : dict
            Custom resources to reserve for the remote function.

        Returns
        -------
        PyMpRemoteFunction
        """
        return PyMpRemoteFunction(function, num_cpus, num_returns, resources)

    @staticmethod
    def make_actor(cls, num_cpus, resources):
        """
        Define an actor class.

        Parameters
        ----------
        cls : object
            Class to be an actor class.
        num_cpus : int
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict
            Custom resources to reserve for the lifetime of the actor.

        Returns
        -------
        PyMpActor
            The actor class type to create.
        list
            The list of arguments for ``PyMpActor`` constructor.
        """
        return PyMpActor, [cls, num_cpus, resources]

    @staticmethod
    def get(data_ids):
        """
        Get a remote object or a list of remote objects
        from distributed memory.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or a list of ``DataID`` objects to get data from.

        Returns
        -------
        object
            A Python object or a list of Python objects.
        """
        return mp.get(data_ids)

    @staticmethod
    def put(data):
        """
        Put `data` into distributed memory.

        Parameters
        ----------
        data : object
            Data to be put.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            ``DataID`` matching to data.
        """
        return mp.put(data)

    @staticmethod
    def wait(data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        data IDs that correspond to objects that completed computations.
        The second list corresponds to the rest of the data IDs (which may or may not be ready).

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.
        """
        return mp.wait(data_ids, num_returns=num_returns)

    @staticmethod
    def get_ip():
        """
        Get node IP address.

        Returns
        -------
        str
            Node IP address.
        """
        hostname = socket.gethostname()
        return socket.gethostbyname(hostname)

    @staticmethod
    def num_cpus():
        """
        Get the number of CPUs used by the execution backend.

        Returns
        -------
        int
        """
        return CpuCount.get()

    @staticmethod
    def cluster_resources():
        """
        Get resources of Multiprocessing cluster.

        Returns
        -------
        dict
            Dictionary with node info in the form `{"node_ip": {"CPU": x}}`.
        """
        # Single-node backend: the only entry is this host.
        return {PyMpBackend.get_ip(): {"CPU": PyMpBackend.num_cpus()}}

    @staticmethod
    def shutdown():
        """
        Shutdown Python Multiprocessing execution backend.

        Note
        ----
        Not supported yet.
        """
        raise NotImplementedError(
            "'shutdown' is not supported yet by Python Multiprocessing backend."
        )

    @staticmethod
    def is_initialized():
        """
        Check if Python Multiprocessing backend has already been initialized.

        Returns
        -------
        bool
            True or False.
        """
        return mp.is_initialized()
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Python Multiprocessing backend core functionality."""
from .actor import Actor
from .api import put, wait, get, submit, init, is_initialized
__all__ = ["Actor", "put", "wait", "get", "submit", "init", "is_initialized"]
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Actor specific functionality implemented using Python Multiprocessing."""
import cloudpickle as pkl
from multiprocessing.managers import BaseManager
from unidist.core.backends.pymp.core.object_store import ObjectStore, Delayed
from unidist.core.backends.pymp.core.process_manager import (
ProcessManager,
Task,
)
class ActorMethod:
    """
    Class is responsible to execute `method_name` of
    `cls_obj` in the separate worker-process of `actor` object.

    Parameters
    ----------
    cls_obj : multiprocessing.managers.BaseManager
        Shared manager-class.
    actor : Actor
        Actor object.
    method_name : str
        The name of the method to be called.
    obj_store : unidist.core.backends.pymp.core.object_store.ObjectStore
        Object storage to share data between workers.
    """

    def __init__(self, cls_obj, actor, method_name, obj_store):
        self._cls_obj = cls_obj
        self._method_name = method_name
        self._actor = actor
        self._obj_store = obj_store

    def submit(self, *args, num_returns=1, **kwargs):
        """
        Execute `self._method_name` asynchronously in the worker of `self._actor`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the `self._method_name` method.
        num_returns : int, default: 1
            Number of results to be returned from `self._method_name`.
        **kwargs : dict
            Keyword arguments to be passed in the `self._method_name` method.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``DataID`` will be returned.
            * if `num_returns > 1`, list of ``DataID``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        store = self._obj_store
        # Pre-allocate placeholder IDs the worker will fill in later.
        if num_returns == 0:
            result_ids = None
        elif num_returns > 1:
            result_ids = [store.put(Delayed()) for _ in range(num_returns)]
        else:
            result_ids = store.put(Delayed())
        target = getattr(self._cls_obj, self._method_name)
        self._actor.submit(Task(target, result_ids, store, *args, **kwargs))
        return result_ids
class Actor:
    """
    Actor-class to execute methods of wrapped class in a separate worker.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    *args : iterable
        Positional arguments to be passed in `cls` constructor.
    **kwargs : dict
        Keyword arguments to be passed in `cls` constructor.

    Notes
    -----
    Python multiprocessing manager-class will be created to wrap `cls`.
    This makes `cls` class object shared between different workers. Manager-class
    starts additional process to share class state between processes.
    Methods of `cls` class object are executed in the worker, grabbed from a workers pool.
    """

    def __init__(self, cls, *args, **kwargs):
        # Initialize attributes read by ``__del__`` first so destruction is
        # safe even if the manager startup below raises.
        self._worker = None
        self._worker_id = None
        self._obj_store = ObjectStore.get_instance()
        # FIXME : Change "WrappedClass" -> cls.__name__ + "Manager", for example.
        BaseManager.register("WrappedClass", cls)
        # ``manager.start()`` spawns a server process that owns the shared
        # instance of `cls`; the proxy below forwards method calls to it.
        manager = BaseManager()
        manager.start()
        self._cls_obj = manager.WrappedClass(*args, **kwargs)
        # The grabbed worker is dedicated to this actor until ``__del__``.
        self._worker, self._worker_id = ProcessManager.get_instance().grab_worker()

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls_obj` class.

        This methods creates the ``ActorMethod`` object that is responsible
        for calling method `name` of the `self._cls_obj` class remotely.

        Parameters
        ----------
        name : str
            Name of the method to be called remotely.

        Returns
        -------
        ActorMethod
        """
        return ActorMethod(self._cls_obj, self, name, self._obj_store)

    def submit(self, task):
        """
        Execute `task` asynchronously in the worker grabbed by this actor.

        Parameters
        ----------
        task : unidist.core.backends.pymp.core.process_manager.Task
            Task object holding callable function.
        """
        # cloudpickle (not plain pickle) so lambdas/closures survive the trip.
        self._worker.add_task(pkl.dumps(task))

    def __del__(self):
        """
        Destructor of the actor.

        Free worker, grabbed from the workers pool.
        """
        if self._worker_id is not None:
            ProcessManager.get_instance().free_worker(self._worker_id)
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""High-level API of Python Multiprocessing backend."""
import cloudpickle as pkl
from unidist.config import CpuCount
from unidist.core.backends.pymp.core.object_store import ObjectStore, Delayed
from unidist.core.backends.pymp.core.process_manager import (
ProcessManager,
Task,
)
# The global variable is responsible for if Python Multiprocessing backend has already been initialized
is_multiprocessing_initialized = False
def init(num_workers=None):
    """
    Initialize shared object storage and workers pool.

    Parameters
    ----------
    num_workers : int, optional
        Number of worker-processes to start. If it isn't provided,
        the number of CPUs from ``CpuCount`` is used.

    Notes
    -----
    Run initialization of singleton objects ``unidist.core.backends.pymp.core.object_store.ObjectStore``
    and ``unidist.core.backends.pymp.core.process_manager.ProcessManager``.
    """
    # The default used to be ``CpuCount.get()`` evaluated once at import
    # time; resolving it at call time picks up any ``CpuCount`` config
    # changes made after this module was imported. ``ProcessManager`` treats
    # ``None`` the same way, so passing it through is equivalent.
    if num_workers is None:
        num_workers = CpuCount.get()
    ObjectStore.get_instance()
    ProcessManager.get_instance(num_workers=num_workers)
    global is_multiprocessing_initialized
    is_multiprocessing_initialized = True
def is_initialized():
    """
    Check if Python Multiprocessing backend has already been initialized.

    Returns
    -------
    bool
        True or False.
    """
    # A ``global`` declaration is only needed for assignment; this function
    # merely reads the module-level flag.
    return is_multiprocessing_initialized
def put(data):
    """
    Put data into shared object storage.

    Parameters
    ----------
    data : object
        Data to be put.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID
        An ID of object in shared object storage.
    """
    store = ObjectStore.get_instance()
    return store.put(data)
def get(data_ids):
    """
    Get an object(s) associated with `data_ids` from the shared object storage.

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        An ID(s) to object(s) to get data from.

    Returns
    -------
    object
        A Python object.
    """
    store = ObjectStore.get_instance()
    return store.get(data_ids)
def wait(data_ids, num_returns=1):
    """
    Wait until `data_ids` are finished.

    This method returns two lists. The first list consists of
    ``DataID``-s that correspond to objects that completed computations.
    The second list corresponds to the rest of the ``DataID``-s (which may or may not be ready).

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ``DataID`` or list of ``DataID``-s to be waited.
    num_returns : int, default: 1
        The number of ``DataID``-s that should be returned as ready.

    Returns
    -------
    tuple
        List of data IDs that are ready and list of the remaining data IDs.
    """
    store = ObjectStore.get_instance()
    return store.wait(data_ids, num_returns=num_returns)
def submit(func, *args, num_returns=1, **kwargs):
    """
    Execute function in a worker process.

    Parameters
    ----------
    func : callable
        Function to be executed in the worker.
    *args : iterable
        Positional arguments to be passed in the `func`.
    num_returns : int, default: 1
        Number of results to be returned from `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID, list or None
        Type of returns depends on `num_returns` value:

        * if `num_returns == 1`, ``DataID`` will be returned.
        * if `num_returns > 1`, list of ``DataID``-s will be returned.
        * if `num_returns == 0`, ``None`` will be returned.
    """
    store = ObjectStore.get_instance()
    # Reserve placeholder IDs that the worker will replace with results.
    if num_returns == 0:
        result_ids = None
    elif num_returns > 1:
        result_ids = [store.put(Delayed()) for _ in range(num_returns)]
    else:
        result_ids = store.put(Delayed())
    wrapped = Task(func, result_ids, store, *args, **kwargs)
    ProcessManager.get_instance().submit(pkl.dumps(wrapped))
    return result_ids
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Shared object storage related functionality."""
import cloudpickle as pkl
from multiprocessing import Manager
from unidist.core.backends.common.data_id import DataID
class Delayed:
    """Placeholder type stored in place of objects whose computation has not finished yet."""

    pass
class ObjectStore:
    """
    Class that stores objects and provides access to these from different processes.

    Notes
    -----
    Shared storage is organized using ``multiprocessing.Manager.dict``. This is separate
    process which starts work in the class constructor.
    """

    __instance = None

    def __init__(self):
        if ObjectStore.__instance is None:
            # ``Manager()`` spawns a server process hosting the shared dict.
            self.store_delayed = Manager().dict()

    def __repr__(self):
        return f"Object store: {self.store_delayed}"

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        unidist.core.backends.pymp.core.object_store.ObjectStore
        """
        if cls.__instance is None:
            cls.__instance = ObjectStore()
        return cls.__instance

    def put(self, data, data_id=None):
        """
        Put `data` to internal shared dictionary.

        Parameters
        ----------
        data : object
            Data to be put.
        data_id : unidist.core.backends.common.data_id.DataID, optional
            An ID to data. If it isn't provided, will be created automatically.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            An ID of object in internal shared dictionary.
        """
        data_id = DataID() if data_id is None else data_id
        # Callables are serialized with cloudpickle so closures/lambdas can
        # cross the manager-process boundary; plain data is stored as-is.
        self.store_delayed[data_id] = pkl.dumps(data) if callable(data) else data
        return data_id

    def get(self, data_ids):
        """
        Get a object(s) associated with `data_ids` from the shared internal dictionary.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            An ID(s) of object(s) to get data from.

        Returns
        -------
        object
            A Python object.

        Raises
        ------
        ValueError
            If `data_ids` contains anything but ``DataID`` objects.
        Exception
            If the producing task failed, its exception is re-raised here.
        """
        is_list = isinstance(data_ids, list)
        if not is_list:
            data_ids = [data_ids]
        if not all(isinstance(ref, DataID) for ref in data_ids):
            raise ValueError(
                "`data_ids` must either be a data ID or a list of data IDs."
            )
        values = [None] * len(data_ids)
        for idx, data_id in enumerate(data_ids):
            # Busy-wait until a worker replaces the ``Delayed`` placeholder.
            # NOTE(review): this spins at full CPU; a short sleep would be
            # kinder, but is left as-is to avoid changing latency behavior.
            while isinstance(self.store_delayed[data_id], Delayed):
                pass
            value = self.store_delayed[data_id]
            if isinstance(value, Exception):
                raise value
            values[idx] = pkl.loads(value) if isinstance(value, bytes) else value
        return values if is_list else values[0]

    def wait(self, data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        ``DataID``-s that correspond to objects that completed computations.
        The second list corresponds to the rest of the ``DataID``-s (which may or may not be ready).

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.
        """
        if not isinstance(data_ids, list):
            data_ids = [data_ids]
        # Single-pass partition into already-ready and pending IDs. The
        # previous implementation popped from the list while indexing over a
        # copy of it, so with more than one ready ID the shifted positions
        # popped the wrong element or raised ``IndexError``. This version
        # also no longer mutates the caller's list.
        ready = []
        not_ready = []
        for data_id in data_ids:
            if len(ready) < num_returns and not isinstance(
                self.store_delayed[data_id], Delayed
            ):
                ready.append(data_id)
            else:
                not_ready.append(data_id)
        # If the fast scan did not find enough ready IDs, block on the
        # pending ones in order until the quota is met.
        while len(ready) != num_returns:
            self.get(not_ready[0])
            ready.append(not_ready.pop(0))
        return ready, not_ready
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Workers related functionality."""
import cloudpickle as pkl
from multiprocessing import (
Process,
JoinableQueue,
)
from unidist.config import CpuCount
from unidist.core.backends.common.data_id import DataID
from unidist.core.backends.pymp.core.object_store import ObjectStore
class Worker(Process):
    """
    Class-process that executes tasks from `self.task_queue`.

    Parameters
    ----------
    task_queue : multiprocessing.JoinableQueue
        A queue of task to execute.
    obj_store : unidist.core.backends.pymp.core.object_store.ObjectStore
        Shared object storage to read/write data.
    """

    def __init__(self, task_queue, obj_store):
        # daemon=True: workers die with the parent instead of blocking exit.
        Process.__init__(self, daemon=True)
        self.task_queue = task_queue
        self._obj_store = obj_store

    def run(self):
        """Run main infinite loop of process to execute tasks from `self.task_queue`."""
        # ``while 1`` replaced with ``while True``; the unreachable ``return``
        # after the loop was dropped.
        while True:
            task = self.task_queue.get()
            task = pkl.loads(task)
            data_ids = task.data_ids
            try:
                value = task()
            except Exception as e:
                # Store the exception under every result ID so each consumer
                # re-raises it from ``ObjectStore.get``.
                # NOTE(review): when ``data_ids`` is None (num_returns == 0)
                # the exception is stored under the key ``None`` — preserved
                # as-is to avoid a behavior change.
                if isinstance(data_ids, list) and len(data_ids) > 1:
                    for data_id in data_ids:
                        self._obj_store.store_delayed[data_id] = e
                else:
                    self._obj_store.store_delayed[data_ids] = e
            else:
                if data_ids is not None:
                    if isinstance(data_ids, list) and len(data_ids) > 1:
                        for data_id, val in zip(data_ids, value):
                            self._obj_store.store_delayed[data_id] = val
                    else:
                        self._obj_store.store_delayed[data_ids] = value
            finally:
                # Always acknowledge so ``JoinableQueue.join`` can complete.
                self.task_queue.task_done()

    def add_task(self, task):
        """
        Add `task` to `self.task_queue`.

        Parameters
        ----------
        task : unidist.core.backends.pymp.core.process_manager.Task
            Task to be added in the queue (already serialized by the caller).
        """
        self.task_queue.put(task)
class ProcessManager:
    """
    Class that controls worker pool and assigns task to workers.

    Parameters
    ----------
    num_workers : int, optional
        Number of worker-processes to start. If isn't provided,
        will be equal to number of CPUs.

    Notes
    -----
    Constructor starts `num_workers` Multiprocessing Workers.
    """

    __instance = None

    def __init__(self, num_workers=None):
        if ProcessManager.__instance is None:
            if num_workers is None:
                num_workers = CpuCount.get()
            self.workers = [None] * num_workers
            # ``grabbed_workers[idx]`` is True while worker ``idx`` is
            # dedicated to an actor and excluded from round-robin dispatch.
            self.grabbed_workers = [None] * num_workers
            # Round-robin cursor kept on the class (singleton pattern).
            self.__class__._worker_id = 0
            obj_store = ObjectStore.get_instance()
            for idx in range(num_workers):
                self.workers[idx] = Worker(JoinableQueue(), obj_store)
                self.workers[idx].start()
                self.grabbed_workers[idx] = False

    @classmethod
    def get_instance(cls, num_workers=None):
        """
        Get instance of ``ProcessManager``.

        Parameters
        ----------
        num_workers : int, optional
            Number of worker-processes to start; only honored by the call
            that actually creates the singleton.

        Returns
        -------
        unidist.core.backends.pymp.core.process_manager.ProcessManager
        """
        if cls.__instance is None:
            cls.__instance = ProcessManager(num_workers=num_workers)
        return cls.__instance

    def _next(self):
        """
        Get current worker index and move to another with incrementing by one.

        Returns
        -------
        int
        """
        idx = self.__class__._worker_id
        self.__class__._worker_id += 1
        # Wrap around to the first worker after the last one.
        if self.__class__._worker_id == len(self.workers):
            self.__class__._worker_id = 0
        return idx

    def grab_worker(self):
        """
        Grab a worker from worker pool.

        Grabbed worker is marked as `blocked` and doesn't participate
        in the tasks submission.

        Returns
        -------
        unidist.core.backends.pymp.core.process_manager.Worker
            Grabbed worker.
        int
            Index of grabbed worker.

        Raises
        ------
        RuntimeError
            If every worker in the pool is already grabbed.
        """
        for idx, is_grabbed in enumerate(self.grabbed_workers):
            if not is_grabbed:
                self.grabbed_workers[idx] = True
                return self.workers[idx], idx
        raise RuntimeError("Actor can`t be run, no available workers.")

    def free_worker(self, idx):
        """
        Free worker by index `idx`.

        Parameters
        ----------
        idx : int
            Index of worker to be freed.
        """
        self.grabbed_workers[idx] = False

    def submit(self, task):
        """
        Add `task` to task queue of one of workers using round-robin.

        Parameters
        ----------
        task : unidist.core.backends.pymp.core.process_manager.Task
            Task to be added in task queue.

        Raises
        ------
        RuntimeError
            If every worker is grabbed by an actor, so none can take tasks.
        """
        # Skip grabbed workers; give up after one full lap over the pool.
        num_skipped = 0
        while num_skipped < len(self.workers):
            idx = self._next()
            if not self.grabbed_workers[idx]:
                self.workers[idx].add_task(task)
                return
            else:
                num_skipped += 1
        raise RuntimeError("Task can`t be run, no available workers.")
class Task:
    """
    Class poses as unified callable object to execute in Multiprocessing Worker.

    Parameters
    ----------
    func : callable
        A function to be called in object invocation.
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ``DataID``-(s) associated with result(s) of `func` invocation.
    obj_store : unidist.core.backends.pymp.core.object_store.ObjectStore
        Object storage to share data between workers.
    *args : iterable
        Positional arguments to be passed in the `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.
    """

    def __init__(self, func, data_ids, obj_store, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self.data_ids = data_ids
        self.obj_store = obj_store

    def __call__(self):
        """
        Execute `self._func`, materializing any ``DataID`` arguments first.

        Returns
        -------
        object
            The result of `self._func` invocation.
        """

        def resolve(obj):
            # Swap a DataID for the actual value from the object store.
            return self.obj_store.get(obj) if isinstance(obj, DataID) else obj

        call_args = [resolve(arg) for arg in self._args]
        call_kwargs = {name: resolve(value) for name, value in self._kwargs.items()}
        return self._func(*call_args, **call_kwargs)
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""An implementation of ``RemoteFunction`` interface using Python Multiprocessing backend."""
import unidist.core.backends.pymp.core as mp
from unidist.core.base.object_ref import ObjectRef
from unidist.core.base.remote_function import RemoteFunction
class PyMpRemoteFunction(RemoteFunction):
    """
    ``RemoteFunction`` interface implementation for the Python Multiprocessing backend.

    Parameters
    ----------
    function : callable
        A function to be called remotely.
    num_cpus : int
        The number of CPUs to reserve for the remote function.
    num_returns : int
        The number of ``ObjectRef``-s returned by the remote function invocation.
    resources : dict
        Custom resources to reserve for the remote function.
    """

    def __init__(self, function, num_cpus, num_returns, resources):
        self._remote_function = function
        self._num_cpus = num_cpus
        # Default to a single result when the decorator did not specify.
        self._num_returns = 1 if num_returns is None else num_returns
        self._resources = resources

    def _remote(self, *args, num_cpus=None, num_returns=None, resources=None, **kwargs):
        """
        Execute `self._remote_function` in a worker process.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the `self._remote_function`.
        num_cpus : int, optional
            The number of CPUs to reserve for the remote function.
        num_returns : int, optional
            The number of ``ObjectRef``-s returned by the remote function invocation.
        resources : dict, optional
            Custom resources to reserve for the remote function.
        **kwargs : dict
            Keyword arguments to be passed in the `self._remote_function`.

        Returns
        -------
        ObjectRef, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``ObjectRef`` will be returned.
            * if `num_returns > 1`, list of ``ObjectRef``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        # CPU/resource reservations are rejected: this backend cannot honor them.
        if num_cpus is not None or self._num_cpus is not None:
            raise NotImplementedError(
                "'num_cpus' is not supported yet by Python Multiprocessing backend."
            )
        if resources is not None or self._resources is not None:
            raise NotImplementedError(
                "'resources' is not supported yet by Python Multiprocessing backend."
            )
        effective_returns = self._num_returns if num_returns is None else num_returns
        result_ids = mp.submit(
            self._remote_function, *args, num_returns=effective_returns, **kwargs
        )
        if effective_returns == 1:
            return ObjectRef(result_ids)
        elif effective_returns > 1:
            return [ObjectRef(result_id) for result_id in result_ids]
        elif effective_returns == 0:
            return None
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Utilities used to initialize Python Multiprocessing execution backend."""
from unidist.config import CpuCount
def initialize_pymp():
    """
    Initialize the Python Multiprocessing execution backend.

    Notes
    -----
    Number of workers for Python Multiprocessing is equal to number of CPUs used by the backend.
    """
    # Import here to avoid pulling the pymp core in unless this backend is used.
    from unidist.core.backends.pymp.core import init as pymp_init

    pymp_init(num_workers=CpuCount.get())
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Python Sequential backend functionality."""
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Actor specific functionality using Python Sequential backend."""
import unidist.core.backends.pyseq.core as py
from unidist.core.base.actor import Actor, ActorMethod
from unidist.core.base.object_ref import ObjectRef
class PySeqActorMethod(ActorMethod):
    """
    ``ActorMethod`` interface implementation for the Python Sequential backend.

    Parameters
    ----------
    cls : object
        An actor class from which method `method_name` will be called.
    method_name : str
        The name of the method to be called.
    """

    def __init__(self, cls, method_name):
        self._cls = cls
        self._method_name = method_name
        self._num_returns = 1

    def _remote(self, *args, num_returns=None, **kwargs):
        """
        Execute `self._method_name`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the method.
        num_returns : int, optional
            Number of results to be returned. If it isn't
            provided, `self._num_returns` will be used.
        **kwargs : dict
            Keyword arguments to be passed in the method.

        Returns
        -------
        ObjectRef, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``ObjectRef`` will be returned.
            * if `num_returns > 1`, list of ``ObjectRef``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        effective_returns = self._num_returns if num_returns is None else num_returns
        bound_method = getattr(self._cls, self._method_name)
        result_ids = py.submit(
            bound_method, *args, num_returns=effective_returns, **kwargs
        )
        if effective_returns == 1:
            return ObjectRef(result_ids)
        elif effective_returns > 1:
            return [ObjectRef(result_id) for result_id in result_ids]
        elif effective_returns == 0:
            return None
class PySeqActor(Actor):
    """
    ``Actor`` interface implementation for the Python Sequential backend.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    num_cpus : int
        The number of CPUs to reserve for the lifetime of the actor.
    resources : dict
        Custom resources to reserve for the lifetime of the actor.
    """

    def __init__(self, cls, num_cpus, resources):
        self._cls = cls
        self._num_cpus = num_cpus
        self._resources = resources
        self._actor_handle = None

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls` class.

        This method creates the ``PySeqActorMethod`` object that is responsible
        for calling method `name` of the `self._cls` class.

        Parameters
        ----------
        name : str
            Name of the method to be called.

        Returns
        -------
        PySeqActorMethod
        """
        return PySeqActorMethod(self._actor_handle, name)

    def _remote(self, *args, num_cpus=None, resources=None, **kwargs):
        """
        Create actor class, specific for Python Sequential backend, from `self._cls`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in `self._cls` class constructor.
        num_cpus : int, optional
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict, optional
            Custom resources to reserve for the lifetime of the actor.
        **kwargs : dict
            Keyword arguments to be passed in `self._cls` class constructor.

        Returns
        -------
        PySeqActor
        """
        # Reject CPU/resource reservations: the sequential backend runs
        # everything in-process and cannot honor them.
        cpus_requested = num_cpus is not None or self._num_cpus is not None
        if cpus_requested:
            raise NotImplementedError(
                "'num_cpus' is not supported by Python Sequential backend."
            )
        resources_requested = resources is not None or self._resources is not None
        if resources_requested:
            raise NotImplementedError(
                "'resources' is not supported by Python Sequential backend."
            )
        # The "actor" is just a plain instance constructed in this process.
        self._actor_handle = self._cls(*args, **kwargs)
        return self
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""An implementation of ``Backend`` interface using Python Sequential backend."""
import socket
import unidist.core.backends.pyseq.core as py
from unidist.core.backends.pyseq.actor import PySeqActor
from unidist.core.backends.pyseq.remote_function import (
PySeqRemoteFunction,
)
from unidist.core.base.backend import Backend
class PySeqBackend(Backend):
    """The class that implements the interface in ``Backend`` using Python Sequential backend."""

    @staticmethod
    def make_remote_function(function, num_cpus, num_returns, resources):
        """
        Define ``PySeqRemoteFunction``.

        Parameters
        ----------
        function : callable
            Function to be ``PySeqRemoteFunction``.
        num_cpus : int
            The number of CPUs to reserve for ``PySeqRemoteFunction``.
        num_returns : int
            The number of ``ObjectRef``-s returned by the function invocation.
        resources : dict
            Custom resources to reserve for the function.

        Returns
        -------
        PySeqRemoteFunction
        """
        return PySeqRemoteFunction(function, num_cpus, num_returns, resources)

    @staticmethod
    def make_actor(cls, num_cpus, resources):
        """
        Define an actor class.

        Parameters
        ----------
        cls : object
            Class to be an actor class.
        num_cpus : int
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict
            Custom resources to reserve for the lifetime of the actor.

        Returns
        -------
        PySeqActor
            The actor class type to create.
        list
            The list of arguments for ``PySeqActor`` constructor.
        """
        return PySeqActor, [cls, num_cpus, resources]

    @staticmethod
    def get(data_ids):
        """
        Get an object or a list of objects from object store.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or a list of ``DataID`` objects to get data from.

        Returns
        -------
        object
            A Python object or a list of Python objects.
        """
        return py.get(data_ids)

    @staticmethod
    def put(data):
        """
        Put `data` into object store.

        Parameters
        ----------
        data : object
            Data to be put.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            ``DataID`` matching to data.
        """
        return py.put(data)

    @staticmethod
    def wait(data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        data IDs that correspond to objects that completed computations.
        The second list corresponds to the rest of the data IDs.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.

        Notes
        -----
        Method serves to maintain behavior compatibility between backends. All objects
        completed computation before putting into an object storage for Python Sequential backend.
        """
        # Everything is already computed, so the first `num_returns` IDs are "ready".
        ready, remaining = data_ids[:num_returns], data_ids[num_returns:]
        return ready, remaining

    @staticmethod
    def get_ip():
        """
        Get node IP address.

        Returns
        -------
        str
            Node IP address.
        """
        return socket.gethostbyname(socket.gethostname())

    @staticmethod
    def num_cpus():
        """
        Get the number of CPUs used by the execution backend.

        Returns
        -------
        int
        """
        # The sequential backend always runs on a single CPU.
        return 1

    @staticmethod
    def cluster_resources():
        """
        Get resources of the cluster.

        Returns
        -------
        dict
            Dictionary with node info in the form `{"node_ip": {"CPU": x}}`.
        """
        return {PySeqBackend.get_ip(): {"CPU": PySeqBackend.num_cpus()}}

    @staticmethod
    def is_initialized():
        """
        Check if Python Sequential backend has already been initialized.

        Returns
        -------
        bool
            True or False.
        """
        return py.is_initialized()
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Python Sequential backend core functionality."""
from .api import put, get, submit, init, is_initialized
__all__ = ["put", "get", "submit", "init", "is_initialized"]
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""High-level API of Python Sequential backend."""
from unidist.core.backends.common.data_id import DataID
from unidist.core.backends.pyseq.core.object_store import ObjectStore
# The global variable is responsible for if Python Sequential backend has already been initialized
is_python_initialized = False
def init():
    """
    Initialize an object storage.

    Notes
    -----
    Run initialization of singleton object ``unidist.core.backends.pyseq.core.object_store.ObjectStore``.
    """
    ObjectStore.get_instance()
    global is_python_initialized
    # Setting the flag unconditionally is equivalent to the guarded write:
    # the value only ever transitions from False to True.
    is_python_initialized = True
def is_initialized():
    """
    Check if Python Sequential backend has already been initialized.

    Returns
    -------
    bool
        True or False.
    """
    # Reading a module-level name needs no `global` declaration.
    return is_python_initialized
def put(data):
    """
    Put data into object storage.

    Parameters
    ----------
    data : object
        Data to be put.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID
        An ID of object in object storage.
    """
    store = ObjectStore.get_instance()
    return store.put(data)
def get(data_ids):
    """
    Get object(s) associated with `data_ids` from the object storage.

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ID(s) to object(s) to get data from.

    Returns
    -------
    object
        A Python object.
    """
    store = ObjectStore.get_instance()
    return store.get(data_ids)
def submit(func, *args, num_returns=1, **kwargs):
    """
    Execute function.

    Parameters
    ----------
    func : callable
        Function to be executed.
    *args : iterable
        Positional arguments to be passed in the `func`.
    num_returns : int, default: 1
        Number of results to be returned from `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID, list or None
        Type of returns depends on `num_returns` value:

        * if `num_returns == 1`, ``DataID`` will be returned.
        * if `num_returns > 1`, list of ``DataID``-s will be returned.
        * if `num_returns == 0`, ``None`` will be returned.
    """
    obj_store = ObjectStore.get_instance()

    def materialize(value):
        # Replace a data ID with the concrete object it refers to;
        # pass any other value through unchanged.
        return obj_store.get(value) if isinstance(value, DataID) else value

    prepared_args = [materialize(arg) for arg in args]
    prepared_kwargs = {key: materialize(value) for key, value in kwargs.items()}
    try:
        result = func(*prepared_args, **prepared_kwargs)
    except Exception as e:
        # Store the exception instead of raising so callers observe it on `get`.
        result = [e] * num_returns if num_returns > 1 else e
    if num_returns == 0:
        return None
    if num_returns > 1:
        return [obj_store.put(result[idx]) for idx in range(num_returns)]
    return obj_store.put(result)
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Object storage related functionality."""
from unidist.core.backends.common.data_id import DataID
class ObjectStore:
    """
    Class that stores objects and provides access to these.

    Notes
    -----
    The class is intended to be used as a singleton via ``get_instance``.
    """

    __instance = None

    def __init__(self):
        # Always initialize the internal storage. Previously it was created
        # only while the singleton was unset, so any instance constructed
        # after ``get_instance()`` lacked the `store` attribute and was broken.
        self.store = dict()

    def __repr__(self):
        return f"Object store: {self.store}"

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        unidist.core.backends.pyseq.core.object_store.ObjectStore
        """
        if cls.__instance is None:
            cls.__instance = ObjectStore()
        return cls.__instance

    def put(self, data, data_id=None):
        """
        Put `data` to internal dictionary.

        Parameters
        ----------
        data : object
            Data to be put.
        data_id : unidist.core.backends.common.data_id.DataID, optional
            An ID of data. If it isn't provided, will be created automatically.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            An ID of object in internal dictionary.
        """
        data_id = DataID() if data_id is None else data_id
        self.store[data_id] = data
        return data_id

    def get(self, data_ids):
        """
        Get object(s) associated with `data_ids` from the internal dictionary.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ID(s) of object(s) to get data from.

        Returns
        -------
        object
            A Python object.

        Raises
        ------
        ValueError
            If `data_ids` contains anything but ``DataID`` objects.
        """
        is_list = isinstance(data_ids, list)
        if not is_list:
            data_ids = [data_ids]
        if not all(isinstance(data_id, DataID) for data_id in data_ids):
            raise ValueError(
                "`data_ids` must either be a data ID or a list of data IDs."
            )

        def check_exception(value):
            # A stored exception (captured during `submit`) is re-raised
            # at retrieval time so callers see the original failure.
            if isinstance(value, Exception):
                raise value
            return value

        values = [check_exception(self.store[data_id]) for data_id in data_ids]
        return values if is_list else values[0]
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""An implementation of ``RemoteFunction`` interface using Python Sequential backend."""
import unidist.core.backends.pyseq.core as py
from unidist.core.base.object_ref import ObjectRef
from unidist.core.base.remote_function import RemoteFunction
class PySeqRemoteFunction(RemoteFunction):
    """
    The class that implements the interface in ``RemoteFunction`` using Python Sequential backend.

    Parameters
    ----------
    function : callable
        A function to be called.
    num_cpus : int
        The number of CPUs to reserve for the function.
    num_returns : int
        The number of ``ObjectRef``-s returned by the function invocation.
    resources : dict
        Custom resources to reserve for the function.
    """

    def __init__(self, function, num_cpus, num_returns, resources):
        self._remote_function = function
        self._num_cpus = num_cpus
        # Default to a single return value when not specified.
        self._num_returns = num_returns if num_returns is not None else 1
        self._resources = resources

    def _remote(self, *args, num_cpus=None, num_returns=None, resources=None, **kwargs):
        """
        Execute `self._remote_function`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the `self._remote_function`.
        num_cpus : int, optional
            The number of CPUs to reserve for the function.
        num_returns : int, optional
            The number of ``ObjectRef``-s returned by the function invocation.
        resources : dict, optional
            Custom resources to reserve for the function.
        **kwargs : dict
            Keyword arguments to be passed in the `self._remote_function`.

        Returns
        -------
        ObjectRef, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``ObjectRef`` will be returned.
            * if `num_returns > 1`, list of ``ObjectRef`` will be returned.
            * if `num_returns == 0`, ``None`` will be returned.

        Raises
        ------
        NotImplementedError
            If `num_cpus` or `resources` is set either here or at decoration time,
            since the sequential backend does not reserve resources.
        """
        # Reject unsupported resource reservations, checking `num_cpus` first
        # to preserve the original error precedence.
        for passed, stored, option in (
            (num_cpus, self._num_cpus, "num_cpus"),
            (resources, self._resources, "resources"),
        ):
            if passed is not None or stored is not None:
                raise NotImplementedError(
                    f"'{option}' is not supported by Python Sequential backend."
                )
        if num_returns is None:
            num_returns = self._num_returns
        data_ids = py.submit(
            self._remote_function, *args, num_returns=num_returns, **kwargs
        )
        if num_returns == 0:
            return None
        if num_returns > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if num_returns == 1:
            return ObjectRef(data_ids)
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Utilities used to initialize Python Sequential execution backend."""
def initialize_pyseq():
    """
    Initialize the Python Sequential execution backend.

    Notes
    -----
    All execution will happen sequentially.
    """
    # Imported lazily to avoid pulling the backend in until it is requested.
    import unidist.core.backends.pyseq.core as pyseq_core

    pyseq_core.init()
+11
-3
Metadata-Version: 2.1
Name: unidist
Version: 0.2.2
Version: 0.3.0
Summary: Unified Distributed Execution

@@ -22,4 +22,5 @@ Home-page: https://github.com/modin-project/unidist

<p align="center">
<a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a>
<a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a>
<a href="https://pypi.org/project/unidist/0.2.2/"><img src="https://img.shields.io/badge/pypi-0.2.2-blue.svg" alt="PyPI version" align="center"></a>
<a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a>
</p>

@@ -47,3 +48,3 @@

```bash
pip install unidist # Install unidist with dependencies for Multiprocessing and sequential Python backends
pip install unidist # Install unidist with dependencies for Python Multiprocessing and Python Sequential backends
```

@@ -142,4 +143,11 @@

### Powered by unidist
unidist is meant to be used not only directly by users to get better performance in their workloads,
but also be a core component of other libraries to power those with the performant execution backends.
Refer to `Libraries powered by unidist` section of [Using Unidist](https://unidist.readthedocs.io/en/latest/using_unidist/index.html) page
to get more information on which libraries have already been using unidist.
### Full Documentation
Visit the complete documentation on readthedocs: https://unidist.readthedocs.io.

@@ -7,4 +7,5 @@ <p align="center">

<p align="center">
<a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a>
<a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a>
<a href="https://pypi.org/project/unidist/0.2.2/"><img src="https://img.shields.io/badge/pypi-0.2.2-blue.svg" alt="PyPI version" align="center"></a>
<a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a>
</p>

@@ -32,3 +33,3 @@

```bash
pip install unidist # Install unidist with dependencies for Multiprocessing and sequential Python backends
pip install unidist # Install unidist with dependencies for Python Multiprocessing and Python Sequential backends
```

@@ -127,4 +128,11 @@

### Powered by unidist
unidist is meant to be used not only directly by users to get better performance in their workloads,
but also be a core component of other libraries to power those with the performant execution backends.
Refer to `Libraries powered by unidist` section of [Using Unidist](https://unidist.readthedocs.io/en/latest/using_unidist/index.html) page
to get more information on which libraries have already been using unidist.
### Full Documentation
Visit the complete documentation on readthedocs: https://unidist.readthedocs.io.

@@ -6,3 +6,3 @@ import pathlib

ray_deps = ["ray[default]>=1.4.0"]
ray_deps = ["ray[default]>=1.13.0"]
dask_deps = ["dask[complete]>=2.22.0", "distributed>=2.22.0"]

@@ -9,0 +9,0 @@ mpi_deps = ["mpi4py-mpich", "msgpack>=1.0.0"]

Metadata-Version: 2.1
Name: unidist
Version: 0.2.2
Version: 0.3.0
Summary: Unified Distributed Execution

@@ -22,4 +22,5 @@ Home-page: https://github.com/modin-project/unidist

<p align="center">
<a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a>
<a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a>
<a href="https://pypi.org/project/unidist/0.2.2/"><img src="https://img.shields.io/badge/pypi-0.2.2-blue.svg" alt="PyPI version" align="center"></a>
<a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a>
</p>

@@ -47,3 +48,3 @@

```bash
pip install unidist # Install unidist with dependencies for Multiprocessing and sequential Python backends
pip install unidist # Install unidist with dependencies for Python Multiprocessing and Python Sequential backends
```

@@ -142,4 +143,11 @@

### Powered by unidist
unidist is meant to be used not only directly by users to get better performance in their workloads,
but also be a core component of other libraries to power those with the performant execution backends.
Refer to `Libraries powered by unidist` section of [Using Unidist](https://unidist.readthedocs.io/en/latest/using_unidist/index.html) page
to get more information on which libraries have already been using unidist.
### Full Documentation
Visit the complete documentation on readthedocs: https://unidist.readthedocs.io.

@@ -5,3 +5,3 @@ packaging

[all]
ray[default]>=1.4.0
ray[default]>=1.13.0
dask[complete]>=2.22.0

@@ -21,2 +21,2 @@ distributed>=2.22.0

[ray]
ray[default]>=1.4.0
ray[default]>=1.13.0

@@ -43,2 +43,3 @@ AUTHORS

unidist/core/backends/mpi/core/__init__.py
unidist/core/backends/mpi/core/async_operations.py
unidist/core/backends/mpi/core/common.py

@@ -55,3 +56,2 @@ unidist/core/backends/mpi/core/communication.py

unidist/core/backends/mpi/core/worker/__init__.py
unidist/core/backends/mpi/core/worker/async_operations.py
unidist/core/backends/mpi/core/worker/loop.py

@@ -61,20 +61,20 @@ unidist/core/backends/mpi/core/worker/object_store.py

unidist/core/backends/mpi/core/worker/task_store.py
unidist/core/backends/multiprocessing/__init__.py
unidist/core/backends/multiprocessing/actor.py
unidist/core/backends/multiprocessing/backend.py
unidist/core/backends/multiprocessing/remote_function.py
unidist/core/backends/multiprocessing/utils.py
unidist/core/backends/multiprocessing/core/__init__.py
unidist/core/backends/multiprocessing/core/actor.py
unidist/core/backends/multiprocessing/core/api.py
unidist/core/backends/multiprocessing/core/object_store.py
unidist/core/backends/multiprocessing/core/process_manager.py
unidist/core/backends/python/__init__.py
unidist/core/backends/python/actor.py
unidist/core/backends/python/backend.py
unidist/core/backends/python/remote_function.py
unidist/core/backends/python/utils.py
unidist/core/backends/python/core/__init__.py
unidist/core/backends/python/core/api.py
unidist/core/backends/python/core/object_store.py
unidist/core/backends/pymp/__init__.py
unidist/core/backends/pymp/actor.py
unidist/core/backends/pymp/backend.py
unidist/core/backends/pymp/remote_function.py
unidist/core/backends/pymp/utils.py
unidist/core/backends/pymp/core/__init__.py
unidist/core/backends/pymp/core/actor.py
unidist/core/backends/pymp/core/api.py
unidist/core/backends/pymp/core/object_store.py
unidist/core/backends/pymp/core/process_manager.py
unidist/core/backends/pyseq/__init__.py
unidist/core/backends/pyseq/actor.py
unidist/core/backends/pyseq/backend.py
unidist/core/backends/pyseq/remote_function.py
unidist/core/backends/pyseq/utils.py
unidist/core/backends/pyseq/core/__init__.py
unidist/core/backends/pyseq/core/api.py
unidist/core/backends/pyseq/core/object_store.py
unidist/core/backends/ray/__init__.py

@@ -81,0 +81,0 @@ unidist/core/backends/ray/actor.py

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -11,7 +11,7 @@

{
"date": "2023-01-16T15:56:02+0100",
"date": "2023-03-22T17:08:07+0100",
"dirty": false,
"error": null,
"full-revisionid": "c8b6fce4c2a9b646968b84e508e8d647dbd40ffb",
"version": "0.2.2"
"full-revisionid": "b133787a752b622e1443f3714d07cc98d47a66fb",
"version": "0.3.0"
}

@@ -18,0 +18,0 @@ ''' # END VERSION_JSON

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -16,3 +16,3 @@ # SPDX-License-Identifier: Apache-2.0

from .backends.dask import DaskMemoryLimit, IsDaskCluster, DaskSchedulerAddress
from .backends.mpi import IsMpiSpawnWorkers, MpiHosts
from .backends.mpi import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold
from .parameter import ValueSource

@@ -34,2 +34,3 @@

"ValueSource",
"MpiPickleThreshold",
]

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -21,4 +21,4 @@ # SPDX-License-Identifier: Apache-2.0

BackendName.MPI,
BackendName.MP,
BackendName.PY,
BackendName.PYMP,
BackendName.PYSEQ,
)

@@ -70,3 +70,3 @@

return BackendName.MPI
return BackendName.MP
return BackendName.PYMP

@@ -73,0 +73,0 @@

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -7,4 +7,4 @@ # SPDX-License-Identifier: Apache-2.0

from .envvars import IsMpiSpawnWorkers, MpiHosts
from .envvars import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold
__all__ = ["IsMpiSpawnWorkers", "MpiHosts"]
__all__ = ["IsMpiSpawnWorkers", "MpiHosts", "MpiPickleThreshold"]

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -21,1 +21,8 @@ # SPDX-License-Identifier: Apache-2.0

varname = "UNIDIST_MPI_HOSTS"
class MpiPickleThreshold(EnvironmentVariable, type=int):
"""Minimum buffer size for serialization with pickle 5 protocol"""
default = 1024**2 // 4 # 0.25 MiB
varname = "UNIDIST_MPI_PICKLE_THRESHOLD"

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,3 +0,3 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -154,7 +154,9 @@ # SPDX-License-Identifier: Apache-2.0

"""
f_format = logging.Formatter("%(message)s")
f_handler = logging.FileHandler(file_name, delay=True)
f_handler.setFormatter(f_format)
logger = logging.getLogger(logger_name)
if not logger.hasHandlers():
f_format = logging.Formatter("%(message)s")
f_handler = logging.FileHandler(file_name, delay=True)
f_handler.setFormatter(f_format)
logger.addHandler(f_handler)
logger = logging.getLogger(logger_name)
if activate:

@@ -164,3 +166,2 @@ logger.setLevel(logging.DEBUG)

logger.setLevel(logging.NOTSET)
logger.addHandler(f_handler)

@@ -167,0 +168,0 @@ return logger

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -27,2 +27,3 @@ # SPDX-License-Identifier: Apache-2.0

from mpi4py import MPI # noqa: E402
from mpi4py.util import pkl5 # noqa: E402

@@ -174,2 +175,8 @@

Target MPI process to transfer data.
Notes
-----
This blocking send is used when we have to wait for completion of the communication,
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``mpi_isend_object``.
"""

@@ -214,2 +221,8 @@ comm.send(data, dest=dest_rank)

Target MPI process to transfer buffer.
Notes
-----
This blocking send is used when we have to wait for completion of the communication,
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``mpi_isend_buffer``.
"""

@@ -242,3 +255,3 @@ comm.send(buffer_size, dest=dest_rank)

def mpi_isend_buffer(comm, data, dest_rank):
def mpi_isend_buffer(comm, buffer_size, buffer, dest_rank):
"""

@@ -251,3 +264,5 @@ Send buffer object to another MPI rank in a non-blocking way.

MPI communicator object.
data : object
buffer_size : int
Buffer size in bytes.
buffer : object
Buffer object to send.

@@ -262,3 +277,8 @@ dest_rank : int

"""
return comm.Isend([data, MPI.CHAR], dest=dest_rank)
requests = []
h1 = comm.isend(buffer_size, dest=dest_rank)
requests.append((h1, None))
h2 = comm.Isend([buffer, MPI.CHAR], dest=dest_rank)
requests.append((h2, buffer))
return requests

@@ -320,3 +340,3 @@

def _send_complex_data_impl(comm, s_data, raw_buffers, len_buffers, dest_rank):
def _send_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank):
"""

@@ -333,20 +353,25 @@ Send already serialized complex data.

Pickle buffers list, out-of-band data collected with pickle 5 protocol.
len_buffers : list
Size of each buffer from `raw_buffers` list.
buffer_count : list
List of the number of buffers for each object
to be serialized/deserialized using the pickle 5 protocol.
See details in :py:class:`~unidist.core.backends.mpi.core.serialization.ComplexDataSerializer`.
dest_rank : int
Target MPI process to transfer data.
"""
# Send message pack bytestring
mpi_send_buffer(comm, len(s_data), s_data, dest_rank)
# Send the necessary metadata
mpi_send_object(comm, len(raw_buffers), dest_rank)
for raw_buffer in raw_buffers:
mpi_send_buffer(comm, len(raw_buffer.raw()), raw_buffer, dest_rank)
# TODO: do not send if raw_buffers is zero
mpi_send_object(comm, len_buffers, dest_rank)
info = {
"s_data_len": len(s_data),
"buffer_count": buffer_count,
"raw_buffers_len": [len(sbuf) for sbuf in raw_buffers],
}
comm.send(info, dest=dest_rank)
with pkl5._bigmpi as bigmpi:
comm.Send(bigmpi(s_data), dest=dest_rank)
for sbuf in raw_buffers:
comm.Send(bigmpi(sbuf), dest=dest_rank)
def send_complex_data(comm, data, dest_rank):
"""
Send the data that consists of different user provided complex types, lambdas and buffers.
Send the data that consists of different user provided complex types, lambdas and buffers in a blocking way.

@@ -370,2 +395,8 @@ Parameters

A list of buffers amount for each object.
Notes
-----
This blocking send is used when we have to wait for completion of the communication,
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``isend_complex_data``.
"""

@@ -377,12 +408,12 @@ serializer = ComplexDataSerializer()

raw_buffers = serializer.buffers
len_buffers = serializer.len_buffers
buffer_count = serializer.buffer_count
# MPI comminucation
_send_complex_data_impl(comm, s_data, raw_buffers, len_buffers, dest_rank)
_send_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank)
# For caching purpose
return s_data, raw_buffers, len_buffers
return s_data, raw_buffers, buffer_count
def _isend_complex_data_impl(comm, s_data, raw_buffers, len_buffers, dest_rank):
def _isend_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank):
"""

@@ -401,4 +432,6 @@ Send serialized complex data.

A list of pickle buffers.
len_buffers : list
A list of buffers amount for each object.
buffer_count : list
List of the number of buffers for each object
to be serialized/deserialized using the pickle 5 protocol.
See details in :py:class:`~unidist.core.backends.mpi.core.serialization.ComplexDataSerializer`.
dest_rank : int

@@ -413,20 +446,17 @@ Target MPI process to transfer data.

handlers = []
info = {
"s_data_len": len(s_data),
"buffer_count": buffer_count,
"raw_buffers_len": [len(sbuf) for sbuf in raw_buffers],
}
# Send message pack bytestring
h1 = mpi_isend_object(comm, len(s_data), dest_rank)
h2 = mpi_isend_buffer(comm, s_data, dest_rank)
h1 = comm.isend(info, dest=dest_rank)
handlers.append((h1, None))
handlers.append((h2, s_data))
# Send the necessary metadata
h3 = mpi_isend_object(comm, len(raw_buffers), dest_rank)
handlers.append((h3, None))
for raw_buffer in raw_buffers:
h4 = mpi_isend_object(comm, len(raw_buffer.raw()), dest_rank)
h5 = mpi_isend_buffer(comm, raw_buffer, dest_rank)
handlers.append((h4, None))
handlers.append((h5, raw_buffer))
# TODO: do not send if raw_buffers is zero
h6 = mpi_isend_object(comm, len_buffers, dest_rank)
handlers.append((h6, len_buffers))
with pkl5._bigmpi as bigmpi:
h2 = comm.Isend(bigmpi(s_data), dest=dest_rank)
handlers.append((h2, s_data))
for sbuf in raw_buffers:
h_sbuf = comm.Isend(bigmpi(sbuf), dest=dest_rank)
handlers.append((h_sbuf, sbuf))

@@ -436,5 +466,5 @@ return handlers

def _isend_complex_data(comm, data, dest_rank):
def isend_complex_data(comm, data, dest_rank):
"""
Send the data that consists of different user provided complex types, lambdas and buffers.
Send the data that consists of different user provided complex types, lambdas and buffers in a non-blocking way.

@@ -470,10 +500,10 @@ Non-blocking asynchronous interface.

raw_buffers = serializer.buffers
len_buffers = serializer.len_buffers
buffer_count = serializer.buffer_count
# Send message pack bytestring
handlers.extend(
_isend_complex_data_impl(comm, s_data, raw_buffers, len_buffers, dest_rank)
_isend_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank)
)
return handlers, s_data, raw_buffers, len_buffers
return handlers, s_data, raw_buffers, buffer_count

@@ -502,20 +532,15 @@

# in a long running data receive operations.
buf_size = mpi_busy_wait_recv(comm, source_rank)
msgpack_buffer = bytearray(buf_size)
comm.Recv([msgpack_buffer, MPI.CHAR], source=source_rank)
# Recv pickle buffers array for all complex data frames
raw_buffers_size = comm.recv(source=source_rank)
# Pre-allocate pickle buffers list
raw_buffers = [None] * raw_buffers_size
for i in range(raw_buffers_size):
buf_size = comm.recv(source=source_rank)
recv_buffer = bytearray(buf_size)
comm.Recv([recv_buffer, MPI.CHAR], source=source_rank)
raw_buffers[i] = recv_buffer
# Recv len of buffers for each complex data frames
len_buffers = comm.recv(source=source_rank)
info = comm.recv(source=source_rank)
msgpack_buffer = bytearray(info["s_data_len"])
buffer_count = info["buffer_count"]
raw_buffers = list(map(bytearray, info["raw_buffers_len"]))
with pkl5._bigmpi as bigmpi:
comm.Recv(bigmpi(msgpack_buffer), source=source_rank)
for rbuf in raw_buffers:
comm.Recv(bigmpi(rbuf), source=source_rank)
# Set the necessary metadata for unpacking
deserializer = ComplexDataSerializer(raw_buffers, len_buffers)
deserializer = ComplexDataSerializer(raw_buffers, buffer_count)

@@ -531,8 +556,6 @@ # Start unpacking

def send_complex_operation(comm, operation_type, operation_data, dest_rank):
def send_simple_operation(comm, operation_type, operation_data, dest_rank):
"""
Send operation and data that consist of different user provided complex types, lambdas and buffers.
Send an operation type and standard Python data types in a blocking way.
The data is serialized with ``unidist.core.backends.mpi.core.ComplexDataSerializer``.
Parameters

@@ -542,3 +565,3 @@ ----------

MPI communicator object.
operation_type : ``unidist.core.backends.mpi.core.common.Operation``
operation_type : unidist.core.backends.mpi.core.common.Operation
Operation message type.

@@ -549,12 +572,19 @@ operation_data : object

Target MPI process to transfer data.
Notes
-----
* This blocking send is used when we have to wait for completion of the communication,
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``isend_simple_operation``.
* Serialization of the data to be sent takes place just using ``pickle.dump`` in this case.
"""
# Send operation type
comm.send(operation_type, dest=dest_rank)
# Send complex dictionary data
send_complex_data(comm, operation_data, dest_rank)
mpi_send_object(comm, operation_type, dest_rank)
# Send the details of a communication request
mpi_send_object(comm, operation_data, dest_rank)
def send_simple_operation(comm, operation_type, operation_data, dest_rank):
def isend_simple_operation(comm, operation_type, operation_data, dest_rank):
"""
Send an operation and standard Python data types.
Send an operation type and standard Python data types in a non-blocking way.

@@ -572,10 +602,19 @@ Parameters

Returns
-------
list
A list of pairs, ``MPI_Isend`` handler and associated data to send.
Notes
-----
Serialization is a simple pickle.dump in this case.
Serialization of the data to be sent takes place just using ``pickle.dump`` in this case.
"""
# Send operation type
mpi_send_object(comm, operation_type, dest_rank)
# Send request details
mpi_send_object(comm, operation_data, dest_rank)
handlers = []
h1 = mpi_isend_object(comm, operation_type, dest_rank)
handlers.append((h1, operation_type))
# Send the details of a communication request
h2 = mpi_isend_object(comm, operation_data, dest_rank)
handlers.append((h2, operation_data))
return handlers

@@ -606,87 +645,2 @@

def send_operation_data(comm, operation_data, dest_rank, is_serialized=False):
"""
Send data that consists of different user provided complex types, lambdas and buffers.
The data is serialized with ``unidist.core.backends.mpi.core.ComplexDataSerializer``.
Function works with already serialized data.
Parameters
----------
comm : object
MPI communicator object.
operation_data : object
Data object to send.
dest_rank : int
Target MPI process to transfer data.
is_serialized : bool
`operation_data` is already serialized or not.
Returns
-------
dict or None
Serialization data for caching purpose or nothing.
Notes
-----
Function returns ``None`` if `operation_data` is already serialized,
otherwise ``dict`` containing data serialized in this function.
"""
if is_serialized:
# Send already serialized data
s_data = operation_data["s_data"]
raw_buffers = operation_data["raw_buffers"]
len_buffers = operation_data["len_buffers"]
_send_complex_data_impl(comm, s_data, raw_buffers, len_buffers, dest_rank)
return None
else:
# Serialize and send the data
s_data, raw_buffers, len_buffers = send_complex_data(
comm, operation_data, dest_rank
)
return {
"s_data": s_data,
"raw_buffers": raw_buffers,
"len_buffers": len_buffers,
}
def send_operation(
comm, operation_type, operation_data, dest_rank, is_serialized=False
):
"""
Send operation and data that consists of different user provided complex types, lambdas and buffers.
The data is serialized with ``unidist.core.backends.mpi.core.ComplexDataSerializer``.
Function works with already serialized data.
Parameters
----------
comm : object
MPI communicator object.
operation_type : ``unidist.core.backends.mpi.core.common.Operation``
Operation message type.
operation_data : object
Data object to send.
dest_rank : int
Target MPI process to transfer data.
is_serialized : bool
`operation_data` is already serialized or not.
Returns
-------
dict or None
Serialization data for caching purpose.
Notes
-----
Function returns ``None`` if `operation_data` is already serialized,
otherwise ``dict`` containing data serialized in this function.
"""
# Send operation type
mpi_send_object(comm, operation_type, dest_rank)
# Send operation data
return send_operation_data(comm, operation_data, dest_rank, is_serialized)
def isend_complex_operation(

@@ -736,6 +690,6 @@ comm, operation_type, operation_data, dest_rank, is_serialized=False

raw_buffers = operation_data["raw_buffers"]
len_buffers = operation_data["len_buffers"]
buffer_count = operation_data["buffer_count"]
h2_list = _isend_complex_data_impl(
comm, s_data, raw_buffers, len_buffers, dest_rank
comm, s_data, raw_buffers, buffer_count, dest_rank
)

@@ -747,3 +701,3 @@ handlers.extend(h2_list)

# Serialize and send the data
h2_list, s_data, raw_buffers, len_buffers = _isend_complex_data(
h2_list, s_data, raw_buffers, buffer_count = isend_complex_data(
comm, operation_data, dest_rank

@@ -755,29 +709,8 @@ )

"raw_buffers": raw_buffers,
"len_buffers": len_buffers,
"buffer_count": buffer_count,
}
def send_remote_task_operation(comm, operation_type, operation_data, dest_rank):
def isend_serialized_operation(comm, operation_type, operation_data, dest_rank):
"""
Send operation and data that consist of different user provided complex types, lambdas and buffers.
Parameters
----------
comm : object
MPI communicator object.
operation_type : ``unidist.core.backends.mpi.core.common.Operation``
Operation message type.
operation_data : object
Data object to send.
dest_rank : int
Target MPI process to transfer data.
"""
# Send operation type
mpi_send_object(comm, operation_type, dest_rank)
# Serialize and send the complex data
send_complex_data(comm, operation_data, dest_rank)
def send_serialized_operation(comm, operation_type, operation_data, dest_rank):
"""
Send operation and serialized simple data.

@@ -795,7 +728,16 @@

Target MPI process to transfer data.
Returns
-------
list
A list of pairs, ``MPI_Isend`` handler and associated data to send.
"""
handlers = []
# Send operation type
mpi_send_object(comm, operation_type, dest_rank)
# Send request details
mpi_send_buffer(comm, len(operation_data), operation_data, dest_rank)
h1 = mpi_isend_object(comm, operation_type, dest_rank)
handlers.append((h1, operation_type))
# Send the details of a communication request
h2_list = mpi_isend_buffer(comm, len(operation_data), operation_data, dest_rank)
handlers.extend(h2_list)
return handlers

@@ -802,0 +744,0 @@

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -9,2 +9,3 @@ # SPDX-License-Identifier: Apache-2.0

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.controller.object_store import object_store

@@ -54,3 +55,4 @@ from unidist.core.backends.mpi.core.controller.garbage_collector import (

}
communication.send_complex_operation(
async_operations = AsyncOperations.get_instance()
h_list, _ = communication.isend_complex_operation(
communication.MPIState.get_instance().comm,

@@ -61,3 +63,3 @@ operation_type,

)
async_operations.extend(h_list)
return output_id

@@ -118,3 +120,4 @@

}
communication.send_complex_operation(
async_operations = AsyncOperations.get_instance()
h_list, _ = communication.isend_complex_operation(
communication.MPIState.get_instance().comm,

@@ -125,2 +128,3 @@ operation_type,

)
async_operations.extend(h_list)

@@ -127,0 +131,0 @@ def _serialization_helper(self):

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -31,2 +31,3 @@ # SPDX-License-Identifier: Apache-2.0

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.config import (

@@ -37,2 +38,3 @@ CpuCount,

ValueSource,
MpiPickleThreshold,
)

@@ -96,5 +98,7 @@

if MpiHosts.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiHosts.put({MpiHosts.get()})"]
py_str += [f"cfg.MpiHosts.put('{MpiHosts.get()}')"]
if CpuCount.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.CpuCount.put({CpuCount.get()})"]
if MpiPickleThreshold.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiPickleThreshold.put({MpiPickleThreshold.get()})"]
py_str += ["unidist.init()"]

@@ -184,4 +188,8 @@ py_str = "; ".join(py_str)

for rank_id in range(communication.MPIRank.MONITOR, mpi_state.world_size):
# We use a blocking send here because we have to wait for
# completion of the communication, which is necessary for the pipeline to continue.
communication.mpi_send_object(mpi_state.comm, common.Operation.CANCEL, rank_id)
logger.debug("Shutdown rank {}".format(rank_id))
async_operations = AsyncOperations.get_instance()
async_operations.finish()
if not MPI.Is_finalized():

@@ -305,2 +313,4 @@ MPI.Finalize()

operation_data = {"id": data_id.base_data_id()}
# We use a blocking send here because we have to wait for
# completion of the communication, which is necessary for the pipeline to continue.
communication.send_simple_operation(

@@ -396,3 +406,4 @@ mpi_state.comm,

}
communication.send_remote_task_operation(
async_operations = AsyncOperations.get_instance()
h_list, _ = communication.isend_complex_operation(
communication.MPIState.get_instance().comm,

@@ -403,2 +414,3 @@ operation_type,

)
async_operations.extend(h_list)

@@ -405,0 +417,0 @@ # Track the task execution

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -12,2 +12,3 @@ # SPDX-License-Identifier: Apache-2.0

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.controller.object_store import object_store

@@ -148,2 +149,4 @@

}
# We use a blocking send here because we have to wait for
# completion of the communication, which is necessary for the pipeline to continue.
communication.send_simple_operation(

@@ -181,2 +184,3 @@ mpi_state.comm,

mpi_state = communication.MPIState.get_instance()
async_operations = AsyncOperations.get_instance()
# Push the local master data to the target worker directly

@@ -186,3 +190,3 @@ operation_type = common.Operation.PUT_DATA

serialized_data = object_store.get_serialized_data(data_id)
communication.send_operation(
h_list, _ = communication.isend_complex_operation(
mpi_state.comm,

@@ -199,3 +203,3 @@ operation_type,

}
serialized_data = communication.send_operation(
h_list, serialized_data = communication.isend_complex_operation(
mpi_state.comm,

@@ -208,2 +212,3 @@ operation_type,

object_store.cache_serialized_data(data_id, serialized_data)
async_operations.extend(h_list)
# Remember pushed id

@@ -229,3 +234,4 @@ object_store.cache_send_info(data_id, dest_rank)

}
communication.send_simple_operation(
async_operations = AsyncOperations.get_instance()
h_list = communication.isend_simple_operation(
communication.MPIState.get_instance().comm,

@@ -236,2 +242,3 @@ operation_type,

)
async_operations.extend(h_list)

@@ -238,0 +245,0 @@

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -11,2 +11,3 @@ # SPDX-License-Identifier: Apache-2.0

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.serialization import SimpleDataSerializer

@@ -65,9 +66,12 @@ from unidist.core.backends.mpi.core.controller.object_store import object_store

s_cleanup_list = SimpleDataSerializer().serialize_pickle(cleanup_list)
async_operations = AsyncOperations.get_instance()
for rank_id in range(initial_worker_number, mpi_state.world_size):
communication.send_serialized_operation(
mpi_state.comm,
common.Operation.CLEANUP,
s_cleanup_list,
rank_id,
)
if rank_id != mpi_state.rank:
h_list = communication.isend_serialized_operation(
mpi_state.comm,
common.Operation.CLEANUP,
s_cleanup_list,
rank_id,
)
async_operations.extend(h_list)

@@ -110,5 +114,7 @@ def increment_task_counter(self):

)
async_operations = AsyncOperations.get_instance()
# Check completion status of previous async MPI routines
async_operations.check()
if len(self._cleanup_list) > self._cleanup_list_threshold:
if self._cleanup_counter % self._cleanup_threshold == 0:
timestamp_snapshot = time.perf_counter()

@@ -120,2 +126,4 @@ if (timestamp_snapshot - self._timestamp) > self._time_threshold:

# Compare submitted and executed tasks
# We use a blocking send here because we have to wait for
# completion of the communication, which is necessary for the pipeline to continue.
communication.mpi_send_object(

@@ -122,0 +130,0 @@ mpi_state.comm,

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -16,2 +16,3 @@ # SPDX-License-Identifier: Apache-2.0

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.async_operations import AsyncOperations

@@ -60,2 +61,3 @@ # TODO: Find a way to move this after all imports

mpi_state = communication.MPIState.get_instance()
async_operations = AsyncOperations.get_instance()

@@ -70,2 +72,3 @@ while True:

elif operation_type == common.Operation.GET_TASK_COUNT:
# We use a blocking send here because the receiver is waiting for the result.
communication.mpi_send_object(

@@ -77,2 +80,3 @@ mpi_state.comm,

elif operation_type == common.Operation.CANCEL:
async_operations.finish()
if not MPI.Is_finalized():

@@ -83,1 +87,4 @@ MPI.Finalize()

raise ValueError("Unsupported operation!")
# Check completion status of previous async MPI routines
async_operations.check()

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -11,2 +11,3 @@ # SPDX-License-Identifier: Apache-2.0

from collections.abc import KeysView
from mpi4py.MPI import memory

@@ -27,2 +28,4 @@ # Serialization libraries

from unidist.config import MpiPickleThreshold
# Pickle 5 protocol compatible types check

@@ -93,13 +96,23 @@ compatible_modules = ("pandas", "numpy")

A list of ``PickleBuffer`` objects for data decoding.
len_buffers : list, default: None
A list of buffer sizes for data decoding.
buffer_count : list, default: None
List of the number of buffers for each object
to be serialized/deserialized using the pickle 5 protocol.
Notes
-----
Uses a combination of msgpack, cloudpickle and pickle libraries
Uses a combination of msgpack, cloudpickle and pickle libraries.
Msgpack allows to serialize/deserialize internal objects of a container separately,
but send them as one object. For example, for an array of pandas DataFrames,
each DataFrame will be serialized separately using pickle 5,
and all `buffers` will be stored in one array to be sent together.
To deserialize it `buffer_count` is used, which contains information
about the number of `buffers` for each internal object.
"""
def __init__(self, buffers=None, len_buffers=None):
# Minimum buffer size for serialization with pickle 5 protocol
PICKLE_THRESHOLD = MpiPickleThreshold.get()
def __init__(self, buffers=None, buffer_count=None):
self.buffers = buffers if buffers else []
self.len_buffers = len_buffers if len_buffers else []
self.buffer_count = buffer_count if buffer_count else []
self._callback_counter = 0

@@ -116,5 +129,8 @@

"""
self.buffers.append(pickle_buffer)
self._callback_counter += 1
return False
pickle_buffer = memory(pickle_buffer)
if len(pickle_buffer) >= self.PICKLE_THRESHOLD:
self.buffers.append(pickle_buffer)
self._callback_counter += 1
return False
return True

@@ -136,3 +152,3 @@ def _dataframe_encode(self, frame):

s_frame = pkl.dumps(frame, protocol=5, buffer_callback=self._buffer_callback)
self.len_buffers.append(self._callback_counter)
self.buffer_count.append(self._callback_counter)
self._callback_counter = 0

@@ -223,3 +239,3 @@ return {"__pickle5_custom__": True, "as_bytes": s_frame}

frame = pkl.loads(obj["as_bytes"], buffers=self.buffers)
del self.buffers[: self.len_buffers.pop(0)]
del self.buffers[: self.buffer_count.pop(0)]
return frame

@@ -226,0 +242,0 @@ else:

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -21,3 +21,3 @@ # SPDX-License-Identifier: Apache-2.0

from unidist.core.backends.mpi.core.worker.task_store import TaskStore
from unidist.core.backends.mpi.core.worker.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.async_operations import AsyncOperations

@@ -32,5 +32,5 @@ # TODO: Find a way to move this after all imports

# we use the condition to set "worker_0.log" in order to build it succesfully.
log_file = "worker_{}.log".format(mpi_state.rank if mpi_state is not None else 0)
w_logger = common.get_logger("worker", log_file)
logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)
w_logger = common.get_logger(logger_name, log_file)
# Actors map {handle : actor}

@@ -111,2 +111,3 @@ actor_map = {}

request = communication.recv_simple_operation(mpi_state.comm, source_rank)
request["id"] = object_store.get_unique_data_id(request["id"])
request_store.process_get_request(

@@ -121,2 +122,3 @@ request["source"], request["id"], request["is_blocking_op"]

)
request["id"] = object_store.get_unique_data_id(request["id"])
object_store.put(request["id"], request["data"])

@@ -134,2 +136,3 @@

request = communication.recv_simple_operation(mpi_state.comm, source_rank)
request["id"] = object_store.get_unique_data_id(request["id"])
object_store.put_data_owner(request["id"], request["owner"])

@@ -146,2 +149,3 @@

w_logger.debug("WAIT for {} id".format(request["id"]._id))
request["id"] = object_store.get_unique_data_id(request["id"])
request_store.process_wait_request(request["id"])

@@ -148,0 +152,0 @@

@@ -1,5 +0,6 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
import weakref
import unidist.core.backends.mpi.core.common as common

@@ -9,11 +10,9 @@ import unidist.core.backends.mpi.core.communication as communication

mpi_state = communication.MPIState.get_instance()
# Logger configuration
# When building documentation we do not have MPI initialized so
# we use the condition to set "worker_0.log" in order to build it succesfully.
log_file = "worker_{}.log".format(
communication.MPIState.get_instance().rank
if communication.MPIState.get_instance() is not None
else 0
)
logger = common.get_logger("worker", log_file)
logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)
logger = common.get_logger(logger_name, log_file)

@@ -34,5 +33,9 @@

# Add local data {DataId : Data}
self._data_map = {}
self._data_map = weakref.WeakKeyDictionary()
# "strong" references to data IDs {DataId : DataId}
# we are using dict here to improve performance when getting an element from it,
# whereas other containers would require O(n) complexity
self._data_id_map = {}
# Data owner {DataId : Rank}
self._data_owner_map = {}
self._data_owner_map = weakref.WeakKeyDictionary()
# Data serialized cache

@@ -144,5 +147,29 @@ self._serialization_cache = {}

def get_unique_data_id(self, data_id):
"""
Get the "strong" reference to the data ID if it is already stored locally.
If the passed data ID is not stored locally yet, save and return it.
Parameters
----------
data_id : unidist.core.backends.common.data_id.DataID
An ID to data.
Returns
-------
unidist.core.backends.common.data_id.DataID
The unique ID to data.
Notes
-----
We need to use a unique data ID reference for the garbage colleactor to work correctly.
"""
if data_id not in self._data_id_map:
self._data_id_map[data_id] = data_id
return self._data_id_map[data_id]
def clear(self, cleanup_list):
"""
Clear all local dictionary data ID instances from `cleanup_list`.
Clear "strong" references to data IDs from `cleanup_list`.

@@ -153,12 +180,10 @@ Parameters

List of data IDs.
Notes
-----
The actual data will be collected later when there is no weak or
strong reference to data in the current worker.
"""
for data_id in cleanup_list:
if data_id in self._data_map:
logger.debug("CLEANUP DataMap id {}".format(data_id._id))
del self._data_map[data_id]
if data_id in self._data_owner_map:
logger.debug("CLEANUP DataOwnerMap id {}".format(data_id._id))
del self._data_owner_map[data_id]
if data_id in self._serialization_cache:
del self._serialization_cache[data_id]
self._data_id_map.pop(data_id, None)

@@ -165,0 +190,0 @@ def cache_serialized_data(self, data_id, data):

@@ -5,4 +5,4 @@ from collections import defaultdict

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.worker.object_store import ObjectStore
from unidist.core.backends.mpi.core.worker.async_operations import AsyncOperations

@@ -14,4 +14,5 @@

# we use the condition to set "worker_0.log" in order to build it succesfully.
log_file = "worker_{}.log".format(mpi_state.rank if mpi_state is not None else 0)
logger = common.get_logger("worker", log_file)
logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)
logger = common.get_logger(logger_name, log_file)

@@ -162,5 +163,8 @@

if data_id in self._wait_request:
# Data is already in DataMap, so not problem here
communication.MPIState.get_instance().comm.send(
data_id, dest=communication.MPIRank.ROOT
# Data is already in DataMap, so not problem here.
# We use a blocking send here because the receiver is waiting for the result.
communication.mpi_send_object(
communication.MPIState.get_instance().comm,
data_id,
communication.MPIRank.ROOT,
)

@@ -170,4 +174,7 @@ del self._wait_request[data_id]

if data_ids in self._wait_request:
communication.MPIState.get_instance().comm.send(
data_ids, dest=communication.MPIRank.ROOT
# We use a blocking send here because the receiver is waiting for the result.
communication.mpi_send_object(
communication.MPIState.get_instance().comm,
data_ids,
communication.MPIRank.ROOT,
)

@@ -193,4 +200,7 @@ del self._wait_request[data_ids]

# Executor wait just for signal
communication.MPIState.get_instance().comm.send(
data_id, dest=communication.MPIRank.ROOT
# We use a blocking send here because the receiver is waiting for the result.
communication.mpi_send_object(
communication.MPIState.get_instance().comm,
data_id,
communication.MPIRank.ROOT,
)

@@ -230,2 +240,3 @@ logger.debug("Wait data {} id is ready".format(data_id._id))

operation_data = object_store.get(data_id)
# We use a blocking send here because the receiver is waiting for the result.
communication.send_complex_data(

@@ -232,0 +243,0 @@ mpi_state.comm,

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -13,15 +13,13 @@ # SPDX-License-Identifier: Apache-2.0

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.worker.object_store import ObjectStore
from unidist.core.backends.mpi.core.worker.request_store import RequestStore
mpi_state = communication.MPIState.get_instance()
# Logger configuration
# When building documentation we do not have MPI initialized so
# we use the condition to set "worker_0.log" in order to build it succesfully.
log_file = "worker_{}.log".format(
communication.MPIState.get_instance().rank
if communication.MPIState.get_instance()
else 0
)
w_logger = common.get_logger("worker", log_file)
logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)
w_logger = common.get_logger(logger_name, log_file)

@@ -140,3 +138,4 @@

}
communication.send_simple_operation(
async_operations = AsyncOperations.get_instance()
h_list = communication.isend_simple_operation(
communication.MPIState.get_instance().comm,

@@ -147,2 +146,3 @@ operation_type,

)
async_operations.extend(h_list)

@@ -174,10 +174,12 @@ # Save request in order to prevent massive communication during pending task checks

if is_data_id(arg):
if ObjectStore.get_instance().contains(arg):
object_store = ObjectStore.get_instance()
arg = object_store.get_unique_data_id(arg)
if object_store.contains(arg):
value = ObjectStore.get_instance().get(arg)
# Data is already local or was pushed from master
return value, False
elif ObjectStore.get_instance().contains_data_owner(arg):
elif object_store.contains_data_owner(arg):
if not RequestStore.get_instance().is_already_requested(arg):
# Request the data from an owner worker
owner_rank = ObjectStore.get_instance().get_data_owner(arg)
owner_rank = object_store.get_data_owner(arg)
if owner_rank != communication.MPIState.get_instance().rank:

@@ -210,2 +212,3 @@ self.request_worker_data(owner_rank, arg)

"""
object_store = ObjectStore.get_instance()
if inspect.iscoroutinefunction(task):

@@ -243,5 +246,7 @@

for output_id in output_data_ids:
ObjectStore.get_instance().put(output_id, e)
data_id = object_store.get_unique_data_id(output_id)
object_store.put(data_id, e)
else:
ObjectStore.get_instance().put(output_data_ids, e)
data_id = object_store.get_unique_data_id(output_data_ids)
object_store.put(data_id, e)
else:

@@ -254,7 +259,7 @@ if output_data_ids is not None:

for output_id, value in zip(output_data_ids, output_values):
ObjectStore.get_instance().put(output_id, value)
data_id = object_store.get_unique_data_id(output_id)
object_store.put(data_id, value)
else:
ObjectStore.get_instance().put(
output_data_ids, output_values
)
data_id = object_store.get_unique_data_id(output_data_ids)
object_store.put(data_id, output_values)

@@ -265,2 +270,4 @@ RequestStore.get_instance().check_pending_get_requests(

# Monitor the task execution
# We use a blocking send here because we have to wait for
# completion of the communication, which is necessary for the pipeline to continue.
communication.mpi_send_object(

@@ -308,5 +315,7 @@ communication.MPIState.get_instance().comm,

for output_id in output_data_ids:
ObjectStore.get_instance().put(output_id, e)
data_id = object_store.get_unique_data_id(output_id)
object_store.put(data_id, e)
else:
ObjectStore.get_instance().put(output_data_ids, e)
data_id = object_store.get_unique_data_id(output_data_ids)
object_store.put(data_id, e)
else:

@@ -319,6 +328,10 @@ if output_data_ids is not None:

for output_id, value in zip(output_data_ids, output_values):
ObjectStore.get_instance().put(output_id, value)
data_id = object_store.get_unique_data_id(output_id)
object_store.put(data_id, value)
else:
ObjectStore.get_instance().put(output_data_ids, output_values)
# Monitor the task execution
data_id = object_store.get_unique_data_id(output_data_ids)
object_store.put(data_id, output_values)
# Monitor the task execution.
# We use a blocking send here because we have to wait for
# completion of the communication, which is necessary for the pipeline to continue.
communication.mpi_send_object(

@@ -325,0 +338,0 @@ communication.MPIState.get_instance().comm,

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -16,4 +16,4 @@ # SPDX-License-Identifier: Apache-2.0

DASK = "dask"
MP = "multiprocessing"
PY = "python"
PYMP = "pymp"
PYSEQ = "pyseq"

@@ -20,0 +20,0 @@

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -45,16 +45,16 @@ # SPDX-License-Identifier: Apache-2.0

backend_cls = MPIBackend()
elif backend_name == BackendName.MP:
from unidist.core.backends.multiprocessing.backend import MultiProcessingBackend
from unidist.core.backends.multiprocessing.utils import (
initialize_multiprocessing,
elif backend_name == BackendName.PYMP:
from unidist.core.backends.pymp.backend import PyMpBackend
from unidist.core.backends.pymp.utils import (
initialize_pymp,
)
initialize_multiprocessing()
backend_cls = MultiProcessingBackend()
elif backend_name == BackendName.PY:
from unidist.core.backends.python.backend import PythonBackend
from unidist.core.backends.python.utils import initialize_python
initialize_pymp()
backend_cls = PyMpBackend()
elif backend_name == BackendName.PYSEQ:
from unidist.core.backends.pyseq.backend import PySeqBackend
from unidist.core.backends.pyseq.utils import initialize_pyseq
initialize_python()
backend_cls = PythonBackend()
initialize_pyseq()
backend_cls = PySeqBackend()
else:

@@ -78,3 +78,2 @@ raise ImportError("Unrecognized execution backend.")

if backend is None:
backend_name = Backend.get()

@@ -93,12 +92,12 @@ if backend_name == BackendName.RAY:

backend_cls = MPIBackend()
elif backend_name == BackendName.MP:
from unidist.core.backends.multiprocessing.backend import (
MultiProcessingBackend,
elif backend_name == BackendName.PYMP:
from unidist.core.backends.pymp.backend import (
PyMpBackend,
)
backend_cls = MultiProcessingBackend()
elif backend_name == BackendName.PY:
from unidist.core.backends.python.backend import PythonBackend
backend_cls = PyMpBackend()
elif backend_name == BackendName.PYSEQ:
from unidist.core.backends.pyseq.backend import PySeqBackend
backend_cls = PythonBackend()
backend_cls = PySeqBackend()
else:

@@ -105,0 +104,0 @@ raise ValueError("Unrecognized execution backend.")

@@ -1,3 +0,3 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -29,3 +29,3 @@ # SPDX-License-Identifier: Apache-2.0

@pytest.mark.skipif(
sys.platform == "win32" and Backend.get() == BackendName.MP,
sys.platform == "win32" and Backend.get() == BackendName.PYMP,
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -40,3 +40,3 @@ )

@pytest.mark.skipif(
sys.platform == "win32" and Backend.get() == BackendName.MP,
sys.platform == "win32" and Backend.get() == BackendName.PYMP,
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -51,3 +51,3 @@ )

@pytest.mark.skipif(
sys.platform == "win32" and Backend.get() == BackendName.MP,
sys.platform == "win32" and Backend.get() == BackendName.PYMP,
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -63,7 +63,7 @@ )

@pytest.mark.skipif(
Backend.get() == BackendName.MP,
reason="`multiprocessing` backend incorrectly frees grabbed actors. Details are in https://github.com/modin-project/unidist/issues/65.",
Backend.get() == BackendName.PYMP,
reason="`pymp` backend incorrectly frees grabbed actors. Details are in https://github.com/modin-project/unidist/issues/65.",
)
@pytest.mark.skipif(
sys.platform == "win32" and Backend.get() == BackendName.MP,
sys.platform == "win32" and Backend.get() == BackendName.PYMP,
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -84,4 +84,4 @@ )

@pytest.mark.skipif(
Backend.get() == BackendName.MP,
reason="Proper serialization/deserialization is not implemented yet for multiprocessing",
Backend.get() == BackendName.PYMP,
reason="Proper serialization/deserialization is not implemented yet for pymp",
)

@@ -102,4 +102,4 @@ def test_global_capture():

@pytest.mark.skipif(
Backend.get() == BackendName.MP,
reason="Proper serialization/deserialization is not implemented yet for multiprocessing",
Backend.get() == BackendName.PYMP,
reason="Proper serialization/deserialization is not implemented yet for pymp",
)

@@ -120,3 +120,3 @@ def test_direct_capture():

@pytest.mark.skipif(
Backend.get() == BackendName.MP,
Backend.get() == BackendName.PYMP,
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -130,4 +130,4 @@ )

@pytest.mark.skipif(
Backend.get() == BackendName.MP,
reason="Run of a remote task inside of an actor method is not implemented yet for multiprocessing",
Backend.get() == BackendName.PYMP,
reason="Run of a remote task inside of an actor method is not implemented yet for pymp",
)

@@ -134,0 +134,0 @@ @pytest.mark.skipif(

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -30,7 +30,7 @@ # SPDX-License-Identifier: Apache-2.0

@pytest.mark.skipif(
Backend.get() in (BackendName.MP, BackendName.PY),
reason="MP and PY backends do not support execution of coroutines.",
Backend.get() in (BackendName.PYMP, BackendName.PYSEQ),
reason="`pymp` and `pyseq` backends do not support execution of coroutines.",
)
@pytest.mark.skipif(
sys.platform == "win32" and Backend.get() == BackendName.MP,
sys.platform == "win32" and Backend.get() == BackendName.PYMP,
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -47,7 +47,7 @@ )

@pytest.mark.skipif(
Backend.get() in (BackendName.MP, BackendName.PY),
reason="MP and PY backends do not support execution of coroutines.",
Backend.get() in (BackendName.PYMP, BackendName.PYSEQ),
reason="`pymp` and `pyseq` backends do not support execution of coroutines.",
)
@pytest.mark.skipif(
sys.platform == "win32" and Backend.get() == BackendName.MP,
sys.platform == "win32" and Backend.get() == BackendName.PYMP,
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -62,7 +62,7 @@ )

@pytest.mark.skipif(
Backend.get() in (BackendName.MP, BackendName.PY),
reason="MP and PY backends do not support execution of coroutines.",
Backend.get() in (BackendName.PYMP, BackendName.PYSEQ),
reason="`pymp` and `pyseq` backends do not support execution of coroutines.",
)
@pytest.mark.skipif(
sys.platform == "win32" and Backend.get() == BackendName.MP,
sys.platform == "win32" and Backend.get() == BackendName.PYMP,
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -78,11 +78,11 @@ )

@pytest.mark.skipif(
Backend.get() in (BackendName.MP, BackendName.PY),
reason="MP and PY backends do not support execution of coroutines.",
Backend.get() in (BackendName.PYMP, BackendName.PYSEQ),
reason="`pymp` and `pyseq` backends do not support execution of coroutines.",
)
@pytest.mark.skipif(
Backend.get() == BackendName.MP,
reason="`multiprocessing` backend incorrectly frees grabbed actors. Details are in https://github.com/modin-project/unidist/issues/65.",
Backend.get() == BackendName.PYMP,
reason="`pymp` backend incorrectly frees grabbed actors. Details are in https://github.com/modin-project/unidist/issues/65.",
)
@pytest.mark.skipif(
sys.platform == "win32" and Backend.get() == BackendName.MP,
sys.platform == "win32" and Backend.get() == BackendName.PYMP,
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -111,4 +111,4 @@ )

@pytest.mark.skipif(
Backend.get() in (BackendName.MP, BackendName.PY),
reason="Proper serialization/deserialization is not implemented yet for multiprocessing and python",
Backend.get() in (BackendName.PYMP, BackendName.PYSEQ),
reason="Proper serialization/deserialization is not implemented yet for `pymp` and `pyseq`",
)

@@ -129,4 +129,4 @@ def test_global_capture():

@pytest.mark.skipif(
Backend.get() in (BackendName.MP, BackendName.PY),
reason="Proper serialization/deserialization is not implemented yet for multiprocessing and python",
Backend.get() in (BackendName.PYMP, BackendName.PYSEQ),
reason="Proper serialization/deserialization is not implemented yet for `pymp` and `pyseq`",
)

@@ -147,3 +147,3 @@ def test_direct_capture():

@pytest.mark.skipif(
Backend.get() in (BackendName.MP, BackendName.PY),
Backend.get() in (BackendName.PYMP, BackendName.PYSEQ),
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -157,4 +157,4 @@ )

@pytest.mark.skipif(
Backend.get() in (BackendName.MP, BackendName.PY),
reason="MP and PY backends do not support execution of coroutines.",
Backend.get() in (BackendName.PYMP, BackendName.PYSEQ),
reason="`pymp` and `pyseq` backends do not support execution of coroutines.",
)

@@ -182,4 +182,4 @@ def test_pending_get():

@pytest.mark.skipif(
Backend.get() == BackendName.MP,
reason="Run of a remote task inside of an async actor method is not implemented yet for multiprocessing",
Backend.get() == BackendName.PYMP,
reason="Run of a remote task inside of an async actor method is not implemented yet for `pymp`",
)

@@ -186,0 +186,0 @@ def test_signal_actor():

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -18,7 +18,7 @@ # SPDX-License-Identifier: Apache-2.0

@pytest.mark.skipif(
Backend.get() == BackendName.MP,
reason="Hangs on `multiprocessing` backend. Details are in https://github.com/modin-project/unidist/issues/64.",
Backend.get() == BackendName.PYMP,
reason="Hangs on `pymp` backend. Details are in https://github.com/modin-project/unidist/issues/64.",
)
@pytest.mark.skipif(
sys.platform == "win32" and Backend.get() == BackendName.MP,
sys.platform == "win32" and Backend.get() == BackendName.PYMP,
reason="Details are in https://github.com/modin-project/unidist/issues/70.",

@@ -41,4 +41,4 @@ )

@pytest.mark.skipif(
Backend.get() == BackendName.MP,
reason="Hangs on `multiprocessing` backend. Details are in https://github.com/modin-project/unidist/issues/64.",
Backend.get() == BackendName.PYMP,
reason="Hangs on `pymp` backend. Details are in https://github.com/modin-project/unidist/issues/64.",
)

@@ -71,3 +71,3 @@ def test_wait():

def test_num_cpus():
if Backend.get() == BackendName.PY:
if Backend.get() == BackendName.PYSEQ:
assert_equal(unidist.num_cpus(), 1)

@@ -74,0 +74,0 @@ else:

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -88,4 +88,4 @@ # SPDX-License-Identifier: Apache-2.0

@pytest.mark.skipif(
Backend.get() == BackendName.MP,
reason="Run of a remote task inside of another one is not implemented yet for multiprocessing",
Backend.get() == BackendName.PYMP,
reason="Run of a remote task inside of another one is not implemented yet for pymp",
)

@@ -102,4 +102,4 @@ def test_internal_remote():

@pytest.mark.skipif(
Backend.get() == BackendName.MP,
reason="Serialization of `dict_keys` is not properly implemented yet for multiprocessing",
Backend.get() == BackendName.PYMP,
reason="Serialization of `dict_keys` is not properly implemented yet for pymp",
)

@@ -106,0 +106,0 @@ def test_serialize_dict_keys():

@@ -1,2 +0,2 @@

# Copyright (C) 2021-2022 Modin authors
# Copyright (C) 2021-2023 Modin authors
#

@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0

# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication

# Logger configuration.
# When building documentation we do not have MPI initialized so
# we use the condition to set "worker_0.log" in order to build it successfully.
# NOTE: rank is read once at import time; each MPI process therefore gets
# its own per-rank log file.
log_file = "worker_{}.log".format(
    communication.MPIState.get_instance().rank
    if communication.MPIState.get_instance() is not None
    else 0
)
logger = common.get_logger("worker", log_file)
class AsyncOperations:
    """
    Class that stores MPI async communication handlers.

    Class holds a reference to sending data to prolong data lifetime
    during send operation.
    """

    __instance = None

    def __init__(self):
        # Pairs of (I-prefixed MPI call handler, sent data reference).
        # Keeping the data reference alive prevents the send buffer from
        # being garbage-collected before the non-blocking send completes.
        self._send_async_handlers = []

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``AsyncOperations``.

        Returns
        -------
        AsyncOperations
        """
        if cls.__instance is None:
            cls.__instance = AsyncOperations()
        return cls.__instance

    def extend(self, handlers_list):
        """
        Extend internal list with `handlers_list`.

        Parameters
        ----------
        handlers_list : list
            A list of pairs with handler and data reference.
        """
        # Fixed docstring: the parameter is `handlers_list`, the original
        # documentation referred to a non-existent `handler_list`.
        self._send_async_handlers.extend(handlers_list)

    def check(self):
        """Check all MPI async send requests readiness and remove a reference to sending data."""

        def is_ready(handler):
            # Local renamed from `is_ready` to avoid shadowing the function name.
            ready = handler.Test()
            if ready:
                logger.debug("CHECK ASYNC HANDLER {} - ready".format(handler))
            else:
                logger.debug("CHECK ASYNC HANDLER {} - not ready".format(handler))
            return ready

        # tup[0] - mpi async send handler object; keep only unfinished requests
        # so their data references stay alive.
        self._send_async_handlers[:] = [
            tup for tup in self._send_async_handlers if not is_ready(tup[0])
        ]

    def finish(self):
        """Cancel all MPI async send requests."""
        # `_data` is intentionally unused — it only exists in the pair to
        # keep the send buffer alive until the request is finalized.
        for handler, _data in self._send_async_handlers:
            logger.debug("WAIT ASYNC HANDLER {}".format(handler))
            handler.Cancel()
            handler.Wait()
        self._send_async_handlers.clear()
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""MultiProcessing backend functionality."""
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Actor specific functionality using MultiProcessing backend."""
import unidist.core.backends.multiprocessing.core as mp
from unidist.core.base.actor import Actor, ActorMethod
from unidist.core.base.object_ref import ObjectRef
class MultiProcessingActorMethod(ActorMethod):
    """
    The class implements the interface in ``ActorMethod`` using MultiProcessing backend.

    Parameters
    ----------
    cls : unidist.core.backends.multiprocessing.core.Actor
        An actor class from which method `method_name` will be called.
    method_name : str
        The name of the method to be called.
    """

    def __init__(self, cls, method_name):
        self._cls = cls
        self._method_name = method_name
        self._num_returns = 1

    def _remote(self, *args, num_returns=None, **kwargs):
        """
        Execute `self._method_name` in a worker process.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the method.
        num_returns : int, optional
            Number of results to be returned. If it isn't
            provided, `self._num_returns` will be used.
        **kwargs : dict
            Keyword arguments to be passed in the method.

        Returns
        -------
        ObjectRef, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``ObjectRef`` will be returned.
            * if `num_returns > 1`, list of ``ObjectRef``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        returns_count = self._num_returns if num_returns is None else num_returns
        bound_method = getattr(self._cls, self._method_name)
        data_ids = bound_method.submit(*args, num_returns=returns_count, **kwargs)
        # Wrap the backend-level data IDs into the unified ObjectRef interface.
        if returns_count == 0:
            return None
        if returns_count > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if returns_count == 1:
            return ObjectRef(data_ids)
class MultiProcessingActor(Actor):
    """
    The class implements the interface in ``Actor`` using MultiProcessing backend.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    num_cpus : int
        The number of CPUs to reserve for the lifetime of the actor.
    resources : dict
        Custom resources to reserve for the lifetime of the actor.
    """

    def __init__(self, cls, num_cpus, resources):
        self._cls = cls
        self._num_cpus = num_cpus
        self._resources = resources
        # Filled in by `_remote` once the backend-level actor is created.
        self._actor_handle = None

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls` class.

        This methods creates the ``MultiProcessingActorMethod`` object that is
        responsible for calling method `name` of the `self._cls` class remotely.

        Parameters
        ----------
        name : str
            Name of the method to be called remotely.

        Returns
        -------
        MultiProcessingActorMethod
        """
        return MultiProcessingActorMethod(self._actor_handle, name)

    def _remote(self, *args, num_cpus=None, resources=None, **kwargs):
        """
        Create actor class, specific for MultiProcessing backend, from `self._cls`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in `self._cls` class constructor.
        num_cpus : int, optional
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict, optional
            Custom resources to reserve for the lifetime of the actor.
        **kwargs : dict
            Keyword arguments to be passed in `self._cls` class constructor.

        Returns
        -------
        MultiProcessingActor

        Raises
        ------
        NotImplementedError
            If `num_cpus` or `resources` was supplied here or at construction
            time — reservations are not supported by this backend yet.
        """
        if not (num_cpus is None and self._num_cpus is None):
            raise NotImplementedError(
                "'num_cpus' is not supported yet by MultiProcessing backend."
            )
        if not (resources is None and self._resources is None):
            raise NotImplementedError(
                "'resources' is not supported yet by MultiProcessing backend."
            )
        self._actor_handle = mp.Actor(self._cls, *args, **kwargs)
        return self
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""An implementation of ``Backend`` interface using MultiProcessing."""
import socket
from unidist.config import CpuCount
import unidist.core.backends.multiprocessing.core as mp
from unidist.core.backends.multiprocessing.actor import MultiProcessingActor
from unidist.core.backends.multiprocessing.remote_function import (
MultiProcessingRemoteFunction,
)
from unidist.core.base.backend import Backend
class MultiProcessingBackend(Backend):
    """The class that implements the interface in ``Backend`` using MultiProcessing."""

    @staticmethod
    def make_remote_function(function, num_cpus, num_returns, resources):
        """
        Define a remote function.

        Parameters
        ----------
        function : callable
            Function to be a remote function.
        num_cpus : int
            The number of CPUs to reserve for the remote function.
        num_returns : int
            The number of ``ObjectRef``-s returned by the remote function invocation.
        resources : dict
            Custom resources to reserve for the remote function.

        Returns
        -------
        MultiProcessingRemoteFunction
        """
        return MultiProcessingRemoteFunction(function, num_cpus, num_returns, resources)

    @staticmethod
    def make_actor(cls, num_cpus, resources):
        """
        Define an actor class.

        Parameters
        ----------
        cls : object
            Class to be an actor class.
        num_cpus : int
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict
            Custom resources to reserve for the lifetime of the actor.

        Returns
        -------
        MultiProcessingActor
            The actor class type to create.
        list
            The list of arguments for ``MultiProcessingActor`` constructor.
        """
        return MultiProcessingActor, [cls, num_cpus, resources]

    @staticmethod
    def get(data_ids):
        """
        Get a remote object or a list of remote objects
        from distributed memory.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or a list of ``DataID`` objects to get data from.

        Returns
        -------
        object
            A Python object or a list of Python objects.
        """
        return mp.get(data_ids)

    @staticmethod
    def put(data):
        """
        Put `data` into distributed memory.

        Parameters
        ----------
        data : object
            Data to be put.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            ``DataID`` matching to data.
        """
        return mp.put(data)

    @staticmethod
    def wait(data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        data IDs that correspond to objects that completed computations.
        The second list corresponds to the rest of the data IDs (which may or may not be ready).

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.
        """
        return mp.wait(data_ids, num_returns=num_returns)

    @staticmethod
    def get_ip():
        """
        Get node IP address.

        Returns
        -------
        str
            Node IP address.
        """
        return socket.gethostbyname(socket.gethostname())

    @staticmethod
    def num_cpus():
        """
        Get the number of CPUs used by the execution backend.

        Returns
        -------
        int
        """
        return CpuCount.get()

    @staticmethod
    def cluster_resources():
        """
        Get resources of MultiProcessing cluster.

        Returns
        -------
        dict
            Dictionary with node info in the form `{"node_ip": {"CPU": x}}`.
        """
        node_ip = MultiProcessingBackend.get_ip()
        node_cpus = MultiProcessingBackend.num_cpus()
        return {node_ip: {"CPU": node_cpus}}

    @staticmethod
    def shutdown():
        """
        Shutdown MultiProcessing execution backend.

        Note
        ----
        Not supported yet.
        """
        raise NotImplementedError(
            "'shutdown' is not supported yet by MultiProcessing backend."
        )

    @staticmethod
    def is_initialized():
        """
        Check if Multiprocessing backend has already been initialized.

        Returns
        -------
        bool
            True or False.
        """
        return mp.is_initialized()
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""MultiProcessing backend core functionality."""
from .actor import Actor
from .api import put, wait, get, submit, init, is_initialized
__all__ = ["Actor", "put", "wait", "get", "submit", "init", "is_initialized"]
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Actor specific functionality implemented using Python multiprocessing."""
import cloudpickle as pkl
from multiprocessing.managers import BaseManager
from unidist.core.backends.multiprocessing.core.object_store import ObjectStore, Delayed
from unidist.core.backends.multiprocessing.core.process_manager import (
ProcessManager,
Task,
)
class ActorMethod:
    """
    Class is responsible to execute `method_name` of
    `cls_obj` in the separate worker-process of `actor` object.

    Parameters
    ----------
    cls_obj : multiprocessing.managers.BaseManager
        Shared manager-class.
    actor : Actor
        Actor object.
    method_name : str
        The name of the method to be called.
    obj_store : unidist.core.backends.multiprocessing.core.object_store.ObjectStore
        Object storage to share data between workers.
    """

    def __init__(self, cls_obj, actor, method_name, obj_store):
        self._cls_obj = cls_obj
        self._method_name = method_name
        self._actor = actor
        self._obj_store = obj_store

    def submit(self, *args, num_returns=1, **kwargs):
        """
        Execute `self._method_name` asynchronously in the worker of `self._actor`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the `self._method_name` method.
        num_returns : int, default: 1
            Number of results to be returned from `self._method_name`.
        **kwargs : dict
            Keyword arguments to be passed in the `self._method_name` method.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``DataID`` will be returned.
            * if `num_returns > 1`, list of ``DataID``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        # Reserve placeholder IDs that the worker will later overwrite
        # with the real results.
        if num_returns > 1:
            data_ids = [self._obj_store.put(Delayed()) for _ in range(num_returns)]
        elif num_returns == 0:
            data_ids = None
        else:
            data_ids = self._obj_store.put(Delayed())
        bound_method = getattr(self._cls_obj, self._method_name)
        wrapped_task = Task(bound_method, data_ids, self._obj_store, *args, **kwargs)
        self._actor.submit(wrapped_task)
        return data_ids
class Actor:
    """
    Actor-class to execute methods of wrapped class in a separate worker.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    *args : iterable
        Positional arguments to be passed in `cls` constructor.
    **kwargs : dict
        Keyword arguments to be passed in `cls` constructor.

    Notes
    -----
    Python multiprocessing manager-class will be created to wrap `cls`.
    This makes `cls` class object shared between different workers. Manager-class
    starts additional process to share class state between processes.
    Methods of `cls` class object are executed in the worker, grabbed from a workers pool.
    """

    def __init__(self, cls, *args, **kwargs):
        # Initialize to None first so `__del__` is safe even if the
        # construction below fails partway through.
        self._worker = None
        self._worker_id = None
        self._obj_store = ObjectStore.get_instance()
        # FIXME : Change "WrappedClass" -> cls.__name__ + "Manager", for example.
        # Per multiprocessing.managers semantics, registration must happen
        # before `manager.start()` so the server process knows the type.
        BaseManager.register("WrappedClass", cls)
        manager = BaseManager()
        manager.start()
        # Proxy object living in the manager process, shared across workers.
        self._cls_obj = manager.WrappedClass(*args, **kwargs)
        # Reserve a dedicated worker for this actor's method calls.
        self._worker, self._worker_id = ProcessManager.get_instance().grab_worker()

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls_obj` class.

        This methods creates the ``ActorMethod`` object that is responsible
        for calling method `name` of the `self._cls_obj` class remotely.

        Parameters
        ----------
        name : str
            Name of the method to be called remotely.

        Returns
        -------
        ActorMethod
        """
        return ActorMethod(self._cls_obj, self, name, self._obj_store)

    def submit(self, task):
        """
        Execute `task` asynchronously in the worker grabbed by this actor.

        Parameters
        ----------
        task : unidist.core.backends.multiprocessing.core.process_manager.Task
            Task object holding callable function.
        """
        # Tasks are cloudpickle-serialized before crossing the process boundary.
        self._worker.add_task(pkl.dumps(task))

    def __del__(self):
        """
        Destructor of the actor.

        Free worker, grabbed from the workers pool.
        """
        # Only release a worker that was actually grabbed.
        if self._worker_id is not None:
            ProcessManager.get_instance().free_worker(self._worker_id)
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""High-level API of MultiProcessing backend."""
import cloudpickle as pkl
from unidist.config import CpuCount
from unidist.core.backends.multiprocessing.core.object_store import ObjectStore, Delayed
from unidist.core.backends.multiprocessing.core.process_manager import (
ProcessManager,
Task,
)
# The global variable is responsible for whether the MultiProcessing backend
# has already been initialized in this process.
is_multiprocessing_initialized = False


def init(num_workers=None):
    """
    Initialize shared object storage and workers pool.

    Parameters
    ----------
    num_workers : int, optional
        Number of worker-processes to start. Defaults to the number of CPUs
        (``CpuCount.get()``).

    Notes
    -----
    Run initialization of singleton objects ``unidist.core.backends.multiprocessing.core.object_store.ObjectStore``
    and ``unidist.core.backends.multiprocessing.core.process_manager.ProcessManager``.
    """
    # The original signature used `num_workers=CpuCount.get()`, which froze
    # the config value at import time; resolve the default lazily instead so
    # configuration changes made after import are respected.
    if num_workers is None:
        num_workers = CpuCount.get()
    ObjectStore.get_instance()
    ProcessManager.get_instance(num_workers=num_workers)
    global is_multiprocessing_initialized
    # Unconditional assignment: the previous guarded form
    # (`if not flag: flag = True`) was an identity operation.
    is_multiprocessing_initialized = True


def is_initialized():
    """
    Check if MultiProcessing backend has already been initialized.

    Returns
    -------
    bool
        True or False.
    """
    # No `global` statement needed for a read-only access.
    return is_multiprocessing_initialized
def put(data):
    """
    Put data into shared object storage.

    Parameters
    ----------
    data : object
        Data to be put.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID
        An ID of object in shared object storage.
    """
    storage = ObjectStore.get_instance()
    return storage.put(data)
def get(data_ids):
    """
    Get a object(s) associated with `data_ids` from the shared object storage.

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        An ID(s) to object(s) to get data from.

    Returns
    -------
    object
        A Python object.
    """
    storage = ObjectStore.get_instance()
    return storage.get(data_ids)
def wait(data_ids, num_returns=1):
    """
    Wait until `data_ids` are finished.

    This method returns two lists. The first list consists of
    ``DataID``-s that correspond to objects that completed computations.
    The second list corresponds to the rest of the ``DataID``-s (which may or may not be ready).

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ``DataID`` or list of ``DataID``-s to be waited.
    num_returns : int, default: 1
        The number of ``DataID``-s that should be returned as ready.

    Returns
    -------
    tuple
        List of data IDs that are ready and list of the remaining data IDs.
    """
    storage = ObjectStore.get_instance()
    return storage.wait(data_ids, num_returns=num_returns)
def submit(func, *args, num_returns=1, **kwargs):
    """
    Execute function in a worker process.

    Parameters
    ----------
    func : callable
        Function to be executed in the worker.
    *args : iterable
        Positional arguments to be passed in the `func`.
    num_returns : int, default: 1
        Number of results to be returned from `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID, list or None
        Type of returns depends on `num_returns` value:

        * if `num_returns == 1`, ``DataID`` will be returned.
        * if `num_returns > 1`, list of ``DataID``-s will be returned.
        * if `num_returns == 0`, ``None`` will be returned.
    """
    storage = ObjectStore.get_instance()
    # Reserve placeholder IDs the worker will later fill with results.
    if num_returns > 1:
        data_ids = [storage.put(Delayed()) for _ in range(num_returns)]
    elif num_returns == 0:
        data_ids = None
    else:
        data_ids = storage.put(Delayed())
    wrapped_task = Task(func, data_ids, storage, *args, **kwargs)
    ProcessManager.get_instance().submit(pkl.dumps(wrapped_task))
    return data_ids
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Shared object storage related functionality."""
import cloudpickle as pkl
from multiprocessing import Manager
from unidist.core.backends.common.data_id import DataID
class Delayed:
    """Class-type that is used as a placeholder for objects during computation of those."""

    pass


class ObjectStore:
    """
    Class that stores objects and provides access to these from different processes.

    Notes
    -----
    Shared storage is organized using ``multiprocessing.Manager.dict``. This is separate
    process which starts work in the class constructor.
    """

    __instance = None

    def __init__(self):
        # Create the shared dict only for the very first instance; the class
        # is intended to be used as a singleton via `get_instance`.
        if ObjectStore.__instance is None:
            self.store_delayed = Manager().dict()

    def __repr__(self):
        return f"Object store: {self.store_delayed}"

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        unidist.core.backends.multiprocessing.core.object_store.ObjectStore
        """
        if cls.__instance is None:
            cls.__instance = ObjectStore()
        return cls.__instance

    def put(self, data, data_id=None):
        """
        Put `data` to internal shared dictionary.

        Parameters
        ----------
        data : object
            Data to be put.
        data_id : unidist.core.backends.common.data_id.DataID, optional
            An ID to data. If it isn't provided, will be created automatically.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            An ID of object in internal shared dictionary.
        """
        data_id = DataID() if data_id is None else data_id
        # Callables are pickled with cloudpickle so they can cross
        # process boundaries through the manager dict.
        self.store_delayed[data_id] = pkl.dumps(data) if callable(data) else data
        return data_id

    def get(self, data_ids):
        """
        Get a object(s) associated with `data_ids` from the shared internal dictionary.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            An ID(s) of object(s) to get data from.

        Returns
        -------
        object
            A Python object.

        Raises
        ------
        ValueError
            If `data_ids` contains something other than ``DataID``-s.
        Exception
            Any exception raised by the task that produced a value is
            re-raised in the caller's process.
        """
        is_list = isinstance(data_ids, list)
        if not is_list:
            data_ids = [data_ids]
        if not all(isinstance(ref, DataID) for ref in data_ids):
            raise ValueError(
                "`data_ids` must either be a data ID or a list of data IDs."
            )

        values = [None] * len(data_ids)
        for idx, data_id in enumerate(data_ids):
            # Busy-wait until the worker replaces the Delayed placeholder.
            # NOTE(review): this spins at 100% CPU; a condition/event-based
            # wait would be kinder, but is a behavioral change.
            while isinstance(self.store_delayed[data_id], Delayed):
                pass
            value = self.store_delayed[data_id]
            if isinstance(value, Exception):
                raise value
            values[idx] = pkl.loads(value) if isinstance(value, bytes) else value
        return values if is_list else values[0]

    def wait(self, data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        ``DataID``-s that correspond to objects that completed computations.
        The second list corresponds to the rest of the ``DataID``-s (which may or may not be ready).

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.
        """
        if not isinstance(data_ids, list):
            data_ids = [data_ids]
        ready = list()
        # First pass: collect already-completed IDs without blocking.
        # BUGFIX: the previous implementation popped from `data_ids` by the
        # index obtained from `enumerate` over a copy; after the first pop
        # the indices went out of sync and wrong (possibly unfinished) IDs
        # could be reported as ready. Remove by value instead.
        for data_id in list(data_ids):
            if len(ready) == num_returns:
                break
            if not isinstance(self.store_delayed[data_id], Delayed):
                ready.append(data_id)
                data_ids.remove(data_id)
        not_ready = data_ids
        # Second pass: block on the remaining IDs until the requested
        # number of results is ready.
        while len(ready) != num_returns:
            self.get(not_ready[0])
            ready.append(not_ready.pop(0))
        return ready, not_ready
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Workers related functionality."""
import cloudpickle as pkl
from multiprocessing import (
Process,
JoinableQueue,
)
from unidist.config import CpuCount
from unidist.core.backends.common.data_id import DataID
from unidist.core.backends.multiprocessing.core.object_store import ObjectStore
class Worker(Process):
    """
    Class-process that executes tasks from `self.task_queue`.

    Parameters
    ----------
    task_queue : multiprocessing.JoinableQueue
        A queue of task to execute.
    obj_store : unidist.core.backends.multiprocessing.core.object_store.ObjectStore
        Shared object storage to read/write data.
    """

    def __init__(self, task_queue, obj_store):
        # daemon=True so the worker process dies with the parent process.
        Process.__init__(self, daemon=True)
        self.task_queue = task_queue
        self._obj_store = obj_store

    def run(self):
        """Run main infinite loop of process to execute tasks from `self.task_queue`."""
        while 1:
            # Blocks until a task arrives; tasks are cloudpickle-serialized.
            task = self.task_queue.get()
            task = pkl.loads(task)
            data_ids = task.data_ids
            try:
                value = task()
            except Exception as e:
                # Publish the exception under every expected data ID so that
                # `ObjectStore.get` re-raises it in the consumer process.
                if isinstance(data_ids, list) and len(data_ids) > 1:
                    for i, data_id in enumerate(data_ids):
                        self._obj_store.store_delayed[data_id] = e
                else:
                    self._obj_store.store_delayed[data_ids] = e
            else:
                if data_ids is not None:
                    if isinstance(data_ids, list) and len(data_ids) > 1:
                        # Multiple returns: pair each data ID with its value.
                        for data_id, val in zip(data_ids, value):
                            self._obj_store.store_delayed[data_id] = val
                    else:
                        self._obj_store.store_delayed[data_ids] = value
            finally:
                # Mark the task done whether it succeeded or failed so that
                # `JoinableQueue.join` does not hang.
                self.task_queue.task_done()
        # NOTE(review): unreachable — the loop above never exits normally;
        # kept as in the original source.
        return

    def add_task(self, task):
        """
        Add `task` to `self.task_queue`.

        Parameters
        ----------
        task : unidist.core.backends.multiprocessing.core.process_manager.Task
            Task to be added in the queue.
        """
        self.task_queue.put(task)
class ProcessManager:
    """
    Class that controls worker pool and assigns task to workers.

    Parameters
    ----------
    num_workers : int, optional
        Number of worker-processes to start. If isn't provided,
        will be equal to number of CPUs.

    Notes
    -----
    Constructor starts `num_workers` MultiProcessing Workers.
    """

    __instance = None

    def __init__(self, num_workers=None):
        # Populate the pool only once; `get_instance` enforces the singleton.
        if ProcessManager.__instance is None:
            if num_workers is None:
                num_workers = CpuCount.get()
            self.workers = [None] * num_workers
            self.grabbed_workers = [False] * num_workers
            self.__class__._worker_id = 0
            obj_store = ObjectStore.get_instance()
            for slot in range(num_workers):
                worker = Worker(JoinableQueue(), obj_store)
                worker.start()
                self.workers[slot] = worker

    @classmethod
    def get_instance(cls, num_workers=None):
        """
        Get instance of ``ProcessManager``.

        Returns
        -------
        unidist.core.backends.multiprocessing.core.process_manager.ProcessManager
        """
        if cls.__instance is None:
            cls.__instance = ProcessManager(num_workers=num_workers)
        return cls.__instance

    def _next(self):
        """
        Get current worker index and move to another with incrementing by one.

        Returns
        -------
        int
        """
        manager_cls = self.__class__
        current = manager_cls._worker_id
        # Advance the round-robin cursor, wrapping at the pool size.
        manager_cls._worker_id = (current + 1) % len(self.workers)
        return current

    def grab_worker(self):
        """
        Grab a worker from worker pool.

        Grabbed worker is marked as `blocked` and doesn't participate
        in the tasks submission.

        Returns
        -------
        unidist.core.backends.multiprocessing.core.process_manager.Worker
            Grabbed worker.
        int
            Index of grabbed worker.
        """
        for position, taken in enumerate(self.grabbed_workers):
            if taken:
                continue
            self.grabbed_workers[position] = True
            return self.workers[position], position
        raise RuntimeError("Actor can`t be run, no available workers.")

    def free_worker(self, idx):
        """
        Free worker by index `idx`.

        Parameters
        ----------
        idx : int
            Index of worker to be freed.
        """
        self.grabbed_workers[idx] = False

    def submit(self, task):
        """
        Add `task` to task queue of one of workers using round-robin.

        Parameters
        ----------
        task : unidist.core.backends.multiprocessing.core.process_manager.Task
            Task to be added in task queue.
        """
        # Probe at most one full cycle of the pool for a non-grabbed worker.
        for _ in range(len(self.workers)):
            candidate = self._next()
            if not self.grabbed_workers[candidate]:
                self.workers[candidate].add_task(task)
                return
        raise RuntimeError("Task can`t be run, no available workers.")
class Task:
    """
    Class poses as unified callable object to execute in MultiProcessing Worker.

    Parameters
    ----------
    func : callable
        A function to be called in object invocation.
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ``DataID``-(s) associated with result(s) of `func` invocation.
    obj_store : unidist.core.backends.multiprocessing.core.object_store.ObjectStore
        Object storage to share data between workers.
    *args : iterable
        Positional arguments to be passed in the `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.
    """

    def __init__(self, func, data_ids, obj_store, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self.data_ids = data_ids
        self.obj_store = obj_store

    def __call__(self):
        """
        Execute `self._func`.

        If `self._args`/`self._kwargs` has ``DataID`` objects,
        automaterialize happens.

        Returns
        -------
        object
            The result of `self._func` invocation.
        """

        def _resolve(obj):
            # Swap a DataID for the actual value held in the object store.
            return self.obj_store.get(obj) if isinstance(obj, DataID) else obj

        call_args = [_resolve(arg) for arg in self._args]
        call_kwargs = {name: _resolve(value) for name, value in self._kwargs.items()}
        return self._func(*call_args, **call_kwargs)
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""An implementation of ``RemoteFunction`` interface using MultiProcessing."""
import unidist.core.backends.multiprocessing.core as mp
from unidist.core.base.object_ref import ObjectRef
from unidist.core.base.remote_function import RemoteFunction
class MultiProcessingRemoteFunction(RemoteFunction):
    """
    The class that implements the interface in ``RemoteFunction`` using MultiProcessing.

    Parameters
    ----------
    function : callable
        A function to be called remotely.
    num_cpus : int
        The number of CPUs to reserve for the remote function.
    num_returns : int
        The number of ``ObjectRef``-s returned by the remote function invocation.
    resources : dict
        Custom resources to reserve for the remote function.
    """

    def __init__(self, function, num_cpus, num_returns, resources):
        self._remote_function = function
        self._num_cpus = num_cpus
        # Default to a single return value when the caller did not specify.
        self._num_returns = num_returns if num_returns is not None else 1
        self._resources = resources

    def _remote(self, *args, num_cpus=None, num_returns=None, resources=None, **kwargs):
        """
        Execute `self._remote_function` in a worker process.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the `self._remote_function`.
        num_cpus : int, optional
            The number of CPUs to reserve for the remote function.
        num_returns : int, optional
            The number of ``ObjectRef``-s returned by the remote function invocation.
        resources : dict, optional
            Custom resources to reserve for the remote function.
        **kwargs : dict
            Keyword arguments to be passed in the `self._remote_function`.

        Returns
        -------
        ObjectRef, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``ObjectRef`` will be returned.
            * if `num_returns > 1`, list of ``ObjectRef``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        if num_cpus is not None or self._num_cpus is not None:
            raise NotImplementedError(
                "'num_cpus' is not supported yet by MultiProcessing backend."
            )
        if resources is not None or self._resources is not None:
            raise NotImplementedError(
                "'resources' is not supported yet by MultiProcessing backend."
            )
        returns_count = self._num_returns if num_returns is None else num_returns
        data_ids = mp.submit(
            self._remote_function, *args, num_returns=returns_count, **kwargs
        )
        if returns_count == 0:
            return None
        if returns_count > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if returns_count == 1:
            return ObjectRef(data_ids)
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Utilities used to initialize MultiProcessing execution backend."""
from unidist.config import CpuCount
def initialize_multiprocessing():
    """
    Initialize the MultiProcessing execution backend.

    Notes
    -----
    Number of workers for MultiProcessing is equal to number of CPUs used by the backend.
    """
    # Import lazily to avoid a circular dependency at module load time.
    from unidist.core.backends.multiprocessing.core import init as start_backend

    start_backend(num_workers=CpuCount.get())
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Python backend functionality."""
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Actor specific functionality using Python backend."""
import unidist.core.backends.python.core as py
from unidist.core.base.actor import Actor, ActorMethod
from unidist.core.base.object_ref import ObjectRef
class PythonActorMethod(ActorMethod):
    """
    The class implements the interface in ``ActorMethod`` using Python backend.

    Parameters
    ----------
    cls : object
        An actor class from which method `method_name` will be called.
    method_name : str
        The name of the method to be called.
    """

    def __init__(self, cls, method_name):
        self._cls = cls
        self._method_name = method_name
        # A single return value is assumed unless overridden per call.
        self._num_returns = 1

    def _remote(self, *args, num_returns=None, **kwargs):
        """
        Execute `self._method_name`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the method.
        num_returns : int, optional
            Number of results to be returned. If it isn't
            provided, `self._num_returns` will be used.
        **kwargs : dict
            Keyword arguments to be passed in the method.

        Returns
        -------
        ObjectRef, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``ObjectRef`` will be returned.
            * if `num_returns > 1`, list of ``ObjectRef``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        returns_count = self._num_returns if num_returns is None else num_returns
        bound_method = getattr(self._cls, self._method_name)
        data_ids = py.submit(bound_method, *args, num_returns=returns_count, **kwargs)
        if returns_count == 0:
            return None
        if returns_count > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if returns_count == 1:
            return ObjectRef(data_ids)
class PythonActor(Actor):
    """
    The class implements the interface in ``Actor`` using Python backend.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    num_cpus : int
        The number of CPUs to reserve for the lifetime of the actor.
    resources : dict
        Custom resources to reserve for the lifetime of the actor.
    """

    def __init__(self, cls, num_cpus, resources):
        self._cls = cls
        self._num_cpus = num_cpus
        self._resources = resources
        # Populated by `_remote` once the wrapped class is instantiated.
        self._actor_handle = None

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls` class.

        This method creates the ``PythonActorMethod`` object that is responsible
        for calling method `name` of the `self._cls` class.

        Parameters
        ----------
        name : str
            Name of the method to be called.

        Returns
        -------
        PythonActorMethod
        """
        return PythonActorMethod(self._actor_handle, name)

    def _remote(self, *args, num_cpus=None, resources=None, **kwargs):
        """
        Create actor class, specific for Python backend, from `self._cls`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in `self._cls` class constructor.
        num_cpus : int, optional
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict, optional
            Custom resources to reserve for the lifetime of the actor.
        **kwargs : dict
            Keyword arguments to be passed in `self._cls` class constructor.

        Returns
        -------
        PythonActor
        """
        cpus_requested = num_cpus is not None or self._num_cpus is not None
        if cpus_requested:
            raise NotImplementedError("'num_cpus' is not supported by Python backend.")
        resources_requested = resources is not None or self._resources is not None
        if resources_requested:
            raise NotImplementedError("'resources' is not supported by Python backend.")
        self._actor_handle = self._cls(*args, **kwargs)
        return self
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""An implementation of ``Backend`` interface using Python."""
import socket
import unidist.core.backends.python.core as py
from unidist.core.backends.python.actor import PythonActor
from unidist.core.backends.python.remote_function import (
PythonRemoteFunction,
)
from unidist.core.base.backend import Backend
class PythonBackend(Backend):
    """The class that implements the interface in ``Backend`` using Python."""

    @staticmethod
    def make_remote_function(function, num_cpus, num_returns, resources):
        """
        Define ``PythonRemoteFunction``.

        Parameters
        ----------
        function : callable
            Function to be ``PythonRemoteFunction``.
        num_cpus : int
            The number of CPUs to reserve for ``PythonRemoteFunction``.
        num_returns : int
            The number of ``ObjectRef``-s returned by the function invocation.
        resources : dict
            Custom resources to reserve for the function.

        Returns
        -------
        PythonRemoteFunction
        """
        return PythonRemoteFunction(function, num_cpus, num_returns, resources)

    @staticmethod
    def make_actor(cls, num_cpus, resources):
        """
        Define an actor class.

        Parameters
        ----------
        cls : object
            Class to be an actor class.
        num_cpus : int
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict
            Custom resources to reserve for the lifetime of the actor.

        Returns
        -------
        PythonActor
            The actor class type to create.
        list
            The list of arguments for ``PythonActor`` constructor.
        """
        return PythonActor, [cls, num_cpus, resources]

    @staticmethod
    def get(data_ids):
        """
        Get an object or a list of objects from object store.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or a list of ``DataID`` objects to get data from.

        Returns
        -------
        object
            A Python object or a list of Python objects.
        """
        return py.get(data_ids)

    @staticmethod
    def put(data):
        """
        Put `data` into object store.

        Parameters
        ----------
        data : object
            Data to be put.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            ``DataID`` matching to data.
        """
        return py.put(data)

    @staticmethod
    def wait(data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        data IDs that correspond to objects that completed computations.
        The second list corresponds to the rest of the data IDs.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.

        Notes
        -----
        Method serves to maintain behavior compatibility between backends. All objects
        completed computation before putting into an object storage for Python backend.
        """
        ready = data_ids[:num_returns]
        not_ready = data_ids[num_returns:]
        return ready, not_ready

    @staticmethod
    def get_ip():
        """
        Get node IP address.

        Returns
        -------
        str
            Node IP address.
        """
        return socket.gethostbyname(socket.gethostname())

    @staticmethod
    def num_cpus():
        """
        Get the number of CPUs used by the execution backend.

        Returns
        -------
        int
        """
        # The Python backend is sequential, so it always reports a single CPU.
        return 1

    @staticmethod
    def cluster_resources():
        """
        Get resources of the cluster.

        Returns
        -------
        dict
            Dictionary with node info in the form `{"node_ip": {"CPU": x}}`.
        """
        node_ip = PythonBackend.get_ip()
        return {node_ip: {"CPU": PythonBackend.num_cpus()}}

    @staticmethod
    def is_initialized():
        """
        Check if Python backend has already been initialized.

        Returns
        -------
        bool
            True or False.
        """
        return py.is_initialized()
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Python backend core functionality."""
from .api import put, get, submit, init, is_initialized

# Explicitly declare the public API of the Python backend core package.
__all__ = ["put", "get", "submit", "init", "is_initialized"]
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""High-level API of Python backend."""
from unidist.core.backends.common.data_id import DataID
from unidist.core.backends.python.core.object_store import ObjectStore
# Module-level flag tracking whether the Python backend has already been
# initialized; set to True by ``init()`` and read by ``is_initialized()``.
is_python_initialized = False
def init():
    """
    Initialize an object storage.

    Notes
    -----
    Run initialization of singleton object ``unidist.core.backends.python.core.object_store.ObjectStore``.
    """
    ObjectStore.get_instance()
    global is_python_initialized
    # Unconditional assignment; the previous `if not ...` guard around an
    # idempotent `= True` write was redundant.
    is_python_initialized = True
def is_initialized():
    """
    Check if Python backend has already been initialized.

    Returns
    -------
    bool
        True or False.
    """
    # Reading a module-level name needs no `global` declaration.
    return is_python_initialized
def put(data):
    """
    Put data into object storage.

    Parameters
    ----------
    data : object
        Data to be put.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID
        An ID of object in object storage.
    """
    store = ObjectStore.get_instance()
    return store.put(data)
def get(data_ids):
    """
    Get object(s) associated with `data_ids` from the object storage.

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ID(s) to object(s) to get data from.

    Returns
    -------
    object
        A Python object.
    """
    store = ObjectStore.get_instance()
    return store.get(data_ids)
def submit(func, *args, num_returns=1, **kwargs):
    """
    Execute function.

    Parameters
    ----------
    func : callable
        Function to be executed.
    *args : iterable
        Positional arguments to be passed in the `func`.
    num_returns : int, default: 1
        Number of results to be returned from `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID, list or None
        Type of returns depends on `num_returns` value:

        * if `num_returns == 1`, ``DataID`` will be returned.
        * if `num_returns > 1`, list of ``DataID``-s will be returned.
        * if `num_returns == 0`, ``None`` will be returned.
    """
    store = ObjectStore.get_instance()

    def _materialize(obj):
        # Replace a DataID argument with the value it refers to.
        return store.get(obj) if isinstance(obj, DataID) else obj

    prepared_args = [_materialize(arg) for arg in args]
    prepared_kwargs = {key: _materialize(value) for key, value in kwargs.items()}
    try:
        result = func(*prepared_args, **prepared_kwargs)
    except Exception as e:
        # Mirror the requested arity: one exception per result slot.
        result = [e] * num_returns if num_returns > 1 else e
    if num_returns == 0:
        return None
    if num_returns > 1:
        return [store.put(result[idx]) for idx in range(num_returns)]
    return store.put(result)
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Object storage related functionality."""
from unidist.core.backends.common.data_id import DataID
class ObjectStore:
    """Class that stores objects and provides access to these."""

    __instance = None

    def __init__(self):
        # The backing dict is created only for the first (singleton) instance.
        if ObjectStore.__instance is None:
            self.store = {}

    def __repr__(self):
        return f"Object store: {self.store}"

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        unidist.core.backends.python.core.object_store.ObjectStore
        """
        if cls.__instance is None:
            cls.__instance = ObjectStore()
        return cls.__instance

    def put(self, data, data_id=None):
        """
        Put `data` to internal dictionary.

        Parameters
        ----------
        data : object
            Data to be put.
        data_id : unidist.core.backends.common.data_id.DataID, optional
            An ID of data. If it isn't provided, will be created automatically.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            An ID of object in internal dictionary.
        """
        if data_id is None:
            data_id = DataID()
        self.store[data_id] = data
        return data_id

    def get(self, data_ids):
        """
        Get object(s) associated with `data_ids` from the internal dictionary.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ID(s) of object(s) to get data from.

        Returns
        -------
        object
            A Python object.
        """
        unwrap = not isinstance(data_ids, list)
        id_list = [data_ids] if unwrap else data_ids
        if any(not isinstance(data_id, DataID) for data_id in id_list):
            raise ValueError(
                "`data_ids` must either be a data ID or a list of data IDs."
            )
        values = []
        for data_id in id_list:
            value = self.store[data_id]
            # A stored exception is re-raised on materialization.
            if isinstance(value, Exception):
                raise value
            values.append(value)
        return values[0] if unwrap else values
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""An implementation of ``RemoteFunction`` interface using Python."""
import unidist.core.backends.python.core as py
from unidist.core.base.object_ref import ObjectRef
from unidist.core.base.remote_function import RemoteFunction
class PythonRemoteFunction(RemoteFunction):
    """
    The class that implements the interface in ``RemoteFunction`` using Python.

    Parameters
    ----------
    function : callable
        A function to be called.
    num_cpus : int
        The number of CPUs to reserve for the function.
    num_returns : int
        The number of ``ObjectRef``-s returned by the function invocation.
    resources : dict
        Custom resources to reserve for the function.
    """

    def __init__(self, function, num_cpus, num_returns, resources):
        self._remote_function = function
        self._num_cpus = num_cpus
        # Default to a single return value when the caller did not specify.
        self._num_returns = num_returns if num_returns is not None else 1
        self._resources = resources

    def _remote(self, *args, num_cpus=None, num_returns=None, resources=None, **kwargs):
        """
        Execute `self._remote_function`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the `self._remote_function`.
        num_cpus : int, optional
            The number of CPUs to reserve for the function.
        num_returns : int, optional
            The number of ``ObjectRef``-s returned by the function invocation.
        resources : dict, optional
            Custom resources to reserve for the function.
        **kwargs : dict
            Keyword arguments to be passed in the `self._remote_function`.

        Returns
        -------
        ObjectRef, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``ObjectRef`` will be returned.
            * if `num_returns > 1`, list of ``ObjectRef`` will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        if num_cpus is not None or self._num_cpus is not None:
            raise NotImplementedError("'num_cpus' is not supported by Python backend.")
        if resources is not None or self._resources is not None:
            raise NotImplementedError("'resources' is not supported by Python backend.")
        returns_count = self._num_returns if num_returns is None else num_returns
        data_ids = py.submit(
            self._remote_function, *args, num_returns=returns_count, **kwargs
        )
        if returns_count == 0:
            return None
        if returns_count > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if returns_count == 1:
            return ObjectRef(data_ids)
# Copyright (C) 2021-2022 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Utilities used to initialize Python execution backend."""
def initialize_python():
    """
    Initialize the Python execution backend.

    Notes
    -----
    All execution will happen sequentially.
    """
    # Import lazily to avoid a circular dependency at module load time.
    from unidist.core.backends.python.core import init as start_backend

    start_backend()