unidist
Advanced tools
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| import unidist.core.backends.mpi.core.common as common | ||
| logger = common.get_logger("async_operations", "async_operations.log") | ||
class AsyncOperations:
    """
    Class that stores MPI async communication handlers.

    Class holds a reference to sending data to prolong data lifetime
    during send operation.
    """

    __instance = None

    def __init__(self):
        # Pairs of (I-prefixed MPI call handler, reference to the data being sent).
        self._send_async_handlers = []

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``AsyncOperations``.

        Returns
        -------
        AsyncOperations
        """
        if cls.__instance is None:
            cls.__instance = AsyncOperations()
        return cls.__instance

    def extend(self, handlers_list):
        """
        Extend internal list with `handlers_list`.

        Parameters
        ----------
        handlers_list : list
            A list of pairs with handler and data reference.
        """
        # Fix: the docstring previously referred to a non-existent `handler_list`
        # parameter; it is `handlers_list`.
        self._send_async_handlers.extend(handlers_list)

    def check(self):
        """Check all MPI async send requests readiness and remove a reference to sending data."""

        def is_ready(handler):
            is_ready = handler.Test()
            if is_ready:
                logger.debug("CHECK ASYNC HANDLER {} - ready".format(handler))
            else:
                logger.debug("CHECK ASYNC HANDLER {} - not ready".format(handler))
            return is_ready

        # tup[0] - mpi async send handler object; completed sends are dropped,
        # which releases the paired data reference (tup[1]) as well.
        self._send_async_handlers[:] = [
            tup for tup in self._send_async_handlers if not is_ready(tup[0])
        ]

    def finish(self):
        """Cancel all MPI async send requests."""
        for handler, data in self._send_async_handlers:
            logger.debug("WAIT ASYNC HANDLER {}".format(handler))
            handler.Cancel()
            handler.Wait()
        self._send_async_handlers.clear()
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Python Multiprocessing backend functionality.""" |
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Actor specific functionality using Python Multiprocessing backend.""" | ||
| import unidist.core.backends.pymp.core as pymp | ||
| from unidist.core.base.actor import Actor, ActorMethod | ||
| from unidist.core.base.object_ref import ObjectRef | ||
class PyMpActorMethod(ActorMethod):
    """
    The class implements the interface in ``ActorMethod`` using Python Multiprocessing backend.

    Parameters
    ----------
    cls : unidist.core.backends.pymp.core.Actor
        An actor class from which method `method_name` will be called.
    method_name : str
        The name of the method to be called.
    """

    def __init__(self, cls, method_name):
        self._cls = cls
        self._method_name = method_name
        # Default number of results when the caller does not override it.
        self._num_returns = 1

    def _remote(self, *args, num_returns=None, **kwargs):
        """
        Execute `self._method_name` in a worker process.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the method.
        num_returns : int, optional
            Number of results to be returned. If it isn't
            provided, `self._num_returns` will be used.
        **kwargs : dict
            Keyword arguments to be passed in the method.

        Returns
        -------
        ObjectRef, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``ObjectRef`` will be returned.
            * if `num_returns > 1`, list of ``ObjectRef``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        count = self._num_returns if num_returns is None else num_returns
        # Resolve the backend-level actor method and submit it asynchronously.
        backend_method = getattr(self._cls, self._method_name)
        raw_ids = backend_method.submit(*args, num_returns=count, **kwargs)
        if count == 1:
            return ObjectRef(raw_ids)
        if count > 1:
            return [ObjectRef(raw_id) for raw_id in raw_ids]
        if count == 0:
            return None
class PyMpActor(Actor):
    """
    The class implements the interface in ``Actor`` using Python Multiprocessing backend.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    num_cpus : int
        The number of CPUs to reserve for the lifetime of the actor.
    resources : dict
        Custom resources to reserve for the lifetime of the actor.
    """

    def __init__(self, cls, num_cpus, resources):
        self._cls = cls
        self._num_cpus = num_cpus
        self._resources = resources
        # Filled in by ``_remote``; attribute access is routed through it.
        self._actor_handle = None

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls` class.

        This methods creates the ``PyMpActorMethod`` object that is responsible
        for calling method `name` of the `self._cls` class remotely.

        Parameters
        ----------
        name : str
            Name of the method to be called remotely.

        Returns
        -------
        PyMpActorMethod
        """
        return PyMpActorMethod(self._actor_handle, name)

    def _remote(self, *args, num_cpus=None, resources=None, **kwargs):
        """
        Create actor class, specific for Python Multiprocessing backend, from `self._cls`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in `self._cls` class constructor.
        num_cpus : int, optional
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict, optional
            Custom resources to reserve for the lifetime of the actor.
        **kwargs : dict
            Keyword arguments to be passed in `self._cls` class constructor.

        Returns
        -------
        PyMpActor

        Raises
        ------
        NotImplementedError
            If `num_cpus` or `resources` were supplied here or at construction.
        """
        # Resource reservations are unsupported, whether passed here or to __init__.
        if not (num_cpus is None and self._num_cpus is None):
            raise NotImplementedError(
                "'num_cpus' is not supported yet by Python Multiprocessing backend."
            )
        if not (resources is None and self._resources is None):
            raise NotImplementedError(
                "'resources' is not supported yet by Python Multiprocessing backend."
            )
        self._actor_handle = pymp.Actor(self._cls, *args, **kwargs)
        return self
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """An implementation of ``Backend`` interface using Python Multiprocessing backend.""" | ||
| import socket | ||
| from unidist.config import CpuCount | ||
| import unidist.core.backends.pymp.core as mp | ||
| from unidist.core.backends.pymp.actor import PyMpActor | ||
| from unidist.core.backends.pymp.remote_function import ( | ||
| PyMpRemoteFunction, | ||
| ) | ||
| from unidist.core.base.backend import Backend | ||
class PyMpBackend(Backend):
    """The class that implements the interface in ``Backend`` using Python Multiprocessing backend."""

    @staticmethod
    def make_remote_function(function, num_cpus, num_returns, resources):
        """
        Define a remote function.

        Parameters
        ----------
        function : callable
            Function to be a remote function.
        num_cpus : int
            The number of CPUs to reserve for the remote function.
        num_returns : int
            The number of ``ObjectRef``-s returned by the remote function invocation.
        resources : dict
            Custom resources to reserve for the remote function.

        Returns
        -------
        PyMpRemoteFunction
        """
        return PyMpRemoteFunction(function, num_cpus, num_returns, resources)

    @staticmethod
    def make_actor(cls, num_cpus, resources):
        """
        Define an actor class.

        Parameters
        ----------
        cls : object
            Class to be an actor class.
        num_cpus : int
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict
            Custom resources to reserve for the lifetime of the actor.

        Returns
        -------
        PyMpActor
            The actor class type to create.
        list
            The list of arguments for ``PyMpActor`` constructor.
        """
        constructor_args = [cls, num_cpus, resources]
        return PyMpActor, constructor_args

    @staticmethod
    def get(data_ids):
        """
        Get a remote object or a list of remote objects
        from distributed memory.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or a list of ``DataID`` objects to get data from.

        Returns
        -------
        object
            A Python object or a list of Python objects.
        """
        return mp.get(data_ids)

    @staticmethod
    def put(data):
        """
        Put `data` into distributed memory.

        Parameters
        ----------
        data : object
            Data to be put.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            ``DataID`` matching to data.
        """
        return mp.put(data)

    @staticmethod
    def wait(data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        data IDs that correspond to objects that completed computations.
        The second list corresponds to the rest of the data IDs
        (which may or may not be ready).

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.
        """
        return mp.wait(data_ids, num_returns=num_returns)

    @staticmethod
    def get_ip():
        """
        Get node IP address.

        Returns
        -------
        str
            Node IP address.
        """
        # Resolve this host's name to its primary IP address.
        return socket.gethostbyname(socket.gethostname())

    @staticmethod
    def num_cpus():
        """
        Get the number of CPUs used by the execution backend.

        Returns
        -------
        int
        """
        return CpuCount.get()

    @staticmethod
    def cluster_resources():
        """
        Get resources of Multiprocessing cluster.

        Returns
        -------
        dict
            Dictionary with node info in the form `{"node_ip": {"CPU": x}}`.
        """
        node_ip = PyMpBackend.get_ip()
        cpu_count = PyMpBackend.num_cpus()
        return {node_ip: {"CPU": cpu_count}}

    @staticmethod
    def shutdown():
        """
        Shutdown Python Multiprocessing execution backend.

        Note
        ----
        Not supported yet.
        """
        raise NotImplementedError(
            "'shutdown' is not supported yet by Python Multiprocessing backend."
        )

    @staticmethod
    def is_initialized():
        """
        Check if Python Multiprocessing backend has already been initialized.

        Returns
        -------
        bool
            True or False.
        """
        return mp.is_initialized()
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Python Multiprocessing backend core functionality.""" | ||
| from .actor import Actor | ||
| from .api import put, wait, get, submit, init, is_initialized | ||
| __all__ = ["Actor", "put", "wait", "get", "submit", "init", "is_initialized"] |
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Actor specific functionality implemented using Python Multiprocessing.""" | ||
| import cloudpickle as pkl | ||
| from multiprocessing.managers import BaseManager | ||
| from unidist.core.backends.pymp.core.object_store import ObjectStore, Delayed | ||
| from unidist.core.backends.pymp.core.process_manager import ( | ||
| ProcessManager, | ||
| Task, | ||
| ) | ||
class ActorMethod:
    """
    Class is responsible to execute `method_name` of
    `cls_obj` in the separate worker-process of `actor` object.

    Parameters
    ----------
    cls_obj : multiprocessing.managers.BaseManager
        Shared manager-class.
    actor : Actor
        Actor object.
    method_name : str
        The name of the method to be called.
    obj_store : unidist.core.backends.pymp.core.object_store.ObjectStore
        Object storage to share data between workers.
    """

    def __init__(self, cls_obj, actor, method_name, obj_store):
        self._cls_obj = cls_obj
        self._method_name = method_name
        self._actor = actor
        self._obj_store = obj_store

    def submit(self, *args, num_returns=1, **kwargs):
        """
        Execute `self._method_name` asynchronously in the worker of `self._actor`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the `self._method_name` method.
        num_returns : int, default: 1
            Number of results to be returned from `self._method_name`.
        **kwargs : dict
            Keyword arguments to be passed in the `self._method_name` method.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``DataID`` will be returned.
            * if `num_returns > 1`, list of ``DataID``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        # Register placeholder(s) up front so the caller holds result IDs
        # before the method has actually executed.
        if num_returns > 1:
            data_ids = [self._obj_store.put(Delayed()) for _ in range(num_returns)]
        elif num_returns == 0:
            data_ids = None
        else:
            data_ids = self._obj_store.put(Delayed())
        bound_method = getattr(self._cls_obj, self._method_name)
        self._actor.submit(
            Task(bound_method, data_ids, self._obj_store, *args, **kwargs)
        )
        return data_ids
class Actor:
    """
    Actor-class to execute methods of wrapped class in a separate worker.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    *args : iterable
        Positional arguments to be passed in `cls` constructor.
    **kwargs : dict
        Keyword arguments to be passed in `cls` constructor.

    Notes
    -----
    Python multiprocessing manager-class will be created to wrap `cls`.
    This makes `cls` class object shared between different workers. Manager-class
    starts additional process to share class state between processes.
    Methods of `cls` class object are executed in the worker, grabbed from a workers pool.
    """

    def __init__(self, cls, *args, **kwargs):
        # Initialized to None first so __del__ is safe even if
        # worker grabbing below fails.
        self._worker = None
        self._worker_id = None
        self._obj_store = ObjectStore.get_instance()
        # FIXME : Change "WrappedClass" -> cls.__name__ + "Manager", for example.
        # NOTE(review): ``register`` mutates BaseManager globally under a fixed
        # name, so actors wrapping different classes re-register "WrappedClass";
        # the last registration wins for managers started afterwards — confirm
        # this is acceptable for concurrent actors of different classes.
        BaseManager.register("WrappedClass", cls)
        manager = BaseManager()
        # Starts a separate manager process that owns the shared `cls` instance.
        manager.start()
        self._cls_obj = manager.WrappedClass(*args, **kwargs)
        # Reserve a dedicated worker for this actor's lifetime.
        self._worker, self._worker_id = ProcessManager.get_instance().grab_worker()

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls_obj` class.

        This methods creates the ``ActorMethod`` object that is responsible
        for calling method `name` of the `self._cls_obj` class remotely.

        Parameters
        ----------
        name : str
            Name of the method to be called remotely.

        Returns
        -------
        ActorMethod
        """
        return ActorMethod(self._cls_obj, self, name, self._obj_store)

    def submit(self, task):
        """
        Execute `task` asynchronously in the worker grabbed by this actor.

        Parameters
        ----------
        task : unidist.core.backends.pymp.core.process_manager.Task
            Task object holding callable function.
        """
        # cloudpickle lets arbitrary callables survive the trip to the worker process.
        self._worker.add_task(pkl.dumps(task))

    def __del__(self):
        """
        Destructor of the actor.

        Free worker, grabbed from the workers pool.
        """
        # `_worker_id` remains None if __init__ failed before a worker was grabbed.
        if self._worker_id is not None:
            ProcessManager.get_instance().free_worker(self._worker_id)
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """High-level API of Python Multiprocessing backend.""" | ||
| import cloudpickle as pkl | ||
| from unidist.config import CpuCount | ||
| from unidist.core.backends.pymp.core.object_store import ObjectStore, Delayed | ||
| from unidist.core.backends.pymp.core.process_manager import ( | ||
| ProcessManager, | ||
| Task, | ||
| ) | ||
# Module-level flag indicating whether the Python Multiprocessing backend
# has already been initialized; set to True by ``init()``.
is_multiprocessing_initialized = False
def init(num_workers=None):
    """
    Initialize shared object storage and workers pool.

    Parameters
    ----------
    num_workers : int, optional
        Number of worker-processes to start. If not provided, the number
        of CPUs (``CpuCount.get()``) is used, resolved at call time.

    Notes
    -----
    Run initialization of singleton objects ``unidist.core.backends.pymp.core.object_store.ObjectStore``
    and ``unidist.core.backends.pymp.core.process_manager.ProcessManager``.
    """
    global is_multiprocessing_initialized
    # Resolve the default here rather than in the signature: the previous
    # `num_workers=CpuCount.get()` default was evaluated once at import time,
    # freezing the CPU count and ignoring later configuration changes.
    if num_workers is None:
        num_workers = CpuCount.get()
    ObjectStore.get_instance()
    ProcessManager.get_instance(num_workers=num_workers)
    is_multiprocessing_initialized = True
def is_initialized():
    """
    Check if Python Multiprocessing backend has already been initialized.

    Returns
    -------
    bool
        True or False.
    """
    # Reading a module-level name needs no ``global`` declaration.
    return is_multiprocessing_initialized
def put(data):
    """
    Put data into shared object storage.

    Parameters
    ----------
    data : object
        Data to be put.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID
        An ID of object in shared object storage.
    """
    store = ObjectStore.get_instance()
    return store.put(data)
def get(data_ids):
    """
    Get a object(s) associated with `data_ids` from the shared object storage.

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        An ID(s) to object(s) to get data from.

    Returns
    -------
    object
        A Python object.
    """
    store = ObjectStore.get_instance()
    return store.get(data_ids)
def wait(data_ids, num_returns=1):
    """
    Wait until `data_ids` are finished.

    This method returns two lists. The first list consists of
    ``DataID``-s that correspond to objects that completed computations.
    The second list corresponds to the rest of the ``DataID``-s
    (which may or may not be ready).

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ``DataID`` or list of ``DataID``-s to be waited.
    num_returns : int, default: 1
        The number of ``DataID``-s that should be returned as ready.

    Returns
    -------
    tuple
        List of data IDs that are ready and list of the remaining data IDs.
    """
    store = ObjectStore.get_instance()
    return store.wait(data_ids, num_returns=num_returns)
def submit(func, *args, num_returns=1, **kwargs):
    """
    Execute function in a worker process.

    Parameters
    ----------
    func : callable
        Function to be executed in the worker.
    *args : iterable
        Positional arguments to be passed in the `func`.
    num_returns : int, default: 1
        Number of results to be returned from `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID, list or None
        Type of returns depends on `num_returns` value:

        * if `num_returns == 1`, ``DataID`` will be returned.
        * if `num_returns > 1`, list of ``DataID``-s will be returned.
        * if `num_returns == 0`, ``None`` will be returned.
    """
    obj_store = ObjectStore.get_instance()
    # Register placeholder(s) up front so the caller holds result IDs
    # before the function has actually executed.
    if num_returns > 1:
        data_ids = [obj_store.put(Delayed()) for _ in range(num_returns)]
    elif num_returns == 0:
        data_ids = None
    else:
        data_ids = obj_store.put(Delayed())
    ProcessManager.get_instance().submit(
        pkl.dumps(Task(func, data_ids, obj_store, *args, **kwargs))
    )
    return data_ids
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Shared object storage related functionality.""" | ||
| import cloudpickle as pkl | ||
| from multiprocessing import Manager | ||
| from unidist.core.backends.common.data_id import DataID | ||
class Delayed:
    """Placeholder type stored in place of an object while it is being computed."""

    pass


class ObjectStore:
    """
    Class that stores objects and provides access to these from different processes.

    Notes
    -----
    Shared storage is organized using ``multiprocessing.Manager.dict``. This is separate
    process which starts work in the class constructor.
    """

    __instance = None

    def __init__(self):
        if ObjectStore.__instance is None:
            # The dict lives in a separate manager process, making it
            # visible to all workers.
            self.store_delayed = Manager().dict()

    def __repr__(self):
        return f"Object store: {self.store_delayed}"

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        unidist.core.backends.pymp.core.object_store.ObjectStore
        """
        if cls.__instance is None:
            cls.__instance = ObjectStore()
        return cls.__instance

    def put(self, data, data_id=None):
        """
        Put `data` to internal shared dictionary.

        Parameters
        ----------
        data : object
            Data to be put.
        data_id : unidist.core.backends.common.data_id.DataID, optional
            An ID to data. If it isn't provided, will be created automatically.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            An ID of object in internal shared dictionary.
        """
        data_id = DataID() if data_id is None else data_id
        # Callables are cloudpickled so they survive the manager-process boundary.
        self.store_delayed[data_id] = pkl.dumps(data) if callable(data) else data
        return data_id

    def get(self, data_ids):
        """
        Get a object(s) associated with `data_ids` from the shared internal dictionary.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            An ID(s) of object(s) to get data from.

        Returns
        -------
        object
            A Python object.

        Raises
        ------
        ValueError
            If any element of `data_ids` is not a ``DataID``.
        Exception
            A stored worker-side exception is re-raised here.
        """
        is_list = isinstance(data_ids, list)
        if not is_list:
            data_ids = [data_ids]
        if not all(isinstance(ref, DataID) for ref in data_ids):
            raise ValueError(
                "`data_ids` must either be a data ID or a list of data IDs."
            )
        values = [None] * len(data_ids)
        for idx, data_id in enumerate(data_ids):
            # Busy-wait until the placeholder is replaced with a real value.
            while isinstance(self.store_delayed[data_id], Delayed):
                pass
            value = self.store_delayed[data_id]
            if isinstance(value, Exception):
                raise value
            values[idx] = pkl.loads(value) if isinstance(value, bytes) else value
        return values if is_list else values[0]

    def wait(self, data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        ``DataID``-s that correspond to objects that completed computations.
        The second list corresponds to the rest of the ``DataID``-s
        (which may or may not be ready).

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.
        """
        if not isinstance(data_ids, list):
            data_ids = [data_ids]
        ready = []
        not_ready = []
        # Classify in a single pass. The previous implementation popped from
        # `data_ids` while enumerating a copy of it; after the first pop the
        # indices shifted and the wrong (possibly unfinished) ID was moved
        # into `ready`.
        for data_id in data_ids:
            if len(ready) < num_returns and not isinstance(
                self.store_delayed[data_id], Delayed
            ):
                ready.append(data_id)
            else:
                not_ready.append(data_id)
        # If not enough results are complete yet, block on the remaining IDs
        # in order until the requested count is reached.
        while len(ready) < num_returns:
            self.get(not_ready[0])
            ready.append(not_ready.pop(0))
        return ready, not_ready
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Workers related functionality.""" | ||
| import cloudpickle as pkl | ||
| from multiprocessing import ( | ||
| Process, | ||
| JoinableQueue, | ||
| ) | ||
| from unidist.config import CpuCount | ||
| from unidist.core.backends.common.data_id import DataID | ||
| from unidist.core.backends.pymp.core.object_store import ObjectStore | ||
class Worker(Process):
    """
    Class-process that executes tasks from `self.task_queue`.

    Parameters
    ----------
    task_queue : multiprocessing.JoinableQueue
        A queue of task to execute.
    obj_store : unidist.core.backends.pymp.core.object_store.ObjectStore
        Shared object storage to read/write data.
    """

    def __init__(self, task_queue, obj_store):
        # Daemon process: outstanding workers won't block interpreter exit.
        Process.__init__(self, daemon=True)
        self.task_queue = task_queue
        self._obj_store = obj_store

    def run(self):
        """Run main infinite loop of process to execute tasks from `self.task_queue`."""
        while 1:
            # Tasks arrive pre-pickled with cloudpickle so arbitrary callables
            # survive the trip across the process boundary.
            task = self.task_queue.get()
            task = pkl.loads(task)
            data_ids = task.data_ids
            try:
                value = task()
            except Exception as e:
                # Store the exception under every result ID; ObjectStore.get()
                # re-raises it on the consumer side.
                # NOTE(review): unlike the success path, this branch does not
                # check `data_ids is not None`, so a failed zero-return task
                # stores the exception under the key None — confirm intended.
                if isinstance(data_ids, list) and len(data_ids) > 1:
                    for i, data_id in enumerate(data_ids):
                        self._obj_store.store_delayed[data_id] = e
                else:
                    self._obj_store.store_delayed[data_ids] = e
            else:
                if data_ids is not None:
                    if isinstance(data_ids, list) and len(data_ids) > 1:
                        # Multiple returns: pair each result ID with one element
                        # of the task's result.
                        for data_id, val in zip(data_ids, value):
                            self._obj_store.store_delayed[data_id] = val
                    else:
                        self._obj_store.store_delayed[data_ids] = value
            finally:
                # Always acknowledge the item so JoinableQueue.join() cannot deadlock.
                self.task_queue.task_done()
        return

    def add_task(self, task):
        """
        Add `task` to `self.task_queue`.

        Parameters
        ----------
        task : unidist.core.backends.pymp.core.process_manager.Task
            Task to be added in the queue.
        """
        self.task_queue.put(task)
class ProcessManager:
    """
    Class that controls worker pool and assigns task to workers.

    Parameters
    ----------
    num_workers : int, optional
        Number of worker-processes to start. If isn't provided,
        will be equal to number of CPUs.

    Notes
    -----
    Constructor starts `num_workers` Multiprocessing Workers.
    """

    __instance = None

    def __init__(self, num_workers=None):
        # Guard keeps a second direct construction from spawning another pool.
        if ProcessManager.__instance is None:
            if num_workers is None:
                num_workers = CpuCount.get()
            self.workers = [None] * num_workers
            # Parallel list of flags: True when the same-index worker is
            # reserved by an actor and excluded from round-robin submission.
            self.grabbed_workers = [None] * num_workers
            # Round-robin cursor; stored on the class (singleton anyway).
            self.__class__._worker_id = 0
            obj_store = ObjectStore.get_instance()
            for idx in range(num_workers):
                self.workers[idx] = Worker(JoinableQueue(), obj_store)
                self.workers[idx].start()
                self.grabbed_workers[idx] = False

    @classmethod
    def get_instance(cls, num_workers=None):
        """
        Get instance of ``ProcessManager``.

        Parameters
        ----------
        num_workers : int, optional
            Pool size used on first call only; ignored once the singleton exists.

        Returns
        -------
        unidist.core.backends.pymp.core.process_manager.ProcessManager
        """
        if cls.__instance is None:
            cls.__instance = ProcessManager(num_workers=num_workers)
        return cls.__instance

    def _next(self):
        """
        Get current worker index and move to another with incrementing by one.

        Returns
        -------
        int
        """
        idx = self.__class__._worker_id
        self.__class__._worker_id += 1
        # Wrap around to implement round-robin over the pool.
        if self.__class__._worker_id == len(self.workers):
            self.__class__._worker_id = 0
        return idx

    def grab_worker(self):
        """
        Grab a worker from worker pool.

        Grabbed worker is marked as `blocked` and doesn't participate
        in the tasks submission.

        Returns
        -------
        unidist.core.backends.pymp.core.process_manager.Worker
            Grabbed worker.
        int
            Index of grabbed worker.

        Raises
        ------
        RuntimeError
            If every worker in the pool is already grabbed.
        """
        for idx, is_grabbed in enumerate(self.grabbed_workers):
            if not is_grabbed:
                self.grabbed_workers[idx] = True
                return self.workers[idx], idx
        raise RuntimeError("Actor can`t be run, no available workers.")

    def free_worker(self, idx):
        """
        Free worker by index `idx`.

        Parameters
        ----------
        idx : int
            Index of worker to be freed.
        """
        self.grabbed_workers[idx] = False

    def submit(self, task):
        """
        Add `task` to task queue of one of workers using round-robin.

        Parameters
        ----------
        task : unidist.core.backends.pymp.core.process_manager.Task
            Task to be added in task queue.

        Raises
        ------
        RuntimeError
            If all workers are grabbed by actors after one full pass.
        """
        # Skip actor-owned workers; give up after one full pass over the pool.
        num_skipped = 0
        while num_skipped < len(self.workers):
            idx = self._next()
            if not self.grabbed_workers[idx]:
                self.workers[idx].add_task(task)
                return
            else:
                num_skipped += 1
        raise RuntimeError("Task can`t be run, no available workers.")
