unidist
Advanced tools
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """`ObjectStore` functionality.""" | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| from unidist.core.backends.mpi.core.serialization import deserialize_complex_data | ||
| from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore | ||
class ObjectStore:
    """
    Class that combines checking and retrieving data from the shared and local stores in a current process.

    Notes
    -----
    The store checks for both deserialized and serialized data.
    """

    __instance = None

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        ObjectStore
        """
        if cls.__instance is None:
            cls.__instance = ObjectStore()
        return cls.__instance

    def contains(self, data_id):
        """
        Check if the data associated with `data_id` exists in the current process.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        bool
            Return the status if an object exist in the current process.

        Notes
        -----
        The data counts as present if it is in the local store either
        deserialized or in serialized form, or in the shared store.
        """
        local_store = LocalObjectStore.get_instance()
        shared_store = SharedObjectStore.get_instance()
        return (
            local_store.contains(data_id)
            or local_store.is_already_serialized(data_id)
            or shared_store.contains(data_id)
        )

    def get(self, data_id):
        """
        Get data from any location in the current process.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        object
            Return data associated with `data_id`.

        Raises
        ------
        ValueError
            If `data_id` is not found in any store of the current process.
        """
        local_store = LocalObjectStore.get_instance()
        shared_store = SharedObjectStore.get_instance()
        if local_store.contains(data_id):
            return local_store.get(data_id)
        if local_store.is_already_serialized(data_id):
            serialized_data = local_store.get_serialized_data(data_id)
            value = deserialize_complex_data(
                serialized_data["s_data"],
                serialized_data["raw_buffers"],
                serialized_data["buffer_count"],
            )
        elif shared_store.contains(data_id):
            value = shared_store.get(data_id)
        else:
            raise ValueError("The current data ID is not contained in the process.")
        # Cache the deserialized value in the local store so subsequent
        # `get` calls hit the fast path above.
        local_store.put(data_id, value)
        return value
+2
-2
| Metadata-Version: 2.1 | ||
| Name: unidist | ||
| Version: 0.5.1 | ||
| Version: 0.6.0 | ||
| Summary: Unified Distributed Execution | ||
@@ -40,3 +40,3 @@ Home-page: https://github.com/modin-project/unidist | ||
| <a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/0.5.1/"><img src="https://img.shields.io/badge/pypi-0.5.1-blue.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a> | ||
@@ -43,0 +43,0 @@ </p> |
+1
-1
@@ -9,3 +9,3 @@ <p align="center"> | ||
| <a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/0.5.1/"><img src="https://img.shields.io/badge/pypi-0.5.1-blue.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a> | ||
@@ -12,0 +12,0 @@ </p> |
| Metadata-Version: 2.1 | ||
| Name: unidist | ||
| Version: 0.5.1 | ||
| Version: 0.6.0 | ||
| Summary: Unified Distributed Execution | ||
@@ -40,3 +40,3 @@ Home-page: https://github.com/modin-project/unidist | ||
| <a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/0.5.1/"><img src="https://img.shields.io/badge/pypi-0.5.1-blue.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a> | ||
@@ -43,0 +43,0 @@ </p> |
@@ -47,2 +47,3 @@ AUTHORS | ||
| unidist/core/backends/mpi/core/local_object_store.py | ||
| unidist/core/backends/mpi/core/object_store.py | ||
| unidist/core/backends/mpi/core/serialization.py | ||
@@ -49,0 +50,0 @@ unidist/core/backends/mpi/core/shared_object_store.py |
@@ -11,7 +11,7 @@ | ||
| { | ||
| "date": "2023-11-25T22:18:18+0100", | ||
| "date": "2024-01-08T21:52:33+0100", | ||
| "dirty": false, | ||
| "error": null, | ||
| "full-revisionid": "8440b4bb3aa56fb7b558efb00f084087b3e88e18", | ||
| "version": "0.5.1" | ||
| "full-revisionid": "4fa3cf262299c08b66efd538c190b360c448d467", | ||
| "version": "0.6.0" | ||
| } | ||
@@ -18,0 +18,0 @@ ''' # END VERSION_JSON |
@@ -104,2 +104,5 @@ # Copyright (C) 2021-2023 Modin authors | ||
| The tag for send/recv of a buffer-like object. | ||
| OBJECT_BLOCKING : int, default: 114 | ||
| The tag for send/recv of a regular Python object | ||
| to indicate the blocking get request. | ||
| """ | ||
@@ -110,2 +113,3 @@ | ||
| BUFFER = 113 | ||
| OBJECT_BLOCKING = 114 | ||
@@ -112,0 +116,0 @@ |
@@ -300,3 +300,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| def mpi_send_object(comm, data, dest_rank): | ||
| def mpi_send_object(comm, data, dest_rank, tag=common.MPITag.OBJECT): | ||
| """ | ||
@@ -313,2 +313,4 @@ Send a Python object to another MPI rank in a blocking way. | ||
| Target MPI process to transfer data. | ||
| tag : common.MPITag, default: common.MPITag.OBJECT | ||
| Message tag. | ||
@@ -322,3 +324,3 @@ Notes | ||
| """ | ||
| comm.send(data, dest=dest_rank, tag=common.MPITag.OBJECT) | ||
| comm.send(data, dest=dest_rank, tag=tag) | ||
@@ -411,2 +413,32 @@ | ||
| def mpi_iprobe_recv_object(comm, tag=common.MPITag.OBJECT): | ||
| """ | ||
| Receive an object of a standard Python data type from any source. | ||
| The source rank gets available from `iprobe`. | ||
| Parameters | ||
| ---------- | ||
| comm : mpi4py.MPI.Comm | ||
| MPI communicator. | ||
| tag : common.MPITag, default: common.MPITag.OBJECT | ||
| Message tag. | ||
| Returns | ||
| ------- | ||
| object | ||
| Received data from the source rank. | ||
| int | ||
| Source rank. | ||
| """ | ||
| backoff = MpiBackoff.get() | ||
| status = MPI.Status() | ||
| source = MPI.ANY_SOURCE | ||
| while not comm.iprobe(source=source, tag=tag, status=status): | ||
| time.sleep(backoff) | ||
| source = status.source | ||
| data = comm.recv(source=source, tag=tag, status=status) | ||
| return data, source | ||
| def mpi_recv_object(comm, source_rank): | ||
@@ -629,3 +661,3 @@ """ | ||
| # wrap to dict for sending and correct deserialization of the object by the recipient | ||
| comm.send(dict(info_package), dest=dest_rank, tag=common.MPITag.OBJECT) | ||
| comm.send(dict(info_package), dest=dest_rank, tag=common.MPITag.OBJECT_BLOCKING) | ||
| with pkl5._bigmpi as bigmpi: | ||
@@ -632,0 +664,0 @@ comm.Send(bigmpi(s_data), dest=dest_rank, tag=common.MPITag.BUFFER) |
@@ -22,2 +22,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| from unidist.core.backends.mpi.core.serialization import serialize_complex_data | ||
| from unidist.core.backends.mpi.core.object_store import ObjectStore | ||
| from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore | ||
@@ -390,3 +391,2 @@ from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| serialized_data = serialize_complex_data(data) | ||
| local_store.put(data_id, data) | ||
| if shared_store.is_allocated(): | ||
@@ -416,21 +416,17 @@ shared_store.put(data_id, serialized_data) | ||
| """ | ||
| local_store = LocalObjectStore.get_instance() | ||
| object_store = ObjectStore.get_instance() | ||
| is_list = isinstance(data_ids, list) | ||
| if not is_list: | ||
| data_ids = [data_ids] | ||
| remote_data_ids = [ | ||
| data_id for data_id in data_ids if not object_store.contains(data_id) | ||
| ] | ||
| # Remote data gets available in the local store inside `request_worker_data` | ||
| if remote_data_ids: | ||
| request_worker_data(remote_data_ids) | ||
| def get_impl(data_id): | ||
| if local_store.contains(data_id): | ||
| value = local_store.get(data_id) | ||
| else: | ||
| value = request_worker_data(data_id) | ||
| logger.debug("GET {} ids".format(common.unwrapped_data_ids_list(data_ids))) | ||
| if isinstance(value, Exception): | ||
| raise value | ||
| values = [object_store.get(data_id) for data_id in data_ids] | ||
| return value | ||
| logger.debug("GET {} ids".format(common.unwrapped_data_ids_list(data_ids))) | ||
| values = [get_impl(data_id) for data_id in data_ids] | ||
| # Initiate reference count based cleaup | ||
@@ -463,2 +459,3 @@ # if all the tasks were completed | ||
| """ | ||
| object_store = ObjectStore.get_instance() | ||
| if not isinstance(data_ids, list): | ||
@@ -473,7 +470,5 @@ data_ids = [data_ids] | ||
| ready = [] | ||
| local_store = LocalObjectStore.get_instance() | ||
| logger.debug("WAIT {} ids".format(common.unwrapped_data_ids_list(data_ids))) | ||
| for data_id in not_ready.copy(): | ||
| if local_store.contains(data_id): | ||
| if object_store.contains(data_id): | ||
| ready.append(data_id) | ||
@@ -480,0 +475,0 @@ not_ready.remove(data_id) |
@@ -15,3 +15,5 @@ # Copyright (C) 2021-2023 Modin authors | ||
| from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore | ||
| from unidist.core.backends.mpi.core.serialization import serialize_complex_data | ||
| from unidist.core.backends.mpi.core.serialization import ( | ||
| serialize_complex_data, | ||
| ) | ||
@@ -112,3 +114,3 @@ | ||
| def pull_data(comm, owner_rank): | ||
| def pull_data(comm, owner_rank=None): | ||
| """ | ||
@@ -122,4 +124,5 @@ Receive data from another MPI process. | ||
| ---------- | ||
| owner_rank : int | ||
| owner_rank : int or None | ||
| Source MPI process to receive data from. | ||
| If ``None``, data will be received from any source based on `iprobe`. | ||
@@ -131,3 +134,9 @@ Returns | ||
| """ | ||
| info_package = communication.mpi_recv_object(comm, owner_rank) | ||
| if owner_rank is None: | ||
| info_package, source = communication.mpi_iprobe_recv_object( | ||
| comm, tag=common.MPITag.OBJECT_BLOCKING | ||
| ) | ||
| owner_rank = source | ||
| else: | ||
| info_package = communication.mpi_recv_object(comm, owner_rank) | ||
| if info_package["package_type"] == common.MetadataPackage.SHARED_DATA: | ||
@@ -155,4 +164,6 @@ local_store = LocalObjectStore.get_instance() | ||
| ) | ||
| data_id = info_package["id"] | ||
| local_store.put(data_id, data) | ||
| return { | ||
| "id": info_package["id"], | ||
| "id": data_id, | ||
| "data": data, | ||
@@ -168,10 +179,10 @@ } | ||
| def request_worker_data(data_id): | ||
| def request_worker_data(data_ids): | ||
| """ | ||
| Get an object(s) associated with `data_id` from the object storage. | ||
| Get objects associated with `data_ids` from the object storage. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID(s) to object(s) to get data from. | ||
| data_ids : list[unidist.core.backends.mpi.core.common.MpiDataID] | ||
| IDs to objects to get data from. | ||
@@ -185,34 +196,48 @@ Returns | ||
| local_store = LocalObjectStore.get_instance() | ||
| owner_rank = local_store.get_data_owner(data_id) | ||
| logger.debug("GET {} id from {} rank".format(data_id._id, owner_rank)) | ||
| async_operations = AsyncOperations.get_instance() | ||
| # Worker request | ||
| operation_type = common.Operation.GET | ||
| operation_data = { | ||
| "source": mpi_state.global_rank, | ||
| "id": data_id, | ||
| # set `is_blocking_op` to `True` to tell a worker | ||
| # to send the data directly to the requester | ||
| # without any delay | ||
| "is_blocking_op": True, | ||
| } | ||
| # We use a blocking send here because we have to wait for | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| communication.send_simple_operation( | ||
| mpi_state.global_comm, | ||
| operation_type, | ||
| operation_data, | ||
| owner_rank, | ||
| ) | ||
| # Blocking get | ||
| complex_data = pull_data(mpi_state.global_comm, owner_rank) | ||
| if data_id != complex_data["id"]: | ||
| raise ValueError("Unexpected data_id!") | ||
| data = complex_data["data"] | ||
| for data_id in data_ids: | ||
| owner_rank = local_store.get_data_owner(data_id) | ||
| # Caching the result, check the protocol correctness here | ||
| local_store.put(data_id, data) | ||
| return data | ||
| logger.debug("GET {} id from {} rank".format(data_id._id, owner_rank)) | ||
| # Worker request | ||
| operation_type = common.Operation.GET | ||
| operation_data = { | ||
| "source": mpi_state.global_rank, | ||
| "id": data_id, | ||
| # set `is_blocking_op` to `True` to tell a worker | ||
| # to send the data directly to the requester | ||
| # without any delay | ||
| "is_blocking_op": True, | ||
| } | ||
| h_list = communication.isend_simple_operation( | ||
| mpi_state.global_comm, | ||
| operation_type, | ||
| operation_data, | ||
| owner_rank, | ||
| ) | ||
| # We do not wait for async requests here because | ||
| # we can receive the data from the first available worker below | ||
| async_operations.extend(h_list) | ||
| data_count = 0 | ||
| # If some dataids raise an exception it will be captured in exception_raised variable and raised after the while loop ends. | ||
| exception_raised = None | ||
| while data_count < len(data_ids): | ||
| # Remote data gets available in the local store inside `pull_data` | ||
| complex_data = pull_data(mpi_state.global_comm) | ||
| if isinstance(complex_data["data"], Exception) and exception_raised is None: | ||
| exception_raised = complex_data["data"] | ||
| data_id = complex_data["id"] | ||
| if data_id in data_ids: | ||
| data_count += 1 | ||
| else: | ||
| raise RuntimeError( | ||
| f"DataID {data_id} isn't in the requested list {data_ids}" | ||
| ) | ||
| if exception_raised: | ||
| raise exception_raised | ||
| def _push_local_data(dest_rank, data_id, is_blocking_op, is_serialized): | ||
@@ -307,3 +332,6 @@ """ | ||
| communication.mpi_send_object( | ||
| mpi_state.global_comm, operation_data, dest_rank | ||
| mpi_state.global_comm, | ||
| operation_data, | ||
| dest_rank, | ||
| tag=common.MPITag.OBJECT_BLOCKING, | ||
| ) | ||
@@ -379,18 +407,15 @@ else: | ||
| _push_shared_data(dest_rank, data_id, is_blocking_op) | ||
| elif local_store.is_already_serialized(data_id): | ||
| _push_local_data(dest_rank, data_id, is_blocking_op, is_serialized=True) | ||
| elif local_store.contains(data_id): | ||
| if local_store.is_already_serialized(data_id): | ||
| data = local_store.get(data_id) | ||
| serialized_data = serialize_complex_data(data) | ||
| if shared_store.is_allocated() and shared_store.should_be_shared( | ||
| serialized_data | ||
| ): | ||
| shared_store.put(data_id, serialized_data) | ||
| _push_shared_data(dest_rank, data_id, is_blocking_op) | ||
| else: | ||
| local_store.cache_serialized_data(data_id, serialized_data) | ||
| _push_local_data(dest_rank, data_id, is_blocking_op, is_serialized=True) | ||
| else: | ||
| data = local_store.get(data_id) | ||
| serialized_data = serialize_complex_data(data) | ||
| if shared_store.is_allocated() and shared_store.should_be_shared( | ||
| serialized_data | ||
| ): | ||
| shared_store.put(data_id, serialized_data) | ||
| _push_shared_data(dest_rank, data_id, is_blocking_op) | ||
| else: | ||
| local_store.cache_serialized_data(data_id, serialized_data) | ||
| _push_local_data( | ||
| dest_rank, data_id, is_blocking_op, is_serialized=True | ||
| ) | ||
| elif local_store.contains_data_owner(data_id): | ||
@@ -397,0 +422,0 @@ _push_data_owner(dest_rank, data_id) |
@@ -280,2 +280,6 @@ # Copyright (C) 2021-2023 Modin authors | ||
| """ | ||
| # We make a copy to avoid data corruption obtained through out-of-band serialization, | ||
| # and buffers are marked read-only to prevent them from being modified. | ||
| # `to_bytes()` call handles both points. | ||
| data["raw_buffers"] = [buf.tobytes() for buf in data["raw_buffers"]] | ||
| self._serialization_cache[data_id] = data | ||
@@ -282,0 +286,0 @@ self.maybe_update_data_id_map(data_id) |
@@ -750,3 +750,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| def get(self, data_id, owner_rank, shared_info): | ||
| def get(self, data_id, owner_rank=None, shared_info=None): | ||
| """ | ||
@@ -759,51 +759,63 @@ Get data from another worker using shared memory. | ||
| An ID to data. | ||
| owner_rank : int | ||
| owner_rank : int, default: None | ||
| The rank that sent the data. | ||
| shared_info : dict | ||
| This value is used to synchronize data in shared memory between different hosts | ||
| if the value is not ``None``. | ||
| shared_info : dict, default: None | ||
| The necessary information to properly deserialize data from shared memory. | ||
| If `shared_info` is ``None``, the data already exists in shared memory in the current process. | ||
| """ | ||
| mpi_state = communication.MPIState.get_instance() | ||
| s_data_len = shared_info["s_data_len"] | ||
| raw_buffers_len = shared_info["raw_buffers_len"] | ||
| service_index = shared_info["service_index"] | ||
| buffer_count = shared_info["buffer_count"] | ||
| if shared_info is None: | ||
| shared_info = self.get_shared_info(data_id) | ||
| else: | ||
| mpi_state = communication.MPIState.get_instance() | ||
| s_data_len = shared_info["s_data_len"] | ||
| raw_buffers_len = shared_info["raw_buffers_len"] | ||
| service_index = shared_info["service_index"] | ||
| buffer_count = shared_info["buffer_count"] | ||
| # check data in shared memory | ||
| if not self._check_service_info(data_id, service_index): | ||
| # reserve shared memory | ||
| shared_data_len = s_data_len + sum([buf for buf in raw_buffers_len]) | ||
| reservation_info = communication.send_reserve_operation( | ||
| mpi_state.global_comm, data_id, shared_data_len | ||
| ) | ||
| service_index = reservation_info["service_index"] | ||
| # check if worker should sync shared buffer or it is doing by another worker | ||
| if reservation_info["is_first_request"]: | ||
| # syncronize shared buffer | ||
| self._sync_shared_memory_from_another_host( | ||
| mpi_state.global_comm, | ||
| data_id, | ||
| owner_rank, | ||
| reservation_info["first_index"], | ||
| reservation_info["last_index"], | ||
| service_index, | ||
| # check data in shared memory | ||
| if not self._check_service_info(data_id, service_index): | ||
| # reserve shared memory | ||
| shared_data_len = s_data_len + sum([buf for buf in raw_buffers_len]) | ||
| reservation_info = communication.send_reserve_operation( | ||
| mpi_state.global_comm, data_id, shared_data_len | ||
| ) | ||
| # put service info | ||
| self._put_service_info( | ||
| service_index, data_id, reservation_info["first_index"] | ||
| ) | ||
| else: | ||
| # wait while another worker syncronize shared buffer | ||
| while not self._check_service_info(data_id, service_index): | ||
| time.sleep(MpiBackoff.get()) | ||
| # put shared info with updated data_id and service_index | ||
| shared_info = common.MetadataPackage.get_shared_info( | ||
| data_id, s_data_len, raw_buffers_len, buffer_count, service_index | ||
| ) | ||
| self._put_shared_info(data_id, shared_info) | ||
| service_index = reservation_info["service_index"] | ||
| # check if worker should sync shared buffer or it is doing by another worker | ||
| if reservation_info["is_first_request"]: | ||
| # syncronize shared buffer | ||
| if owner_rank is None: | ||
| raise ValueError( | ||
| "The data is not in the host's shared memory and the data must be synchronized, " | ||
| + "but the owner rank is not defined." | ||
| ) | ||
| # increment ref | ||
| self._increment_ref_number(data_id, shared_info["service_index"]) | ||
| self._sync_shared_memory_from_another_host( | ||
| mpi_state.global_comm, | ||
| data_id, | ||
| owner_rank, | ||
| reservation_info["first_index"], | ||
| reservation_info["last_index"], | ||
| service_index, | ||
| ) | ||
| # put service info | ||
| self._put_service_info( | ||
| service_index, data_id, reservation_info["first_index"] | ||
| ) | ||
| else: | ||
| # wait while another worker syncronize shared buffer | ||
| while not self._check_service_info(data_id, service_index): | ||
| time.sleep(MpiBackoff.get()) | ||
| # put shared info with updated data_id and service_index | ||
| shared_info = common.MetadataPackage.get_shared_info( | ||
| data_id, s_data_len, raw_buffers_len, buffer_count, service_index | ||
| ) | ||
| self._put_shared_info(data_id, shared_info) | ||
| # increment ref | ||
| self._increment_ref_number(data_id, shared_info["service_index"]) | ||
| # read from shared buffer and deserialized | ||
@@ -810,0 +822,0 @@ return self._read_from_shared_buffer(data_id, shared_info) |
@@ -19,2 +19,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.object_store import ObjectStore | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
@@ -89,2 +90,3 @@ from unidist.core.backends.mpi.core.worker.request_store import RequestStore | ||
| task_store = TaskStore.get_instance() | ||
| object_store = ObjectStore.get_instance() | ||
| local_store = LocalObjectStore.get_instance() | ||
@@ -139,3 +141,2 @@ request_store = RequestStore.get_instance() | ||
| ) | ||
| local_store.put(request["id"], request["data"]) | ||
@@ -192,3 +193,3 @@ # Discard data request to another worker, if data has become available | ||
| # Actor method here is a data id so we have to retrieve it from the storage | ||
| method_name = local_store.get(request["task"]) | ||
| method_name = object_store.get(request["task"]) | ||
| handler = request["handler"] | ||
@@ -195,0 +196,0 @@ actor_method = getattr(actor_map[handler], method_name) |
@@ -9,4 +9,4 @@ # Copyright (C) 2021-2023 Modin authors | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| from unidist.core.backends.mpi.core.controller.common import push_data | ||
| from unidist.core.backends.mpi.core.object_store import ObjectStore | ||
@@ -217,3 +217,4 @@ | ||
| """ | ||
| if LocalObjectStore.get_instance().contains(data_id): | ||
| object_store = ObjectStore.get_instance() | ||
| if object_store.contains(data_id): | ||
| # Executor wait just for signal | ||
@@ -252,4 +253,4 @@ # We use a blocking send here because the receiver is waiting for the result. | ||
| """ | ||
| local_store = LocalObjectStore.get_instance() | ||
| if local_store.contains(data_id): | ||
| object_store = ObjectStore.get_instance() | ||
| if object_store.contains(data_id): | ||
| push_data( | ||
@@ -256,0 +257,0 @@ source_rank, |
@@ -14,2 +14,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.object_store import ObjectStore | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
@@ -191,4 +192,5 @@ from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore | ||
| local_store = LocalObjectStore.get_instance() | ||
| if local_store.contains(arg): | ||
| value = LocalObjectStore.get_instance().get(arg) | ||
| object_store = ObjectStore.get_instance() | ||
| if object_store.contains(arg): | ||
| value = object_store.get(arg) | ||
| # Data is already local or was pushed from master | ||
@@ -422,4 +424,4 @@ return value, False | ||
| """ | ||
| object_store = ObjectStore.get_instance() | ||
| # Parse request | ||
| local_store = LocalObjectStore.get_instance() | ||
| task = request["task"] | ||
@@ -429,3 +431,3 @@ # Remote function here is a data id so we have to retrieve it from the storage, | ||
| if is_data_id(task): | ||
| task = local_store.get(task) | ||
| task = object_store.get(task) | ||
| args = request["args"] | ||
@@ -432,0 +434,0 @@ kwargs = request["kwargs"] |
@@ -8,3 +8,2 @@ # Copyright (C) 2021-2023 Modin authors | ||
| import unidist.core.backends.mpi.core as mpi | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| from unidist.core.backends.common.data_id import is_data_id | ||
@@ -34,3 +33,2 @@ from unidist.core.backends.common.utils import unwrap_object_refs | ||
| self._remote_function = function | ||
| self._remote_function_orig = function | ||
| self._num_cpus = num_cpus | ||
@@ -80,9 +78,3 @@ self._num_returns = 1 if num_returns is None else num_returns | ||
| if not is_data_id(self._remote_function): | ||
| self._remote_function = mpi.put(self._remote_function_orig) | ||
| else: | ||
| # When a worker calls a remote function inside another remote function, | ||
| # we have to again serialize the former remote function and put it into the storage | ||
| # for further correct communication. | ||
| if not LocalObjectStore.get_instance().contains(self._remote_function): | ||
| self._remote_function = mpi.put(self._remote_function_orig) | ||
| self._remote_function = mpi.put(self._remote_function) | ||
@@ -89,0 +81,0 @@ data_ids = mpi.submit( |
@@ -125,1 +125,13 @@ # Copyright (C) 2021-2023 Modin authors | ||
| assert_equal(f.remote(data), data) | ||
| @pytest.mark.xfail( | ||
| Backend.get() == BackendName.PYSEQ, | ||
| reason="PUT using PYSEQ does not provide immutable data", | ||
| ) | ||
| def test_data_immutability(): | ||
| data = [1, 2, 3] | ||
| object_ref = unidist.put(data) | ||
| data[0] = 111 | ||
| assert_equal(object_ref, [1, 2, 3]) |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
1538441
0.37%101
1%12073
1.22%