You're Invited: Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26. RSVP
Socket
Book a DemoSign in
Socket

unidist

Package Overview
Dependencies
Maintainers
2
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

unidist - pypi Package Compare versions

Comparing version
0.5.1
to
0.6.0
+90
unidist/core/backends/mpi/core/object_store.py
# Copyright (C) 2021-2023 Modin authors
#
# SPDX-License-Identifier: Apache-2.0
"""`ObjectStore` functionality."""
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore
from unidist.core.backends.mpi.core.serialization import deserialize_complex_data
from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore
class ObjectStore:
    """
    Class that combines checking and retrieving data from the shared and local stores in a current process.

    Notes
    -----
    The store checks for both deserialized and serialized data.
    """

    # Singleton instance, lazily created by `get_instance`.
    __instance = None

    @classmethod
    def get_instance(cls):
        """
        Get instance of ``ObjectStore``.

        Returns
        -------
        ObjectStore
        """
        if cls.__instance is None:
            cls.__instance = ObjectStore()
        return cls.__instance

    def contains(self, data_id):
        """
        Check if the data associated with `data_id` exists in the current process.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        bool
            Return the status if an object exist in the current process.
        """
        local_store = LocalObjectStore.get_instance()
        shared_store = SharedObjectStore.get_instance()
        # Data may live in the local store either deserialized or serialized,
        # or in the shared memory store.
        return (
            local_store.contains(data_id)
            or local_store.is_already_serialized(data_id)
            or shared_store.contains(data_id)
        )

    def get(self, data_id):
        """
        Get data from any location in the current process.

        Parameters
        ----------
        data_id : unidist.core.backends.mpi.core.common.MpiDataID
            An ID to data.

        Returns
        -------
        object
            Return data associated with `data_id`.

        Raises
        ------
        ValueError
            If `data_id` is not contained in the current process.
        """
        local_store = LocalObjectStore.get_instance()
        shared_store = SharedObjectStore.get_instance()
        # Fast path: already deserialized in the local store.
        if local_store.contains(data_id):
            return local_store.get(data_id)
        if local_store.is_already_serialized(data_id):
            serialized_data = local_store.get_serialized_data(data_id)
            value = deserialize_complex_data(
                serialized_data["s_data"],
                serialized_data["raw_buffers"],
                serialized_data["buffer_count"],
            )
        elif shared_store.contains(data_id):
            value = shared_store.get(data_id)
        else:
            raise ValueError("The current data ID is not contained in the process.")
        # Cache the deserialized value locally so subsequent gets hit the fast path.
        local_store.put(data_id, value)
        return value
+2
-2
Metadata-Version: 2.1
Name: unidist
Version: 0.5.1
Version: 0.6.0
Summary: Unified Distributed Execution

@@ -40,3 +40,3 @@ Home-page: https://github.com/modin-project/unidist

<a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a>
<a href="https://pypi.org/project/unidist/0.5.1/"><img src="https://img.shields.io/badge/pypi-0.5.1-blue.svg" alt="PyPI version" align="center"></a>
<a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a>
<a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a>

@@ -43,0 +43,0 @@ </p>

@@ -9,3 +9,3 @@ <p align="center">

<a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a>
<a href="https://pypi.org/project/unidist/0.5.1/"><img src="https://img.shields.io/badge/pypi-0.5.1-blue.svg" alt="PyPI version" align="center"></a>
<a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a>
<a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a>

@@ -12,0 +12,0 @@ </p>

Metadata-Version: 2.1
Name: unidist
Version: 0.5.1
Version: 0.6.0
Summary: Unified Distributed Execution

@@ -40,3 +40,3 @@ Home-page: https://github.com/modin-project/unidist

