unidist
Advanced tools
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """`LocalObjectStore` functionality.""" | ||
| import weakref | ||
| import unidist.core.backends.mpi.core.common as common | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
class LocalObjectStore:
    """
    Class that stores local objects and provides access to them.

    Notes
    -----
    The storage is local to the current worker process only.
    """

    __instance = None

    def __init__(self):
        # Add local data {DataID : Data}
        self._data_map = weakref.WeakKeyDictionary()
        # "strong" references to data IDs {DataID : DataID}
        # we are using dict here to improve performance when getting an element from it,
        # whereas other containers would require O(n) complexity
        self._data_id_map = {}
        # Data owner {DataID : Rank}
        self._data_owner_map = weakref.WeakKeyDictionary()
        # Data was already sent to these ranks {DataID : set(ranks)}
        self._sent_data_map = weakref.WeakKeyDictionary()
        # Data id generator
        self._data_id_counter = 0
        # Data serialized cache
        self._serialization_cache = weakref.WeakKeyDictionary()

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``LocalObjectStore``.

        Returns
        -------
        LocalObjectStore
        """
        if cls.__instance is None:
            # Instantiate via ``cls`` (not a hard-coded class name) so a
            # potential subclass gets its own properly-typed singleton.
            cls.__instance = cls()
        return cls.__instance

    def maybe_update_data_id_map(self, data_id):
        """
        Add a strong reference to the `data_id` if necessary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Notes
        -----
        The worker must have a strong reference to the external `data_id` until the owner process
        send the `unidist.core.backends.common.Operation.CLEANUP` operation with this `data_id`.
        """
        if (
            data_id.owner_rank != communication.MPIState.get_instance().global_rank
            and data_id not in self._data_id_map
        ):
            self._data_id_map[data_id] = data_id

    def put(self, data_id, data):
        """
        Put `data` to internal dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        data : object
            Data to be put.
        """
        self._data_map[data_id] = data
        self.maybe_update_data_id_map(data_id)

    def put_data_owner(self, data_id, rank):
        """
        Put data location (owner rank) to internal dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        rank : int
            Rank number where the data resides.
        """
        self._data_owner_map[data_id] = rank
        self.maybe_update_data_id_map(data_id)

    def get(self, data_id):
        """
        Get the data from a local dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        object
            Return local data associated with `data_id`.
        """
        return self._data_map[data_id]

    def get_data_owner(self, data_id):
        """
        Get the data owner rank.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        int
            Rank number where the data resides.
        """
        return self._data_owner_map[data_id]

    def contains(self, data_id):
        """
        Check if the data associated with `data_id` exists in a local dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        bool
            Return the status if an object exist in local dictionary.
        """
        return data_id in self._data_map

    def contains_data_owner(self, data_id):
        """
        Check if the data location info associated with `data_id` exists in a local dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        bool
            Return the ``True`` status if an object location is known.
        """
        return data_id in self._data_owner_map

    def clear(self, cleanup_list):
        """
        Clear "strong" references to data IDs from `cleanup_list`.

        Parameters
        ----------
        cleanup_list : list
            List of data IDs.

        Notes
        -----
        The actual data will be collected later when there is no weak or
        strong reference to data in the current worker.
        """
        for data_id in cleanup_list:
            self._data_id_map.pop(data_id, None)

    def generate_data_id(self, gc):
        """
        Generate unique ``MpiDataID`` instance.

        Parameters
        ----------
        gc : unidist.core.backends.mpi.core.executor.GarbageCollector
            Local garbage collector reference.

        Returns
        -------
        unidist.core.backends.mpi.core.common.MpiDataID
            Unique data ID instance.
        """
        data_id = common.MpiDataID(
            communication.MPIState.get_instance().global_rank, self._data_id_counter, gc
        )
        self._data_id_counter += 1
        return data_id

    def generate_output_data_id(self, dest_rank, gc, num_returns=1):
        """
        Generate unique list of ``unidist.core.backends.mpi.core.common.MpiDataID`` instance.

        Parameters
        ----------
        dest_rank : int
            Ranks number where generated list will be located.
        gc : unidist.core.backends.mpi.core.executor.GarbageCollector
            Local garbage collector reference.
        num_returns : int, default: 1
            Generated list size.

        Returns
        -------
        list
            A list of unique ``MpiDataID`` instances.
        """
        if num_returns == 1:
            output_ids = self.generate_data_id(gc)
            self.put_data_owner(output_ids, dest_rank)
        elif num_returns == 0:
            output_ids = None
        else:
            output_ids = []
            for _ in range(num_returns):
                output_id = self.generate_data_id(gc)
                self.put_data_owner(output_id, dest_rank)
                output_ids.append(output_id)
        return output_ids

    def cache_send_info(self, data_id, rank):
        """
        Save communication event for this `data_id` and rank.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ``ID`` to data.
        rank : int
            Rank number where the data was sent.
        """
        # Single lookup instead of a contains-check followed by a get.
        self._sent_data_map.setdefault(data_id, set()).add(rank)

    def is_already_sent(self, data_id, rank):
        """
        Check if communication event on this `data_id` and rank happened.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data
        rank : int
            Rank number to check.

        Returns
        -------
        bool
            ``True`` if communication event already happened.
        """
        # Missing data_id means nothing was sent anywhere yet.
        return rank in self._sent_data_map.get(data_id, ())

    def cache_serialized_data(self, data_id, data):
        """
        Save serialized object for this `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        data : object
            Serialized data to cache.
        """
        self._serialization_cache[data_id] = data
        self.maybe_update_data_id_map(data_id)

    def is_already_serialized(self, data_id):
        """
        Check if the data on this `data_id` is already serialized.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        bool
            ``True`` if the data is already serialized.
        """
        return data_id in self._serialization_cache

    def get_serialized_data(self, data_id):
        """
        Get serialized data on this `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        object
            Cached serialized data associated with `data_id`.
        """
        return self._serialization_cache[data_id]
Sorry, the diff of this file is too big to display
| /* | ||
| * Copyright (C) 2021-2023 Modin authors | ||
| * | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| #include "memory.h" | ||
| #include <cstring> | ||
| #include <thread> | ||
| #include <vector> | ||
| namespace unidist { | ||
// Return `address` with only the bits selected by `bits` kept (bitwise AND
// on the pointer's integer value). With `bits == ~(block_size - 1)` and a
// power-of-two `block_size` this rounds the address down to a block boundary.
// NOTE(review): the const qualifier of `address` is dropped by the
// reinterpret_cast round-trip; callers here only read through the result.
uint8_t *pointer_logical_and(const uint8_t *address, uintptr_t bits) {
  uintptr_t value = reinterpret_cast<uintptr_t>(address);
  return reinterpret_cast<uint8_t *>(value & bits);
}
// Fill `size` consecutive int64_t elements of `buff` with `value`.
// Uses std::fill_n, which requires <algorithm> (previously pulled in only
// transitively; now included explicitly at the top of the file).
void fill(int64_t *buff, int64_t size, int64_t value) {
  std::fill_n(buff, size, value);
}
// Copy `nbytes` from `src` to `dst` using `num_threads` helper threads.
// The source range is split into `block_size`-aligned blocks that are
// divided evenly between the threads; the unaligned prefix/suffix and the
// remainder blocks are copied on the calling thread while the helpers run.
// Assumes `block_size` is a power of two and `num_threads > 0`, and that
// `dst`/`src` do not overlap -- TODO(review): confirm at the call sites.
void parallel_memcopy(uint8_t *dst,
                      const uint8_t *src,
                      int64_t nbytes,
                      uintptr_t block_size,
                      int num_threads) {
  std::vector<std::thread> threadpool(num_threads);
  // First block boundary at or after `src`...
  uint8_t *left = pointer_logical_and(src + block_size - 1, ~(block_size - 1));
  // ...and the last block boundary at or before `src + nbytes`.
  uint8_t *right = pointer_logical_and(src + nbytes, ~(block_size - 1));
  int64_t num_blocks = (right - left) / block_size;
  // Update right address so the block count divides evenly by num_threads;
  // the dropped remainder blocks join the suffix copied on this thread.
  right = right - (num_blocks % num_threads) * block_size;
  // Now we divide these blocks between available threads. The remainder is
  // handled on the main thread.
  int64_t chunk_size = (right - left) / num_threads;
  int64_t prefix = left - src;
  int64_t suffix = src + nbytes - right;
  // Now the data layout is | prefix | k * num_threads * block_size | suffix |.
  // We have chunk_size = k * block_size, therefore the data layout is
  // | prefix | num_threads * chunk_size | suffix |.
  // Each thread gets a "chunk" of k blocks.
  // Start all threads first and handle leftovers while threads run.
  for (int i = 0; i < num_threads; i++) {
    threadpool[i] = std::thread(
        memcpy, dst + prefix + i * chunk_size, left + i * chunk_size, chunk_size);
  }
  // Copy the unaligned head and tail on the calling thread while the
  // helpers copy the aligned middle.
  std::memcpy(dst, src, prefix);
  std::memcpy(dst + prefix + num_threads * chunk_size, right, suffix);
  for (auto &t : threadpool) {
    if (t.joinable()) {
      t.join();
    }
  }
}
| } // namespace unidist |
| /* | ||
| * Copyright (C) 2021-2023 Modin authors | ||
| * | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
#ifndef MEMORY_H
#define MEMORY_H

#include <stdint.h>

namespace unidist {

// A helper function for doing memcpy with multiple threads. This is required
// to saturate the memory bandwidth of modern cpus.
// `dst`/`src` must not overlap; `block_size` is the alignment granularity
// (presumably a power of two -- TODO(review): confirm with callers) and
// `num_threads` is the number of helper threads spawned in addition to the
// calling thread.
void parallel_memcopy(uint8_t *dst,
                      const uint8_t *src,
                      int64_t nbytes,
                      uintptr_t block_size,
                      int num_threads);

// Fill `size` consecutive int64_t elements of `buff` with `value`.
void fill(int64_t *buff, int64_t size, int64_t value);

} // namespace unidist

#endif // MEMORY_H
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """MPI backend functionality related to `monitor` process.""" |
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Monitoring process.""" | ||
| try: | ||
| import mpi4py | ||
| except ImportError: | ||
| raise ImportError( | ||
| "Missing dependency 'mpi4py'. Use pip or conda to install it." | ||
| ) from None | ||
| import unidist.core.backends.mpi.core.common as common | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.monitor.shared_memory_manager import ( | ||
| SharedMemoryManager, | ||
| ) | ||
| from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore | ||
| # TODO: Find a way to move this after all imports | ||
| mpi4py.rc(recv_mprobe=False, initialize=False) | ||
| from mpi4py import MPI # noqa: E402 | ||
# Per-rank logger setup: the monitor writes to its own log file named after
# its global rank. NOTE(review): MPIState may apparently be uninitialized at
# import time, hence the fallback to rank 0 -- confirm against startup order.
mpi_state = communication.MPIState.get_instance()
logger_name = "monitor_{}".format(mpi_state.global_rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)
monitor_logger = common.get_logger(logger_name, log_file)
class TaskCounter:
    """Singleton that counts tasks reported to the monitor as executed."""

    __instance = None

    def __init__(self):
        # Number of TASK_DONE notifications received so far.
        self.task_counter = 0

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``TaskCounter``.

        Returns
        -------
        TaskCounter
        """
        if cls.__instance is None:
            cls.__instance = cls()
        return cls.__instance

    def increment(self):
        """Increment task counter by one."""
        self.task_counter = self.task_counter + 1
class DataIDTracker:
    """
    Class that keeps track of all completed (ready) data IDs.
    """

    __instance = None

    def __init__(self):
        # Set of data IDs whose producing tasks have finished.
        self.completed_data_ids = set()

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``DataIDTracker``.

        Returns
        -------
        DataIDTracker
        """
        if cls.__instance is None:
            cls.__instance = cls()
        return cls.__instance

    def add_to_completed(self, data_ids):
        """
        Add the given data IDs to the set of completed (ready) data IDs.

        Parameters
        ----------
        data_ids : list
            List of data IDs to be added to the set of completed (ready) data IDs.
        """
        for data_id in data_ids:
            self.completed_data_ids.add(data_id)
