You're Invited: Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26. RSVP
Socket
Book a DemoSign in
Socket

unidist

Package Overview
Dependencies
Maintainers
2
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

unidist - pypi Package Compare versions

Comparing version
0.4.1
to
0.5.0
+313
unidist/core/backends/mpi/core/local_object_store.py
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""`LocalObjectStore` functionality."""
import weakref
import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication
class LocalObjectStore:
    """
    Class that stores local objects and provides access to them.

    Notes
    -----
    The storage is local to the current worker process only.
    """

    __instance = None

    def __init__(self):
        # Local data {DataID: Data}. Weak keys let an entry disappear
        # automatically once no one holds a reference to its DataID.
        self._data_map = weakref.WeakKeyDictionary()
        # "Strong" references to data IDs {DataID: DataID}.
        # We are using a dict here to improve performance when getting an
        # element from it, whereas other containers would require O(n) complexity.
        self._data_id_map = {}
        # Data owner {DataID: Rank}
        self._data_owner_map = weakref.WeakKeyDictionary()
        # Ranks the data was already sent to {DataID: set of ranks}
        self._sent_data_map = weakref.WeakKeyDictionary()
        # Monotonic counter used to generate unique data IDs
        self._data_id_counter = 0
        # Cache of serialized payloads {DataID: serialized data}
        self._serialization_cache = weakref.WeakKeyDictionary()

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``LocalObjectStore``.

        Returns
        -------
        LocalObjectStore
        """
        if cls.__instance is None:
            cls.__instance = LocalObjectStore()
        return cls.__instance

    def maybe_update_data_id_map(self, data_id):
        """
        Add a strong reference to the `data_id` if necessary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Notes
        -----
        The worker must have a strong reference to the external `data_id` until the owner process
        sends the `unidist.core.backends.common.Operation.CLEANUP` operation with this `data_id`.
        """
        # Only externally-owned IDs need pinning; locally-owned IDs live as
        # long as the local garbage collector decides.
        if (
            data_id.owner_rank != communication.MPIState.get_instance().global_rank
            and data_id not in self._data_id_map
        ):
            self._data_id_map[data_id] = data_id

    def put(self, data_id, data):
        """
        Put `data` to internal dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        data : object
            Data to be put.
        """
        self._data_map[data_id] = data
        self.maybe_update_data_id_map(data_id)

    def put_data_owner(self, data_id, rank):
        """
        Put data location (owner rank) to internal dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        rank : int
            Rank number where the data resides.
        """
        self._data_owner_map[data_id] = rank
        self.maybe_update_data_id_map(data_id)

    def get(self, data_id):
        """
        Get the data from a local dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        object
            Return local data associated with `data_id`.

        Raises
        ------
        KeyError
            If no data is stored locally for `data_id`.
        """
        return self._data_map[data_id]

    def get_data_owner(self, data_id):
        """
        Get the data owner rank.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        int
            Rank number where the data resides.

        Raises
        ------
        KeyError
            If the owner of `data_id` is unknown.
        """
        return self._data_owner_map[data_id]

    def contains(self, data_id):
        """
        Check if the data associated with `data_id` exists in the local dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        bool
            ``True`` if the object exists in the local dictionary.
        """
        return data_id in self._data_map

    def contains_data_owner(self, data_id):
        """
        Check if the data location info associated with `data_id` exists in the local dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        bool
            ``True`` if the object location is known.
        """
        return data_id in self._data_owner_map

    def clear(self, cleanup_list):
        """
        Clear "strong" references to data IDs from `cleanup_list`.

        Parameters
        ----------
        cleanup_list : list
            List of data IDs.

        Notes
        -----
        The actual data will be collected later when there is no weak or
        strong reference to data in the current worker.
        """
        for data_id in cleanup_list:
            self._data_id_map.pop(data_id, None)

    def generate_data_id(self, gc):
        """
        Generate unique ``MpiDataID`` instance.

        Parameters
        ----------
        gc : unidist.core.backends.mpi.core.executor.GarbageCollector
            Local garbage collector reference.

        Returns
        -------
        unidist.core.backends.mpi.core.common.MpiDataID
            Unique data ID instance.
        """
        data_id = common.MpiDataID(
            communication.MPIState.get_instance().global_rank, self._data_id_counter, gc
        )
        self._data_id_counter += 1
        return data_id

    def generate_output_data_id(self, dest_rank, gc, num_returns=1):
        """
        Generate unique ``unidist.core.backends.mpi.core.common.MpiDataID`` instance(s).

        Parameters
        ----------
        dest_rank : int
            Rank number where the generated IDs will be located.
        gc : unidist.core.backends.mpi.core.executor.GarbageCollector
            Local garbage collector reference.
        num_returns : int, default: 1
            Number of IDs to generate.

        Returns
        -------
        unidist.core.backends.mpi.core.common.MpiDataID or list or None
            A single ID when `num_returns` is 1, ``None`` when it is 0,
            otherwise a list of unique ``MpiDataID`` instances.
        """
        if num_returns == 1:
            output_ids = self.generate_data_id(gc)
            self.put_data_owner(output_ids, dest_rank)
        elif num_returns == 0:
            output_ids = None
        else:
            output_ids = []
            for _ in range(num_returns):
                output_id = self.generate_data_id(gc)
                self.put_data_owner(output_id, dest_rank)
                output_ids.append(output_id)
        return output_ids

    def cache_send_info(self, data_id, rank):
        """
        Save communication event for this `data_id` and rank.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ``ID`` to data.
        rank : int
            Rank number where the data was sent.
        """
        # setdefault avoids the separate membership check and second lookup.
        self._sent_data_map.setdefault(data_id, set()).add(rank)

    def is_already_sent(self, data_id, rank):
        """
        Check if communication event on this `data_id` and rank happened.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        rank : int
            Rank number to check.

        Returns
        -------
        bool
            ``True`` if communication event already happened.
        """
        return rank in self._sent_data_map.get(data_id, ())

    def cache_serialized_data(self, data_id, data):
        """
        Save serialized object for this `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        data : object
            Serialized data to cache.
        """
        self._serialization_cache[data_id] = data
        self.maybe_update_data_id_map(data_id)

    def is_already_serialized(self, data_id):
        """
        Check if the data on this `data_id` is already serialized.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        bool
            ``True`` if the data is already serialized.
        """
        return data_id in self._serialization_cache

    def get_serialized_data(self, data_id):
        """
        Get serialized data on this `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        object
            Cached serialized data associated with `data_id`.
        """
        return self._serialization_cache[data_id]

Sorry, the diff of this file is too big to display

/*
* Copyright (C) 2021-2023 Modin authors
*
* SPDX-License-Identifier: Apache-2.0
*/
#include "memory.h"

#include <algorithm>
#include <cstring>
#include <thread>
#include <vector>
namespace unidist {
// Mask the integer value of `address` with `bits` (e.g. to round an address
// down to a block boundary) and hand back the resulting pointer.
// NOTE(review): constness of `address` is dropped on return, mirroring how
// callers use the result as a writable cursor into the source buffer.
uint8_t *pointer_logical_and(const uint8_t *address, uintptr_t bits) {
  const uintptr_t raw = reinterpret_cast<uintptr_t>(address);
  const uintptr_t masked = raw & bits;
  return reinterpret_cast<uint8_t *>(masked);
}
// Fill `size` elements of `buff` with `value`.
// Used to initialize the service shared buffer with sentinel values.
// Fix: this relies on std::fill, but the file never included <algorithm>
// (it only compiled via transitive includes); the include is now explicit.
void fill(int64_t *buff, int64_t size, int64_t value) {
  std::fill(buff, buff + size, value);
}
// Copy `nbytes` from `src` to `dst` using `num_threads` helper threads.
// The source region is split into `block_size`-aligned blocks; each thread
// copies an equal run of blocks while the calling thread copies the
// unaligned prefix/suffix leftovers.
//
// Fixes over the previous version:
//  * std::thread was constructed with the address of `memcpy` directly —
//    taking the address of a standard-library function is not portable
//    (ill-formed, no diagnostic required, since C++20); the call is now
//    wrapped in a lambda invoking std::memcpy.
//  * degenerate inputs (num_threads <= 0, empty buffers, or a region that
//    contains no full aligned block, which previously produced a negative
//    block count fed into `%`) now fall back to a single plain memcpy.
void parallel_memcopy(uint8_t *dst,
                      const uint8_t *src,
                      int64_t nbytes,
                      uintptr_t block_size,
                      int num_threads) {
  if (num_threads <= 0 || nbytes <= 0) {
    if (nbytes > 0) {
      std::memcpy(dst, src, nbytes);
    }
    return;
  }
  // First block-aligned address at or after `src`.
  uint8_t *left = pointer_logical_and(src + block_size - 1, ~(block_size - 1));
  // Last block-aligned address at or before `src + nbytes`.
  uint8_t *right = pointer_logical_and(src + nbytes, ~(block_size - 1));
  int64_t num_blocks = (right - left) / static_cast<int64_t>(block_size);
  if (num_blocks <= 0) {
    // Region holds no complete aligned block; serial copy is the only option.
    std::memcpy(dst, src, nbytes);
    return;
  }
  // Trim the tail so the block count divides evenly among threads; the
  // trimmed part becomes additional suffix handled by the main thread.
  right = right - (num_blocks % num_threads) * block_size;
  // Each thread gets a "chunk" of k blocks (chunk_size = k * block_size).
  int64_t chunk_size = (right - left) / num_threads;
  int64_t prefix = left - src;
  int64_t suffix = src + nbytes - right;
  // Data layout is | prefix | num_threads * chunk_size | suffix |.
  // Start all threads first and handle leftovers while threads run.
  std::vector<std::thread> threadpool(num_threads);
  for (int i = 0; i < num_threads; i++) {
    uint8_t *chunk_dst = dst + prefix + i * chunk_size;
    const uint8_t *chunk_src = left + i * chunk_size;
    threadpool[i] = std::thread([chunk_dst, chunk_src, chunk_size]() {
      std::memcpy(chunk_dst, chunk_src, chunk_size);
    });
  }
  std::memcpy(dst, src, prefix);
  std::memcpy(dst + prefix + num_threads * chunk_size, right, suffix);
  for (auto &t : threadpool) {
    if (t.joinable()) {
      t.join();
    }
  }
}
} // namespace unidist
/*
 * Copyright (C) 2021-2023 Modin authors
 *
 * SPDX-License-Identifier: Apache-2.0
 */
/* Public interface of the small native memory helper library used by the
 * MPI shared object store. */
#ifndef MEMORY_H
#define MEMORY_H
#include <stdint.h>
namespace unidist {
// A helper function for doing memcpy with multiple threads. This is required
// to saturate the memory bandwidth of modern cpus.
// dst/src: destination and source buffers; nbytes: bytes to copy;
// block_size: alignment granularity used to split the work;
// num_threads: number of helper threads to spawn.
void parallel_memcopy(uint8_t *dst,
                      const uint8_t *src,
                      int64_t nbytes,
                      uintptr_t block_size,
                      int num_threads);
// Fill `size` elements of `buff` with `value` (e.g. -1 sentinels for the
// service shared buffer).
void fill(int64_t *buff, int64_t size, int64_t value);
} // namespace unidist
#endif // MEMORY_H
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""MPI backend functionality related to `monitor` process."""
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Monitoring process."""
try:
import mpi4py
except ImportError:
raise ImportError(
"Missing dependency 'mpi4py'. Use pip or conda to install it."
) from None
import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.monitor.shared_memory_manager import (
SharedMemoryManager,
)
from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore
# TODO: Find a way to move this after all imports
mpi4py.rc(recv_mprobe=False, initialize=False)
from mpi4py import MPI # noqa: E402
# Module-level logger setup. MPIState may not be initialized yet in this
# process, so fall back to rank 0 for the logger name in that case.
mpi_state = communication.MPIState.get_instance()
logger_name = "monitor_{}".format(mpi_state.global_rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)
monitor_logger = common.get_logger(logger_name, log_file)
class TaskCounter:
    """Process-local singleton counting tasks reported as done."""

    __instance = None

    def __init__(self):
        # Number of TASK_DONE events observed so far.
        self.task_counter = 0

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``TaskCounter``.

        Returns
        -------
        TaskCounter
        """
        if cls.__instance is None:
            cls.__instance = TaskCounter()
        return cls.__instance

    def increment(self):
        """Increment task counter by one."""
        self.task_counter = self.task_counter + 1
class DataIDTracker:
    """
    Class that keeps track of all completed (ready) data IDs.
    """

    __instance = None

    def __init__(self):
        # Set of data IDs whose producing tasks have finished.
        self.completed_data_ids = set()

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``DataIDTracker``.

        Returns
        -------
        DataIDTracker
        """
        if cls.__instance is None:
            cls.__instance = DataIDTracker()
        return cls.__instance

    def add_to_completed(self, data_ids):
        """
        Add the given data IDs to the set of completed (ready) data IDs.

        Parameters
        ----------
        data_ids : list
            List of data IDs to be added to the set of completed (ready) data IDs.
        """
        for data_id in data_ids:
            self.completed_data_ids.add(data_id)
class WaitHandler:
    """
    Class that handles wait requests.

    A single pending wait request is stored at a time; it is re-checked every
    time new data IDs are reported as completed.
    """
    __instance = None
    def __init__(self):
        # NOTE(review): appears unused here — completion is tracked by
        # ``DataIDTracker``; confirm before removing.
        self.completed_data_ids = set()
        # Data IDs of the currently pending wait request (empty when none).
        self.awaited_data_ids = []
        # Awaited data IDs that have already been found completed.
        self.ready = []
        # How many IDs must become ready before the reply is sent.
        self.num_returns = 0
    @classmethod
    def get_instance(cls):
        """
        Get instance of ``WaitHandler``.

        Returns
        -------
        WaitHandler
        """
        if cls.__instance is None:
            cls.__instance = WaitHandler()
        return cls.__instance
    def add_wait_request(self, awaited_data_ids, num_returns):
        """
        Add a wait request for a list of data IDs and the number of data IDs to be awaited.

        Parameters
        ----------
        awaited_data_ids : list
            List of data IDs to be awaited.
        num_returns : int
            The number of ``DataID``-s that should be returned as ready.
        """
        # Overwrites any previous request — only one wait is tracked at a time.
        self.awaited_data_ids = awaited_data_ids
        self.num_returns = num_returns
    def process_wait_requests(self):
        """
        Check if wait requests are pending to be processed.

        Process pending wait requests and send the data_ids to the requester
        if number of data IDs that are ready are equal to the num_returns.
        """
        data_id_tracker = DataIDTracker.get_instance()
        i = 0
        if self.awaited_data_ids:
            while i < len(self.awaited_data_ids):
                data_id = self.awaited_data_ids[i]
                if data_id in data_id_tracker.completed_data_ids:
                    self.ready.append(data_id)
                    # Removing the current element shifts the next unvisited
                    # element into index `i`, so `i` is deliberately not
                    # incremented on this branch.
                    self.awaited_data_ids.remove(data_id)
                    if len(self.ready) == self.num_returns:
                        operation_data = {
                            "ready": self.ready,
                            "not_ready": self.awaited_data_ids,
                        }
                        # Reply goes to the root process, which issued the wait.
                        communication.mpi_send_object(
                            communication.MPIState.get_instance().global_comm,
                            operation_data,
                            communication.MPIRank.ROOT,
                        )
                        # Reset state; emptying `awaited_data_ids` also ends
                        # the while loop.
                        self.ready = []
                        self.awaited_data_ids = []
                else:
                    i += 1
def monitor_loop():
    """
    Infinite monitor operations processing loop.

    Tracks the number of executed tasks and completed (ready) data IDs.

    Notes
    -----
    The loop exits on special cancelation operation.
    ``unidist.core.backends.mpi.core.common.Operations`` defines a set of supported operations.
    """
    task_counter = TaskCounter.get_instance()
    mpi_state = communication.MPIState.get_instance()
    wait_handler = WaitHandler.get_instance()
    data_id_tracker = DataIDTracker.get_instance()
    shared_store = SharedObjectStore.get_instance()
    shm_manager = SharedMemoryManager()
    # Barrier to check if monitor process is ready to start the communication loop
    mpi_state.global_comm.Barrier()
    monitor_logger.debug("Monitor loop started")
    # Once all workers excluding ``Root`` and ``Monitor`` ranks are ready to shutdown,
    # ``Monitor` sends the shutdown signal to every worker, as well as notifies ``Root`` that
    # it can exit the program.
    workers_ready_to_shutdown = []
    shutdown_workers = False
    while True:
        # Listen receive operation from any source
        operation_type, source_rank = communication.mpi_recv_operation(
            mpi_state.global_comm
        )
        monitor_logger.debug(
            f"common.Operation processing - {operation_type} from {source_rank} rank"
        )
        # Proceed the request
        if operation_type == common.Operation.TASK_DONE:
            # A worker finished a task: record it and re-check pending waits.
            task_counter.increment()
            output_data_ids = communication.mpi_recv_object(
                mpi_state.global_comm, source_rank
            )
            data_id_tracker.add_to_completed(output_data_ids)
            wait_handler.process_wait_requests()
        elif operation_type == common.Operation.WAIT:
            # TODO: WAIT request can be received from several workers,
            # but not only from master. Handle this case when requested.
            operation_data = communication.mpi_recv_object(
                mpi_state.global_comm, source_rank
            )
            awaited_data_ids = operation_data["data_ids"]
            num_returns = operation_data["num_returns"]
            wait_handler.add_wait_request(awaited_data_ids, num_returns)
            # Some awaited IDs may already be complete; check immediately.
            wait_handler.process_wait_requests()
        elif operation_type == common.Operation.GET_TASK_COUNT:
            # We use a blocking send here because the receiver is waiting for the result.
            communication.mpi_send_object(
                mpi_state.global_comm,
                task_counter.task_counter,
                source_rank,
            )
        elif operation_type == common.Operation.RESERVE_SHARED_MEMORY:
            # Reserve a shared-memory range for a data ID; subsequent requests
            # for the same ID get the existing reservation back.
            request = communication.mpi_recv_object(mpi_state.global_comm, source_rank)
            reservation_info = shm_manager.get(request["id"])
            if reservation_info is None:
                reservation_info = shm_manager.put(request["id"], request["size"])
                is_first_request = True
            else:
                is_first_request = False
            communication.mpi_send_object(
                mpi_state.global_comm,
                data={**reservation_info, "is_first_request": is_first_request},
                dest_rank=source_rank,
            )
        elif operation_type == common.Operation.REQUEST_SHARED_DATA:
            # A worker on another host asks for the raw bytes of shared data.
            info_package = communication.mpi_recv_object(
                mpi_state.global_comm, source_rank
            )
            data_id = info_package["id"]
            if data_id is None:
                raise ValueError("Requested DataID is None")
            reservation_info = shm_manager.get(data_id)
            if reservation_info is None:
                raise RuntimeError(f"The monitor does not know the data id {data_id}")
            sh_buf = shared_store.get_shared_buffer(
                reservation_info["first_index"], reservation_info["last_index"]
            )
            communication.mpi_send_buffer(
                mpi_state.global_comm,
                sh_buf,
                dest_rank=source_rank,
                data_type=MPI.BYTE,
            )
        elif operation_type == common.Operation.CLEANUP:
            # Free shared memory for IDs no longer referenced anywhere.
            cleanup_list = communication.recv_serialized_data(
                mpi_state.global_comm, source_rank
            )
            cleanup_list = [common.MpiDataID(*tpl) for tpl in cleanup_list]
            shm_manager.clear(cleanup_list)
        elif operation_type == common.Operation.READY_TO_SHUTDOWN:
            workers_ready_to_shutdown.append(source_rank)
            # Trigger shutdown broadcast once every worker has checked in.
            shutdown_workers = len(workers_ready_to_shutdown) == len(mpi_state.workers)
        elif operation_type == common.Operation.SHUTDOWN:
            # Another monitor told this process to shut down.
            SharedObjectStore.get_instance().finalize()
            if not MPI.Is_finalized():
                MPI.Finalize()
            break  # leave event loop and shutdown monitoring
        else:
            raise ValueError(f"Unsupported operation: {operation_type}")
        if shutdown_workers:
            # Broadcast SHUTDOWN to all workers and other monitors, then
            # notify the root process and exit ourselves.
            for rank_id in mpi_state.workers + mpi_state.monitor_processes:
                if rank_id != mpi_state.global_rank:
                    communication.mpi_send_operation(
                        mpi_state.global_comm,
                        common.Operation.SHUTDOWN,
                        rank_id,
                    )
            communication.mpi_send_object(
                mpi_state.global_comm,
                common.Operation.SHUTDOWN,
                communication.MPIRank.ROOT,
            )
            SharedObjectStore.get_instance().finalize()
            if not MPI.Is_finalized():
                MPI.Finalize()
            break  # leave event loop and shutdown monitoring
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""`SharedMemoryManager` functionality."""
from array import array
try:
import mpi4py
except ImportError:
raise ImportError(
"Missing dependency 'mpi4py'. Use pip or conda to install it."
) from None
from unidist.core.backends.mpi.core import communication, common
from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore
from unidist.core.backends.mpi.utils import ImmutableDict
# TODO: Find a way to move this after all imports
mpi4py.rc(recv_mprobe=False, initialize=False)
from mpi4py import MPI # noqa: E402
class FreeMemoryRange:
    """
    Class that helps keep track of free space in shared memory.

    Free space is held as an ordered list of disjoint half-open
    intervals ``[first, last)``.

    Parameters
    ----------
    range_len : int
        Memory length.
    """

    def __init__(self, range_len):
        # Initially the whole [0, range_len) span is free.
        self.range = [[0, range_len]]

    def occupy(self, count=1):
        """
        Take the place of a certain length in memory.

        Parameters
        ----------
        count : int
            Required number of elements in memory.

        Returns
        -------
        int or None
            First index in memory (``None`` when no segment is large enough).
        int or None
            Last index in memory, not inclusive (``None`` on failure).
        """
        # First-fit scan over the ordered free segments.
        for segment in self.range:
            begin, end = segment
            if end - begin < count:
                continue
            first_index = begin
            last_index = begin + count
            if last_index == end:
                # The segment is consumed entirely; drop it.
                self.range.remove(segment)
            else:
                # Shrink the segment from the left.
                segment[0] = last_index
            return first_index, last_index
        return None, None

    def release(self, first_index, last_index):
        """
        Free up memory space.

        Parameters
        ----------
        first_index : int
            First index in memory.
        last_index : int
            Last index in memory (not inclusive).
        """
        segments = self.range
        # Fast paths: no free segments, or strictly after the last one.
        if not segments or segments[-1][1] < first_index:
            segments.append([first_index, last_index])
            return
        for i, segment in enumerate(segments):
            if segment[0] == last_index:
                # Released block is adjacent to `segment` on the left.
                if segments[i - 1][1] == first_index:
                    # ...and to the previous segment on the right: merge all three.
                    segments[i - 1][1] = segment[1]
                    del segments[i]
                else:
                    segment[0] = first_index
                return
            if segment[1] == first_index:
                # Released block is adjacent to `segment` on the right.
                if len(segments) > i + 1 and segments[i + 1][0] == last_index:
                    # ...and to the next segment on the left: merge all three.
                    segments[i + 1][0] = segment[0]
                    del segments[i]
                else:
                    segment[1] = last_index
                return
            if segment[0] > last_index:
                # Strictly before this segment: insert a new stand-alone one.
                segments.insert(i, [first_index, last_index])
                return
class SharedMemoryManager:
    """
    Class that helps manage shared memory.

    Owned by the monitor process; tracks which shared-memory ranges are
    reserved for which data IDs and frees them when no process uses them.
    """
    def __init__(self):
        self.shared_store = None
        # All bookkeeping exists only when shared memory is supported; other
        # methods guard on ``self.shared_store is None``.
        if common.is_shared_memory_supported():
            self.shared_store = SharedObjectStore.get_instance()
            # Reservation info per data ID {MpiDataID: ImmutableDict}
            self._reservation_info = {}
            # Free-interval tracker for the data buffer (in bytes)
            self.free_memory = FreeMemoryRange(self.shared_store.shared_memory_size)
            # Free-interval tracker for the service buffer (in items)
            self.free_service_indexes = FreeMemoryRange(
                self.shared_store.service_info_max_count
            )
            # Data IDs that could not be freed yet because they are still referenced
            self.pending_cleanup = []
            self.monitor_comm = None
            mpi_state = communication.MPIState.get_instance()
            # Communicator covering only the monitor processes, used to agree
            # on reference counts across hosts before freeing memory.
            monitor_group = mpi_state.global_comm.Get_group().Incl(
                mpi_state.monitor_processes
            )
            self.monitor_comm = mpi_state.global_comm.Create_group(monitor_group)
    def get(self, data_id):
        """
        Get the reservation information for the `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        dict or None
            Reservation information.

        Notes
        -----
        The `dict` is returned if a reservation has been specified, otherwise `None` is returned.
        """
        if self.shared_store is None:
            raise RuntimeError(
                "`SharedMemoryManager` cannot be used if the shared object storage is not enabled."
            )
        if data_id not in self._reservation_info:
            return None
        return self._reservation_info[data_id]
    def put(self, data_id, memory_len):
        """
        Reserve memory for the `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.
        memory_len : int
            Required memory length.

        Returns
        -------
        dict
            Reservation information.

        Raises
        ------
        MemoryError
            If the data or service buffer has no free range large enough.
        """
        if self.shared_store is None:
            raise RuntimeError(
                "`SharedMemoryManager` cannot be used if the shared object storage is not enabled."
            )
        first_index, last_index = self.free_memory.occupy(memory_len)
        service_index, _ = self.free_service_indexes.occupy(SharedObjectStore.INFO_SIZE)
        if first_index is None:
            raise MemoryError("Overflow memory")
        if service_index is None:
            raise MemoryError("Overflow service memory")
        reservation_info = ImmutableDict(
            {
                "first_index": first_index,
                "last_index": last_index,
                "service_index": service_index,
            }
        )
        self._reservation_info[data_id] = reservation_info
        return reservation_info
    def clear(self, data_id_list):
        """
        Clear shared memory for the list of `DataID` if possible.

        Parameters
        ----------
        data_id_list : list
            List of `DataID`.

        Notes
        -----
        An ID is freed only when no process on any host still references it;
        still-referenced IDs are kept in `pending_cleanup` and retried on the
        next call.
        """
        if self.shared_store is None:
            return
        cleanup_list = self.pending_cleanup + data_id_list
        self.pending_cleanup = []
        # 1 if this host still holds references to the data, 0 otherwise.
        has_refs = array(
            "B",
            [
                1
                if data_id in self._reservation_info
                and self.shared_store.get_ref_number(
                    data_id, self._reservation_info[data_id]["service_index"]
                )
                > 0
                else 0
                for data_id in cleanup_list
            ],
        )
        if self.monitor_comm is not None:
            # Combine per-host flags: MAX yields 1 if any host still refers.
            all_refs = array("B", [0] * len(has_refs))
            self.monitor_comm.Allreduce(has_refs, all_refs, MPI.MAX)
        else:
            all_refs = has_refs
        for data_id, referers in zip(cleanup_list, all_refs):
            if referers == 0:
                if data_id in self._reservation_info:
                    reservation_info = self._reservation_info[data_id]
                    self.shared_store.delete_service_info(
                        data_id, reservation_info["service_index"]
                    )
                    self.free_service_indexes.release(
                        reservation_info["service_index"],
                        reservation_info["service_index"] + SharedObjectStore.INFO_SIZE,
                    )
                    self.free_memory.release(
                        reservation_info["first_index"], reservation_info["last_index"]
                    )
                    del self._reservation_info[data_id]
            else:
                # Still referenced somewhere; retry on the next cleanup.
                self.pending_cleanup.append(data_id)
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""`SharedObjectStore` functionality."""
import os
import sys
import time
import warnings
import psutil
import weakref
from unidist.core.backends.mpi.core._memory import parallel_memcopy, fill
from unidist.core.backends.mpi.utils import ImmutableDict
try:
import mpi4py
except ImportError:
raise ImportError(
"Missing dependency 'mpi4py'. Use pip or conda to install it."
) from None
from unidist.config.backends.mpi.envvars import (
MpiSharedObjectStoreMemory,
MpiSharedServiceMemory,
MpiSharedObjectStoreThreshold,
MpiBackoff,
)
from unidist.core.backends.mpi.core import common, communication
from unidist.core.backends.mpi.core.serialization import (
deserialize_complex_data,
)
# TODO: Find a way to move this after all imports
mpi4py.rc(recv_mprobe=False, initialize=False)
from mpi4py import MPI # noqa: E402
class WinLock:
    """
    Class that helps to synchronize the write to shared memory.

    Parameters
    ----------
    win : MPI.win
        The MPI window that was used to allocate the shared memory.

    Notes
    -----
    This class should be used as a context manager: the window is locked on
    the monitor rank on entry and unlocked on exit.
    """

    def __init__(self, win):
        self.win = win

    def __enter__(self):
        """Lock the current MPI.Window for other processes."""
        self.win.Lock(communication.MPIRank.MONITOR)

    def __exit__(self, exc_type, exc_value, traceback):
        """Unlock the current MPI.Window for other processes."""
        self.win.Unlock(communication.MPIRank.MONITOR)