<a href="https://unidist.readthedocs.io/en/latest/?badge=latest"><img alt="" src="https://readthedocs.org/projects/unidist/badge/?version=latest" align="center"></a>
<a href="https://pypi.org/project/unidist/0.5.1/"><img src="https://img.shields.io/badge/pypi-0.5.1-blue.svg" alt="PyPI version" align="center"></a>
<a href="https://pypi.org/project/unidist/"><img src="https://badge.fury.io/py/unidist.svg" alt="PyPI version" align="center"></a>
<a href="https://pepy.tech/project/unidist"><img src="https://static.pepy.tech/personalized-badge/unidist?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" align="center"></a>

@@ -43,0 +43,0 @@ </p>

@@ -47,2 +47,3 @@ AUTHORS

unidist/core/backends/mpi/core/local_object_store.py
unidist/core/backends/mpi/core/object_store.py
unidist/core/backends/mpi/core/serialization.py

@@ -49,0 +50,0 @@ unidist/core/backends/mpi/core/shared_object_store.py

@@ -11,7 +11,7 @@

{
"date": "2023-11-25T22:18:18+0100",
"date": "2024-01-08T21:52:33+0100",
"dirty": false,
"error": null,
"full-revisionid": "8440b4bb3aa56fb7b558efb00f084087b3e88e18",
"version": "0.5.1"
"full-revisionid": "4fa3cf262299c08b66efd538c190b360c448d467",
"version": "0.6.0"
}

