xorbits - PyPI package version comparison: 0.8.0 to 0.8.1

PKG-INFO (+1 -1)
 Metadata-Version: 2.1
 Name: xorbits
-Version: 0.8.0
+Version: 0.8.1
 Summary: Scalable Python data science, in an API compatible & lightning fast way.

@@ -5,0 +5,0 @@ Home-page: http://github.com/xorbitsai/xorbits

@@ -111,2 +111,5 @@ # Copyright 2022-2023 XProbe Inc.

 full_fn = os.path.relpath(os.path.join(root, fn), repo_root)
+# TODO: remove this after learn is available
+if "xorbits/_mars/learn/" in full_fn.replace(os.path.sep, "/"):
+    continue
 include_dirs, source = ext_include_source_map.get(

@@ -113,0 +116,0 @@ full_fn.replace(os.path.sep, "/"), [[], []]
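The added lines skip extension collection for anything under xorbits/_mars/learn/, normalizing os.path.sep first so the prefix test also matches Windows paths. A minimal standalone illustration of that check (function name and paths are invented for the example):

import os

def is_learn_source(full_fn: str) -> bool:
    # Normalize the OS path separator so a Windows path like
    # "xorbits\\_mars\\learn\\foo.pyx" matches the same prefix test as on POSIX.
    return "xorbits/_mars/learn/" in full_fn.replace(os.path.sep, "/")

print(is_learn_source(os.path.join("xorbits", "_mars", "learn", "foo.pyx")))  # True
print(is_learn_source(os.path.join("xorbits", "_mars", "core.pyx")))          # False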

@@ -275,8 +275,8 @@ # Copyright 2022-2023 XProbe Inc.

 )
-raw_df[col] = cudf.core.column.build_categorical_column(
-    categories=dictionary,
-    codes=codes,
+raw_df[col] = cudf.core.column.CategoricalColumn(
+    data=None,
+    dtype=cudf.CategoricalDtype(categories=dictionary, ordered=False),
     size=codes.size,
     offset=codes.offset,
-    ordered=False,
+    children=(codes,),
 )

@@ -283,0 +283,0 @@ return raw_df
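For context: this hunk adapts xorbits to newer cudf releases, where the build_categorical_column helper is gone and categorical columns are built through the CategoricalColumn constructor. A sketch of the migration, using exactly the keyword arguments shown in the diff; dictionary and codes are stand-ins for column objects built by the surrounding code:

import cudf

def make_categorical_column(dictionary, codes):
    # Old cudf API (removed upstream):
    # return cudf.core.column.build_categorical_column(
    #     categories=dictionary, codes=codes,
    #     size=codes.size, offset=codes.offset, ordered=False,
    # )
    # New constructor-based form, as in the 0.8.1 diff:
    return cudf.core.column.CategoricalColumn(
        data=None,
        dtype=cudf.CategoricalDtype(categories=dictionary, ordered=False),
        size=codes.size,
        offset=codes.offset,
        children=(codes,),
    )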

@@ -17,2 +17,3 @@ # Copyright 2022-2023 XProbe Inc.

 import asyncio
+import copy
 import logging

@@ -412,3 +413,3 @@ import os

     band_to_resource,
-    config=self._config,
+    config=copy.deepcopy(self._config),
 )

@@ -415,0 +416,0 @@
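The copy.deepcopy(self._config) change hands each callee its own config object, so later mutations cannot leak back into the caller's copy. A minimal generic illustration (the dict shape here is invented for the example):

import copy

config = {"scheduler": {"retries": 3}}

shared = config                     # same object: mutations leak back
shared["scheduler"]["retries"] = 0
assert config["scheduler"]["retries"] == 0

config = {"scheduler": {"retries": 3}}
isolated = copy.deepcopy(config)    # nested structures are duplicated
isolated["scheduler"]["retries"] = 0
assert config["scheduler"]["retries"] == 3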

@@ -18,2 +18,3 @@ # Copyright 2022-2023 XProbe Inc.

 import logging
+import os
 from collections import defaultdict

@@ -70,2 +71,3 @@ from typing import Any, Dict, List, Optional, Union

     self._supervisor_address = None
+    self._lock = asyncio.Lock()

@@ -297,2 +299,3 @@ @classmethod

 async def delete(self, session_id: str, data_key: str, error: str = "raise"):
+    logger.debug("Delete %s, %s on %s", session_id, data_key, self.address)
     if error not in ("raise", "ignore"):  # pragma: no cover

@@ -388,2 +391,3 @@ raise ValueError("error must be raise or ignore")

 @mo.extensible
+@mo.no_lock
 async def open_reader(self, session_id: str, data_key: str) -> StorageFileObject:

@@ -397,2 +401,3 @@ data_info = await self._data_manager_ref.get_data_info(

 @open_reader.batch
+@mo.no_lock
 async def batch_open_readers(self, args_list, kwargs_list):

@@ -530,3 +535,17 @@ get_data_infos = []

-async def _fetch_via_transfer(
+async def get_receive_manager_ref(self, band_name: str):
+    from .transfer import ReceiverManagerActor
+
+    return await mo.actor_ref(
+        address=self.address,
+        uid=ReceiverManagerActor.gen_uid(band_name),
+    )
+
+@staticmethod
+async def get_send_manager_ref(address: str, band: str):
+    from .transfer import SenderManagerActor
+
+    return await mo.actor_ref(address=address, uid=SenderManagerActor.gen_uid(band))
+
+async def fetch_via_transfer(
     self,

@@ -540,18 +559,144 @@ session_id: str,

 ):
-    from .transfer import SenderManagerActor
+    from .transfer import ReceiverManagerActor, SenderManagerActor

     logger.debug("Begin to fetch %s from band %s", data_keys, remote_band)
-    sender_ref: mo.ActorRefType[SenderManagerActor] = await mo.actor_ref(
-        address=remote_band[0], uid=SenderManagerActor.gen_uid(remote_band[1])
+    remote_data_manager_ref: mo.ActorRefType[DataManagerActor] = await mo.actor_ref(
+        address=remote_band[0], uid=DataManagerActor.default_uid()
     )
-    await sender_ref.send_batch_data(
-        session_id,
+    logger.debug("Getting actual keys for %s", data_keys)
+    tasks = []
+    for key in data_keys:
+        tasks.append(remote_data_manager_ref.get_store_key.delay(session_id, key))
+    data_keys = await remote_data_manager_ref.get_store_key.batch(*tasks)
+    data_keys = list(set(data_keys))
+    logger.debug("Getting sub infos for %s", data_keys)
+    sub_infos = await remote_data_manager_ref.get_sub_infos.batch(
+        *[
+            remote_data_manager_ref.get_sub_infos.delay(session_id, key)
+            for key in data_keys
+        ]
+    )
+    get_info_tasks = []
+    pin_tasks = []
+    for data_key in data_keys:
+        get_info_tasks.append(
+            remote_data_manager_ref.get_data_info.delay(
+                session_id, data_key, remote_band[1], error
+            )
+        )
+        pin_tasks.append(
+            remote_data_manager_ref.pin.delay(
+                session_id, data_key, remote_band[1], error
+            )
+        )
+    logger.debug("Getting data infos for %s", data_keys)
+    infos = await remote_data_manager_ref.get_data_info.batch(*get_info_tasks)
+    logger.debug("Pinning %s", data_keys)
+    await remote_data_manager_ref.pin.batch(*pin_tasks)
+    filtered = [
+        (data_info, data_key)
+        for data_info, data_key in zip(infos, data_keys)
+        if data_info is not None
+    ]
+    if filtered:
+        infos, data_keys = zip(*filtered)
+    else:  # pragma: no cover
+        # no data to be transferred
+        return []
+    data_sizes = [info.store_size for info in infos]
+    if level is None:
+        level = infos[0].level
+    receiver_ref: mo.ActorRefType[
+        ReceiverManagerActor
+    ] = await self.get_receive_manager_ref(fetch_band_name)
+    await self.request_quota_with_spill(level, sum(data_sizes))
+    open_writer_tasks = []
+    for data_key, data_size, sub_info in zip(data_keys, data_sizes, sub_infos):
+        open_writer_tasks.append(
+            self.open_writer.delay(
+                session_id,
+                data_key,
+                data_size,
+                level,
+                request_quota=False,
+                band_name=fetch_band_name,
+            )
+        )
+    # If the current process matches the receiver's process ID, open writers directly
+    # through `self.open_writer` to avoid potential deadlocks.
+    if os.getpid() == (await receiver_ref.get_pid()):
+        writers = await self.open_writer.batch(*open_writer_tasks)
+        is_transferring_list = await receiver_ref.add_in_process_writers(
+            session_id, data_keys, data_sizes, sub_infos, writers, level
+        )
+    # If the current process differs from the receiver's, initiate writer creation
+    # through the receiver_ref instead. This avoids potential serialization issues
+    # when interacting with the NUMA storage handler from another process context.
+    else:
+        is_transferring_list = await receiver_ref.create_writers(
+            session_id, data_keys, data_sizes, level, sub_infos, fetch_band_name
+        )
+    to_send_keys = []
+    to_wait_keys = []
+    wait_sizes = []
+    for data_key, is_transferring, _size in zip(
+        data_keys, is_transferring_list, data_sizes
+    ):
+        if is_transferring:
+            to_wait_keys.append(data_key)
+            wait_sizes.append(_size)
+        else:
+            to_send_keys.append(data_key)
+    # The quota was over-applied for the wait keys, so release their share now.
+    if to_wait_keys:
+        self._quota_refs[level].update_quota(-sum(wait_sizes))
+    logger.debug(
+        "Start transferring %s from %s to %s",
         data_keys,
-        self._data_manager_ref.address,
-        level,
-        fetch_band_name,
-        error=error,
+        remote_band,
+        (self.address, fetch_band_name),
     )
-    logger.debug("Finish fetching %s from band %s", data_keys, remote_band)
+    sender_ref: mo.ActorRefType[
+        SenderManagerActor
+    ] = await self.get_send_manager_ref(remote_band[0], remote_band[1])
+    try:
+        await sender_ref.send_batch_data(
+            session_id,
+            data_keys,
+            to_send_keys,
+            to_wait_keys,
+            (self.address, fetch_band_name),
+        )
+        await receiver_ref.handle_transmission_done(session_id, to_send_keys)
+    except asyncio.CancelledError:
+        keys_to_delete = await receiver_ref.handle_transmission_cancellation(
+            session_id, to_send_keys
+        )
+        for key in keys_to_delete:
+            await self.delete(session_id, key, error="ignore")
+        raise
+    unpin_tasks = []
+    for data_key in data_keys:
+        unpin_tasks.append(
+            remote_data_manager_ref.unpin.delay(
+                session_id, [data_key], remote_band[1], error="ignore"
+            )
+        )
+    await remote_data_manager_ref.unpin.batch(*unpin_tasks)

 async def fetch_batch(

@@ -569,6 +714,4 @@ self,

-meta_api = await self._get_meta_api(session_id)
 remote_keys = defaultdict(set)
 missing_keys = []
-get_metas = []
+get_info_delays = []

@@ -597,2 +740,5 @@ for data_key in data_keys:

 missing_keys.append(data_key)
+await self._data_manager_ref.pin.batch(*pin_delays)
+meta_api = await self._get_meta_api(session_id)
 if address is None or band_name is None:

@@ -611,5 +757,2 @@ # some mapper keys are absent, specify error='ignore'

 ]
-await self._data_manager_ref.pin.batch(*pin_delays)
 if get_metas:
     metas = await meta_api.get_chunk_meta.batch(*get_metas)

@@ -622,2 +765,3 @@ else: # pragma: no cover

     remote_keys[bands["bands"][0]].add(data_key)
+transfer_tasks = []

@@ -634,3 +778,3 @@ fetch_keys = []

 transfer_tasks.append(
-    self._fetch_via_transfer(
+    self.fetch_via_transfer(
         session_id, list(keys), level, band, band_name or band[1], error

@@ -637,0 +781,0 @@ )
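Many calls in these hunks use xoscar's extensible-method pattern: .delay(...) records a call's arguments and .batch(*delays) dispatches all of them in one message. A rough standalone imitation of that pattern in plain Python (not the actual xoscar API; all names here are invented):

import asyncio

class BatchedClient:
    # Imitates the .delay()/.batch() style used with xoscar actor refs.
    def get_data_info_delay(self, session_id, data_key):
        # "delay" only records arguments; nothing is sent yet.
        return (session_id, data_key)

    async def get_data_info_batch(self, *delays):
        # One round trip services every recorded call.
        await asyncio.sleep(0)  # stand-in for a single network hop
        return [f"info({sid}:{key})" for sid, key in delays]

async def main():
    client = BatchedClient()
    delays = [client.get_data_info_delay("s1", k) for k in ("a", "b", "c")]
    print(await client.get_data_info_batch(*delays))

asyncio.run(main())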

@@ -18,5 +18,6 @@ # Copyright 2022-2023 XProbe Inc.

 import logging
+import os
 from dataclasses import dataclass
 from io import UnsupportedOperation
-from typing import Any, Dict, List, Union
+from typing import Any, Dict, List, Optional, Union

@@ -28,2 +29,3 @@ import xoscar as mo

 from ...storage import StorageLevel
+from ...typing import BandType
 from ...utils import dataslots

@@ -47,2 +49,3 @@ from .core import DataManagerActor, WrappedStorageFileObject

 ):
     super().__init__()
+    self._band_name = band_name

@@ -80,3 +83,2 @@ self._data_manager_ref = data_manager_ref

 await mo.copy_to(local_buffers, remote_buffers, block_size=block_size)
-await receiver_ref.handle_transmission_done(session_id, data_keys)

@@ -151,3 +153,2 @@ @staticmethod

 except asyncio.CancelledError:
-    await receiver_ref.handle_transmission_cancellation(session_id, data_keys)
     raise

@@ -160,85 +161,26 @@

     data_keys: List[str],
-    address: str,
-    level: StorageLevel,
-    band_name: str = "numa-0",
+    to_send_keys: List,
+    to_wait_keys: List,
+    remote_band: BandType,
     block_size: int = None,
-    error: str = "raise",
 ):
     logger.debug(
-        "Begin to send data (%s, %s) to %s", session_id, data_keys, address
+        "Begin to send data (%s, %s) to %s", session_id, data_keys, remote_band
     )
-    tasks = []
-    for key in data_keys:
-        tasks.append(self._data_manager_ref.get_store_key.delay(session_id, key))
-    data_keys = await self._data_manager_ref.get_store_key.batch(*tasks)
-    data_keys = list(set(data_keys))
-    sub_infos = await self._data_manager_ref.get_sub_infos.batch(
-        *[
-            self._data_manager_ref.get_sub_infos.delay(session_id, key)
-            for key in data_keys
-        ]
-    )
-    block_size = block_size or self._transfer_block_size
     receiver_ref: mo.ActorRefType[
         ReceiverManagerActor
-    ] = await self.get_receiver_ref(address, band_name)
-    get_infos = []
-    pin_tasks = []
-    for data_key in data_keys:
-        get_infos.append(
-            self._data_manager_ref.get_data_info.delay(
-                session_id, data_key, self._band_name, error
-            )
-        )
-        pin_tasks.append(
-            self._data_manager_ref.pin.delay(
-                session_id, data_key, self._band_name, error
-            )
-        )
-    await self._data_manager_ref.pin.batch(*pin_tasks)
-    infos = await self._data_manager_ref.get_data_info.batch(*get_infos)
-    filtered = [
-        (data_info, data_key)
-        for data_info, data_key in zip(infos, data_keys)
-        if data_info is not None
-    ]
-    if filtered:
-        infos, data_keys = zip(*filtered)
-    else:  # pragma: no cover
-        # no data to be transferred
-        return
-    data_sizes = [info.store_size for info in infos]
-    if level is None:
-        level = infos[0].level
-    is_transferring_list = await receiver_ref.open_writers(
-        session_id, data_keys, data_sizes, level, sub_infos, band_name
-    )
-    to_send_keys = []
-    to_wait_keys = []
-    for data_key, is_transferring in zip(data_keys, is_transferring_list):
-        if is_transferring:
-            to_wait_keys.append(data_key)
-        else:
-            to_send_keys.append(data_key)
+    ] = await self.get_receiver_ref(remote_band[0], remote_band[1])
     if to_send_keys:
         logger.debug("Start sending %s to %s", to_send_keys, receiver_ref.address)
+        block_size = block_size or self._transfer_block_size
         await self._send_data(receiver_ref, session_id, to_send_keys, block_size)
     if to_wait_keys:
         await receiver_ref.wait_transfer_done(session_id, to_wait_keys)
-    unpin_tasks = []
-    for data_key in data_keys:
-        unpin_tasks.append(
-            self._data_manager_ref.unpin.delay(
-                session_id, [data_key], self._band_name, error="ignore"
-            )
-        )
-    await self._data_manager_ref.unpin.batch(*unpin_tasks)
     logger.debug(
-        "Finish sending data (%s, %s) to %s, total size is %s",
+        "Finish sending data (%s, %s) to %s",
         session_id,
         data_keys,
-        address,
-        sum(data_sizes),
+        remote_band,
     )

@@ -268,8 +210,2 @@

-async def __post_create__(self):
-    if self._storage_handler is None:  # for test
-        self._storage_handler = await mo.actor_ref(
-            self.address, StorageHandlerActor.gen_uid("numa-0")
-        )

 async def get_buffers(

@@ -316,2 +252,3 @@ self,

 async def handle_transmission_cancellation(self, session_id: str, data_keys: List):
+    data_keys_to_be_deleted = []
     async with self._lock:

@@ -323,8 +260,7 @@ for data_key in data_keys:

             await self._quota_refs[info.level].release_quota(info.size)
-            await self._storage_handler.delete(
-                session_id, data_key, error="ignore"
-            )
+            data_keys_to_be_deleted.append(data_key)
             await info.writer.clean_up()
             info.event.set()
             self._decref_writing_key(session_id, data_key)
+    return data_keys_to_be_deleted
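With this change the receiver no longer deletes data itself on cancellation; it returns the orphaned keys and the storage handler deletes them (see the except asyncio.CancelledError branch in fetch_via_transfer above). A minimal generic sketch of that division of labor (all names invented):

import asyncio

staged_keys = []

async def transfer(keys):
    for key in keys:
        staged_keys.append(key)    # stand-in for opening a writer
        await asyncio.sleep(0.1)   # stand-in for moving bytes

async def cleanup_after_cancellation():
    # Like handle_transmission_cancellation: report which keys need
    # deleting instead of deleting them here.
    orphaned, staged_keys[:] = staged_keys[:], []
    return orphaned

async def main():
    task = asyncio.create_task(transfer(["a", "b", "c"]))
    await asyncio.sleep(0.15)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        for key in await cleanup_after_cancellation():
            print("deleting", key)  # the caller performs the actual delete

asyncio.run(main())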

@@ -340,2 +276,10 @@ @classmethod

+async def wait_transfer_done(self, session_id, data_keys):
+    await asyncio.gather(
+        *[self._writing_infos[(session_id, key)].event.wait() for key in data_keys]
+    )
+    async with self._lock:
+        for data_key in data_keys:
+            self._decref_writing_key(session_id, data_key)
+
+async def create_writers(

@@ -405,8 +349,34 @@ self,

-async def wait_transfer_done(self, session_id, data_keys):
-    await asyncio.gather(
-        *[self._writing_infos[(session_id, key)].event.wait() for key in data_keys]
-    )
-    for data_key in data_keys:
-        self._decref_writing_key(session_id, data_key)
+async def add_in_process_writers(
+    self,
+    session_id: str,
+    data_keys: List[Union[str, tuple]],
+    data_sizes: List[int],
+    sub_infos: List,
+    writers: List[Optional[WrappedStorageFileObject]],
+    level: StorageLevel,
+) -> List[bool]:
+    """
+    This method is invoked only when the caller process matches the receiver's process.
+    To prevent deadlocks, the `writers` are opened directly within the caller's storage
+    handler before being passed to this function.
+    """
+    is_transferring: List[bool] = []
+    async with self._lock:
+        for data_key, data_size, sub_info, writer in zip(
+            data_keys, data_sizes, sub_infos, writers
+        ):
+            if (session_id, data_key) not in self._writing_infos:
+                is_transferring.append(False)
+                self._writing_infos[(session_id, data_key)] = WritingInfo(
+                    writer, data_size, level, asyncio.Event(), 1
+                )
+                if sub_info is not None:
+                    writer._sub_key_infos = sub_info
+            else:
+                is_transferring.append(True)
+    return is_transferring
+
+def get_pid(self):
+    return os.getpid()
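get_pid lets the storage handler compare process IDs and pick the in-process fast path in fetch_via_transfer above. A toy version of that decision (class and function names invented):

import os

class Receiver:
    def get_pid(self):
        return os.getpid()

def choose_path(receiver):
    # Same process: call the storage handler directly to avoid actor
    # round trips and the deadlock described in the diff comments.
    # Different process: go through the receiver actor ref instead.
    if os.getpid() == receiver.get_pid():
        return "open writers in-process"
    return "create writers via receiver actor"

print(choose_path(Receiver()))  # same process here, so the in-process path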

@@ -11,7 +11,7 @@

 {
-    "date": "2024-11-05T14:00:38+0800",
+    "date": "2024-12-05T19:59:56+0800",
     "dirty": false,
     "error": null,
-    "full-revisionid": "5bb0211b5656c6d6d38d3ab5ee2452904f9ffede",
-    "version": "0.8.0"
+    "full-revisionid": "ed16df7f03c2745dade73431ae597426f8bdd36c",
+    "version": "0.8.1"
 }

@@ -18,0 +18,0 @@ ''' # END VERSION_JSON