class WaitHandler:
    """
    Class that handles wait requests.
    """

    __instance = None

    def __init__(self):
        # NOTE(review): this attribute appears unused by the class itself;
        # completed IDs are read from ``DataIDTracker`` instead -- confirm.
        self.completed_data_ids = set()
        # Data IDs the pending ``wait`` request is blocked on.
        self.awaited_data_ids = []
        # Data IDs already found to be completed for the pending request.
        self.ready = []
        # How many IDs must become ready before the requester is answered.
        self.num_returns = 0

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``WaitHandler``.

        Returns
        -------
        WaitHandler
        """
        if cls.__instance is None:
            cls.__instance = cls()
        return cls.__instance

    def add_wait_request(self, awaited_data_ids, num_returns):
        """
        Add a wait request for a list of data IDs and the number of data IDs to be awaited.

        Parameters
        ----------
        awaited_data_ids : list
            List of data IDs to be awaited.
        num_returns : int
            The number of ``DataID``-s that should be returned as ready.
        """
        self.awaited_data_ids = awaited_data_ids
        self.num_returns = num_returns

    def process_wait_requests(self):
        """
        Check if wait requests are pending to be processed.

        Process pending wait requests and send the data_ids to the requester
        if number of data IDs that are ready are equal to the num_returns.
        """
        if not self.awaited_data_ids:
            return
        completed = DataIDTracker.get_instance().completed_data_ids
        idx = 0
        while idx < len(self.awaited_data_ids):
            candidate = self.awaited_data_ids[idx]
            if candidate not in completed:
                idx += 1
                continue
            # Move the completed ID from the awaited list to the ready list.
            self.ready.append(candidate)
            del self.awaited_data_ids[idx]
            if len(self.ready) == self.num_returns:
                # Enough IDs are ready - answer the requester (always the
                # root rank for now) and reset the pending request.
                communication.mpi_send_object(
                    communication.MPIState.get_instance().global_comm,
                    {
                        "ready": self.ready,
                        "not_ready": self.awaited_data_ids,
                    },
                    communication.MPIRank.ROOT,
                )
                self.ready = []
                self.awaited_data_ids = []
def monitor_loop():
    """
    Infinite monitor operations processing loop.

    Tracks the number of executed tasks and completed (ready) data IDs.

    Notes
    -----
    The loop exits on special cancelation operation.
    ``unidist.core.backends.mpi.core.common.Operations`` defines a set of supported operations.
    """
    task_counter = TaskCounter.get_instance()
    mpi_state = communication.MPIState.get_instance()
    wait_handler = WaitHandler.get_instance()
    data_id_tracker = DataIDTracker.get_instance()
    shared_store = SharedObjectStore.get_instance()
    shm_manager = SharedMemoryManager()
    # Barrier to check if monitor process is ready to start the communication loop
    mpi_state.global_comm.Barrier()
    monitor_logger.debug("Monitor loop started")
    # Once all workers excluding ``Root`` and ``Monitor`` ranks are ready to shutdown,
    # ``Monitor` sends the shutdown signal to every worker, as well as notifies ``Root`` that
    # it can exit the program.
    workers_ready_to_shutdown = []
    shutdown_workers = False
    while True:
        # Listen receive operation from any source
        operation_type, source_rank = communication.mpi_recv_operation(
            mpi_state.global_comm
        )
        monitor_logger.debug(
            f"common.Operation processing - {operation_type} from {source_rank} rank"
        )
        # Proceed the request
        if operation_type == common.Operation.TASK_DONE:
            # A worker finished a task: count it, mark its output IDs as
            # completed, then check whether a pending WAIT can be answered.
            task_counter.increment()
            output_data_ids = communication.mpi_recv_object(
                mpi_state.global_comm, source_rank
            )
            data_id_tracker.add_to_completed(output_data_ids)
            wait_handler.process_wait_requests()
        elif operation_type == common.Operation.WAIT:
            # TODO: WAIT request can be received from several workers,
            # but not only from master. Handle this case when requested.
            operation_data = communication.mpi_recv_object(
                mpi_state.global_comm, source_rank
            )
            awaited_data_ids = operation_data["data_ids"]
            num_returns = operation_data["num_returns"]
            wait_handler.add_wait_request(awaited_data_ids, num_returns)
            # Some of the awaited IDs may already be completed.
            wait_handler.process_wait_requests()
        elif operation_type == common.Operation.GET_TASK_COUNT:
            # We use a blocking send here because the receiver is waiting for the result.
            communication.mpi_send_object(
                mpi_state.global_comm,
                task_counter.task_counter,
                source_rank,
            )
        elif operation_type == common.Operation.RESERVE_SHARED_MEMORY:
            # Reserve a shared-memory range for the requested data ID. If a
            # reservation already exists, return it and tell the requester
            # it is not the first writer for this ID.
            request = communication.mpi_recv_object(mpi_state.global_comm, source_rank)
            reservation_info = shm_manager.get(request["id"])
            if reservation_info is None:
                reservation_info = shm_manager.put(request["id"], request["size"])
                is_first_request = True
            else:
                is_first_request = False
            communication.mpi_send_object(
                mpi_state.global_comm,
                data={**reservation_info, "is_first_request": is_first_request},
                dest_rank=source_rank,
            )
        elif operation_type == common.Operation.REQUEST_SHARED_DATA:
            info_package = communication.mpi_recv_object(
                mpi_state.global_comm, source_rank
            )
            data_id = info_package["id"]
            if data_id is None:
                raise ValueError("Requested DataID is None")
            reservation_info = shm_manager.get(data_id)
            if reservation_info is None:
                raise RuntimeError(f"The monitor does not know the data id {data_id}")
            # Send the raw bytes of the reserved shared-buffer range.
            sh_buf = shared_store.get_shared_buffer(
                reservation_info["first_index"], reservation_info["last_index"]
            )
            communication.mpi_send_buffer(
                mpi_state.global_comm,
                sh_buf,
                dest_rank=source_rank,
                data_type=MPI.BYTE,
            )
        elif operation_type == common.Operation.CLEANUP:
            cleanup_list = communication.recv_serialized_data(
                mpi_state.global_comm, source_rank
            )
            # The IDs arrive as plain tuples; rebuild MpiDataID objects.
            cleanup_list = [common.MpiDataID(*tpl) for tpl in cleanup_list]
            shm_manager.clear(cleanup_list)
        elif operation_type == common.Operation.READY_TO_SHUTDOWN:
            workers_ready_to_shutdown.append(source_rank)
            # Start the shutdown sequence once every worker has reported in.
            shutdown_workers = len(workers_ready_to_shutdown) == len(mpi_state.workers)
        elif operation_type == common.Operation.SHUTDOWN:
            # Direct shutdown request (e.g. from another monitor process).
            SharedObjectStore.get_instance().finalize()
            if not MPI.Is_finalized():
                MPI.Finalize()
            break  # leave event loop and shutdown monitoring
        else:
            raise ValueError(f"Unsupported operation: {operation_type}")
        if shutdown_workers:
            # All workers reported READY_TO_SHUTDOWN: broadcast SHUTDOWN to
            # every worker and monitor rank except ourselves, notify the
            # root, release shared memory and leave the loop.
            for rank_id in mpi_state.workers + mpi_state.monitor_processes:
                if rank_id != mpi_state.global_rank:
                    communication.mpi_send_operation(
                        mpi_state.global_comm,
                        common.Operation.SHUTDOWN,
                        rank_id,
                    )
            communication.mpi_send_object(
                mpi_state.global_comm,
                common.Operation.SHUTDOWN,
                communication.MPIRank.ROOT,
            )
            SharedObjectStore.get_instance().finalize()
            if not MPI.Is_finalized():
                MPI.Finalize()
            break  # leave event loop and shutdown monitoring
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """`SharedMemoryManager` functionality.""" | ||
| from array import array | ||
| try: | ||
| import mpi4py | ||
| except ImportError: | ||
| raise ImportError( | ||
| "Missing dependency 'mpi4py'. Use pip or conda to install it." | ||
| ) from None | ||
| from unidist.core.backends.mpi.core import communication, common | ||
| from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore | ||
| from unidist.core.backends.mpi.utils import ImmutableDict | ||
| # TODO: Find a way to move this after all imports | ||
| mpi4py.rc(recv_mprobe=False, initialize=False) | ||
| from mpi4py import MPI # noqa: E402 | ||
class FreeMemoryRange:
    """
    Class that helps keep track of free space in shared memory.

    The free space is kept in ``self.range`` as a sorted list of disjoint,
    non-adjacent half-open intervals ``[first_index, last_index)``.

    Parameters
    ----------
    range_len : int
        Memory length.
    """

    def __init__(self, range_len):
        # Sorted list of free [start, end) intervals; initially all is free.
        self.range = [[0, range_len]]

    def occupy(self, count=1):
        """
        Take the place of a certain length in memory.

        Parameters
        ----------
        count : int, default: 1
            Required number of elements in memory.

        Returns
        -------
        int or None
            First index in memory.
        int or None
            Last index in memory (not inclusive).

        Notes
        -----
        Uses a first-fit strategy. ``(None, None)`` is returned when no
        free interval is large enough.
        """
        first_index = None
        last_index = None
        for i in range(len(self.range)):
            if count <= self.range[i][1] - self.range[i][0]:
                # Take `count` elements from the front of this interval.
                first_index = self.range[i][0]
                last_index = first_index + count
                if self.range[i][1] == last_index:
                    # Interval fully consumed - drop it.
                    self.range = self.range[:i] + self.range[i + 1 :]
                else:
                    # Shrink the interval from the left.
                    self.range[i][0] = last_index
                break
        return first_index, last_index

    def release(self, first_index, last_index):
        """
        Free up memory space.

        Parameters
        ----------
        first_index : int
            First index in memory.
        last_index : int
            Last index in memory (not inclusive).

        Notes
        -----
        The released interval is merged with adjacent free intervals so that
        ``self.range`` stays sorted, disjoint and non-adjacent.
        """
        if len(self.range) == 0:
            self.range.append([first_index, last_index])
        elif self.range[-1][1] < first_index:
            # Released block lies strictly after every free interval.
            self.range.append([first_index, last_index])
        else:
            for i in range(len(self.range)):
                if self.range[i][0] == last_index:
                    # Released block touches interval `i` on its left side.
                    if self.range[i - 1][1] == first_index:
                        # It also touches the previous interval - merge both.
                        self.range[i - 1][1] = self.range[i][1]
                        self.range = self.range[:i] + self.range[i + 1 :]
                    else:
                        # Extend interval `i` to the left.
                        self.range[i][0] = first_index
                    break
                if self.range[i][1] == first_index:
                    # Released block touches interval `i` on its right side.
                    if len(self.range) > i + 1 and self.range[i + 1][0] == last_index:
                        # It also touches the next interval - merge both.
                        self.range[i + 1][0] = self.range[i][0]
                        self.range = self.range[:i] + self.range[i + 1 :]
                    else:
                        # Extend interval `i` to the right.
                        self.range[i][1] = last_index
                    break
                if self.range[i][0] > last_index:
                    # Released block is isolated - insert it in sorted order.
                    self.range = (
                        self.range[:i] + [[first_index, last_index]] + self.range[i:]
                    )
                    break
class SharedMemoryManager:
    """
    Class that helps manage shared memory.

    Keeps per-``DataID`` reservation info and free-range bookkeeping for both
    the data buffer and the service buffer of the shared object store.
    """

    def __init__(self):
        self.shared_store = None
        if common.is_shared_memory_supported():
            self.shared_store = SharedObjectStore.get_instance()
            # Reservation info per data ID {DataID: ImmutableDict}.
            self._reservation_info = {}
            # Free ranges of the shared data buffer (in bytes).
            self.free_memory = FreeMemoryRange(self.shared_store.shared_memory_size)
            # Free ranges of the service buffer (in items).
            self.free_service_indexes = FreeMemoryRange(
                self.shared_store.service_info_max_count
            )
            # Data IDs that were still referenced and could not be freed yet.
            self.pending_cleanup = []
            self.monitor_comm = None
            mpi_state = communication.MPIState.get_instance()
            # Communicator spanning the monitor processes only; used to agree
            # on reference counts across hosts before freeing memory.
            monitor_group = mpi_state.global_comm.Get_group().Incl(
                mpi_state.monitor_processes
            )
            self.monitor_comm = mpi_state.global_comm.Create_group(monitor_group)

    def get(self, data_id):
        """
        Get the reservation information for the `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        dict or None
            Reservation information.

        Notes
        -----
        The `dict` is returned if a reservation has been specified, otherwise `None` is returned.

        Raises
        ------
        RuntimeError
            If the shared object storage is not enabled.
        """
        if self.shared_store is None:
            raise RuntimeError(
                "`SharedMemoryManager` cannot be used if the shared object storage is not enabled."
            )
        # Single lookup with a None default instead of a contains-check.
        return self._reservation_info.get(data_id)

    def put(self, data_id, memory_len):
        """
        Reserve memory for the `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        memory_len : int
            Required memory length.

        Returns
        -------
        dict
            Reservation information.

        Raises
        ------
        RuntimeError
            If the shared object storage is not enabled.
        MemoryError
            If either the data buffer or the service buffer is full.
        """
        if self.shared_store is None:
            raise RuntimeError(
                "`SharedMemoryManager` cannot be used if the shared object storage is not enabled."
            )
        first_index, last_index = self.free_memory.occupy(memory_len)
        # Check the data reservation before occupying a service slot so a
        # failed reservation does not leak a service index.
        if first_index is None:
            raise MemoryError("Overflow memory")
        service_index, _ = self.free_service_indexes.occupy(SharedObjectStore.INFO_SIZE)
        if service_index is None:
            # Roll back the data reservation so it is not leaked.
            self.free_memory.release(first_index, last_index)
            raise MemoryError("Overflow service memory")
        reservation_info = ImmutableDict(
            {
                "first_index": first_index,
                "last_index": last_index,
                "service_index": service_index,
            }
        )
        self._reservation_info[data_id] = reservation_info
        return reservation_info

    def clear(self, data_id_list):
        """
        Clear shared memory for the list of `DataID` if possible.

        Parameters
        ----------
        data_id_list : list
            List of `DataID`.

        Notes
        -----
        Data that is still referenced by some process is postponed and
        retried on the next call.
        """
        if self.shared_store is None:
            return
        # Retry previously postponed IDs together with the new ones.
        cleanup_list = self.pending_cleanup + data_id_list
        self.pending_cleanup = []
        # 1 if this monitor sees at least one reference to the data, 0 otherwise.
        has_refs = array(
            "B",
            [
                1
                if data_id in self._reservation_info
                and self.shared_store.get_ref_number(
                    data_id, self._reservation_info[data_id]["service_index"]
                )
                > 0
                else 0
                for data_id in cleanup_list
            ],
        )
        if self.monitor_comm is not None:
            # Combine the flags across all monitor processes: the data may
            # only be freed if no monitor sees a reference to it.
            all_refs = array("B", [0] * len(has_refs))
            self.monitor_comm.Allreduce(has_refs, all_refs, MPI.MAX)
        else:
            all_refs = has_refs
        for data_id, is_referenced in zip(cleanup_list, all_refs):
            if is_referenced == 0:
                if data_id in self._reservation_info:
                    reservation_info = self._reservation_info[data_id]
                    self.shared_store.delete_service_info(
                        data_id, reservation_info["service_index"]
                    )
                    self.free_service_indexes.release(
                        reservation_info["service_index"],
                        reservation_info["service_index"] + SharedObjectStore.INFO_SIZE,
                    )
                    self.free_memory.release(
                        reservation_info["first_index"], reservation_info["last_index"]
                    )
                    del self._reservation_info[data_id]
            else:
                self.pending_cleanup.append(data_id)
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """`SharedObjectStore` functionality.""" | ||
| import os | ||
| import sys | ||
| import time | ||
| import warnings | ||
| import psutil | ||
| import weakref | ||
| from unidist.core.backends.mpi.core._memory import parallel_memcopy, fill | ||
| from unidist.core.backends.mpi.utils import ImmutableDict | ||
| try: | ||
| import mpi4py | ||
| except ImportError: | ||
| raise ImportError( | ||
| "Missing dependency 'mpi4py'. Use pip or conda to install it." | ||
| ) from None | ||
| from unidist.config.backends.mpi.envvars import ( | ||
| MpiSharedObjectStoreMemory, | ||
| MpiSharedServiceMemory, | ||
| MpiSharedObjectStoreThreshold, | ||
| MpiBackoff, | ||
| ) | ||
| from unidist.core.backends.mpi.core import common, communication | ||
| from unidist.core.backends.mpi.core.serialization import ( | ||
| deserialize_complex_data, | ||
| ) | ||
| # TODO: Find a way to move this after all imports | ||
| mpi4py.rc(recv_mprobe=False, initialize=False) | ||
| from mpi4py import MPI # noqa: E402 | ||
class WinLock:
    """
    Class that helps to synchronize the write to shared memory.

    Parameters
    ----------
    win : MPI.win
        The MPI window that was used to allocate the shared memory.

    Notes
    -----
    This class should be used as a context manager.
    """

    def __init__(self, win):
        # The window being guarded; locked/unlocked on the monitor rank.
        self.win = win

    def __enter__(self):
        """Lock the current MPI.Window for other processes."""
        self.win.Lock(communication.MPIRank.MONITOR)

    def __exit__(self, exc_type, exc_value, traceback):
        """Unlock the current MPI.Window for other processes."""
        self.win.Unlock(communication.MPIRank.MONITOR)
class SharedObjectStore:
    """
    Class that provides access to data in shared memory.

    Notes
    -----
    This class initializes and manages shared memory.
    """

    __instance = None

    # Service constants defining the structure of the service buffer.
    # Each object stored in shared memory owns one INFO_SIZE-item slot laid
    # out as [worker id, data number, first data index, reference count].
    # The amount of service information for one data object in shared memory.
    INFO_SIZE = 4
    # Index of service information for the first part of the DataID.
    WORKER_ID_INDEX = 0
    # Index of service information for the second part of the DataID.
    DATA_NUMBER_INDEX = 1
    # Index of service information for the first shared memory index where the data is located.
    FIRST_DATA_INDEX = 2
    # Index of service information to count the number of data references,
    # which shows how many processes are using this data.
    REFERENCES_NUMBER = 3
    def __init__(self):
        """Initialize the shared object store state and, if supported, the shared memory itself."""
        # The `MPI.Win` object to manage shared memory for data
        self.win = None
        # The `MPI.Win` object to manage shared memory for service purposes
        self.service_win = None
        # `MPI.memory` object for reading/writing data from/to shared memory
        self.shared_buffer = None
        # `memoryview` object for reading/writing data from/to service shared memory.
        # Service shared buffer includes service information about written shared data.
        # The service info is set by the worker who sends the data to shared memory
        # and is removed by the monitor if the data is cleared.
        # The service info indicates that the current data is written to shared memory
        # and shows the actual location and number of references.
        self.service_shared_buffer = None
        # Length of shared memory buffer in bytes
        self.shared_memory_size = None
        # Length of service shared memory buffer in items
        self.service_info_max_count = None
        mpi_state = communication.MPIState.get_instance()
        # Initialize all properties above; the "shared memory unsupported"
        # warning is only raised once, on the root process.
        if common.is_shared_memory_supported(raise_warning=mpi_state.is_root_process()):
            self._allocate_shared_memory()
        # Logger will be initialized after `communicator.MPIState`
        self.logger = None
        # Shared memory range {DataID: dict}
        # Shared information is the necessary information to properly deserialize data from shared memory.
        self._shared_info = weakref.WeakKeyDictionary()
        # The list of `weakref.finalize` which should be canceled before closing the shared memory.
        self.finalizers = []
| def _get_allowed_memory_size(self): | ||
| """ | ||
| Get allowed memory size for allocate shared memory. | ||
| Returns | ||
| ------- | ||
| int | ||
| The number of bytes available to allocate shared memory. | ||
| """ | ||
| virtual_memory = psutil.virtual_memory().total | ||
| if sys.platform.startswith("linux"): | ||
| shm_fd = os.open("/dev/shm", os.O_RDONLY) | ||
| try: | ||
| shm_stats = os.fstatvfs(shm_fd) | ||
| system_memory = shm_stats.f_bsize * shm_stats.f_bavail | ||
| if system_memory / (virtual_memory / 2) < 0.99: | ||
| warnings.warn( | ||
| f"The size of /dev/shm is too small ({system_memory} bytes). The required size " | ||
| + f"at least half of RAM ({virtual_memory // 2} bytes). Please, delete files in /dev/shm or " | ||
| + "increase size of /dev/shm with --shm-size in Docker." | ||
| ) | ||
| finally: | ||
| os.close(shm_fd) | ||
| else: | ||
| system_memory = virtual_memory | ||
| return system_memory | ||
    def _allocate_shared_memory(self):
        """
        Allocate shared memory.

        Sizes for the data buffer and the service buffer are derived from the
        `MpiSharedObjectStoreMemory` / `MpiSharedServiceMemory` config values;
        whichever is unset is computed from the available memory and the
        `MpiSharedObjectStoreThreshold` value. Both MPI windows are allocated
        collectively, but only the monitor process contributes memory.

        Raises
        ------
        ValueError
            If the configured sizes exceed the available memory.
        """
        mpi_state = communication.MPIState.get_instance()
        shared_object_store_memory = MpiSharedObjectStoreMemory.get()
        shared_service_memory = MpiSharedServiceMemory.get()
        # Use only 95% of available shared memory because
        # the rest is needed for intermediate shared buffers
        # handled by MPI itself for communication of small messages.
        allowed_memory_size = int(self._get_allowed_memory_size() * 0.95)
        if shared_object_store_memory is not None:
            if shared_service_memory is not None:
                # Both sizes configured explicitly.
                self.shared_memory_size = shared_object_store_memory
                self.service_memory_size = shared_service_memory
            else:
                self.shared_memory_size = shared_object_store_memory
                # To avoid division by 0
                if MpiSharedObjectStoreThreshold.get() > 0:
                    self.service_memory_size = min(
                        # allowed memory size for service buffer
                        allowed_memory_size - self.shared_memory_size,
                        # maximum amount of memory required for the service buffer:
                        # one INFO_SIZE slot per smallest possible shared object
                        (self.shared_memory_size // MpiSharedObjectStoreThreshold.get())
                        * (self.INFO_SIZE * MPI.LONG.size),
                    )
                else:
                    self.service_memory_size = (
                        allowed_memory_size - self.shared_memory_size
                    )
        else:
            if shared_service_memory is not None:
                self.service_memory_size = shared_service_memory
                self.shared_memory_size = allowed_memory_size - self.service_memory_size
            else:
                A = allowed_memory_size
                B = MpiSharedObjectStoreThreshold.get()
                C = self.INFO_SIZE * MPI.LONG.size
                # "x" is shared_memory_size
                # "y" is service_memory_size
                # requirements:
                # x + y = A
                # y = min[ (x/B) * C, 0.01 * A ]
                # calculation results:
                # if B > 99 * C:
                # x = (A * B) / (B + C)
                # y = (A * C) / (B + C)
                # else:
                # x = 0.99 * A
                # y = 0.01 * A
                if B > 99 * C:
                    self.shared_memory_size = (A * B) // (B + C)
                    self.service_memory_size = (A * C) // (B + C)
                else:
                    self.shared_memory_size = int(0.99 * A)
                    self.service_memory_size = int(0.01 * A)
        # Validate the resulting sizes against the available memory.
        if self.shared_memory_size > allowed_memory_size:
            raise ValueError(
                "Memory for shared object storage cannot be allocated "
                + "because the value set to `MpiSharedObjectStoreMemory` exceeds the available memory."
            )
        if self.service_memory_size > allowed_memory_size:
            raise ValueError(
                "Memory for shared service storage cannot be allocated "
                + "because the value set to `MpiSharedServiceMemory` exceeds the available memory."
            )
        if self.shared_memory_size + self.service_memory_size > allowed_memory_size:
            raise ValueError(
                "The sum of the `MpiSharedObjectStoreMemory` and `MpiSharedServiceMemory` values is greater "
                + "than the available amount of memory."
            )
        # Shared memory is allocated only once by the monitor process.
        info = MPI.Info.Create()
        info.Set("alloc_shared_noncontig", "true")
        self.win = MPI.Win.Allocate_shared(
            self.shared_memory_size * MPI.BYTE.size
            if mpi_state.is_monitor_process()
            else 0,
            MPI.BYTE.size,
            comm=mpi_state.host_comm,
            info=info,
        )
        # All processes on the host query the monitor's segment to get a view of it.
        self.shared_buffer, _ = self.win.Shared_query(communication.MPIRank.MONITOR)
        # Round the item count down to a whole number of INFO_SIZE slots.
        self.service_info_max_count = (
            self.service_memory_size
            // (self.INFO_SIZE * MPI.LONG.size)
            * self.INFO_SIZE
        )
        self.service_win = MPI.Win.Allocate_shared(
            self.service_info_max_count * MPI.LONG.size
            if mpi_state.is_monitor_process()
            else 0,
            MPI.LONG.size,
            comm=mpi_state.host_comm,
            info=info,
        )
        service_buffer, _ = self.service_win.Shared_query(communication.MPIRank.MONITOR)
        # Cast the raw bytes to a view of C longs for slot-wise access.
        self.service_shared_buffer = memoryview(service_buffer).cast("l")
        # Set -1 to the service buffer because 0 is a valid value and may be recognized by mistake.
        if mpi_state.is_monitor_process():
            fill(self.service_shared_buffer, -1)
| def _parse_data_id(self, data_id): | ||
| """ | ||
| Parse `DataID` object to pair of int. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| The data identifier to be converted to a numerical form | ||
| Returns | ||
| ------- | ||
| tuple | ||
| Pair of int that define the DataID | ||
| """ | ||
| splited_id = str(data_id).replace(")", "").split("_") | ||
| return int(splited_id[1]), int(splited_id[3]) | ||
    def _increment_ref_number(self, data_id, service_index):
        """
        Increment the number of references to indicate to the monitor that this data is being used.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        service_index : int
            The service buffer index.

        Raises
        ------
        KeyError
            If `service_index` is ``None``, i.e. the data is not part of the shared data.

        Notes
        -----
        This function creates a `weakref.finalize` with the decrement function,
        which will be called after `data_id` is garbage-collected.
        """
        # MPI window operations are illegal after MPI has been finalized.
        if MPI.Is_finalized():
            return
        if service_index is None:
            raise KeyError(
                "it is not possible to increment the reference number for this data_id because it is not part of the shared data"
            )
        # Guard the read-modify-write of the counter with the service window lock.
        with WinLock(self.service_win):
            prev_ref_number = self.service_shared_buffer[
                service_index + self.REFERENCES_NUMBER
            ]
            self.service_shared_buffer[service_index + self.REFERENCES_NUMBER] = (
                prev_ref_number + 1
            )
            self.logger.debug(
                f"Rank {communication.MPIState.get_instance().global_rank}: Increment references number for {data_id} from {prev_ref_number} to {prev_ref_number + 1}"
            )
        # Register the matching decrement to run when `data_id` is collected.
        # `str(data_id)` is passed so the finalizer args do not keep the ID alive.
        self.finalizers.append(
            weakref.finalize(
                data_id, self._decrement_ref_number, str(data_id), service_index
            )
        )
    def _decrement_ref_number(self, data_id, service_index):
        """
        Decrement the number of references to indicate to the monitor that this data is no longer used.

        When the references count is 0, the data can be cleared.

        Parameters
        ----------
        data_id : str
            String form of an ID to data (the finalizers registered in
            `_increment_ref_number`/`_put_service_info` pass ``str(data_id)``).
        service_index : int
            The service buffer index.

        Notes
        -----
        This function is called by `weakref.finalize` after `data_id` is collected.
        """
        # `service_index` must be passed explicitly because the `_shared_info`
        # entry for this ID is already gone by the time the finalizer runs.
        if MPI.Is_finalized():
            return
        # Only decrement if the slot still describes this data_id;
        # the monitor may have already cleared or reused the slot.
        if self._check_service_info(data_id, service_index):
            with WinLock(self.service_win):
                prev_ref_number = self.service_shared_buffer[
                    service_index + self.REFERENCES_NUMBER
                ]
                self.service_shared_buffer[service_index + self.REFERENCES_NUMBER] = (
                    prev_ref_number - 1
                )
                self.logger.debug(
                    f"Rank {communication.MPIState.get_instance().global_rank}: Decrement references number for {data_id} from {prev_ref_number} to {prev_ref_number - 1}"
                )
    def _put_service_info(self, service_index, data_id, first_index):
        """
        Set service information about written shared data.

        Parameters
        ----------
        service_index : int
            The service buffer index.
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        first_index : int
            The first index of data in the shared buffer.

        Notes
        -----
        This information must be set after writing data to shared memory.
        """
        worker_id, data_number = self._parse_data_id(data_id)
        with WinLock(self.service_win):
            self.service_shared_buffer[
                service_index + self.FIRST_DATA_INDEX
            ] = first_index
            # The writer itself holds the initial reference.
            self.service_shared_buffer[service_index + self.REFERENCES_NUMBER] = 1
            self.service_shared_buffer[
                service_index + self.DATA_NUMBER_INDEX
            ] = data_number
            # NOTE(review): the worker id is written last, presumably so that a
            # concurrent `_check_service_info` cannot match a half-written slot
            # — confirm.
            self.service_shared_buffer[service_index + self.WORKER_ID_INDEX] = worker_id
        # Balance the initial reference when `data_id` is garbage-collected;
        # `str(data_id)` avoids the finalizer keeping the ID alive.
        self.finalizers.append(
            weakref.finalize(
                data_id, self._decrement_ref_number, str(data_id), service_index
            )
        )
| def _check_service_info(self, data_id, service_index): | ||
| """ | ||
| Check if the `data_id` is in the shared memory on the current host. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to data. | ||
| service_index : int | ||
| The service buffer index. | ||
| Returns | ||
| ------- | ||
| bool | ||
| Return the ``True`` status if `data_id` is in the shared memory. | ||
| Notes | ||
| ----- | ||
| This check ensures that the data is physically located in shared memory. | ||
| """ | ||
| worker_id, data_number = self._parse_data_id(data_id) | ||
| w_id = self.service_shared_buffer[service_index + self.WORKER_ID_INDEX] | ||
| d_id = self.service_shared_buffer[service_index + self.DATA_NUMBER_INDEX] | ||
| return w_id == worker_id and d_id == data_number | ||
| def _put_shared_info(self, data_id, shared_info): | ||
| """ | ||
| Put required information to deserialize `data_id`. | ||
| Parameters | ||
| ---------- | ||
| data_id : uunidist.core.backends.mpi.core.common.MpiDataID | ||
| shared_info : unidist.core.backends.mpi.utils.ImmutableDict | ||
| Information required for data deserialization | ||
| """ | ||
| if not isinstance(shared_info, ImmutableDict): | ||
| raise ValueError("Shared info should be immutable.") | ||
| if data_id not in self._shared_info: | ||
| self._shared_info[data_id] = shared_info | ||
    def _read_from_shared_buffer(self, data_id, shared_info):
        """
        Read and deserialize data from the shared buffer.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        shared_info : dict
            Information for correct deserialization.

        Returns
        -------
        object
            Data for the current ID.
        """
        buffer_lens = shared_info["raw_buffers_len"]
        buffer_count = shared_info["buffer_count"]
        s_data_len = shared_info["s_data_len"]
        service_index = shared_info["service_index"]
        # The current data location is read from the service slot, which is
        # the source of truth for where the data lives in the shared buffer.
        first_index = self.service_shared_buffer[service_index + self.FIRST_DATA_INDEX]
        s_data_last_index = first_index + s_data_len
        # `toreadonly()` gives zero-copy, immutable views into the shared buffer.
        s_data = self.shared_buffer[first_index:s_data_last_index].toreadonly()
        prev_last_index = s_data_last_index
        # Raw buffers follow the main payload back to back.
        raw_buffers = []
        for raw_buffer_len in buffer_lens:
            raw_last_index = prev_last_index + raw_buffer_len
            raw_buffers.append(
                self.shared_buffer[prev_last_index:raw_last_index].toreadonly()
            )
            prev_last_index = raw_last_index
        data = deserialize_complex_data(s_data, raw_buffers, buffer_count)
        self.logger.debug(
            f"Rank {communication.MPIState.get_instance().global_rank}: Get {data_id} from {first_index} to {prev_last_index}. Service index: {service_index}"
        )
        return data
    def _write_to_shared_buffer(self, data_id, reservation_data, serialized_data):
        """
        Write serialized data to the shared buffer.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        reservation_data : dict
            Information about the reserved space in shared memory for the current DataID.
        serialized_data : dict
            Serialized data.

        Returns
        -------
        dict
            Information required for correct data deserialization.

        Raises
        ------
        ValueError
            If the serialized data does not fit into the reserved range.
        """
        first_index = reservation_data["first_index"]
        last_index = reservation_data["last_index"]
        service_index = reservation_data["service_index"]
        s_data = serialized_data["s_data"]
        raw_buffers = serialized_data["raw_buffers"]
        buffer_count = serialized_data["buffer_count"]
        s_data_len = len(s_data)
        buffer_lens = []
        # The main payload goes first into the reserved range.
        s_data_first_index = first_index
        s_data_last_index = s_data_first_index + s_data_len
        if s_data_last_index > last_index:
            raise ValueError("Not enough shared space for data")
        self.shared_buffer[s_data_first_index:s_data_last_index] = s_data
        last_prev_index = s_data_last_index
        # Raw buffers are laid out back to back right after the main payload.
        for i, raw_buffer in enumerate(raw_buffers):
            raw_buffer_first_index = last_prev_index
            raw_buffer_len = len(raw_buffer)
            last_prev_index = raw_buffer_first_index + len(raw_buffer)
            if last_prev_index > last_index:
                raise ValueError(f"Not enough shared space for {i} raw_buffer")
            # NOTE(review): 6 looks like the number of threads used for the copy
            # — confirm and consider promoting to a named constant.
            parallel_memcopy(
                raw_buffer,
                self.shared_buffer[raw_buffer_first_index:last_prev_index],
                6,
            )
            buffer_lens.append(raw_buffer_len)
        self.logger.debug(
            f"Rank {communication.MPIState.get_instance().global_rank}: PUT {data_id} from {first_index} to {last_prev_index}. Service index: {service_index}"
        )
        return common.MetadataPackage.get_shared_info(
            data_id, s_data_len, buffer_lens, buffer_count, service_index
        )
    def _sync_shared_memory_from_another_host(
        self, comm, data_id, owner_rank, first_index, last_index, service_index
    ):
        """
        Receive shared data from another host, including the owner's rank.

        Parameters
        ----------
        comm : object
            MPI communicator object.
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            Data identifier.
        owner_rank : int
            Rank of the owner process.
        first_index : int
            First index in shared memory.
        last_index : int
            Last index in shared memory (excluding).
        service_index : int
            Service buffer index.

        Notes
        -----
        After writing the data, service information should be set by the caller.
        """
        sh_buf = self.get_shared_buffer(first_index, last_index)
        # The request goes to the monitor of the owner's host,
        # which serves the actual bytes of the data.
        owner_monitor = (
            communication.MPIState.get_instance().get_monitor_by_worker_rank(owner_rank)
        )
        communication.send_simple_operation(
            comm,
            operation_type=common.Operation.REQUEST_SHARED_DATA,
            operation_data={"id": data_id},
            dest_rank=owner_monitor,
        )
        # Receive the serialized bytes directly into the local shared buffer slice.
        communication.mpi_recv_buffer(comm, owner_monitor, sh_buf)
        self.logger.debug(
            f"Rank {communication.MPIState.get_instance().global_rank}: Sync_copy {data_id} from {owner_rank} rank. Put data from {first_index} to {last_index}. Service index: {service_index}"
        )
    @classmethod
    def get_instance(cls):
        """
        Get instance of ``SharedObjectStore``.

        Returns
        -------
        SharedObjectStore
        """
        if cls.__instance is None:
            cls.__instance = SharedObjectStore()
        # The logger is initialized lazily because it needs the host name from
        # `communication.MPIState`, which may not be ready at construction time.
        if cls.__instance.logger is None:
            logger_name = f"shared_store_{communication.MPIState.get_instance().host}"
            cls.__instance.logger = common.get_logger(logger_name, f"{logger_name}.log")
        return cls.__instance
| def is_allocated(self): | ||
| """ | ||
| Check if the shared memory is allocated and ready to put data.x | ||
| Returns | ||
| ------- | ||
| bool | ||
| True ot False. | ||
| """ | ||
| return self.shared_buffer is not None | ||
| def should_be_shared(self, data): | ||
| """ | ||
| Check if data should be sent using shared memory. | ||
| Parameters | ||
| ---------- | ||
| data : dict | ||
| Serialized data to check its size. | ||
| Returns | ||
| ------- | ||
| bool | ||
| Return the ``True`` status if data should be sent using shared memory. | ||
| """ | ||
| data_size = len(data["s_data"]) + sum([len(buf) for buf in data["raw_buffers"]]) | ||
| return data_size > MpiSharedObjectStoreThreshold.get() | ||
| def contains(self, data_id): | ||
| """ | ||
| Check if the store contains the `data_id` information required to deserialize the data. | ||
| Returns | ||
| ------- | ||
| bool | ||
| Return the ``True`` status if shared store contains required information. | ||
| Notes | ||
| ----- | ||
| This check does not ensure that the data is physically located in shared memory. | ||
| """ | ||
| return data_id in self._shared_info | ||
| def get_shared_info(self, data_id): | ||
| """ | ||
| Get required information to correct deserialize `data_id` from shared memory. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| Returns | ||
| ------- | ||
| dict | ||
| Information required for data deserialization | ||
| """ | ||
| return self._shared_info[data_id] | ||
| def get_ref_number(self, data_id, service_index): | ||
| """ | ||
| Get current references count of data_id by service index. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| service_index : int | ||
| The service buffer index. | ||
| Returns | ||
| ------- | ||
| int | ||
| The number of references to this data_id | ||
| """ | ||
| # we must to set service_index in args because this function is called from monitor which can not known the shared_info | ||
| if not self._check_service_info(data_id, service_index): | ||
| return 0 | ||
| return self.service_shared_buffer[service_index + self.REFERENCES_NUMBER] | ||
| def get_shared_buffer(self, first_index, last_index): | ||
| """ | ||
| Get the requested range of shared memory | ||
| Parameters | ||
| ---------- | ||
| first_index : int | ||
| Start of the requested range. | ||
| last_index : int | ||
| End of the requested range. (excluding) | ||
| Notes | ||
| ----- | ||
| This function is used to synchronize shared memory between different hosts. | ||
| """ | ||
| return self.shared_buffer[first_index:last_index] | ||
    def delete_service_info(self, data_id, service_index):
        """
        Delete service information for the current data ID.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        service_index : int
            The service buffer index.

        Raises
        ------
        RuntimeError
            If the service slot describes a different data ID.

        Notes
        -----
        This function should be called by the monitor during the cleanup of shared data.
        """
        with WinLock(self.service_win):
            # Read actual value
            old_worker_id = self.service_shared_buffer[
                service_index + self.WORKER_ID_INDEX
            ]
            old_data_id = self.service_shared_buffer[
                service_index + self.DATA_NUMBER_INDEX
            ]
            old_first_index = self.service_shared_buffer[
                service_index + self.FIRST_DATA_INDEX
            ]
            old_references_number = self.service_shared_buffer[
                service_index + self.REFERENCES_NUMBER
            ]
            # check if data_id is correct
            if self._parse_data_id(data_id) == (old_worker_id, old_data_id):
                # -1 marks the slot as free (0 is a valid value and could be
                # recognized by mistake, see `_allocate_shared_memory`).
                self.service_shared_buffer[service_index + self.WORKER_ID_INDEX] = -1
                self.service_shared_buffer[service_index + self.DATA_NUMBER_INDEX] = -1
                self.service_shared_buffer[service_index + self.FIRST_DATA_INDEX] = -1
                self.service_shared_buffer[service_index + self.REFERENCES_NUMBER] = -1
                self.logger.debug(
                    f"Rank {communication.MPIState.get_instance().global_rank}: Clear {data_id}. Service index: {service_index} First index: {old_first_index} References number: {old_references_number}"
                )
            else:
                self.logger.debug(
                    f"Rank {communication.MPIState.get_instance().global_rank}: Did not clear {data_id}, because there are was written another data_id: Data_ID(rank_{old_worker_id}_id_{old_data_id})"
                )
                self.logger.debug(
                    f"Service index: {service_index} First index: {old_first_index} References number: {old_references_number}"
                )
                raise RuntimeError("Unexpected data_id for cleanup shared memory")
    def put(self, data_id, serialized_data):
        """
        Put data into shared memory.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        serialized_data : dict
            Serialized data to put into the storage.
        """
        mpi_state = communication.MPIState.get_instance()
        # Total payload size: main buffer plus all raw buffers.
        data_size = len(serialized_data["s_data"]) + sum(
            [len(buf) for buf in serialized_data["raw_buffers"]]
        )
        # reserve shared memory: request a range and a service slot for this data
        reservation_data = communication.send_reserve_operation(
            mpi_state.global_comm, data_id, data_size
        )
        service_index = reservation_data["service_index"]
        first_index = reservation_data["first_index"]
        # write into shared buffer
        shared_info = self._write_to_shared_buffer(
            data_id, reservation_data, serialized_data
        )
        # put service info (must happen only after the data is fully written)
        self._put_service_info(service_index, data_id, first_index)
        # put shared info for local deserialization later
        self._put_shared_info(data_id, shared_info)
    def get(self, data_id, owner_rank, shared_info):
        """
        Get data from another worker using shared memory.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        owner_rank : int
            The rank that sent the data.
        shared_info : dict
            The necessary information to properly deserialize data from shared memory.

        Returns
        -------
        object
            Deserialized data for `data_id`.
        """
        mpi_state = communication.MPIState.get_instance()
        s_data_len = shared_info["s_data_len"]
        raw_buffers_len = shared_info["raw_buffers_len"]
        service_index = shared_info["service_index"]
        buffer_count = shared_info["buffer_count"]
        # check whether the data is already in shared memory on this host
        if not self._check_service_info(data_id, service_index):
            # reserve shared memory
            # (equivalent to `s_data_len + sum(raw_buffers_len)`)
            shared_data_len = s_data_len + sum([buf for buf in raw_buffers_len])
            reservation_info = communication.send_reserve_operation(
                mpi_state.global_comm, data_id, shared_data_len
            )
            service_index = reservation_info["service_index"]
            # check if this worker should sync the shared buffer
            # or it is being done by another worker
            if reservation_info["is_first_request"]:
                # synchronize the shared buffer from the owner's host
                self._sync_shared_memory_from_another_host(
                    mpi_state.global_comm,
                    data_id,
                    owner_rank,
                    reservation_info["first_index"],
                    reservation_info["last_index"],
                    service_index,
                )
                # put service info once the data is fully received
                self._put_service_info(
                    service_index, data_id, reservation_info["first_index"]
                )
            else:
                # busy-wait while another worker synchronizes the shared buffer
                while not self._check_service_info(data_id, service_index):
                    time.sleep(MpiBackoff.get())
            # put shared info with updated data_id and service_index
            shared_info = common.MetadataPackage.get_shared_info(
                data_id, s_data_len, raw_buffers_len, buffer_count, service_index
            )
            self._put_shared_info(data_id, shared_info)
        # increment ref so the monitor does not clear the data while it is in use
        self._increment_ref_number(data_id, shared_info["service_index"])
        # read from shared buffer and deserialize
        return self._read_from_shared_buffer(data_id, shared_info)
| def finalize(self): | ||
| """ | ||
| Release used resources. | ||
| Notes | ||
| ----- | ||
| Shared store should be finalized before MPI.Finalize(). | ||
| """ | ||
| if self.win is not None: | ||
| self.win.Free() | ||
| self.win = None | ||
| if self.service_win is not None: | ||
| self.service_win.Free() | ||
| self.service_win = None | ||
| for f in self.finalizers: | ||
| f.detach() |
+30
-14
| Metadata-Version: 2.1 | ||
| Name: unidist | ||
| Version: 0.4.1 | ||
| Version: 0.5.0 | ||
| Summary: Unified Distributed Execution | ||
@@ -9,8 +9,22 @@ Home-page: https://github.com/modin-project/unidist | ||
| Description-Content-Type: text/markdown | ||
| License-File: LICENSE | ||
| License-File: AUTHORS | ||
| Requires-Dist: packaging | ||
| Requires-Dist: cloudpickle | ||
| Provides-Extra: ray | ||
| Requires-Dist: ray[default]>=1.13.0; extra == "ray" | ||
| Requires-Dist: pydantic<2; extra == "ray" | ||
| Provides-Extra: dask | ||
| Requires-Dist: dask[complete]>=2.22.0; extra == "dask" | ||
| Requires-Dist: distributed>=2.22.0; extra == "dask" | ||
| Provides-Extra: mpi | ||
| Requires-Dist: mpi4py>=3.0.3; extra == "mpi" | ||
| Requires-Dist: msgpack>=1.0.0; extra == "mpi" | ||
| Provides-Extra: all | ||
| License-File: LICENSE | ||
| License-File: AUTHORS | ||
| Requires-Dist: ray[default]>=1.13.0; extra == "all" | ||
| Requires-Dist: pydantic<2; extra == "all" | ||
| Requires-Dist: dask[complete]>=2.22.0; extra == "all" | ||
| Requires-Dist: distributed>=2.22.0; extra == "all" | ||
| Requires-Dist: mpi4py>=3.0.3; extra == "all" | ||
| Requires-Dist: msgpack>=1.0.0; extra == "all" | ||
@@ -23,5 +37,6 @@ <p align="center"> | ||
| <p align="center"> | ||
| <a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a> | ||
| <a href="https://unidist.readthedocs.io/en/0.4.1/?badge=0.4.1"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=0.4.1" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/0.4.1/"><img src="https://img.shields.io/badge/pypi-0.4.1-blue.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/actions/workflows/ci.yml/badge.svg" align="center"></a> | ||
| <a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a> | ||
| </p> | ||
@@ -65,6 +80,6 @@ | ||
| **Note:** There are different MPI implementations, each of which can be used as a backend in unidist. | ||
| By default, mapping `unidist[mpi]` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use | ||
| a specific version of MPI, you can install the core dependencies of unidist as `pip install unidist` and then | ||
| install the specific version of MPI using pip as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html) | ||
| section of mpi4py documentation. | ||
| Mapping `unidist[mpi]` installs `mpi4py` package, which is just a Python wrapper for MPI. | ||
| To enable unidist on MPI execution you need to have a working MPI implementation and certain software installed beforehand. | ||
| Refer to [Installation](https://mpi4py.readthedocs.io/en/latest/install.html) page of the `mpi4py` documentation for details. | ||
| Also, you can find some instructions on [MPI backend](https://unidist.readthedocs.io/en/latest/optimization_notes/mpi.html) page. | ||
@@ -93,6 +108,7 @@ #### Using conda | ||
| **Note:** There are different MPI implementations, each of which can be used as a backend in unidist. | ||
| By default, mapping `unidist-mpi` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use | ||
| a specific version of MPI, you can install the core dependencies of unidist as `conda install unidist` and then | ||
| install the specific version of MPI using conda as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html) | ||
| section of mpi4py documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the | ||
| By default, mapping `unidist-mpi` installs a default MPI implementation, which comes with `mpi4py` package and is ready to use. | ||
| The conda dependency solver decides on which MPI implementation is to be installed. If you want to use a specific version of MPI, | ||
| you can install the core dependencies for MPI backend and the specific version of MPI as `conda install unidist-mpi <mpi>` | ||
| as shown in the [Installation](https://mpi4py.readthedocs.io/en/latest/install.html) | ||
| page of `mpi4py` documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the | ||
| [Using External MPI Libraries](https://conda-forge.org/docs/user/tipsandtricks.html#using-external-message-passing-interface-mpi-libraries) | ||
@@ -99,0 +115,0 @@ section of the conda-forge documentation in order to get ultimate performance. |
+13
-11
@@ -7,5 +7,6 @@ <p align="center"> | ||
| <p align="center"> | ||
| <a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a> | ||
| <a href="https://unidist.readthedocs.io/en/0.4.1/?badge=0.4.1"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=0.4.1" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/0.4.1/"><img src="https://img.shields.io/badge/pypi-0.4.1-blue.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/actions/workflows/ci.yml/badge.svg" align="center"></a> | ||
| <a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a> | ||
| </p> | ||
@@ -49,6 +50,6 @@ | ||
| **Note:** There are different MPI implementations, each of which can be used as a backend in unidist. | ||
| By default, mapping `unidist[mpi]` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use | ||
| a specific version of MPI, you can install the core dependencies of unidist as `pip install unidist` and then | ||
| install the specific version of MPI using pip as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html) | ||
| section of mpi4py documentation. | ||
| Mapping `unidist[mpi]` installs `mpi4py` package, which is just a Python wrapper for MPI. | ||
| To enable unidist on MPI execution you need to have a working MPI implementation and certain software installed beforehand. | ||
| Refer to [Installation](https://mpi4py.readthedocs.io/en/latest/install.html) page of the `mpi4py` documentation for details. | ||
| Also, you can find some instructions on [MPI backend](https://unidist.readthedocs.io/en/latest/optimization_notes/mpi.html) page. | ||
@@ -77,6 +78,7 @@ #### Using conda | ||
| **Note:** There are different MPI implementations, each of which can be used as a backend in unidist. | ||
| By default, mapping `unidist-mpi` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use | ||
| a specific version of MPI, you can install the core dependencies of unidist as `conda install unidist` and then | ||
| install the specific version of MPI using conda as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html) | ||
| section of mpi4py documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the | ||
| By default, mapping `unidist-mpi` installs a default MPI implementation, which comes with `mpi4py` package and is ready to use. | ||
| The conda dependency solver decides on which MPI implementation is to be installed. If you want to use a specific version of MPI, | ||
| you can install the core dependencies for MPI backend and the specific version of MPI as `conda install unidist-mpi <mpi>` | ||
| as shown in the [Installation](https://mpi4py.readthedocs.io/en/latest/install.html) | ||
| page of `mpi4py` documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the | ||
| [Using External MPI Libraries](https://conda-forge.org/docs/user/tipsandtricks.html#using-external-message-passing-interface-mpi-libraries) | ||
@@ -83,0 +85,0 @@ section of the conda-forge documentation in order to get ultimate performance. |
+44
-2
| import pathlib | ||
| from setuptools import setup, find_packages | ||
| from setuptools import setup, find_packages, Extension | ||
| from setuptools.dist import Distribution | ||
| from Cython.Build import cythonize | ||
| import sys | ||
| import versioneer | ||
| try: | ||
| from wheel.bdist_wheel import bdist_wheel | ||
| HAS_WHEEL = True | ||
| except ImportError: | ||
| HAS_WHEEL = False | ||
| if HAS_WHEEL: | ||
| class UnidistWheel(bdist_wheel): | ||
| def finalize_options(self): | ||
| bdist_wheel.finalize_options(self) | ||
| self.root_is_pure = False | ||
| def get_tag(self): | ||
| _, _, plat = bdist_wheel.get_tag(self) | ||
| py = "py3" | ||
| abi = "none" | ||
| return py, abi, plat | ||
| class UnidistDistribution(Distribution): | ||
| def __init__(self, *attrs): | ||
| Distribution.__init__(self, *attrs) | ||
| if HAS_WHEEL: | ||
| self.cmdclass["bdist_wheel"] = UnidistWheel | ||
| def is_pure(self): | ||
| return False | ||
| # https://github.com/modin-project/unidist/issues/324 | ||
| ray_deps = ["ray[default]>=1.13.0", "pydantic<2"] | ||
| dask_deps = ["dask[complete]>=2.22.0", "distributed>=2.22.0"] | ||
| mpi_deps = ["mpi4py-mpich", "msgpack>=1.0.0"] | ||
| mpi_deps = ["mpi4py>=3.0.3", "msgpack>=1.0.0"] | ||
| if sys.version_info[1] < 8: | ||
@@ -19,2 +53,8 @@ mpi_deps += "pickle5" | ||
| _memory = Extension( | ||
| "unidist.core.backends.mpi.core._memory", | ||
| ["unidist/core/backends/mpi/core/memory/_memory.pyx"], | ||
| language="c++", | ||
| ) | ||
| setup( | ||
@@ -24,2 +64,3 @@ name="unidist", | ||
| cmdclass=versioneer.get_cmdclass(), | ||
| distclass=UnidistDistribution, | ||
| description="Unified Distributed Execution", | ||
@@ -40,2 +81,3 @@ long_description=long_description, | ||
| python_requires=">=3.7.1", | ||
| ext_modules=cythonize([_memory]), | ||
| ) |
| Metadata-Version: 2.1 | ||
| Name: unidist | ||
| Version: 0.4.1 | ||
| Version: 0.5.0 | ||
| Summary: Unified Distributed Execution | ||
@@ -9,8 +9,22 @@ Home-page: https://github.com/modin-project/unidist | ||
| Description-Content-Type: text/markdown | ||
| License-File: LICENSE | ||
| License-File: AUTHORS | ||
| Requires-Dist: packaging | ||
| Requires-Dist: cloudpickle | ||
| Provides-Extra: ray | ||
| Requires-Dist: ray[default]>=1.13.0; extra == "ray" | ||
| Requires-Dist: pydantic<2; extra == "ray" | ||
| Provides-Extra: dask | ||
| Requires-Dist: dask[complete]>=2.22.0; extra == "dask" | ||
| Requires-Dist: distributed>=2.22.0; extra == "dask" | ||
| Provides-Extra: mpi | ||
| Requires-Dist: mpi4py>=3.0.3; extra == "mpi" | ||
| Requires-Dist: msgpack>=1.0.0; extra == "mpi" | ||
| Provides-Extra: all | ||
| License-File: LICENSE | ||
| License-File: AUTHORS | ||
| Requires-Dist: ray[default]>=1.13.0; extra == "all" | ||
| Requires-Dist: pydantic<2; extra == "all" | ||
| Requires-Dist: dask[complete]>=2.22.0; extra == "all" | ||
| Requires-Dist: distributed>=2.22.0; extra == "all" | ||
| Requires-Dist: mpi4py>=3.0.3; extra == "all" | ||
| Requires-Dist: msgpack>=1.0.0; extra == "all" | ||
@@ -23,5 +37,6 @@ <p align="center"> | ||
| <p align="center"> | ||
| <a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a> | ||
| <a href="https://unidist.readthedocs.io/en/0.4.1/?badge=0.4.1"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=0.4.1" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/0.4.1/"><img src="https://img.shields.io/badge/pypi-0.4.1-blue.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/actions/workflows/ci.yml/badge.svg" align="center"></a> | ||
| <a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a> | ||
| <a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a> | ||
| <a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a> | ||
| </p> | ||
@@ -65,6 +80,6 @@ | ||
| **Note:** There are different MPI implementations, each of which can be used as a backend in unidist. | ||
| By default, mapping `unidist[mpi]` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use | ||
| a specific version of MPI, you can install the core dependencies of unidist as `pip install unidist` and then | ||
| install the specific version of MPI using pip as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html) | ||
| section of mpi4py documentation. | ||
| Mapping `unidist[mpi]` installs `mpi4py` package, which is just a Python wrapper for MPI. | ||
| To enable unidist on MPI execution you need to have a working MPI implementation and certain software installed beforehand. | ||
| Refer to [Installation](https://mpi4py.readthedocs.io/en/latest/install.html) page of the `mpi4py` documentation for details. | ||
| Also, you can find some instructions on [MPI backend](https://unidist.readthedocs.io/en/latest/optimization_notes/mpi.html) page. | ||
@@ -93,6 +108,7 @@ #### Using conda | ||
| **Note:** There are different MPI implementations, each of which can be used as a backend in unidist. | ||
| By default, mapping `unidist-mpi` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use | ||
| a specific version of MPI, you can install the core dependencies of unidist as `conda install unidist` and then | ||
| install the specific version of MPI using conda as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html) | ||
| section of mpi4py documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the | ||
| By default, mapping `unidist-mpi` installs a default MPI implementation, which comes with `mpi4py` package and is ready to use. | ||
| The conda dependency solver decides on which MPI implementation is to be installed. If you want to use a specific version of MPI, | ||
| you can install the core dependencies for MPI backend and the specific version of MPI as `conda install unidist-mpi <mpi>` | ||
| as shown in the [Installation](https://mpi4py.readthedocs.io/en/latest/install.html) | ||
| page of `mpi4py` documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the | ||
| [Using External MPI Libraries](https://conda-forge.org/docs/user/tipsandtricks.html#using-external-message-passing-interface-mpi-libraries) | ||
@@ -99,0 +115,0 @@ section of the conda-forge documentation in order to get ultimate performance. |
@@ -9,3 +9,3 @@ packaging | ||
| distributed>=2.22.0 | ||
| mpi4py-mpich | ||
| mpi4py>=3.0.3 | ||
| msgpack>=1.0.0 | ||
@@ -18,3 +18,3 @@ | ||
| [mpi] | ||
| mpi4py-mpich | ||
| mpi4py>=3.0.3 | ||
| msgpack>=1.0.0 | ||
@@ -21,0 +21,0 @@ |
@@ -46,4 +46,5 @@ AUTHORS | ||
| unidist/core/backends/mpi/core/communication.py | ||
| unidist/core/backends/mpi/core/monitor.py | ||
| unidist/core/backends/mpi/core/local_object_store.py | ||
| unidist/core/backends/mpi/core/serialization.py | ||
| unidist/core/backends/mpi/core/shared_object_store.py | ||
| unidist/core/backends/mpi/core/controller/__init__.py | ||
@@ -54,6 +55,10 @@ unidist/core/backends/mpi/core/controller/actor.py | ||
| unidist/core/backends/mpi/core/controller/garbage_collector.py | ||
| unidist/core/backends/mpi/core/controller/object_store.py | ||
| unidist/core/backends/mpi/core/memory/_memory.cpp | ||
| unidist/core/backends/mpi/core/memory/memory.cpp | ||
| unidist/core/backends/mpi/core/memory/memory.h | ||
| unidist/core/backends/mpi/core/monitor/__init__.py | ||
| unidist/core/backends/mpi/core/monitor/loop.py | ||
| unidist/core/backends/mpi/core/monitor/shared_memory_manager.py | ||
| unidist/core/backends/mpi/core/worker/__init__.py | ||
| unidist/core/backends/mpi/core/worker/loop.py | ||
| unidist/core/backends/mpi/core/worker/object_store.py | ||
| unidist/core/backends/mpi/core/worker/request_store.py | ||
@@ -60,0 +65,0 @@ unidist/core/backends/mpi/core/worker/task_store.py |
@@ -11,7 +11,7 @@ | ||
| { | ||
| "date": "2023-07-14T09:41:01+0200", | ||
| "date": "2023-11-14T20:43:02+0100", | ||
| "dirty": false, | ||
| "error": null, | ||
| "full-revisionid": "5406937f574fa2abcc78478c55eb9810739934cf", | ||
| "version": "0.4.1" | ||
| "full-revisionid": "1ec811a67984ad70d035c48965f879ae82029a48", | ||
| "version": "0.5.0" | ||
| } | ||
@@ -18,0 +18,0 @@ ''' # END VERSION_JSON |
@@ -17,3 +17,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| from .backends.mpi import ( | ||
| IsMpiSpawnWorkers, | ||
| MpiSpawn, | ||
| MpiHosts, | ||
@@ -23,2 +23,7 @@ MpiPickleThreshold, | ||
| MpiLog, | ||
| MpiSharedObjectStore, | ||
| MpiSharedObjectStoreMemory, | ||
| MpiSharedServiceMemory, | ||
| MpiSharedObjectStoreThreshold, | ||
| MpiRuntimeEnv, | ||
| ) | ||
@@ -38,3 +43,3 @@ from .parameter import ValueSource | ||
| "DaskSchedulerAddress", | ||
| "IsMpiSpawnWorkers", | ||
| "MpiSpawn", | ||
| "MpiHosts", | ||
@@ -45,2 +50,7 @@ "ValueSource", | ||
| "MpiLog", | ||
| "MpiSharedObjectStore", | ||
| "MpiSharedObjectStoreMemory", | ||
| "MpiSharedServiceMemory", | ||
| "MpiSharedObjectStoreThreshold", | ||
| "MpiRuntimeEnv", | ||
| ] |
@@ -7,6 +7,17 @@ # Copyright (C) 2021-2023 Modin authors | ||
| from .envvars import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold, MpiBackoff, MpiLog | ||
| from .envvars import ( | ||
| MpiSpawn, | ||
| MpiHosts, | ||
| MpiPickleThreshold, | ||
| MpiBackoff, | ||
| MpiLog, | ||
| MpiSharedObjectStore, | ||
| MpiSharedObjectStoreMemory, | ||
| MpiSharedServiceMemory, | ||
| MpiSharedObjectStoreThreshold, | ||
| MpiRuntimeEnv, | ||
| ) | ||
| __all__ = [ | ||
| "IsMpiSpawnWorkers", | ||
| "MpiSpawn", | ||
| "MpiHosts", | ||
@@ -16,2 +27,7 @@ "MpiPickleThreshold", | ||
| "MpiLog", | ||
| "MpiSharedObjectStore", | ||
| "MpiSharedObjectStoreMemory", | ||
| "MpiSharedServiceMemory", | ||
| "MpiSharedObjectStoreThreshold", | ||
| "MpiRuntimeEnv", | ||
| ] |
@@ -10,12 +10,18 @@ # Copyright (C) 2021-2023 Modin authors | ||
| class IsMpiSpawnWorkers(EnvironmentVariable, type=bool): | ||
| class MpiSpawn(EnvironmentVariable, type=bool): | ||
| """Whether to enable MPI spawn or not.""" | ||
| default = True | ||
| varname = "UNIDIST_IS_MPI_SPAWN_WORKERS" | ||
| varname = "UNIDIST_MPI_SPAWN" | ||
| class MpiHosts(EnvironmentVariable, type=ExactStr): | ||
| """MPI hosts to run unidist on.""" | ||
| """ | ||
| MPI hosts to run unidist on. | ||
| Notes | ||
| ----- | ||
| This variable is only used if a program is run in Controller/Worker model. | ||
| """ | ||
| varname = "UNIDIST_MPI_HOSTS" | ||
@@ -25,8 +31,37 @@ | ||
| class MpiPickleThreshold(EnvironmentVariable, type=int): | ||
| """Minimum buffer size for serialization with pickle 5 protocol.""" | ||
| """ | ||
| Minimum buffer size for serialization with pickle 5 protocol. | ||
| Notes | ||
| ----- | ||
| If the shared object store is enabled, ``MpiSharedObjectStoreThreshold`` takes | ||
| precedence on this configuration value and the threshold gets overridden. | ||
| It is done intentionally to prevent multiple copies when putting an object | ||
| into the local object store or into the shared object store. | ||
| Data copy happens once when doing in-band serialization in depend on the threshold. | ||
| In some cases output of a remote task can take up the memory of the task arguments. | ||
| If those arguments are placed in the shared object store, this location should not be overwritten | ||
| while output is being used, otherwise the output value may be corrupted. | ||
| """ | ||
| default = 1024**2 // 4 # 0.25 MiB | ||
| varname = "UNIDIST_MPI_PICKLE_THRESHOLD" | ||
| @classmethod | ||
| def get(cls) -> int: | ||
| """ | ||
| Get minimum buffer size for serialization with pickle 5 protocol. | ||
| Returns | ||
| ------- | ||
| int | ||
| """ | ||
| if MpiSharedObjectStore.get(): | ||
| mpi_pickle_threshold = MpiSharedObjectStoreThreshold.get() | ||
| cls.put_value_source(MpiSharedObjectStoreThreshold.get_value_source()) | ||
| else: | ||
| mpi_pickle_threshold = super().get() | ||
| return mpi_pickle_threshold | ||
| class MpiBackoff(EnvironmentVariable, type=float): | ||
@@ -52,1 +87,71 @@ """ | ||
| varname = "UNIDIST_MPI_LOG" | ||
| class MpiSharedObjectStore(EnvironmentVariable, type=bool): | ||
| """Whether to enable shared object store or not.""" | ||
| default = False | ||
| varname = "UNIDIST_MPI_SHARED_OBJECT_STORE" | ||
| class MpiSharedObjectStoreMemory(EnvironmentVariable, type=int): | ||
| """How many bytes of memory to start the shared object store with.""" | ||
| varname = "UNIDIST_MPI_SHARED_OBJECT_STORE_MEMORY" | ||
| class MpiSharedServiceMemory(EnvironmentVariable, type=int): | ||
| """How many bytes of memory to start the shared service memory with.""" | ||
| varname = "UNIDIST_MPI_SHARED_SERVICE_MEMORY" | ||
| class MpiSharedObjectStoreThreshold(EnvironmentVariable, type=int): | ||
| """Minimum size of data to put into the shared object store.""" | ||
| default = 10**5 # 100 KB | ||
| varname = "UNIDIST_MPI_SHARED_OBJECT_STORE_THRESHOLD" | ||
| class MpiRuntimeEnv: | ||
| """ | ||
| Runtime environment for MPI worker processes. | ||
| Notes | ||
| ----- | ||
| This config doesn't have a respective environment variable as | ||
| it is much more convenient to set a config value using the config API | ||
| but not through the environment variable. | ||
| """ | ||
| # Possible options for a runtime environment to set | ||
| env_vars = "env_vars" | ||
| # Config value | ||
| _value = {} | ||
| @classmethod | ||
| def put(cls, value): | ||
| """ | ||
| Set config value. | ||
| Parameters | ||
| ---------- | ||
| value : dict | ||
| Config value to set. | ||
| """ | ||
| if any([True for option in value if option != cls.env_vars]): | ||
| raise NotImplementedError( | ||
| "Any option other than environment variables is not supported yet." | ||
| ) | ||
| cls._value = value | ||
| @classmethod | ||
| def get(cls): | ||
| """ | ||
| Get config value. | ||
| Returns | ||
| ------- | ||
| dict | ||
| """ | ||
| return cls._value |
@@ -180,2 +180,14 @@ # Copyright (C) 2021-2023 Modin authors | ||
| @classmethod | ||
| def put_value_source(cls, value): | ||
| """ | ||
| Put value source of the config. | ||
| Parameters | ||
| ---------- | ||
| value : ValueSource | ||
| Value source to put. | ||
| """ | ||
| cls._value_source = value | ||
| @classmethod | ||
| def get(cls): | ||
@@ -182,0 +194,0 @@ """ |
@@ -9,7 +9,23 @@ # Copyright (C) 2021-2023 Modin authors | ||
| import inspect | ||
| import warnings | ||
| import weakref | ||
| from unidist.config.backends.mpi.envvars import MpiSpawn | ||
| from unidist.core.backends.mpi.utils import ImmutableDict | ||
| try: | ||
| import mpi4py | ||
| except ImportError: | ||
| raise ImportError( | ||
| "Missing dependency 'mpi4py'. Use pip or conda to install it." | ||
| ) from None | ||
| from unidist.core.backends.common.data_id import DataID, is_data_id | ||
| from unidist.config import MpiLog | ||
| from unidist.config import MpiLog, MpiSharedObjectStore | ||
| # TODO: Find a way to move this after all imports | ||
| mpi4py.rc(recv_mprobe=False, initialize=False) | ||
| from mpi4py import MPI # noqa: E402 | ||
| class Operation: | ||
@@ -29,20 +45,26 @@ """ | ||
| Save the data location to a local storage. | ||
| WAIT : int, default 5 | ||
| PUT_SHARED_DATA : int, default 5 | ||
| Save the data into shared memory. | ||
| WAIT : int, default 6 | ||
| Return readiness signal of a local data to a requester. | ||
| ACTOR_CREATE : int, default 6 | ||
| ACTOR_CREATE : int, default 7 | ||
| Create local actor instance. | ||
| ACTOR_EXECUTE : int, default 7 | ||
| ACTOR_EXECUTE : int, default 8 | ||
| Execute method of a local actor instance. | ||
| CLEANUP : int, default 8 | ||
| CLEANUP : int, default 9 | ||
| Cleanup local object storage for out-of-scope IDs. | ||
| TASK_DONE : int, default 9 | ||
| TASK_DONE : int, default 10 | ||
| Increment global task counter. | ||
| GET_TASK_COUNT : int, default 10 | ||
| GET_TASK_COUNT : int, default 11 | ||
| Return global task counter to a requester. | ||
| CANCEL : int, default 11 | ||
| RESERVE_SHARED_MEMORY : int, default 12 | ||
| Reserve area in shared memory for the data. | ||
| REQUEST_SHARED_DATA : int, default 13 | ||
| Return the area in shared memory with the requested data. | ||
| CANCEL : int, default 14 | ||
| Send a message to a worker to exit the event loop. | ||
| READY_TO_SHUTDOWN : int, default 12 | ||
| READY_TO_SHUTDOWN : int, default 15 | ||
| Send a message to monitor from a worker, | ||
| which is ready to shutdown. | ||
| SHUTDOWN : int, default 13 | ||
| SHUTDOWN : int, default 16 | ||
| Send a message from monitor to a worker to shutdown. | ||
@@ -56,13 +78,16 @@ """ | ||
| PUT_OWNER = 4 | ||
| WAIT = 5 | ||
| ACTOR_CREATE = 6 | ||
| ACTOR_EXECUTE = 7 | ||
| CLEANUP = 8 | ||
| PUT_SHARED_DATA = 5 | ||
| WAIT = 6 | ||
| ACTOR_CREATE = 7 | ||
| ACTOR_EXECUTE = 8 | ||
| CLEANUP = 9 | ||
| ### --- Monitor operations --- ### | ||
| TASK_DONE = 9 | ||
| GET_TASK_COUNT = 10 | ||
| TASK_DONE = 10 | ||
| GET_TASK_COUNT = 11 | ||
| RESERVE_SHARED_MEMORY = 12 | ||
| REQUEST_SHARED_DATA = 13 | ||
| ### --- Common operations --- ### | ||
| CANCEL = 11 | ||
| READY_TO_SHUTDOWN = 12 | ||
| SHUTDOWN = 13 | ||
| CANCEL = 14 | ||
| READY_TO_SHUTDOWN = 15 | ||
| SHUTDOWN = 16 | ||
@@ -89,2 +114,119 @@ | ||
| class MetadataPackage(ImmutableDict): | ||
| """ | ||
| The class defines metadata packages for a communication. | ||
| Attributes | ||
| ---------- | ||
| LOCAL_DATA : int, default: 0 | ||
| Package type indicating that the data will be sent from the local object store. | ||
| SHARED_DATA : int, default: 1 | ||
| Package type indicating that the data will be sent from the shared object store. | ||
| TASK_DATA : int, default: 2 | ||
| Package type indicating a task or an actor (actor method) to be sent. | ||
| """ | ||
| LOCAL_DATA = 0 | ||
| SHARED_DATA = 1 | ||
| TASK_DATA = 2 | ||
| @classmethod | ||
| def get_local_info(cls, data_id, s_data_len, raw_buffers_len, buffer_count): | ||
| """ | ||
| Get information package for sending local data. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to data. | ||
| s_data_len : int | ||
| Main buffer length. | ||
| raw_buffers_len : list | ||
| A list of ``PickleBuffer`` lengths. | ||
| buffer_count : list | ||
| List of the number of buffers for each object | ||
| to be serialized/deserialized using the pickle 5 protocol. | ||
| Returns | ||
| ------- | ||
| dict | ||
| The information package. | ||
| """ | ||
| return MetadataPackage( | ||
| { | ||
| "package_type": MetadataPackage.LOCAL_DATA, | ||
| "id": data_id, | ||
| "s_data_len": s_data_len, | ||
| "raw_buffers_len": raw_buffers_len, | ||
| "buffer_count": buffer_count, | ||
| } | ||
| ) | ||
| @classmethod | ||
| def get_shared_info( | ||
| cls, data_id, s_data_len, raw_buffers_len, buffer_count, service_index | ||
| ): | ||
| """ | ||
| Get information package for sending data using shared memory. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to data. | ||
| s_data_len : int | ||
| Main buffer length. | ||
| raw_buffers_len : list | ||
| A list of ``PickleBuffer`` lengths. | ||
| buffer_count : list | ||
| List of the number of buffers for each object | ||
| to be serialized/deserialized using the pickle 5 protocol. | ||
| service_index : int | ||
| Service index in shared memory. | ||
| Returns | ||
| ------- | ||
| dict | ||
| The information package. | ||
| """ | ||
| return MetadataPackage( | ||
| { | ||
| "package_type": MetadataPackage.SHARED_DATA, | ||
| "id": weakref.proxy(data_id), | ||
| "s_data_len": s_data_len, | ||
| "raw_buffers_len": tuple(raw_buffers_len), | ||
| "buffer_count": tuple(buffer_count), | ||
| "service_index": service_index, | ||
| } | ||
| ) | ||
| @classmethod | ||
| def get_task_info(cls, s_data_len, raw_buffers_len, buffer_count): | ||
| """ | ||
| Get information package for sending a task or an actor (actor method). | ||
| Parameters | ||
| ---------- | ||
| s_data_len : int | ||
| Main buffer length. | ||
| raw_buffers_len : list | ||
| A list of ``PickleBuffer`` lengths. | ||
| buffer_count : list | ||
| List of the number of buffers for each object | ||
| to be serialized/deserialized using the pickle 5 protocol. | ||
| Returns | ||
| ------- | ||
| dict | ||
| The information package. | ||
| """ | ||
| return MetadataPackage( | ||
| { | ||
| "package_type": MetadataPackage.TASK_DATA, | ||
| "s_data_len": s_data_len, | ||
| "raw_buffers_len": raw_buffers_len, | ||
| "buffer_count": buffer_count, | ||
| } | ||
| ) | ||
| default_class_properties = dir(type("dummy", (object,), {})) | ||
@@ -124,45 +266,73 @@ # Mapping between operations and their names (e.g., Operation.EXECUTE: "EXECUTE") | ||
| class MasterDataID(DataID): | ||
| class MpiDataID(DataID): | ||
| """ | ||
| Class for tracking data IDs of the main process. | ||
| Class for tracking data IDs of MPI processes. | ||
| Class extends ``unidist.core.backends.common.data_id.DataID`` functionality with a garbage collection. | ||
| The class extends ``unidist.core.backends.common.data_id.DataID`` functionality, | ||
| ensuring the uniqueness of the object to correctly construct/reconstruct it. | ||
| Otherwise, we would get into wrong garbage collection. | ||
| Parameters | ||
| ---------- | ||
| id_value : int | ||
| An integer value, generated by executor process. | ||
| garbage_collector : unidist.core.backends.mpi.core.executor.GarbageCollector | ||
| A reference to the garbage collector instance. | ||
| owner_rank : int | ||
| The rank of the process that owns the data. | ||
| data_number : int | ||
| Unique data number for the owner process. | ||
| gc : unidist.core.backends.mpi.core.executor.GarbageCollector or None | ||
| Local garbage collector reference. | ||
| The actual object is for the data id owner, otherwise, ``None``. | ||
| """ | ||
| def __init__(self, id_value, garbage_collector): | ||
| super().__init__(id_value) | ||
| self._gc = garbage_collector if garbage_collector else None | ||
| _instances = weakref.WeakValueDictionary() | ||
| def __del__(self): | ||
| """Track object deletion by garbage collector.""" | ||
| # We check for existence of `_qc` attribute because | ||
| # it might be deleted during serialization via `__getstate__` | ||
| if hasattr(self, "_gc") and self._gc is not None: | ||
| self._gc.collect(self.base_data_id()) | ||
| def __new__(cls, owner_rank, data_number, gc=None): | ||
| key = (owner_rank, data_number) | ||
| if key in cls._instances: | ||
| return cls._instances[key] | ||
| else: | ||
| new_instance = super().__new__(cls) | ||
| cls._instances[key] = new_instance | ||
| return new_instance | ||
| def __getstate__(self): | ||
| """Remove a reference to garbage collector for correct `pickle` serialization.""" | ||
| attributes = self.__dict__.copy() | ||
| del attributes["_gc"] | ||
| return attributes | ||
| def __init__(self, owner_rank, data_number, gc=None): | ||
| super().__init__(f"rank_{owner_rank}_id_{data_number}") | ||
| self.owner_rank = owner_rank | ||
| self.data_number = data_number | ||
| self._gc = gc | ||
| def base_data_id(self): | ||
| def __getnewargs__(self): | ||
| """ | ||
| Return the base class instance without garbage collector reference. | ||
| Prepare arguments to reconstruct the object upon unpickling. | ||
| Returns | ||
| ------- | ||
| unidist.core.backends.common.data_id.DataID | ||
| Base ``DataID`` class object without garbage collector reference. | ||
| tuple | ||
| Tuple of the owner rank and data number to be passed into `__new__`. | ||
| """ | ||
| return DataID(self._id) | ||
| return (self.owner_rank, self.data_number) | ||
| def __getstate__(self): | ||
| """ | ||
| Remove a reference to garbage collector for correct `pickle` serialization. | ||
| Returns | ||
| ------- | ||
| dict | ||
| State of the object without garbage collector. | ||
| """ | ||
| state = self.__dict__.copy() | ||
| # we remove this attribute for correct serialization, | ||
| # as well as to reduce the length of the serialized data | ||
| if hasattr(self, "_gc"): | ||
| del state["_gc"] | ||
| return state | ||
| def __del__(self): | ||
| """Track object deletion by garbage collector.""" | ||
| # check for existence of `._gc` attribute as | ||
| # it is missing upon unpickling | ||
| if hasattr(self, "_gc") and self._gc is not None: | ||
| self._gc.collect((self.owner_rank, self.data_number)) | ||
| def get_logger(logger_name, file_name, activate=None): | ||
@@ -226,60 +396,2 @@ """ | ||
| def master_data_ids_to_base(o_ids): | ||
| """ | ||
| Transform all data ID objects of the main process to its base class instances. | ||
| Cast ``unidist.core.backends.mpi.core.common.MasterDataID`` to it's base ``unidist.backend.common.data_id.DataID`` class | ||
| to remove a reference to garbage collector. | ||
| Parameters | ||
| ---------- | ||
| o_ids : iterable | ||
| Sequence of ``unidist.core.backends.mpi.core.common.MasterDataID`` objects. | ||
| Returns | ||
| ------- | ||
| list | ||
| Transformed list. | ||
| """ | ||
| if o_ids is None: | ||
| return None | ||
| elif is_data_id(o_ids): | ||
| return o_ids.base_data_id() | ||
| id_list = [o_id.base_data_id() for o_id in o_ids] | ||
| return id_list | ||
| def unwrap_data_ids(data_ids): | ||
| """ | ||
| Find all data ID instances of the main process and prepare for communication with worker process. | ||
| Call `base_data_id` on each instance. | ||
| Parameters | ||
| ---------- | ||
| data_ids : iterable | ||
| Iterable objects to transform recursively. | ||
| Returns | ||
| ------- | ||
| iterable | ||
| Transformed iterable object (task arguments). | ||
| """ | ||
| if type(data_ids) in (list, tuple, dict): | ||
| container = type(data_ids)() | ||
| for value in data_ids: | ||
| unwrapped_value = unwrap_data_ids( | ||
| data_ids[value] if isinstance(data_ids, dict) else value | ||
| ) | ||
| if isinstance(container, list): | ||
| container += [unwrapped_value] | ||
| elif isinstance(container, tuple): | ||
| container += (unwrapped_value,) | ||
| elif isinstance(container, dict): | ||
| container.update({value: unwrapped_value}) | ||
| return container | ||
| else: | ||
| return data_ids.base_data_id() if is_data_id(data_ids) else data_ids | ||
| def materialize_data_ids(data_ids, unwrap_data_id_impl, is_pending=False): | ||
@@ -289,3 +401,3 @@ """ | ||
| Find all ``unidist.core.backends.common.data_id.DataID`` instances and call `unwrap_data_id_impl` on them. | ||
| Find all ``unidist.core.backends.mpi.core.common.MpiDataID`` instances and call `unwrap_data_id_impl` on them. | ||
@@ -332,1 +444,72 @@ Parameters | ||
| return unwrapped, is_pending | ||
| def check_mpich_version(target_version): | ||
| """ | ||
| Check if the using MPICH version is equal to or greater than the target version. | ||
| Parameters | ||
| ---------- | ||
| target_version : str | ||
| Required version of the MPICH library. | ||
| Returns | ||
| ------- | ||
| bool | ||
| True ot false. | ||
| """ | ||
| def versiontuple(v): | ||
| return tuple(map(int, (v.split(".")))) | ||
| mpich_version = [ | ||
| raw for raw in MPI.Get_library_version().split("\n") if "MPICH Version:" in raw | ||
| ][0].split(" ")[-1] | ||
| return versiontuple(mpich_version) >= versiontuple(target_version) | ||
| def is_shared_memory_supported(raise_warning=False): | ||
| """ | ||
| Check if the unidist on MPI supports shared memory. | ||
| Parameters | ||
| ---------- | ||
| raise_warning: bool, default: False | ||
| Whether to raise a warning or not. | ||
| ``True`` is passed only for root process | ||
| to have the only warning. | ||
| Returns | ||
| ------- | ||
| bool | ||
| True or False. | ||
| Notes | ||
| ----- | ||
| Prior to the MPI 3.0 standard there is no support for shared memory. | ||
| """ | ||
| if not MpiSharedObjectStore.get(): | ||
| return False | ||
| if MPI.VERSION < 3: | ||
| if raise_warning: | ||
| warnings.warn( | ||
| f"Shared object store for MPI backend is not supported for MPI version {MPI.VERSION} " | ||
| "since it doesn't support shared memory feature." | ||
| ) | ||
| return False | ||
| # Mpich shared memory does not work with spawned processes prior to version 4.2.0. | ||
| if ( | ||
| "MPICH" in MPI.Get_library_version() | ||
| and MpiSpawn.get() | ||
| and not check_mpich_version("4.2.0") | ||
| ): | ||
| if raise_warning: | ||
| warnings.warn( | ||
| "Shared object store for MPI backend is not supported in C/W model for MPICH version less than 4.2.0. " | ||
| + "Read more about this issue in the `troubleshooting` page of the unidist documentation." | ||
| ) | ||
| return False | ||
| return True |
@@ -10,2 +10,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| import time | ||
| import warnings | ||
@@ -21,6 +22,8 @@ try: | ||
| from unidist.core.backends.mpi.core.serialization import ( | ||
| ComplexDataSerializer, | ||
| SimpleDataSerializer, | ||
| serialize_complex_data, | ||
| deserialize_complex_data, | ||
| ) | ||
| import unidist.core.backends.mpi.core.common as common | ||
| from unidist.config.backends.mpi.envvars import MpiSpawn, MpiHosts | ||
@@ -51,3 +54,3 @@ # TODO: Find a way to move this after all imports | ||
| logger_op_name_len = 15 | ||
| logger_worker_count = MPIState.get_instance().world_size | ||
| logger_worker_count = MPIState.get_instance().global_size | ||
@@ -57,3 +60,3 @@ # write header on first worker | ||
| not is_logger_header_printed | ||
| and MPIState.get_instance().rank == MPIRank.FIRST_WORKER | ||
| and MPIState.get_instance().global_rank == MPIRank.FIRST_WORKER | ||
| ): | ||
@@ -66,3 +69,3 @@ worker_ids_str = "".join([f"{i}\t" for i in range(logger_worker_count)]) | ||
| source_rank = status.Get_source() | ||
| dest_rank = MPIState.get_instance().rank | ||
| dest_rank = MPIState.get_instance().global_rank | ||
| op_name = common.get_op_name(op_type) | ||
@@ -91,6 +94,24 @@ space_after_op_name = " " * (logger_op_name_len - len(op_name)) | ||
| MPI communicator. | ||
| rank : int | ||
| Attributes | ||
| ---------- | ||
| global_comm : mpi4py.MPI.Comm | ||
| Global MPI communicator. | ||
| host_comm : mpi4py.MPI.Comm | ||
| MPI subcommunicator for the current host. | ||
| global_rank : int | ||
| Rank of a process. | ||
| world_sise : int | ||
| Number of processes. | ||
| global_size : int | ||
| Number of processes in the global communicator. | ||
| host : str | ||
| IP-address of the current host. | ||
| topology : dict | ||
| Dictionary, containing all ranks assignments by IP-addresses in | ||
| the form: `{"node_ip0": {"host_rank": "global_rank", ...}, ...}`. | ||
| host_by_rank : dict | ||
| Dictionary containing IP addresses by rank. | ||
| monitor_processes : list | ||
| List of ranks that are monitor processes. | ||
| workers : list | ||
| List of ranks that are worker processes. | ||
| """ | ||
@@ -100,8 +121,68 @@ | ||
| def __init__(self, comm, rank, world_sise): | ||
| def __init__(self, comm): | ||
| # attributes get actual values when MPI is initialized in `init` function | ||
| self.comm = comm | ||
| self.rank = rank | ||
| self.world_size = world_sise | ||
| self.global_comm = comm | ||
| self.global_rank = comm.Get_rank() | ||
| self.global_size = comm.Get_size() | ||
| self.host = socket.gethostbyname(socket.gethostname()) | ||
| # `Split_type` does not work correctly in the MSMPI library in C/W model | ||
| # so we split the communicator in a different way | ||
| if "Microsoft MPI" in MPI.Get_library_version() and MpiSpawn.get(): | ||
| all_hosts = self.global_comm.allgather(self.host) | ||
| self.host_comm = self.global_comm.Split(all_hosts.index(self.host)) | ||
| else: | ||
| self.host_comm = self.global_comm.Split_type(MPI.COMM_TYPE_SHARED) | ||
| host_rank = self.host_comm.Get_rank() | ||
| # Get topology of MPI cluster. | ||
| cluster_info = self.global_comm.allgather( | ||
| (self.host, self.global_rank, host_rank) | ||
| ) | ||
| self.topology = defaultdict(dict) | ||
| self.host_by_rank = defaultdict(None) | ||
| for host, global_rank, host_rank in cluster_info: | ||
| self.topology[host][host_rank] = global_rank | ||
| self.host_by_rank[global_rank] = host | ||
| mpi_hosts = MpiHosts.get() | ||
| if mpi_hosts is not None and MpiSpawn.get(): | ||
| host_list = mpi_hosts.split(",") | ||
| host_count = len(host_list) | ||
| # check running hosts | ||
| if self.is_root_process() and len(self.topology.keys()) > host_count: | ||
| warnings.warn( | ||
| "The number of running hosts is greater than that specified in the UNIDIST_MPI_HOSTS. " | ||
| + "If you want to run the program on a host other than the local one, specify the appropriate parameter for `mpiexec` " | ||
| + "(`--host` for OpenMPI and `--hosts` for Intel MPI or MPICH)." | ||
| ) | ||
| if self.is_root_process() and len(self.topology.keys()) < host_count: | ||
| warnings.warn( | ||
| "The number of running hosts is less than that specified in the UNIDIST_MPI_HOSTS. " | ||
| + "Check the `mpiexec` option to distribute processes between hosts." | ||
| ) | ||
| if common.is_shared_memory_supported(): | ||
| self.monitor_processes = [] | ||
| for host in self.topology: | ||
| if len(self.topology[host]) >= 2: | ||
| self.monitor_processes.append(self.topology[host][MPIRank.MONITOR]) | ||
| elif self.is_root_process(): | ||
| raise ValueError( | ||
| "When using shared object store, each host must contain at least 2 processes, " | ||
| "since one of them will be a service monitor." | ||
| ) | ||
| else: | ||
| self.monitor_processes = [MPIRank.MONITOR] | ||
| self.workers = [] | ||
| for host in self.topology: | ||
| self.workers.extend( | ||
| [ | ||
| rank | ||
| for rank in self.topology[host].values() | ||
| if not self.is_root_process(rank) | ||
| and not self.is_monitor_process(rank) | ||
| ] | ||
| ) | ||
| @classmethod | ||
@@ -126,37 +207,74 @@ def get_instance(cls, *args): | ||
| def is_root_process(self, rank=None): | ||
| """ | ||
| Check if the rank is root process. | ||
| class MPIRank: | ||
| """Class that describes ranks assignment.""" | ||
| Parameters | ||
| ---------- | ||
| rank : int, optional | ||
| The rank to be checked. | ||
| If ``None``, the current rank is to be checked. | ||
| ROOT = 0 | ||
| MONITOR = 1 | ||
| FIRST_WORKER = 2 | ||
| Returns | ||
| ------- | ||
| bool | ||
| True or False. | ||
| """ | ||
| if rank is None: | ||
| rank = self.global_rank | ||
| return rank == MPIRank.ROOT | ||
| def is_monitor_process(self, rank=None): | ||
| """ | ||
| Check if the rank is a monitor process. | ||
| def get_topology(): | ||
| """ | ||
| Get topology of MPI cluster. | ||
| Parameters | ||
| ---------- | ||
| rank : int, optional | ||
| The rank to be checked. | ||
| If ``None``, the current rank is to be checked. | ||
| Returns | ||
| ------- | ||
| dict | ||
| Dictionary, containing workers ranks assignments by IP-addresses in | ||
| the form: `{"node_ip0": [rank_2, rank_3, ...], "node_ip1": [rank_i, ...], ...}`. | ||
| """ | ||
| mpi_state = MPIState.get_instance() | ||
| comm = mpi_state.comm | ||
| rank = mpi_state.rank | ||
| Returns | ||
| ------- | ||
| bool | ||
| True or False. | ||
| """ | ||
| if rank is None: | ||
| rank = self.global_rank | ||
| return rank in self.monitor_processes | ||
| hostname = socket.gethostname() | ||
| host = socket.gethostbyname(hostname) | ||
| cluster_info = comm.allgather((host, rank)) | ||
| topology = defaultdict(list) | ||
| def get_monitor_by_worker_rank(self, rank=None): | ||
| """ | ||
| Get the monitor process rank for the host that includes this rank. | ||
| for host, rank in cluster_info: | ||
| if rank not in [MPIRank.ROOT, MPIRank.MONITOR]: | ||
| topology[host].append(rank) | ||
| Parameters | ||
| ---------- | ||
| rank : int, optional | ||
| The global rank to search for a monitor process. | ||
| return dict(topology) | ||
| Returns | ||
| ------- | ||
| int | ||
| Rank of a monitor process. | ||
| """ | ||
| if not common.is_shared_memory_supported(): | ||
| return MPIRank.MONITOR | ||
| if rank is None: | ||
| rank = self.global_rank | ||
| host = self.host_by_rank[rank] | ||
| if host is None: | ||
| raise ValueError("Unknown rank of workers") | ||
| return self.topology[host][MPIRank.MONITOR] | ||
| class MPIRank: | ||
| """Class that describes rank assignment.""" | ||
| ROOT = 0 | ||
| MONITOR = 1 | ||
| FIRST_WORKER = 2 | ||
| # ---------------------------- # | ||
@@ -183,4 +301,4 @@ # Main communication utilities # | ||
| * This blocking send is used when we have to wait for completion of the communication, | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``mpi_isend_operation``. | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``mpi_isend_operation``. | ||
| * The special tag is used for this communication, namely, ``common.MPITag.OPERATION``. | ||
@@ -207,4 +325,4 @@ """ | ||
| * This blocking send is used when we have to wait for completion of the communication, | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``mpi_isend_object``. | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``mpi_isend_object``. | ||
| * The special tag is used for this communication, namely, ``common.MPITag.OBJECT``. | ||
@@ -260,3 +378,3 @@ """ | ||
| ----- | ||
| * The special tag is used for this communication, namely, ``common.MPITag.OBJECT``. | ||
| The special tag is used for this communication, namely, ``common.MPITag.OBJECT``. | ||
| """ | ||
@@ -325,3 +443,3 @@ return comm.isend(data, dest=dest_rank, tag=common.MPITag.OBJECT) | ||
| def mpi_send_buffer(comm, buffer_size, buffer, dest_rank): | ||
| def mpi_send_buffer(comm, buffer, dest_rank, data_type=MPI.CHAR, buffer_size=None): | ||
| """ | ||
@@ -334,4 +452,2 @@ Send buffer object to another MPI rank in a blocking way. | ||
| MPI communicator object. | ||
| buffer_size : int | ||
| Buffer size in bytes. | ||
| buffer : object | ||
@@ -341,2 +457,6 @@ Buffer object to send. | ||
| Target MPI process to transfer buffer. | ||
| data_type : MPI.Datatype, default: MPI.CHAR | ||
| MPI data type for sending data. | ||
| buffer_size: int, default: None | ||
| Buffer size in bytes. Send an additional message with a buffer size to prepare another process to receive if `buffer_size` is not None. | ||
@@ -346,9 +466,24 @@ Notes | ||
| * This blocking send is used when we have to wait for completion of the communication, | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``mpi_isend_buffer``. | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``mpi_isend_buffer``. | ||
| * The special tags are used for this communication, namely, | ||
| ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``. | ||
| ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``. | ||
| """ | ||
| comm.send(buffer_size, dest=dest_rank, tag=common.MPITag.OBJECT) | ||
| comm.Send([buffer, MPI.CHAR], dest=dest_rank, tag=common.MPITag.BUFFER) | ||
| if buffer_size: | ||
| comm.send(buffer_size, dest=dest_rank, tag=common.MPITag.OBJECT) | ||
| else: | ||
| buffer_size = len(buffer) | ||
| # Maximum block size MPI is able to send/recv | ||
| block_size = pkl5._bigmpi.blocksize | ||
| partitions = list(range(0, buffer_size, block_size)) | ||
| partitions.append(buffer_size) | ||
| num_partitions = len(partitions) | ||
| with pkl5._bigmpi as bigmpi: | ||
| for i in range(num_partitions): | ||
| if i + 1 < num_partitions: | ||
| comm.Send( | ||
| bigmpi(buffer[partitions[i] : partitions[i + 1]]), | ||
| dest=dest_rank, | ||
| tag=common.MPITag.BUFFER, | ||
| ) | ||
@@ -378,3 +513,3 @@ | ||
| ----- | ||
| * The special tags are used for this communication, namely, | ||
| The special tags are used for this communication, namely, | ||
| ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``. | ||
@@ -385,8 +520,21 @@ """ | ||
| requests.append((h1, None)) | ||
| h2 = comm.Isend([buffer, MPI.CHAR], dest=dest_rank, tag=common.MPITag.BUFFER) | ||
| requests.append((h2, buffer)) | ||
| # Maximum block size MPI is able to send/recv | ||
| block_size = pkl5._bigmpi.blocksize | ||
| partitions = list(range(0, buffer_size, block_size)) | ||
| partitions.append(buffer_size) | ||
| num_partitions = len(partitions) | ||
| with pkl5._bigmpi as bigmpi: | ||
| for i in range(num_partitions): | ||
| if i + 1 < num_partitions: | ||
| h2 = comm.Isend( | ||
| bigmpi(buffer[partitions[i] : partitions[i + 1]]), | ||
| dest=dest_rank, | ||
| tag=common.MPITag.BUFFER, | ||
| ) | ||
| requests.append((h2, buffer)) | ||
| return requests | ||
| def mpi_recv_buffer(comm, source_rank): | ||
| def mpi_recv_buffer(comm, source_rank, result_buffer=None): | ||
| """ | ||
@@ -401,2 +549,4 @@ Receive data buffer. | ||
| Communication event source rank. | ||
| result_buffer : object, default: None | ||
| The array to be filled. If `result_buffer` is None, the buffer size will be requested and the necessary buffer created. | ||
@@ -410,11 +560,27 @@ Returns | ||
| ----- | ||
| * The special tags are used for this communication, namely, | ||
| The special tags are used for this communication, namely, | ||
| ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``. | ||
| """ | ||
| buf_size = comm.recv(source=source_rank, tag=common.MPITag.OBJECT) | ||
| s_buffer = bytearray(buf_size) | ||
| comm.Recv([s_buffer, MPI.CHAR], source=source_rank, tag=common.MPITag.BUFFER) | ||
| return s_buffer | ||
| if result_buffer is None: | ||
| buf_size = comm.recv(source=source_rank, tag=common.MPITag.OBJECT) | ||
| result_buffer = bytearray(buf_size) | ||
| else: | ||
| buf_size = len(result_buffer) | ||
| # Maximum block size MPI is able to send/recv | ||
| block_size = pkl5._bigmpi.blocksize | ||
| partitions = list(range(0, buf_size, block_size)) | ||
| partitions.append(buf_size) | ||
| num_partitions = len(partitions) | ||
| with pkl5._bigmpi as bigmpi: | ||
| for i in range(num_partitions): | ||
| if i + 1 < num_partitions: | ||
| tmp_buffer = bytearray(partitions[i + 1] - partitions[i]) | ||
| comm.Recv( | ||
| bigmpi(tmp_buffer), source=source_rank, tag=common.MPITag.BUFFER | ||
| ) | ||
| result_buffer[partitions[i] : partitions[i + 1]] = tmp_buffer | ||
| return result_buffer | ||
| def mpi_busy_wait_recv(comm, source_rank): | ||
@@ -455,3 +621,3 @@ """ | ||
| def _send_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank): | ||
| def _send_complex_data_impl(comm, s_data, raw_buffers, dest_rank, info_package): | ||
| """ | ||
@@ -468,8 +634,6 @@ Send already serialized complex data. | ||
| Pickle buffers list, out-of-band data collected with pickle 5 protocol. | ||
| buffer_count : list | ||
| List of the number of buffers for each object | ||
| to be serialized/deserialized using the pickle 5 protocol. | ||
| See details in :py:class:`~unidist.core.backends.mpi.core.serialization.ComplexDataSerializer`. | ||
| dest_rank : int | ||
| Target MPI process to transfer data. | ||
| info_package : unidist.core.backends.mpi.core.common.MetadataPackage | ||
| Required information to deserialize data on a receiver side. | ||
@@ -481,9 +645,4 @@ Notes | ||
| """ | ||
| info = { | ||
| "s_data_len": len(s_data), | ||
| "buffer_count": buffer_count, | ||
| "raw_buffers_len": [len(sbuf) for sbuf in raw_buffers], | ||
| } | ||
| comm.send(info, dest=dest_rank, tag=common.MPITag.OBJECT) | ||
| # wrap to dict for sending and correct deserialization of the object by the recipient | ||
| comm.send(dict(info_package), dest=dest_rank, tag=common.MPITag.OBJECT) | ||
| with pkl5._bigmpi as bigmpi: | ||
@@ -495,3 +654,3 @@ comm.Send(bigmpi(s_data), dest=dest_rank, tag=common.MPITag.BUFFER) | ||
| def send_complex_data(comm, data, dest_rank): | ||
| def send_complex_data(comm, data, dest_rank, is_serialized=False): | ||
| """ | ||
@@ -508,35 +667,41 @@ Send the data that consists of different user provided complex types, lambdas and buffers in a blocking way. | ||
| Target MPI process to transfer data. | ||
| is_serialized : bool, default: False | ||
| `data` is already serialized or not. | ||
| Returns | ||
| ------- | ||
| object | ||
| A serialized msgpack data. | ||
| list | ||
| A list of pickle buffers. | ||
| list | ||
| A list of buffers amount for each object. | ||
| dict | ||
| Serialized data for caching purpose. | ||
| Notes | ||
| ----- | ||
| * This blocking send is used when we have to wait for completion of the communication, | ||
| This blocking send is used when we have to wait for completion of the communication, | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``isend_complex_data``. | ||
| * The special tags are used for this communication, namely, | ||
| ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``. | ||
| """ | ||
| serializer = ComplexDataSerializer() | ||
| # Main job | ||
| s_data = serializer.serialize(data) | ||
| # Retrive the metadata | ||
| raw_buffers = serializer.buffers | ||
| buffer_count = serializer.buffer_count | ||
| if is_serialized: | ||
| s_data = data["s_data"] | ||
| raw_buffers = data["raw_buffers"] | ||
| buffer_count = data["buffer_count"] | ||
| # pop `data_id` out of the dict because it will be send as part of metadata package | ||
| data_id = data.pop("id") | ||
| serialized_data = data | ||
| else: | ||
| data_id = data["id"] | ||
| serialized_data = serialize_complex_data(data) | ||
| s_data = serialized_data["s_data"] | ||
| raw_buffers = serialized_data["raw_buffers"] | ||
| buffer_count = serialized_data["buffer_count"] | ||
| info_package = common.MetadataPackage.get_local_info( | ||
| data_id, len(s_data), [len(sbuf) for sbuf in raw_buffers], buffer_count | ||
| ) | ||
| # MPI communication | ||
| _send_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank) | ||
| _send_complex_data_impl(comm, s_data, raw_buffers, dest_rank, info_package) | ||
| # For caching purpose | ||
| return s_data, raw_buffers, buffer_count | ||
| return serialized_data | ||
| def _isend_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank): | ||
| def _isend_complex_data_impl(comm, s_data, raw_buffers, dest_rank, info_package): | ||
| """ | ||
@@ -555,8 +720,6 @@ Send serialized complex data. | ||
| A list of pickle buffers. | ||
| buffer_count : list | ||
| List of the number of buffers for each object | ||
| to be serialized/deserialized using the pickle 5 protocol. | ||
| See details in :py:class:`~unidist.core.backends.mpi.core.serialization.ComplexDataSerializer`. | ||
| dest_rank : int | ||
| Target MPI process to transfer data. | ||
| info_package : unidist.core.backends.mpi.core.common.MetadataPackage | ||
| Required information to deserialize data on a receiver side. | ||
@@ -571,14 +734,8 @@ Returns | ||
| * The special tags are used for this communication, namely, | ||
| ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``. | ||
| ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``. | ||
| """ | ||
| handlers = [] | ||
| info = { | ||
| "s_data_len": len(s_data), | ||
| "buffer_count": buffer_count, | ||
| "raw_buffers_len": [len(sbuf) for sbuf in raw_buffers], | ||
| } | ||
| h1 = comm.isend(info, dest=dest_rank, tag=common.MPITag.OBJECT) | ||
| # wrap to dict for sending and correct deserialization of the object by the recipient | ||
| h1 = comm.isend(dict(info_package), dest=dest_rank, tag=common.MPITag.OBJECT) | ||
| handlers.append((h1, None)) | ||
| with pkl5._bigmpi as bigmpi: | ||
@@ -594,3 +751,3 @@ h2 = comm.Isend(bigmpi(s_data), dest=dest_rank, tag=common.MPITag.BUFFER) | ||
| def isend_complex_data(comm, data, dest_rank): | ||
| def isend_complex_data(comm, data, dest_rank, is_serialized=False): | ||
| """ | ||
@@ -609,2 +766,4 @@ Send the data that consists of different user provided complex types, lambdas and buffers in a non-blocking way. | ||
| Target MPI process to transfer data. | ||
| is_serialized : bool, default: False | ||
| `operation_data` is already serialized or not. | ||
@@ -624,17 +783,26 @@ Returns | ||
| ----- | ||
| * The special tags are used for this communication, namely, | ||
| The special tags are used for this communication, namely, | ||
| ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``. | ||
| """ | ||
| handlers = [] | ||
| if is_serialized: | ||
| s_data = data["s_data"] | ||
| raw_buffers = data["raw_buffers"] | ||
| buffer_count = data["buffer_count"] | ||
| # pop `data_id` out of the dict because it will be send as part of metadata package | ||
| data_id = data.pop("id") | ||
| info_package = common.MetadataPackage.get_local_info( | ||
| data_id, len(s_data), [len(sbuf) for sbuf in raw_buffers], buffer_count | ||
| ) | ||
| else: | ||
| serialized_data = serialize_complex_data(data) | ||
| s_data = serialized_data["s_data"] | ||
| raw_buffers = serialized_data["raw_buffers"] | ||
| buffer_count = serialized_data["buffer_count"] | ||
| info_package = common.MetadataPackage.get_task_info( | ||
| len(s_data), [len(sbuf) for sbuf in raw_buffers], buffer_count | ||
| ) | ||
| serializer = ComplexDataSerializer() | ||
| # Main job | ||
| s_data = serializer.serialize(data) | ||
| # Retrive the metadata | ||
| raw_buffers = serializer.buffers | ||
| buffer_count = serializer.buffer_count | ||
| # Send message pack bytestring | ||
| handlers.extend( | ||
| _isend_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank) | ||
| # MPI communication | ||
| handlers = _isend_complex_data_impl( | ||
| comm, s_data, raw_buffers, dest_rank, info_package | ||
| ) | ||
@@ -645,3 +813,3 @@ | ||
| def recv_complex_data(comm, source_rank): | ||
| def recv_complex_data(comm, source_rank, info_package): | ||
| """ | ||
@@ -658,2 +826,4 @@ Receive the data that may consist of different user provided complex types, lambdas and buffers. | ||
| Source MPI process to receive data from. | ||
| info_package : unidist.core.backends.mpi.core.common.MetadataPackage | ||
| Required information to deserialize data. | ||
@@ -667,9 +837,8 @@ Returns | ||
| ----- | ||
| * The special tags are used for this communication, namely, | ||
| The special tags are used for this communication, namely, | ||
| ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``. | ||
| """ | ||
| info = comm.recv(source=source_rank, tag=common.MPITag.OBJECT) | ||
| msgpack_buffer = bytearray(info["s_data_len"]) | ||
| buffer_count = info["buffer_count"] | ||
| raw_buffers = list(map(bytearray, info["raw_buffers_len"])) | ||
| msgpack_buffer = bytearray(info_package["s_data_len"]) | ||
| buffer_count = info_package["buffer_count"] | ||
| raw_buffers = list(map(bytearray, info_package["raw_buffers_len"])) | ||
| with pkl5._bigmpi as bigmpi: | ||
@@ -680,9 +849,5 @@ comm.Recv(bigmpi(msgpack_buffer), source=source_rank, tag=common.MPITag.BUFFER) | ||
| # Set the necessary metadata for unpacking | ||
| deserializer = ComplexDataSerializer(raw_buffers, buffer_count) | ||
| return deserialize_complex_data(msgpack_buffer, raw_buffers, buffer_count) | ||
| # Start unpacking | ||
| return deserializer.deserialize(msgpack_buffer) | ||
| # ---------- # | ||
@@ -711,7 +876,7 @@ # Public API # | ||
| * This blocking send is used when we have to wait for completion of the communication, | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``isend_simple_operation``. | ||
| which is necessary for the pipeline to continue, or when the receiver is waiting for a result. | ||
| Otherwise, use non-blocking ``isend_simple_operation``. | ||
| * Serialization of the data to be sent takes place just using ``pickle.dump`` in this case. | ||
| * The special tags are used for this communication, namely, | ||
| ``common.MPITag.OPERATION`` and ``common.MPITag.OBJECT``. | ||
| ``common.MPITag.OPERATION`` and ``common.MPITag.OBJECT``. | ||
| """ | ||
@@ -748,3 +913,3 @@ # Send operation type | ||
| * The special tags are used for this communication, namely, | ||
| ``common.MPITag.OPERATION`` and ``common.MPITag.OBJECT``. | ||
| ``common.MPITag.OPERATION`` and ``common.MPITag.OBJECT``. | ||
| """ | ||
@@ -783,14 +948,15 @@ # Send operation type | ||
| `operation_data` is already serialized or not. | ||
| - `operation_data` is always serialized for data | ||
| that has already been saved into the object store. | ||
| - `operation_data` is always not serialized | ||
| for sending a task or an actor (actor method). | ||
| Returns | ||
| ------- | ||
| dict and dict or dict and None | ||
| Async handlers and serialization data for caching purpose. | ||
| list and dict | ||
| Async handlers list and serialization data dict for caching purpose. | ||
| Notes | ||
| ----- | ||
| * Function always returns a ``dict`` containing async handlers to the sent MPI operations. | ||
| In addition, ``None`` is returned if `operation_data` is already serialized, | ||
| otherwise ``dict`` containing data serialized in this function. | ||
| * The special tags are used for this communication, namely, | ||
| The special tags are used for this communication, namely, | ||
| ``common.MPITag.OPERATION``, ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``. | ||
@@ -809,9 +975,11 @@ """ | ||
| buffer_count = operation_data["buffer_count"] | ||
| # pop `data_id` out of the dict because it will be send as part of metadata package | ||
| data_id = operation_data.pop("id") | ||
| info_package = common.MetadataPackage.get_local_info( | ||
| data_id, len(s_data), [len(sbuf) for sbuf in raw_buffers], buffer_count | ||
| ) | ||
| h2_list = _isend_complex_data_impl( | ||
| comm, s_data, raw_buffers, buffer_count, dest_rank | ||
| comm, s_data, raw_buffers, dest_rank, info_package | ||
| ) | ||
| handlers.extend(h2_list) | ||
| return handlers, None | ||
| else: | ||
@@ -823,7 +991,7 @@ # Serialize and send the data | ||
| handlers.extend(h2_list) | ||
| return handlers, { | ||
| "s_data": s_data, | ||
| "raw_buffers": raw_buffers, | ||
| "buffer_count": buffer_count, | ||
| } | ||
| return handlers, { | ||
| "s_data": s_data, | ||
| "raw_buffers": raw_buffers, | ||
| "buffer_count": buffer_count, | ||
| } | ||
@@ -891,1 +1059,35 @@ | ||
| return SimpleDataSerializer().deserialize_pickle(s_buffer) | ||
| def send_reserve_operation(comm, data_id, data_size): | ||
| """ | ||
| Reserve shared memory for `data_id`. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to data. | ||
| data_size : int | ||
| Length of a required range in shared memory. | ||
| Returns | ||
| ------- | ||
| dict | ||
| Reservation info about the allocated range in shared memory. | ||
| """ | ||
| mpi_state = MPIState.get_instance() | ||
| operation_type = common.Operation.RESERVE_SHARED_MEMORY | ||
| operation_data = { | ||
| "id": data_id, | ||
| "size": data_size, | ||
| } | ||
| # We use a blocking send here because we have to wait for | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| send_simple_operation( | ||
| comm, | ||
| operation_type, | ||
| operation_data, | ||
| mpi_state.get_monitor_by_worker_rank(), | ||
| ) | ||
| return mpi_recv_object(comm, mpi_state.get_monitor_by_worker_rank()) |
@@ -10,3 +10,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.controller.object_store import object_store | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| from unidist.core.backends.mpi.core.controller.garbage_collector import ( | ||
@@ -16,2 +16,3 @@ garbage_collector, | ||
| from unidist.core.backends.mpi.core.controller.common import push_data, RoundRobin | ||
| from unidist.core.backends.mpi.core.controller.api import put | ||
@@ -38,26 +39,27 @@ | ||
| def __call__(self, *args, num_returns=1, **kwargs): | ||
| output_id = object_store.generate_output_data_id( | ||
| local_store = LocalObjectStore.get_instance() | ||
| output_id = local_store.generate_output_data_id( | ||
| self._actor._owner_rank, garbage_collector, num_returns | ||
| ) | ||
| unwrapped_args = [common.unwrap_data_ids(arg) for arg in args] | ||
| unwrapped_kwargs = {k: common.unwrap_data_ids(v) for k, v in kwargs.items()} | ||
| push_data(self._actor._owner_rank, self._method_name) | ||
| push_data(self._actor._owner_rank, args) | ||
| push_data(self._actor._owner_rank, kwargs) | ||
| push_data(self._actor._owner_rank, unwrapped_args) | ||
| push_data(self._actor._owner_rank, unwrapped_kwargs) | ||
| operation_type = common.Operation.ACTOR_EXECUTE | ||
| operation_data = { | ||
| "task": self._method_name, | ||
| "args": unwrapped_args, | ||
| "kwargs": unwrapped_kwargs, | ||
| "output": common.master_data_ids_to_base(output_id), | ||
| "handler": self._actor._handler_id.base_data_id(), | ||
| # tuple cannot be serialized iteratively and it will fail if some internal data cannot be serialized using Pickle | ||
| "args": list(args), | ||
| "kwargs": kwargs, | ||
| "output": output_id, | ||
| "handler": self._actor._handler_id, | ||
| } | ||
| async_operations = AsyncOperations.get_instance() | ||
| h_list, _ = communication.isend_complex_operation( | ||
| communication.MPIState.get_instance().comm, | ||
| communication.MPIState.get_instance().global_comm, | ||
| operation_type, | ||
| operation_data, | ||
| self._actor._owner_rank, | ||
| is_serialized=False, | ||
| ) | ||
@@ -81,3 +83,3 @@ async_operations.extend(h_list) | ||
| Used for proper serialization/deserialization via `__reduce__`. | ||
| handler_id : None or unidist.core.backends.mpi.core.common.MasterDataID | ||
| handler_id : None or unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to the actor class. | ||
@@ -102,8 +104,9 @@ Used for proper serialization/deserialization via `__reduce__`. | ||
| ) | ||
| local_store = LocalObjectStore.get_instance() | ||
| self._handler_id = ( | ||
| object_store.generate_data_id(garbage_collector) | ||
| local_store.generate_data_id(garbage_collector) | ||
| if handler_id is None | ||
| else handler_id | ||
| ) | ||
| object_store.put_data_owner(self._handler_id, self._owner_rank) | ||
| local_store.put_data_owner(self._handler_id, self._owner_rank) | ||
@@ -120,10 +123,11 @@ # reserve a rank for actor execution only | ||
| "kwargs": kwargs, | ||
| "handler": self._handler_id.base_data_id(), | ||
| "handler": self._handler_id, | ||
| } | ||
| async_operations = AsyncOperations.get_instance() | ||
| h_list, _ = communication.isend_complex_operation( | ||
| communication.MPIState.get_instance().comm, | ||
| communication.MPIState.get_instance().global_comm, | ||
| operation_type, | ||
| operation_data, | ||
| self._owner_rank, | ||
| is_serialized=False, | ||
| ) | ||
@@ -188,4 +192,11 @@ async_operations.extend(h_list) | ||
| # Cache for serialized actor methods {"method_name": DataID} | ||
| actor_methods = {} | ||
| def __getattr__(self, name): | ||
| return ActorMethod(self, name) | ||
| data_id_to_method = self.actor_methods.get(name, None) | ||
| if data_id_to_method is None: | ||
| data_id_to_method = put(name) | ||
| self.actor_methods[name] = data_id_to_method | ||
| return ActorMethod(self, data_id_to_method) | ||
@@ -192,0 +203,0 @@ def __del__(self): |
@@ -21,3 +21,5 @@ # Copyright (C) 2021-2023 Modin authors | ||
| from unidist.core.backends.mpi.core.controller.object_store import object_store | ||
| from unidist.core.backends.mpi.core.serialization import serialize_complex_data | ||
| from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| from unidist.core.backends.mpi.core.controller.garbage_collector import ( | ||
@@ -36,3 +38,3 @@ garbage_collector, | ||
| CpuCount, | ||
| IsMpiSpawnWorkers, | ||
| MpiSpawn, | ||
| MpiHosts, | ||
@@ -43,2 +45,7 @@ ValueSource, | ||
| MpiLog, | ||
| MpiSharedObjectStore, | ||
| MpiSharedObjectStoreMemory, | ||
| MpiSharedServiceMemory, | ||
| MpiSharedObjectStoreThreshold, | ||
| MpiRuntimeEnv, | ||
| ) | ||
@@ -54,4 +61,2 @@ | ||
| # The topology of MPI cluster gets available when MPI initialization in `init` | ||
| topology = dict() | ||
| # The global variable is responsible for if MPI backend has already been initialized | ||
@@ -139,74 +144,110 @@ is_mpi_initialized = False | ||
| runtime_env = MpiRuntimeEnv.get() | ||
| comm = MPI.COMM_WORLD | ||
| rank = comm.Get_rank() | ||
| parent_comm = MPI.Comm.Get_parent() | ||
| # path to dynamically spawn MPI processes | ||
| if rank == 0 and parent_comm == MPI.COMM_NULL: | ||
| if IsMpiSpawnWorkers.get(): | ||
| nprocs_to_spawn = CpuCount.get() + 1 # +1 for monitor process | ||
| args = _get_py_flags() | ||
| args += ["-c"] | ||
| py_str = [ | ||
| "import unidist", | ||
| "import unidist.config as cfg", | ||
| "cfg.Backend.put('mpi')", | ||
| # Path to dynamically spawn MPI processes. | ||
| # If a requirement is not met, processes have been started with mpiexec -n <N>, where N > 1. | ||
| if rank == 0 and parent_comm == MPI.COMM_NULL and MpiSpawn.get(): | ||
| args = _get_py_flags() | ||
| args += ["-c"] | ||
| py_str = [ | ||
| "import unidist", | ||
| "import unidist.config as cfg", | ||
| "cfg.Backend.put('mpi')", | ||
| ] | ||
| if MpiSpawn.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiSpawn.put({MpiSpawn.get()})"] | ||
| if MpiHosts.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiHosts.put('{MpiHosts.get()}')"] | ||
| if CpuCount.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.CpuCount.put({CpuCount.get()})"] | ||
| if MpiPickleThreshold.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiPickleThreshold.put({MpiPickleThreshold.get()})"] | ||
| if MpiBackoff.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiBackoff.put({MpiBackoff.get()})"] | ||
| if MpiLog.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiLog.put({MpiLog.get()})"] | ||
| if MpiSharedObjectStore.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiSharedObjectStore.put({MpiSharedObjectStore.get()})"] | ||
| if MpiSharedObjectStoreMemory.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [ | ||
| f"cfg.MpiSharedObjectStoreMemory.put({MpiSharedObjectStoreMemory.get()})" | ||
| ] | ||
| if IsMpiSpawnWorkers.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.IsMpiSpawnWorkers.put({IsMpiSpawnWorkers.get()})"] | ||
| if MpiHosts.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiHosts.put('{MpiHosts.get()}')"] | ||
| if CpuCount.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.CpuCount.put({CpuCount.get()})"] | ||
| if MpiPickleThreshold.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiPickleThreshold.put({MpiPickleThreshold.get()})"] | ||
| if MpiBackoff.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiBackoff.put({MpiBackoff.get()})"] | ||
| if MpiLog.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [f"cfg.MpiLog.put({MpiLog.get()})"] | ||
| py_str += ["unidist.init()"] | ||
| py_str = "; ".join(py_str) | ||
| args += [py_str] | ||
| if MpiSharedServiceMemory.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [ | ||
| f"cfg.MpiSharedServiceMemory.put({MpiSharedServiceMemory.get()})" | ||
| ] | ||
| if MpiSharedObjectStoreThreshold.get_value_source() != ValueSource.DEFAULT: | ||
| py_str += [ | ||
| f"cfg.MpiSharedObjectStoreThreshold.put({MpiSharedObjectStoreThreshold.get()})" | ||
| ] | ||
| if runtime_env: | ||
| py_str += [f"cfg.MpiRuntimeEnv.put({runtime_env})"] | ||
| env_vars = ["import os"] | ||
| for option, value in runtime_env.items(): | ||
| if option == MpiRuntimeEnv.env_vars: | ||
| for env_var_name, env_var_value in value.items(): | ||
| env_vars += [ | ||
| f"os.environ['{env_var_name}'] = '{env_var_value}'" | ||
| ] | ||
| py_str += env_vars | ||
| py_str += ["unidist.init()"] | ||
| py_str = "; ".join(py_str) | ||
| args += [py_str] | ||
| hosts = MpiHosts.get() | ||
| info = MPI.Info.Create() | ||
| lib_version = MPI.Get_library_version() | ||
| if "Intel" in lib_version: | ||
| # To make dynamic spawn of MPI processes work properly | ||
| # we should set this environment variable. | ||
| # See more about Intel MPI environment variables in | ||
| # https://www.intel.com/content/www/us/en/docs/mpi-library/developer-reference-linux/2021-8/other-environment-variables.html. | ||
| os.environ["I_MPI_SPAWN"] = "1" | ||
| cpu_count = CpuCount.get() | ||
| hosts = MpiHosts.get() | ||
| info = MPI.Info.Create() | ||
| lib_version = MPI.Get_library_version() | ||
| if "Intel" in lib_version: | ||
| # To make dynamic spawn of MPI processes work properly | ||
| # we should set this environment variable. | ||
| # See more about Intel MPI environment variables in | ||
| # https://www.intel.com/content/www/us/en/docs/mpi-library/developer-reference-linux/2021-8/other-environment-variables.html. | ||
| os.environ["I_MPI_SPAWN"] = "1" | ||
| if hosts: | ||
| if "Open MPI" in lib_version: | ||
| host_list = str(hosts).split(",") | ||
| workers_per_host = [ | ||
| int(nprocs_to_spawn / len(host_list)) | ||
| + (1 if i < nprocs_to_spawn % len(host_list) else 0) | ||
| for i in range(len(host_list)) | ||
| ] | ||
| hosts = ",".join( | ||
| [ | ||
| f"{host}:{workers_per_host[i]}" | ||
| for i, host in enumerate(host_list) | ||
| ] | ||
| ) | ||
| info.Set("add-host", hosts) | ||
| else: | ||
| info.Set("hosts", hosts) | ||
| host_list = hosts.split(",") if hosts is not None else ["localhost"] | ||
| host_count = len(host_list) | ||
| intercomm = MPI.COMM_SELF.Spawn( | ||
| sys.executable, | ||
| args, | ||
| maxprocs=nprocs_to_spawn, | ||
| info=info, | ||
| root=rank, | ||
| ) | ||
| comm = intercomm.Merge(high=False) | ||
| # path for processes to be started by mpiexec -n <N>, where N > 1 | ||
| if common.is_shared_memory_supported(): | ||
| # +host_count to add monitor process on each host | ||
| nprocs_to_spawn = cpu_count + host_count | ||
| else: | ||
| comm = MPI.COMM_WORLD | ||
| # +1 for just a single process monitor | ||
| nprocs_to_spawn = cpu_count + 1 | ||
| if host_count > 1: | ||
| if "Open MPI" in lib_version: | ||
| # +1 to take into account the current root process | ||
| # to correctly allocate slots | ||
| slot_count = nprocs_to_spawn + 1 | ||
| slots_per_host = [ | ||
| int(slot_count / host_count) | ||
| + (1 if i < slot_count % host_count else 0) | ||
| for i in range(host_count) | ||
| ] | ||
| hosts = ",".join( | ||
| [f"{host_list[i]}:{slots_per_host[i]}" for i in range(host_count)] | ||
| ) | ||
| info.Set("add-host", hosts) | ||
| else: | ||
| info.Set("hosts", hosts) | ||
| intercomm = MPI.COMM_SELF.Spawn( | ||
| sys.executable, | ||
| args, | ||
| maxprocs=nprocs_to_spawn, | ||
| info=info, | ||
| root=rank, | ||
| ) | ||
| comm = intercomm.Merge(high=False) | ||
| else: | ||
| if runtime_env and rank != 0: | ||
| for option, value in runtime_env.items(): | ||
| if option == MpiRuntimeEnv.env_vars: | ||
| for env_var_name, env_var_value in value.items(): | ||
| os.environ[env_var_name] = str(env_var_value) | ||
| # path for spawned MPI processes to be merged with the parent communicator | ||
@@ -216,10 +257,4 @@ if parent_comm != MPI.COMM_NULL: | ||
| mpi_state = communication.MPIState.get_instance( | ||
| comm, comm.Get_rank(), comm.Get_size() | ||
| ) | ||
| mpi_state = communication.MPIState.get_instance(comm) | ||
| global topology | ||
| if not topology: | ||
| topology = communication.get_topology() | ||
| global is_mpi_initialized | ||
@@ -229,11 +264,15 @@ if not is_mpi_initialized: | ||
| if mpi_state.rank == communication.MPIRank.ROOT: | ||
| # Initalize shared memory | ||
| SharedObjectStore.get_instance() | ||
| if mpi_state.is_root_process(): | ||
| atexit.register(_termination_handler) | ||
| signal.signal(signal.SIGTERM, _termination_handler) | ||
| signal.signal(signal.SIGINT, _termination_handler) | ||
| # Exit the init function in root only after monitor and worker loops have started | ||
| mpi_state.global_comm.Barrier() | ||
| return | ||
| elif mpi_state.is_monitor_process(): | ||
| from unidist.core.backends.mpi.core.monitor.loop import monitor_loop | ||
| if mpi_state.rank == communication.MPIRank.MONITOR: | ||
| from unidist.core.backends.mpi.core.monitor import monitor_loop | ||
| monitor_loop() | ||
@@ -243,10 +282,6 @@ # If the user executes a program in SPMD mode, | ||
| # so just killing them. | ||
| if not IsMpiSpawnWorkers.get(): | ||
| if not MpiSpawn.get(): | ||
| sys.exit() | ||
| return | ||
| if mpi_state.rank not in ( | ||
| communication.MPIRank.ROOT, | ||
| communication.MPIRank.MONITOR, | ||
| ): | ||
| else: | ||
| from unidist.core.backends.mpi.core.worker.loop import worker_loop | ||
@@ -258,3 +293,3 @@ | ||
| # so just killing them. | ||
| if not IsMpiSpawnWorkers.get(): | ||
| if not MpiSpawn.get(): | ||
| sys.exit() | ||
@@ -292,7 +327,7 @@ return | ||
| # Send shutdown commands to all ranks | ||
| for rank_id in range(communication.MPIRank.FIRST_WORKER, mpi_state.world_size): | ||
| for rank_id in mpi_state.workers: | ||
| # We use a blocking send here because we have to wait for | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| communication.mpi_send_operation( | ||
| mpi_state.comm, | ||
| mpi_state.global_comm, | ||
| common.Operation.CANCEL, | ||
@@ -304,3 +339,3 @@ rank_id, | ||
| op_type = communication.mpi_recv_object( | ||
| mpi_state.comm, | ||
| mpi_state.global_comm, | ||
| communication.MPIRank.MONITOR, | ||
@@ -310,4 +345,6 @@ ) | ||
| raise ValueError(f"Got wrong operation type {op_type}.") | ||
| SharedObjectStore.get_instance().finalize() | ||
| if not MPI.Is_finalized(): | ||
| MPI.Finalize() | ||
| logger.debug("Shutdown root") | ||
@@ -327,9 +364,16 @@ is_mpi_shutdown = True | ||
| """ | ||
| global topology | ||
| if not topology: | ||
| mpi_state = communication.MPIState.get_instance() | ||
| if mpi_state is None: | ||
| raise RuntimeError("'unidist.init()' has not been called yet") | ||
| cluster_resources = defaultdict(dict) | ||
| for host, ranks_list in topology.items(): | ||
| cluster_resources[host]["CPU"] = len(ranks_list) | ||
| for host in mpi_state.topology: | ||
| cluster_resources[host]["CPU"] = len( | ||
| [ | ||
| r | ||
| for r in mpi_state.topology[host].values() | ||
| if not mpi_state.is_root_process(r) | ||
| and not mpi_state.is_monitor_process(r) | ||
| ] | ||
| ) | ||
@@ -350,8 +394,16 @@ return dict(cluster_resources) | ||
| ------- | ||
| unidist.core.backends.mpi.core.common.MasterDataID | ||
| unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID of an object in object storage. | ||
| """ | ||
| data_id = object_store.generate_data_id(garbage_collector) | ||
| object_store.put(data_id, data) | ||
| local_store = LocalObjectStore.get_instance() | ||
| shared_store = SharedObjectStore.get_instance() | ||
| data_id = local_store.generate_data_id(garbage_collector) | ||
| serialized_data = serialize_complex_data(data) | ||
| local_store.put(data_id, data) | ||
| if shared_store.is_allocated(): | ||
| shared_store.put(data_id, serialized_data) | ||
| else: | ||
| local_store.cache_serialized_data(data_id, serialized_data) | ||
| logger.debug("PUT {} id".format(data_id._id)) | ||
@@ -368,3 +420,3 @@ | ||
| ---------- | ||
| data_ids : unidist.core.backends.common.data_id.DataID or list | ||
| data_ids : unidist.core.backends.mpi.core.common.MpiDataID or list | ||
| An ID(s) to object(s) to get data from. | ||
@@ -377,6 +429,11 @@ | ||
| """ | ||
| local_store = LocalObjectStore.get_instance() | ||
| is_list = isinstance(data_ids, list) | ||
| if not is_list: | ||
| data_ids = [data_ids] | ||
| def get_impl(data_id): | ||
| if object_store.contains(data_id): | ||
| value = object_store.get(data_id) | ||
| if local_store.contains(data_id): | ||
| value = local_store.get(data_id) | ||
| else: | ||
@@ -391,9 +448,3 @@ value = request_worker_data(data_id) | ||
| logger.debug("GET {} ids".format(common.unwrapped_data_ids_list(data_ids))) | ||
| is_list = isinstance(data_ids, list) | ||
| if not is_list: | ||
| data_ids = [data_ids] | ||
| values = [get_impl(data_id) for data_id in data_ids] | ||
| # Initiate reference count based cleaup | ||
@@ -416,3 +467,3 @@ # if all the tasks were completed | ||
| ---------- | ||
| data_ids : unidist.core.backends.mpi.core.common.MasterDataID or list | ||
| data_ids : unidist.core.backends.mpi.core.common.MpiDataID or list | ||
| ``DataID`` or list of ``DataID``-s to be waited. | ||
@@ -429,3 +480,3 @@ num_returns : int, default: 1 | ||
| data_ids = [data_ids] | ||
| # Since the controller should operate MasterDataID(s), | ||
| # Since the controller should operate MpiDataID(s), | ||
| # we use this map to retrieve and return them | ||
@@ -437,6 +488,7 @@ # instead of DataID(s) received from workers. | ||
| ready = [] | ||
| local_store = LocalObjectStore.get_instance() | ||
| logger.debug("WAIT {} ids".format(common.unwrapped_data_ids_list(data_ids))) | ||
| for data_id in not_ready: | ||
| if object_store.contains(data_id): | ||
| for data_id in not_ready.copy(): | ||
| if local_store.contains(data_id): | ||
| ready.append(data_id) | ||
@@ -449,3 +501,2 @@ not_ready.remove(data_id) | ||
| operation_type = common.Operation.WAIT | ||
| not_ready = [common.unwrap_data_ids(arg) for arg in not_ready] | ||
| operation_data = { | ||
@@ -456,17 +507,18 @@ "data_ids": not_ready, | ||
| mpi_state = communication.MPIState.get_instance() | ||
| root_monitor = mpi_state.get_monitor_by_worker_rank(communication.MPIRank.ROOT) | ||
| # We use a blocking send and recv here because we have to wait for | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| communication.send_simple_operation( | ||
| mpi_state.comm, | ||
| mpi_state.global_comm, | ||
| operation_type, | ||
| operation_data, | ||
| communication.MPIRank.MONITOR, | ||
| root_monitor, | ||
| ) | ||
| data = communication.mpi_recv_object( | ||
| mpi_state.comm, | ||
| communication.MPIRank.MONITOR, | ||
| mpi_state.global_comm, | ||
| root_monitor, | ||
| ) | ||
| ready.extend(data["ready"]) | ||
| not_ready = data["not_ready"] | ||
| # We have to retrieve and return MasterDataID(s) | ||
| # We have to retrieve and return MpiDataID(s) | ||
| # in order for the controller to operate them in further operations. | ||
@@ -500,3 +552,3 @@ ready = [data_id_map[data_id] for data_id in ready] | ||
| ------- | ||
| unidist.core.backends.mpi.core.common.MasterDataID or list or None | ||
| unidist.core.backends.mpi.core.common.MpiDataID or list or None | ||
| Type of returns depends on `num_returns` value: | ||
@@ -514,3 +566,4 @@ | ||
| output_ids = object_store.generate_output_data_id( | ||
| local_store = LocalObjectStore.get_instance() | ||
| output_ids = local_store.generate_output_data_id( | ||
| dest_rank, garbage_collector, num_returns | ||
@@ -531,21 +584,21 @@ ) | ||
| unwrapped_args = [common.unwrap_data_ids(arg) for arg in args] | ||
| unwrapped_kwargs = {k: common.unwrap_data_ids(v) for k, v in kwargs.items()} | ||
| push_data(dest_rank, task) | ||
| push_data(dest_rank, args) | ||
| push_data(dest_rank, kwargs) | ||
| push_data(dest_rank, unwrapped_args) | ||
| push_data(dest_rank, unwrapped_kwargs) | ||
| operation_type = common.Operation.EXECUTE | ||
| operation_data = { | ||
| "task": task, | ||
| "args": unwrapped_args, | ||
| "kwargs": unwrapped_kwargs, | ||
| "output": common.master_data_ids_to_base(output_ids), | ||
| # tuple cannot be serialized iteratively and it will fail if some internal data cannot be serialized using Pickle | ||
| "args": list(args), | ||
| "kwargs": kwargs, | ||
| "output": output_ids, | ||
| } | ||
| async_operations = AsyncOperations.get_instance() | ||
| h_list, _ = communication.isend_complex_operation( | ||
| communication.MPIState.get_instance().comm, | ||
| communication.MPIState.get_instance().global_comm, | ||
| operation_type, | ||
| operation_data, | ||
| dest_rank, | ||
| is_serialized=False, | ||
| ) | ||
@@ -552,0 +605,0 @@ async_operations.extend(h_list) |
@@ -13,12 +13,10 @@ # Copyright (C) 2021-2023 Modin authors | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.controller.object_store import object_store | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore | ||
| from unidist.core.backends.mpi.core.serialization import serialize_complex_data | ||
| logger = common.get_logger("common", "common.log") | ||
| # initial worker number is equal to rank 2 because | ||
| # rank 0 is for controller process | ||
| # rank 1 is for monitor process | ||
| initial_worker_number = 2 | ||
| class RoundRobin: | ||
@@ -29,17 +27,13 @@ __instance = None | ||
| self.reserved_ranks = [] | ||
| mpi_state = communication.MPIState.get_instance() | ||
| self.rank_to_schedule = itertools.cycle( | ||
| ( | ||
| rank | ||
| for rank in range( | ||
| initial_worker_number, | ||
| communication.MPIState.get_instance().world_size, | ||
| ) | ||
| global_rank | ||
| for global_rank in mpi_state.workers | ||
| # check if a rank to schedule is not equal to the rank | ||
| # of the current process to not get into recursive scheduling | ||
| if rank != communication.MPIState.get_instance().rank | ||
| if global_rank != mpi_state.global_rank | ||
| ) | ||
| ) | ||
| logger.debug( | ||
| f"RoundRobin init for {communication.MPIState.get_instance().rank} rank" | ||
| ) | ||
| logger.debug(f"RoundRobin init for {mpi_state.global_rank} rank") | ||
@@ -69,7 +63,6 @@ @classmethod | ||
| next_rank = None | ||
| mpi_state = communication.MPIState.get_instance() | ||
| # Go rank by rank to find the first one non-reserved | ||
| for _ in range( | ||
| initial_worker_number, communication.MPIState.get_instance().world_size | ||
| ): | ||
| for _ in mpi_state.workers: | ||
| rank = next(self.rank_to_schedule) | ||
@@ -100,3 +93,3 @@ if rank not in self.reserved_ranks: | ||
| f"RoundRobin reserve rank {rank} for actor " | ||
| + f"on worker with rank {communication.MPIState.get_instance().rank}" | ||
| + f"on worker with rank {communication.MPIState.get_instance().global_rank}" | ||
| ) | ||
@@ -118,6 +111,58 @@ | ||
| f"RoundRobin release rank {rank} reserved for actor " | ||
| + f"on worker with rank {communication.MPIState.get_instance().rank}" | ||
| + f"on worker with rank {communication.MPIState.get_instance().global_rank}" | ||
| ) | ||
| def pull_data(comm, owner_rank): | ||
| """ | ||
| Receive data from another MPI process. | ||
| Data can come from shared memory or direct communication depending on the package type. | ||
| The data is de-serialized from received buffers. | ||
| Parameters | ||
| ---------- | ||
| owner_rank : int | ||
| Source MPI process to receive data from. | ||
| Returns | ||
| ------- | ||
| object | ||
| Received data object from another MPI process. | ||
| """ | ||
| info_package = communication.mpi_recv_object(comm, owner_rank) | ||
| if info_package["package_type"] == common.MetadataPackage.SHARED_DATA: | ||
| local_store = LocalObjectStore.get_instance() | ||
| shared_store = SharedObjectStore.get_instance() | ||
| data_id = info_package["id"] | ||
| if local_store.contains(data_id): | ||
| return { | ||
| "id": data_id, | ||
| "data": local_store.get(data_id), | ||
| } | ||
| data = shared_store.get(data_id, owner_rank, info_package) | ||
| local_store.put(data_id, data) | ||
| return { | ||
| "id": data_id, | ||
| "data": data, | ||
| } | ||
| elif info_package["package_type"] == common.MetadataPackage.LOCAL_DATA: | ||
| local_store = LocalObjectStore.get_instance() | ||
| data = communication.recv_complex_data( | ||
| comm, owner_rank, info_package=info_package | ||
| ) | ||
| return { | ||
| "id": info_package["id"], | ||
| "data": data, | ||
| } | ||
| elif info_package["package_type"] == common.MetadataPackage.TASK_DATA: | ||
| return communication.recv_complex_data( | ||
| comm, owner_rank, info_package=info_package | ||
| ) | ||
| else: | ||
| raise ValueError("Unexpected package of data info!") | ||
| def request_worker_data(data_id): | ||
@@ -129,3 +174,3 @@ """ | ||
| ---------- | ||
| data_id : unidist.core.backends.common.data_id.DataID | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID(s) to object(s) to get data from. | ||
@@ -139,5 +184,4 @@ | ||
| mpi_state = communication.MPIState.get_instance() | ||
| owner_rank = object_store.get_data_owner(data_id) | ||
| local_store = LocalObjectStore.get_instance() | ||
| owner_rank = local_store.get_data_owner(data_id) | ||
| logger.debug("GET {} id from {} rank".format(data_id._id, owner_rank)) | ||
@@ -148,4 +192,4 @@ | ||
| operation_data = { | ||
| "source": mpi_state.rank, | ||
| "id": data_id.base_data_id(), | ||
| "source": mpi_state.global_rank, | ||
| "id": data_id, | ||
| # set `is_blocking_op` to `True` to tell a worker | ||
@@ -159,3 +203,3 @@ # to send the data directly to the requester | ||
| communication.send_simple_operation( | ||
| mpi_state.comm, | ||
| mpi_state.global_comm, | ||
| operation_type, | ||
@@ -165,13 +209,14 @@ operation_data, | ||
| ) | ||
| # Blocking get | ||
| data = communication.recv_complex_data(mpi_state.comm, owner_rank) | ||
| complex_data = pull_data(mpi_state.global_comm, owner_rank) | ||
| if data_id != complex_data["id"]: | ||
| raise ValueError("Unexpected data_id!") | ||
| data = complex_data["data"] | ||
| # Caching the result, check the protocol correctness here | ||
| object_store.put(data_id, data) | ||
| local_store.put(data_id, data) | ||
| return data | ||
| def _push_local_data(dest_rank, data_id): | ||
| def _push_local_data(dest_rank, data_id, is_blocking_op, is_serialized): | ||
| """ | ||
@@ -184,7 +229,15 @@ Send local data associated with passed ID to target rank. | ||
| Target rank. | ||
| data_id : unidist.core.backends.mpi.core.common.MasterDataID | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to data. | ||
| is_blocking_op : bool | ||
| Whether the communication should be blocking or not. | ||
| If ``True``, the request should be processed immediatly | ||
| even for a worker since it can get into controller mode. | ||
| is_serialized : bool | ||
| `data_id` is already serialized or not. | ||
| """ | ||
| local_store = LocalObjectStore.get_instance() | ||
| # Check if data was already pushed | ||
| if not object_store.is_already_sent(data_id, dest_rank): | ||
| if not local_store.is_already_sent(data_id, dest_rank): | ||
| logger.debug("PUT LOCAL {} id to {} rank".format(data_id._id, dest_rank)) | ||
@@ -195,30 +248,78 @@ | ||
| # Push the local master data to the target worker directly | ||
| if is_serialized: | ||
| operation_data = local_store.get_serialized_data(data_id) | ||
| # Insert `data_id` to get full metadata package further | ||
| operation_data["id"] = data_id | ||
| else: | ||
| operation_data = { | ||
| "id": data_id, | ||
| "data": local_store.get(data_id), | ||
| } | ||
| operation_type = common.Operation.PUT_DATA | ||
| if object_store.is_already_serialized(data_id): | ||
| serialized_data = object_store.get_serialized_data(data_id) | ||
| h_list, _ = communication.isend_complex_operation( | ||
| mpi_state.comm, | ||
| operation_type, | ||
| serialized_data, | ||
| if is_blocking_op: | ||
| serialized_data = communication.send_complex_data( | ||
| mpi_state.global_comm, | ||
| operation_data, | ||
| dest_rank, | ||
| is_serialized=True, | ||
| is_serialized=is_serialized, | ||
| ) | ||
| else: | ||
| operation_data = { | ||
| "id": data_id, | ||
| "data": object_store.get(data_id), | ||
| } | ||
| h_list, serialized_data = communication.isend_complex_operation( | ||
| mpi_state.comm, | ||
| mpi_state.global_comm, | ||
| operation_type, | ||
| operation_data, | ||
| dest_rank, | ||
| is_serialized=False, | ||
| is_serialized=is_serialized, | ||
| ) | ||
| object_store.cache_serialized_data(data_id, serialized_data) | ||
| async_operations.extend(h_list) | ||
| async_operations.extend(h_list) | ||
| if not is_serialized or not local_store.is_already_serialized(data_id): | ||
| local_store.cache_serialized_data(data_id, serialized_data) | ||
| # Remember pushed id | ||
| object_store.cache_send_info(data_id, dest_rank) | ||
| local_store.cache_send_info(data_id, dest_rank) | ||
| def _push_shared_data(dest_rank, data_id, is_blocking_op): | ||
| """ | ||
| Send the data associated with the data ID to target rank using shared memory. | ||
| Parameters | ||
| ---------- | ||
| dest_rank : int | ||
| Target rank. | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to data. | ||
| is_blocking_op : bool | ||
| Whether the communication should be blocking or not. | ||
| If ``True``, the request should be processed immediatly | ||
| even for a worker since it can get into controller mode. | ||
| """ | ||
| local_store = LocalObjectStore.get_instance() | ||
| # Check if data was already pushed | ||
| if not local_store.is_already_sent(data_id, dest_rank): | ||
| mpi_state = communication.MPIState.get_instance() | ||
| shared_store = SharedObjectStore.get_instance() | ||
| mpi_state = communication.MPIState.get_instance() | ||
| operation_type = common.Operation.PUT_SHARED_DATA | ||
| async_operations = AsyncOperations.get_instance() | ||
| info_package = shared_store.get_shared_info(data_id) | ||
| # wrap to dict for sending and correct deserialization of the object by the recipient | ||
| operation_data = dict(info_package) | ||
| if is_blocking_op: | ||
| communication.mpi_send_object( | ||
| mpi_state.global_comm, operation_data, dest_rank | ||
| ) | ||
| else: | ||
| h_list = communication.isend_simple_operation( | ||
| mpi_state.global_comm, | ||
| operation_type, | ||
| operation_data, | ||
| dest_rank, | ||
| ) | ||
| async_operations.extend(h_list) | ||
| local_store.cache_send_info(data_id, dest_rank) | ||
| def _push_data_owner(dest_rank, data_id): | ||
@@ -232,13 +333,14 @@ """ | ||
| Target rank. | ||
| value : unidist.core.backends.mpi.core.common.MasterDataID | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to data. | ||
| """ | ||
| local_store = LocalObjectStore.get_instance() | ||
| operation_type = common.Operation.PUT_OWNER | ||
| operation_data = { | ||
| "id": data_id, | ||
| "owner": object_store.get_data_owner(data_id), | ||
| "owner": local_store.get_data_owner(data_id), | ||
| } | ||
| async_operations = AsyncOperations.get_instance() | ||
| h_list = communication.isend_simple_operation( | ||
| communication.MPIState.get_instance().comm, | ||
| communication.MPIState.get_instance().global_comm, | ||
| operation_type, | ||
@@ -251,3 +353,3 @@ operation_data, | ||
| def push_data(dest_rank, value): | ||
| def push_data(dest_rank, value, is_blocking_op=False): | ||
| """ | ||
@@ -265,3 +367,10 @@ Parse and send all values to destination rank. | ||
| Arguments to be sent. | ||
| is_blocking_op : bool | ||
| Whether the communication should be blocking or not. | ||
| If ``True``, the request should be processed immediatly | ||
| even for a worker since it can get into controller mode. | ||
| """ | ||
| local_store = LocalObjectStore.get_instance() | ||
| shared_store = SharedObjectStore.get_instance() | ||
| if isinstance(value, (list, tuple)): | ||
@@ -274,7 +383,24 @@ for v in value: | ||
| elif is_data_id(value): | ||
| if object_store.contains(value): | ||
| _push_local_data(dest_rank, value) | ||
| elif object_store.contains_data_owner(value): | ||
| _push_data_owner(dest_rank, value) | ||
| data_id = value | ||
| if shared_store.contains(data_id): | ||
| _push_shared_data(dest_rank, data_id, is_blocking_op) | ||
| elif local_store.contains(data_id): | ||
| if local_store.is_already_serialized(data_id): | ||
| _push_local_data(dest_rank, data_id, is_blocking_op, is_serialized=True) | ||
| else: | ||
| data = local_store.get(data_id) | ||
| serialized_data = serialize_complex_data(data) | ||
| if shared_store.is_allocated() and shared_store.should_be_shared( | ||
| serialized_data | ||
| ): | ||
| shared_store.put(data_id, serialized_data) | ||
| _push_shared_data(dest_rank, data_id, is_blocking_op) | ||
| else: | ||
| local_store.cache_serialized_data(data_id, serialized_data) | ||
| _push_local_data( | ||
| dest_rank, data_id, is_blocking_op, is_serialized=True | ||
| ) | ||
| elif local_store.contains_data_owner(data_id): | ||
| _push_data_owner(dest_rank, data_id) | ||
| else: | ||
| raise ValueError("Unknown DataID!") |
@@ -13,4 +13,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| from unidist.core.backends.mpi.core.serialization import SimpleDataSerializer | ||
| from unidist.core.backends.mpi.core.controller.object_store import object_store | ||
| from unidist.core.backends.mpi.core.controller.common import initial_worker_number | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
@@ -27,3 +26,3 @@ | ||
| ---------- | ||
| object_store : unidist.core.backends.mpi.executor.ObjectStore | ||
| local_store : unidist.core.backends.mpi.core.local_object_store | ||
| Reference to the local object storage. | ||
@@ -36,3 +35,3 @@ | ||
| def __init__(self, object_store): | ||
| def __init__(self, local_store): | ||
| # Cleanup frequency settings | ||
@@ -43,7 +42,7 @@ self._cleanup_counter = 1 | ||
| self._timestamp = 0 # seconds | ||
| # Cleanup list of DataIDs | ||
| # Cleanup list of tuple(owner_rank, data_number) | ||
| self._cleanup_list = [] | ||
| self._cleanup_list_threshold = 10 | ||
| # Reference to the global object store | ||
| self._object_store = object_store | ||
| self._local_store = local_store | ||
| # Task submitted counter | ||
@@ -61,7 +60,3 @@ self._task_counter = 0 | ||
| """ | ||
| logger.debug( | ||
| "Send cleanup list - {}".format( | ||
| common.unwrapped_data_ids_list(cleanup_list) | ||
| ) | ||
| ) | ||
| logger.debug(f"Send cleanup list - {cleanup_list}") | ||
| mpi_state = communication.MPIState.get_instance() | ||
@@ -71,6 +66,6 @@ # Cache serialized list of data IDs | ||
| async_operations = AsyncOperations.get_instance() | ||
| for rank_id in range(initial_worker_number, mpi_state.world_size): | ||
| if rank_id != mpi_state.rank: | ||
| for rank_id in mpi_state.workers + mpi_state.monitor_processes: | ||
| if rank_id != mpi_state.global_rank: | ||
| h_list = communication.isend_serialized_operation( | ||
| mpi_state.comm, | ||
| mpi_state.global_comm, | ||
| common.Operation.CLEANUP, | ||
@@ -100,4 +95,4 @@ s_cleanup_list, | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MasterDataID | ||
| An ID to data | ||
| data_id_metadata : tuple | ||
| Tuple of the owner rank and data number describing a ``MpiDataID``. | ||
| """ | ||
@@ -129,2 +124,5 @@ self._cleanup_list.append(data_id) | ||
| mpi_state = communication.MPIState.get_instance() | ||
| root_monitor = mpi_state.get_monitor_by_worker_rank( | ||
| communication.MPIRank.ROOT | ||
| ) | ||
| # Compare submitted and executed tasks | ||
@@ -134,9 +132,9 @@ # We use a blocking send here because we have to wait for | ||
| communication.mpi_send_operation( | ||
| mpi_state.comm, | ||
| mpi_state.global_comm, | ||
| common.Operation.GET_TASK_COUNT, | ||
| communication.MPIRank.MONITOR, | ||
| root_monitor, | ||
| ) | ||
| executed_task_counter = communication.mpi_recv_object( | ||
| mpi_state.comm, | ||
| communication.MPIRank.MONITOR, | ||
| mpi_state.global_comm, | ||
| root_monitor, | ||
| ) | ||
@@ -151,4 +149,2 @@ | ||
| self._send_cleanup_request(self._cleanup_list) | ||
| # Clear the remaining references | ||
| self._object_store.clear(self._cleanup_list) | ||
| self._cleanup_list.clear() | ||
@@ -161,2 +157,2 @@ self._cleanup_counter += 1 | ||
| garbage_collector = GarbageCollector(object_store) | ||
| garbage_collector = GarbageCollector(LocalObjectStore.get_instance()) |
@@ -125,3 +125,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| self.buffers = buffers if buffers else [] | ||
| self.buffer_count = buffer_count if buffer_count else [] | ||
| self.buffer_count = list(buffer_count) if buffer_count else [] | ||
| self._callback_counter = 0 | ||
@@ -223,2 +223,7 @@ | ||
| Returns | ||
| ------- | ||
| bytes | ||
| Serialized data. | ||
| Notes | ||
@@ -247,3 +252,7 @@ ----- | ||
| frame = pkl.loads(obj["as_bytes"], buffers=self.buffers) | ||
| del self.buffers[: self.buffer_count.pop(0)] | ||
| # check if there are out-of-band buffers | ||
| # TODO: look at this condition and get rid of it because | ||
| # `buffer_count` should always be a list with length greater than 0. | ||
| if self.buffer_count: | ||
| del self.buffers[: self.buffer_count.pop(0)] | ||
| return frame | ||
@@ -262,2 +271,7 @@ else: | ||
| Returns | ||
| ------- | ||
| object | ||
| Deserialized data. | ||
| Notes | ||
@@ -347,1 +361,55 @@ ----- | ||
| return pkl.loads(data) | ||
def serialize_complex_data(data):
    """
    Serialize data with out-of-band buffer support.

    Parameters
    ----------
    data : object
        Data to serialize.

    Returns
    -------
    dict
        A dict with the serialized payload under ``"s_data"``, the
        out-of-band pickle buffers under ``"raw_buffers"`` and the
        per-object buffer counts under ``"buffer_count"``.

    Notes
    -----
    Uses msgpack, cloudpickle and pickle libraries.
    """
    packer = ComplexDataSerializer()
    payload = packer.serialize(data)
    return {
        "s_data": payload,
        "raw_buffers": packer.buffers,
        "buffer_count": packer.buffer_count,
    }
def deserialize_complex_data(s_data, raw_buffers, buffer_count):
    """
    Deserialize data previously produced by ``serialize_complex_data``.

    Parameters
    ----------
    s_data : bytearray
        Serialized msgpack data.
    raw_buffers : list
        A list of ``PickleBuffer`` objects for data decoding.
    buffer_count : list
        List of the number of buffers for each object
        to be deserialized using the pickle 5 protocol.

    Returns
    -------
    object
        Deserialized data.

    Notes
    -----
    Uses msgpack, cloudpickle and pickle libraries.
    """
    unpacker = ComplexDataSerializer(raw_buffers, buffer_count)
    result = unpacker.deserialize(s_data)
    return result
@@ -6,2 +6,3 @@ # Copyright (C) 2021-2023 Modin authors | ||
| """Worker MPI process task processing functionality.""" | ||
| import asyncio | ||
@@ -19,6 +20,8 @@ from functools import wraps, partial | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.worker.object_store import ObjectStore | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| from unidist.core.backends.mpi.core.worker.request_store import RequestStore | ||
| from unidist.core.backends.mpi.core.worker.task_store import TaskStore | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.controller.common import pull_data | ||
| from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore | ||
@@ -33,3 +36,3 @@ # TODO: Find a way to move this after all imports | ||
| # we use the condition to set "worker_0.log" in order to build it successfully. | ||
| logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0) | ||
| logger_name = "worker_{}".format(mpi_state.global_rank if mpi_state is not None else 0) | ||
| log_file = "{}.log".format(logger_name) | ||
@@ -88,6 +91,9 @@ w_logger = common.get_logger(logger_name, log_file) | ||
| task_store = TaskStore.get_instance() | ||
| object_store = ObjectStore.get_instance() | ||
| local_store = LocalObjectStore.get_instance() | ||
| request_store = RequestStore.get_instance() | ||
| async_operations = AsyncOperations.get_instance() | ||
| ready_to_shutdown_posted = False | ||
| # Barrier to check if worker process is ready to start the communication loop | ||
| mpi_state.global_comm.Barrier() | ||
| w_logger.debug("Worker loop started") | ||
| # Once the worker receives the cancel signal from ``Root`` rank, | ||
@@ -98,2 +104,3 @@ # it is getting to shutdown. All pending requests and communications are cancelled, | ||
| # ``Monitor` sends the shutdown signal to every worker so they can exit the loop. | ||
| ready_to_shutdown_posted = False | ||
| while True: | ||
@@ -103,8 +110,10 @@ # Listen receive operation from any source | ||
| communication.mpi_recv_operation | ||
| )(mpi_state.comm) | ||
| w_logger.debug("common.Operation processing - {}".format(operation_type)) | ||
| )(mpi_state.global_comm) | ||
| w_logger.debug( | ||
| f"common.Operation processing - {operation_type} from {source_rank} rank" | ||
| ) | ||
| # Proceed the request | ||
| if operation_type == common.Operation.EXECUTE: | ||
| request = communication.recv_complex_data(mpi_state.comm, source_rank) | ||
| request = pull_data(mpi_state.global_comm, source_rank) | ||
| if not ready_to_shutdown_posted: | ||
@@ -120,5 +129,4 @@ # Execute the task if possible | ||
| elif operation_type == common.Operation.GET: | ||
| request = communication.mpi_recv_object(mpi_state.comm, source_rank) | ||
| if not ready_to_shutdown_posted: | ||
| request["id"] = object_store.get_unique_data_id(request["id"]) | ||
| request = communication.mpi_recv_object(mpi_state.global_comm, source_rank) | ||
| if request is not None and not ready_to_shutdown_posted: | ||
| request_store.process_get_request( | ||
@@ -129,3 +137,3 @@ request["source"], request["id"], request["is_blocking_op"] | ||
| elif operation_type == common.Operation.PUT_DATA: | ||
| request = communication.recv_complex_data(mpi_state.comm, source_rank) | ||
| request = pull_data(mpi_state.global_comm, source_rank) | ||
| if not ready_to_shutdown_posted: | ||
@@ -137,4 +145,3 @@ w_logger.debug( | ||
| ) | ||
| request["id"] = object_store.get_unique_data_id(request["id"]) | ||
| object_store.put(request["id"], request["data"]) | ||
| local_store.put(request["id"], request["data"]) | ||
@@ -150,6 +157,5 @@ # Discard data request to another worker, if data has become available | ||
| elif operation_type == common.Operation.PUT_OWNER: | ||
| request = communication.mpi_recv_object(mpi_state.comm, source_rank) | ||
| request = communication.mpi_recv_object(mpi_state.global_comm, source_rank) | ||
| if not ready_to_shutdown_posted: | ||
| request["id"] = object_store.get_unique_data_id(request["id"]) | ||
| object_store.put_data_owner(request["id"], request["owner"]) | ||
| local_store.put_data_owner(request["id"], request["owner"]) | ||
@@ -162,11 +168,21 @@ w_logger.debug( | ||
| elif operation_type == common.Operation.PUT_SHARED_DATA: | ||
| result = pull_data(mpi_state.global_comm, source_rank) | ||
| # Clear cached request to another worker, if data_id became available | ||
| request_store.discard_data_request(result["id"]) | ||
| # Check pending requests. Maybe some data became available. | ||
| task_store.check_pending_tasks() | ||
| # Check pending actor requests also. | ||
| task_store.check_pending_actor_tasks() | ||
| elif operation_type == common.Operation.WAIT: | ||
| request = communication.mpi_recv_object(mpi_state.comm, source_rank) | ||
| request = communication.mpi_recv_object(mpi_state.global_comm, source_rank) | ||
| if not ready_to_shutdown_posted: | ||
| w_logger.debug("WAIT for {} id".format(request["id"]._id)) | ||
| request["id"] = object_store.get_unique_data_id(request["id"]) | ||
| request_store.process_wait_request(request["id"]) | ||
| elif operation_type == common.Operation.ACTOR_CREATE: | ||
| request = communication.recv_complex_data(mpi_state.comm, source_rank) | ||
| request = pull_data(mpi_state.global_comm, source_rank) | ||
| if not ready_to_shutdown_posted: | ||
@@ -180,6 +196,7 @@ cls = request["class"] | ||
| elif operation_type == common.Operation.ACTOR_EXECUTE: | ||
| request = communication.recv_complex_data(mpi_state.comm, source_rank) | ||
| request = pull_data(mpi_state.global_comm, source_rank) | ||
| if not ready_to_shutdown_posted: | ||
| # Prepare the data | ||
| method_name = request["task"] | ||
| # Actor method here is a data id so we have to retrieve it from the storage | ||
| method_name = local_store.get(request["task"]) | ||
| handler = request["handler"] | ||
@@ -199,5 +216,6 @@ actor_method = getattr(actor_map[handler], method_name) | ||
| cleanup_list = communication.recv_serialized_data( | ||
| mpi_state.comm, source_rank | ||
| mpi_state.global_comm, source_rank | ||
| ) | ||
| object_store.clear(cleanup_list) | ||
| cleanup_list = [common.MpiDataID(*tpl) for tpl in cleanup_list] | ||
| local_store.clear(cleanup_list) | ||
@@ -211,5 +229,5 @@ elif operation_type == common.Operation.CANCEL: | ||
| communication.mpi_send_operation( | ||
| mpi_state.comm, | ||
| mpi_state.global_comm, | ||
| common.Operation.READY_TO_SHUTDOWN, | ||
| communication.MPIRank.MONITOR, | ||
| mpi_state.get_monitor_by_worker_rank(communication.MPIRank.ROOT), | ||
| ) | ||
@@ -219,2 +237,3 @@ ready_to_shutdown_posted = True | ||
| w_logger.debug("Exit worker event loop") | ||
| SharedObjectStore.get_instance().finalize() | ||
| if not MPI.Is_finalized(): | ||
@@ -221,0 +240,0 @@ MPI.Finalize() |
@@ -9,4 +9,4 @@ # Copyright (C) 2021-2023 Modin authors | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.worker.object_store import ObjectStore | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| from unidist.core.backends.mpi.core.controller.common import push_data | ||
@@ -18,3 +18,3 @@ | ||
| # we use the condition to set "worker_0.log" in order to build it successfully. | ||
| logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0) | ||
| logger_name = "worker_{}".format(mpi_state.global_rank if mpi_state is not None else 0) | ||
| log_file = "{}.log".format(logger_name) | ||
@@ -49,7 +49,7 @@ logger = common.get_logger(logger_name, log_file) | ||
| def __init__(self): | ||
| # Non-blocking get requests {DataId : [ Set of Ranks ]} | ||
| # Non-blocking get requests {DataID : [ Set of Ranks ]} | ||
| self._nonblocking_get_requests = defaultdict(set) | ||
| # Blocking get requests {DataId : [ Set of Ranks ]} | ||
| # Blocking get requests {DataID : [ Set of Ranks ]} | ||
| self._blocking_get_requests = defaultdict(set) | ||
| # Blocking wait requests {DataId : Rank} | ||
| # Blocking wait requests {DataID : Rank} | ||
| self._blocking_wait_requests = {} | ||
@@ -78,3 +78,3 @@ # Data requests | ||
| ---------- | ||
| data_id : unidist.core.backends.common.data_id.DataID | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to data. | ||
@@ -108,3 +108,3 @@ rank : int | ||
| ---------- | ||
| data_id : unidist.core.backends.common.data_id.DataID | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to data. | ||
@@ -125,3 +125,3 @@ | ||
| ---------- | ||
| data_id : unidist.core.backends.common.data_id.DataID | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID to data. | ||
@@ -148,3 +148,3 @@ """ | ||
| ---------- | ||
| data_id : iterable or unidist.core.backends.common.data_id.DataID | ||
| data_id : iterable or unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID or list of IDs to data. | ||
@@ -184,3 +184,3 @@ """ | ||
| ---------- | ||
| data_id : iterable or unidist.core.backends.common.data_id.DataID | ||
| data_id : iterable or unidist.core.backends.mpi.core.common.MpiDataID | ||
| An ID or list of IDs to data. | ||
@@ -194,3 +194,3 @@ """ | ||
| communication.mpi_send_object( | ||
| communication.MPIState.get_instance().comm, | ||
| communication.MPIState.get_instance().global_comm, | ||
| data_id, | ||
@@ -204,3 +204,3 @@ communication.MPIRank.ROOT, | ||
| communication.mpi_send_object( | ||
| communication.MPIState.get_instance().comm, | ||
| communication.MPIState.get_instance().global_comm, | ||
| data_ids, | ||
@@ -219,3 +219,3 @@ communication.MPIRank.ROOT, | ||
| ---------- | ||
| data_id : unidist.core.backends.common.data_id.DataID | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| Check if `data_id` is available in object store. | ||
@@ -227,7 +227,7 @@ | ||
| """ | ||
| if ObjectStore.get_instance().contains(data_id): | ||
| if LocalObjectStore.get_instance().contains(data_id): | ||
| # Executor wait just for signal | ||
| # We use a blocking send here because the receiver is waiting for the result. | ||
| communication.mpi_send_object( | ||
| communication.MPIState.get_instance().comm, | ||
| communication.MPIState.get_instance().global_comm, | ||
| data_id, | ||
@@ -251,3 +251,3 @@ communication.MPIRank.ROOT, | ||
| Rank number to send data to. | ||
| data_id: unidist.core.backends.common.data_id.DataID | ||
| data_id: unidist.core.backends.mpi.core.common.MpiDataID | ||
| `data_id` associated data to request | ||
@@ -263,44 +263,10 @@ is_blocking_op : bool, default: False | ||
| """ | ||
| object_store = ObjectStore.get_instance() | ||
| async_operations = AsyncOperations.get_instance() | ||
| if object_store.contains(data_id): | ||
| if source_rank == communication.MPIRank.ROOT or is_blocking_op: | ||
| # The controller or a requesting worker is blocked by the request | ||
| # which should be processed immediatly | ||
| operation_data = object_store.get(data_id) | ||
| # We use a blocking send here because the receiver is waiting for the result. | ||
| communication.send_complex_data( | ||
| mpi_state.comm, | ||
| operation_data, | ||
| source_rank, | ||
| ) | ||
| else: | ||
| operation_type = common.Operation.PUT_DATA | ||
| if object_store.is_already_serialized(data_id): | ||
| operation_data = object_store.get_serialized_data(data_id) | ||
| # Async send to avoid possible dead-lock between workers | ||
| h_list, _ = communication.isend_complex_operation( | ||
| mpi_state.comm, | ||
| operation_type, | ||
| operation_data, | ||
| source_rank, | ||
| is_serialized=True, | ||
| ) | ||
| async_operations.extend(h_list) | ||
| else: | ||
| operation_data = { | ||
| "id": data_id, | ||
| "data": object_store.get(data_id), | ||
| } | ||
| # Async send to avoid possible dead-lock between workers | ||
| h_list, serialized_data = communication.isend_complex_operation( | ||
| mpi_state.comm, | ||
| operation_type, | ||
| operation_data, | ||
| source_rank, | ||
| is_serialized=False, | ||
| ) | ||
| async_operations.extend(h_list) | ||
| object_store.cache_serialized_data(data_id, serialized_data) | ||
| local_store = LocalObjectStore.get_instance() | ||
| if local_store.contains(data_id): | ||
| push_data( | ||
| source_rank, | ||
| data_id, | ||
| is_blocking_op=source_rank == communication.MPIRank.ROOT | ||
| or is_blocking_op, | ||
| ) | ||
| logger.debug( | ||
@@ -307,0 +273,0 @@ "Send requested {} id to {} rank - PROCESSED".format( |
@@ -14,3 +14,5 @@ # Copyright (C) 2021-2023 Modin authors | ||
| from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
| from unidist.core.backends.mpi.core.worker.object_store import ObjectStore | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore | ||
| from unidist.core.backends.mpi.core.serialization import serialize_complex_data | ||
| from unidist.core.backends.mpi.core.worker.request_store import RequestStore | ||
@@ -22,3 +24,3 @@ | ||
| # we use the condition to set "worker_0.log" in order to build it successfully. | ||
| logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0) | ||
| logger_name = "worker_{}".format(mpi_state.global_rank if mpi_state is not None else 0) | ||
| log_file = "{}.log".format(logger_name) | ||
@@ -138,3 +140,3 @@ w_logger = common.get_logger(logger_name, log_file) | ||
| Rank number to request data from. | ||
| data_id : unidist.core.backends.common.data_id.DataID | ||
| data_id : unidist.core.backends.mpi.core.common.MpiDataID | ||
| `data_id` associated data to request. | ||
@@ -152,3 +154,3 @@ | ||
| operation_data = { | ||
| "source": communication.MPIState.get_instance().rank, | ||
| "source": communication.MPIState.get_instance().global_rank, | ||
| "id": data_id, | ||
@@ -159,3 +161,3 @@ "is_blocking_op": False, | ||
| h_list = communication.isend_simple_operation( | ||
| communication.MPIState.get_instance().comm, | ||
| communication.MPIState.get_instance().global_comm, | ||
| operation_type, | ||
@@ -178,3 +180,3 @@ operation_data, | ||
| ---------- | ||
| arg : object or unidist.core.backends.common.data_id.DataID | ||
| arg : object or unidist.core.backends.mpi.core.common.MpiDataID | ||
| Data ID or object to inspect. | ||
@@ -193,13 +195,12 @@ | ||
| if is_data_id(arg): | ||
| object_store = ObjectStore.get_instance() | ||
| arg = object_store.get_unique_data_id(arg) | ||
| if object_store.contains(arg): | ||
| value = ObjectStore.get_instance().get(arg) | ||
| local_store = LocalObjectStore.get_instance() | ||
| if local_store.contains(arg): | ||
| value = LocalObjectStore.get_instance().get(arg) | ||
| # Data is already local or was pushed from master | ||
| return value, False | ||
| elif object_store.contains_data_owner(arg): | ||
| elif local_store.contains_data_owner(arg): | ||
| if not RequestStore.get_instance().is_data_already_requested(arg): | ||
| # Request the data from an owner worker | ||
| owner_rank = object_store.get_data_owner(arg) | ||
| if owner_rank != communication.MPIState.get_instance().rank: | ||
| owner_rank = local_store.get_data_owner(arg) | ||
| if owner_rank != communication.MPIState.get_instance().global_rank: | ||
| self.request_worker_data(owner_rank, arg) | ||
@@ -218,3 +219,3 @@ return arg, True | ||
| ---------- | ||
| output_data_ids : list of unidist.core.backends.common.data_id.DataID | ||
| output_data_ids : list of unidist.core.backends.mpi.core.common.MpiDataID | ||
| A list of output data IDs to store the results in local object store. | ||
@@ -232,4 +233,8 @@ task : callable | ||
| """ | ||
| object_store = ObjectStore.get_instance() | ||
| local_store = LocalObjectStore.get_instance() | ||
| shared_store = SharedObjectStore.get_instance() | ||
| completed_data_ids = [] | ||
| # Note that if a task is coroutine, | ||
| # the local store or the shared store will contain output data | ||
| # only once the task is complete. | ||
| if inspect.iscoroutinefunction(task): | ||
@@ -267,7 +272,5 @@ | ||
| for output_id in output_data_ids: | ||
| data_id = object_store.get_unique_data_id(output_id) | ||
| object_store.put(data_id, e) | ||
| local_store.put(output_id, e) | ||
| else: | ||
| data_id = object_store.get_unique_data_id(output_data_ids) | ||
| object_store.put(data_id, e) | ||
| local_store.put(output_data_ids, e) | ||
| else: | ||
@@ -283,9 +286,27 @@ if output_data_ids is not None: | ||
| ): | ||
| data_id = object_store.get_unique_data_id(output_id) | ||
| object_store.put(data_id, value) | ||
| completed_data_ids[idx] = data_id | ||
| serialized_data = serialize_complex_data(value) | ||
| local_store.put(output_id, value) | ||
| if ( | ||
| shared_store.is_allocated() | ||
| and shared_store.should_be_shared(serialized_data) | ||
| ): | ||
| shared_store.put(output_id, serialized_data) | ||
| else: | ||
| local_store.cache_serialized_data( | ||
| output_id, serialized_data | ||
| ) | ||
| completed_data_ids[idx] = output_id | ||
| else: | ||
| data_id = object_store.get_unique_data_id(output_data_ids) | ||
| object_store.put(data_id, output_values) | ||
| completed_data_ids = [data_id] | ||
| serialized_data = serialize_complex_data(output_values) | ||
| local_store.put(output_data_ids, output_values) | ||
| if ( | ||
| shared_store.is_allocated() | ||
| and shared_store.should_be_shared(serialized_data) | ||
| ): | ||
| shared_store.put(output_data_ids, serialized_data) | ||
| else: | ||
| local_store.cache_serialized_data( | ||
| output_data_ids, serialized_data | ||
| ) | ||
| completed_data_ids = [output_data_ids] | ||
@@ -296,7 +317,10 @@ RequestStore.get_instance().check_pending_get_requests(output_data_ids) | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| root_monitor = mpi_state.get_monitor_by_worker_rank( | ||
| communication.MPIRank.ROOT | ||
| ) | ||
| communication.send_simple_operation( | ||
| communication.MPIState.get_instance().comm, | ||
| communication.MPIState.get_instance().global_comm, | ||
| common.Operation.TASK_DONE, | ||
| completed_data_ids, | ||
| communication.MPIRank.MONITOR, | ||
| root_monitor, | ||
| ) | ||
@@ -340,7 +364,5 @@ | ||
| for output_id in output_data_ids: | ||
| data_id = object_store.get_unique_data_id(output_id) | ||
| object_store.put(data_id, e) | ||
| local_store.put(output_id, e) | ||
| else: | ||
| data_id = object_store.get_unique_data_id(output_data_ids) | ||
| object_store.put(data_id, e) | ||
| local_store.put(output_data_ids, e) | ||
| else: | ||
@@ -356,9 +378,27 @@ if output_data_ids is not None: | ||
| ): | ||
| data_id = object_store.get_unique_data_id(output_id) | ||
| object_store.put(data_id, value) | ||
| completed_data_ids[idx] = data_id | ||
| serialized_data = serialize_complex_data(value) | ||
| local_store.put(output_id, value) | ||
| if ( | ||
| shared_store.is_allocated() | ||
| and shared_store.should_be_shared(serialized_data) | ||
| ): | ||
| shared_store.put(output_id, serialized_data) | ||
| else: | ||
| local_store.cache_serialized_data( | ||
| output_id, serialized_data | ||
| ) | ||
| completed_data_ids[idx] = output_id | ||
| else: | ||
| data_id = object_store.get_unique_data_id(output_data_ids) | ||
| object_store.put(data_id, output_values) | ||
| completed_data_ids = [data_id] | ||
| serialized_data = serialize_complex_data(output_values) | ||
| local_store.put(output_data_ids, output_values) | ||
| if ( | ||
| shared_store.is_allocated() | ||
| and shared_store.should_be_shared(serialized_data) | ||
| ): | ||
| shared_store.put(output_data_ids, serialized_data) | ||
| else: | ||
| local_store.cache_serialized_data( | ||
| output_data_ids, serialized_data | ||
| ) | ||
| completed_data_ids = [output_data_ids] | ||
| RequestStore.get_instance().check_pending_get_requests(output_data_ids) | ||
@@ -368,7 +408,10 @@ # Monitor the task execution. | ||
| # completion of the communication, which is necessary for the pipeline to continue. | ||
| root_monitor = mpi_state.get_monitor_by_worker_rank( | ||
| communication.MPIRank.ROOT | ||
| ) | ||
| communication.send_simple_operation( | ||
| communication.MPIState.get_instance().comm, | ||
| communication.MPIState.get_instance().global_comm, | ||
| common.Operation.TASK_DONE, | ||
| completed_data_ids, | ||
| communication.MPIRank.MONITOR, | ||
| root_monitor, | ||
| ) | ||
@@ -393,3 +436,8 @@ | ||
| # Parse request | ||
| local_store = LocalObjectStore.get_instance() | ||
| task = request["task"] | ||
| # Remote function here is a data id so we have to retrieve it from the storage, | ||
| # whereas actor method is already materialized in the worker loop. | ||
| if is_data_id(task): | ||
| task = local_store.get(task) | ||
| args = request["args"] | ||
@@ -396,0 +444,0 @@ kwargs = request["kwargs"] |
@@ -8,2 +8,4 @@ # Copyright (C) 2021-2023 Modin authors | ||
| import unidist.core.backends.mpi.core as mpi | ||
| from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore | ||
| from unidist.core.backends.common.data_id import is_data_id | ||
| from unidist.core.backends.common.utils import unwrap_object_refs | ||
@@ -32,2 +34,3 @@ from unidist.core.base.object_ref import ObjectRef | ||
| self._remote_function = function | ||
| self._remote_function_orig = function | ||
| self._num_cpus = num_cpus | ||
@@ -76,2 +79,11 @@ self._num_returns = 1 if num_returns is None else num_returns | ||
| if not is_data_id(self._remote_function): | ||
| self._remote_function = mpi.put(self._remote_function_orig) | ||
| else: | ||
| # When a worker calls a remote function inside another remote function, | ||
| # we have to again serialize the former remote function and put it into the storage | ||
| # for further correct communication. | ||
| if not LocalObjectStore.get_instance().contains(self._remote_function): | ||
| self._remote_function = mpi.put(self._remote_function_orig) | ||
| data_ids = mpi.submit( | ||
@@ -78,0 +90,0 @@ self._remote_function, |
@@ -13,1 +13,26 @@ # Copyright (C) 2021-2023 Modin authors | ||
| init() | ||
class ImmutableDict(dict):
    """
    A ``dict`` subclass that raises ``TypeError`` on any attempt at mutation.

    Read access behaves exactly like a regular ``dict``; every mutating
    method (including in-place merge via ``|=``) raises ``TypeError``.
    """

    # Single shared exception instance raised by every blocked method.
    __readonly_exception = TypeError("Cannot modify `ImmutableDict`")

    def __setitem__(self, *args, **kwargs):
        raise ImmutableDict.__readonly_exception

    def __delitem__(self, *args, **kwargs):
        raise ImmutableDict.__readonly_exception

    def __ior__(self, *args, **kwargs):
        # Without this override, ``d |= other`` would mutate the dict
        # in place through ``dict.__ior__`` (Python 3.9+), silently
        # bypassing the read-only guard.
        raise ImmutableDict.__readonly_exception

    def pop(self, *args, **kwargs):
        raise ImmutableDict.__readonly_exception

    def popitem(self, *args, **kwargs):
        raise ImmutableDict.__readonly_exception

    def clear(self, *args, **kwargs):
        raise ImmutableDict.__readonly_exception

    def update(self, *args, **kwargs):
        raise ImmutableDict.__readonly_exception

    def setdefault(self, *args, **kwargs):
        raise ImmutableDict.__readonly_exception
@@ -57,3 +57,12 @@ # Copyright (C) 2021-2023 Modin authors | ||
| # test for https://github.com/modin-project/unidist/issues/354 | ||
| object_refs = [unidist.put(1), unidist.put(2)] | ||
| ready, not_ready = unidist.wait(object_refs, num_returns=1) | ||
| assert_equal(len(ready), 1) | ||
| assert_equal(len(not_ready), 1) | ||
| ready, not_ready = unidist.wait(object_refs, num_returns=2) | ||
| assert_equal(len(ready), 2) | ||
| assert_equal(len(not_ready), 0) | ||
| def test_get_ip(): | ||
@@ -60,0 +69,0 @@ import socket |
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """`ObjectStore` functionality.""" | ||
| import weakref | ||
| from collections import defaultdict | ||
| import unidist.core.backends.mpi.core.common as common | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
| class ObjectStore: | ||
| """ | ||
| Class that stores objects and provides access to these from master process. | ||
| Notes | ||
| ----- | ||
| Currently, the storage is local to the current process only. | ||
| """ | ||
| __instance = None | ||
| def __init__(self): | ||
| # Add local data {DataID : Data} | ||
| self._data_map = weakref.WeakKeyDictionary() | ||
| # Data owner {DataID : Rank} | ||
| self._data_owner_map = weakref.WeakKeyDictionary() | ||
| # Data was already sent to this ranks {DataID : [ranks]} | ||
| self._sent_data_map = defaultdict(set) | ||
| # Data id generator | ||
| self._data_id_counter = 0 | ||
| # Data serialized cache | ||
| self._serialization_cache = {} | ||
| @classmethod | ||
| def get_instance(cls): | ||
| """ | ||
| Get instance of ``ObjectStore``. | ||
| Returns | ||
| ------- | ||
| ObjectStore | ||
| """ | ||
| if cls.__instance is None: | ||
| cls.__instance = ObjectStore() | ||
| return cls.__instance | ||
| def put(self, data_id, data): | ||
| """ | ||
| Put data to internal dictionary. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MasterDataID | ||
| An ID to data. | ||
| data : object | ||
| Data to be put. | ||
| """ | ||
| self._data_map[data_id] = data | ||
| def put_data_owner(self, data_id, rank): | ||
| """ | ||
| Put data location (owner rank) to internal dictionary. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MasterDataID | ||
| An ID to data. | ||
| rank : int | ||
| Rank number where the data resides. | ||
| """ | ||
| self._data_owner_map[data_id] = rank | ||
| def get(self, data_id): | ||
| """ | ||
| Get the data from a local dictionary. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MasterDataID | ||
| An ID to data. | ||
| Returns | ||
| ------- | ||
| object | ||
| Return local data associated with `data_id`. | ||
| """ | ||
| return self._data_map[data_id] | ||
| def get_data_owner(self, data_id): | ||
| """ | ||
| Get the data owner rank. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MasterDataID | ||
| An ID to data. | ||
| Returns | ||
| ------- | ||
| int | ||
| Rank number where the data resides. | ||
| """ | ||
| return self._data_owner_map[data_id] | ||
| def contains(self, data_id): | ||
| """ | ||
| Check if the data associated with `data_id` exists in a local dictionary. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MasterDataID | ||
| An ID to data. | ||
| Returns | ||
| ------- | ||
| bool | ||
| Return the status if an object exist in local dictionary. | ||
| """ | ||
| return data_id in self._data_map | ||
| def contains_data_owner(self, data_id): | ||
| """ | ||
| Check if the data location info associated with `data_id` exists in a local dictionary. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MasterDataID | ||
| An ID to data. | ||
| Returns | ||
| ------- | ||
| bool | ||
| Return the status if an object location is known. | ||
| """ | ||
| return data_id in self._data_owner_map | ||
| def clear(self, cleanup_list): | ||
| """ | ||
| Clear all local dictionary data ID instances from `cleanup_list`. | ||
| Parameters | ||
| ---------- | ||
| cleanup_list : list | ||
| List of ``DataID``-s. | ||
| Notes | ||
| ----- | ||
| Only cache of sent data can be cleared - the rest are weakreferenced. | ||
| """ | ||
| for data_id in cleanup_list: | ||
| self._sent_data_map.pop(data_id, None) | ||
| self._serialization_cache.clear() | ||
| def generate_data_id(self, gc): | ||
| """ | ||
| Generate unique ``MasterDataID`` instance. | ||
| Parameters | ||
| ---------- | ||
| gc : unidist.core.backends.mpi.core.executor.GarbageCollector | ||
| Local garbage collector reference. | ||
| Returns | ||
| ------- | ||
| unidist.core.backends.mpi.core.common.MasterDataID | ||
| Unique data ID instance. | ||
| """ | ||
| data_id = f"rank_{communication.MPIState.get_instance().rank}_id_{self._data_id_counter}" | ||
| self._data_id_counter += 1 | ||
| return common.MasterDataID(data_id, gc) | ||
| def generate_output_data_id(self, dest_rank, gc, num_returns=1): | ||
| """ | ||
| Generate unique list of ``unidist.core.backends.mpi.core.common.MasterDataID`` instance. | ||
| Parameters | ||
| ---------- | ||
| dest_rank : int | ||
| Ranks number where generated list will be located. | ||
| gc : unidist.core.backends.mpi.core.executor.GarbageCollector | ||
| Local garbage collector reference. | ||
| num_returns : int, default: 1 | ||
| Generated list size. | ||
| Returns | ||
| ------- | ||
| list | ||
| A list of unique ``MasterDataID`` instances. | ||
| """ | ||
| if num_returns == 1: | ||
| output_ids = self.generate_data_id(gc) | ||
| self.put_data_owner(output_ids, dest_rank) | ||
| elif num_returns == 0: | ||
| output_ids = None | ||
| else: | ||
| output_ids = [None] * num_returns | ||
| for i in range(num_returns): | ||
| output_id = self.generate_data_id(gc) | ||
| output_ids[i] = output_id | ||
| self.put_data_owner(output_id, dest_rank) | ||
| return output_ids | ||
| def cache_send_info(self, data_id, rank): | ||
| """ | ||
| Save communication event for this `data_id` and rank. | ||
| Parameters | ||
| ---------- | ||
| data_id : unidist.core.backends.mpi.core.common.MasterDataID | ||
| An ``ID`` to data. | ||
| rank : int | ||
| Rank number where the data was sent. | ||
| """ | ||
| self._sent_data_map[data_id].add(rank) | ||
def is_already_sent(self, data_id, rank):
    """
    Check if communication event on this `data_id` and rank happened.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MasterDataID
        An ID to data.
    rank : int
        Rank number to check.

    Returns
    -------
    bool
        ``True`` if communication event already happened.
    """
    # Single lookup instead of membership test + subscript.
    sent_to = self._sent_data_map.get(data_id)
    return sent_to is not None and rank in sent_to
def cache_serialized_data(self, data_id, data):
    """
    Cache the serialized form of the data behind `data_id`.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MasterDataID
        An ID to data.
    data : object
        Serialized data to cache.
    """
    self._serialization_cache[data_id] = data
def is_already_serialized(self, data_id):
    """
    Check whether a serialized form of `data_id`'s data is cached.

    Parameters
    ----------
    data_id : unidist.core.backends.common.data_id.DataID
        An ID to data.

    Returns
    -------
    bool
        ``True`` if the data is already serialized.
    """
    return data_id in self._serialization_cache
def get_serialized_data(self, data_id):
    """
    Retrieve the cached serialized data for `data_id`.

    Parameters
    ----------
    data_id : unidist.core.backends.common.data_id.DataID
        An ID to data.

    Returns
    -------
    object
        Cached serialized data associated with `data_id`.

    Raises
    ------
    KeyError
        If nothing was cached for `data_id`.
    """
    return self._serialization_cache[data_id]
# Module-level singleton instance of the worker-local object store.
# NOTE(review): the class defined in this file is ``LocalObjectStore``;
# referencing ``ObjectStore`` here raises ``NameError`` at import time.
object_store = LocalObjectStore.get_instance()
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Monitoring process.""" | ||
# Fail fast with an actionable message when mpi4py is absent;
# ``from None`` suppresses the original ImportError traceback.
try:
    import mpi4py
except ImportError:
    raise ImportError(
        "Missing dependency 'mpi4py'. Use pip or conda to install it."
    ) from None

import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication

# TODO: Find a way to move this after all imports
# NOTE(review): the rc options appear to need to be set before ``MPI``
# is imported below (hence the E402 suppression) — confirm against
# mpi4py runtime-configuration docs before reordering.
mpi4py.rc(recv_mprobe=False, initialize=False)
from mpi4py import MPI  # noqa: E402
class TaskCounter:
    """Process-wide singleton counting the number of executed tasks."""

    __instance = None

    def __init__(self):
        # Number of tasks completed so far on this process.
        self.task_counter = 0

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``TaskCounter``.

        Returns
        -------
        TaskCounter
        """
        if cls.__instance is not None:
            return cls.__instance
        cls.__instance = TaskCounter()
        return cls.__instance

    def increment(self):
        """Increment task counter by one."""
        self.task_counter = self.task_counter + 1
class DataIDTracker:
    """
    Singleton that keeps track of all completed (ready) data IDs.
    """

    __instance = None

    def __init__(self):
        # Set of data IDs whose producing tasks have finished.
        self.completed_data_ids = set()

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``DataIDTracker``.

        Returns
        -------
        DataIDTracker
        """
        if cls.__instance is not None:
            return cls.__instance
        cls.__instance = DataIDTracker()
        return cls.__instance

    def add_to_completed(self, data_ids):
        """
        Add the given data IDs to the set of completed (ready) data IDs.

        Parameters
        ----------
        data_ids : list
            List of data IDs to be added to the set of completed (ready) data IDs.
        """
        for data_id in data_ids:
            self.completed_data_ids.add(data_id)
class WaitHandler:
    """
    Class that handles wait requests.

    Accumulates awaited data IDs and, once ``num_returns`` of them become
    ready, replies to the root rank with the ready / not-ready split.
    """

    __instance = None

    def __init__(self):
        # Completed data IDs; not consulted by this class itself —
        # completion is looked up via ``DataIDTracker``.
        self.completed_data_ids = set()
        # Data IDs of the currently pending wait request.
        self.awaited_data_ids = []
        # Awaited data IDs found ready so far for the pending request.
        self.ready = []
        # Number of data IDs that must be ready before the reply is sent.
        self.num_returns = 0

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``WaitHandler``.

        Returns
        -------
        WaitHandler
        """
        if cls.__instance is None:
            cls.__instance = WaitHandler()
        return cls.__instance

    def add_wait_request(self, awaited_data_ids, num_returns):
        """
        Add a wait request for a list of data IDs and the number of data IDs to be awaited.

        Parameters
        ----------
        awaited_data_ids : list
            List of data IDs to be awaited.
        num_returns : int
            The number of ``DataID``-s that should be returned as ready.

        Notes
        -----
        Only one request is tracked at a time: a new request replaces any
        previously registered one.
        """
        self.awaited_data_ids = awaited_data_ids
        self.num_returns = num_returns

    def process_wait_requests(self):
        """
        Check if wait requests are pending to be processed.

        Process pending wait requests and send the data_ids to the requester
        if number of data IDs that are ready are equal to the num_returns.
        """
        data_id_tracker = DataIDTracker.get_instance()
        i = 0
        if self.awaited_data_ids:
            # Manual index loop: ready IDs are removed from the list while
            # iterating it, so ``i`` advances only when nothing was removed.
            while i < len(self.awaited_data_ids):
                data_id = self.awaited_data_ids[i]
                if data_id in data_id_tracker.completed_data_ids:
                    self.ready.append(data_id)
                    self.awaited_data_ids.remove(data_id)
                    if len(self.ready) == self.num_returns:
                        # Enough IDs are ready: reply to the root rank with
                        # the split and reset state for the next request.
                        operation_data = {
                            "ready": self.ready,
                            "not_ready": self.awaited_data_ids,
                        }
                        communication.mpi_send_object(
                            communication.MPIState.get_instance().comm,
                            operation_data,
                            communication.MPIRank.ROOT,
                        )
                        self.ready = []
                        self.awaited_data_ids = []
                else:
                    i += 1
def monitor_loop():
    """
    Infinite monitor operations processing loop.

    Tracks the number of executed tasks and completed (ready) data IDs.

    Notes
    -----
    The loop exits on special cancelation operation.
    ``unidist.core.backends.mpi.core.common.Operations`` defines a set of supported operations.
    """
    task_counter = TaskCounter.get_instance()
    mpi_state = communication.MPIState.get_instance()
    wait_handler = WaitHandler.get_instance()
    data_id_tracker = DataIDTracker.get_instance()
    # Ranks that have reported READY_TO_SHUTDOWN.
    workers_ready_to_shutdown = []
    shutdown_workers = False
    # Once all workers excluding ``Root`` and ``Monitor`` ranks are ready to shutdown,
    # ``Monitor`` sends the shutdown signal to every worker, as well as notifies ``Root`` that
    # it can exit the program.
    while True:
        # Listen receive operation from any source
        operation_type, source_rank = communication.mpi_recv_operation(mpi_state.comm)
        # Proceed the request
        if operation_type == common.Operation.TASK_DONE:
            task_counter.increment()
            # The worker follows up with the data IDs its task produced.
            output_data_ids = communication.mpi_recv_object(mpi_state.comm, source_rank)
            data_id_tracker.add_to_completed(output_data_ids)
            # Newly completed IDs may satisfy a pending WAIT request.
            wait_handler.process_wait_requests()
        elif operation_type == common.Operation.WAIT:
            # TODO: WAIT request can be received from several workers,
            # but not only from master. Handle this case when requested.
            operation_data = communication.mpi_recv_object(mpi_state.comm, source_rank)
            awaited_data_ids = operation_data["data_ids"]
            num_returns = operation_data["num_returns"]
            wait_handler.add_wait_request(awaited_data_ids, num_returns)
            wait_handler.process_wait_requests()
        elif operation_type == common.Operation.GET_TASK_COUNT:
            # We use a blocking send here because the receiver is waiting for the result.
            communication.mpi_send_object(
                mpi_state.comm,
                task_counter.task_counter,
                source_rank,
            )
        elif operation_type == common.Operation.READY_TO_SHUTDOWN:
            workers_ready_to_shutdown.append(source_rank)
            shutdown_workers = (
                len(workers_ready_to_shutdown) == mpi_state.world_size - 2
            )  # "-2" to exclude ``Root`` and ``Monitor`` ranks
        else:
            raise ValueError(f"Unsupported operation: {operation_type}")
        if shutdown_workers:
            # Broadcast SHUTDOWN to every worker rank.
            for rank_id in range(
                communication.MPIRank.FIRST_WORKER, mpi_state.world_size
            ):
                communication.mpi_send_operation(
                    mpi_state.comm,
                    common.Operation.SHUTDOWN,
                    rank_id,
                )
            # Notify ``Root`` that it is safe to exit the program.
            communication.mpi_send_object(
                mpi_state.comm,
                common.Operation.SHUTDOWN,
                communication.MPIRank.ROOT,
            )
            if not MPI.Is_finalized():
                MPI.Finalize()
            break  # leave event loop and shutdown monitoring
| # Copyright (C) 2021-2023 Modin authors | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| import weakref | ||
| import unidist.core.backends.mpi.core.common as common | ||
| import unidist.core.backends.mpi.core.communication as communication | ||
# May be ``None`` when MPI is not initialized (e.g. during a docs build).
mpi_state = communication.MPIState.get_instance()
# Logger configuration
# When building documentation we do not have MPI initialized so
# we use the condition to set "worker_0.log" in order to build it successfully.
logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)
# Per-rank logger writing to "worker_<rank>.log".
logger = common.get_logger(logger_name, log_file)
class ObjectStore:
    """
    Class that stores local objects and provides access to them.

    Notes
    -----
    For now, the storage is local to the current worker process only.
    """

    __instance = None

    def __init__(self):
        # Local data storage {DataId : Data}.
        self._data_map = weakref.WeakKeyDictionary()
        # "strong" references to data IDs {DataId : DataId}.
        # A dict gives O(1) lookup; other containers would be O(n).
        self._data_id_map = {}
        # Location of data {DataId : Rank}.
        self._data_owner_map = weakref.WeakKeyDictionary()
        # Cache of serialized objects {DataId : serialized data}.
        self._serialization_cache = {}

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        ObjectStore
        """
        if cls.__instance is not None:
            return cls.__instance
        cls.__instance = ObjectStore()
        return cls.__instance

    def put(self, data_id, data):
        """
        Store `data` under `data_id` in the local storage.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.
        data : object
            Data to be put.
        """
        self._data_map[data_id] = data

    def put_data_owner(self, data_id, rank):
        """
        Remember which rank owns the data behind `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.
        rank : int
            Rank number where the data resides.
        """
        self._data_owner_map[data_id] = rank

    def get(self, data_id):
        """
        Retrieve the locally stored data for `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        object
            Local data associated with `data_id`.
        """
        return self._data_map[data_id]

    def get_data_owner(self, data_id):
        """
        Retrieve the owner rank for `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        int
            Rank number where the data resides.
        """
        return self._data_owner_map[data_id]

    def contains(self, data_id):
        """
        Check whether data for `data_id` is stored locally.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        bool
            ``True`` if the object exists in the local storage.
        """
        return data_id in self._data_map

    def contains_data_owner(self, data_id):
        """
        Check whether the owner rank of `data_id` is known.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        bool
            ``True`` if the object location is known.
        """
        return data_id in self._data_owner_map

    def get_unique_data_id(self, data_id):
        """
        Get the "strong" reference to the data ID if it is already stored locally.

        If the passed data ID is not stored locally yet, save and return it.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            The unique ID to data.

        Notes
        -----
        A unique data ID reference is required for the garbage collector
        to work correctly.
        """
        # ``setdefault`` stores the ID on first sight and always returns
        # the canonical ("strong") reference.
        return self._data_id_map.setdefault(data_id, data_id)

    def clear(self, cleanup_list):
        """
        Clear "strong" references to data IDs from `cleanup_list`.

        Parameters
        ----------
        cleanup_list : list
            List of data IDs.

        Notes
        -----
        The actual data will be collected later when there is no weak or
        strong reference to data in the current worker.
        """
        for data_id in cleanup_list:
            self._data_id_map.pop(data_id, None)

    def cache_serialized_data(self, data_id, data):
        """
        Cache the serialized form of the data behind `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.
        data : object
            Serialized data to cache.
        """
        self._serialization_cache[data_id] = data

    def is_already_serialized(self, data_id):
        """
        Check whether a serialized form of `data_id`'s data is cached.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        bool
            ``True`` if the data is already serialized.
        """
        return data_id in self._serialization_cache

    def get_serialized_data(self, data_id):
        """
        Retrieve the cached serialized data for `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        object
            Cached serialized data associated with `data_id`.
        """
        return self._serialization_cache[data_id]
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
1532398
261.29%100
5.26%11921
15.81%