@@ -18,0 +18,0 @@ ''' # END VERSION_JSON

@@ -104,2 +104,5 @@ # Copyright (C) 2021-2023 Modin authors

The tag for send/recv of a buffer-like object.
OBJECT_BLOCKING : int, default: 114
The tag for send/recv of a regular Python object
to indicate the blocking get request.
"""

@@ -110,2 +113,3 @@

BUFFER = 113
OBJECT_BLOCKING = 114

@@ -112,0 +116,0 @@

@@ -300,3 +300,3 @@ # Copyright (C) 2021-2023 Modin authors

def mpi_send_object(comm, data, dest_rank):
def mpi_send_object(comm, data, dest_rank, tag=common.MPITag.OBJECT):
"""

@@ -313,2 +313,4 @@ Send a Python object to another MPI rank in a blocking way.

Target MPI process to transfer data.
tag : common.MPITag, default: common.MPITag.OBJECT
Message tag.

@@ -322,3 +324,3 @@ Notes

"""
comm.send(data, dest=dest_rank, tag=common.MPITag.OBJECT)
comm.send(data, dest=dest_rank, tag=tag)

@@ -411,2 +413,32 @@

def mpi_iprobe_recv_object(comm, tag=common.MPITag.OBJECT):
"""
Receive an object of a standard Python data type from any source.
The source rank gets available from `iprobe`.
Parameters
----------
comm : mpi4py.MPI.Comm
MPI communicator.
tag : common.MPITag, default: common.MPITag.OBJECT
Message tag.
Returns
-------
object
Received data from the source rank.
int
Source rank.
"""
backoff = MpiBackoff.get()
status = MPI.Status()
source = MPI.ANY_SOURCE
while not comm.iprobe(source=source, tag=tag, status=status):
time.sleep(backoff)
source = status.source
data = comm.recv(source=source, tag=tag, status=status)
return data, source
def mpi_recv_object(comm, source_rank):

@@ -629,3 +661,3 @@ """

# wrap to dict for sending and correct deserialization of the object by the recipient
comm.send(dict(info_package), dest=dest_rank, tag=common.MPITag.OBJECT)
comm.send(dict(info_package), dest=dest_rank, tag=common.MPITag.OBJECT_BLOCKING)
with pkl5._bigmpi as bigmpi:

@@ -632,0 +664,0 @@ comm.Send(bigmpi(s_data), dest=dest_rank, tag=common.MPITag.BUFFER)

@@ -22,2 +22,3 @@ # Copyright (C) 2021-2023 Modin authors

from unidist.core.backends.mpi.core.serialization import serialize_complex_data
from unidist.core.backends.mpi.core.object_store import ObjectStore
from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore

@@ -390,3 +391,2 @@ from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore

serialized_data = serialize_complex_data(data)
local_store.put(data_id, data)
if shared_store.is_allocated():

@@ -416,21 +416,17 @@ shared_store.put(data_id, serialized_data)

"""
local_store = LocalObjectStore.get_instance()
object_store = ObjectStore.get_instance()
is_list = isinstance(data_ids, list)
if not is_list:
data_ids = [data_ids]
remote_data_ids = [
data_id for data_id in data_ids if not object_store.contains(data_id)
]
# Remote data gets available in the local store inside `request_worker_data`
if remote_data_ids:
request_worker_data(remote_data_ids)
def get_impl(data_id):
if local_store.contains(data_id):
value = local_store.get(data_id)
else:
value = request_worker_data(data_id)
logger.debug("GET {} ids".format(common.unwrapped_data_ids_list(data_ids)))
if isinstance(value, Exception):
raise value
values = [object_store.get(data_id) for data_id in data_ids]
return value
logger.debug("GET {} ids".format(common.unwrapped_data_ids_list(data_ids)))
values = [get_impl(data_id) for data_id in data_ids]
# Initiate reference count based cleaup

@@ -463,2 +459,3 @@ # if all the tasks were completed

"""
object_store = ObjectStore.get_instance()
if not isinstance(data_ids, list):

@@ -473,7 +470,5 @@ data_ids = [data_ids]

ready = []
local_store = LocalObjectStore.get_instance()
logger.debug("WAIT {} ids".format(common.unwrapped_data_ids_list(data_ids)))
for data_id in not_ready.copy():
if local_store.contains(data_id):
if object_store.contains(data_id):
ready.append(data_id)

@@ -480,0 +475,0 @@ not_ready.remove(data_id)

@@ -15,3 +15,5 @@ # Copyright (C) 2021-2023 Modin authors

from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore
from unidist.core.backends.mpi.core.serialization import serialize_complex_data
from unidist.core.backends.mpi.core.serialization import (
serialize_complex_data,
)

@@ -112,3 +114,3 @@

def pull_data(comm, owner_rank):
def pull_data(comm, owner_rank=None):
"""

@@ -122,4 +124,5 @@ Receive data from another MPI process.

----------
owner_rank : int
owner_rank : int or None
Source MPI process to receive data from.
If ``None``, data will be received from any source based on `iprobe`.

@@ -131,3 +134,9 @@ Returns

"""
info_package = communication.mpi_recv_object(comm, owner_rank)
if owner_rank is None:
info_package, source = communication.mpi_iprobe_recv_object(
comm, tag=common.MPITag.OBJECT_BLOCKING
)
owner_rank = source
else:
info_package = communication.mpi_recv_object(comm, owner_rank)
if info_package["package_type"] == common.MetadataPackage.SHARED_DATA:

@@ -155,4 +164,6 @@ local_store = LocalObjectStore.get_instance()

)
data_id = info_package["id"]
local_store.put(data_id, data)
return {
"id": info_package["id"],
"id": data_id,
"data": data,

@@ -168,10 +179,10 @@ }

def request_worker_data(data_id):
def request_worker_data(data_ids):
"""
Get an object(s) associated with `data_id` from the object storage.
Get objects associated with `data_ids` from the object storage.
Parameters
----------
data_id : unidist.core.backends.mpi.core.common.MpiDataID
An ID(s) to object(s) to get data from.
data_ids : list[unidist.core.backends.mpi.core.common.MpiDataID]
IDs to objects to get data from.

@@ -185,34 +196,48 @@ Returns

local_store = LocalObjectStore.get_instance()
owner_rank = local_store.get_data_owner(data_id)
logger.debug("GET {} id from {} rank".format(data_id._id, owner_rank))
async_operations = AsyncOperations.get_instance()
# Worker request
operation_type = common.Operation.GET
operation_data = {
"source": mpi_state.global_rank,
"id": data_id,
# set `is_blocking_op` to `True` to tell a worker
# to send the data directly to the requester
# without any delay
"is_blocking_op": True,
}
# We use a blocking send here because we have to wait for
# completion of the communication, which is necessary for the pipeline to continue.
communication.send_simple_operation(
mpi_state.global_comm,
operation_type,
operation_data,
owner_rank,
)
# Blocking get
complex_data = pull_data(mpi_state.global_comm, owner_rank)
if data_id != complex_data["id"]:
raise ValueError("Unexpected data_id!")
data = complex_data["data"]
for data_id in data_ids:
owner_rank = local_store.get_data_owner(data_id)
# Caching the result, check the protocol correctness here
local_store.put(data_id, data)
return data
logger.debug("GET {} id from {} rank".format(data_id._id, owner_rank))
# Worker request
operation_type = common.Operation.GET
operation_data = {
"source": mpi_state.global_rank,
"id": data_id,
# set `is_blocking_op` to `True` to tell a worker
# to send the data directly to the requester
# without any delay
"is_blocking_op": True,
}
h_list = communication.isend_simple_operation(
mpi_state.global_comm,
operation_type,
operation_data,
owner_rank,
)
# We do not wait for async requests here because
# we can receive the data from the first available worker below
async_operations.extend(h_list)
data_count = 0
# If some dataids raise an exception it will be captured in exception_raised variable and raised after the while loop ends.
exception_raised = None
while data_count < len(data_ids):
# Remote data gets available in the local store inside `pull_data`
complex_data = pull_data(mpi_state.global_comm)
if isinstance(complex_data["data"], Exception) and exception_raised is None:
exception_raised = complex_data["data"]
data_id = complex_data["id"]
if data_id in data_ids:
data_count += 1
else:
raise RuntimeError(
f"DataID {data_id} isn't in the requested list {data_ids}"
)
if exception_raised:
raise exception_raised
def _push_local_data(dest_rank, data_id, is_blocking_op, is_serialized):

@@ -307,3 +332,6 @@ """

communication.mpi_send_object(
mpi_state.global_comm, operation_data, dest_rank
mpi_state.global_comm,
operation_data,
dest_rank,
tag=common.MPITag.OBJECT_BLOCKING,
)

@@ -379,18 +407,15 @@ else:

_push_shared_data(dest_rank, data_id, is_blocking_op)
elif local_store.is_already_serialized(data_id):
_push_local_data(dest_rank, data_id, is_blocking_op, is_serialized=True)
elif local_store.contains(data_id):
if local_store.is_already_serialized(data_id):
data = local_store.get(data_id)
serialized_data = serialize_complex_data(data)
if shared_store.is_allocated() and shared_store.should_be_shared(
serialized_data
):
shared_store.put(data_id, serialized_data)
_push_shared_data(dest_rank, data_id, is_blocking_op)
else:
local_store.cache_serialized_data(data_id, serialized_data)
_push_local_data(dest_rank, data_id, is_blocking_op, is_serialized=True)
else:
data = local_store.get(data_id)
serialized_data = serialize_complex_data(data)
if shared_store.is_allocated() and shared_store.should_be_shared(
serialized_data
):
shared_store.put(data_id, serialized_data)
_push_shared_data(dest_rank, data_id, is_blocking_op)
else:
local_store.cache_serialized_data(data_id, serialized_data)
_push_local_data(
dest_rank, data_id, is_blocking_op, is_serialized=True
)
elif local_store.contains_data_owner(data_id):

@@ -397,0 +422,0 @@ _push_data_owner(dest_rank, data_id)

@@ -280,2 +280,6 @@ # Copyright (C) 2021-2023 Modin authors

"""
# We make a copy to avoid data corruption obtained through out-of-band serialization,
# and buffers are marked read-only to prevent them from being modified.
# `to_bytes()` call handles both points.
data["raw_buffers"] = [buf.tobytes() for buf in data["raw_buffers"]]
self._serialization_cache[data_id] = data

@@ -282,0 +286,0 @@ self.maybe_update_data_id_map(data_id)

@@ -750,3 +750,3 @@ # Copyright (C) 2021-2023 Modin authors

def get(self, data_id, owner_rank, shared_info):
def get(self, data_id, owner_rank=None, shared_info=None):
"""

@@ -759,51 +759,63 @@ Get data from another worker using shared memory.

An ID to data.
owner_rank : int
owner_rank : int, default: None
The rank that sent the data.
shared_info : dict
This value is used to synchronize data in shared memory between different hosts
if the value is not ``None``.
shared_info : dict, default: None
The necessary information to properly deserialize data from shared memory.
If `shared_info` is ``None``, the data already exists in shared memory in the current process.
"""
mpi_state = communication.MPIState.get_instance()
s_data_len = shared_info["s_data_len"]
raw_buffers_len = shared_info["raw_buffers_len"]
service_index = shared_info["service_index"]
buffer_count = shared_info["buffer_count"]
if shared_info is None:
shared_info = self.get_shared_info(data_id)
else:
mpi_state = communication.MPIState.get_instance()
s_data_len = shared_info["s_data_len"]
raw_buffers_len = shared_info["raw_buffers_len"]
service_index = shared_info["service_index"]
buffer_count = shared_info["buffer_count"]
# check data in shared memory
if not self._check_service_info(data_id, service_index):
# reserve shared memory
shared_data_len = s_data_len + sum([buf for buf in raw_buffers_len])
reservation_info = communication.send_reserve_operation(
mpi_state.global_comm, data_id, shared_data_len
)
service_index = reservation_info["service_index"]
# check if worker should sync shared buffer or it is doing by another worker
if reservation_info["is_first_request"]:
# syncronize shared buffer
self._sync_shared_memory_from_another_host(
mpi_state.global_comm,
data_id,
owner_rank,
reservation_info["first_index"],
reservation_info["last_index"],
service_index,
# check data in shared memory
if not self._check_service_info(data_id, service_index):
# reserve shared memory
shared_data_len = s_data_len + sum([buf for buf in raw_buffers_len])
reservation_info = communication.send_reserve_operation(
mpi_state.global_comm, data_id, shared_data_len
)
# put service info
self._put_service_info(
service_index, data_id, reservation_info["first_index"]
)
else:
# wait while another worker syncronize shared buffer
while not self._check_service_info(data_id, service_index):
time.sleep(MpiBackoff.get())
# put shared info with updated data_id and service_index
shared_info = common.MetadataPackage.get_shared_info(
data_id, s_data_len, raw_buffers_len, buffer_count, service_index
)
self._put_shared_info(data_id, shared_info)
service_index = reservation_info["service_index"]
# check if worker should sync shared buffer or it is doing by another worker
if reservation_info["is_first_request"]:
# syncronize shared buffer
if owner_rank is None:
raise ValueError(
"The data is not in the host's shared memory and the data must be synchronized, "
+ "but the owner rank is not defined."
)
# increment ref
self._increment_ref_number(data_id, shared_info["service_index"])
self._sync_shared_memory_from_another_host(
mpi_state.global_comm,
data_id,
owner_rank,
reservation_info["first_index"],
reservation_info["last_index"],
service_index,
)
# put service info
self._put_service_info(
service_index, data_id, reservation_info["first_index"]
)
else:
# wait while another worker syncronize shared buffer
while not self._check_service_info(data_id, service_index):
time.sleep(MpiBackoff.get())
# put shared info with updated data_id and service_index
shared_info = common.MetadataPackage.get_shared_info(
data_id, s_data_len, raw_buffers_len, buffer_count, service_index
)
self._put_shared_info(data_id, shared_info)
# increment ref
self._increment_ref_number(data_id, shared_info["service_index"])
# read from shared buffer and deserialized

@@ -810,0 +822,0 @@ return self._read_from_shared_buffer(data_id, shared_info)

@@ -19,2 +19,3 @@ # Copyright (C) 2021-2023 Modin authors

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.object_store import ObjectStore
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore

@@ -89,2 +90,3 @@ from unidist.core.backends.mpi.core.worker.request_store import RequestStore

task_store = TaskStore.get_instance()
object_store = ObjectStore.get_instance()
local_store = LocalObjectStore.get_instance()

@@ -139,3 +141,2 @@ request_store = RequestStore.get_instance()

)
local_store.put(request["id"], request["data"])

@@ -192,3 +193,3 @@ # Discard data request to another worker, if data has become available

# Actor method here is a data id so we have to retrieve it from the storage
method_name = local_store.get(request["task"])
method_name = object_store.get(request["task"])
handler = request["handler"]

@@ -195,0 +196,0 @@ actor_method = getattr(actor_map[handler], method_name)

@@ -9,4 +9,4 @@ # Copyright (C) 2021-2023 Modin authors

import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore
from unidist.core.backends.mpi.core.controller.common import push_data
from unidist.core.backends.mpi.core.object_store import ObjectStore

@@ -217,3 +217,4 @@

"""
if LocalObjectStore.get_instance().contains(data_id):
object_store = ObjectStore.get_instance()
if object_store.contains(data_id):
# Executor wait just for signal

@@ -252,4 +253,4 @@ # We use a blocking send here because the receiver is waiting for the result.

"""
local_store = LocalObjectStore.get_instance()
if local_store.contains(data_id):
object_store = ObjectStore.get_instance()
if object_store.contains(data_id):
push_data(

@@ -256,0 +257,0 @@ source_rank,

@@ -14,2 +14,3 @@ # Copyright (C) 2021-2023 Modin authors

from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.object_store import ObjectStore
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore

@@ -191,4 +192,5 @@ from unidist.core.backends.mpi.core.shared_object_store import SharedObjectStore

local_store = LocalObjectStore.get_instance()
if local_store.contains(arg):
value = LocalObjectStore.get_instance().get(arg)
object_store = ObjectStore.get_instance()
if object_store.contains(arg):
value = object_store.get(arg)
# Data is already local or was pushed from master

@@ -422,4 +424,4 @@ return value, False

"""
object_store = ObjectStore.get_instance()
# Parse request
local_store = LocalObjectStore.get_instance()
task = request["task"]

@@ -429,3 +431,3 @@ # Remote function here is a data id so we have to retrieve it from the storage,

if is_data_id(task):
task = local_store.get(task)
task = object_store.get(task)
args = request["args"]

@@ -432,0 +434,0 @@ kwargs = request["kwargs"]

@@ -8,3 +8,2 @@ # Copyright (C) 2021-2023 Modin authors

import unidist.core.backends.mpi.core as mpi
from unidist.core.backends.mpi.core.local_object_store import LocalObjectStore
from unidist.core.backends.common.data_id import is_data_id

@@ -34,3 +33,2 @@ from unidist.core.backends.common.utils import unwrap_object_refs

self._remote_function = function
self._remote_function_orig = function
self._num_cpus = num_cpus

@@ -80,9 +78,3 @@ self._num_returns = 1 if num_returns is None else num_returns

if not is_data_id(self._remote_function):
self._remote_function = mpi.put(self._remote_function_orig)
else:
# When a worker calls a remote function inside another remote function,
# we have to again serialize the former remote function and put it into the storage
# for further correct communication.
if not LocalObjectStore.get_instance().contains(self._remote_function):
self._remote_function = mpi.put(self._remote_function_orig)
self._remote_function = mpi.put(self._remote_function)

@@ -89,0 +81,0 @@ data_ids = mpi.submit(

@@ -125,1 +125,13 @@ # Copyright (C) 2021-2023 Modin authors

assert_equal(f.remote(data), data)
@pytest.mark.xfail(
Backend.get() == BackendName.PYSEQ,
reason="PUT using PYSEQ does not provide immutable data",
)
def test_data_immutability():
data = [1, 2, 3]
object_ref = unidist.put(data)
data[0] = 111
assert_equal(object_ref, [1, 2, 3])