class Task:
    """
    Class poses as unified callable object to execute in Multiprocessing Worker.

    Parameters
    ----------
    func : callable
        A function to be called in object invocation.
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ``DataID``-(s) associated with result(s) of `func` invocation.
    obj_store : unidist.core.backends.pymp.core.object_store.ObjectStore
        Object storage to share data between workers.
    *args : iterable
        Positional arguments to be passed in the `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.
    """

    def __init__(self, func, data_ids, obj_store, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self.data_ids = data_ids
        self.obj_store = obj_store

    def __call__(self):
        """
        Execute `self._func`.

        If `self._args`/`self._kwargs` has ``DataID`` objects,
        automaterialize happens.

        Returns
        -------
        object
            The result of `self._func` invocation.
        """

        def materialize(value):
            # Replace a DataID handle with the stored object it refers to.
            if isinstance(value, DataID):
                return self.obj_store.get(value)
            return value

        final_args = [materialize(arg) for arg in self._args]
        final_kwargs = {key: materialize(val) for key, val in self._kwargs.items()}
        return self._func(*final_args, **final_kwargs)
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """An implementation of ``RemoteFunction`` interface using Python Multiprocessing backend.""" | ||
| import unidist.core.backends.pymp.core as mp | ||
| from unidist.core.base.object_ref import ObjectRef | ||
| from unidist.core.base.remote_function import RemoteFunction | ||
class PyMpRemoteFunction(RemoteFunction):
    """
    ``RemoteFunction`` implementation for the Python Multiprocessing backend.

    Parameters
    ----------
    function : callable
        A function to be called remotely.
    num_cpus : int
        The number of CPUs to reserve for the remote function.
    num_returns : int
        The number of ``ObjectRef``-s returned by the remote function invocation.
    resources : dict
        Custom resources to reserve for the remote function.
    """

    def __init__(self, function, num_cpus, num_returns, resources):
        self._remote_function = function
        self._num_cpus = num_cpus
        self._resources = resources
        # A single return value is assumed unless the caller says otherwise.
        self._num_returns = num_returns if num_returns is not None else 1

    def _remote(self, *args, num_cpus=None, num_returns=None, resources=None, **kwargs):
        """
        Execute ``self._remote_function`` in a worker process.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the ``self._remote_function``.
        num_cpus : int, optional
            The number of CPUs to reserve for the remote function.
        num_returns : int, optional
            The number of ``ObjectRef``-s returned by the remote function invocation.
        resources : dict, optional
            Custom resources to reserve for the remote function.
        **kwargs : dict
            Keyword arguments to be passed in the ``self._remote_function``.

        Returns
        -------
        ObjectRef, list or None
            ``ObjectRef`` when ``num_returns == 1``, a list of ``ObjectRef``-s
            when ``num_returns > 1``, ``None`` when ``num_returns == 0``.

        Raises
        ------
        NotImplementedError
            If ``num_cpus`` or ``resources`` is requested either here or at
            construction time — the backend does not support them yet.
        """
        if num_cpus is not None or self._num_cpus is not None:
            raise NotImplementedError(
                "'num_cpus' is not supported yet by Python Multiprocessing backend."
            )
        if resources is not None or self._resources is not None:
            raise NotImplementedError(
                "'resources' is not supported yet by Python Multiprocessing backend."
            )
        count = self._num_returns if num_returns is None else num_returns
        data_ids = mp.submit(self._remote_function, *args, num_returns=count, **kwargs)
        if count == 1:
            return ObjectRef(data_ids)
        if count > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if count == 0:
            return None
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Utilities used to initialize Python Multiprocessing execution backend.""" | ||
| from unidist.config import CpuCount | ||
def initialize_pymp():
    """
    Initialize the Python Multiprocessing execution backend.

    Notes
    -----
    Number of workers for Python Multiprocessing is equal to number of CPUs used by the backend.
    """
    # Import lazily so the backend core is only loaded when actually selected.
    from unidist.core.backends.pymp.core import init

    init(num_workers=CpuCount.get())
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Python Sequential backend functionality.""" |
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Actor specific functionality using Python Sequential backend.""" | ||
| import unidist.core.backends.pyseq.core as py | ||
| from unidist.core.base.actor import Actor, ActorMethod | ||
| from unidist.core.base.object_ref import ObjectRef | ||
class PySeqActorMethod(ActorMethod):
    """
    ``ActorMethod`` implementation for the Python Sequential backend.

    Parameters
    ----------
    cls : object
        An actor class from which method `method_name` will be called.
    method_name : str
        The name of the method to be called.
    """

    def __init__(self, cls, method_name):
        self._cls = cls
        self._method_name = method_name
        # Default number of results unless overridden per call.
        self._num_returns = 1

    def _remote(self, *args, num_returns=None, **kwargs):
        """
        Execute ``self._method_name``.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the method.
        num_returns : int, optional
            Number of results to be returned. Falls back to
            ``self._num_returns`` when not provided.
        **kwargs : dict
            Keyword arguments to be passed in the method.

        Returns
        -------
        ObjectRef, list or None
            ``ObjectRef`` when ``num_returns == 1``, a list of ``ObjectRef``-s
            when ``num_returns > 1``, ``None`` when ``num_returns == 0``.
        """
        count = self._num_returns if num_returns is None else num_returns
        bound_method = getattr(self._cls, self._method_name)
        data_ids = py.submit(bound_method, *args, num_returns=count, **kwargs)
        if count == 1:
            return ObjectRef(data_ids)
        if count > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if count == 0:
            return None
class PySeqActor(Actor):
    """
    ``Actor`` implementation for the Python Sequential backend.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    num_cpus : int
        The number of CPUs to reserve for the lifetime of the actor.
    resources : dict
        Custom resources to reserve for the lifetime of the actor.
    """

    def __init__(self, cls, num_cpus, resources):
        self._cls = cls
        self._num_cpus = num_cpus
        self._resources = resources
        # Filled in by ``_remote`` once the wrapped class is instantiated.
        self._actor_handle = None

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls` class.

        Only invoked for attributes not found on the instance itself, so
        ``_cls``/``_actor_handle`` set in ``__init__`` are untouched.

        Parameters
        ----------
        name : str
            Name of the method to be called.

        Returns
        -------
        PySeqActorMethod
        """
        return PySeqActorMethod(self._actor_handle, name)

    def _remote(self, *args, num_cpus=None, resources=None, **kwargs):
        """
        Create actor class, specific for Python Sequential backend, from `self._cls`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in `self._cls` class constructor.
        num_cpus : int, optional
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict, optional
            Custom resources to reserve for the lifetime of the actor.
        **kwargs : dict
            Keyword arguments to be passed in `self._cls` class constructor.

        Returns
        -------
        PySeqActor

        Raises
        ------
        NotImplementedError
            If ``num_cpus`` or ``resources`` is requested — the backend
            does not support them.
        """
        if num_cpus is not None or self._num_cpus is not None:
            raise NotImplementedError(
                "'num_cpus' is not supported by Python Sequential backend."
            )
        if resources is not None or self._resources is not None:
            raise NotImplementedError(
                "'resources' is not supported by Python Sequential backend."
            )
        # Sequential backend simply instantiates the class in-process.
        self._actor_handle = self._cls(*args, **kwargs)
        return self
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """An implementation of ``Backend`` interface using Python Sequential backend.""" | ||
| import socket | ||
| import unidist.core.backends.pyseq.core as py | ||
| from unidist.core.backends.pyseq.actor import PySeqActor | ||
| from unidist.core.backends.pyseq.remote_function import ( | ||
| PySeqRemoteFunction, | ||
| ) | ||
| from unidist.core.base.backend import Backend | ||
class PySeqBackend(Backend):
    """The class that implements the interface in ``Backend`` using Python Sequential backend."""

    @staticmethod
    def make_remote_function(function, num_cpus, num_returns, resources):
        """
        Define ``PySeqRemoteFunction``.

        Parameters
        ----------
        function : callable
            Function to be ``PySeqRemoteFunction``.
        num_cpus : int
            The number of CPUs to reserve for ``PySeqRemoteFunction``.
        num_returns : int
            The number of ``ObjectRef``-s returned by the function invocation.
        resources : dict
            Custom resources to reserve for the function.

        Returns
        -------
        PySeqRemoteFunction
        """
        return PySeqRemoteFunction(function, num_cpus, num_returns, resources)

    @staticmethod
    def make_actor(cls, num_cpus, resources):
        """
        Define an actor class.

        Parameters
        ----------
        cls : object
            Class to be an actor class.
        num_cpus : int
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict
            Custom resources to reserve for the lifetime of the actor.

        Returns
        -------
        PySeqActor
            The actor class type to create.
        list
            The list of arguments for ``PySeqActor`` constructor.
        """
        return PySeqActor, [cls, num_cpus, resources]

    @staticmethod
    def get(data_ids):
        """
        Get an object or a list of objects from object store.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or a list of ``DataID`` objects to get data from.

        Returns
        -------
        object
            A Python object or a list of Python objects.
        """
        return py.get(data_ids)

    @staticmethod
    def put(data):
        """
        Put `data` into object store.

        Parameters
        ----------
        data : object
            Data to be put.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            ``DataID`` matching to data.
        """
        return py.put(data)

    @staticmethod
    def wait(data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        data IDs that correspond to objects that completed computations.
        The second list corresponds to the rest of the data IDs.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.

        Notes
        -----
        Method serves to maintain behavior compatibility between backends. All objects
        completed computation before putting into an object storage for Python Sequential backend.
        """
        # Everything is already computed, so the first `num_returns` IDs are "ready".
        return data_ids[:num_returns], data_ids[num_returns:]

    @staticmethod
    def get_ip():
        """
        Get node IP address.

        Returns
        -------
        str
            Node IP address.
        """
        # NOTE(review): gethostbyname(hostname) may resolve to a loopback
        # address depending on /etc/hosts configuration — confirm acceptable.
        hostname = socket.gethostname()
        return socket.gethostbyname(hostname)

    @staticmethod
    def num_cpus():
        """
        Get the number of CPUs used by the execution backend.

        Returns
        -------
        int
        """
        # Sequential backend always executes on a single CPU.
        return 1

    @staticmethod
    def cluster_resources():
        """
        Get resources of the cluster.

        Returns
        -------
        dict
            Dictionary with node info in the form `{"node_ip": {"CPU": x}}`.
        """
        return {PySeqBackend.get_ip(): {"CPU": PySeqBackend.num_cpus()}}

    @staticmethod
    def is_initialized():
        """
        Check if Python Sequential backend has already been initialized.

        Returns
        -------
        bool
            True or False.
        """
        return py.is_initialized()
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Python Sequential backend core functionality.""" | ||
| from .api import put, get, submit, init, is_initialized | ||
| __all__ = ["put", "get", "submit", "init", "is_initialized"] |
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """High-level API of Python Sequential backend.""" | ||
| from unidist.core.backends.common.data_id import DataID | ||
| from unidist.core.backends.pyseq.core.object_store import ObjectStore | ||
| # The global variable is responsible for if Python Sequential backend has already been initialized | ||
| is_python_initialized = False | ||
def init():
    """
    Initialize an object storage.

    Notes
    -----
    Run initialization of singleton object
    ``unidist.core.backends.pyseq.core.object_store.ObjectStore`` and mark
    the backend as initialized.
    """
    global is_python_initialized

    ObjectStore.get_instance()
    # Unconditional assignment is equivalent to the previous
    # ``if not is_python_initialized`` guard: the only value ever written
    # here is True, so the check was redundant.
    is_python_initialized = True
def is_initialized():
    """
    Check if Python Sequential backend has already been initialized.

    Returns
    -------
    bool
        True or False.
    """
    # No ``global`` statement needed: reading a module-level name does not
    # require declaring it global.
    return is_python_initialized
def put(data):
    """
    Put data into object storage.

    Parameters
    ----------
    data : object
        Data to be put.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID
        An ID of object in object storage.
    """
    store = ObjectStore.get_instance()
    return store.put(data)
def get(data_ids):
    """
    Get object(s) associated with `data_ids` from the object storage.

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ID(s) to object(s) to get data from.

    Returns
    -------
    object
        A Python object.
    """
    store = ObjectStore.get_instance()
    return store.get(data_ids)
def submit(func, *args, num_returns=1, **kwargs):
    """
    Execute function.

    ``DataID`` arguments are materialized from the object storage before the
    call; an exception raised by `func` is captured and stored so it can be
    re-raised when the result is fetched.

    Parameters
    ----------
    func : callable
        Function to be executed.
    *args : iterable
        Positional arguments to be passed in the `func`.
    num_returns : int, default: 1
        Number of results to be returned from `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID, list or None
        ``DataID`` when ``num_returns == 1``, a list of ``DataID``-s when
        ``num_returns > 1``, ``None`` when ``num_returns == 0``.
    """
    store = ObjectStore.get_instance()

    def _resolve(value):
        # Replace a DataID with the stored object; leave plain values as-is.
        return store.get(value) if isinstance(value, DataID) else value

    call_args = [_resolve(arg) for arg in args]
    call_kwargs = {key: _resolve(value) for key, value in kwargs.items()}
    try:
        result = func(*call_args, **call_kwargs)
    except Exception as e:
        # Replicate the exception per expected result so every DataID
        # re-raises it on ``get``.
        result = [e] * num_returns if num_returns > 1 else e
    if num_returns == 0:
        return None
    if num_returns > 1:
        return [store.put(result[idx]) for idx in range(num_returns)]
    return store.put(result)
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Object storage related functionality.""" | ||
| from unidist.core.backends.common.data_id import DataID | ||
class ObjectStore:
    """
    Class that stores objects and provides access to these.

    Intended to be used as a singleton via :meth:`get_instance`, which maps
    ``DataID``-s to the stored Python objects.
    """

    __instance = None

    def __init__(self):
        # Always create the internal dictionary. The previous implementation
        # guarded this with ``if ObjectStore.__instance is None``, which left
        # a directly-constructed second instance without a ``store``
        # attribute and made any later access raise AttributeError.
        self.store = dict()

    def __repr__(self):
        return f"Object store: {self.store}"

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        unidist.core.backends.pyseq.core.object_store.ObjectStore
        """
        if cls.__instance is None:
            cls.__instance = ObjectStore()
        return cls.__instance

    def put(self, data, data_id=None):
        """
        Put `data` to internal dictionary.

        Parameters
        ----------
        data : object
            Data to be put.
        data_id : unidist.core.backends.common.data_id.DataID, optional
            An ID of data. If it isn't provided, will be created automatically.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            An ID of object in internal dictionary.
        """
        data_id = DataID() if data_id is None else data_id
        self.store[data_id] = data
        return data_id

    def get(self, data_ids):
        """
        Get object(s) associated with `data_ids` from the internal dictionary.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ID(s) of object(s) to get data from.

        Returns
        -------
        object
            A Python object.

        Raises
        ------
        ValueError
            If any entry of `data_ids` is not a ``DataID``.
        Exception
            A stored exception is re-raised instead of being returned.
        """
        is_list = isinstance(data_ids, list)
        if not is_list:
            data_ids = [data_ids]
        if not all(isinstance(data_id, DataID) for data_id in data_ids):
            raise ValueError(
                "`data_ids` must either be a data ID or a list of data IDs."
            )

        def check_exception(value):
            # Stored exceptions represent failed tasks; surface them here.
            if isinstance(value, Exception):
                raise value
            return value

        values = [check_exception(self.store[data_id]) for data_id in data_ids]
        return values if is_list else values[0]
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """An implementation of ``RemoteFunction`` interface using Python Sequential backend.""" | ||
| import unidist.core.backends.pyseq.core as py | ||
| from unidist.core.base.object_ref import ObjectRef | ||
| from unidist.core.base.remote_function import RemoteFunction | ||
class PySeqRemoteFunction(RemoteFunction):
    """
    ``RemoteFunction`` implementation for the Python Sequential backend.

    Parameters
    ----------
    function : callable
        A function to be called.
    num_cpus : int
        The number of CPUs to reserve for the function.
    num_returns : int
        The number of ``ObjectRef``-s returned by the function invocation.
    resources : dict
        Custom resources to reserve for the function.
    """

    def __init__(self, function, num_cpus, num_returns, resources):
        self._remote_function = function
        self._num_cpus = num_cpus
        self._resources = resources
        # Assume a single result unless told otherwise.
        self._num_returns = num_returns if num_returns is not None else 1

    def _remote(self, *args, num_cpus=None, num_returns=None, resources=None, **kwargs):
        """
        Execute ``self._remote_function``.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the ``self._remote_function``.
        num_cpus : int, optional
            The number of CPUs to reserve for the function.
        num_returns : int, optional
            The number of ``ObjectRef``-s returned by the function invocation.
        resources : dict, optional
            Custom resources to reserve for the function.
        **kwargs : dict
            Keyword arguments to be passed in the ``self._remote_function``.

        Returns
        -------
        ObjectRef, list or None
            ``ObjectRef`` when ``num_returns == 1``, a list of ``ObjectRef``-s
            when ``num_returns > 1``, ``None`` when ``num_returns == 0``.

        Raises
        ------
        NotImplementedError
            If ``num_cpus`` or ``resources`` is requested — the backend
            does not support them.
        """
        if num_cpus is not None or self._num_cpus is not None:
            raise NotImplementedError(
                "'num_cpus' is not supported by Python Sequential backend."
            )
        if resources is not None or self._resources is not None:
            raise NotImplementedError(
                "'resources' is not supported by Python Sequential backend."
            )
        count = self._num_returns if num_returns is None else num_returns
        data_ids = py.submit(self._remote_function, *args, num_returns=count, **kwargs)
        if count == 1:
            return ObjectRef(data_ids)
        if count > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if count == 0:
            return None
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Utilities used to initialize Python Sequential execution backend.""" | ||
def initialize_pyseq():
    """
    Initialize the Python Sequential execution backend.

    Notes
    -----
    All execution will happen sequentially.
    """
    # Import lazily so the backend core is only loaded when actually selected.
    from unidist.core.backends.pyseq.core import init

    init()
+11
-3
| Metadata-Version: 2.1 | ||
| Name: unidist | ||
| Version: 0.2.2 | ||
| Version: 0.3.0 | ||
| Summary: Unified Distributed Execution | ||
@@ -22,4 +22,5 @@ Home-page: https://github.com/modin-project/unidist | ||
| <p align="center"> | ||
| <a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a> | ||
| <a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/0.2.2/"><img src="https://img.shields.io/badge/pypi-0.2.2-blue.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a> | ||
| </p> | ||
@@ -47,3 +48,3 @@ | ||
| ```bash | ||
| pip install unidist # Install unidist with dependencies for Multiprocessing and sequential Python backends | ||
| pip install unidist # Install unidist with dependencies for Python Multiprocessing and Python Sequential backends | ||
| ``` | ||
@@ -142,4 +143,11 @@ | ||
| ### Powered by unidist | ||
| unidist is meant to be used not only directly by users to get better performance in their workloads, | ||
| but also be a core component of other libraries to power those with the performant execution backends. | ||
| Refer to `Libraries powered by unidist` section of [Using Unidist](https://unidist.readthedocs.io/en/latest/using_unidist/index.html) page | ||
| to get more information on which libraries have already been using unidist. | ||
| ### Full Documentation | ||
| Visit the complete documentation on readthedocs: https://unidist.readthedocs.io. |
+10
-2
@@ -7,4 +7,5 @@ <p align="center"> | ||
| <p align="center"> | ||
| <a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a> | ||
| <a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/0.2.2/"><img src="https://img.shields.io/badge/pypi-0.2.2-blue.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a> | ||
| </p> | ||
@@ -32,3 +33,3 @@ | ||
| ```bash | ||
| pip install unidist # Install unidist with dependencies for Multiprocessing and sequential Python backends | ||
| pip install unidist # Install unidist with dependencies for Python Multiprocessing and Python Sequential backends | ||
| ``` | ||
@@ -127,4 +128,11 @@ | ||
| ### Powered by unidist | ||
| unidist is meant to be used not only directly by users to get better performance in their workloads, | ||
| but also be a core component of other libraries to power those with the performant execution backends. | ||
| Refer to `Libraries powered by unidist` section of [Using Unidist](https://unidist.readthedocs.io/en/latest/using_unidist/index.html) page | ||
| to get more information on which libraries have already been using unidist. | ||
| ### Full Documentation | ||
| Visit the complete documentation on readthedocs: https://unidist.readthedocs.io. |
+1
-1
@@ -6,3 +6,3 @@ import pathlib | ||
| ray_deps = ["ray[default]>=1.4.0"] | ||
| ray_deps = ["ray[default]>=1.13.0"] | ||
| dask_deps = ["dask[complete]>=2.22.0", "distributed>=2.22.0"] | ||
@@ -9,0 +9,0 @@ mpi_deps = ["mpi4py-mpich", "msgpack>=1.0.0"] |
| Metadata-Version: 2.1 | ||
| Name: unidist | ||
| Version: 0.2.2 | ||
| Version: 0.3.0 | ||
| Summary: Unified Distributed Execution | ||
@@ -22,4 +22,5 @@ Home-page: https://github.com/modin-project/unidist | ||
| <p align="center"> | ||
| <a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a> | ||
| <a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/0.2.2/"><img src="https://img.shields.io/badge/pypi-0.2.2-blue.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a> | ||
| </p> | ||
@@ -47,3 +48,3 @@ | ||
| ```bash | ||
| pip install unidist # Install unidist with dependencies for Multiprocessing and sequential Python backends | ||
| pip install unidist # Install unidist with dependencies for Python Multiprocessing and Python Sequential backends | ||
| ``` | ||
@@ -142,4 +143,11 @@ | ||
| ### Powered by unidist | ||
| unidist is meant to be used not only directly by users to get better performance in their workloads, | ||
| but also be a core component of other libraries to power those with the performant execution backends. | ||
| Refer to `Libraries powered by unidist` section of [Using Unidist](https://unidist.readthedocs.io/en/latest/using_unidist/index.html) page | ||
| to get more information on which libraries have already been using unidist. | ||
| ### Full Documentation | ||
| Visit the complete documentation on readthedocs: https://unidist.readthedocs.io. |
@@ -5,3 +5,3 @@ packaging | ||
| [all] | ||
| ray[default]>=1.4.0 | ||
| ray[default]>=1.13.0 | ||
| dask[complete]>=2.22.0 | ||
@@ -21,2 +21,2 @@ distributed>=2.22.0 | ||
| [ray] | ||
| ray[default]>=1.4.0 | ||
| ray[default]>=1.13.0 |
@@ -43,2 +43,3 @@ AUTHORS | ||
| unidist/core/backends/mpi/core/__init__.py | ||
| unidist/core/backends/mpi/core/async_operations.py | ||
| unidist/core/backends/mpi/core/common.py | ||
@@ -55,3 +56,2 @@ unidist/core/backends/mpi/core/communication.py | ||
| unidist/core/backends/mpi/core/worker/__init__.py | ||
| unidist/core/backends/mpi/core/worker/async_operations.py | ||
| unidist/core/backends/mpi/core/worker/loop.py | ||
@@ -61,20 +61,20 @@ unidist/core/backends/mpi/core/worker/object_store.py | ||
| unidist/core/backends/mpi/core/worker/task_store.py | ||
| unidist/core/backends/multiprocessing/__init__.py | ||
| unidist/core/backends/multiprocessing/actor.py | ||
| unidist/core/backends/multiprocessing/backend.py | ||
| unidist/core/backends/multiprocessing/remote_function.py | ||
| unidist/core/backends/multiprocessing/utils.py | ||
| unidist/core/backends/multiprocessing/core/__init__.py | ||
| unidist/core/backends/multiprocessing/core/actor.py | ||
| unidist/core/backends/multiprocessing/core/api.py | ||
| unidist/core/backends/multiprocessing/core/object_store.py | ||
| unidist/core/backends/multiprocessing/core/process_manager.py | ||
| unidist/core/backends/python/__init__.py | ||
| unidist/core/backends/python/actor.py | ||
| unidist/core/backends/python/backend.py | ||
| unidist/core/backends/python/remote_function.py | ||
| unidist/core/backends/python/utils.py | ||
| unidist/core/backends/python/core/__init__.py | ||
| unidist/core/backends/python/core/api.py | ||
| unidist/core/backends/python/core/object_store.py | ||
| unidist/core/backends/pymp/__init__.py | ||
| unidist/core/backends/pymp/actor.py | ||
| unidist/core/backends/pymp/backend.py | ||
| unidist/core/backends/pymp/remote_function.py | ||
| unidist/core/backends/pymp/utils.py | ||
| unidist/core/backends/pymp/core/__init__.py | ||
| unidist/core/backends/pymp/core/actor.py | ||
| unidist/core/backends/pymp/core/api.py | ||
| unidist/core/backends/pymp/core/object_store.py | ||
| unidist/core/backends/pymp/core/process_manager.py | ||
| unidist/core/backends/pyseq/__init__.py | ||
| unidist/core/backends/pyseq/actor.py | ||
| unidist/core/backends/pyseq/backend.py | ||
| unidist/core/backends/pyseq/remote_function.py | ||
| unidist/core/backends/pyseq/utils.py | ||
| unidist/core/backends/pyseq/core/__init__.py | ||
| unidist/core/backends/pyseq/core/api.py | ||
| unidist/core/backends/pyseq/core/object_store.py | ||
| unidist/core/backends/ray/__init__.py | ||
@@ -81,0 +81,0 @@ unidist/core/backends/ray/actor.py |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -11,7 +11,7 @@ | ||
| { | ||
| "date": "2023-01-16T15:56:02+0100", | ||
| "date": "2023-03-22T17:08:07+0100", | ||
| "dirty": false, | ||
| "error": null, | ||
| "full-revisionid": "c8b6fce4c2a9b646968b84e508e8d647dbd40ffb", | ||
| "version": "0.2.2" | ||
| "full-revisionid": "b133787a752b622e1443f3714d07cc98d47a66fb", | ||
| "version": "0.3.0" | ||
| } | ||
@@ -18,0 +18,0 @@ ''' # END VERSION_JSON |
+1
-1
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -16,3 +16,3 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| from .backends.dask import DaskMemoryLimit, IsDaskCluster, DaskSchedulerAddress | ||
| from .backends.mpi import IsMpiSpawnWorkers, MpiHosts | ||
| from .backends.mpi import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold | ||
| from .parameter import ValueSource | ||
@@ -34,2 +34,3 @@ | ||
| "ValueSource", | ||
| "MpiPickleThreshold", | ||
| ] |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -21,4 +21,4 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| BackendName.MPI, | ||
| BackendName.MP, | ||
| BackendName.PY, | ||
| BackendName.PYMP, | ||
| BackendName.PYSEQ, | ||
| ) | ||
@@ -70,3 +70,3 @@ | ||
| return BackendName.MPI | ||
| return BackendName.MP | ||
| return BackendName.PYMP | ||
@@ -73,0 +73,0 @@ |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -7,4 +7,4 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| from .envvars import IsMpiSpawnWorkers, MpiHosts | ||
| from .envvars import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold | ||
| __all__ = ["IsMpiSpawnWorkers", "MpiHosts"] | ||
| __all__ = ["IsMpiSpawnWorkers", "MpiHosts", "MpiPickleThreshold"] |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -21,1 +21,8 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| varname = "UNIDIST_MPI_HOSTS" | ||
| class MpiPickleThreshold(EnvironmentVariable, type=int): | ||
| """Minimum buffer size for serialization with pickle 5 protocol""" | ||
| default = 1024**2 // 4 # 0.25 MiB | ||
| varname = "UNIDIST_MPI_PICKLE_THRESHOLD" |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,3 +0,3 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -154,7 +154,9 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| """ | ||
| f_format = logging.Formatter("%(message)s") | ||
| f_handler = logging.FileHandler(file_name, delay=True) | ||
| f_handler.setFormatter(f_format) | ||
| logger = logging.getLogger(logger_name) | ||
| if not logger.hasHandlers(): | ||
| f_format = logging.Formatter("%(message)s") | ||
| f_handler = logging.FileHandler(file_name, delay=True) | ||
| f_handler.setFormatter(f_format) | ||
| logger.addHandler(f_handler) | ||
| logger = logging.getLogger(logger_name) | ||
| if activate: | ||
@@ -164,3 +166,2 @@ logger.setLevel(logging.DEBUG) | ||
| logger.setLevel(logging.NOTSET) | ||
| logger.addHandler(f_handler) | ||
@@ -167,0 +168,0 @@ return logger |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -27,2 +27,3 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| from mpi4py import MPI # noqa: E402 | ||
| from mpi4py.util import pkl5 # noqa: E402 | ||
@@ -174,2 +175,8 @@ | ||
| Target MPI process to transfer data. | ||
| Notes | ||
| ----- | ||
| This blocking send is used when we have to wait for completion of the communication, | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``mpi_isend_object``. | ||
| """ | ||
@@ -214,2 +221,8 @@ comm.send(data, dest=dest_rank) | ||
| Target MPI process to transfer buffer. | ||
| Notes | ||
| ----- | ||
| This blocking send is used when we have to wait for completion of the communication, | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``mpi_isend_buffer``. | ||
| """ | ||
@@ -242,3 +255,3 @@ comm.send(buffer_size, dest=dest_rank) | ||
| def mpi_isend_buffer(comm, data, dest_rank): | ||
| def mpi_isend_buffer(comm, buffer_size, buffer, dest_rank): | ||
| """ | ||
@@ -251,3 +264,5 @@ Send buffer object to another MPI rank in a non-blocking way. | ||
| MPI communicator object. | ||
| data : object | ||
| buffer_size : int | ||
| Buffer size in bytes. | ||
| buffer : object | ||
| Buffer object to send. | ||
@@ -262,3 +277,8 @@ dest_rank : int | ||
| """ | ||
| return comm.Isend([data, MPI.CHAR], dest=dest_rank) | ||
| requests = [] | ||
| h1 = comm.isend(buffer_size, dest=dest_rank) | ||
| requests.append((h1, None)) | ||
| h2 = comm.Isend([buffer, MPI.CHAR], dest=dest_rank) | ||
| requests.append((h2, buffer)) | ||
| return requests | ||
@@ -320,3 +340,3 @@ | ||
| def _send_complex_data_impl(comm, s_data, raw_buffers, len_buffers, dest_rank): | ||
| def _send_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank): | ||
| """ | ||
@@ -333,20 +353,25 @@ Send already serialized complex data. | ||
| Pickle buffers list, out-of-band data collected with pickle 5 protocol. | ||
| len_buffers : list | ||
| Size of each buffer from `raw_buffers` list. | ||
| buffer_count : list | ||
| List of the number of buffers for each object | ||
| to be serialized/deserialized using the pickle 5 protocol. | ||
| See details in :py:class:`~unidist.core.backends.mpi.core.serialization.ComplexDataSerializer`. | ||
| dest_rank : int | ||
| Target MPI process to transfer data. | ||
| """ | ||
| # Send message pack bytestring | ||
| mpi_send_buffer(comm, len(s_data), s_data, dest_rank) | ||
| # Send the necessary metadata | ||
| mpi_send_object(comm, len(raw_buffers), dest_rank) | ||
| for raw_buffer in raw_buffers: | ||
| mpi_send_buffer(comm, len(raw_buffer.raw()), raw_buffer, dest_rank) | ||
| # TODO: do not send if raw_buffers is zero | ||
| mpi_send_object(comm, len_buffers, dest_rank) | ||
| info = { | ||
| "s_data_len": len(s_data), | ||
| "buffer_count": buffer_count, | ||
| "raw_buffers_len": [len(sbuf) for sbuf in raw_buffers], | ||
| } | ||
| comm.send(info, dest=dest_rank) | ||
| with pkl5._bigmpi as bigmpi: | ||
| comm.Send(bigmpi(s_data), dest=dest_rank) | ||
| for sbuf in raw_buffers: | ||
| comm.Send(bigmpi(sbuf), dest=dest_rank) | ||
| def send_complex_data(comm, data, dest_rank): | ||
| """ | ||
| Send the data that consists of different user provided complex types, lambdas and buffers. | ||
| Send the data that consists of different user provided complex types, lambdas and buffers in a blocking way. | ||
@@ -370,2 +395,8 @@ Parameters | ||
| A list of buffers amount for each object. | ||
| Notes | ||
| ----- | ||
| This blocking send is used when we have to wait for completion of the communication, | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``isend_complex_data``. | ||
| """ | ||
@@ -377,12 +408,12 @@ serializer = ComplexDataSerializer() | ||
| raw_buffers = serializer.buffers | ||
| len_buffers = serializer.len_buffers | ||
| buffer_count = serializer.buffer_count | ||
| # MPI comminucation | ||
| _send_complex_data_impl(comm, s_data, raw_buffers, len_buffers, dest_rank) | ||
| _send_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank) | ||
| # For caching purpose | ||
| return s_data, raw_buffers, len_buffers | ||
| return s_data, raw_buffers, buffer_count | ||
| def _isend_complex_data_impl(comm, s_data, raw_buffers, len_buffers, dest_rank): | ||
| def _isend_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank): | ||
| """ | ||
@@ -401,4 +432,6 @@ Send serialized complex data. | ||
| A list of pickle buffers. | ||
| len_buffers : list | ||
| A list of buffers amount for each object. | ||
| buffer_count : list | ||
| List of the number of buffers for each object | ||
| to be serialized/deserialized using the pickle 5 protocol. | ||
| See details in :py:class:`~unidist.core.backends.mpi.core.serialization.ComplexDataSerializer`. | ||
| dest_rank : int | ||
@@ -413,20 +446,17 @@ Target MPI process to transfer data. | ||
| handlers = [] | ||
| info = { | ||
| "s_data_len": len(s_data), | ||
| "buffer_count": buffer_count, | ||
| "raw_buffers_len": [len(sbuf) for sbuf in raw_buffers], | ||
| } | ||
| # Send message pack bytestring | ||
| h1 = mpi_isend_object(comm, len(s_data), dest_rank) | ||
| h2 = mpi_isend_buffer(comm, s_data, dest_rank) | ||
| h1 = comm.isend(info, dest=dest_rank) | ||
| handlers.append((h1, None)) | ||
| handlers.append((h2, s_data)) | ||
| # Send the necessary metadata | ||
| h3 = mpi_isend_object(comm, len(raw_buffers), dest_rank) | ||
| handlers.append((h3, None)) | ||
| for raw_buffer in raw_buffers: | ||
| h4 = mpi_isend_object(comm, len(raw_buffer.raw()), dest_rank) | ||
| h5 = mpi_isend_buffer(comm, raw_buffer, dest_rank) | ||
| handlers.append((h4, None)) | ||
| handlers.append((h5, raw_buffer)) | ||
| # TODO: do not send if raw_buffers is zero | ||
| h6 = mpi_isend_object(comm, len_buffers, dest_rank) | ||
| handlers.append((h6, len_buffers)) | ||
| with pkl5._bigmpi as bigmpi: | ||
| h2 = comm.Isend(bigmpi(s_data), dest=dest_rank) | ||
| handlers.append((h2, s_data)) | ||
| for sbuf in raw_buffers: | ||
| h_sbuf = comm.Isend(bigmpi(sbuf), dest=dest_rank) | ||
| handlers.append((h_sbuf, sbuf)) | ||
@@ -436,5 +466,5 @@ return handlers | ||
| def _isend_complex_data(comm, data, dest_rank): | ||
| def isend_complex_data(comm, data, dest_rank): | ||
| """ | ||
| Send the data that consists of different user provided complex types, lambdas and buffers. | ||
| Send the data that consists of different user provided complex types, lambdas and buffers in a non-blocking way. | ||
@@ -470,10 +500,10 @@ Non-blocking asynchronous interface. | ||
| raw_buffers = serializer.buffers | ||
| len_buffers = serializer.len_buffers | ||
| buffer_count = serializer.buffer_count | ||
| # Send message pack bytestring | ||
| handlers.extend( | ||
| _isend_complex_data_impl(comm, s_data, raw_buffers, len_buffers, dest_rank) | ||
| _isend_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank) | ||
| ) | ||
| return handlers, s_data, raw_buffers, len_buffers | ||
| return handlers, s_data, raw_buffers, buffer_count | ||
@@ -502,20 +532,15 @@ | ||
| # in a long running data receive operations. | ||
| buf_size = mpi_busy_wait_recv(comm, source_rank) | ||
| msgpack_buffer = bytearray(buf_size) | ||
| comm.Recv([msgpack_buffer, MPI.CHAR], source=source_rank) | ||
| # Recv pickle buffers array for all complex data frames | ||
| raw_buffers_size = comm.recv(source=source_rank) | ||
| # Pre-allocate pickle buffers list | ||
| raw_buffers = [None] * raw_buffers_size | ||
| for i in range(raw_buffers_size): | ||
| buf_size = comm.recv(source=source_rank) | ||
| recv_buffer = bytearray(buf_size) | ||
| comm.Recv([recv_buffer, MPI.CHAR], source=source_rank) | ||
| raw_buffers[i] = recv_buffer | ||
| # Recv len of buffers for each complex data frames | ||
| len_buffers = comm.recv(source=source_rank) | ||
| info = comm.recv(source=source_rank) | ||
| msgpack_buffer = bytearray(info["s_data_len"]) | ||
| buffer_count = info["buffer_count"] | ||
| raw_buffers = list(map(bytearray, info["raw_buffers_len"])) | ||
| with pkl5._bigmpi as bigmpi: | ||
| comm.Recv(bigmpi(msgpack_buffer), source=source_rank) | ||
| for rbuf in raw_buffers: | ||
| comm.Recv(bigmpi(rbuf), source=source_rank) | ||
| # Set the necessary metadata for unpacking | ||
| deserializer = ComplexDataSerializer(raw_buffers, len_buffers) | ||
| deserializer = ComplexDataSerializer(raw_buffers, buffer_count) | ||
@@ -531,8 +556,6 @@ # Start unpacking | ||
| def send_complex_operation(comm, operation_type, operation_data, dest_rank): | ||
| def send_simple_operation(comm, operation_type, operation_data, dest_rank): | ||
| """ | ||
| Send operation and data that consist of different user provided complex types, lambdas and buffers. | ||
| Send an operation type and standard Python data types in a blocking way. | ||
| The data is serialized with ``unidist.core.backends.mpi.core.ComplexDataSerializer``. | ||
| Parameters | ||
@@ -542,3 +565,3 @@ ---------- | ||
| MPI communicator object. | ||
| operation_type : ``unidist.core.backends.mpi.core.common.Operation`` | ||
| operation_type : unidist.core.backends.mpi.core.common.Operation | ||
| Operation message type. | ||
@@ -549,12 +572,19 @@ operation_data : object | ||
| Target MPI process to transfer data. | ||
| Notes | ||
| ----- | ||
| * This blocking send is used when we have to wait for completion of the communication, | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``isend_simple_operation``. | ||
| * Serialization of the data to be sent takes place just using ``pickle.dump`` in this case. | ||
| """ | ||
| # Send operation type | ||
| comm.send(operation_type, dest=dest_rank) | ||
| # Send complex dictionary data | ||
| send_complex_data(comm, operation_data, dest_rank) | ||
| mpi_send_object(comm, operation_type, dest_rank) | ||
| # Send the details of a communication request | ||
| mpi_send_object(comm, operation_data, dest_rank) | ||
| def send_simple_operation(comm, operation_type, operation_data, dest_rank): | ||
| def isend_simple_operation(comm, operation_type, operation_data, dest_rank): | ||
| """ | ||
| Send an operation and standard Python data types. | ||
| Send an operation type and standard Python data types in a non-blocking way. | ||
@@ -572,10 +602,19 @@ Parameters | ||
| Returns | ||
| ------- | ||
| list | ||
| A list of pairs, ``MPI_Isend`` handler and associated data to send. | ||
| Notes | ||
| ----- | ||
| Serialization is a simple pickle.dump in this case. | ||
| Serialization of the data to be sent takes place just using ``pickle.dump`` in this case. | ||
| """ | ||
| # Send operation type | ||
| mpi_send_object(comm, operation_type, dest_rank) | ||
| # Send request details | ||
| mpi_send_object(comm, operation_data, dest_rank) | ||
| handlers = [] | ||
| h1 = mpi_isend_object(comm, operation_type, dest_rank) | ||
| handlers.append((h1, operation_type)) | ||
| # Send the details of a communication request | ||
| h2 = mpi_isend_object(comm, operation_data, dest_rank) | ||
| handlers.append((h2, operation_data)) | ||
| return handlers | ||
@@ -606,87 +645,2 @@ | ||
| def send_operation_data(comm, operation_data, dest_rank, is_serialized=False): | ||
| """ | ||
| Send data that consists of different user provided complex types, lambdas and buffers. | ||
| The data is serialized with ``unidist.core.backends.mpi.core.ComplexDataSerializer``. | ||
| Function works with already serialized data. | ||
| Parameters | ||
| ---------- | ||
| comm : object | ||
| MPI communicator object. | ||
| operation_data : object | ||
| Data object to send. | ||
| dest_rank : int | ||
| Target MPI process to transfer data. | ||
| is_serialized : bool | ||
| `operation_data` is already serialized or not. | ||
| Returns | ||
| ------- | ||
| dict or None | ||
| Serialization data for caching purpose or nothing. | ||
| Notes | ||
| ----- | ||
| Function returns ``None`` if `operation_data` is already serialized, | ||
| otherwise ``dict`` containing data serialized in this function. | ||
| """ | ||
| if is_serialized: | ||
| # Send already serialized data | ||
| s_data = operation_data["s_data"] | ||
| raw_buffers = operation_data["raw_buffers"] | ||
| len_buffers = operation_data["len_buffers"] | ||
| _send_complex_data_impl(comm, s_data, raw_buffers, len_buffers, dest_rank) | ||
| return None | ||
| else: | ||
| # Serialize and send the data | ||
| s_data, raw_buffers, len_buffers = send_complex_data( | ||
| comm, operation_data, dest_rank | ||
| ) | ||
| return { | ||
| "s_data": s_data, | ||
| "raw_buffers": raw_buffers, | ||
| "len_buffers": len_buffers, | ||
| } | ||
| def send_operation( | ||
| comm, operation_type, operation_data, dest_rank, is_serialized=False | ||
| ): | ||
| """ | ||
| Send operation and data that consists of different user provided complex types, lambdas and buffers. | ||
| The data is serialized with ``unidist.core.backends.mpi.core.ComplexDataSerializer``. | ||
| Function works with already serialized data. | ||
| Parameters | ||
| ---------- | ||
| comm : object | ||
| MPI communicator object. | ||
| operation_type : ``unidist.core.backends.mpi.core.common.Operation`` | ||
| Operation message type. | ||
| operation_data : object | ||
| Data object to send. | ||
| dest_rank : int | ||
| Target MPI process to transfer data. | ||
| is_serialized : bool | ||
| `operation_data` is already serialized or not. | ||
| Returns | ||
| ------- | ||
| dict or None | ||
| Serialization data for caching purpose. | ||
| Notes | ||
| ----- | ||
| Function returns ``None`` if `operation_data` is already serialized, | ||
| otherwise ``dict`` containing data serialized in this function. | ||
| """ | ||
| # Send operation type | ||
| mpi_send_object(comm, operation_type, dest_rank) | ||
| # Send operation data | ||
| return send_operation_data(comm, operation_data, dest_rank, is_serialized) | ||
| def isend_complex_operation( | ||
@@ -736,6 +690,6 @@ comm, operation_type, operation_data, dest_rank, is_serialized=False | ||
| raw_buffers = operation_data["raw_buffers"] | ||
| len_buffers = operation_data["len_buffers"] | ||
| buffer_count = operation_data["buffer_count"] | ||
| h2_list = _isend_complex_data_impl( | ||
| comm, s_data, raw_buffers, len_buffers, dest_rank | ||
| comm, s_data, raw_buffers, buffer_count, dest_rank | ||
| ) | ||
@@ -747,3 +701,3 @@ handlers.extend(h2_list) | ||
| # Serialize and send the data | ||
| h2_list, s_data, raw_buffers, len_buffers = _isend_complex_data( | ||
| h2_list, s_data, raw_buffers, buffer_count = isend_complex_data( | ||
| comm, operation_data, dest_rank | ||
@@ -755,29 +709,8 @@ ) | ||
| "raw_buffers": raw_buffers, | ||
| "len_buffers": len_buffers, | ||
| "buffer_count": buffer_count, | ||
| } | ||
| def send_remote_task_operation(comm, operation_type, operation_data, dest_rank): | ||
| def isend_serialized_operation(comm, operation_type, operation_data, dest_rank): | ||
| """ | ||
| Send operation and data that consist of different user provided complex types, lambdas and buffers. | ||
| Parameters | ||
| ---------- | ||
| comm : object | ||
| MPI communicator object. | ||
| operation_type : ``unidist.core.backends.mpi.core.common.Operation`` | ||
| Operation message type. | ||
| operation_data : object | ||
| Data object to send. | ||
| dest_rank : int | ||
| Target MPI process to transfer data. | ||
| """ | ||
| # Send operation type | ||
| mpi_send_object(comm, operation_type, dest_rank) | ||
| # Serialize and send the complex data | ||
| send_complex_data(comm, operation_data, dest_rank) | ||
| def send_serialized_operation(comm, operation_type, operation_data, dest_rank): | ||
| """ | ||
| Send operation and serialized simple data. | ||
@@ -795,7 +728,16 @@ | ||
| Target MPI process to transfer data. | ||
| Returns | ||
| ------- | ||
| list | ||
| A list of pairs, ``MPI_Isend`` handler and associated data to send. | ||
| """ | ||
| handlers = [] | ||
| # Send operation type | ||
| mpi_send_object(comm, operation_type, dest_rank) | ||
| # Send request details | ||
| mpi_send_buffer(comm, len(operation_data), operation_data, dest_rank) | ||
| h1 = mpi_isend_object(comm, operation_type, dest_rank) | ||
| handlers.append((h1, operation_type)) | ||
| # Send the details of a communication request | ||
| h2_list = mpi_isend_buffer(comm, len(operation_data), operation_data, dest_rank) | ||
| handlers.extend(h2_list) | ||
| return handlers | ||
@@ -802,0 +744,0 @@ |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -9,2 +9,3 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.controller.object_store import object_store | ||
@@ -54,3 +55,4 @@ from unidist.core.backends.mpi.core.controller.garbage_collector import ( | ||
| } | ||
| communication.send_complex_operation( | ||
| async_operations = AsyncOperations.get_instance() | ||
| h_list, _ = communication.isend_complex_operation( | ||
| communication.MPIState.get_instance().comm, | ||
@@ -61,3 +63,3 @@ operation_type, | ||
| ) | ||
| async_operations.extend(h_list) | ||
| return output_id | ||
@@ -118,3 +120,4 @@ | ||
| } | ||
| communication.send_complex_operation( | ||
| async_operations = AsyncOperations.get_instance() | ||
| h_list, _ = communication.isend_complex_operation( | ||
| communication.MPIState.get_instance().comm, | ||
@@ -125,2 +128,3 @@ operation_type, | ||
| ) | ||
| async_operations.extend(h_list) | ||
@@ -127,0 +131,0 @@ def _serialization_helper(self): |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -31,2 +31,3 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.config import ( | ||
@@ -37,2 +38,3 @@ CpuCount, | ||
| ValueSource, | ||
| MpiPickleThreshold, | ||
| ) | ||
@@ -96,5 +98,7 @@ | ||
| if MpiHosts.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiHosts.put({MpiHosts.get()})"] | ||
| py_str += [f"cfg.MpiHosts.put('{MpiHosts.get()}')"] | ||
| if CpuCount.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.CpuCount.put({CpuCount.get()})"] | ||
| if MpiPickleThreshold.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiPickleThreshold.put({MpiPickleThreshold.get()})"] | ||
| py_str += ["unidist.init()"] | ||
@@ -184,4 +188,8 @@ py_str = "; ".join(py_str) | ||
| for rank_id in range(communication.MPIRank.MONITOR, mpi_state.world_size): | ||
| # We use a blocking send here because we have to wait for | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| communication.mpi_send_object(mpi_state.comm, common.Operation.CANCEL, rank_id) | ||
| logger.debug("Shutdown rank {}".format(rank_id)) | ||
| async_operations = AsyncOperations.get_instance() | ||
| async_operations.finish() | ||
| if not MPI.Is_finalized(): | ||
@@ -305,2 +313,4 @@ MPI.Finalize() | ||
| operation_data = {"id": data_id.base_data_id()} | ||
| # We use a blocking send here because we have to wait for | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| communication.send_simple_operation( | ||
@@ -396,3 +406,4 @@ mpi_state.comm, | ||
| } | ||
| communication.send_remote_task_operation( | ||
| async_operations = AsyncOperations.get_instance() | ||
| h_list, _ = communication.isend_complex_operation( | ||
| communication.MPIState.get_instance().comm, | ||
@@ -403,2 +414,3 @@ operation_type, | ||
| ) | ||
| async_operations.extend(h_list) | ||
@@ -405,0 +417,0 @@ # Track the task execution |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -12,2 +12,3 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.controller.object_store import object_store | ||
@@ -148,2 +149,4 @@ | ||
| } | ||
| # We use a blocking send here because we have to wait for | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| communication.send_simple_operation( | ||
@@ -181,2 +184,3 @@ mpi_state.comm, | ||
| mpi_state = communication.MPIState.get_instance() | ||
| async_operations = AsyncOperations.get_instance() | ||
| # Push the local master data to the target worker directly | ||
@@ -186,3 +190,3 @@ operation_type = common.Operation.PUT_DATA | ||
| serialized_data = object_store.get_serialized_data(data_id) | ||
| communication.send_operation( | ||
| h_list, _ = communication.isend_complex_operation( | ||
| mpi_state.comm, | ||
@@ -199,3 +203,3 @@ operation_type, | ||
| } | ||
| serialized_data = communication.send_operation( | ||
| h_list, serialized_data = communication.isend_complex_operation( | ||
| mpi_state.comm, | ||
@@ -208,2 +212,3 @@ operation_type, | ||
| object_store.cache_serialized_data(data_id, serialized_data) | ||
| async_operations.extend(h_list) | ||
| # Remember pushed id | ||
@@ -229,3 +234,4 @@ object_store.cache_send_info(data_id, dest_rank) | ||
| } | ||
| communication.send_simple_operation( | ||
| async_operations = AsyncOperations.get_instance() | ||
| h_list = communication.isend_simple_operation( | ||
| communication.MPIState.get_instance().comm, | ||
@@ -236,2 +242,3 @@ operation_type, | ||
| ) | ||
| async_operations.extend(h_list) | ||
@@ -238,0 +245,0 @@ |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -11,2 +11,3 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.serialization import SimpleDataSerializer | ||
@@ -65,9 +66,12 @@ from unidist.core.backends.mpi.core.controller.object_store import object_store | ||
| s_cleanup_list = SimpleDataSerializer().serialize_pickle(cleanup_list) | ||
| async_operations = AsyncOperations.get_instance() | ||
| for rank_id in range(initial_worker_number, mpi_state.world_size): | ||
| communication.send_serialized_operation( | ||
| mpi_state.comm, | ||
| common.Operation.CLEANUP, | ||
| s_cleanup_list, | ||
| rank_id, | ||
| ) | ||
| if rank_id != mpi_state.rank: | ||
| h_list = communication.isend_serialized_operation( | ||
| mpi_state.comm, | ||
| common.Operation.CLEANUP, | ||
| s_cleanup_list, | ||
| rank_id, | ||
| ) | ||
| async_operations.extend(h_list) | ||
@@ -110,5 +114,7 @@ def increment_task_counter(self): | ||
| ) | ||
| async_operations = AsyncOperations.get_instance() | ||
| # Check completion status of previous async MPI routines | ||
| async_operations.check() | ||
| if len(self._cleanup_list) > self._cleanup_list_threshold: | ||
| if self._cleanup_counter % self._cleanup_threshold == 0: | ||
| timestamp_snapshot = time.perf_counter() | ||
@@ -120,2 +126,4 @@ if (timestamp_snapshot - self._timestamp) > self._time_threshold: | ||
| # Compare submitted and executed tasks | ||
| # We use a blocking send here because we have to wait for | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| communication.mpi_send_object( | ||
@@ -122,0 +130,0 @@ mpi_state.comm, |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -16,2 +16,3 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
@@ -60,2 +61,3 @@ # TODO: Find a way to move this after all imports | ||
| mpi_state = communication.MPIState.get_instance() | ||
| async_operations = AsyncOperations.get_instance() | ||
@@ -70,2 +72,3 @@ while True: | ||
| elif operation_type == common.Operation.GET_TASK_COUNT: | ||
| # We use a blocking send here because the receiver is waiting for the result. | ||
| communication.mpi_send_object( | ||
@@ -77,2 +80,3 @@ mpi_state.comm, | ||
| elif operation_type == common.Operation.CANCEL: | ||
| async_operations.finish() | ||
| if not MPI.Is_finalized(): | ||
@@ -83,1 +87,4 @@ MPI.Finalize() | ||
| raise ValueError("Unsupported operation!") | ||
| # Check completion status of previous async MPI routines | ||
| async_operations.check() |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -11,2 +11,3 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| from collections.abc import KeysView | ||
| from mpi4py.MPI import memory | ||
@@ -27,2 +28,4 @@ # Serialization libraries | ||
| from unidist.config import MpiPickleThreshold | ||
| # Pickle 5 protocol compatible types check | ||
@@ -93,13 +96,23 @@ compatible_modules = ("pandas", "numpy") | ||
| A list of ``PickleBuffer`` objects for data decoding. | ||
| len_buffers : list, default: None | ||
| A list of buffer sizes for data decoding. | ||
| buffer_count : list, default: None | ||
| List of the number of buffers for each object | ||
| to be serialized/deserialized using the pickle 5 protocol. | ||
| Notes | ||
| ----- | ||
| Uses a combination of msgpack, cloudpickle and pickle libraries | ||
| Uses a combination of msgpack, cloudpickle and pickle libraries. | ||
| Msgpack allows to serialize/deserialize internal objects of a container separately, | ||
| but send them as one object. For example, for an array of pandas DataFrames, | ||
| each DataFrame will be serialized separately using pickle 5, | ||
| and all `buffers` will be stored in one array to be sent together. | ||
| To deserialize it `buffer_count` is used, which contains information | ||
| about the number of `buffers` for each internal object. | ||
| """ | ||
| def __init__(self, buffers=None, len_buffers=None): | ||
| # Minimum buffer size for serialization with pickle 5 protocol | ||
| PICKLE_THRESHOLD = MpiPickleThreshold.get() | ||
| def __init__(self, buffers=None, buffer_count=None): | ||
| self.buffers = buffers if buffers else [] | ||
| self.len_buffers = len_buffers if len_buffers else [] | ||
| self.buffer_count = buffer_count if buffer_count else [] | ||
| self._callback_counter = 0 | ||
@@ -116,5 +129,8 @@ | ||
| """ | ||
| self.buffers.append(pickle_buffer) | ||
| self._callback_counter += 1 | ||
| return False | ||
| pickle_buffer = memory(pickle_buffer) | ||
| if len(pickle_buffer) >= self.PICKLE_THRESHOLD: | ||
| self.buffers.append(pickle_buffer) | ||
| self._callback_counter += 1 | ||
| return False | ||
| return True | ||
@@ -136,3 +152,3 @@ def _dataframe_encode(self, frame): | ||
| s_frame = pkl.dumps(frame, protocol=5, buffer_callback=self._buffer_callback) | ||
| self.len_buffers.append(self._callback_counter) | ||
| self.buffer_count.append(self._callback_counter) | ||
| self._callback_counter = 0 | ||
@@ -223,3 +239,3 @@ return {"__pickle5_custom__": True, "as_bytes": s_frame} | ||
| frame = pkl.loads(obj["as_bytes"], buffers=self.buffers) | ||
| del self.buffers[: self.len_buffers.pop(0)] | ||
| del self.buffers[: self.buffer_count.pop(0)] | ||
| return frame | ||
@@ -226,0 +242,0 @@ else: |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -21,3 +21,3 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| from unidist.core.backends.mpi.core.worker.task_store import TaskStore | ||
| from unidist.core.backends.mpi.core.worker.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
@@ -32,5 +32,5 @@ # TODO: Find a way to move this after all imports | ||
| # we use the condition to set "worker_0.log" in order to build it successfully. | ||
| log_file = "worker_{}.log".format(mpi_state.rank if mpi_state is not None else 0) | ||
| w_logger = common.get_logger("worker", log_file) | ||
| logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0) | ||
| log_file = "{}.log".format(logger_name) | ||
| w_logger = common.get_logger(logger_name, log_file) | ||
| # Actors map {handle : actor} | ||
@@ -111,2 +111,3 @@ actor_map = {} | ||
| request = communication.recv_simple_operation(mpi_state.comm, source_rank) | ||
| request["id"] = object_store.get_unique_data_id(request["id"]) | ||
| request_store.process_get_request( | ||
@@ -121,2 +122,3 @@ request["source"], request["id"], request["is_blocking_op"] | ||
| ) | ||
| request["id"] = object_store.get_unique_data_id(request["id"]) | ||
| object_store.put(request["id"], request["data"]) | ||
@@ -134,2 +136,3 @@ | ||
| request = communication.recv_simple_operation(mpi_state.comm, source_rank) | ||
| request["id"] = object_store.get_unique_data_id(request["id"]) | ||
| object_store.put_data_owner(request["id"], request["owner"]) | ||
@@ -146,2 +149,3 @@ | ||
| w_logger.debug("WAIT for {} id".format(request["id"]._id)) | ||
| request["id"] = object_store.get_unique_data_id(request["id"]) | ||
| request_store.process_wait_request(request["id"]) | ||
@@ -148,0 +152,0 @@ |
@@ -1,5 +0,6 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| import weakref | ||
| import unidist.core.backends.mpi.core.common as common | ||
@@ -9,11 +10,9 @@ import unidist.core.backends.mpi.core.communication as communication | ||
| mpi_state = communication.MPIState.get_instance() | ||
| # Logger configuration | ||
| # When building documentation we do not have MPI initialized so | ||
| # we use the condition to set "worker_0.log" in order to build it successfully. | ||
| log_file = "worker_{}.log".format( | ||
| communication.MPIState.get_instance().rank | ||
| if communication.MPIState.get_instance() is not None | ||
| else 0 | ||
| ) | ||
| logger = common.get_logger("worker", log_file) | ||
| logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0) | ||
| log_file = "{}.log".format(logger_name) | ||
| logger = common.get_logger(logger_name, log_file) | ||
@@ -34,5 +33,9 @@ | ||
| # Add local data {DataId : Data} | ||
| self._data_map = {} | ||
| self._data_map = weakref.WeakKeyDictionary() | ||
| # "strong" references to data IDs {DataId : DataId} | ||
| # we are using dict here to improve performance when getting an element from it, | ||
| # whereas other containers would require O(n) complexity | ||
| self._data_id_map = {} | ||
| # Data owner {DataId : Rank} | ||
| self._data_owner_map = {} | ||
| self._data_owner_map = weakref.WeakKeyDictionary() | ||
| # Data serialized cache | ||
@@ -144,5 +147,29 @@ self._serialization_cache = {} | ||
| def get_unique_data_id(self, data_id): | ||
| """ | ||
| Get the "strong" reference to the data ID if it is already stored locally. | ||
| If the passed data ID is not stored locally yet, save and return it. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.common.data_id.DataID | ||
| An ID to data. | ||
| Returns | ||
| ------- | ||
| unidist.core.backends.common.data_id.DataID | ||
| The unique ID to data. | ||
| Notes | ||
| ----- | ||
| We need to use a unique data ID reference for the garbage collector to work correctly. | ||
| """ | ||
| if data_id not in self._data_id_map: | ||
| self._data_id_map[data_id] = data_id | ||
| return self._data_id_map[data_id] | ||
| def clear(self, cleanup_list): | ||
| """ | ||
| Clear all local dictionary data ID instances from `cleanup_list`. | ||
| Clear "strong" references to data IDs from `cleanup_list`. | ||
@@ -153,12 +180,10 @@ Parameters | ||
| List of data IDs. | ||
| Notes | ||
| ----- | ||
| The actual data will be collected later when there is no weak or | ||
| strong reference to data in the current worker. | ||
| """ | ||
| for data_id in cleanup_list: | ||
| if data_id in self._data_map: | ||
| logger.debug("CLEANUP DataMap id {}".format(data_id._id)) | ||
| del self._data_map[data_id] | ||
| if data_id in self._data_owner_map: | ||
| logger.debug("CLEANUP DataOwnerMap id {}".format(data_id._id)) | ||
| del self._data_owner_map[data_id] | ||
| if data_id in self._serialization_cache: | ||
| del self._serialization_cache[data_id] | ||
| self._data_id_map.pop(data_id, None) | ||
@@ -165,0 +190,0 @@ def cache_serialized_data(self, data_id, data): |
@@ -5,4 +5,4 @@ from collections import defaultdict | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.worker.object_store import ObjectStore | ||
| from unidist.core.backends.mpi.core.worker.async_operations import AsyncOperations | ||
@@ -14,4 +14,5 @@ | ||
| # we use the condition to set "worker_0.log" in order to build it successfully. | ||
| log_file = "worker_{}.log".format(mpi_state.rank if mpi_state is not None else 0) | ||
| logger = common.get_logger("worker", log_file) | ||
| logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0) | ||
| log_file = "{}.log".format(logger_name) | ||
| logger = common.get_logger(logger_name, log_file) | ||
@@ -162,5 +163,8 @@ | ||
| if data_id in self._wait_request: | ||
| # Data is already in DataMap, so not problem here | ||
| communication.MPIState.get_instance().comm.send( | ||
| data_id, dest=communication.MPIRank.ROOT | ||
| # Data is already in DataMap, so no problem here. | ||
| # We use a blocking send here because the receiver is waiting for the result. | ||
| communication.mpi_send_object( | ||
| communication.MPIState.get_instance().comm, | ||
| data_id, | ||
| communication.MPIRank.ROOT, | ||
| ) | ||
@@ -170,4 +174,7 @@ del self._wait_request[data_id] | ||
| if data_ids in self._wait_request: | ||
| communication.MPIState.get_instance().comm.send( | ||
| data_ids, dest=communication.MPIRank.ROOT | ||
| # We use a blocking send here because the receiver is waiting for the result. | ||
| communication.mpi_send_object( | ||
| communication.MPIState.get_instance().comm, | ||
| data_ids, | ||
| communication.MPIRank.ROOT, | ||
| ) | ||
@@ -193,4 +200,7 @@ del self._wait_request[data_ids] | ||
| # Executor wait just for signal | ||
| communication.MPIState.get_instance().comm.send( | ||
| data_id, dest=communication.MPIRank.ROOT | ||
| # We use a blocking send here because the receiver is waiting for the result. | ||
| communication.mpi_send_object( | ||
| communication.MPIState.get_instance().comm, | ||
| data_id, | ||
| communication.MPIRank.ROOT, | ||
| ) | ||
@@ -230,2 +240,3 @@ logger.debug("Wait data {} id is ready".format(data_id._id)) | ||
| operation_data = object_store.get(data_id) | ||
| # We use a blocking send here because the receiver is waiting for the result. | ||
| communication.send_complex_data( | ||
@@ -232,0 +243,0 @@ mpi_state.comm, |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -13,15 +13,13 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.worker.object_store import ObjectStore | ||
| from unidist.core.backends.mpi.core.worker.request_store import RequestStore | ||
| mpi_state = communication.MPIState.get_instance() | ||
| # Logger configuration | ||
| # When building documentation we do not have MPI initialized so | ||
| # we use the condition to set "worker_0.log" in order to build it successfully. | ||
| log_file = "worker_{}.log".format( | ||
| communication.MPIState.get_instance().rank | ||
| if communication.MPIState.get_instance() | ||
| else 0 | ||
| ) | ||
| w_logger = common.get_logger("worker", log_file) | ||
| logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0) | ||
| log_file = "{}.log".format(logger_name) | ||
| w_logger = common.get_logger(logger_name, log_file) | ||
@@ -140,3 +138,4 @@ | ||
| } | ||
| communication.send_simple_operation( | ||
| async_operations = AsyncOperations.get_instance() | ||
| h_list = communication.isend_simple_operation( | ||
| communication.MPIState.get_instance().comm, | ||
@@ -147,2 +146,3 @@ operation_type, | ||
| ) | ||
| async_operations.extend(h_list) | ||
@@ -174,10 +174,12 @@ # Save request in order to prevent massive communication during pending task checks | ||
| if is_data_id(arg): | ||
| if ObjectStore.get_instance().contains(arg): | ||
| object_store = ObjectStore.get_instance() | ||
| arg = object_store.get_unique_data_id(arg) | ||
| if object_store.contains(arg): | ||
| value = ObjectStore.get_instance().get(arg) | ||
| # Data is already local or was pushed from master | ||
| return value, False | ||
| elif ObjectStore.get_instance().contains_data_owner(arg): | ||
| elif object_store.contains_data_owner(arg): | ||
| if not RequestStore.get_instance().is_already_requested(arg): | ||
| # Request the data from an owner worker | ||
| owner_rank = ObjectStore.get_instance().get_data_owner(arg) | ||
| owner_rank = object_store.get_data_owner(arg) | ||
| if owner_rank != communication.MPIState.get_instance().rank: | ||
@@ -210,2 +212,3 @@ self.request_worker_data(owner_rank, arg) | ||
| """ | ||
| object_store = ObjectStore.get_instance() | ||
| if inspect.iscoroutinefunction(task): | ||
@@ -243,5 +246,7 @@ | ||
| for output_id in output_data_ids: | ||
| ObjectStore.get_instance().put(output_id, e) | ||
| data_id = object_store.get_unique_data_id(output_id) | ||
| object_store.put(data_id, e) | ||
| else: | ||
| ObjectStore.get_instance().put(output_data_ids, e) | ||
| data_id = object_store.get_unique_data_id(output_data_ids) | ||
| object_store.put(data_id, e) | ||
| else: | ||
@@ -254,7 +259,7 @@ if output_data_ids is not None: | ||
| for output_id, value in zip(output_data_ids, output_values): | ||
| ObjectStore.get_instance().put(output_id, value) | ||
| data_id = object_store.get_unique_data_id(output_id) | ||
| object_store.put(data_id, value) | ||
| else: | ||
| ObjectStore.get_instance().put( | ||
| output_data_ids, output_values | ||
| ) | ||
| data_id = object_store.get_unique_data_id(output_data_ids) | ||
| object_store.put(data_id, output_values) | ||
@@ -265,2 +270,4 @@ RequestStore.get_instance().check_pending_get_requests( | ||
| # Monitor the task execution | ||
| # We use a blocking send here because we have to wait for | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| communication.mpi_send_object( | ||
@@ -308,5 +315,7 @@ communication.MPIState.get_instance().comm, | ||
| for output_id in output_data_ids: | ||
| ObjectStore.get_instance().put(output_id, e) | ||
| data_id = object_store.get_unique_data_id(output_id) | ||
| object_store.put(data_id, e) | ||
| else: | ||
| ObjectStore.get_instance().put(output_data_ids, e) | ||
| data_id = object_store.get_unique_data_id(output_data_ids) | ||
| object_store.put(data_id, e) | ||
| else: | ||
@@ -319,6 +328,10 @@ if output_data_ids is not None: | ||
| for output_id, value in zip(output_data_ids, output_values): | ||
| ObjectStore.get_instance().put(output_id, value) | ||
| data_id = object_store.get_unique_data_id(output_id) | ||
| object_store.put(data_id, value) | ||
| else: | ||
| ObjectStore.get_instance().put(output_data_ids, output_values) | ||
| # Monitor the task execution | ||
| data_id = object_store.get_unique_data_id(output_data_ids) | ||
| object_store.put(data_id, output_values) | ||
| # Monitor the task execution. | ||
| # We use a blocking send here because we have to wait for | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| communication.mpi_send_object( | ||
@@ -325,0 +338,0 @@ communication.MPIState.get_instance().comm, |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -16,4 +16,4 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| DASK = "dask" | ||
| MP = "multiprocessing" | ||
| PY = "python" | ||
| PYMP = "pymp" | ||
| PYSEQ = "pyseq" | ||
@@ -20,0 +20,0 @@ |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -45,16 +45,16 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| backend_cls = MPIBackend() | ||
| elif backend_name == BackendName.MP: | ||
| from unidist.core.backends.multiprocessing.backend import MultiProcessingBackend | ||
| from unidist.core.backends.multiprocessing.utils import ( | ||
| initialize_multiprocessing, | ||
| elif backend_name == BackendName.PYMP: | ||
| from unidist.core.backends.pymp.backend import PyMpBackend | ||
| from unidist.core.backends.pymp.utils import ( | ||
| initialize_pymp, | ||
| ) | ||
| initialize_multiprocessing() | ||
| backend_cls = MultiProcessingBackend() | ||
| elif backend_name == BackendName.PY: | ||
| from unidist.core.backends.python.backend import PythonBackend | ||
| from unidist.core.backends.python.utils import initialize_python | ||
| initialize_pymp() | ||
| backend_cls = PyMpBackend() | ||
| elif backend_name == BackendName.PYSEQ: | ||
| from unidist.core.backends.pyseq.backend import PySeqBackend | ||
| from unidist.core.backends.pyseq.utils import initialize_pyseq | ||
| initialize_python() | ||
| backend_cls = PythonBackend() | ||
| initialize_pyseq() | ||
| backend_cls = PySeqBackend() | ||
| else: | ||
@@ -78,3 +78,2 @@ raise ImportError("Unrecognized execution backend.") | ||
| if backend is None: | ||
| backend_name = Backend.get() | ||
@@ -93,12 +92,12 @@ if backend_name == BackendName.RAY: | ||
| backend_cls = MPIBackend() | ||
| elif backend_name == BackendName.MP: | ||
| from unidist.core.backends.multiprocessing.backend import ( | ||
| MultiProcessingBackend, | ||
| elif backend_name == BackendName.PYMP: | ||
| from unidist.core.backends.pymp.backend import ( | ||
| PyMpBackend, | ||
| ) | ||
| backend_cls = MultiProcessingBackend() | ||
| elif backend_name == BackendName.PY: | ||
| from unidist.core.backends.python.backend import PythonBackend | ||
| backend_cls = PyMpBackend() | ||
| elif backend_name == BackendName.PYSEQ: | ||
| from unidist.core.backends.pyseq.backend import PySeqBackend | ||
| backend_cls = PythonBackend() | ||
| backend_cls = PySeqBackend() | ||
| else: | ||
@@ -105,0 +104,0 @@ raise ValueError("Unrecognized execution backend.") |
@@ -1,3 +0,3 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -29,3 +29,3 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| @pytest.mark.skipif( | ||
| sys.platform == "win32" and Backend.get() == BackendName.MP, | ||
| sys.platform == "win32" and Backend.get() == BackendName.PYMP, | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -40,3 +40,3 @@ ) | ||
| @pytest.mark.skipif( | ||
| sys.platform == "win32" and Backend.get() == BackendName.MP, | ||
| sys.platform == "win32" and Backend.get() == BackendName.PYMP, | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -51,3 +51,3 @@ ) | ||
| @pytest.mark.skipif( | ||
| sys.platform == "win32" and Backend.get() == BackendName.MP, | ||
| sys.platform == "win32" and Backend.get() == BackendName.PYMP, | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -63,7 +63,7 @@ ) | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| reason="`multiprocessing` backend incorrectly frees grabbed actors. Details are in https://github.com/modin-project/unidist/issues/65.", | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="`pymp` backend incorrectly frees grabbed actors. Details are in https://github.com/modin-project/unidist/issues/65.", | ||
| ) | ||
| @pytest.mark.skipif( | ||
| sys.platform == "win32" and Backend.get() == BackendName.MP, | ||
| sys.platform == "win32" and Backend.get() == BackendName.PYMP, | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -84,4 +84,4 @@ ) | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| reason="Proper serialization/deserialization is not implemented yet for multiprocessing", | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="Proper serialization/deserialization is not implemented yet for pymp", | ||
| ) | ||
@@ -102,4 +102,4 @@ def test_global_capture(): | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| reason="Proper serialization/deserialization is not implemented yet for multiprocessing", | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="Proper serialization/deserialization is not implemented yet for pymp", | ||
| ) | ||
@@ -120,3 +120,3 @@ def test_direct_capture(): | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -130,4 +130,4 @@ ) | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| reason="Run of a remote task inside of an actor method is not implemented yet for multiprocessing", | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="Run of a remote task inside of an actor method is not implemented yet for pymp", | ||
| ) | ||
@@ -134,0 +134,0 @@ @pytest.mark.skipif( |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -30,7 +30,7 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| @pytest.mark.skipif( | ||
| Backend.get() in (BackendName.MP, BackendName.PY), | ||
| reason="MP and PY backends do not support execution of coroutines.", | ||
| Backend.get() in (BackendName.PYMP, BackendName.PYSEQ), | ||
| reason="`pymp` and `pyseq` backends do not support execution of coroutines.", | ||
| ) | ||
| @pytest.mark.skipif( | ||
| sys.platform == "win32" and Backend.get() == BackendName.MP, | ||
| sys.platform == "win32" and Backend.get() == BackendName.PYMP, | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -47,7 +47,7 @@ ) | ||
| @pytest.mark.skipif( | ||
| Backend.get() in (BackendName.MP, BackendName.PY), | ||
| reason="MP and PY backends do not support execution of coroutines.", | ||
| Backend.get() in (BackendName.PYMP, BackendName.PYSEQ), | ||
| reason="`pymp` and `pyseq` backends do not support execution of coroutines.", | ||
| ) | ||
| @pytest.mark.skipif( | ||
| sys.platform == "win32" and Backend.get() == BackendName.MP, | ||
| sys.platform == "win32" and Backend.get() == BackendName.PYMP, | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -62,7 +62,7 @@ ) | ||
| @pytest.mark.skipif( | ||
| Backend.get() in (BackendName.MP, BackendName.PY), | ||
| reason="MP and PY backends do not support execution of coroutines.", | ||
| Backend.get() in (BackendName.PYMP, BackendName.PYSEQ), | ||
| reason="`pymp` and `pyseq` backends do not support execution of coroutines.", | ||
| ) | ||
| @pytest.mark.skipif( | ||
| sys.platform == "win32" and Backend.get() == BackendName.MP, | ||
| sys.platform == "win32" and Backend.get() == BackendName.PYMP, | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -78,11 +78,11 @@ ) | ||
| @pytest.mark.skipif( | ||
| Backend.get() in (BackendName.MP, BackendName.PY), | ||
| reason="MP and PY backends do not support execution of coroutines.", | ||
| Backend.get() in (BackendName.PYMP, BackendName.PYSEQ), | ||
| reason="`pymp` and `pyseq` backends do not support execution of coroutines.", | ||
| ) | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| reason="`multiprocessing` backend incorrectly frees grabbed actors. Details are in https://github.com/modin-project/unidist/issues/65.", | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="`pymp` backend incorrectly frees grabbed actors. Details are in https://github.com/modin-project/unidist/issues/65.", | ||
| ) | ||
| @pytest.mark.skipif( | ||
| sys.platform == "win32" and Backend.get() == BackendName.MP, | ||
| sys.platform == "win32" and Backend.get() == BackendName.PYMP, | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -111,4 +111,4 @@ ) | ||
| @pytest.mark.skipif( | ||
| Backend.get() in (BackendName.MP, BackendName.PY), | ||
| reason="Proper serialization/deserialization is not implemented yet for multiprocessing and python", | ||
| Backend.get() in (BackendName.PYMP, BackendName.PYSEQ), | ||
| reason="Proper serialization/deserialization is not implemented yet for `pymp` and `pyseq`", | ||
| ) | ||
@@ -129,4 +129,4 @@ def test_global_capture(): | ||
| @pytest.mark.skipif( | ||
| Backend.get() in (BackendName.MP, BackendName.PY), | ||
| reason="Proper serialization/deserialization is not implemented yet for multiprocessing and python", | ||
| Backend.get() in (BackendName.PYMP, BackendName.PYSEQ), | ||
| reason="Proper serialization/deserialization is not implemented yet for `pymp` and `pyseq`", | ||
| ) | ||
@@ -147,3 +147,3 @@ def test_direct_capture(): | ||
| @pytest.mark.skipif( | ||
| Backend.get() in (BackendName.MP, BackendName.PY), | ||
| Backend.get() in (BackendName.PYMP, BackendName.PYSEQ), | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -157,4 +157,4 @@ ) | ||
| @pytest.mark.skipif( | ||
| Backend.get() in (BackendName.MP, BackendName.PY), | ||
| reason="MP and PY backends do not support execution of coroutines.", | ||
| Backend.get() in (BackendName.PYMP, BackendName.PYSEQ), | ||
| reason="`pymp` and `pyseq` backends do not support execution of coroutines.", | ||
| ) | ||
@@ -182,4 +182,4 @@ def test_pending_get(): | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| reason="Run of a remote task inside of an async actor method is not implemented yet for multiprocessing", | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="Run of a remote task inside of an async actor method is not implemented yet for `pymp`", | ||
| ) | ||
@@ -186,0 +186,0 @@ def test_signal_actor(): |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -18,7 +18,7 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| reason="Hangs on `multiprocessing` backend. Details are in https://github.com/modin-project/unidist/issues/64.", | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="Hangs on `pymp` backend. Details are in https://github.com/modin-project/unidist/issues/64.", | ||
| ) | ||
| @pytest.mark.skipif( | ||
| sys.platform == "win32" and Backend.get() == BackendName.MP, | ||
| sys.platform == "win32" and Backend.get() == BackendName.PYMP, | ||
| reason="Details are in https://github.com/modin-project/unidist/issues/70.", | ||
@@ -41,4 +41,4 @@ ) | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| reason="Hangs on `multiprocessing` backend. Details are in https://github.com/modin-project/unidist/issues/64.", | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="Hangs on `pymp` backend. Details are in https://github.com/modin-project/unidist/issues/64.", | ||
| ) | ||
@@ -71,3 +71,3 @@ def test_wait(): | ||
| def test_num_cpus(): | ||
| if Backend.get() == BackendName.PY: | ||
| if Backend.get() == BackendName.PYSEQ: | ||
| assert_equal(unidist.num_cpus(), 1) | ||
@@ -74,0 +74,0 @@ else: |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -88,4 +88,4 @@ # SPDX-License-Identifier: Apache-2.0 | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| reason="Run of a remote task inside of another one is not implemented yet for multiprocessing", | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="Run of a remote task inside of another one is not implemented yet for pymp", | ||
| ) | ||
@@ -102,4 +102,4 @@ def test_internal_remote(): | ||
| @pytest.mark.skipif( | ||
| Backend.get() == BackendName.MP, | ||
| reason="Serialization of `dict_keys` is not properly implemented yet for multiprocessing", | ||
| Backend.get() == BackendName.PYMP, | ||
| reason="Serialization of `dict_keys` is not properly implemented yet for pymp", | ||
| ) | ||
@@ -106,0 +106,0 @@ def test_serialize_dict_keys(): |
@@ -1,2 +0,2 @@ | ||
| # Copyright (C) 2021-2022 Modin authors | ||
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
@@ -3,0 +3,0 @@ # SPDX-License-Identifier: Apache-2.0 |
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| import unidist.core.backends.mpi.core.common as common | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
# Logger configuration.
# When building documentation we do not have MPI initialized, so
# we fall back to rank 0 ("worker_0.log") in order to build it successfully.
log_file = "worker_{}.log".format(
    communication.MPIState.get_instance().rank
    if communication.MPIState.get_instance() is not None
    else 0
)
logger = common.get_logger("worker", log_file)
class AsyncOperations:
    """
    Singleton container for outstanding MPI asynchronous send requests.

    Each tracked entry pairs an MPI request handle with a reference to the
    data being sent, which keeps the send buffer alive until the operation
    completes.
    """

    __instance = None

    def __init__(self):
        # Pairs of (I-prefixed MPI request handle, data reference).
        self._send_async_handlers = []

    @classmethod
    def get_instance(cls):
        """
        Get the single ``AsyncOperations`` instance, creating it on first use.

        Returns
        -------
        AsyncOperations
        """
        if cls.__instance is None:
            cls.__instance = AsyncOperations()
        return cls.__instance

    def extend(self, handlers_list):
        """
        Append the entries of `handlers_list` to the internal list.

        Parameters
        ----------
        handlers_list : list
            A list of pairs with handler and data reference.
        """
        self._send_async_handlers.extend(handlers_list)

    def check(self):
        """Drop every tracked request that has already completed, releasing its data reference."""
        still_pending = []
        for entry in self._send_async_handlers:
            # entry[0] is the MPI async send handler object.
            request = entry[0]
            if request.Test():
                logger.debug("CHECK ASYNC HANDLER {} - ready".format(request))
            else:
                logger.debug("CHECK ASYNC HANDLER {} - not ready".format(request))
                still_pending.append(entry)
        self._send_async_handlers[:] = still_pending

    def finish(self):
        """Cancel all MPI async send requests and clear the internal list."""
        for request, _data in self._send_async_handlers:
            logger.debug("WAIT ASYNC HANDLER {}".format(request))
            request.Cancel()
            request.Wait()
        self._send_async_handlers.clear()
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """MultiProcessing backend functionality.""" |
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Actor specific functionality using MultiProcessing backend.""" | ||
| import unidist.core.backends.multiprocessing.core as mp | ||
| from unidist.core.base.actor import Actor, ActorMethod | ||
| from unidist.core.base.object_ref import ObjectRef | ||
class MultiProcessingActorMethod(ActorMethod):
    """
    The class implements the interface in ``ActorMethod`` using MultiProcessing backend.

    Parameters
    ----------
    cls : unidist.core.backends.multiprocessing.core.Actor
        An actor class from which method `method_name` will be called.
    method_name : str
        The name of the method to be called.
    """

    def __init__(self, cls, method_name):
        self._cls = cls
        self._method_name = method_name
        # A call yields a single result unless overridden per invocation.
        self._num_returns = 1

    def _remote(self, *args, num_returns=None, **kwargs):
        """
        Execute `self._method_name` in a worker process.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the method.
        num_returns : int, optional
            Number of results to be returned. If it isn't
            provided, `self._num_returns` will be used.
        **kwargs : dict
            Keyword arguments to be passed in the method.

        Returns
        -------
        ObjectRef, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``ObjectRef`` will be returned.
            * if `num_returns > 1`, list of ``ObjectRef``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        effective_returns = self._num_returns if num_returns is None else num_returns
        bound_method = getattr(self._cls, self._method_name)
        data_ids = bound_method.submit(*args, num_returns=effective_returns, **kwargs)
        if effective_returns == 0:
            return None
        if effective_returns == 1:
            return ObjectRef(data_ids)
        if effective_returns > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
class MultiProcessingActor(Actor):
    """
    The class implements the interface in ``Actor`` using MultiProcessing backend.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    num_cpus : int
        The number of CPUs to reserve for the lifetime of the actor.
    resources : dict
        Custom resources to reserve for the lifetime of the actor.
    """

    def __init__(self, cls, num_cpus, resources):
        self._cls = cls
        self._num_cpus = num_cpus
        self._resources = resources
        # Populated by `_remote` once the backend-level actor is created.
        self._actor_handle = None

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls` class.

        This method creates a ``MultiProcessingActorMethod`` object that is
        responsible for calling method `name` of the `self._cls` class remotely.

        Parameters
        ----------
        name : str
            Name of the method to be called remotely.

        Returns
        -------
        MultiProcessingActorMethod
        """
        # Only reached for attributes not found on the instance itself.
        return MultiProcessingActorMethod(self._actor_handle, name)

    def _remote(self, *args, num_cpus=None, resources=None, **kwargs):
        """
        Create an actor class, specific for MultiProcessing backend, from `self._cls`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in `self._cls` class constructor.
        num_cpus : int, optional
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict, optional
            Custom resources to reserve for the lifetime of the actor.
        **kwargs : dict
            Keyword arguments to be passed in `self._cls` class constructor.

        Returns
        -------
        MultiProcessingActor

        Raises
        ------
        NotImplementedError
            If `num_cpus` or `resources` is requested either here or at
            construction time — neither is supported by this backend yet.
        """
        cpus_requested = num_cpus is not None or self._num_cpus is not None
        if cpus_requested:
            raise NotImplementedError(
                "'num_cpus' is not supported yet by MultiProcessing backend."
            )
        resources_requested = resources is not None or self._resources is not None
        if resources_requested:
            raise NotImplementedError(
                "'resources' is not supported yet by MultiProcessing backend."
            )
        self._actor_handle = mp.Actor(self._cls, *args, **kwargs)
        return self
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """An implementation of ``Backend`` interface using MultiProcessing.""" | ||
| import socket | ||
| from unidist.config import CpuCount | ||
| import unidist.core.backends.multiprocessing.core as mp | ||
| from unidist.core.backends.multiprocessing.actor import MultiProcessingActor | ||
| from unidist.core.backends.multiprocessing.remote_function import ( | ||
| MultiProcessingRemoteFunction, | ||
| ) | ||
| from unidist.core.base.backend import Backend | ||
class MultiProcessingBackend(Backend):
    """The class that implements the interface in ``Backend`` using MultiProcessing."""

    @staticmethod
    def make_remote_function(function, num_cpus, num_returns, resources):
        """
        Define a remote function.

        Parameters
        ----------
        function : callable
            Function to be a remote function.
        num_cpus : int
            The number of CPUs to reserve for the remote function.
        num_returns : int
            The number of ``ObjectRef``-s returned by the remote function invocation.
        resources : dict
            Custom resources to reserve for the remote function.

        Returns
        -------
        MultiProcessingRemoteFunction
        """
        return MultiProcessingRemoteFunction(function, num_cpus, num_returns, resources)

    @staticmethod
    def make_actor(cls, num_cpus, resources):
        """
        Define an actor class.

        Parameters
        ----------
        cls : object
            Class to be an actor class.
        num_cpus : int
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict
            Custom resources to reserve for the lifetime of the actor.

        Returns
        -------
        MultiProcessingActor
            The actor class type to create.
        list
            The list of arguments for ``MultiProcessingActor`` constructor.
        """
        return MultiProcessingActor, [cls, num_cpus, resources]

    @staticmethod
    def get(data_ids):
        """
        Get a remote object or a list of remote objects
        from distributed memory.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or a list of ``DataID`` objects to get data from.

        Returns
        -------
        object
            A Python object or a list of Python objects.
        """
        return mp.get(data_ids)

    @staticmethod
    def put(data):
        """
        Put `data` into distributed memory.

        Parameters
        ----------
        data : object
            Data to be put.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            ``DataID`` matching to data.
        """
        return mp.put(data)

    @staticmethod
    def wait(data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        data IDs that correspond to objects that completed computations.
        The second list corresponds to the rest of the data IDs (which may or may not be ready).

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.
        """
        return mp.wait(data_ids, num_returns=num_returns)

    @staticmethod
    def get_ip():
        """
        Get node IP address.

        Returns
        -------
        str
            Node IP address.
        """
        # Resolves the local hostname; assumes DNS/hosts resolution is
        # configured for it — TODO confirm behavior on hosts without one.
        hostname = socket.gethostname()
        return socket.gethostbyname(hostname)

    @staticmethod
    def num_cpus():
        """
        Get the number of CPUs used by the execution backend.

        Returns
        -------
        int
        """
        return CpuCount.get()

    @staticmethod
    def cluster_resources():
        """
        Get resources of MultiProcessing cluster.

        Returns
        -------
        dict
            Dictionary with node info in the form `{"node_ip": {"CPU": x}}`.
        """
        # Single-node backend: the "cluster" is always just this host.
        return {
            MultiProcessingBackend.get_ip(): {"CPU": MultiProcessingBackend.num_cpus()}
        }

    @staticmethod
    def shutdown():
        """
        Shutdown MultiProcessing execution backend.

        Note
        ----
        Not supported yet.
        """
        raise NotImplementedError(
            "'shutdown' is not supported yet by MultiProcessing backend."
        )

    @staticmethod
    def is_initialized():
        """
        Check if Multiprocessing backend has already been initialized.

        Returns
        -------
        bool
            True or False.
        """
        return mp.is_initialized()
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """MultiProcessing backend core functionality.""" | ||
| from .actor import Actor | ||
| from .api import put, wait, get, submit, init, is_initialized | ||
# Public names re-exported by the MultiProcessing backend core package.
__all__ = ["Actor", "put", "wait", "get", "submit", "init", "is_initialized"]
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Actor specific functionality implemented using Python multiprocessing.""" | ||
| import cloudpickle as pkl | ||
| from multiprocessing.managers import BaseManager | ||
| from unidist.core.backends.multiprocessing.core.object_store import ObjectStore, Delayed | ||
| from unidist.core.backends.multiprocessing.core.process_manager import ( | ||
| ProcessManager, | ||
| Task, | ||
| ) | ||
class ActorMethod:
    """
    Execute `method_name` of `cls_obj` in the separate worker-process owned by `actor`.

    Parameters
    ----------
    cls_obj : multiprocessing.managers.BaseManager
        Shared manager-class.
    actor : Actor
        Actor object whose grabbed worker runs the method.
    method_name : str
        The name of the method to be called.
    obj_store : unidist.core.backends.multiprocessing.core.object_store.ObjectStore
        Object storage to share data between workers.
    """

    def __init__(self, cls_obj, actor, method_name, obj_store):
        self._cls_obj = cls_obj
        self._method_name = method_name
        self._actor = actor
        self._obj_store = obj_store

    def submit(self, *args, num_returns=1, **kwargs):
        """
        Execute `self._method_name` asynchronously in the worker of `self._actor`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the `self._method_name` method.
        num_returns : int, default: 1
            Number of results to be returned from `self._method_name`.
        **kwargs : dict
            Keyword arguments to be passed in the `self._method_name` method.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID, list or None
            Type of returns depends on `num_returns` value:

            * if `num_returns == 1`, ``DataID`` will be returned.
            * if `num_returns > 1`, list of ``DataID``-s will be returned.
            * if `num_returns == 0`, ``None`` will be returned.
        """
        # Reserve Delayed placeholders the worker will overwrite with results.
        if num_returns > 1:
            placeholders = [self._obj_store.put(Delayed()) for _ in range(num_returns)]
        elif num_returns == 0:
            placeholders = None
        else:
            placeholders = self._obj_store.put(Delayed())
        target = getattr(self._cls_obj, self._method_name)
        self._actor.submit(Task(target, placeholders, self._obj_store, *args, **kwargs))
        return placeholders
class Actor:
    """
    Actor-class to execute methods of wrapped class in a separate worker.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    *args : iterable
        Positional arguments to be passed in `cls` constructor.
    **kwargs : dict
        Keyword arguments to be passed in `cls` constructor.

    Notes
    -----
    Python multiprocessing manager-class will be created to wrap `cls`.
    This makes `cls` class object shared between different workers. Manager-class
    starts additional process to share class state between processes.
    Methods of `cls` class object are executed in the worker, grabbed from a workers pool.
    """

    def __init__(self, cls, *args, **kwargs):
        # Initialized up-front so `__del__` is safe even if construction
        # fails partway through.
        self._worker = None
        self._worker_id = None
        self._obj_store = ObjectStore.get_instance()
        # FIXME : Change "WrappedClass" -> cls.__name__ + "Manager", for example.
        # NOTE(review): registering under the fixed name "WrappedClass" on the
        # shared BaseManager class means a later Actor overwrites the earlier
        # registration — confirm whether concurrent actors of different classes
        # are expected to work.
        BaseManager.register("WrappedClass", cls)
        manager = BaseManager()
        # Starts the manager's server process that holds the shared instance.
        manager.start()
        # Proxy object; method calls on it are forwarded to the manager process.
        self._cls_obj = manager.WrappedClass(*args, **kwargs)
        # Reserve a dedicated worker for this actor's tasks.
        self._worker, self._worker_id = ProcessManager.get_instance().grab_worker()

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls_obj` class.

        This methods creates the ``ActorMethod`` object that is responsible
        for calling method `name` of the `self._cls_obj` class remotely.

        Parameters
        ----------
        name : str
            Name of the method to be called remotely.

        Returns
        -------
        ActorMethod
        """
        # Only invoked for names not found through normal attribute lookup.
        return ActorMethod(self._cls_obj, self, name, self._obj_store)

    def submit(self, task):
        """
        Execute `task` asynchronously in the worker grabbed by this actor.

        Parameters
        ----------
        task : unidist.core.backends.multiprocessing.core.process_manager.Task
            Task object holding callable function.
        """
        # Tasks are pickled with cloudpickle so arbitrary callables survive
        # crossing the process boundary.
        self._worker.add_task(pkl.dumps(task))

    def __del__(self):
        """
        Destructor of the actor.

        Free worker, grabbed from the workers pool.
        """
        if self._worker_id is not None:
            ProcessManager.get_instance().free_worker(self._worker_id)
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """High-level API of MultiProcessing backend.""" | ||
| import cloudpickle as pkl | ||
| from unidist.config import CpuCount | ||
| from unidist.core.backends.multiprocessing.core.object_store import ObjectStore, Delayed | ||
| from unidist.core.backends.multiprocessing.core.process_manager import ( | ||
| ProcessManager, | ||
| Task, | ||
| ) | ||
# Module-level flag that tracks whether the MultiProcessing backend
# has already been initialized in this process.
is_multiprocessing_initialized = False
def init(num_workers=None):
    """
    Initialize shared object storage and workers pool.

    Parameters
    ----------
    num_workers : int, optional
        Number of worker-processes to start. Defaults to the number of CPUs
        (``CpuCount.get()``) when not provided.

    Notes
    -----
    Run initialization of singleton objects ``unidist.core.backends.multiprocessing.core.object_store.ObjectStore``
    and ``unidist.core.backends.multiprocessing.core.process_manager.ProcessManager``.
    """
    # The previous signature used ``num_workers=CpuCount.get()``, which froze
    # the CPU count at import time (defaults are evaluated once, at `def`).
    # Resolving it at call time picks up configuration changes made before
    # `init` is actually called; passing None through is also handled by
    # ProcessManager itself, so behavior is backward compatible.
    if num_workers is None:
        num_workers = CpuCount.get()
    ObjectStore.get_instance()
    ProcessManager.get_instance(num_workers=num_workers)
    global is_multiprocessing_initialized
    is_multiprocessing_initialized = True
def is_initialized():
    """
    Check if the MultiProcessing backend has already been initialized.

    Returns
    -------
    bool
        True or False.
    """
    # Reading a module-level name does not require a ``global`` declaration.
    return is_multiprocessing_initialized
def put(data):
    """
    Put data into shared object storage.

    Parameters
    ----------
    data : object
        Data to be put.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID
        An ID of object in shared object storage.
    """
    store = ObjectStore.get_instance()
    return store.put(data)
def get(data_ids):
    """
    Get an object or objects associated with `data_ids` from the shared object storage.

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        An ID (or a list of IDs) of the object(s) to get data from.

    Returns
    -------
    object
        A Python object.
    """
    store = ObjectStore.get_instance()
    return store.get(data_ids)
def wait(data_ids, num_returns=1):
    """
    Wait until `data_ids` are finished.

    This method returns two lists. The first list consists of
    ``DataID``-s that correspond to objects that completed computations.
    The second list corresponds to the rest of the ``DataID``-s (which may or may not be ready).

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ``DataID`` or list of ``DataID``-s to be waited.
    num_returns : int, default: 1
        The number of ``DataID``-s that should be returned as ready.

    Returns
    -------
    tuple
        List of data IDs that are ready and list of the remaining data IDs.
    """
    store = ObjectStore.get_instance()
    return store.wait(data_ids, num_returns=num_returns)
def submit(func, *args, num_returns=1, **kwargs):
    """
    Execute a function in a worker process.

    Parameters
    ----------
    func : callable
        Function to be executed in the worker.
    *args : iterable
        Positional arguments to be passed in the `func`.
    num_returns : int, default: 1
        Number of results to be returned from `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID, list or None
        Type of returns depends on `num_returns` value:

        * if `num_returns == 1`, ``DataID`` will be returned.
        * if `num_returns > 1`, list of ``DataID``-s will be returned.
        * if `num_returns == 0`, ``None`` will be returned.
    """
    store = ObjectStore.get_instance()
    # Reserve Delayed placeholders the worker overwrites with real results.
    if num_returns > 1:
        result_ids = [store.put(Delayed()) for _ in range(num_returns)]
    elif num_returns == 0:
        result_ids = None
    else:
        result_ids = store.put(Delayed())
    # Pickle with cloudpickle so arbitrary callables cross the process boundary.
    task = Task(func, result_ids, store, *args, **kwargs)
    ProcessManager.get_instance().submit(pkl.dumps(task))
    return result_ids
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Shared object storage related functionality.""" | ||
| import cloudpickle as pkl | ||
| from multiprocessing import Manager | ||
| from unidist.core.backends.common.data_id import DataID | ||
class Delayed:
    """Placeholder type stored in place of results that are still being computed."""

    pass


class ObjectStore:
    """
    Class that stores objects and provides access to these from different processes.

    Notes
    -----
    Shared storage is organized using ``multiprocessing.Manager().dict()``. This is a separate
    process which starts work in the class constructor.
    """

    __instance = None

    def __init__(self):
        if ObjectStore.__instance is None:
            # Manager-backed dict shared between worker processes.
            self.store_delayed = Manager().dict()

    def __repr__(self):
        return f"Object store: {self.store_delayed}"

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        unidist.core.backends.multiprocessing.core.object_store.ObjectStore
        """
        if cls.__instance is None:
            cls.__instance = ObjectStore()
        return cls.__instance

    def put(self, data, data_id=None):
        """
        Put `data` to internal shared dictionary.

        Parameters
        ----------
        data : object
            Data to be put.
        data_id : unidist.core.backends.common.data_id.DataID, optional
            An ID to data. If it isn't provided, will be created automatically.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            An ID of object in internal shared dictionary.
        """
        data_id = DataID() if data_id is None else data_id
        # Callables are pickled so they can cross the process boundary.
        self.store_delayed[data_id] = pkl.dumps(data) if callable(data) else data
        return data_id

    def get(self, data_ids):
        """
        Get object(s) associated with `data_ids` from the shared internal dictionary.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            An ID(s) of object(s) to get data from.

        Returns
        -------
        object
            A Python object.

        Raises
        ------
        ValueError
            If `data_ids` is neither a ``DataID`` nor a list of ``DataID``-s.
        Exception
            Re-raises an exception stored by a worker for a failed task.
        """
        is_list = isinstance(data_ids, list)
        if not is_list:
            data_ids = [data_ids]
        if not all(isinstance(ref, DataID) for ref in data_ids):
            raise ValueError(
                "`data_ids` must either be a data ID or a list of data IDs."
            )

        values = [None] * len(data_ids)
        for idx, data_id in enumerate(data_ids):
            # Busy-wait until a worker replaces the Delayed placeholder.
            # NOTE(review): this spin-loop burns a CPU core while waiting —
            # consider a short sleep between polls.
            while isinstance(self.store_delayed[data_id], Delayed):
                pass
            value = self.store_delayed[data_id]
            if isinstance(value, Exception):
                # A worker stored the task's exception; surface it to the caller.
                raise value
            # `put` pickles only callables; presumably raw `bytes` payloads are
            # never stored directly — confirm, otherwise they would be unpickled here.
            values[idx] = pkl.loads(value) if isinstance(value, bytes) else value
        return values if is_list else values[0]

    def wait(self, data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        ``DataID``-s that correspond to objects that completed computations.
        The second list corresponds to the rest of the ``DataID``-s (which may or may not be ready).

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.
        """
        if not isinstance(data_ids, list):
            data_ids = [data_ids]
        ready = list()
        not_ready = list()
        # Fast path: collect IDs that are already computed.
        # BUGFIX: the previous implementation did `data_ids.pop(idx)` with `idx`
        # taken from enumerating a *copy*; after the first pop the indices no
        # longer matched the mutated list, so it could pop a wrong (possibly
        # not-ready) ID or raise IndexError. Removing by value avoids the skew.
        for data_id in list(data_ids):
            if not isinstance(self.store_delayed[data_id], Delayed):
                data_ids.remove(data_id)
                ready.append(data_id)
                if len(ready) == num_returns:
                    break
        not_ready = data_ids
        # Block on the remaining IDs (in order) until the requested count is met.
        while len(ready) != num_returns:
            self.get(not_ready[0])
            ready.append(not_ready.pop(0))
        return ready, not_ready
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Workers related functionality.""" | ||
| import cloudpickle as pkl | ||
| from multiprocessing import ( | ||
| Process, | ||
| JoinableQueue, | ||
| ) | ||
| from unidist.config import CpuCount | ||
| from unidist.core.backends.common.data_id import DataID | ||
| from unidist.core.backends.multiprocessing.core.object_store import ObjectStore | ||
class Worker(Process):
    """
    Class-process that executes tasks from `self.task_queue`.

    Parameters
    ----------
    task_queue : multiprocessing.JoinableQueue
        A queue of task to execute.
    obj_store : unidist.core.backends.multiprocessing.core.object_store.ObjectStore
        Shared object storage to read/write data.
    """

    def __init__(self, task_queue, obj_store):
        # Daemon process: must not block interpreter shutdown.
        Process.__init__(self, daemon=True)
        self.task_queue = task_queue
        self._obj_store = obj_store

    def run(self):
        """Run main infinite loop of process to execute tasks from `self.task_queue`."""
        while 1:
            # Tasks arrive pickled (cloudpickle) so arbitrary callables
            # survive the process boundary.
            task = self.task_queue.get()
            task = pkl.loads(task)
            data_ids = task.data_ids
            try:
                value = task()
            except Exception as e:
                # Store the exception under every result ID so `get` re-raises it.
                if isinstance(data_ids, list) and len(data_ids) > 1:
                    for i, data_id in enumerate(data_ids):
                        self._obj_store.store_delayed[data_id] = e
                else:
                    # NOTE(review): when `data_ids` is None (num_returns == 0)
                    # the exception is recorded under the key ``None`` — looks
                    # unintended; confirm whether it should be dropped instead.
                    self._obj_store.store_delayed[data_ids] = e
            else:
                if data_ids is not None:
                    if isinstance(data_ids, list) and len(data_ids) > 1:
                        # Multiple returns: pair each result ID with its value.
                        for data_id, val in zip(data_ids, value):
                            self._obj_store.store_delayed[data_id] = val
                    else:
                        self._obj_store.store_delayed[data_ids] = value
            finally:
                # Mark the queue item processed even on failure so join() works.
                self.task_queue.task_done()
        # Unreachable: the loop above never exits.
        return

    def add_task(self, task):
        """
        Add `task` to `self.task_queue`.

        Parameters
        ----------
        task : unidist.core.backends.multiprocessing.core.process_manager.Task
            Task to be added in the queue.
        """
        self.task_queue.put(task)
class ProcessManager:
    """
    Class that controls the worker pool and assigns tasks to workers.

    Parameters
    ----------
    num_workers : int, optional
        Number of worker-processes to start. If isn't provided,
        will be equal to number of CPUs.

    Notes
    -----
    The constructor starts `num_workers` MultiProcessing Workers.
    """

    __instance = None

    def __init__(self, num_workers=None):
        # Singleton guard: only the first construction spawns workers.
        if ProcessManager.__instance is None:
            if num_workers is None:
                num_workers = CpuCount.get()
            self.workers = []
            self.grabbed_workers = []
            # Round-robin cursor kept on the class.
            self.__class__._worker_id = 0
            obj_store = ObjectStore.get_instance()
            for _ in range(num_workers):
                worker = Worker(JoinableQueue(), obj_store)
                worker.start()
                self.workers.append(worker)
                self.grabbed_workers.append(False)

    @classmethod
    def get_instance(cls, num_workers=None):
        """
        Get instance of ``ProcessManager``.

        Returns
        -------
        unidist.core.backends.multiprocessing.core.process_manager.ProcessManager
        """
        if cls.__instance is None:
            cls.__instance = ProcessManager(num_workers=num_workers)
        return cls.__instance

    def _next(self):
        """
        Get the current worker index and advance the round-robin cursor by one.

        Returns
        -------
        int
        """
        current = self.__class__._worker_id
        self.__class__._worker_id = (current + 1) % len(self.workers)
        return current

    def grab_worker(self):
        """
        Grab a worker from the worker pool.

        A grabbed worker is marked as `blocked` and doesn't participate
        in the tasks submission.

        Returns
        -------
        unidist.core.backends.multiprocessing.core.process_manager.Worker
            Grabbed worker.
        int
            Index of grabbed worker.
        """
        for worker_idx, taken in enumerate(self.grabbed_workers):
            if taken:
                continue
            self.grabbed_workers[worker_idx] = True
            return self.workers[worker_idx], worker_idx
        raise RuntimeError("Actor can`t be run, no available workers.")

    def free_worker(self, idx):
        """
        Free the worker with index `idx`.

        Parameters
        ----------
        idx : int
            Index of worker to be freed.
        """
        self.grabbed_workers[idx] = False

    def submit(self, task):
        """
        Add `task` to the task queue of one of the workers using round-robin.

        Parameters
        ----------
        task : unidist.core.backends.multiprocessing.core.process_manager.Task
            Task to be added in task queue.
        """
        skipped = 0
        while skipped < len(self.workers):
            candidate = self._next()
            if self.grabbed_workers[candidate]:
                skipped += 1
            else:
                self.workers[candidate].add_task(task)
                return
        raise RuntimeError("Task can`t be run, no available workers.")
class Task:
    """
    Unified callable object to execute in a MultiProcessing ``Worker``.

    Parameters
    ----------
    func : callable
        A function to be called in object invocation.
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ``DataID``-(s) associated with result(s) of `func` invocation.
    obj_store : unidist.core.backends.multiprocessing.core.object_store.ObjectStore
        Object storage to share data between workers.
    *args : iterable
        Positional arguments to be passed in the `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.
    """

    def __init__(self, func, data_ids, obj_store, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self.data_ids = data_ids
        self.obj_store = obj_store

    def __call__(self):
        """
        Execute `self._func`.

        Any ``DataID`` found among the stored arguments is automatically
        materialized from the object store before the call.

        Returns
        -------
        object
            The result of `self._func` invocation.
        """

        def resolve(value):
            # Materialize only DataID references; pass everything else through.
            return self.obj_store.get(value) if isinstance(value, DataID) else value

        positional = [resolve(item) for item in self._args]
        named = {key: resolve(item) for key, item in self._kwargs.items()}
        return self._func(*positional, **named)
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """An implementation of ``RemoteFunction`` interface using MultiProcessing.""" | ||
| import unidist.core.backends.multiprocessing.core as mp | ||
| from unidist.core.base.object_ref import ObjectRef | ||
| from unidist.core.base.remote_function import RemoteFunction | ||
class MultiProcessingRemoteFunction(RemoteFunction):
    """
    The class that implements the interface in ``RemoteFunction`` using MultiProcessing.

    Parameters
    ----------
    function : callable
        A function to be called remotely.
    num_cpus : int
        The number of CPUs to reserve for the remote function.
    num_returns : int
        The number of ``ObjectRef``-s returned by the remote function invocation.
    resources : dict
        Custom resources to reserve for the remote function.
    """

    def __init__(self, function, num_cpus, num_returns, resources):
        self._remote_function = function
        self._num_cpus = num_cpus
        # A missing ``num_returns`` defaults to a single return value.
        self._num_returns = num_returns if num_returns is not None else 1
        self._resources = resources

    def _remote(self, *args, num_cpus=None, num_returns=None, resources=None, **kwargs):
        """
        Execute `self._remote_function` in a worker process.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the `self._remote_function`.
        num_cpus : int, optional
            The number of CPUs to reserve for the remote function.
        num_returns : int, optional
            The number of ``ObjectRef``-s returned by the remote function invocation.
        resources : dict, optional
            Custom resources to reserve for the remote function.
        **kwargs : dict
            Keyword arguments to be passed in the `self._remote_function`.

        Returns
        -------
        ObjectRef, list or None
            ``ObjectRef`` when `num_returns == 1`, a list of ``ObjectRef``-s
            when `num_returns > 1`, ``None`` when `num_returns == 0`.

        Raises
        ------
        NotImplementedError
            If `num_cpus` or `resources` is requested either here or at construction.
        """
        if num_cpus is not None or self._num_cpus is not None:
            raise NotImplementedError(
                "'num_cpus' is not supported yet by MultiProcessing backend."
            )
        if resources is not None or self._resources is not None:
            raise NotImplementedError(
                "'resources' is not supported yet by MultiProcessing backend."
            )
        count = self._num_returns if num_returns is None else num_returns
        data_ids = mp.submit(self._remote_function, *args, num_returns=count, **kwargs)
        if count == 1:
            return ObjectRef(data_ids)
        if count > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if count == 0:
            return None
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Utilities used to initialize MultiProcessing execution backend.""" | ||
| from unidist.config import CpuCount | ||
def initialize_multiprocessing():
    """
    Initialize the MultiProcessing execution backend.

    Notes
    -----
    Number of workers for MultiProcessing is equal to number of CPUs used by the backend.
    """
    # Local import keeps the backend core from being loaded until it is needed.
    from unidist.core.backends.multiprocessing.core import init

    worker_count = CpuCount.get()
    init(num_workers=worker_count)
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Python backend functionality.""" |
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Actor specific functionality using Python backend.""" | ||
| import unidist.core.backends.python.core as py | ||
| from unidist.core.base.actor import Actor, ActorMethod | ||
| from unidist.core.base.object_ref import ObjectRef | ||
class PythonActorMethod(ActorMethod):
    """
    The class implements the interface in ``ActorMethod`` using Python backend.

    Parameters
    ----------
    cls : object
        An actor class from which method `method_name` will be called.
    method_name : str
        The name of the method to be called.
    """

    def __init__(self, cls, method_name):
        self._cls = cls
        self._method_name = method_name
        # Default number of results when the caller does not override it.
        self._num_returns = 1

    def _remote(self, *args, num_returns=None, **kwargs):
        """
        Execute `self._method_name`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the method.
        num_returns : int, optional
            Number of results to be returned. If it isn't
            provided, `self._num_returns` will be used.
        **kwargs : dict
            Keyword arguments to be passed in the method.

        Returns
        -------
        ObjectRef, list or None
            ``ObjectRef`` when `num_returns == 1`, a list of ``ObjectRef``-s
            when `num_returns > 1`, ``None`` when `num_returns == 0`.
        """
        count = num_returns if num_returns is not None else self._num_returns
        bound_method = getattr(self._cls, self._method_name)
        data_ids = py.submit(bound_method, *args, num_returns=count, **kwargs)
        if count == 1:
            return ObjectRef(data_ids)
        if count > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if count == 0:
            return None
class PythonActor(Actor):
    """
    The class implements the interface in ``Actor`` using Python backend.

    Parameters
    ----------
    cls : object
        Class to be an actor class.
    num_cpus : int
        The number of CPUs to reserve for the lifetime of the actor.
    resources : dict
        Custom resources to reserve for the lifetime of the actor.
    """

    def __init__(self, cls, num_cpus, resources):
        self._cls = cls
        self._num_cpus = num_cpus
        self._resources = resources
        self._actor_handle = None

    def __getattr__(self, name):
        """
        Get the attribute `name` of the `self._cls` class.

        Only invoked for attributes not found the normal way; wraps the
        requested method of the underlying actor instance.

        Parameters
        ----------
        name : str
            Name of the method to be called.

        Returns
        -------
        PythonActorMethod
        """
        return PythonActorMethod(self._actor_handle, name)

    def _remote(self, *args, num_cpus=None, resources=None, **kwargs):
        """
        Create actor class, specific for Python backend, from `self._cls`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in `self._cls` class constructor.
        num_cpus : int, optional
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict, optional
            Custom resources to reserve for the lifetime of the actor.
        **kwargs : dict
            Keyword arguments to be passed in `self._cls` class constructor.

        Returns
        -------
        PythonActor

        Raises
        ------
        NotImplementedError
            If `num_cpus` or `resources` is requested either here or at construction.
        """
        # Check unsupported options in a fixed order so the raised message
        # matches the first offending option.
        for option_name, call_value, init_value in (
            ("num_cpus", num_cpus, self._num_cpus),
            ("resources", resources, self._resources),
        ):
            if call_value is not None or init_value is not None:
                raise NotImplementedError(
                    f"'{option_name}' is not supported by Python backend."
                )
        self._actor_handle = self._cls(*args, **kwargs)
        return self
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """An implementation of ``Backend`` interface using Python.""" | ||
| import socket | ||
| import unidist.core.backends.python.core as py | ||
| from unidist.core.backends.python.actor import PythonActor | ||
| from unidist.core.backends.python.remote_function import ( | ||
| PythonRemoteFunction, | ||
| ) | ||
| from unidist.core.base.backend import Backend | ||
class PythonBackend(Backend):
    """The class that implements the interface in ``Backend`` using Python."""

    @staticmethod
    def make_remote_function(function, num_cpus, num_returns, resources):
        """
        Define ``PythonRemoteFunction``.

        Parameters
        ----------
        function : callable
            Function to be ``PythonRemoteFunction``.
        num_cpus : int
            The number of CPUs to reserve for ``PythonRemoteFunction``.
        num_returns : int
            The number of ``ObjectRef``-s returned by the function invocation.
        resources : dict
            Custom resources to reserve for the function.

        Returns
        -------
        PythonRemoteFunction
        """
        return PythonRemoteFunction(function, num_cpus, num_returns, resources)

    @staticmethod
    def make_actor(cls, num_cpus, resources):
        """
        Define an actor class.

        Parameters
        ----------
        cls : object
            Class to be an actor class.
        num_cpus : int
            The number of CPUs to reserve for the lifetime of the actor.
        resources : dict
            Custom resources to reserve for the lifetime of the actor.

        Returns
        -------
        PythonActor
            The actor class type to create.
        list
            The list of arguments for ``PythonActor`` constructor.
        """
        return PythonActor, [cls, num_cpus, resources]

    @staticmethod
    def get(data_ids):
        """
        Get an object or a list of objects from object store.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or a list of ``DataID`` objects to get data from.

        Returns
        -------
        object
            A Python object or a list of Python objects.
        """
        return py.get(data_ids)

    @staticmethod
    def put(data):
        """
        Put `data` into object store.

        Parameters
        ----------
        data : object
            Data to be put.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            ``DataID`` matching to data.
        """
        return py.put(data)

    @staticmethod
    def wait(data_ids, num_returns=1):
        """
        Wait until `data_ids` are finished.

        This method returns two lists. The first list consists of
        data IDs that correspond to objects that completed computations.
        The second list corresponds to the rest of the data IDs.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ``DataID`` or list of ``DataID``-s to be waited.
        num_returns : int, default: 1
            The number of ``DataID``-s that should be returned as ready.

        Returns
        -------
        tuple
            List of data IDs that are ready and list of the remaining data IDs.

        Notes
        -----
        Method serves to maintain behavior compatibility between backends. All objects
        completed computation before putting into an object storage for Python backend.
        """
        return data_ids[:num_returns], data_ids[num_returns:]

    @staticmethod
    def get_ip():
        """
        Get node IP address.

        Returns
        -------
        str
            Node IP address.
        """
        hostname = socket.gethostname()
        return socket.gethostbyname(hostname)

    @staticmethod
    def num_cpus():
        """
        Get the number of CPUs used by the execution backend.

        Returns
        -------
        int
        """
        # The Python backend runs everything sequentially in one process.
        return 1

    @staticmethod
    def cluster_resources():
        """
        Get resources of the cluster.

        Returns
        -------
        dict
            Dictionary with node info in the form `{"node_ip": {"CPU": x}}`.
        """
        return {PythonBackend.get_ip(): {"CPU": PythonBackend.num_cpus()}}

    @staticmethod
    def is_initialized():
        """
        Check if Python backend has already been initialized.

        Returns
        -------
        bool
            True or False.
        """
        return py.is_initialized()
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Python backend core functionality.""" | ||
| from .api import put, get, submit, init, is_initialized | ||
| __all__ = ["put", "get", "submit", "init", "is_initialized"] |
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """High-level API of Python backend.""" | ||
| from unidist.core.backends.common.data_id import DataID | ||
| from unidist.core.backends.python.core.object_store import ObjectStore | ||
| # The global variable indicates whether the Python backend has already been initialized | ||
| is_python_initialized = False | ||
def init():
    """
    Initialize an object storage.

    Notes
    -----
    Run initialization of singleton object ``unidist.core.backends.python.core.object_store.ObjectStore``.
    """
    global is_python_initialized
    ObjectStore.get_instance()
    # Idempotent: setting the flag unconditionally is equivalent to
    # setting it only when unset.
    is_python_initialized = True
def is_initialized():
    """
    Check if Python backend has already been initialized.

    Returns
    -------
    bool
        True or False.
    """
    # Reading a module-level name requires no ``global`` declaration.
    return is_python_initialized
def put(data):
    """
    Put data into object storage.

    Parameters
    ----------
    data : object
        Data to be put.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID
        An ID of object in object storage.
    """
    store = ObjectStore.get_instance()
    return store.put(data)
def get(data_ids):
    """
    Get object(s) associated with `data_ids` from the object storage.

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        ID(s) to object(s) to get data from.

    Returns
    -------
    object
        A Python object.
    """
    store = ObjectStore.get_instance()
    return store.get(data_ids)
def submit(func, *args, num_returns=1, **kwargs):
    """
    Execute function.

    Parameters
    ----------
    func : callable
        Function to be executed.
    *args : iterable
        Positional arguments to be passed in the `func`.
    num_returns : int, default: 1
        Number of results to be returned from `func`.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.

    Returns
    -------
    unidist.core.backends.common.data_id.DataID, list or None
        ``DataID`` when `num_returns == 1`, a list of ``DataID``-s
        when `num_returns > 1`, ``None`` when `num_returns == 0`.
    """
    obj_store = ObjectStore.get_instance()

    def _materialize(value):
        # Swap a DataID for the stored object; pass everything else through.
        return obj_store.get(value) if isinstance(value, DataID) else value

    prepared_args = [_materialize(arg) for arg in args]
    prepared_kwargs = {name: _materialize(value) for name, value in kwargs.items()}
    try:
        result = func(*prepared_args, **prepared_kwargs)
    except Exception as exc:
        # Store the exception instead of raising so it surfaces on ``get``.
        result = [exc] * num_returns if num_returns > 1 else exc
    if num_returns == 0:
        return None
    if num_returns > 1:
        return [obj_store.put(result[idx]) for idx in range(num_returns)]
    return obj_store.put(result)
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Object storage related functionality.""" | ||
| from unidist.core.backends.common.data_id import DataID | ||
class ObjectStore:
    """Class that stores objects and provides access to these."""

    # Singleton instance, created lazily by ``get_instance``.
    __instance = None

    def __init__(self):
        # Always initialize the internal storage. Previously ``store`` was
        # only set when the singleton did not yet exist, so a directly
        # constructed second instance had no ``store`` attribute and failed
        # with AttributeError on first use.
        self.store = dict()

    def __repr__(self):
        return f"Object store: {self.store}"

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        unidist.core.backends.python.core.object_store.ObjectStore
        """
        if cls.__instance is None:
            cls.__instance = ObjectStore()
        return cls.__instance

    def put(self, data, data_id=None):
        """
        Put `data` to internal dictionary.

        Parameters
        ----------
        data : object
            Data to be put.
        data_id : unidist.core.backends.common.data_id.DataID, optional
            An ID of data. If it isn't provided, will be created automatically.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            An ID of object in internal dictionary.
        """
        data_id = DataID() if data_id is None else data_id
        self.store[data_id] = data
        return data_id

    def get(self, data_ids):
        """
        Get object(s) associated with `data_ids` from the internal dictionary.

        Parameters
        ----------
        data_ids : unidist.core.backends.common.data_id.DataID or list
            ID(s) of object(s) to get data from.

        Returns
        -------
        object
            A Python object.

        Raises
        ------
        ValueError
            If any entry of `data_ids` is not a ``DataID``.
        """
        is_list = isinstance(data_ids, list)
        if not is_list:
            data_ids = [data_ids]
        if not all(isinstance(data_id, DataID) for data_id in data_ids):
            raise ValueError(
                "`data_ids` must either be a data ID or a list of data IDs."
            )

        def check_exception(value):
            # Stored exceptions are re-raised on retrieval so failures in
            # submitted functions propagate to the caller.
            if isinstance(value, Exception):
                raise value
            return value

        values = [check_exception(self.store[data_id]) for data_id in data_ids]
        return values if is_list else values[0]
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """An implementation of ``RemoteFunction`` interface using Python.""" | ||
| import unidist.core.backends.python.core as py | ||
| from unidist.core.base.object_ref import ObjectRef | ||
| from unidist.core.base.remote_function import RemoteFunction | ||
class PythonRemoteFunction(RemoteFunction):
    """
    The class that implements the interface in ``RemoteFunction`` using Python.

    Parameters
    ----------
    function : callable
        A function to be called.
    num_cpus : int
        The number of CPUs to reserve for the function.
    num_returns : int
        The number of ``ObjectRef``-s returned by the function invocation.
    resources : dict
        Custom resources to reserve for the function.
    """

    def __init__(self, function, num_cpus, num_returns, resources):
        self._remote_function = function
        self._num_cpus = num_cpus
        # A missing ``num_returns`` defaults to a single return value.
        self._num_returns = num_returns if num_returns is not None else 1
        self._resources = resources

    def _remote(self, *args, num_cpus=None, num_returns=None, resources=None, **kwargs):
        """
        Execute `self._remote_function`.

        Parameters
        ----------
        *args : iterable
            Positional arguments to be passed in the `self._remote_function`.
        num_cpus : int, optional
            The number of CPUs to reserve for the function.
        num_returns : int, optional
            The number of ``ObjectRef``-s returned by the function invocation.
        resources : dict, optional
            Custom resources to reserve for the function.
        **kwargs : dict
            Keyword arguments to be passed in the `self._remote_function`.

        Returns
        -------
        ObjectRef, list or None
            ``ObjectRef`` when `num_returns == 1`, a list of ``ObjectRef``-s
            when `num_returns > 1`, ``None`` when `num_returns == 0`.

        Raises
        ------
        NotImplementedError
            If `num_cpus` or `resources` is requested either here or at construction.
        """
        if num_cpus is not None or self._num_cpus is not None:
            raise NotImplementedError("'num_cpus' is not supported by Python backend.")
        if resources is not None or self._resources is not None:
            raise NotImplementedError("'resources' is not supported by Python backend.")
        count = self._num_returns if num_returns is None else num_returns
        data_ids = py.submit(self._remote_function, *args, num_returns=count, **kwargs)
        if count == 1:
            return ObjectRef(data_ids)
        if count > 1:
            return [ObjectRef(data_id) for data_id in data_ids]
        if count == 0:
            return None
| # Copyright (C) 2021-2022 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Utilities used to initialize Python execution backend.""" | ||
def initialize_python():
    """
    Initialize the Python execution backend.

    Notes
    -----
    All execution will happen sequentially.
    """
    # Local import keeps the backend core from being loaded until it is needed.
    from unidist.core.backends.python.core import init

    init()
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
400678
1.58%9817
0.56%