class SharedObjectStore:
"""
Class that provides access to data in shared memory.
Notes
-----
This class initializes and manages shared memory.
"""
__instance = None
# Service constants defining the structure of the service buffer
# The amount of service information for one data object in shared memory.
INFO_SIZE = 4
# Index of service information for the first part of the DataID.
WORKER_ID_INDEX = 0
# Index of service information for the second part of the DataID.
DATA_NUMBER_INDEX = 1
# Index of service information for the first shared memory index where the data is located.
FIRST_DATA_INDEX = 2
# Index of service information to count the number of data references,
# which shows how many processes are using this data.
REFERENCES_NUMBER = 3
def __init__(self):
    """Initialize shared-memory state; allocation happens only when shared memory is supported."""
    # The `MPI.Win` object to manage shared memory for data
    self.win = None
    # The `MPI.Win` object to manage shared memory for service purposes
    self.service_win = None
    # `MPI.memory` object for reading/writing data from/to shared memory
    self.shared_buffer = None
    # `memoryview` object for reading/writing data from/to service shared memory.
    # Service shared buffer includes service information about written shared data.
    # The service info is set by the worker who sends the data to shared memory
    # and is removed by the monitor if the data is cleared.
    # The service info indicates that the current data is written to shared memory
    # and shows the actual location and number of references.
    self.service_shared_buffer = None
    # Length of shared memory buffer in bytes
    self.shared_memory_size = None
    # Length of service shared memory buffer in items
    self.service_info_max_count = None
    mpi_state = communication.MPIState.get_instance()
    # Initialize all properties above
    if common.is_shared_memory_supported(raise_warning=mpi_state.is_root_process()):
        self._allocate_shared_memory()
    # Logger will be initialized after `communicator.MPIState`
    self.logger = None
    # Shared memory range {DataID: dict}
    # Shared information is the necessary information to properly deserialize data from shared memory.
    self._shared_info = weakref.WeakKeyDictionary()
    # The list of `weakref.finalize` which should be canceled before closing the shared memory.
    self.finalizers = []
def _get_allowed_memory_size(self):
    """
    Get allowed memory size for allocate shared memory.

    Returns
    -------
    int
        The number of bytes available to allocate shared memory.
    """
    virtual_memory = psutil.virtual_memory().total
    if not sys.platform.startswith("linux"):
        # No /dev/shm to inspect outside Linux; report total RAM.
        return virtual_memory
    # On Linux shared memory is backed by /dev/shm, which may be configured
    # smaller than half of RAM (e.g. the Docker default); warn in that case.
    shm_fd = os.open("/dev/shm", os.O_RDONLY)
    try:
        shm_stats = os.fstatvfs(shm_fd)
        system_memory = shm_stats.f_bsize * shm_stats.f_bavail
        if system_memory / (virtual_memory / 2) < 0.99:
            warnings.warn(
                f"The size of /dev/shm is too small ({system_memory} bytes). The required size "
                + f"at least half of RAM ({virtual_memory // 2} bytes). Please, delete files in /dev/shm or "
                + "increase size of /dev/shm with --shm-size in Docker."
            )
    finally:
        os.close(shm_fd)
    return system_memory
def _allocate_shared_memory(self):
    """
    Allocate shared memory.

    Decides how the allowed memory is split between the data buffer and the
    service buffer (from the ``MpiSharedObjectStoreMemory`` /
    ``MpiSharedServiceMemory`` settings or derived heuristics), then
    allocates both as MPI shared-memory windows owned by the monitor process.

    Raises
    ------
    ValueError
        If the configured sizes exceed the available memory.
    """
    mpi_state = communication.MPIState.get_instance()
    shared_object_store_memory = MpiSharedObjectStoreMemory.get()
    shared_service_memory = MpiSharedServiceMemory.get()
    # Use only 95% of available shared memory because
    # the rest is needed for intermediate shared buffers
    # handled by MPI itself for communication of small messages.
    allowed_memory_size = int(self._get_allowed_memory_size() * 0.95)
    if shared_object_store_memory is not None:
        if shared_service_memory is not None:
            # Both sizes configured explicitly.
            self.shared_memory_size = shared_object_store_memory
            self.service_memory_size = shared_service_memory
        else:
            # Data size configured; derive the service size.
            self.shared_memory_size = shared_object_store_memory
            # To avoid division by 0
            if MpiSharedObjectStoreThreshold.get() > 0:
                self.service_memory_size = min(
                    # allowed memory size for service buffer
                    allowed_memory_size - self.shared_memory_size,
                    # maximum amount of memory required for the service buffer
                    (self.shared_memory_size // MpiSharedObjectStoreThreshold.get())
                    * (self.INFO_SIZE * MPI.LONG.size),
                )
            else:
                self.service_memory_size = (
                    allowed_memory_size - self.shared_memory_size
                )
    else:
        if shared_service_memory is not None:
            # Service size configured; data buffer takes the rest.
            self.service_memory_size = shared_service_memory
            self.shared_memory_size = allowed_memory_size - self.service_memory_size
        else:
            # Nothing configured; split the allowed memory heuristically.
            A = allowed_memory_size
            B = MpiSharedObjectStoreThreshold.get()
            C = self.INFO_SIZE * MPI.LONG.size
            # "x" is shared_memory_size
            # "y" is service_memory_size
            # requirements:
            # x + y = A
            # y = min[ (x/B) * C, 0.01 * A ]
            # calculation results:
            # if B > 99 * C:
            #     x = (A * B) / (B + C)
            #     y = (A * C) / (B + C)
            # else:
            #     x = 0.99 * A
            #     y = 0.01 * A
            if B > 99 * C:
                self.shared_memory_size = (A * B) // (B + C)
                self.service_memory_size = (A * C) // (B + C)
            else:
                self.shared_memory_size = int(0.99 * A)
                self.service_memory_size = int(0.01 * A)
    # Validate the chosen sizes before allocating.
    if self.shared_memory_size > allowed_memory_size:
        raise ValueError(
            "Memory for shared object storage cannot be allocated "
            + "because the value set to `MpiSharedObjectStoreMemory` exceeds the available memory."
        )
    if self.service_memory_size > allowed_memory_size:
        raise ValueError(
            "Memory for shared service storage cannot be allocated "
            + "because the value set to `MpiSharedServiceMemory` exceeds the available memory."
        )
    if self.shared_memory_size + self.service_memory_size > allowed_memory_size:
        raise ValueError(
            "The sum of the `MpiSharedObjectStoreMemory` and `MpiSharedServiceMemory` values is greater "
            + "than the available amount of memory."
        )
    # Shared memory is allocated only once by the monitor process.
    info = MPI.Info.Create()
    info.Set("alloc_shared_noncontig", "true")
    # Non-monitor ranks pass size 0 and attach to the monitor's window below.
    self.win = MPI.Win.Allocate_shared(
        self.shared_memory_size * MPI.BYTE.size
        if mpi_state.is_monitor_process()
        else 0,
        MPI.BYTE.size,
        comm=mpi_state.host_comm,
        info=info,
    )
    self.shared_buffer, _ = self.win.Shared_query(communication.MPIRank.MONITOR)
    # Round the service item count down to a whole number of INFO_SIZE records.
    self.service_info_max_count = (
        self.service_memory_size
        // (self.INFO_SIZE * MPI.LONG.size)
        * self.INFO_SIZE
    )
    self.service_win = MPI.Win.Allocate_shared(
        self.service_info_max_count * MPI.LONG.size
        if mpi_state.is_monitor_process()
        else 0,
        MPI.LONG.size,
        comm=mpi_state.host_comm,
        info=info,
    )
    service_buffer, _ = self.service_win.Shared_query(communication.MPIRank.MONITOR)
    self.service_shared_buffer = memoryview(service_buffer).cast("l")
    # Set -1 to the service buffer because 0 is a valid value and may be recognized by mistake.
    if mpi_state.is_monitor_process():
        fill(self.service_shared_buffer, -1)
def _parse_data_id(self, data_id):
"""
Parse `DataID` object to pair of int.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MpiDataID
The data identifier to be converted to a numerical form
Returns
-------
tuple
Pair of int that define the DataID
"""
splited_id = str(data_id).replace(")", "").split("_")
return int(splited_id[1]), int(splited_id[3])
def _increment_ref_number(self, data_id, service_index):
    """
    Increment the number of references to indicate to the monitor that this data is being used.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.
    service_index : int
        The service buffer index.

    Raises
    ------
    KeyError
        If `service_index` is ``None`` (the ID is not part of shared data).

    Notes
    -----
    This function creates a `weakref.finalize` with the decrement function,
    which will be called after `data_id` is garbage-collected. Note the
    finalizer receives ``str(data_id)``, since the object itself is gone by
    the time it runs.
    """
    # After MPI finalization the shared windows are gone; nothing to do.
    if MPI.Is_finalized():
        return
    if service_index is None:
        raise KeyError(
            "it is not possible to increment the reference number for this data_id because it is not part of the shared data"
        )
    # The window lock serializes the read-modify-write with other processes.
    with WinLock(self.service_win):
        prev_ref_number = self.service_shared_buffer[
            service_index + self.REFERENCES_NUMBER
        ]
        self.service_shared_buffer[service_index + self.REFERENCES_NUMBER] = (
            prev_ref_number + 1
        )
    self.logger.debug(
        f"Rank {communication.MPIState.get_instance().global_rank}: Increment references number for {data_id} from {prev_ref_number} to {prev_ref_number + 1}"
    )
    self.finalizers.append(
        weakref.finalize(
            data_id, self._decrement_ref_number, str(data_id), service_index
        )
    )
def _decrement_ref_number(self, data_id, service_index):
    """
    Decrement the number of references to indicate to the monitor that this data is no longer used.

    When references count is 0, it can be cleared.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.
    service_index : int
        The service buffer index.

    Notes
    -----
    Called from a ``weakref.finalize`` callback after `data_id` has been
    collected; `service_index` must be passed explicitly because the shared
    info entry may already be gone by that time.
    """
    if MPI.Is_finalized():
        return
    # Only touch the counter if the service slot still describes `data_id`;
    # the slot may have been reused for other data in the meantime.
    if not self._check_service_info(data_id, service_index):
        return
    ref_slot = service_index + self.REFERENCES_NUMBER
    with WinLock(self.service_win):
        prev_ref_number = self.service_shared_buffer[ref_slot]
        self.service_shared_buffer[ref_slot] = prev_ref_number - 1
    self.logger.debug(
        f"Rank {communication.MPIState.get_instance().global_rank}: Decrement references number for {data_id} from {prev_ref_number} to {prev_ref_number - 1}"
    )
def _put_service_info(self, service_index, data_id, first_index):
    """
    Set service information about written shared data.

    Parameters
    ----------
    service_index : int
        The service buffer index.
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.
    first_index : int
        The first index of data in the shared buffer.

    Notes
    -----
    This information must be set after writing data to shared memory.
    """
    worker_id, data_number = self._parse_data_id(data_id)
    with WinLock(self.service_win):
        # Publish the slot atomically: location, initial refcount and the
        # (worker, number) pair that identifies which DataID owns it.
        self.service_shared_buffer[service_index + self.FIRST_DATA_INDEX] = first_index
        self.service_shared_buffer[service_index + self.REFERENCES_NUMBER] = 1
        self.service_shared_buffer[service_index + self.DATA_NUMBER_INDEX] = data_number
        self.service_shared_buffer[service_index + self.WORKER_ID_INDEX] = worker_id
    # Decrement the refcount automatically when the DataID is collected.
    self.finalizers.append(
        weakref.finalize(
            data_id, self._decrement_ref_number, str(data_id), service_index
        )
    )
def _check_service_info(self, data_id, service_index):
"""
Check if the `data_id` is in the shared memory on the current host.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID to data.
service_index : int
The service buffer index.
Returns
-------
bool
Return the ``True`` status if `data_id` is in the shared memory.
Notes
-----
This check ensures that the data is physically located in shared memory.
"""
worker_id, data_number = self._parse_data_id(data_id)
w_id = self.service_shared_buffer[service_index + self.WORKER_ID_INDEX]
d_id = self.service_shared_buffer[service_index + self.DATA_NUMBER_INDEX]
return w_id == worker_id and d_id == data_number
def _put_shared_info(self, data_id, shared_info):
    """
    Put required information to deserialize `data_id`.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.
    shared_info : unidist.core.backends.mpi.utils.ImmutableDict
        Information required for data deserialization.

    Raises
    ------
    ValueError
        If `shared_info` is not an ``ImmutableDict``.
    """
    if not isinstance(shared_info, ImmutableDict):
        raise ValueError("Shared info should be immutable.")
    # Keep the first record for a data ID; later duplicates are ignored.
    self._shared_info.setdefault(data_id, shared_info)
def _read_from_shared_buffer(self, data_id, shared_info):
    """
    Read and deserialize data from the shared buffer.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.
    shared_info : dict
        Information for correct deserialization.

    Returns
    -------
    object
        Data for the current Id.
    """
    buffer_lens = shared_info["raw_buffers_len"]
    buffer_count = shared_info["buffer_count"]
    s_data_len = shared_info["s_data_len"]
    service_index = shared_info["service_index"]
    # The actual location is read from the service slot, not from shared_info,
    # since the slot holds where the data was physically placed on this host.
    first_index = self.service_shared_buffer[service_index + self.FIRST_DATA_INDEX]
    # Layout: serialized payload first, then raw buffers back-to-back.
    prev_last_index = first_index + s_data_len
    s_data = self.shared_buffer[first_index:prev_last_index].toreadonly()
    raw_buffers = []
    for raw_buffer_len in buffer_lens:
        next_index = prev_last_index + raw_buffer_len
        raw_buffers.append(
            self.shared_buffer[prev_last_index:next_index].toreadonly()
        )
        prev_last_index = next_index
    data = deserialize_complex_data(s_data, raw_buffers, buffer_count)
    self.logger.debug(
        f"Rank {communication.MPIState.get_instance().global_rank}: Get {data_id} from {first_index} to {prev_last_index}. Service index: {service_index}"
    )
    return data
def _write_to_shared_buffer(self, data_id, reservation_data, serialized_data):
    """
    Write serialized data to the shared buffer.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.
    reservation_data : dict
        Information about the reserved space in shared memory for the current DataID.
    serialized_data : dict
        Serialized data.

    Returns
    -------
    dict
        Information required for correct data deserialization.

    Raises
    ------
    ValueError
        If the serialized data does not fit into the reserved region.
    """
    first_index = reservation_data["first_index"]
    last_index = reservation_data["last_index"]
    service_index = reservation_data["service_index"]
    s_data = serialized_data["s_data"]
    raw_buffers = serialized_data["raw_buffers"]
    buffer_count = serialized_data["buffer_count"]
    s_data_len = len(s_data)
    # The serialized payload occupies the head of the reserved region.
    s_data_last_index = first_index + s_data_len
    if s_data_last_index > last_index:
        raise ValueError("Not enough shared space for data")
    self.shared_buffer[first_index:s_data_last_index] = s_data
    # Raw buffers follow back-to-back; remember each length for deserialization.
    buffer_lens = []
    last_prev_index = s_data_last_index
    for i, raw_buffer in enumerate(raw_buffers):
        raw_buffer_len = len(raw_buffer)
        start = last_prev_index
        last_prev_index = start + raw_buffer_len
        if last_prev_index > last_index:
            raise ValueError(f"Not enough shared space for {i} raw_buffer")
        # NOTE(review): 6 appears to be the number of threads/chunks used by
        # parallel_memcopy — confirm against its definition.
        parallel_memcopy(raw_buffer, self.shared_buffer[start:last_prev_index], 6)
        buffer_lens.append(raw_buffer_len)
    self.logger.debug(
        f"Rank {communication.MPIState.get_instance().global_rank}: PUT {data_id} from {first_index} to {last_prev_index}. Service index: {service_index}"
    )
    return common.MetadataPackage.get_shared_info(
        data_id, s_data_len, buffer_lens, buffer_count, service_index
    )
def _sync_shared_memory_from_another_host(
    self, comm, data_id, owner_rank, first_index, last_index, service_index
):
    """
    Receive shared data from another host including the owner's rank.

    Parameters
    ----------
    comm : object
        MPI communicator object.
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        Data identifier.
    owner_rank : int
        Rank of the owner process.
    first_index : int
        First index in shared memory.
    last_index : int
        Last index in shared memory.
    service_index : int
        Service buffer index.

    Notes
    -----
    After writing data, service information should be set.
    """
    # The bytes are received directly into our shared-memory window.
    sh_buf = self.get_shared_buffer(first_index, last_index)
    # The request goes to the monitor responsible for the owner's host.
    mpi_state = communication.MPIState.get_instance()
    owner_monitor = mpi_state.get_monitor_by_worker_rank(owner_rank)
    communication.send_simple_operation(
        comm,
        operation_type=common.Operation.REQUEST_SHARED_DATA,
        operation_data={"id": data_id},
        dest_rank=owner_monitor,
    )
    communication.mpi_recv_buffer(comm, owner_monitor, sh_buf)
    self.logger.debug(
        f"Rank {communication.MPIState.get_instance().global_rank}: Sync_copy {data_id} from {owner_rank} rank. Put data from {first_index} to {last_index}. Service index: {service_index}"
    )
@classmethod
def get_instance(cls):
    """
    Get instance of ``SharedObjectStore``.

    Returns
    -------
    SharedObjectStore
    """
    # Lazily create the singleton; the per-host logger is attached on first
    # use because the MPI state (the host name) may not exist at __init__ time.
    if cls.__instance is None:
        cls.__instance = SharedObjectStore()
    instance = cls.__instance
    if instance.logger is None:
        logger_name = f"shared_store_{communication.MPIState.get_instance().host}"
        instance.logger = common.get_logger(logger_name, f"{logger_name}.log")
    return instance
def is_allocated(self):
    """
    Check if the shared memory is allocated and ready to put data.

    Returns
    -------
    bool
        ``True`` if the shared buffer exists, ``False`` otherwise.
    """
    allocated = self.shared_buffer is not None
    return allocated
def should_be_shared(self, data):
    """
    Check if data should be sent using shared memory.

    Parameters
    ----------
    data : dict
        Serialized data to check its size.

    Returns
    -------
    bool
        Return the ``True`` status if data should be sent using shared memory.
    """
    # Total footprint: main payload plus all out-of-band raw buffers.
    total_size = len(data["s_data"]) + sum(len(buf) for buf in data["raw_buffers"])
    return total_size > MpiSharedObjectStoreThreshold.get()
def contains(self, data_id):
    """
    Check if the store contains the `data_id` information required to deserialize the data.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.

    Returns
    -------
    bool
        Return the ``True`` status if shared store contains required information.

    Notes
    -----
    This check does not ensure that the data is physically located in shared memory.
    """
    return data_id in self._shared_info
def get_shared_info(self, data_id):
    """
    Get required information to correct deserialize `data_id` from shared memory.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.

    Returns
    -------
    dict
        Information required for data deserialization.

    Raises
    ------
    KeyError
        If the store holds no record for `data_id`.
    """
    shared_info = self._shared_info[data_id]
    return shared_info
def get_ref_number(self, data_id, service_index):
    """
    Get current references count of data_id by service index.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.
    service_index : int
        The service buffer index.

    Returns
    -------
    int
        The number of references to this data_id, or 0 if the service slot
        no longer describes `data_id`.
    """
    # `service_index` is taken as an argument because this is called from the
    # monitor, which does not track shared_info and cannot look it up itself.
    if self._check_service_info(data_id, service_index):
        return self.service_shared_buffer[service_index + self.REFERENCES_NUMBER]
    return 0
def get_shared_buffer(self, first_index, last_index):
    """
    Get the requested range of shared memory.

    Parameters
    ----------
    first_index : int
        Start of the requested range.
    last_index : int
        End of the requested range (excluding).

    Returns
    -------
    object
        The slice of the shared buffer covering ``[first_index, last_index)``.

    Notes
    -----
    This function is used to synchronize shared memory between different hosts.
    """
    requested_range = slice(first_index, last_index)
    return self.shared_buffer[requested_range]
def delete_service_info(self, data_id, service_index):
    """
    Delete service information for the current data Id.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.
    service_index : int
        The service buffer index.

    Raises
    ------
    RuntimeError
        If the service slot no longer describes `data_id`.

    Notes
    -----
    This function should be called by the monitor during the cleanup of shared data.
    """
    with WinLock(self.service_win):
        # Snapshot the slot so ownership can be verified before clearing it.
        old_worker_id = self.service_shared_buffer[
            service_index + self.WORKER_ID_INDEX
        ]
        old_data_id = self.service_shared_buffer[
            service_index + self.DATA_NUMBER_INDEX
        ]
        old_first_index = self.service_shared_buffer[
            service_index + self.FIRST_DATA_INDEX
        ]
        old_references_number = self.service_shared_buffer[
            service_index + self.REFERENCES_NUMBER
        ]
        if self._parse_data_id(data_id) != (old_worker_id, old_data_id):
            # The slot was reused for another DataID — refuse to clear it.
            self.logger.debug(
                f"Rank {communication.MPIState.get_instance().global_rank}: Did not clear {data_id}, because there are was written another data_id: Data_ID(rank_{old_worker_id}_id_{old_data_id})"
            )
            self.logger.debug(
                f"Service index: {service_index} First index: {old_first_index} References number: {old_references_number}"
            )
            raise RuntimeError("Unexpected data_id for cleanup shared memory")
        # -1 marks a free slot: 0 is a valid value and would be ambiguous.
        for offset in (
            self.WORKER_ID_INDEX,
            self.DATA_NUMBER_INDEX,
            self.FIRST_DATA_INDEX,
            self.REFERENCES_NUMBER,
        ):
            self.service_shared_buffer[service_index + offset] = -1
        self.logger.debug(
            f"Rank {communication.MPIState.get_instance().global_rank}: Clear {data_id}. Service index: {service_index} First index: {old_first_index} References number: {old_references_number}"
        )
def put(self, data_id, serialized_data):
    """
    Put data into shared memory.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.
    serialized_data : dict
        Serialized data to put into the storage.
    """
    mpi_state = communication.MPIState.get_instance()
    # Total footprint: the serialized payload plus all out-of-band buffers.
    data_size = len(serialized_data["s_data"]) + sum(
        len(buf) for buf in serialized_data["raw_buffers"]
    )
    # Ask the monitor to reserve a region of the shared buffer for this data.
    reservation_data = communication.send_reserve_operation(
        mpi_state.global_comm, data_id, data_size
    )
    # Write the bytes first ...
    shared_info = self._write_to_shared_buffer(
        data_id, reservation_data, serialized_data
    )
    # ... and only then publish the service record, so readers never see a
    # valid slot pointing at half-written data.
    self._put_service_info(
        reservation_data["service_index"], data_id, reservation_data["first_index"]
    )
    self._put_shared_info(data_id, shared_info)
def get(self, data_id, owner_rank, shared_info):
    """
    Get data from another worker using shared memory.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MpiDataID
        An ID to data.
    owner_rank : int
        The rank that sent the data.
    shared_info : dict
        The necessary information to properly deserialize data from shared memory.

    Returns
    -------
    object
        Deserialized data for `data_id`.

    Notes
    -----
    If the data is not yet present in this host's shared memory, it is pulled
    from the owner's host first. Only the worker whose reservation is marked
    ``is_first_request`` performs the copy; other workers spin-wait until the
    service slot confirms the data has landed.
    """
    mpi_state = communication.MPIState.get_instance()
    s_data_len = shared_info["s_data_len"]
    raw_buffers_len = shared_info["raw_buffers_len"]
    service_index = shared_info["service_index"]
    buffer_count = shared_info["buffer_count"]
    # Check whether the data is already physically present in this host's
    # shared memory (the incoming service_index refers to the owner's host).
    if not self._check_service_info(data_id, service_index):
        # Reserve shared memory on this host for the incoming copy.
        shared_data_len = s_data_len + sum([buf for buf in raw_buffers_len])
        reservation_info = communication.send_reserve_operation(
            mpi_state.global_comm, data_id, shared_data_len
        )
        # From here on, use the local service slot, not the owner's.
        service_index = reservation_info["service_index"]
        # Check if this worker should sync the shared buffer or whether
        # another worker on this host is already doing it.
        if reservation_info["is_first_request"]:
            # Synchronize the shared buffer from the owner's host.
            self._sync_shared_memory_from_another_host(
                mpi_state.global_comm,
                data_id,
                owner_rank,
                reservation_info["first_index"],
                reservation_info["last_index"],
                service_index,
            )
            # Publish the service info only after the bytes are in place.
            self._put_service_info(
                service_index, data_id, reservation_info["first_index"]
            )
        else:
            # Wait while another worker synchronizes the shared buffer;
            # the service slot becoming valid signals completion.
            while not self._check_service_info(data_id, service_index):
                time.sleep(MpiBackoff.get())
        # Rebuild shared info with the updated data_id and local service_index.
        shared_info = common.MetadataPackage.get_shared_info(
            data_id, s_data_len, raw_buffers_len, buffer_count, service_index
        )
        self._put_shared_info(data_id, shared_info)
    # Pin the data while it is being used so the monitor does not evict it.
    self._increment_ref_number(data_id, shared_info["service_index"])
    # Read from the shared buffer and deserialize.
    return self._read_from_shared_buffer(data_id, shared_info)
def finalize(self):
    """
    Release used resources.

    Notes
    -----
    Shared store should be finalized before MPI.Finalize().
    """
    # Free both windows and drop the references so finalize is idempotent.
    for attr in ("win", "service_win"):
        window = getattr(self, attr)
        if window is not None:
            window.Free()
            setattr(self, attr, None)
    # Detach finalizers so they never fire after the windows are gone.
    for finalizer in self.finalizers:
        finalizer.detach()
+30
-14
Metadata-Version: 2.1
Name: unidist
Version: 0.4.1
Version: 0.5.0
Summary: Unified Distributed Execution

@@ -9,8 +9,22 @@ Home-page: https://github.com/modin-project/unidist

Description-Content-Type: text/markdown
License-File: LICENSE
License-File: AUTHORS
Requires-Dist: packaging
Requires-Dist: cloudpickle
Provides-Extra: ray
Requires-Dist: ray[default]>=1.13.0; extra == "ray"
Requires-Dist: pydantic<2; extra == "ray"
Provides-Extra: dask
Requires-Dist: dask[complete]>=2.22.0; extra == "dask"
Requires-Dist: distributed>=2.22.0; extra == "dask"
Provides-Extra: mpi
Requires-Dist: mpi4py>=3.0.3; extra == "mpi"
Requires-Dist: msgpack>=1.0.0; extra == "mpi"
Provides-Extra: all
License-File: LICENSE
License-File: AUTHORS
Requires-Dist: ray[default]>=1.13.0; extra == "all"
Requires-Dist: pydantic<2; extra == "all"
Requires-Dist: dask[complete]>=2.22.0; extra == "all"
Requires-Dist: distributed>=2.22.0; extra == "all"
Requires-Dist: mpi4py>=3.0.3; extra == "all"
Requires-Dist: msgpack>=1.0.0; extra == "all"

@@ -23,5 +37,6 @@ <p align="center">

<p align="center">
<a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a>
<a href="https://unidist.readthedocs.io/en/0.4.1/?badge=0.4.1"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=0.4.1" align="center"></a>
<a href="https://pypi.org/project/unidist/0.4.1/"><img src="https://img.shields.io/badge/pypi-0.4.1-blue.svg" alt="PyPI version" align="center"></a>
<a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/actions/workflows/ci.yml/badge.svg" align="center"></a>
<a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a>
<a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a>
<a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a>
</p>

@@ -65,6 +80,6 @@

**Note:** There are different MPI implementations, each of which can be used as a backend in unidist.
By default, mapping `unidist[mpi]` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use
a specific version of MPI, you can install the core dependencies of unidist as `pip install unidist` and then
install the specific version of MPI using pip as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html)
section of mpi4py documentation.
Mapping `unidist[mpi]` installs `mpi4py` package, which is just a Python wrapper for MPI.
To enable unidist on MPI execution you need to have a working MPI implementation and certain software installed beforehand.
Refer to [Installation](https://mpi4py.readthedocs.io/en/latest/install.html) page of the `mpi4py` documentation for details.
Also, you can find some instructions on [MPI backend](https://unidist.readthedocs.io/en/latest/optimization_notes/mpi.html) page.

@@ -93,6 +108,7 @@ #### Using conda

**Note:** There are different MPI implementations, each of which can be used as a backend in unidist.
By default, mapping `unidist-mpi` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use
a specific version of MPI, you can install the core dependencies of unidist as `conda install unidist` and then
install the specific version of MPI using conda as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html)
section of mpi4py documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the
By default, mapping `unidist-mpi` installs a default MPI implementation, which comes with `mpi4py` package and is ready to use.
The conda dependency solver decides on which MPI implementation is to be installed. If you want to use a specific version of MPI,
you can install the core dependencies for MPI backend and the specific version of MPI as `conda install unidist-mpi <mpi>`
as shown in the [Installation](https://mpi4py.readthedocs.io/en/latest/install.html)
page of `mpi4py` documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the
[Using External MPI Libraries](https://conda-forge.org/docs/user/tipsandtricks.html#using-external-message-passing-interface-mpi-libraries)

@@ -99,0 +115,0 @@ section of the conda-forge documentation in order to get ultimate performance.

+13
-11

@@ -7,5 +7,6 @@ <p align="center">

<p align="center">
<a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a>
<a href="https://unidist.readthedocs.io/en/0.4.1/?badge=0.4.1"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=0.4.1" align="center"></a>
<a href="https://pypi.org/project/unidist/0.4.1/"><img src="https://img.shields.io/badge/pypi-0.4.1-blue.svg" alt="PyPI version" align="center"></a>
<a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/actions/workflows/ci.yml/badge.svg" align="center"></a>
<a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a>
<a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a>
<a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a>
</p>

@@ -49,6 +50,6 @@

**Note:** There are different MPI implementations, each of which can be used as a backend in unidist.
By default, mapping `unidist[mpi]` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use
a specific version of MPI, you can install the core dependencies of unidist as `pip install unidist` and then
install the specific version of MPI using pip as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html)
section of mpi4py documentation.
Mapping `unidist[mpi]` installs `mpi4py` package, which is just a Python wrapper for MPI.
To enable unidist on MPI execution you need to have a working MPI implementation and certain software installed beforehand.
Refer to [Installation](https://mpi4py.readthedocs.io/en/latest/install.html) page of the `mpi4py` documentation for details.
Also, you can find some instructions on [MPI backend](https://unidist.readthedocs.io/en/latest/optimization_notes/mpi.html) page.

@@ -77,6 +78,7 @@ #### Using conda

**Note:** There are different MPI implementations, each of which can be used as a backend in unidist.
By default, mapping `unidist-mpi` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use
a specific version of MPI, you can install the core dependencies of unidist as `conda install unidist` and then
install the specific version of MPI using conda as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html)
section of mpi4py documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the
By default, mapping `unidist-mpi` installs a default MPI implementation, which comes with `mpi4py` package and is ready to use.
The conda dependency solver decides on which MPI implementation is to be installed. If you want to use a specific version of MPI,
you can install the core dependencies for MPI backend and the specific version of MPI as `conda install unidist-mpi <mpi>`
as shown in the [Installation](https://mpi4py.readthedocs.io/en/latest/install.html)
page of `mpi4py` documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the
[Using External MPI Libraries](https://conda-forge.org/docs/user/tipsandtricks.html#using-external-message-passing-interface-mpi-libraries)

@@ -83,0 +85,0 @@ section of the conda-forge documentation in order to get ultimate performance.

import pathlib
from setuptools import setup, find_packages
from setuptools import setup, find_packages, Extension
from setuptools.dist import Distribution
from Cython.Build import cythonize
import sys
import versioneer
try:
from wheel.bdist_wheel import bdist_wheel
HAS_WHEEL = True
except ImportError:
HAS_WHEEL = False
if HAS_WHEEL:
class UnidistWheel(bdist_wheel):
def finalize_options(self):
bdist_wheel.finalize_options(self)
self.root_is_pure = False
def get_tag(self):
_, _, plat = bdist_wheel.get_tag(self)
py = "py3"
abi = "none"
return py, abi, plat
class UnidistDistribution(Distribution):
def __init__(self, *attrs):
Distribution.__init__(self, *attrs)
if HAS_WHEEL:
self.cmdclass["bdist_wheel"] = UnidistWheel
def is_pure(self):
return False
# https://github.com/modin-project/unidist/issues/324
ray_deps = ["ray[default]>=1.13.0", "pydantic<2"]
dask_deps = ["dask[complete]>=2.22.0", "distributed>=2.22.0"]
mpi_deps = ["mpi4py-mpich", "msgpack>=1.0.0"]
mpi_deps = ["mpi4py>=3.0.3", "msgpack>=1.0.0"]
if sys.version_info[1] < 8:

@@ -19,2 +53,8 @@ mpi_deps += "pickle5"

_memory = Extension(
"unidist.core.backends.mpi.core._memory",
["unidist/core/backends/mpi/core/memory/_memory.pyx"],
language="c++",
)
setup(

@@ -24,2 +64,3 @@ name="unidist",

cmdclass=versioneer.get_cmdclass(),
distclass=UnidistDistribution,
description="Unified Distributed Execution",

@@ -40,2 +81,3 @@ long_description=long_description,

python_requires=">=3.7.1",
ext_modules=cythonize([_memory]),
)
Metadata-Version: 2.1
Name: unidist
Version: 0.4.1
Version: 0.5.0
Summary: Unified Distributed Execution

@@ -9,8 +9,22 @@ Home-page: https://github.com/modin-project/unidist

Description-Content-Type: text/markdown
License-File: LICENSE
License-File: AUTHORS
Requires-Dist: packaging
Requires-Dist: cloudpickle
Provides-Extra: ray
Requires-Dist: ray[default]>=1.13.0; extra == "ray"
Requires-Dist: pydantic<2; extra == "ray"
Provides-Extra: dask
Requires-Dist: dask[complete]>=2.22.0; extra == "dask"
Requires-Dist: distributed>=2.22.0; extra == "dask"
Provides-Extra: mpi
Requires-Dist: mpi4py>=3.0.3; extra == "mpi"
Requires-Dist: msgpack>=1.0.0; extra == "mpi"
Provides-Extra: all
License-File: LICENSE
License-File: AUTHORS
Requires-Dist: ray[default]>=1.13.0; extra == "all"
Requires-Dist: pydantic<2; extra == "all"
Requires-Dist: dask[complete]>=2.22.0; extra == "all"
Requires-Dist: distributed>=2.22.0; extra == "all"
Requires-Dist: mpi4py>=3.0.3; extra == "all"
Requires-Dist: msgpack>=1.0.0; extra == "all"

@@ -23,5 +37,6 @@ <p align="center">

<p align="center">
<a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/workflows/master/badge.svg" align="center"></a>
<a href="https://unidist.readthedocs.io/en/0.4.1/?badge=0.4.1"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=0.4.1" align="center"></a>
<a href="https://pypi.org/project/unidist/0.4.1/"><img src="https://img.shields.io/badge/pypi-0.4.1-blue.svg" alt="PyPI version" align="center"></a>
<a href="https://github.com/modin-project/unidist/actions"><img src="https://github.com/modin-project/unidist/actions/workflows/ci.yml/badge.svg" align="center"></a>
<a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a>
<a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a>
<a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a>
</p>

@@ -65,6 +80,6 @@

**Note:** There are different MPI implementations, each of which can be used as a backend in unidist.
By default, mapping `unidist[mpi]` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use
a specific version of MPI, you can install the core dependencies of unidist as `pip install unidist` and then
install the specific version of MPI using pip as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html)
section of mpi4py documentation.
Mapping `unidist[mpi]` installs `mpi4py` package, which is just a Python wrapper for MPI.
To enable unidist on MPI execution you need to have a working MPI implementation and certain software installed beforehand.
Refer to [Installation](https://mpi4py.readthedocs.io/en/latest/install.html) page of the `mpi4py` documentation for details.
Also, you can find some instructions on [MPI backend](https://unidist.readthedocs.io/en/latest/optimization_notes/mpi.html) page.

@@ -93,6 +108,7 @@ #### Using conda

**Note:** There are different MPI implementations, each of which can be used as a backend in unidist.
By default, mapping `unidist-mpi` installs MPICH on Linux and MacOS and MSMPI on Windows. If you want to use
a specific version of MPI, you can install the core dependencies of unidist as `conda install unidist` and then
install the specific version of MPI using conda as shown in the [installation](https://mpi4py.readthedocs.io/en/latest/install.html)
section of mpi4py documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the
By default, mapping `unidist-mpi` installs a default MPI implementation, which comes with `mpi4py` package and is ready to use.
The conda dependency solver decides on which MPI implementation is to be installed. If you want to use a specific version of MPI,
you can install the core dependencies for MPI backend and the specific version of MPI as `conda install unidist-mpi <mpi>`
as shown in the [Installation](https://mpi4py.readthedocs.io/en/latest/install.html)
page of `mpi4py` documentation. That said, it is highly encouraged to use your own MPI binaries as stated in the
[Using External MPI Libraries](https://conda-forge.org/docs/user/tipsandtricks.html#using-external-message-passing-interface-mpi-libraries)

@@ -99,0 +115,0 @@ section of the conda-forge documentation in order to get ultimate performance.

@@ -9,3 +9,3 @@ packaging

distributed>=2.22.0
mpi4py-mpich
mpi4py>=3.0.3
msgpack>=1.0.0

@@ -18,3 +18,3 @@

[mpi]
mpi4py-mpich
mpi4py>=3.0.3
msgpack>=1.0.0

@@ -21,0 +21,0 @@

@@ -46,4 +46,5 @@ AUTHORS

unidist/core/backends/mpi/core/communication.py
unidist/core/backends/mpi/core/monitor.py
unidist/core/backends/mpi/core/local_object_store.py
unidist/core/backends/mpi/core/serialization.py
unidist/core/backends/mpi/core/shared_object_store.py
unidist/core/backends/mpi/core/controller/__init__.py

@@ -54,6 +55,10 @@ unidist/core/backends/mpi/core/controller/actor.py

unidist/core/backends/mpi/core/controller/garbage_collector.py
unidist/core/backends/mpi/core/controller/object_store.py
unidist/core/backends/mpi/core/memory/_memory.cpp
unidist/core/backends/mpi/core/memory/memory.cpp
unidist/core/backends/mpi/core/memory/memory.h
unidist/core/backends/mpi/core/monitor/__init__.py
unidist/core/backends/mpi/core/monitor/loop.py
unidist/core/backends/mpi/core/monitor/shared_memory_manager.py
unidist/core/backends/mpi/core/worker/__init__.py
unidist/core/backends/mpi/core/worker/loop.py
unidist/core/backends/mpi/core/worker/object_store.py
unidist/core/backends/mpi/core/worker/request_store.py

@@ -60,0 +65,0 @@ unidist/core/backends/mpi/core/worker/task_store.py

@@ -11,7 +11,7 @@

{
"date": "2023-07-14T09:41:01+0200",
"date": "2023-11-14T20:43:02+0100",
"dirty": false,
"error": null,
"full-revisionid": "5406937f574fa2abcc78478c55eb9810739934cf",
"version": "0.4.1"
"full-revisionid": "1ec811a67984ad70d035c48965f879ae82029a48",
"version": "0.5.0"
}

@@ -18,0 +18,0 @@ ''' # END VERSION_JSON

@@ -17,3 +17,3 @@ # Copyright (C) 2021-2023 Modin authors

from .backends.mpi import (
IsMpiSpawnWorkers,
MpiSpawn,
MpiHosts,

@@ -23,2 +23,7 @@ MpiPickleThreshold,

MpiLog,
MpiSharedObjectStore,
MpiSharedObjectStoreMemory,
MpiSharedServiceMemory,
MpiSharedObjectStoreThreshold,
MpiRuntimeEnv,
)

@@ -38,3 +43,3 @@ from .parameter import ValueSource

"DaskSchedulerAddress",
"IsMpiSpawnWorkers",
"MpiSpawn",
"MpiHosts",

@@ -45,2 +50,7 @@ "ValueSource",

"MpiLog",
"MpiSharedObjectStore",
"MpiSharedObjectStoreMemory",
"MpiSharedServiceMemory",
"MpiSharedObjectStoreThreshold",
"MpiRuntimeEnv",
]

@@ -7,6 +7,17 @@ # Copyright (C) 2021-2023 Modin authors

from .envvars import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold, MpiBackoff, MpiLog
from .envvars import (
MpiSpawn,
MpiHosts,
MpiPickleThreshold,
MpiBackoff,
MpiLog,
MpiSharedObjectStore,
MpiSharedObjectStoreMemory,
MpiSharedServiceMemory,
MpiSharedObjectStoreThreshold,
MpiRuntimeEnv,
)
__all__ = [
"IsMpiSpawnWorkers",
"MpiSpawn",
"MpiHosts",

@@ -16,2 +27,7 @@ "MpiPickleThreshold",

"MpiLog",
"MpiSharedObjectStore",
"MpiSharedObjectStoreMemory",
"MpiSharedServiceMemory",
"MpiSharedObjectStoreThreshold",
"MpiRuntimeEnv",
]

@@ -10,12 +10,18 @@ # Copyright (C) 2021-2023 Modin authors

class IsMpiSpawnWorkers(EnvironmentVariable, type=bool):
class MpiSpawn(EnvironmentVariable, type=bool):
"""Whether to enable MPI spawn or not."""
default = True
varname = "UNIDIST_IS_MPI_SPAWN_WORKERS"
varname = "UNIDIST_MPI_SPAWN"
class MpiHosts(EnvironmentVariable, type=ExactStr):
"""MPI hosts to run unidist on."""
"""
MPI hosts to run unidist on.
Notes
-----
This variable is only used if a program is run in Controller/Worker model.
"""
varname = "UNIDIST_MPI_HOSTS"

@@ -25,8 +31,37 @@

class MpiPickleThreshold(EnvironmentVariable, type=int):
"""Minimum buffer size for serialization with pickle 5 protocol."""
"""
Minimum buffer size for serialization with pickle 5 protocol.
Notes
-----
If the shared object store is enabled, ``MpiSharedObjectStoreThreshold`` takes
precedence on this configuration value and the threshold gets overridden.
It is done intentionally to prevent multiple copies when putting an object
into the local object store or into the shared object store.
Data copy happens once when doing in-band serialization in depend on the threshold.
In some cases output of a remote task can take up the memory of the task arguments.
If those arguments are placed in the shared object store, this location should not be overwritten
while output is being used, otherwise the output value may be corrupted.
"""
default = 1024**2 // 4 # 0.25 MiB
varname = "UNIDIST_MPI_PICKLE_THRESHOLD"
@classmethod
def get(cls) -> int:
"""
Get minimum buffer size for serialization with pickle 5 protocol.
Returns
-------
int
"""
if MpiSharedObjectStore.get():
mpi_pickle_threshold = MpiSharedObjectStoreThreshold.get()
cls.put_value_source(MpiSharedObjectStoreThreshold.get_value_source())
else:
mpi_pickle_threshold = super().get()
return mpi_pickle_threshold
class MpiBackoff(EnvironmentVariable, type=float):

@@ -52,1 +87,71 @@ """

varname = "UNIDIST_MPI_LOG"
class MpiSharedObjectStore(EnvironmentVariable, type=bool):
"""Whether to enable shared object store or not."""
default = False
varname = "UNIDIST_MPI_SHARED_OBJECT_STORE"
class MpiSharedObjectStoreMemory(EnvironmentVariable, type=int):
"""How many bytes of memory to start the shared object store with."""
varname = "UNIDIST_MPI_SHARED_OBJECT_STORE_MEMORY"
class MpiSharedServiceMemory(EnvironmentVariable, type=int):
"""How many bytes of memory to start the shared service memory with."""
varname = "UNIDIST_MPI_SHARED_SERVICE_MEMORY"
class MpiSharedObjectStoreThreshold(EnvironmentVariable, type=int):
"""Minimum size of data to put into the shared object store."""
default = 10**5 # 100 KB
varname = "UNIDIST_MPI_SHARED_OBJECT_STORE_THRESHOLD"
class MpiRuntimeEnv:
"""
Runtime environment for MPI worker processes.
Notes
-----
This config doesn't have a respective environment variable as
it is much more convenient to set a config value using the config API
but not through the environment variable.
"""
# Possible options for a runtime environment to set
env_vars = "env_vars"
# Config value
_value = {}
@classmethod
def put(cls, value):
"""
Set config value.
Parameters
----------
value : dict
Config value to set.
"""
if any([True for option in value if option != cls.env_vars]):
raise NotImplementedError(
"Any option other than environment variables is not supported yet."
)
cls._value = value
@classmethod
def get(cls):
"""
Get config value.
Returns
-------
dict
"""
return cls._value

@@ -180,2 +180,14 @@ # Copyright (C) 2021-2023 Modin authors

@classmethod
def put_value_source(cls, value):
"""
Put value source of the config.
Parameters
----------
value : ValueSource
Value source to put.
"""
cls._value_source = value
@classmethod
def get(cls):

@@ -182,0 +194,0 @@ """

@@ -9,7 +9,23 @@ # Copyright (C) 2021-2023 Modin authors

import inspect
import warnings
import weakref
from unidist.config.backends.mpi.envvars import MpiSpawn
from unidist.core.backends.mpi.utils import ImmutableDict
try:
import mpi4py
except ImportError:
raise ImportError(
"Missing dependency 'mpi4py'. Use pip or conda to install it."
) from None
from unidist.core.backends.common.data_id import DataID, is_data_id
from unidist.config import MpiLog
from unidist.config import MpiLog, MpiSharedObjectStore
# TODO: Find a way to move this after all imports
mpi4py.rc(recv_mprobe=False, initialize=False)
from mpi4py import MPI # noqa: E402
class Operation:

@@ -29,20 +45,26 @@ """

Save the data location to a local storage.
WAIT : int, default 5
PUT_SHARED_DATA : int, default 5
Save the data into shared memory.
WAIT : int, default 6
Return readiness signal of a local data to a requester.
ACTOR_CREATE : int, default 6
ACTOR_CREATE : int, default 7
Create local actor instance.
ACTOR_EXECUTE : int, default 7
ACTOR_EXECUTE : int, default 8
Execute method of a local actor instance.
CLEANUP : int, default 8
CLEANUP : int, default 9
Cleanup local object storage for out-of-scope IDs.
TASK_DONE : int, default 9
TASK_DONE : int, default 10
Increment global task counter.
GET_TASK_COUNT : int, default 10
GET_TASK_COUNT : int, default 11
Return global task counter to a requester.
CANCEL : int, default 11
RESERVE_SHARED_MEMORY : int, default 12
Reserve area in shared memory for the data.
REQUEST_SHARED_DATA : int, default 13
Return the area in shared memory with the requested data.
CANCEL : int, default 14
Send a message to a worker to exit the event loop.
READY_TO_SHUTDOWN : int, default 12
READY_TO_SHUTDOWN : int, default 15
Send a message to monitor from a worker,
which is ready to shutdown.
SHUTDOWN : int, default 13
SHUTDOWN : int, default 16
Send a message from monitor to a worker to shutdown.

@@ -56,13 +78,16 @@ """

PUT_OWNER = 4
WAIT = 5
ACTOR_CREATE = 6
ACTOR_EXECUTE = 7
CLEANUP = 8
PUT_SHARED_DATA = 5
WAIT = 6
ACTOR_CREATE = 7
ACTOR_EXECUTE = 8
CLEANUP = 9
### --- Monitor operations --- ###
TASK_DONE = 9
GET_TASK_COUNT = 10
TASK_DONE = 10
GET_TASK_COUNT = 11
RESERVE_SHARED_MEMORY = 12
REQUEST_SHARED_DATA = 13
### --- Common operations --- ###
CANCEL = 11
READY_TO_SHUTDOWN = 12
SHUTDOWN = 13
CANCEL = 14
READY_TO_SHUTDOWN = 15
SHUTDOWN = 16

@@ -89,2 +114,119 @@

class MetadataPackage(ImmutableDict):
"""
The class defines metadata packages for a communication.
Attributes
----------
LOCAL_DATA : int, default: 0
Package type indicating that the data will be sent from the local object store.
SHARED_DATA : int, default: 1
Package type indicating that the data will be sent from the shared object store.
TASK_DATA : int, default: 2
Package type indicating a task or an actor (actor method) to be sent.
"""
LOCAL_DATA = 0
SHARED_DATA = 1
TASK_DATA = 2
@classmethod
def get_local_info(cls, data_id, s_data_len, raw_buffers_len, buffer_count):
"""
Get information package for sending local data.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID to data.
s_data_len : int
Main buffer length.
raw_buffers_len : list
A list of ``PickleBuffer`` lengths.
buffer_count : list
List of the number of buffers for each object
to be serialized/deserialized using the pickle 5 protocol.
Returns
-------
dict
The information package.
"""
return MetadataPackage(
{
"package_type": MetadataPackage.LOCAL_DATA,
"id": data_id,
"s_data_len": s_data_len,
"raw_buffers_len": raw_buffers_len,
"buffer_count": buffer_count,
}
)
@classmethod
def get_shared_info(
cls, data_id, s_data_len, raw_buffers_len, buffer_count, service_index
):
"""
Get information package for sending data using shared memory.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID to data.
s_data_len : int
Main buffer length.
raw_buffers_len : list
A list of ``PickleBuffer`` lengths.
buffer_count : list
List of the number of buffers for each object
to be serialized/deserialized using the pickle 5 protocol.
service_index : int
Service index in shared memory.
Returns
-------
dict
The information package.
"""
return MetadataPackage(
{
"package_type": MetadataPackage.SHARED_DATA,
"id": weakref.proxy(data_id),
"s_data_len": s_data_len,
"raw_buffers_len": tuple(raw_buffers_len),
"buffer_count": tuple(buffer_count),
"service_index": service_index,
}
)
@classmethod
def get_task_info(cls, s_data_len, raw_buffers_len, buffer_count):
"""
Get information package for sending a task or an actor (actor method).
Parameters
----------
s_data_len : int
Main buffer length.
raw_buffers_len : list
A list of ``PickleBuffer`` lengths.
buffer_count : list
List of the number of buffers for each object
to be serialized/deserialized using the pickle 5 protocol.
Returns
-------
dict
The information package.
"""
return MetadataPackage(
{
"package_type": MetadataPackage.TASK_DATA,
"s_data_len": s_data_len,
"raw_buffers_len": raw_buffers_len,
"buffer_count": buffer_count,
}
)
default_class_properties = dir(type("dummy", (object,), {}))

@@ -124,45 +266,73 @@ # Mapping between operations and their names (e.g., Operation.EXECUTE: "EXECUTE")

class MasterDataID(DataID):
class MpiDataID(DataID):
"""
Class for tracking data IDs of the main process.
Class for tracking data IDs of MPI processes.
Class extends ``unidist.core.backends.common.data_id.DataID`` functionality with a garbage collection.
The class extends ``unidist.core.backends.common.data_id.DataID`` functionality,
ensuring the uniqueness of the object to correctly construct/reconstruct it.
Otherwise, we would get into wrong garbage collection.
Parameters
----------
id_value : int
An integer value, generated by executor process.
garbage_collector : unidist.core.backends.mpi.core.executor.GarbageCollector
A reference to the garbage collector instance.
owner_rank : int
The rank of the process that owns the data.
data_number : int
Unique data number for the owner process.
gc : unidist.core.backends.mpi.core.executor.GarbageCollector or None
Local garbage collector reference.
The actual object is for the data id owner, otherwise, ``None``.
"""
def __init__(self, id_value, garbage_collector):
super().__init__(id_value)
self._gc = garbage_collector if garbage_collector else None
_instances = weakref.WeakValueDictionary()
def __del__(self):
"""Track object deletion by garbage collector."""
# We check for existence of `_qc` attribute because
# it might be deleted during serialization via `__getstate__`
if hasattr(self, "_gc") and self._gc is not None:
self._gc.collect(self.base_data_id())
def __new__(cls, owner_rank, data_number, gc=None):
key = (owner_rank, data_number)
if key in cls._instances:
return cls._instances[key]
else:
new_instance = super().__new__(cls)
cls._instances[key] = new_instance
return new_instance
def __getstate__(self):
"""Remove a reference to garbage collector for correct `pickle` serialization."""
attributes = self.__dict__.copy()
del attributes["_gc"]
return attributes
def __init__(self, owner_rank, data_number, gc=None):
super().__init__(f"rank_{owner_rank}_id_{data_number}")
self.owner_rank = owner_rank
self.data_number = data_number
self._gc = gc
def base_data_id(self):
def __getnewargs__(self):
"""
Return the base class instance without garbage collector reference.
Prepare arguments to reconstruct the object upon unpickling.
Returns
-------
unidist.core.backends.common.data_id.DataID
Base ``DataID`` class object without garbage collector reference.
tuple
Tuple of the owner rank and data number to be passed into `__new__`.
"""
return DataID(self._id)
return (self.owner_rank, self.data_number)
def __getstate__(self):
"""
Remove a reference to garbage collector for correct `pickle` serialization.
Returns
-------
dict
State of the object without garbage collector.
"""
state = self.__dict__.copy()
# we remove this attribute for correct serialization,
# as well as to reduce the length of the serialized data
if hasattr(self, "_gc"):
del state["_gc"]
return state
def __del__(self):
"""Track object deletion by garbage collector."""
# check for existence of `._gc` attribute as
# it is missing upon unpickling
if hasattr(self, "_gc") and self._gc is not None:
self._gc.collect((self.owner_rank, self.data_number))
def get_logger(logger_name, file_name, activate=None):

@@ -226,60 +396,2 @@ """

def master_data_ids_to_base(o_ids):
"""
Transform all data ID objects of the main process to its base class instances.
Cast ``unidist.core.backends.mpi.core.common.MasterDataID`` to it's base ``unidist.backend.common.data_id.DataID`` class
to remove a reference to garbage collector.
Parameters
----------
o_ids : iterable
Sequence of ``unidist.core.backends.mpi.core.common.MasterDataID`` objects.
Returns
-------
list
Transformed list.
"""
if o_ids is None:
return None
elif is_data_id(o_ids):
return o_ids.base_data_id()
id_list = [o_id.base_data_id() for o_id in o_ids]
return id_list
def unwrap_data_ids(data_ids):
"""
Find all data ID instances of the main process and prepare for communication with worker process.
Call `base_data_id` on each instance.
Parameters
----------
data_ids : iterable
Iterable objects to transform recursively.
Returns
-------
iterable
Transformed iterable object (task arguments).
"""
if type(data_ids) in (list, tuple, dict):
container = type(data_ids)()
for value in data_ids:
unwrapped_value = unwrap_data_ids(
data_ids[value] if isinstance(data_ids, dict) else value
)
if isinstance(container, list):
container += [unwrapped_value]
elif isinstance(container, tuple):
container += (unwrapped_value,)
elif isinstance(container, dict):
container.update({value: unwrapped_value})
return container
else:
return data_ids.base_data_id() if is_data_id(data_ids) else data_ids
def materialize_data_ids(data_ids, unwrap_data_id_impl, is_pending=False):

@@ -289,3 +401,3 @@ """

Find all ``unidist.core.backends.common.data_id.DataID`` instances and call `unwrap_data_id_impl` on them.
Find all ``unidist.core.backends.mpi.core.common.MpiDataID`` instances and call `unwrap_data_id_impl` on them.

@@ -332,1 +444,72 @@ Parameters

return unwrapped, is_pending
def check_mpich_version(target_version):
"""
Check if the using MPICH version is equal to or greater than the target version.
Parameters
----------
target_version : str
Required version of the MPICH library.
Returns
-------
bool
True ot false.
"""
def versiontuple(v):
return tuple(map(int, (v.split("."))))
mpich_version = [
raw for raw in MPI.Get_library_version().split("\n") if "MPICH Version:" in raw
][0].split(" ")[-1]
return versiontuple(mpich_version) >= versiontuple(target_version)
def is_shared_memory_supported(raise_warning=False):
"""
Check if the unidist on MPI supports shared memory.
Parameters
----------
raise_warning: bool, default: False
Whether to raise a warning or not.
``True`` is passed only for root process
to have the only warning.
Returns
-------
bool
True or False.
Notes
-----
Prior to the MPI 3.0 standard there is no support for shared memory.
"""
if not MpiSharedObjectStore.get():
return False
if MPI.VERSION < 3:
if raise_warning:
warnings.warn(
f"Shared object store for MPI backend is not supported for MPI version {MPI.VERSION} "
"since it doesn't support shared memory feature."
)
return False
# Mpich shared memory does not work with spawned processes prior to version 4.2.0.
if (
"MPICH" in MPI.Get_library_version()
and MpiSpawn.get()
and not check_mpich_version("4.2.0")
):
if raise_warning:
warnings.warn(
"Shared object store for MPI backend is not supported in C/W model for MPICH version less than 4.2.0. "
+ "Read more about this issue in the `troubleshooting` page of the unidist documentation."
)
return False
return True

@@ -10,2 +10,3 @@ # Copyright (C) 2021-2023 Modin authors

import time
import warnings

@@ -21,6 +22,8 @@ try:

from unidist.core.backends.mpi.core.serialization import (
ComplexDataSerializer,
SimpleDataSerializer,
serialize_complex_data,
deserialize_complex_data,
)
import unidist.core.backends.mpi.core.common as common
from unidist.config.backends.mpi.envvars import MpiSpawn, MpiHosts

@@ -51,3 +54,3 @@ # TODO: Find a way to move this after all imports

logger_op_name_len = 15
logger_worker_count = MPIState.get_instance().world_size
logger_worker_count = MPIState.get_instance().global_size

@@ -57,3 +60,3 @@ # write header on first worker

not is_logger_header_printed
and MPIState.get_instance().rank == MPIRank.FIRST_WORKER
and MPIState.get_instance().global_rank == MPIRank.FIRST_WORKER
):

@@ -66,3 +69,3 @@ worker_ids_str = "".join([f"{i}\t" for i in range(logger_worker_count)])

source_rank = status.Get_source()
dest_rank = MPIState.get_instance().rank
dest_rank = MPIState.get_instance().global_rank
op_name = common.get_op_name(op_type)

@@ -91,6 +94,24 @@ space_after_op_name = " " * (logger_op_name_len - len(op_name))

MPI communicator.
rank : int
Attributes
----------
global_comm : mpi4py.MPI.Comm
Global MPI communicator.
host_comm : mpi4py.MPI.Comm
MPI subcommunicator for the current host.
global_rank : int
Rank of a process.
world_sise : int
Number of processes.
global_size : int
Number of processes in the global communicator.
host : str
IP-address of the current host.
topology : dict
Dictionary, containing all ranks assignments by IP-addresses in
the form: `{"node_ip0": {"host_rank": "global_rank", ...}, ...}`.
host_by_rank : dict
Dictionary containing IP addresses by rank.
monitor_processes : list
List of ranks that are monitor processes.
workers : list
List of ranks that are worker processes.
"""

@@ -100,8 +121,68 @@

def __init__(self, comm, rank, world_sise):
def __init__(self, comm):
# attributes get actual values when MPI is initialized in `init` function
self.comm = comm
self.rank = rank
self.world_size = world_sise
self.global_comm = comm
self.global_rank = comm.Get_rank()
self.global_size = comm.Get_size()
self.host = socket.gethostbyname(socket.gethostname())
# `Split_type` does not work correctly in the MSMPI library in C/W model
# so we split the communicator in a different way
if "Microsoft MPI" in MPI.Get_library_version() and MpiSpawn.get():
all_hosts = self.global_comm.allgather(self.host)
self.host_comm = self.global_comm.Split(all_hosts.index(self.host))
else:
self.host_comm = self.global_comm.Split_type(MPI.COMM_TYPE_SHARED)
host_rank = self.host_comm.Get_rank()
# Get topology of MPI cluster.
cluster_info = self.global_comm.allgather(
(self.host, self.global_rank, host_rank)
)
self.topology = defaultdict(dict)
self.host_by_rank = defaultdict(None)
for host, global_rank, host_rank in cluster_info:
self.topology[host][host_rank] = global_rank
self.host_by_rank[global_rank] = host
mpi_hosts = MpiHosts.get()
if mpi_hosts is not None and MpiSpawn.get():
host_list = mpi_hosts.split(",")
host_count = len(host_list)
# check running hosts
if self.is_root_process() and len(self.topology.keys()) > host_count:
warnings.warn(
"The number of running hosts is greater than that specified in the UNIDIST_MPI_HOSTS. "
+ "If you want to run the program on a host other than the local one, specify the appropriate parameter for `mpiexec` "
+ "(`--host` for OpenMPI and `--hosts` for Intel MPI or MPICH)."
)
if self.is_root_process() and len(self.topology.keys()) < host_count:
warnings.warn(
"The number of running hosts is less than that specified in the UNIDIST_MPI_HOSTS. "
+ "Check the `mpiexec` option to distribute processes between hosts."
)
if common.is_shared_memory_supported():
self.monitor_processes = []
for host in self.topology:
if len(self.topology[host]) >= 2:
self.monitor_processes.append(self.topology[host][MPIRank.MONITOR])
elif self.is_root_process():
raise ValueError(
"When using shared object store, each host must contain at least 2 processes, "
"since one of them will be a service monitor."
)
else:
self.monitor_processes = [MPIRank.MONITOR]
self.workers = []
for host in self.topology:
self.workers.extend(
[
rank
for rank in self.topology[host].values()
if not self.is_root_process(rank)
and not self.is_monitor_process(rank)
]
)
@classmethod

@@ -126,37 +207,74 @@ def get_instance(cls, *args):

def is_root_process(self, rank=None):
"""
Check if the rank is root process.
class MPIRank:
"""Class that describes ranks assignment."""
Parameters
----------
rank : int, optional
The rank to be checked.
If ``None``, the current rank is to be checked.
ROOT = 0
MONITOR = 1
FIRST_WORKER = 2
Returns
-------
bool
True or False.
"""
if rank is None:
rank = self.global_rank
return rank == MPIRank.ROOT
def is_monitor_process(self, rank=None):
"""
Check if the rank is a monitor process.
def get_topology():
"""
Get topology of MPI cluster.
Parameters
----------
rank : int, optional
The rank to be checked.
If ``None``, the current rank is to be checked.
Returns
-------
dict
Dictionary, containing workers ranks assignments by IP-addresses in
the form: `{"node_ip0": [rank_2, rank_3, ...], "node_ip1": [rank_i, ...], ...}`.
"""
mpi_state = MPIState.get_instance()
comm = mpi_state.comm
rank = mpi_state.rank
Returns
-------
bool
True or False.
"""
if rank is None:
rank = self.global_rank
return rank in self.monitor_processes
hostname = socket.gethostname()
host = socket.gethostbyname(hostname)
cluster_info = comm.allgather((host, rank))
topology = defaultdict(list)
def get_monitor_by_worker_rank(self, rank=None):
"""
Get the monitor process rank for the host that includes this rank.
for host, rank in cluster_info:
if rank not in [MPIRank.ROOT, MPIRank.MONITOR]:
topology[host].append(rank)
Parameters
----------
rank : int, optional
The global rank to search for a monitor process.
return dict(topology)
Returns
-------
int
Rank of a monitor process.
"""
if not common.is_shared_memory_supported():
return MPIRank.MONITOR
if rank is None:
rank = self.global_rank
host = self.host_by_rank[rank]
if host is None:
raise ValueError("Unknown rank of workers")
return self.topology[host][MPIRank.MONITOR]
class MPIRank:
"""Class that describes rank assignment."""
ROOT = 0
MONITOR = 1
FIRST_WORKER = 2
# ---------------------------- #

@@ -183,4 +301,4 @@ # Main communication utilities #

* This blocking send is used when we have to wait for completion of the communication,
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``mpi_isend_operation``.
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``mpi_isend_operation``.
* The special tag is used for this communication, namely, ``common.MPITag.OPERATION``.

@@ -207,4 +325,4 @@ """

* This blocking send is used when we have to wait for completion of the communication,
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``mpi_isend_object``.
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``mpi_isend_object``.
* The special tag is used for this communication, namely, ``common.MPITag.OBJECT``.

@@ -260,3 +378,3 @@ """

-----
* The special tag is used for this communication, namely, ``common.MPITag.OBJECT``.
The special tag is used for this communication, namely, ``common.MPITag.OBJECT``.
"""

@@ -325,3 +443,3 @@ return comm.isend(data, dest=dest_rank, tag=common.MPITag.OBJECT)

def mpi_send_buffer(comm, buffer_size, buffer, dest_rank):
def mpi_send_buffer(comm, buffer, dest_rank, data_type=MPI.CHAR, buffer_size=None):
"""

@@ -334,4 +452,2 @@ Send buffer object to another MPI rank in a blocking way.

MPI communicator object.
buffer_size : int
Buffer size in bytes.
buffer : object

@@ -341,2 +457,6 @@ Buffer object to send.

Target MPI process to transfer buffer.
data_type : MPI.Datatype, default: MPI.CHAR
MPI data type for sending data.
buffer_size: int, default: None
Buffer size in bytes. Send an additional message with a buffer size to prepare another process to receive if `buffer_size` is not None.

@@ -346,9 +466,24 @@ Notes

* This blocking send is used when we have to wait for completion of the communication,
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``mpi_isend_buffer``.
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``mpi_isend_buffer``.
* The special tags are used for this communication, namely,
``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``.
``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``.
"""
comm.send(buffer_size, dest=dest_rank, tag=common.MPITag.OBJECT)
comm.Send([buffer, MPI.CHAR], dest=dest_rank, tag=common.MPITag.BUFFER)
if buffer_size:
comm.send(buffer_size, dest=dest_rank, tag=common.MPITag.OBJECT)
else:
buffer_size = len(buffer)
# Maximum block size MPI is able to send/recv
block_size = pkl5._bigmpi.blocksize
partitions = list(range(0, buffer_size, block_size))
partitions.append(buffer_size)
num_partitions = len(partitions)
with pkl5._bigmpi as bigmpi:
for i in range(num_partitions):
if i + 1 < num_partitions:
comm.Send(
bigmpi(buffer[partitions[i] : partitions[i + 1]]),
dest=dest_rank,
tag=common.MPITag.BUFFER,
)

@@ -378,3 +513,3 @@

-----
* The special tags are used for this communication, namely,
The special tags are used for this communication, namely,
``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``.

@@ -385,8 +520,21 @@ """

requests.append((h1, None))
h2 = comm.Isend([buffer, MPI.CHAR], dest=dest_rank, tag=common.MPITag.BUFFER)
requests.append((h2, buffer))
# Maximum block size MPI is able to send/recv
block_size = pkl5._bigmpi.blocksize
partitions = list(range(0, buffer_size, block_size))
partitions.append(buffer_size)
num_partitions = len(partitions)
with pkl5._bigmpi as bigmpi:
for i in range(num_partitions):
if i + 1 < num_partitions:
h2 = comm.Isend(
bigmpi(buffer[partitions[i] : partitions[i + 1]]),
dest=dest_rank,
tag=common.MPITag.BUFFER,
)
requests.append((h2, buffer))
return requests
def mpi_recv_buffer(comm, source_rank):
def mpi_recv_buffer(comm, source_rank, result_buffer=None):
"""

@@ -401,2 +549,4 @@ Receive data buffer.

Communication event source rank.
result_buffer : object, default: None
The array to be filled. If `result_buffer` is None, the buffer size will be requested and the necessary buffer created.

@@ -410,11 +560,27 @@ Returns

-----
* The special tags are used for this communication, namely,
The special tags are used for this communication, namely,
``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``.
"""
buf_size = comm.recv(source=source_rank, tag=common.MPITag.OBJECT)
s_buffer = bytearray(buf_size)
comm.Recv([s_buffer, MPI.CHAR], source=source_rank, tag=common.MPITag.BUFFER)
return s_buffer
if result_buffer is None:
buf_size = comm.recv(source=source_rank, tag=common.MPITag.OBJECT)
result_buffer = bytearray(buf_size)
else:
buf_size = len(result_buffer)
# Maximum block size MPI is able to send/recv
block_size = pkl5._bigmpi.blocksize
partitions = list(range(0, buf_size, block_size))
partitions.append(buf_size)
num_partitions = len(partitions)
with pkl5._bigmpi as bigmpi:
for i in range(num_partitions):
if i + 1 < num_partitions:
tmp_buffer = bytearray(partitions[i + 1] - partitions[i])
comm.Recv(
bigmpi(tmp_buffer), source=source_rank, tag=common.MPITag.BUFFER
)
result_buffer[partitions[i] : partitions[i + 1]] = tmp_buffer
return result_buffer
def mpi_busy_wait_recv(comm, source_rank):

@@ -455,3 +621,3 @@ """

def _send_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank):
def _send_complex_data_impl(comm, s_data, raw_buffers, dest_rank, info_package):
"""

@@ -468,8 +634,6 @@ Send already serialized complex data.

Pickle buffers list, out-of-band data collected with pickle 5 protocol.
buffer_count : list
List of the number of buffers for each object
to be serialized/deserialized using the pickle 5 protocol.
See details in :py:class:`~unidist.core.backends.mpi.core.serialization.ComplexDataSerializer`.
dest_rank : int
Target MPI process to transfer data.
info_package : unidist.core.backends.mpi.core.common.MetadataPackage
Required information to deserialize data on a receiver side.

@@ -481,9 +645,4 @@ Notes

"""
info = {
"s_data_len": len(s_data),
"buffer_count": buffer_count,
"raw_buffers_len": [len(sbuf) for sbuf in raw_buffers],
}
comm.send(info, dest=dest_rank, tag=common.MPITag.OBJECT)
# wrap to dict for sending and correct deserialization of the object by the recipient
comm.send(dict(info_package), dest=dest_rank, tag=common.MPITag.OBJECT)
with pkl5._bigmpi as bigmpi:

@@ -495,3 +654,3 @@ comm.Send(bigmpi(s_data), dest=dest_rank, tag=common.MPITag.BUFFER)

def send_complex_data(comm, data, dest_rank):
def send_complex_data(comm, data, dest_rank, is_serialized=False):
"""

@@ -508,35 +667,41 @@ Send the data that consists of different user provided complex types, lambdas and buffers in a blocking way.

Target MPI process to transfer data.
is_serialized : bool, default: False
`data` is already serialized or not.
Returns
-------
object
A serialized msgpack data.
list
A list of pickle buffers.
list
A list of buffers amount for each object.
dict
Serialized data for caching purpose.
Notes
-----
* This blocking send is used when we have to wait for completion of the communication,
This blocking send is used when we have to wait for completion of the communication,
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``isend_complex_data``.
* The special tags are used for this communication, namely,
``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``.
"""
serializer = ComplexDataSerializer()
# Main job
s_data = serializer.serialize(data)
# Retrive the metadata
raw_buffers = serializer.buffers
buffer_count = serializer.buffer_count
if is_serialized:
s_data = data["s_data"]
raw_buffers = data["raw_buffers"]
buffer_count = data["buffer_count"]
# pop `data_id` out of the dict because it will be send as part of metadata package
data_id = data.pop("id")
serialized_data = data
else:
data_id = data["id"]
serialized_data = serialize_complex_data(data)
s_data = serialized_data["s_data"]
raw_buffers = serialized_data["raw_buffers"]
buffer_count = serialized_data["buffer_count"]
info_package = common.MetadataPackage.get_local_info(
data_id, len(s_data), [len(sbuf) for sbuf in raw_buffers], buffer_count
)
# MPI communication
_send_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank)
_send_complex_data_impl(comm, s_data, raw_buffers, dest_rank, info_package)
# For caching purpose
return s_data, raw_buffers, buffer_count
return serialized_data
def _isend_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank):
def _isend_complex_data_impl(comm, s_data, raw_buffers, dest_rank, info_package):
"""

@@ -555,8 +720,6 @@ Send serialized complex data.

A list of pickle buffers.
buffer_count : list
List of the number of buffers for each object
to be serialized/deserialized using the pickle 5 protocol.
See details in :py:class:`~unidist.core.backends.mpi.core.serialization.ComplexDataSerializer`.
dest_rank : int
Target MPI process to transfer data.
info_package : unidist.core.backends.mpi.core.common.MetadataPackage
Required information to deserialize data on a receiver side.

@@ -571,14 +734,8 @@ Returns

* The special tags are used for this communication, namely,
``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``.
``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``.
"""
handlers = []
info = {
"s_data_len": len(s_data),
"buffer_count": buffer_count,
"raw_buffers_len": [len(sbuf) for sbuf in raw_buffers],
}
h1 = comm.isend(info, dest=dest_rank, tag=common.MPITag.OBJECT)
# wrap to dict for sending and correct deserialization of the object by the recipient
h1 = comm.isend(dict(info_package), dest=dest_rank, tag=common.MPITag.OBJECT)
handlers.append((h1, None))
with pkl5._bigmpi as bigmpi:

@@ -594,3 +751,3 @@ h2 = comm.Isend(bigmpi(s_data), dest=dest_rank, tag=common.MPITag.BUFFER)

def isend_complex_data(comm, data, dest_rank):
def isend_complex_data(comm, data, dest_rank, is_serialized=False):
"""

@@ -609,2 +766,4 @@ Send the data that consists of different user provided complex types, lambdas and buffers in a non-blocking way.

Target MPI process to transfer data.
is_serialized : bool, default: False
`operation_data` is already serialized or not.

@@ -624,17 +783,26 @@ Returns

-----
* The special tags are used for this communication, namely,
The special tags are used for this communication, namely,
``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``.
"""
handlers = []
if is_serialized:
s_data = data["s_data"]
raw_buffers = data["raw_buffers"]
buffer_count = data["buffer_count"]
# pop `data_id` out of the dict because it will be send as part of metadata package
data_id = data.pop("id")
info_package = common.MetadataPackage.get_local_info(
data_id, len(s_data), [len(sbuf) for sbuf in raw_buffers], buffer_count
)
else:
serialized_data = serialize_complex_data(data)
s_data = serialized_data["s_data"]
raw_buffers = serialized_data["raw_buffers"]
buffer_count = serialized_data["buffer_count"]
info_package = common.MetadataPackage.get_task_info(
len(s_data), [len(sbuf) for sbuf in raw_buffers], buffer_count
)
serializer = ComplexDataSerializer()
# Main job
s_data = serializer.serialize(data)
# Retrive the metadata
raw_buffers = serializer.buffers
buffer_count = serializer.buffer_count
# Send message pack bytestring
handlers.extend(
_isend_complex_data_impl(comm, s_data, raw_buffers, buffer_count, dest_rank)
# MPI communication
handlers = _isend_complex_data_impl(
comm, s_data, raw_buffers, dest_rank, info_package
)

@@ -645,3 +813,3 @@

def recv_complex_data(comm, source_rank):
def recv_complex_data(comm, source_rank, info_package):
"""

@@ -658,2 +826,4 @@ Receive the data that may consist of different user provided complex types, lambdas and buffers.

Source MPI process to receive data from.
info_package : unidist.core.backends.mpi.core.common.MetadataPackage
Required information to deserialize data.

@@ -667,9 +837,8 @@ Returns

-----
* The special tags are used for this communication, namely,
The special tags are used for this communication, namely,
``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``.
"""
info = comm.recv(source=source_rank, tag=common.MPITag.OBJECT)
msgpack_buffer = bytearray(info["s_data_len"])
buffer_count = info["buffer_count"]
raw_buffers = list(map(bytearray, info["raw_buffers_len"]))
msgpack_buffer = bytearray(info_package["s_data_len"])
buffer_count = info_package["buffer_count"]
raw_buffers = list(map(bytearray, info_package["raw_buffers_len"]))
with pkl5._bigmpi as bigmpi:

@@ -680,9 +849,5 @@ comm.Recv(bigmpi(msgpack_buffer), source=source_rank, tag=common.MPITag.BUFFER)

# Set the necessary metadata for unpacking
deserializer = ComplexDataSerializer(raw_buffers, buffer_count)
return deserialize_complex_data(msgpack_buffer, raw_buffers, buffer_count)
# Start unpacking
return deserializer.deserialize(msgpack_buffer)
# ---------- #

@@ -711,7 +876,7 @@ # Public API #

* This blocking send is used when we have to wait for completion of the communication,
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``isend_simple_operation``.
which is necessary for the pipeline to continue, or when the receiver is waiting for a result.
Otherwise, use non-blocking ``isend_simple_operation``.
* Serialization of the data to be sent takes place just using ``pickle.dump`` in this case.
* The special tags are used for this communication, namely,
``common.MPITag.OPERATION`` and ``common.MPITag.OBJECT``.
``common.MPITag.OPERATION`` and ``common.MPITag.OBJECT``.
"""

@@ -748,3 +913,3 @@ # Send operation type

* The special tags are used for this communication, namely,
``common.MPITag.OPERATION`` and ``common.MPITag.OBJECT``.
``common.MPITag.OPERATION`` and ``common.MPITag.OBJECT``.
"""

@@ -783,14 +948,15 @@ # Send operation type

`operation_data` is already serialized or not.
- `operation_data` is always serialized for data
that has already been saved into the object store.
- `operation_data` is always not serialized
for sending a task or an actor (actor method).
Returns
-------
dict and dict or dict and None
Async handlers and serialization data for caching purpose.
list and dict
Async handlers list and serialization data dict for caching purpose.
Notes
-----
* Function always returns a ``dict`` containing async handlers to the sent MPI operations.
In addition, ``None`` is returned if `operation_data` is already serialized,
otherwise ``dict`` containing data serialized in this function.
* The special tags are used for this communication, namely,
The special tags are used for this communication, namely,
``common.MPITag.OPERATION``, ``common.MPITag.OBJECT`` and ``common.MPITag.BUFFER``.

@@ -809,9 +975,11 @@ """

buffer_count = operation_data["buffer_count"]
# pop `data_id` out of the dict because it will be send as part of metadata package
data_id = operation_data.pop("id")
info_package = common.MetadataPackage.get_local_info(
data_id, len(s_data), [len(sbuf) for sbuf in raw_buffers], buffer_count
)
h2_list = _isend_complex_data_impl(
comm, s_data, raw_buffers, buffer_count, dest_rank
comm, s_data, raw_buffers, dest_rank, info_package
)
handlers.extend(h2_list)
return handlers, None
else:

@@ -823,7 +991,7 @@ # Serialize and send the data

handlers.extend(h2_list)
return handlers, {
"s_data": s_data,
"raw_buffers": raw_buffers,
"buffer_count": buffer_count,
}
return handlers, {
"s_data": s_data,
"raw_buffers": raw_buffers,
"buffer_count": buffer_count,
}

@@ -891,1 +1059,35 @@

return SimpleDataSerializer().deserialize_pickle(s_buffer)
def send_reserve_operation(comm, data_id, data_size):
"""
Reserve shared memory for `data_id`.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID to data.
data_size : int
Length of a required range in shared memory.
Returns
-------
dict
Reservation info about the allocated range in shared memory.
"""
mpi_state = MPIState.get_instance()
operation_type = common.Operation.RESERVE_SHARED_MEMORY
operation_data = {
"id": data_id,
"size": data_size,
}
# We use a blocking send here because we have to wait for
# completion of the communication, which is necessary for the pipeline to continue.
send_simple_operation(
comm,
operation_type,
operation_data,
mpi_state.get_monitor_by_worker_rank(),
)
return mpi_recv_object(comm, mpi_state.get_monitor_by_worker_rank())

@@ -10,3 +10,3 @@ # Copyright (C) 2021-2023 Modin authors

from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.controller.object_store import object_store
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore
from unidist.core.backends.mpi.core.controller.garbage_collector import (

@@ -16,2 +16,3 @@ garbage_collector,

from unidist.core.backends.mpi.core.controller.common import push_data, RoundRobin
from unidist.core.backends.mpi.core.controller.api import put

@@ -38,26 +39,27 @@

def __call__(self, *args, num_returns=1, **kwargs):
output_id = object_store.generate_output_data_id(
local_store = LocalObjectStore.get_instance()
output_id = local_store.generate_output_data_id(
self._actor._owner_rank, garbage_collector, num_returns
)
unwrapped_args = [common.unwrap_data_ids(arg) for arg in args]
unwrapped_kwargs = {k: common.unwrap_data_ids(v) for k, v in kwargs.items()}
push_data(self._actor._owner_rank, self._method_name)
push_data(self._actor._owner_rank, args)
push_data(self._actor._owner_rank, kwargs)
push_data(self._actor._owner_rank, unwrapped_args)
push_data(self._actor._owner_rank, unwrapped_kwargs)
operation_type = common.Operation.ACTOR_EXECUTE
operation_data = {
"task": self._method_name,
"args": unwrapped_args,
"kwargs": unwrapped_kwargs,
"output": common.master_data_ids_to_base(output_id),
"handler": self._actor._handler_id.base_data_id(),
# tuple cannot be serialized iteratively and it will fail if some internal data cannot be serialized using Pickle
"args": list(args),
"kwargs": kwargs,
"output": output_id,
"handler": self._actor._handler_id,
}
async_operations = AsyncOperations.get_instance()
h_list, _ = communication.isend_complex_operation(
communication.MPIState.get_instance().comm,
communication.MPIState.get_instance().global_comm,
operation_type,
operation_data,
self._actor._owner_rank,
is_serialized=False,
)

@@ -81,3 +83,3 @@ async_operations.extend(h_list)

Used for proper serialization/deserialization via `__reduce__`.
handler_id : None or unidist.core.backends.mpi.core.common.MasterDataID
handler_id : None or unidist.core.backends.mpi.core.common.MpiDataID
An ID to the actor class.

@@ -102,8 +104,9 @@ Used for proper serialization/deserialization via `__reduce__`.

)
local_store = LocalObjectStore.get_instance()
self._handler_id = (
object_store.generate_data_id(garbage_collector)
local_store.generate_data_id(garbage_collector)
if handler_id is None
else handler_id
)
object_store.put_data_owner(self._handler_id, self._owner_rank)
local_store.put_data_owner(self._handler_id, self._owner_rank)

@@ -120,10 +123,11 @@ # reserve a rank for actor execution only

"kwargs": kwargs,
"handler": self._handler_id.base_data_id(),
"handler": self._handler_id,
}
async_operations = AsyncOperations.get_instance()
h_list, _ = communication.isend_complex_operation(
communication.MPIState.get_instance().comm,
communication.MPIState.get_instance().global_comm,
operation_type,
operation_data,
self._owner_rank,
is_serialized=False,
)

@@ -188,4 +192,11 @@ async_operations.extend(h_list)

# Cache for serialized actor methods {"method_name": DataID}
actor_methods = {}
def __getattr__(self, name):
return ActorMethod(self, name)
data_id_to_method = self.actor_methods.get(name, None)
if data_id_to_method is None:
data_id_to_method = put(name)
self.actor_methods[name] = data_id_to_method
return ActorMethod(self, data_id_to_method)

@@ -192,0 +203,0 @@ def __del__(self):

@@ -21,3 +21,5 @@ # Copyright (C) 2021-2023 Modin authors

from unidist.core.backends.mpi.core.controller.object_store import object_store
from unidist.core.backends.mpi.core.serialization import serialize_complex_data
from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore
from unidist.core.backends.mpi.core.controller.garbage_collector import (

@@ -36,3 +38,3 @@ garbage_collector,

CpuCount,
IsMpiSpawnWorkers,
MpiSpawn,
MpiHosts,

@@ -43,2 +45,7 @@ ValueSource,

MpiLog,
MpiSharedObjectStore,
MpiSharedObjectStoreMemory,
MpiSharedServiceMemory,
MpiSharedObjectStoreThreshold,
MpiRuntimeEnv,
)

@@ -54,4 +61,2 @@

# The topology of MPI cluster gets available when MPI initialization in `init`
topology = dict()
# The global variable is responsible for if MPI backend has already been initialized

@@ -139,74 +144,110 @@ is_mpi_initialized = False

runtime_env = MpiRuntimeEnv.get()
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
parent_comm = MPI.Comm.Get_parent()
# path to dynamically spawn MPI processes
if rank == 0 and parent_comm == MPI.COMM_NULL:
if IsMpiSpawnWorkers.get():
nprocs_to_spawn = CpuCount.get() + 1 # +1 for monitor process
args = _get_py_flags()
args += ["-c"]
py_str = [
"import unidist",
"import unidist.config as cfg",
"cfg.Backend.put('mpi')",
# Path to dynamically spawn MPI processes.
# If a requirement is not met, processes have been started with mpiexec -n <N>, where N > 1.
if rank == 0 and parent_comm == MPI.COMM_NULL and MpiSpawn.get():
args = _get_py_flags()
args += ["-c"]
py_str = [
"import unidist",
"import unidist.config as cfg",
"cfg.Backend.put('mpi')",
]
if MpiSpawn.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiSpawn.put({MpiSpawn.get()})"]
if MpiHosts.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiHosts.put('{MpiHosts.get()}')"]
if CpuCount.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.CpuCount.put({CpuCount.get()})"]
if MpiPickleThreshold.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiPickleThreshold.put({MpiPickleThreshold.get()})"]
if MpiBackoff.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiBackoff.put({MpiBackoff.get()})"]
if MpiLog.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiLog.put({MpiLog.get()})"]
if MpiSharedObjectStore.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiSharedObjectStore.put({MpiSharedObjectStore.get()})"]
if MpiSharedObjectStoreMemory.get_value_source() != ValueSource.DEFAULT:
py_str += [
f"cfg.MpiSharedObjectStoreMemory.put({MpiSharedObjectStoreMemory.get()})"
]
if IsMpiSpawnWorkers.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.IsMpiSpawnWorkers.put({IsMpiSpawnWorkers.get()})"]
if MpiHosts.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiHosts.put('{MpiHosts.get()}')"]
if CpuCount.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.CpuCount.put({CpuCount.get()})"]
if MpiPickleThreshold.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiPickleThreshold.put({MpiPickleThreshold.get()})"]
if MpiBackoff.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiBackoff.put({MpiBackoff.get()})"]
if MpiLog.get_value_source() != ValueSource.DEFAULT:
py_str += [f"cfg.MpiLog.put({MpiLog.get()})"]
py_str += ["unidist.init()"]
py_str = "; ".join(py_str)
args += [py_str]
if MpiSharedServiceMemory.get_value_source() != ValueSource.DEFAULT:
py_str += [
f"cfg.MpiSharedServiceMemory.put({MpiSharedServiceMemory.get()})"
]
if MpiSharedObjectStoreThreshold.get_value_source() != ValueSource.DEFAULT:
py_str += [
f"cfg.MpiSharedObjectStoreThreshold.put({MpiSharedObjectStoreThreshold.get()})"
]
if runtime_env:
py_str += [f"cfg.MpiRuntimeEnv.put({runtime_env})"]
env_vars = ["import os"]
for option, value in runtime_env.items():
if option == MpiRuntimeEnv.env_vars:
for env_var_name, env_var_value in value.items():
env_vars += [
f"os.environ['{env_var_name}'] = '{env_var_value}'"
]
py_str += env_vars
py_str += ["unidist.init()"]
py_str = "; ".join(py_str)
args += [py_str]
hosts = MpiHosts.get()
info = MPI.Info.Create()
lib_version = MPI.Get_library_version()
if "Intel" in lib_version:
# To make dynamic spawn of MPI processes work properly
# we should set this environment variable.
# See more about Intel MPI environment variables in
# https://www.intel.com/content/www/us/en/docs/mpi-library/developer-reference-linux/2021-8/other-environment-variables.html.
os.environ["I_MPI_SPAWN"] = "1"
cpu_count = CpuCount.get()
hosts = MpiHosts.get()
info = MPI.Info.Create()
lib_version = MPI.Get_library_version()
if "Intel" in lib_version:
# To make dynamic spawn of MPI processes work properly
# we should set this environment variable.
# See more about Intel MPI environment variables in
# https://www.intel.com/content/www/us/en/docs/mpi-library/developer-reference-linux/2021-8/other-environment-variables.html.
os.environ["I_MPI_SPAWN"] = "1"
if hosts:
if "Open MPI" in lib_version:
host_list = str(hosts).split(",")
workers_per_host = [
int(nprocs_to_spawn / len(host_list))
+ (1 if i < nprocs_to_spawn % len(host_list) else 0)
for i in range(len(host_list))
]
hosts = ",".join(
[
f"{host}:{workers_per_host[i]}"
for i, host in enumerate(host_list)
]
)
info.Set("add-host", hosts)
else:
info.Set("hosts", hosts)
host_list = hosts.split(",") if hosts is not None else ["localhost"]
host_count = len(host_list)
intercomm = MPI.COMM_SELF.Spawn(
sys.executable,
args,
maxprocs=nprocs_to_spawn,
info=info,
root=rank,
)
comm = intercomm.Merge(high=False)
# path for processes to be started by mpiexec -n <N>, where N > 1
if common.is_shared_memory_supported():
# +host_count to add monitor process on each host
nprocs_to_spawn = cpu_count + host_count
else:
comm = MPI.COMM_WORLD
# +1 for just a single process monitor
nprocs_to_spawn = cpu_count + 1
if host_count > 1:
if "Open MPI" in lib_version:
# +1 to take into account the current root process
# to correctly allocate slots
slot_count = nprocs_to_spawn + 1
slots_per_host = [
int(slot_count / host_count)
+ (1 if i < slot_count % host_count else 0)
for i in range(host_count)
]
hosts = ",".join(
[f"{host_list[i]}:{slots_per_host[i]}" for i in range(host_count)]
)
info.Set("add-host", hosts)
else:
info.Set("hosts", hosts)
intercomm = MPI.COMM_SELF.Spawn(
sys.executable,
args,
maxprocs=nprocs_to_spawn,
info=info,
root=rank,
)
comm = intercomm.Merge(high=False)
else:
if runtime_env and rank != 0:
for option, value in runtime_env.items():
if option == MpiRuntimeEnv.env_vars:
for env_var_name, env_var_value in value.items():
os.environ[env_var_name] = str(env_var_value)
# path for spawned MPI processes to be merged with the parent communicator

@@ -216,10 +257,4 @@ if parent_comm != MPI.COMM_NULL:

mpi_state = communication.MPIState.get_instance(
comm, comm.Get_rank(), comm.Get_size()
)
mpi_state = communication.MPIState.get_instance(comm)
global topology
if not topology:
topology = communication.get_topology()
global is_mpi_initialized

@@ -229,11 +264,15 @@ if not is_mpi_initialized:

if mpi_state.rank == communication.MPIRank.ROOT:
# Initalize shared memory
SharedObjectStore.get_instance()
if mpi_state.is_root_process():
atexit.register(_termination_handler)
signal.signal(signal.SIGTERM, _termination_handler)
signal.signal(signal.SIGINT, _termination_handler)
# Exit the init function in root only after monitor and worker loops have started
mpi_state.global_comm.Barrier()
return
elif mpi_state.is_monitor_process():
from unidist.core.backends.mpi.core.monitor.loop import monitor_loop
if mpi_state.rank == communication.MPIRank.MONITOR:
from unidist.core.backends.mpi.core.monitor import monitor_loop
monitor_loop()

@@ -243,10 +282,6 @@ # If the user executes a program in SPMD mode,

# so just killing them.
if not IsMpiSpawnWorkers.get():
if not MpiSpawn.get():
sys.exit()
return
if mpi_state.rank not in (
communication.MPIRank.ROOT,
communication.MPIRank.MONITOR,
):
else:
from unidist.core.backends.mpi.core.worker.loop import worker_loop

@@ -258,3 +293,3 @@

# so just killing them.
if not IsMpiSpawnWorkers.get():
if not MpiSpawn.get():
sys.exit()

@@ -292,7 +327,7 @@ return

# Send shutdown commands to all ranks
for rank_id in range(communication.MPIRank.FIRST_WORKER, mpi_state.world_size):
for rank_id in mpi_state.workers:
# We use a blocking send here because we have to wait for
# completion of the communication, which is necessary for the pipeline to continue.
communication.mpi_send_operation(
mpi_state.comm,
mpi_state.global_comm,
common.Operation.CANCEL,

@@ -304,3 +339,3 @@ rank_id,

op_type = communication.mpi_recv_object(
mpi_state.comm,
mpi_state.global_comm,
communication.MPIRank.MONITOR,

@@ -310,4 +345,6 @@ )

raise ValueError(f"Got wrong operation type {op_type}.")
SharedObjectStore.get_instance().finalize()
if not MPI.Is_finalized():
MPI.Finalize()
logger.debug("Shutdown root")

@@ -327,9 +364,16 @@ is_mpi_shutdown = True

"""
global topology
if not topology:
mpi_state = communication.MPIState.get_instance()
if mpi_state is None:
raise RuntimeError("'unidist.init()' has not been called yet")
cluster_resources = defaultdict(dict)
for host, ranks_list in topology.items():
cluster_resources[host]["CPU"] = len(ranks_list)
for host in mpi_state.topology:
cluster_resources[host]["CPU"] = len(
[
r
for r in mpi_state.topology[host].values()
if not mpi_state.is_root_process(r)
and not mpi_state.is_monitor_process(r)
]
)

@@ -350,8 +394,16 @@ return dict(cluster_resources)

-------
unidist.core.backends.mpi.core.common.MasterDataID
unidist.core.backends.mpi.core.common.MpiDataID
An ID of an object in object storage.
"""
data_id = object_store.generate_data_id(garbage_collector)
object_store.put(data_id, data)
local_store = LocalObjectStore.get_instance()
shared_store = SharedObjectStore.get_instance()
data_id = local_store.generate_data_id(garbage_collector)
serialized_data = serialize_complex_data(data)
local_store.put(data_id, data)
if shared_store.is_allocated():
shared_store.put(data_id, serialized_data)
else:
local_store.cache_serialized_data(data_id, serialized_data)
logger.debug("PUT {} id".format(data_id._id))

@@ -368,3 +420,3 @@

----------
data_ids : unidist.core.backends.common.data_id.DataID or list
data_ids : unidist.core.backends.mpi.core.common.MpiDataID or list
An ID(s) to object(s) to get data from.

@@ -377,6 +429,11 @@

"""
local_store = LocalObjectStore.get_instance()
is_list = isinstance(data_ids, list)
if not is_list:
data_ids = [data_ids]
def get_impl(data_id):
if object_store.contains(data_id):
value = object_store.get(data_id)
if local_store.contains(data_id):
value = local_store.get(data_id)
else:

@@ -391,9 +448,3 @@ value = request_worker_data(data_id)

logger.debug("GET {} ids".format(common.unwrapped_data_ids_list(data_ids)))
is_list = isinstance(data_ids, list)
if not is_list:
data_ids = [data_ids]
values = [get_impl(data_id) for data_id in data_ids]
# Initiate reference count based cleaup

@@ -416,3 +467,3 @@ # if all the tasks were completed

----------
data_ids : unidist.core.backends.mpi.core.common.MasterDataID or list
data_ids : unidist.core.backends.mpi.core.common.MpiDataID or list
``DataID`` or list of ``DataID``-s to be waited.

@@ -429,3 +480,3 @@ num_returns : int, default: 1

data_ids = [data_ids]
# Since the controller should operate MasterDataID(s),
# Since the controller should operate MpiDataID(s),
# we use this map to retrieve and return them

@@ -437,6 +488,7 @@ # instead of DataID(s) received from workers.

ready = []
local_store = LocalObjectStore.get_instance()
logger.debug("WAIT {} ids".format(common.unwrapped_data_ids_list(data_ids)))
for data_id in not_ready:
if object_store.contains(data_id):
for data_id in not_ready.copy():
if local_store.contains(data_id):
ready.append(data_id)

@@ -449,3 +501,2 @@ not_ready.remove(data_id)

operation_type = common.Operation.WAIT
not_ready = [common.unwrap_data_ids(arg) for arg in not_ready]
operation_data = {

@@ -456,17 +507,18 @@ "data_ids": not_ready,

mpi_state = communication.MPIState.get_instance()
root_monitor = mpi_state.get_monitor_by_worker_rank(communication.MPIRank.ROOT)
# We use a blocking send and recv here because we have to wait for
# completion of the communication, which is necessary for the pipeline to continue.
communication.send_simple_operation(
mpi_state.comm,
mpi_state.global_comm,
operation_type,
operation_data,
communication.MPIRank.MONITOR,
root_monitor,
)
data = communication.mpi_recv_object(
mpi_state.comm,
communication.MPIRank.MONITOR,
mpi_state.global_comm,
root_monitor,
)
ready.extend(data["ready"])
not_ready = data["not_ready"]
# We have to retrieve and return MasterDataID(s)
# We have to retrieve and return MpiDataID(s)
# in order for the controller to operate them in further operations.

@@ -500,3 +552,3 @@ ready = [data_id_map[data_id] for data_id in ready]

-------
unidist.core.backends.mpi.core.common.MasterDataID or list or None
unidist.core.backends.mpi.core.common.MpiDataID or list or None
Type of returns depends on `num_returns` value:

@@ -514,3 +566,4 @@

output_ids = object_store.generate_output_data_id(
local_store = LocalObjectStore.get_instance()
output_ids = local_store.generate_output_data_id(
dest_rank, garbage_collector, num_returns

@@ -531,21 +584,21 @@ )

unwrapped_args = [common.unwrap_data_ids(arg) for arg in args]
unwrapped_kwargs = {k: common.unwrap_data_ids(v) for k, v in kwargs.items()}
push_data(dest_rank, task)
push_data(dest_rank, args)
push_data(dest_rank, kwargs)
push_data(dest_rank, unwrapped_args)
push_data(dest_rank, unwrapped_kwargs)
operation_type = common.Operation.EXECUTE
operation_data = {
"task": task,
"args": unwrapped_args,
"kwargs": unwrapped_kwargs,
"output": common.master_data_ids_to_base(output_ids),
# tuple cannot be serialized iteratively and it will fail if some internal data cannot be serialized using Pickle
"args": list(args),
"kwargs": kwargs,
"output": output_ids,
}
async_operations = AsyncOperations.get_instance()
h_list, _ = communication.isend_complex_operation(
communication.MPIState.get_instance().comm,
communication.MPIState.get_instance().global_comm,
operation_type,
operation_data,
dest_rank,
is_serialized=False,
)

@@ -552,0 +605,0 @@ async_operations.extend(h_list)

@@ -13,12 +13,10 @@ # Copyright (C) 2021-2023 Modin authors

from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.controller.object_store import object_store
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore
from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore
from unidist.core.backends.mpi.core.serialization import serialize_complex_data
logger = common.get_logger("common", "common.log")
# initial worker number is equal to rank 2 because
# rank 0 is for controller process
# rank 1 is for monitor process
initial_worker_number = 2
class RoundRobin:

@@ -29,17 +27,13 @@ __instance = None

self.reserved_ranks = []
mpi_state = communication.MPIState.get_instance()
self.rank_to_schedule = itertools.cycle(
(
rank
for rank in range(
initial_worker_number,
communication.MPIState.get_instance().world_size,
)
global_rank
for global_rank in mpi_state.workers
# check if a rank to schedule is not equal to the rank
# of the current process to not get into recursive scheduling
if rank != communication.MPIState.get_instance().rank
if global_rank != mpi_state.global_rank
)
)
logger.debug(
f"RoundRobin init for {communication.MPIState.get_instance().rank} rank"
)
logger.debug(f"RoundRobin init for {mpi_state.global_rank} rank")

@@ -69,7 +63,6 @@ @classmethod

next_rank = None
mpi_state = communication.MPIState.get_instance()
# Go rank by rank to find the first one non-reserved
for _ in range(
initial_worker_number, communication.MPIState.get_instance().world_size
):
for _ in mpi_state.workers:
rank = next(self.rank_to_schedule)

@@ -100,3 +93,3 @@ if rank not in self.reserved_ranks:

f"RoundRobin reserve rank {rank} for actor "
+ f"on worker with rank {communication.MPIState.get_instance().rank}"
+ f"on worker with rank {communication.MPIState.get_instance().global_rank}"
)

@@ -118,6 +111,58 @@

f"RoundRobin release rank {rank} reserved for actor "
+ f"on worker with rank {communication.MPIState.get_instance().rank}"
+ f"on worker with rank {communication.MPIState.get_instance().global_rank}"
)
def pull_data(comm, owner_rank):
"""
Receive data from another MPI process.
Data can come from shared memory or direct communication depending on the package type.
The data is de-serialized from received buffers.
Parameters
----------
owner_rank : int
Source MPI process to receive data from.
Returns
-------
object
Received data object from another MPI process.
"""
info_package = communication.mpi_recv_object(comm, owner_rank)
if info_package["package_type"] == common.MetadataPackage.SHARED_DATA:
local_store = LocalObjectStore.get_instance()
shared_store = SharedObjectStore.get_instance()
data_id = info_package["id"]
if local_store.contains(data_id):
return {
"id": data_id,
"data": local_store.get(data_id),
}
data = shared_store.get(data_id, owner_rank, info_package)
local_store.put(data_id, data)
return {
"id": data_id,
"data": data,
}
elif info_package["package_type"] == common.MetadataPackage.LOCAL_DATA:
local_store = LocalObjectStore.get_instance()
data = communication.recv_complex_data(
comm, owner_rank, info_package=info_package
)
return {
"id": info_package["id"],
"data": data,
}
elif info_package["package_type"] == common.MetadataPackage.TASK_DATA:
return communication.recv_complex_data(
comm, owner_rank, info_package=info_package
)
else:
raise ValueError("Unexpected package of data info!")
def request_worker_data(data_id):

@@ -129,3 +174,3 @@ """

----------
data_id : unidist.core.backends.common.data_id.DataID
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID(s) to object(s) to get data from.

@@ -139,5 +184,4 @@

mpi_state = communication.MPIState.get_instance()
owner_rank = object_store.get_data_owner(data_id)
local_store = LocalObjectStore.get_instance()
owner_rank = local_store.get_data_owner(data_id)
logger.debug("GET {} id from {} rank".format(data_id._id, owner_rank))

@@ -148,4 +192,4 @@

operation_data = {
"source": mpi_state.rank,
"id": data_id.base_data_id(),
"source": mpi_state.global_rank,
"id": data_id,
# set `is_blocking_op` to `True` to tell a worker

@@ -159,3 +203,3 @@ # to send the data directly to the requester

communication.send_simple_operation(
mpi_state.comm,
mpi_state.global_comm,
operation_type,

@@ -165,13 +209,14 @@ operation_data,

)
# Blocking get
data = communication.recv_complex_data(mpi_state.comm, owner_rank)
complex_data = pull_data(mpi_state.global_comm, owner_rank)
if data_id != complex_data["id"]:
raise ValueError("Unexpected data_id!")
data = complex_data["data"]
# Caching the result, check the protocol correctness here
object_store.put(data_id, data)
local_store.put(data_id, data)
return data
def _push_local_data(dest_rank, data_id):
def _push_local_data(dest_rank, data_id, is_blocking_op, is_serialized):
"""

@@ -184,7 +229,15 @@ Send local data associated with passed ID to target rank.

Target rank.
data_id : unidist.core.backends.mpi.core.common.MasterDataID
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID to data.
is_blocking_op : bool
Whether the communication should be blocking or not.
If ``True``, the request should be processed immediatly
even for a worker since it can get into controller mode.
is_serialized : bool
`data_id` is already serialized or not.
"""
local_store = LocalObjectStore.get_instance()
# Check if data was already pushed
if not object_store.is_already_sent(data_id, dest_rank):
if not local_store.is_already_sent(data_id, dest_rank):
logger.debug("PUT LOCAL {} id to {} rank".format(data_id._id, dest_rank))

@@ -195,30 +248,78 @@

# Push the local master data to the target worker directly
if is_serialized:
operation_data = local_store.get_serialized_data(data_id)
# Insert `data_id` to get full metadata package further
operation_data["id"] = data_id
else:
operation_data = {
"id": data_id,
"data": local_store.get(data_id),
}
operation_type = common.Operation.PUT_DATA
if object_store.is_already_serialized(data_id):
serialized_data = object_store.get_serialized_data(data_id)
h_list, _ = communication.isend_complex_operation(
mpi_state.comm,
operation_type,
serialized_data,
if is_blocking_op:
serialized_data = communication.send_complex_data(
mpi_state.global_comm,
operation_data,
dest_rank,
is_serialized=True,
is_serialized=is_serialized,
)
else:
operation_data = {
"id": data_id,
"data": object_store.get(data_id),
}
h_list, serialized_data = communication.isend_complex_operation(
mpi_state.comm,
mpi_state.global_comm,
operation_type,
operation_data,
dest_rank,
is_serialized=False,
is_serialized=is_serialized,
)
object_store.cache_serialized_data(data_id, serialized_data)
async_operations.extend(h_list)
async_operations.extend(h_list)
if not is_serialized or not local_store.is_already_serialized(data_id):
local_store.cache_serialized_data(data_id, serialized_data)
# Remember pushed id
object_store.cache_send_info(data_id, dest_rank)
local_store.cache_send_info(data_id, dest_rank)
def _push_shared_data(dest_rank, data_id, is_blocking_op):
"""
Send the data associated with the data ID to target rank using shared memory.
Parameters
----------
dest_rank : int
Target rank.
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID to data.
is_blocking_op : bool
Whether the communication should be blocking or not.
If ``True``, the request should be processed immediatly
even for a worker since it can get into controller mode.
"""
local_store = LocalObjectStore.get_instance()
# Check if data was already pushed
if not local_store.is_already_sent(data_id, dest_rank):
mpi_state = communication.MPIState.get_instance()
shared_store = SharedObjectStore.get_instance()
mpi_state = communication.MPIState.get_instance()
operation_type = common.Operation.PUT_SHARED_DATA
async_operations = AsyncOperations.get_instance()
info_package = shared_store.get_shared_info(data_id)
# wrap to dict for sending and correct deserialization of the object by the recipient
operation_data = dict(info_package)
if is_blocking_op:
communication.mpi_send_object(
mpi_state.global_comm, operation_data, dest_rank
)
else:
h_list = communication.isend_simple_operation(
mpi_state.global_comm,
operation_type,
operation_data,
dest_rank,
)
async_operations.extend(h_list)
local_store.cache_send_info(data_id, dest_rank)
def _push_data_owner(dest_rank, data_id):

@@ -232,13 +333,14 @@ """

Target rank.
value : unidist.core.backends.mpi.core.common.MasterDataID
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID to data.
"""
local_store = LocalObjectStore.get_instance()
operation_type = common.Operation.PUT_OWNER
operation_data = {
"id": data_id,
"owner": object_store.get_data_owner(data_id),
"owner": local_store.get_data_owner(data_id),
}
async_operations = AsyncOperations.get_instance()
h_list = communication.isend_simple_operation(
communication.MPIState.get_instance().comm,
communication.MPIState.get_instance().global_comm,
operation_type,

@@ -251,3 +353,3 @@ operation_data,

def push_data(dest_rank, value):
def push_data(dest_rank, value, is_blocking_op=False):
"""

@@ -265,3 +367,10 @@ Parse and send all values to destination rank.

Arguments to be sent.
is_blocking_op : bool
Whether the communication should be blocking or not.
If ``True``, the request should be processed immediatly
even for a worker since it can get into controller mode.
"""
local_store = LocalObjectStore.get_instance()
shared_store = SharedObjectStore.get_instance()
if isinstance(value, (list, tuple)):

@@ -274,7 +383,24 @@ for v in value:

elif is_data_id(value):
if object_store.contains(value):
_push_local_data(dest_rank, value)
elif object_store.contains_data_owner(value):
_push_data_owner(dest_rank, value)
data_id = value
if shared_store.contains(data_id):
_push_shared_data(dest_rank, data_id, is_blocking_op)
elif local_store.contains(data_id):
if local_store.is_already_serialized(data_id):
_push_local_data(dest_rank, data_id, is_blocking_op, is_serialized=True)
else:
data = local_store.get(data_id)
serialized_data = serialize_complex_data(data)
if shared_store.is_allocated() and shared_store.should_be_shared(
serialized_data
):
shared_store.put(data_id, serialized_data)
_push_shared_data(dest_rank, data_id, is_blocking_op)
else:
local_store.cache_serialized_data(data_id, serialized_data)
_push_local_data(
dest_rank, data_id, is_blocking_op, is_serialized=True
)
elif local_store.contains_data_owner(data_id):
_push_data_owner(dest_rank, data_id)
else:
raise ValueError("Unknown DataID!")

@@ -13,4 +13,3 @@ # Copyright (C) 2021-2023 Modin authors

from unidist.core.backends.mpi.core.serialization import SimpleDataSerializer
from unidist.core.backends.mpi.core.controller.object_store import object_store
from unidist.core.backends.mpi.core.controller.common import initial_worker_number
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore

@@ -27,3 +26,3 @@

----------
object_store : unidist.core.backends.mpi.executor.ObjectStore
local_store : unidist.core.backends.mpi.core.local_object_store
Reference to the local object storage.

@@ -36,3 +35,3 @@

def __init__(self, object_store):
def __init__(self, local_store):
# Cleanup frequency settings

@@ -43,7 +42,7 @@ self._cleanup_counter = 1

self._timestamp = 0 # seconds
# Cleanup list of DataIDs
# Cleanup list of tuple(owner_rank, data_number)
self._cleanup_list = []
self._cleanup_list_threshold = 10
# Reference to the global object store
self._object_store = object_store
self._local_store = local_store
# Task submitted counter

@@ -61,7 +60,3 @@ self._task_counter = 0

"""
logger.debug(
"Send cleanup list - {}".format(
common.unwrapped_data_ids_list(cleanup_list)
)
)
logger.debug(f"Send cleanup list - {cleanup_list}")
mpi_state = communication.MPIState.get_instance()

@@ -71,6 +66,6 @@ # Cache serialized list of data IDs

async_operations = AsyncOperations.get_instance()
for rank_id in range(initial_worker_number, mpi_state.world_size):
if rank_id != mpi_state.rank:
for rank_id in mpi_state.workers + mpi_state.monitor_processes:
if rank_id != mpi_state.global_rank:
h_list = communication.isend_serialized_operation(
mpi_state.comm,
mpi_state.global_comm,
common.Operation.CLEANUP,

@@ -100,4 +95,4 @@ s_cleanup_list,

----------
data_id : unidist.core.backends.mpi.core.common.MasterDataID
An ID to data
data_id_metadata : tuple
Tuple of the owner rank and data number describing a ``MpiDataID``.
"""

@@ -129,2 +124,5 @@ self._cleanup_list.append(data_id)

mpi_state = communication.MPIState.get_instance()
root_monitor = mpi_state.get_monitor_by_worker_rank(
communication.MPIRank.ROOT
)
# Compare submitted and executed tasks

@@ -134,9 +132,9 @@ # We use a blocking send here because we have to wait for

communication.mpi_send_operation(
mpi_state.comm,
mpi_state.global_comm,
common.Operation.GET_TASK_COUNT,
communication.MPIRank.MONITOR,
root_monitor,
)
executed_task_counter = communication.mpi_recv_object(
mpi_state.comm,
communication.MPIRank.MONITOR,
mpi_state.global_comm,
root_monitor,
)

@@ -151,4 +149,2 @@

self._send_cleanup_request(self._cleanup_list)
# Clear the remaining references
self._object_store.clear(self._cleanup_list)
self._cleanup_list.clear()

@@ -161,2 +157,2 @@ self._cleanup_counter += 1

garbage_collector = GarbageCollector(object_store)
garbage_collector = GarbageCollector(LocalObjectStore.get_instance())

@@ -125,3 +125,3 @@ # Copyright (C) 2021-2023 Modin authors

self.buffers = buffers if buffers else []
self.buffer_count = buffer_count if buffer_count else []
self.buffer_count = list(buffer_count) if buffer_count else []
self._callback_counter = 0

@@ -223,2 +223,7 @@

Returns
-------
bytes
Serialized data.
Notes

@@ -247,3 +252,7 @@ -----

frame = pkl.loads(obj["as_bytes"], buffers=self.buffers)
del self.buffers[: self.buffer_count.pop(0)]
# check if there are out-of-band buffers
# TODO: look at this condition and get rid of it because
# `buffer_count` should always be a list with length greater 0.
if self.buffer_count:
del self.buffers[: self.buffer_count.pop(0)]
return frame

@@ -262,2 +271,7 @@ else:

Returns
-------
object
Deserialized data.
Notes

@@ -347,1 +361,55 @@ -----

return pkl.loads(data)
def serialize_complex_data(data):
"""
Serialize data to a bytearray.
Parameters
----------
data : object
Data to serialize.
Returns
-------
bytes
Serialized data.
Notes
-----
Uses msgpack, cloudpickle and pickle libraries.
"""
serializer = ComplexDataSerializer()
s_data = serializer.serialize(data)
serialized_data = {
"s_data": s_data,
"raw_buffers": serializer.buffers,
"buffer_count": serializer.buffer_count,
}
return serialized_data
def deserialize_complex_data(s_data, raw_buffers, buffer_count):
"""
Deserialize data based on passed in information.
Parameters
----------
s_data : bytearray
Serialized msgpack data.
raw_buffers : list
A list of ``PickleBuffer`` objects for data decoding.
buffer_count : list
List of the number of buffers for each object
to be deserialized using the pickle 5 protocol.
Returns
-------
object
Deserialized data.
Notes
-----
Uses msgpack, cloudpickle and pickle libraries.
"""
deserializer = ComplexDataSerializer(raw_buffers, buffer_count)
return deserializer.deserialize(s_data)

@@ -6,2 +6,3 @@ # Copyright (C) 2021-2023 Modin authors

"""Worker MPI process task processing functionality."""
import asyncio

@@ -19,6 +20,8 @@ from functools import wraps, partial

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.worker.object_store import ObjectStore
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore
from unidist.core.backends.mpi.core.worker.request_store import RequestStore
from unidist.core.backends.mpi.core.worker.task_store import TaskStore
from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.controller.common import pull_data
from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore

@@ -33,3 +36,3 @@ # TODO: Find a way to move this after all imports

# we use the condition to set "worker_0.log" in order to build it succesfully.
logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0)
logger_name = "worker_{}".format(mpi_state.global_rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)

@@ -88,6 +91,9 @@ w_logger = common.get_logger(logger_name, log_file)

task_store = TaskStore.get_instance()
object_store = ObjectStore.get_instance()
local_store = LocalObjectStore.get_instance()
request_store = RequestStore.get_instance()
async_operations = AsyncOperations.get_instance()
ready_to_shutdown_posted = False
# Barrier to check if worker process is ready to start the communication loop
mpi_state.global_comm.Barrier()
w_logger.debug("Worker loop started")
# Once the worker receives the cancel signal from ``Root`` rank,

@@ -98,2 +104,3 @@ # it is getting to shutdown. All pending requests and communications are cancelled,

# ``Monitor` sends the shutdown signal to every worker so they can exit the loop.
ready_to_shutdown_posted = False
while True:

@@ -103,8 +110,10 @@ # Listen receive operation from any source

communication.mpi_recv_operation
)(mpi_state.comm)
w_logger.debug("common.Operation processing - {}".format(operation_type))
)(mpi_state.global_comm)
w_logger.debug(
f"common.Operation processing - {operation_type} from {source_rank} rank"
)
# Proceed the request
if operation_type == common.Operation.EXECUTE:
request = communication.recv_complex_data(mpi_state.comm, source_rank)
request = pull_data(mpi_state.global_comm, source_rank)
if not ready_to_shutdown_posted:

@@ -120,5 +129,4 @@ # Execute the task if possible

elif operation_type == common.Operation.GET:
request = communication.mpi_recv_object(mpi_state.comm, source_rank)
if not ready_to_shutdown_posted:
request["id"] = object_store.get_unique_data_id(request["id"])
request = communication.mpi_recv_object(mpi_state.global_comm, source_rank)
if request is not None and not ready_to_shutdown_posted:
request_store.process_get_request(

@@ -129,3 +137,3 @@ request["source"], request["id"], request["is_blocking_op"]

elif operation_type == common.Operation.PUT_DATA:
request = communication.recv_complex_data(mpi_state.comm, source_rank)
request = pull_data(mpi_state.global_comm, source_rank)
if not ready_to_shutdown_posted:

@@ -137,4 +145,3 @@ w_logger.debug(

)
request["id"] = object_store.get_unique_data_id(request["id"])
object_store.put(request["id"], request["data"])
local_store.put(request["id"], request["data"])

@@ -150,6 +157,5 @@ # Discard data request to another worker, if data has become available

elif operation_type == common.Operation.PUT_OWNER:
request = communication.mpi_recv_object(mpi_state.comm, source_rank)
request = communication.mpi_recv_object(mpi_state.global_comm, source_rank)
if not ready_to_shutdown_posted:
request["id"] = object_store.get_unique_data_id(request["id"])
object_store.put_data_owner(request["id"], request["owner"])
local_store.put_data_owner(request["id"], request["owner"])

@@ -162,11 +168,21 @@ w_logger.debug(

elif operation_type == common.Operation.PUT_SHARED_DATA:
result = pull_data(mpi_state.global_comm, source_rank)
# Clear cached request to another worker, if data_id became available
request_store.discard_data_request(result["id"])
# Check pending requests. Maybe some data became available.
task_store.check_pending_tasks()
# Check pending actor requests also.
task_store.check_pending_actor_tasks()
elif operation_type == common.Operation.WAIT:
request = communication.mpi_recv_object(mpi_state.comm, source_rank)
request = communication.mpi_recv_object(mpi_state.global_comm, source_rank)
if not ready_to_shutdown_posted:
w_logger.debug("WAIT for {} id".format(request["id"]._id))
request["id"] = object_store.get_unique_data_id(request["id"])
request_store.process_wait_request(request["id"])
elif operation_type == common.Operation.ACTOR_CREATE:
request = communication.recv_complex_data(mpi_state.comm, source_rank)
request = pull_data(mpi_state.global_comm, source_rank)
if not ready_to_shutdown_posted:

@@ -180,6 +196,7 @@ cls = request["class"]

elif operation_type == common.Operation.ACTOR_EXECUTE:
request = communication.recv_complex_data(mpi_state.comm, source_rank)
request = pull_data(mpi_state.global_comm, source_rank)
if not ready_to_shutdown_posted:
# Prepare the data
method_name = request["task"]
# Actor method here is a data id so we have to retrieve it from the storage
method_name = local_store.get(request["task"])
handler = request["handler"]

@@ -199,5 +216,6 @@ actor_method = getattr(actor_map[handler], method_name)

cleanup_list = communication.recv_serialized_data(
mpi_state.comm, source_rank
mpi_state.global_comm, source_rank
)
object_store.clear(cleanup_list)
cleanup_list = [common.MpiDataID(*tpl) for tpl in cleanup_list]
local_store.clear(cleanup_list)

@@ -211,5 +229,5 @@ elif operation_type == common.Operation.CANCEL:

communication.mpi_send_operation(
mpi_state.comm,
mpi_state.global_comm,
common.Operation.READY_TO_SHUTDOWN,
communication.MPIRank.MONITOR,
mpi_state.get_monitor_by_worker_rank(communication.MPIRank.ROOT),
)

@@ -219,2 +237,3 @@ ready_to_shutdown_posted = True

w_logger.debug("Exit worker event loop")
SharedObjectStore.get_instance().finalize()
if not MPI.Is_finalized():

@@ -221,0 +240,0 @@ MPI.Finalize()

@@ -9,4 +9,4 @@ # Copyright (C) 2021-2023 Modin authors

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.worker.object_store import ObjectStore
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore
from unidist.core.backends.mpi.core.controller.common import push_data

@@ -18,3 +18,3 @@

# we use the condition to set "worker_0.log" in order to build it succesfully.
logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0)
logger_name = "worker_{}".format(mpi_state.global_rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)

@@ -49,7 +49,7 @@ logger = common.get_logger(logger_name, log_file)

def __init__(self):
# Non-blocking get requests {DataId : [ Set of Ranks ]}
# Non-blocking get requests {DataID : [ Set of Ranks ]}
self._nonblocking_get_requests = defaultdict(set)
# Blocking get requests {DataId : [ Set of Ranks ]}
# Blocking get requests {DataID : [ Set of Ranks ]}
self._blocking_get_requests = defaultdict(set)
# Blocking wait requests {DataId : Rank}
# Blocking wait requests {DataID : Rank}
self._blocking_wait_requests = {}

@@ -78,3 +78,3 @@ # Data requests

----------
data_id : unidist.core.backends.common.data_id.DataID
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID to data.

@@ -108,3 +108,3 @@ rank : int

----------
data_id : unidist.core.backends.common.data_id.DataID
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID to data.

@@ -125,3 +125,3 @@

----------
data_id : unidist.core.backends.common.data_id.DataID
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID to data.

@@ -148,3 +148,3 @@ """

----------
data_id : iterable or unidist.core.backends.common.data_id.DataID
data_id : iterable or unidist.core.backends.mpi.core.common.MpiDataID
An ID or list of IDs to data.

@@ -184,3 +184,3 @@ """

----------
data_id : iterable or unidist.core.backends.common.data_id.DataID
data_id : iterable or unidist.core.backends.mpi.core.common.MpiDataID
An ID or list of IDs to data.

@@ -194,3 +194,3 @@ """

communication.mpi_send_object(
communication.MPIState.get_instance().comm,
communication.MPIState.get_instance().global_comm,
data_id,

@@ -204,3 +204,3 @@ communication.MPIRank.ROOT,

communication.mpi_send_object(
communication.MPIState.get_instance().comm,
communication.MPIState.get_instance().global_comm,
data_ids,

@@ -219,3 +219,3 @@ communication.MPIRank.ROOT,

----------
data_id : unidist.core.backends.common.data_id.DataID
data_id : unidist.core.backends.mpi.core.common.MpiDataID
Chech if `data_id` is available in object store.

@@ -227,7 +227,7 @@

"""
if ObjectStore.get_instance().contains(data_id):
if LocalObjectStore.get_instance().contains(data_id):
# Executor wait just for signal
# We use a blocking send here because the receiver is waiting for the result.
communication.mpi_send_object(
communication.MPIState.get_instance().comm,
communication.MPIState.get_instance().global_comm,
data_id,

@@ -251,3 +251,3 @@ communication.MPIRank.ROOT,

Rank number to send data to.
data_id: unidist.core.backends.common.data_id.DataID
data_id: unidist.core.backends.mpi.core.common.MpiDataID
`data_id` associated data to request

@@ -263,44 +263,10 @@ is_blocking_op : bool, default: False

"""
object_store = ObjectStore.get_instance()
async_operations = AsyncOperations.get_instance()
if object_store.contains(data_id):
if source_rank == communication.MPIRank.ROOT or is_blocking_op:
# The controller or a requesting worker is blocked by the request
# which should be processed immediatly
operation_data = object_store.get(data_id)
# We use a blocking send here because the receiver is waiting for the result.
communication.send_complex_data(
mpi_state.comm,
operation_data,
source_rank,
)
else:
operation_type = common.Operation.PUT_DATA
if object_store.is_already_serialized(data_id):
operation_data = object_store.get_serialized_data(data_id)
# Async send to avoid possible dead-lock between workers
h_list, _ = communication.isend_complex_operation(
mpi_state.comm,
operation_type,
operation_data,
source_rank,
is_serialized=True,
)
async_operations.extend(h_list)
else:
operation_data = {
"id": data_id,
"data": object_store.get(data_id),
}
# Async send to avoid possible dead-lock between workers
h_list, serialized_data = communication.isend_complex_operation(
mpi_state.comm,
operation_type,
operation_data,
source_rank,
is_serialized=False,
)
async_operations.extend(h_list)
object_store.cache_serialized_data(data_id, serialized_data)
local_store = LocalObjectStore.get_instance()
if local_store.contains(data_id):
push_data(
source_rank,
data_id,
is_blocking_op=source_rank == communication.MPIRank.ROOT
or is_blocking_op,
)
logger.debug(

@@ -307,0 +273,0 @@ "Send requested {} id to {} rank - PROCESSED".format(

@@ -14,3 +14,5 @@ # Copyright (C) 2021-2023 Modin authors

from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.worker.object_store import ObjectStore
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore
from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore
from unidist.core.backends.mpi.core.serialization import serialize_complex_data
from unidist.core.backends.mpi.core.worker.request_store import RequestStore

@@ -22,3 +24,3 @@

# we use the condition to set "worker_0.log" in order to build it succesfully.
logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0)
logger_name = "worker_{}".format(mpi_state.global_rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)

@@ -138,3 +140,3 @@ w_logger = common.get_logger(logger_name, log_file)

Rank number to request data from.
data_id : unidist.core.backends.common.data_id.DataID
data_id : unidist.core.backends.mpi.core.common.MpiDataID
`data_id` associated data to request.

@@ -152,3 +154,3 @@

operation_data = {
"source": communication.MPIState.get_instance().rank,
"source": communication.MPIState.get_instance().global_rank,
"id": data_id,

@@ -159,3 +161,3 @@ "is_blocking_op": False,

h_list = communication.isend_simple_operation(
communication.MPIState.get_instance().comm,
communication.MPIState.get_instance().global_comm,
operation_type,

@@ -178,3 +180,3 @@ operation_data,

----------
arg : object or unidist.core.backends.common.data_id.DataID
arg : object or unidist.core.backends.mpi.core.common.MpiDataID
Data ID or object to inspect.

@@ -193,13 +195,12 @@

if is_data_id(arg):
object_store = ObjectStore.get_instance()
arg = object_store.get_unique_data_id(arg)
if object_store.contains(arg):
value = ObjectStore.get_instance().get(arg)
local_store = LocalObjectStore.get_instance()
if local_store.contains(arg):
value = LocalObjectStore.get_instance().get(arg)
# Data is already local or was pushed from master
return value, False
elif object_store.contains_data_owner(arg):
elif local_store.contains_data_owner(arg):
if not RequestStore.get_instance().is_data_already_requested(arg):
# Request the data from an owner worker
owner_rank = object_store.get_data_owner(arg)
if owner_rank != communication.MPIState.get_instance().rank:
owner_rank = local_store.get_data_owner(arg)
if owner_rank != communication.MPIState.get_instance().global_rank:
self.request_worker_data(owner_rank, arg)

@@ -218,3 +219,3 @@ return arg, True

----------
output_data_ids : list of unidist.core.backends.common.data_id.DataID
output_data_ids : list of unidist.core.backends.mpi.core.common.MpiDataID
A list of output data IDs to store the results in local object store.

@@ -232,4 +233,8 @@ task : callable

"""
object_store = ObjectStore.get_instance()
local_store = LocalObjectStore.get_instance()
shared_store = SharedObjectStore.get_instance()
completed_data_ids = []
# Note that if a task is coroutine,
# the local store or the shared store will contain output data
# only once the task is complete.
if inspect.iscoroutinefunction(task):

@@ -267,7 +272,5 @@

for output_id in output_data_ids:
data_id = object_store.get_unique_data_id(output_id)
object_store.put(data_id, e)
local_store.put(output_id, e)
else:
data_id = object_store.get_unique_data_id(output_data_ids)
object_store.put(data_id, e)
local_store.put(output_data_ids, e)
else:

@@ -283,9 +286,27 @@ if output_data_ids is not None:

):
data_id = object_store.get_unique_data_id(output_id)
object_store.put(data_id, value)
completed_data_ids[idx] = data_id
serialized_data = serialize_complex_data(value)
local_store.put(output_id, value)
if (
shared_store.is_allocated()
and shared_store.should_be_shared(serialized_data)
):
shared_store.put(output_id, serialized_data)
else:
local_store.cache_serialized_data(
output_id, serialized_data
)
completed_data_ids[idx] = output_id
else:
data_id = object_store.get_unique_data_id(output_data_ids)
object_store.put(data_id, output_values)
completed_data_ids = [data_id]
serialized_data = serialize_complex_data(output_values)
local_store.put(output_data_ids, output_values)
if (
shared_store.is_allocated()
and shared_store.should_be_shared(serialized_data)
):
shared_store.put(output_data_ids, serialized_data)
else:
local_store.cache_serialized_data(
output_data_ids, serialized_data
)
completed_data_ids = [output_data_ids]

@@ -296,7 +317,10 @@ RequestStore.get_instance().check_pending_get_requests(output_data_ids)

# completion of the communication, which is necessary for the pipeline to continue.
root_monitor = mpi_state.get_monitor_by_worker_rank(
communication.MPIRank.ROOT
)
communication.send_simple_operation(
communication.MPIState.get_instance().comm,
communication.MPIState.get_instance().global_comm,
common.Operation.TASK_DONE,
completed_data_ids,
communication.MPIRank.MONITOR,
root_monitor,
)

@@ -340,7 +364,5 @@

for output_id in output_data_ids:
data_id = object_store.get_unique_data_id(output_id)
object_store.put(data_id, e)
local_store.put(output_id, e)
else:
data_id = object_store.get_unique_data_id(output_data_ids)
object_store.put(data_id, e)
local_store.put(output_data_ids, e)
else:

@@ -356,9 +378,27 @@ if output_data_ids is not None:

):
data_id = object_store.get_unique_data_id(output_id)
object_store.put(data_id, value)
completed_data_ids[idx] = data_id
serialized_data = serialize_complex_data(value)
local_store.put(output_id, value)
if (
shared_store.is_allocated()
and shared_store.should_be_shared(serialized_data)
):
shared_store.put(output_id, serialized_data)
else:
local_store.cache_serialized_data(
output_id, serialized_data
)
completed_data_ids[idx] = output_id
else:
data_id = object_store.get_unique_data_id(output_data_ids)
object_store.put(data_id, output_values)
completed_data_ids = [data_id]
serialized_data = serialize_complex_data(output_values)
local_store.put(output_data_ids, output_values)
if (
shared_store.is_allocated()
and shared_store.should_be_shared(serialized_data)
):
shared_store.put(output_data_ids, serialized_data)
else:
local_store.cache_serialized_data(
output_data_ids, serialized_data
)
completed_data_ids = [output_data_ids]
RequestStore.get_instance().check_pending_get_requests(output_data_ids)

@@ -368,7 +408,10 @@ # Monitor the task execution.

# completion of the communication, which is necessary for the pipeline to continue.
root_monitor = mpi_state.get_monitor_by_worker_rank(
communication.MPIRank.ROOT
)
communication.send_simple_operation(
communication.MPIState.get_instance().comm,
communication.MPIState.get_instance().global_comm,
common.Operation.TASK_DONE,
completed_data_ids,
communication.MPIRank.MONITOR,
root_monitor,
)

@@ -393,3 +436,8 @@

# Parse request
local_store = LocalObjectStore.get_instance()
task = request["task"]
# Remote function here is a data id so we have to retrieve it from the storage,
# whereas actor method is already materialized in the worker loop.
if is_data_id(task):
task = local_store.get(task)
args = request["args"]

@@ -396,0 +444,0 @@ kwargs = request["kwargs"]

@@ -8,2 +8,4 @@ # Copyright (C) 2021-2023 Modin authors

import unidist.core.backends.mpi.core as mpi
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore
from unidist.core.backends.common.data_id import is_data_id
from unidist.core.backends.common.utils import unwrap_object_refs

@@ -32,2 +34,3 @@ from unidist.core.base.object_ref import ObjectRef

self._remote_function = function
self._remote_function_orig = function
self._num_cpus = num_cpus

@@ -76,2 +79,11 @@ self._num_returns = 1 if num_returns is None else num_returns

if not is_data_id(self._remote_function):
self._remote_function = mpi.put(self._remote_function_orig)
else:
# When a worker calls a remote function inside another remote function,
# we have to again serialize the former remote function and put it into the storage
# for further correct communication.
if not LocalObjectStore.get_instance().contains(self._remote_function):
self._remote_function = mpi.put(self._remote_function_orig)
data_ids = mpi.submit(

@@ -78,0 +90,0 @@ self._remote_function,

@@ -13,1 +13,26 @@ # Copyright (C) 2021-2023 Modin authors

init()
class ImmutableDict(dict):
    """A ``dict`` subclass that rejects every mutating operation.

    Read access behaves exactly like a regular ``dict``; any attempt to
    modify the mapping raises a shared ``TypeError``.
    """

    __readonly_exception = TypeError("Cannot modify `ImmutableDict`")

    def _reject_mutation(self, *args, **kwargs):
        # Single shared handler: every mutator below is bound to it so all
        # mutation attempts raise the same exception instance.
        raise ImmutableDict.__readonly_exception

    __setitem__ = _reject_mutation
    __delitem__ = _reject_mutation
    pop = _reject_mutation
    popitem = _reject_mutation
    clear = _reject_mutation
    update = _reject_mutation
    setdefault = _reject_mutation

@@ -57,3 +57,12 @@ # Copyright (C) 2021-2023 Modin authors

# test for https://github.com/modin-project/unidist/issues/354
object_refs = [unidist.put(1), unidist.put(2)]
ready, not_ready = unidist.wait(object_refs, num_returns=1)
assert_equal(len(ready), 1)
assert_equal(len(not_ready), 1)
ready, not_ready = unidist.wait(object_refs, num_returns=2)
assert_equal(len(ready), 2)
assert_equal(len(not_ready), 0)
def test_get_ip():

@@ -60,0 +69,0 @@ import socket

# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""`ObjectStore` functionality."""
import weakref
from collections import defaultdict
import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication
class ObjectStore:
"""
Class that stores objects and provides access to these from master process.
Notes
-----
Currently, the storage is local to the current process only.
"""
__instance = None
def __init__(self):
# Add local data {DataID : Data}
self._data_map = weakref.WeakKeyDictionary()
# Data owner {DataID : Rank}
self._data_owner_map = weakref.WeakKeyDictionary()
# Data was already sent to this ranks {DataID : [ranks]}
self._sent_data_map = defaultdict(set)
# Data id generator
self._data_id_counter = 0
# Data serialized cache
self._serialization_cache = {}
@classmethod
def get_instance(cls):
"""
Get instance of ``ObjectStore``.
Returns
-------
ObjectStore
"""
if cls.__instance is None:
cls.__instance = ObjectStore()
return cls.__instance
def put(self, data_id, data):
"""
Put data to internal dictionary.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MasterDataID
An ID to data.
data : object
Data to be put.
"""
self._data_map[data_id] = data
def put_data_owner(self, data_id, rank):
"""
Put data location (owner rank) to internal dictionary.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MasterDataID
An ID to data.
rank : int
Rank number where the data resides.
"""
self._data_owner_map[data_id] = rank
def get(self, data_id):
"""
Get the data from a local dictionary.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MasterDataID
An ID to data.
Returns
-------
object
Return local data associated with `data_id`.
"""
return self._data_map[data_id]
def get_data_owner(self, data_id):
"""
Get the data owner rank.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MasterDataID
An ID to data.
Returns
-------
int
Rank number where the data resides.
"""
return self._data_owner_map[data_id]
def contains(self, data_id):
"""
Check if the data associated with `data_id` exists in a local dictionary.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MasterDataID
An ID to data.
Returns
-------
bool
Return the status if an object exist in local dictionary.
"""
return data_id in self._data_map
def contains_data_owner(self, data_id):
"""
Check if the data location info associated with `data_id` exists in a local dictionary.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MasterDataID
An ID to data.
Returns
-------
bool
Return the status if an object location is known.
"""
return data_id in self._data_owner_map
def clear(self, cleanup_list):
"""
Clear all local dictionary data ID instances from `cleanup_list`.
Parameters
----------
cleanup_list : list
List of ``DataID``-s.
Notes
-----
Only cache of sent data can be cleared - the rest are weakreferenced.
"""
for data_id in cleanup_list:
self._sent_data_map.pop(data_id, None)
self._serialization_cache.clear()
def generate_data_id(self, gc):
"""
Generate unique ``MasterDataID`` instance.
Parameters
----------
gc : unidist.core.backends.mpi.core.executor.GarbageCollector
Local garbage collector reference.
Returns
-------
unidist.core.backends.mpi.core.common.MasterDataID
Unique data ID instance.
"""
data_id = f"rank_{communication.MPIState.get_instance().rank}_id_{self._data_id_counter}"
self._data_id_counter += 1
return common.MasterDataID(data_id, gc)
def generate_output_data_id(self, dest_rank, gc, num_returns=1):
"""
Generate unique list of ``unidist.core.backends.mpi.core.common.MasterDataID`` instance.
Parameters
----------
dest_rank : int
Ranks number where generated list will be located.
gc : unidist.core.backends.mpi.core.executor.GarbageCollector
Local garbage collector reference.
num_returns : int, default: 1
Generated list size.
Returns
-------
list
A list of unique ``MasterDataID`` instances.
"""
if num_returns == 1:
output_ids = self.generate_data_id(gc)
self.put_data_owner(output_ids, dest_rank)
elif num_returns == 0:
output_ids = None
else:
output_ids = [None] * num_returns
for i in range(num_returns):
output_id = self.generate_data_id(gc)
output_ids[i] = output_id
self.put_data_owner(output_id, dest_rank)
return output_ids
def cache_send_info(self, data_id, rank):
"""
Save communication event for this `data_id` and rank.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MasterDataID
An ``ID`` to data.
rank : int
Rank number where the data was sent.
"""
self._sent_data_map[data_id].add(rank)
def is_already_sent(self, data_id, rank):
"""
Check if communication event on this `data_id` and rank happened.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MasterDataID
An ID to data
rank : int
Rank number to check.
Returns
-------
bool
``True`` if communication event already happened.
"""
return (data_id in self._sent_data_map) and (
rank in self._sent_data_map[data_id]
)
def cache_serialized_data(self, data_id, data):
"""
Save communication event for this `data_id` and `data`.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MasterDataID
An ID to data.
data : object
Serialized data to cache.
"""
self._serialization_cache[data_id] = data
def is_already_serialized(self, data_id):
"""
Check if the data on this `data_id` is already serialized.
Parameters
----------
data_id : unidist.core.backends.common.data_id.DataID
An ID to data.
Returns
-------
bool
``True`` if the data is already serialized.
"""
return data_id in self._serialization_cache
def get_serialized_data(self, data_id):
"""
Get serialized data on this `data_id`.
Parameters
----------
data_id : unidist.core.backends.common.data_id.DataID
An ID to data.
Returns
-------
object
Cached serialized data associated with `data_id`.
"""
return self._serialization_cache[data_id]
# Module-level singleton handle to the store.
# NOTE(review): the class defined in this module is ``LocalObjectStore``, yet this
# line references ``ObjectStore`` — confirm the intended class name; as written it
# relies on ``ObjectStore`` being defined/imported elsewhere at import time.
object_store = ObjectStore.get_instance()
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""Monitoring process."""
# mpi4py is a hard runtime dependency of the MPI backend; fail fast with a
# clear message instead of letting the bare ImportError propagate.
try:
    import mpi4py
except ImportError:
    raise ImportError(
        "Missing dependency 'mpi4py'. Use pip or conda to install it."
    ) from None

import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication

# TODO: Find a way to move this after all imports
# Configure mpi4py BEFORE importing ``MPI``: disable matched probes
# (recv_mprobe) and defer MPI_Init so the backend controls initialization.
mpi4py.rc(recv_mprobe=False, initialize=False)
from mpi4py import MPI  # noqa: E402
class TaskCounter:
    """Process-wide singleton counting tasks executed by workers."""

    __instance = None

    def __init__(self):
        # Number of completed tasks observed so far.
        self.task_counter = 0

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``TaskCounter``.

        Returns
        -------
        TaskCounter
        """
        existing = cls.__instance
        if existing is None:
            existing = cls.__instance = TaskCounter()
        return existing

    def increment(self):
        """Increment task counter by one."""
        self.task_counter = self.task_counter + 1
class DataIDTracker:
    """
    Singleton keeping track of all completed (ready) data IDs.
    """

    __instance = None

    def __init__(self):
        # IDs whose results are already available.
        self.completed_data_ids = set()

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``DataIDTracker``.

        Returns
        -------
        DataIDTracker
        """
        existing = cls.__instance
        if existing is None:
            existing = cls.__instance = DataIDTracker()
        return existing

    def add_to_completed(self, data_ids):
        """
        Add the given data IDs to the set of completed (ready) data IDs.

        Parameters
        ----------
        data_ids : list
            List of data IDs to be added to the set of completed (ready) data IDs.
        """
        for ready_id in data_ids:
            self.completed_data_ids.add(ready_id)
class WaitHandler:
    """
    Singleton handling wait requests issued to the monitor.
    """

    __instance = None

    def __init__(self):
        # NOTE(review): this set appears unused by the class itself — kept for
        # compatibility with any external readers; confirm before removing.
        self.completed_data_ids = set()
        # IDs still being awaited by the current wait request.
        self.awaited_data_ids = []
        # IDs already found ready for the current wait request.
        self.ready = []
        # How many ready IDs must accumulate before replying.
        self.num_returns = 0

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``WaitHandler``.

        Returns
        -------
        WaitHandler
        """
        existing = cls.__instance
        if existing is None:
            existing = cls.__instance = WaitHandler()
        return existing

    def add_wait_request(self, awaited_data_ids, num_returns):
        """
        Register a wait request for a list of data IDs.

        Parameters
        ----------
        awaited_data_ids : list
            List of data IDs to be awaited.
        num_returns : int
            The number of ``DataID``-s that should be returned as ready.
        """
        self.awaited_data_ids = awaited_data_ids
        self.num_returns = num_returns

    def process_wait_requests(self):
        """
        Process the pending wait request, if any.

        Moves completed IDs from the awaited list to the ready list and, once
        `num_returns` IDs are ready, replies to the requester (``Root``) with
        the ready/not-ready split and resets the request state.
        """
        tracker = DataIDTracker.get_instance()
        if not self.awaited_data_ids:
            return
        idx = 0
        while idx < len(self.awaited_data_ids):
            candidate = self.awaited_data_ids[idx]
            if candidate not in tracker.completed_data_ids:
                # Not ready yet — keep it and look at the next one.
                idx += 1
                continue
            self.ready.append(candidate)
            self.awaited_data_ids.remove(candidate)
            if len(self.ready) == self.num_returns:
                # Enough IDs are ready: answer the requester and reset state.
                communication.mpi_send_object(
                    communication.MPIState.get_instance().comm,
                    {
                        "ready": self.ready,
                        "not_ready": self.awaited_data_ids,
                    },
                    communication.MPIRank.ROOT,
                )
                self.ready = []
                self.awaited_data_ids = []
def monitor_loop():
    """
    Infinite monitor operations processing loop.

    Tracks the number of executed tasks and completed (ready) data IDs.

    Notes
    -----
    The loop exits on special cancelation operation.
    ``unidist.core.backends.mpi.core.common.Operations`` defines a set of supported operations.
    """
    task_counter = TaskCounter.get_instance()
    mpi_state = communication.MPIState.get_instance()
    wait_handler = WaitHandler.get_instance()
    data_id_tracker = DataIDTracker.get_instance()
    # Ranks that have reported READY_TO_SHUTDOWN so far.
    workers_ready_to_shutdown = []
    shutdown_workers = False
    # Once all workers excluding ``Root`` and ``Monitor`` ranks are ready to shutdown,
    # ``Monitor`` sends the shutdown signal to every worker, as well as notifies ``Root`` that
    # it can exit the program.
    while True:
        # Listen receive operation from any source
        operation_type, source_rank = communication.mpi_recv_operation(mpi_state.comm)
        # Proceed the request
        if operation_type == common.Operation.TASK_DONE:
            task_counter.increment()
            # The worker follows TASK_DONE with the list of produced data IDs.
            output_data_ids = communication.mpi_recv_object(mpi_state.comm, source_rank)
            data_id_tracker.add_to_completed(output_data_ids)
            # Newly completed IDs may satisfy a pending wait request.
            wait_handler.process_wait_requests()
        elif operation_type == common.Operation.WAIT:
            # TODO: WAIT request can be received from several workers,
            # but not only from master. Handle this case when requested.
            operation_data = communication.mpi_recv_object(mpi_state.comm, source_rank)
            awaited_data_ids = operation_data["data_ids"]
            num_returns = operation_data["num_returns"]
            wait_handler.add_wait_request(awaited_data_ids, num_returns)
            wait_handler.process_wait_requests()
        elif operation_type == common.Operation.GET_TASK_COUNT:
            # We use a blocking send here because the receiver is waiting for the result.
            communication.mpi_send_object(
                mpi_state.comm,
                task_counter.task_counter,
                source_rank,
            )
        elif operation_type == common.Operation.READY_TO_SHUTDOWN:
            workers_ready_to_shutdown.append(source_rank)
            shutdown_workers = (
                len(workers_ready_to_shutdown) == mpi_state.world_size - 2
            )  # "-2" to exclude ``Root`` and ``Monitor`` ranks
        else:
            raise ValueError(f"Unsupported operation: {operation_type}")
        if shutdown_workers:
            # Broadcast the shutdown signal to every worker rank.
            for rank_id in range(
                communication.MPIRank.FIRST_WORKER, mpi_state.world_size
            ):
                communication.mpi_send_operation(
                    mpi_state.comm,
                    common.Operation.SHUTDOWN,
                    rank_id,
                )
            # Notify ``Root`` that it is now safe to exit the program.
            communication.mpi_send_object(
                mpi_state.comm,
                common.Operation.SHUTDOWN,
                communication.MPIRank.ROOT,
            )
            # Finalize MPI from the monitor side unless it already happened.
            if not MPI.Is_finalized():
                MPI.Finalize()
            break  # leave event loop and shutdown monitoring
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
import weakref
import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication
# MPI state is queried once at import time to name this process' log file.
mpi_state = communication.MPIState.get_instance()

# Logger configuration
# When building documentation we do not have MPI initialized so
# we use the condition to set "worker_0.log" in order to build it successfully.
logger_name = "worker_{}".format(mpi_state.rank if mpi_state is not None else 0)
log_file = "{}.log".format(logger_name)
logger = common.get_logger(logger_name, log_file)
class ObjectStore:
    """
    Store of objects local to the current worker process.

    Maps data IDs to their values, to their owner ranks, and to cached
    serialized payloads.

    Notes
    -----
    For now, the storage is local to the current worker process only.
    """

    __instance = None

    def __init__(self):
        # Local data storage {DataId : Data}; weak keys let entries vanish
        # once no one holds the data ID anymore.
        self._data_map = weakref.WeakKeyDictionary()
        # "strong" references to data IDs {DataId : DataId}.
        # A dict gives O(1) retrieval, which other containers would not.
        self._data_id_map = {}
        # Data owner {DataId : Rank}
        self._data_owner_map = weakref.WeakKeyDictionary()
        # Cache of serialized payloads {DataId : serialized data}
        self._serialization_cache = {}

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        ObjectStore
        """
        existing = cls.__instance
        if existing is None:
            existing = cls.__instance = ObjectStore()
        return existing

    def put(self, data_id, data):
        """
        Store `data` under `data_id` in the local dictionary.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.
        data : object
            Data to be put.
        """
        self._data_map[data_id] = data

    def put_data_owner(self, data_id, rank):
        """
        Record the owner rank of the data behind `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.
        rank : int
            Rank number where the data resides.
        """
        self._data_owner_map[data_id] = rank

    def get(self, data_id):
        """
        Fetch the locally stored data for `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        object
            Return local data associated with `data_id`.
        """
        return self._data_map[data_id]

    def get_data_owner(self, data_id):
        """
        Fetch the owner rank recorded for `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        int
            Rank number where the data resides.
        """
        return self._data_owner_map[data_id]

    def contains(self, data_id):
        """
        Tell whether data for `data_id` is stored locally.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        bool
            Return the status if an object exist in local dictionary.
        """
        return data_id in self._data_map

    def contains_data_owner(self, data_id):
        """
        Tell whether the owner rank of `data_id` is known locally.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        bool
            Return the ``True`` status if an object location is known.
        """
        return data_id in self._data_owner_map

    def get_unique_data_id(self, data_id):
        """
        Return the canonical "strong" reference for `data_id`.

        If the passed data ID is not stored locally yet, save and return it.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        unidist.core.backends.common.data_id.DataID
            The unique ID to data.

        Notes
        -----
        We need to use a unique data ID reference for the garbage collector
        to work correctly.
        """
        canonical = self._data_id_map.get(data_id)
        if canonical is None:
            # First time we see this ID: it becomes its own canonical reference.
            self._data_id_map[data_id] = data_id
            canonical = data_id
        return canonical

    def clear(self, cleanup_list):
        """
        Drop the "strong" references to the data IDs in `cleanup_list`.

        Parameters
        ----------
        cleanup_list : list
            List of data IDs.

        Notes
        -----
        The actual data will be collected later when there is no weak or
        strong reference to data in the current worker.
        """
        for stale_id in cleanup_list:
            self._data_id_map.pop(stale_id, None)

    def cache_serialized_data(self, data_id, data):
        """
        Store the serialized form of the object behind `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.
        data : object
            Serialized data to cache.
        """
        self._serialization_cache[data_id] = data

    def is_already_serialized(self, data_id):
        """
        Tell whether a serialized copy for `data_id` is already cached.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        bool
            ``True`` if the data is already serialized.
        """
        return data_id in self._serialization_cache

    def get_serialized_data(self, data_id):
        """
        Fetch the cached serialized copy for `data_id`.

        Parameters
        ----------
        data_id : unidist.core.backends.common.data_id.DataID
            An ID to data.

        Returns
        -------
        object
            Cached serialized data associated with `data_id`.
        """
        return self._serialization_cache[data_id]