xorbits 0.8.0 → 0.8.1
 Metadata-Version: 2.1
 Name: xorbits
-Version: 0.8.0
+Version: 0.8.1
 Summary: Scalable Python data science, in an API compatible & lightning fast way.
@@ -5,0 +5,0 @@ Home-page: http://github.com/xorbitsai/xorbits
@@ -111,2 +111,5 @@ # Copyright 2022-2023 XProbe Inc.
         full_fn = os.path.relpath(os.path.join(root, fn), repo_root)
+        # TODO: remove this after learn is available
+        if "xorbits/_mars/learn/" in full_fn.replace(os.path.sep, "/"):
+            continue
         include_dirs, source = ext_include_source_map.get(
@@ -113,0 +116,0 @@ full_fn.replace(os.path.sep, "/"), [[], []]
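The added guard skips Cython extension discovery for everything under xorbits/_mars/learn/ until that subpackage is ready. Normalizing os.path.sep to "/" keeps the substring test portable across Windows and POSIX; a minimal sketch of the same idiom (the prefix and example paths are illustrative):

import os

def should_skip(path: str, excluded_prefix: str = "xorbits/_mars/learn/") -> bool:
    # Normalize the platform separator so the substring test behaves the
    # same on Windows ("\\") and POSIX ("/").
    return excluded_prefix in path.replace(os.path.sep, "/")

print(should_skip("xorbits/_mars/learn/utils/core.pyx"))  # True
print(should_skip("xorbits/_mars/lib/core.pyx"))          # False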
@@ -275,8 +275,8 @@ # Copyright 2022-2023 XProbe Inc.
             )
-            raw_df[col] = cudf.core.column.build_categorical_column(
-                categories=dictionary,
-                codes=codes,
+            raw_df[col] = cudf.core.column.CategoricalColumn(
+                data=None,
+                dtype=cudf.CategoricalDtype(categories=dictionary, ordered=False),
                 size=codes.size,
                 offset=codes.offset,
-                ordered=False,
                 children=(codes,),
             )
@@ -283,0 +283,0 @@ return raw_df
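cudf.core.column.build_categorical_column is an internal helper that newer cudf releases removed, so the code now calls the CategoricalColumn constructor directly, folding categories and ordered into a cudf.CategoricalDtype. For intuition only, the same codes-plus-categories construction through the public pandas API looks roughly like this (a stand-in for illustration, not the call the diff makes):

import pandas as pd

# Categories are stored once; each row stores only an integer code into
# them, with -1 marking a missing value.
categories = ["a", "b", "c"]
codes = [0, 2, 1, -1]

cat = pd.Categorical.from_codes(codes, categories=categories, ordered=False)
print(pd.Series(cat))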
@@ -17,2 +17,3 @@ # Copyright 2022-2023 XProbe Inc.
 import asyncio
+import copy
 import logging
@@ -412,3 +413,3 @@ import os
             band_to_resource,
-            config=self._config,
+            config=copy.deepcopy(self._config),
         )
@@ -415,0 +416,0 @@
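Passing self._config by reference would let the callee mutate the caller's own configuration; copy.deepcopy hands each call an isolated snapshot instead. A minimal illustration of the aliasing bug this guards against (names are made up):

import copy

config = {"storage": {"backends": ["shared_memory"]}}

# Aliased: the callee's mutation leaks back into the caller's dict.
leaked = config
leaked["storage"]["backends"].append("disk")
assert config["storage"]["backends"] == ["shared_memory", "disk"]

# Deep-copied: the caller's dict stays untouched.
config = {"storage": {"backends": ["shared_memory"]}}
isolated = copy.deepcopy(config)
isolated["storage"]["backends"].append("disk")
assert config["storage"]["backends"] == ["shared_memory"]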
@@ -18,2 +18,3 @@ # Copyright 2022-2023 XProbe Inc.
 import logging
+import os
 from collections import defaultdict
@@ -70,2 +71,3 @@ from typing import Any, Dict, List, Optional, Union
         self._supervisor_address = None
+        self._lock = asyncio.Lock()
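The new asyncio.Lock serializes access to shared bookkeeping (such as the writing-info table touched later in this diff) across the coroutines an actor may run concurrently. The check-then-insert pattern it protects, as a minimal self-contained sketch:

import asyncio

class Registry:
    """Toy stand-in for the actor's writing-info bookkeeping."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._entries: dict = {}

    async def try_claim(self, key: str) -> bool:
        # The check and the insert must be atomic; without the lock two
        # coroutines can both see "missing" and both claim the key.
        async with self._lock:
            if key in self._entries:
                return False
            self._entries[key] = object()
            return True

async def main() -> None:
    reg = Registry()
    results = await asyncio.gather(*(reg.try_claim("chunk-0") for _ in range(4)))
    assert results.count(True) == 1  # exactly one claimer wins

asyncio.run(main())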
@@ -297,2 +299,3 @@ @classmethod
     async def delete(self, session_id: str, data_key: str, error: str = "raise"):
+        logger.debug("Delete %s, %s on %s", session_id, data_key, self.address)
         if error not in ("raise", "ignore"):  # pragma: no cover
@@ -388,2 +391,3 @@ raise ValueError("error must be raise or ignore")
     @mo.extensible
+    @mo.no_lock
     async def open_reader(self, session_id: str, data_key: str) -> StorageFileObject:
@@ -397,2 +401,3 @@ data_info = await self._data_manager_ref.get_data_info(
     @open_reader.batch
+    @mo.no_lock
     async def batch_open_readers(self, args_list, kwargs_list):
@@ -530,3 +535,17 @@ get_data_infos = []
-    async def _fetch_via_transfer(
+    async def get_receive_manager_ref(self, band_name: str):
+        from .transfer import ReceiverManagerActor
+
+        return await mo.actor_ref(
+            address=self.address,
+            uid=ReceiverManagerActor.gen_uid(band_name),
+        )
+
+    @staticmethod
+    async def get_send_manager_ref(address: str, band: str):
+        from .transfer import SenderManagerActor
+
+        return await mo.actor_ref(address=address, uid=SenderManagerActor.gen_uid(band))
+
+    async def fetch_via_transfer(
         self,
@@ -540,18 +559,144 @@ session_id: str,
     ):
-        from .transfer import SenderManagerActor
+        from .transfer import ReceiverManagerActor, SenderManagerActor

         logger.debug("Begin to fetch %s from band %s", data_keys, remote_band)
-        sender_ref: mo.ActorRefType[SenderManagerActor] = await mo.actor_ref(
-            address=remote_band[0], uid=SenderManagerActor.gen_uid(remote_band[1])
-        )
-        await sender_ref.send_batch_data(
-            session_id,
-            data_keys,
-            self._data_manager_ref.address,
-            level,
-            fetch_band_name,
-            error=error,
-        )
-        logger.debug("Finish fetching %s from band %s", data_keys, remote_band)
+        remote_data_manager_ref: mo.ActorRefType[DataManagerActor] = await mo.actor_ref(
+            address=remote_band[0], uid=DataManagerActor.default_uid()
+        )
+        logger.debug("Getting actual keys for %s", data_keys)
+        tasks = []
+        for key in data_keys:
+            tasks.append(remote_data_manager_ref.get_store_key.delay(session_id, key))
+        data_keys = await remote_data_manager_ref.get_store_key.batch(*tasks)
+        data_keys = list(set(data_keys))
+        logger.debug("Getting sub infos for %s", data_keys)
+        sub_infos = await remote_data_manager_ref.get_sub_infos.batch(
+            *[
+                remote_data_manager_ref.get_sub_infos.delay(session_id, key)
+                for key in data_keys
+            ]
+        )
+        get_info_tasks = []
+        pin_tasks = []
+        for data_key in data_keys:
+            get_info_tasks.append(
+                remote_data_manager_ref.get_data_info.delay(
+                    session_id, data_key, remote_band[1], error
+                )
+            )
+            pin_tasks.append(
+                remote_data_manager_ref.pin.delay(
+                    session_id, data_key, remote_band[1], error
+                )
+            )
+        logger.debug("Getting data infos for %s", data_keys)
+        infos = await remote_data_manager_ref.get_data_info.batch(*get_info_tasks)
+        logger.debug("Pinning %s", data_keys)
+        await remote_data_manager_ref.pin.batch(*pin_tasks)
+        filtered = [
+            (data_info, data_key)
+            for data_info, data_key in zip(infos, data_keys)
+            if data_info is not None
+        ]
+        if filtered:
+            infos, data_keys = zip(*filtered)
+        else:  # pragma: no cover
+            # no data to be transferred
+            return []
+        data_sizes = [info.store_size for info in infos]
+        if level is None:
+            level = infos[0].level
+        receiver_ref: mo.ActorRefType[
+            ReceiverManagerActor
+        ] = await self.get_receive_manager_ref(fetch_band_name)
+        await self.request_quota_with_spill(level, sum(data_sizes))
+        open_writer_tasks = []
+        for data_key, data_size, sub_info in zip(data_keys, data_sizes, sub_infos):
+            open_writer_tasks.append(
+                self.open_writer.delay(
+                    session_id,
+                    data_key,
+                    data_size,
+                    level,
+                    request_quota=False,
+                    band_name=fetch_band_name,
+                )
+            )
+        # If the current process matches the receiver's process ID, open writers directly
+        # through `self.open_writer` to avoid potential deadlocks.
+        if os.getpid() == (await receiver_ref.get_pid()):
+            writers = await self.open_writer.batch(*open_writer_tasks)
+            is_transferring_list = await receiver_ref.add_in_process_writers(
+                session_id, data_keys, data_sizes, sub_infos, writers, level
+            )
+        # If the current process differs from the receiver's process, initiate writer
+        # creation through the `receiver_ref` handler. This avoids potential serialization
+        # issues when interacting with the NUMA storage handler from another process context.
+        else:
+            is_transferring_list = await receiver_ref.create_writers(
+                session_id, data_keys, data_sizes, level, sub_infos, fetch_band_name
+            )
+        to_send_keys = []
+        to_wait_keys = []
+        wait_sizes = []
+        for data_key, is_transferring, _size in zip(
+            data_keys, is_transferring_list, data_sizes
+        ):
+            if is_transferring:
+                to_wait_keys.append(data_key)
+                wait_sizes.append(_size)
+            else:
+                to_send_keys.append(data_key)
+        # The quota was over-applied for these wait keys, so update it now
+        if to_wait_keys:
+            self._quota_refs[level].update_quota(-sum(wait_sizes))
+        logger.debug(
+            "Start transferring %s from %s to %s",
+            data_keys,
+            remote_band,
+            (self.address, fetch_band_name),
+        )
+        sender_ref: mo.ActorRefType[
+            SenderManagerActor
+        ] = await self.get_send_manager_ref(remote_band[0], remote_band[1])
+        try:
+            await sender_ref.send_batch_data(
+                session_id,
+                data_keys,
+                to_send_keys,
+                to_wait_keys,
+                (self.address, fetch_band_name),
+            )
+            await receiver_ref.handle_transmission_done(session_id, to_send_keys)
+        except asyncio.CancelledError:
+            keys_to_delete = await receiver_ref.handle_transmission_cancellation(
+                session_id, to_send_keys
+            )
+            for key in keys_to_delete:
+                await self.delete(session_id, key, error="ignore")
+            raise
+        unpin_tasks = []
+        for data_key in data_keys:
+            unpin_tasks.append(
+                remote_data_manager_ref.unpin.delay(
+                    session_id, [data_key], remote_band[1], error="ignore"
+                )
+            )
+        await remote_data_manager_ref.unpin.batch(*unpin_tasks)

     async def fetch_batch(
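The rewritten fetch_via_transfer pulls the orchestration that used to live in the sender (key resolution, pinning, quota, writer creation) over to the fetching side, and it picks one of two writer-creation paths by comparing its own PID with the receiver's: messaging an actor in the same process and then blocking on its reply can self-deadlock on the shared mailbox, so co-located callers open writers directly. A hedged sketch of that dispatch (the three receiver methods mirror the diff; everything else here is illustrative):

import os

async def open_writers_safely(receiver_ref, open_writer_tasks):
    """Hypothetical sketch of the PID-based dispatch above."""
    if os.getpid() == await receiver_ref.get_pid():
        # Same process: open the writers through our own storage handler so
        # we never wait on a mailbox we are currently occupying.
        writers = [await task() for task in open_writer_tasks]
        return await receiver_ref.add_in_process_writers(writers)
    # Different process: let the receiver create the writers in its own
    # context, avoiding cross-process serialization of writer handles.
    return await receiver_ref.create_writers(len(open_writer_tasks))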
@@ -569,6 +714,4 @@ self,
-        meta_api = await self._get_meta_api(session_id)
         remote_keys = defaultdict(set)
         missing_keys = []
         get_metas = []
         get_info_delays = []
@@ -597,2 +740,5 @@ for data_key in data_keys:
                 missing_keys.append(data_key)
+        await self._data_manager_ref.pin.batch(*pin_delays)
+        meta_api = await self._get_meta_api(session_id)
         if address is None or band_name is None:
@@ -611,5 +757,2 @@ # some mapper keys are absent, specify error='ignore'
         ]
-        await self._data_manager_ref.pin.batch(*pin_delays)
         if get_metas:
             metas = await meta_api.get_chunk_meta.batch(*get_metas)
@@ -622,2 +765,3 @@ else:  # pragma: no cover
                 remote_keys[bands["bands"][0]].add(data_key)
+        transfer_tasks = []
@@ -634,3 +778,3 @@ fetch_keys = []
             transfer_tasks.append(
-                self._fetch_via_transfer(
+                self.fetch_via_transfer(
                     session_id, list(keys), level, band, band_name or band[1], error
@@ -637,0 +781,0 @@ )
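Both sides of the diff lean on the .delay(...)/.batch(...) pairing to stage per-key actor calls and flush them in a single message rather than one round trip per key. A framework-free sketch of that batching idiom (this is not xoscar's implementation, just the shape of it):

import asyncio
from typing import Any, Callable, List

class BatchedCall:
    """Stage argument tuples with delay(), flush them in one call with batch()."""

    def __init__(self, fn: Callable[[List[tuple]], Any]) -> None:
        self._fn = fn
        self._staged: List[tuple] = []

    def delay(self, *args: Any) -> None:
        self._staged.append(args)

    async def batch(self) -> Any:
        staged, self._staged = self._staged, []
        return await self._fn(staged)

async def main() -> None:
    async def pin_many(calls: List[tuple]) -> List[str]:
        # One hop services every staged (session_id, key) pair.
        return [f"pinned {key}" for _session, key in calls]

    pin = BatchedCall(pin_many)
    for key in ("k1", "k2", "k3"):
        pin.delay("session", key)
    print(await pin.batch())  # ['pinned k1', 'pinned k2', 'pinned k3']

asyncio.run(main())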
@@ -18,5 +18,6 @@ # Copyright 2022-2023 XProbe Inc.
 import logging
+import os
 from dataclasses import dataclass
 from io import UnsupportedOperation
-from typing import Any, Dict, List, Union
+from typing import Any, Dict, List, Optional, Union
@@ -28,2 +29,3 @@ import xoscar as mo
 from ...storage import StorageLevel
+from ...typing import BandType
 from ...utils import dataslots
@@ -47,2 +49,3 @@ from .core import DataManagerActor, WrappedStorageFileObject
     ):
+        super().__init__()
         self._band_name = band_name
@@ -80,3 +83,2 @@ self._data_manager_ref = data_manager_ref
             await mo.copy_to(local_buffers, remote_buffers, block_size=block_size)
-            await receiver_ref.handle_transmission_done(session_id, data_keys)
@@ -151,3 +153,2 @@ @staticmethod
         except asyncio.CancelledError:
-            await receiver_ref.handle_transmission_cancellation(session_id, data_keys)
             raise
@@ -160,85 +161,26 @@
         data_keys: List[str],
-        address: str,
-        level: StorageLevel,
-        band_name: str = "numa-0",
+        to_send_keys: List,
+        to_wait_keys: List,
+        remote_band: BandType,
         block_size: int = None,
-        error: str = "raise",
     ):
         logger.debug(
-            "Begin to send data (%s, %s) to %s", session_id, data_keys, address
+            "Begin to send data (%s, %s) to %s", session_id, data_keys, remote_band
         )
-        tasks = []
-        for key in data_keys:
-            tasks.append(self._data_manager_ref.get_store_key.delay(session_id, key))
-        data_keys = await self._data_manager_ref.get_store_key.batch(*tasks)
-        data_keys = list(set(data_keys))
-        sub_infos = await self._data_manager_ref.get_sub_infos.batch(
-            *[
-                self._data_manager_ref.get_sub_infos.delay(session_id, key)
-                for key in data_keys
-            ]
-        )
-        block_size = block_size or self._transfer_block_size
         receiver_ref: mo.ActorRefType[
             ReceiverManagerActor
-        ] = await self.get_receiver_ref(address, band_name)
-        get_infos = []
-        pin_tasks = []
-        for data_key in data_keys:
-            get_infos.append(
-                self._data_manager_ref.get_data_info.delay(
-                    session_id, data_key, self._band_name, error
-                )
-            )
-            pin_tasks.append(
-                self._data_manager_ref.pin.delay(
-                    session_id, data_key, self._band_name, error
-                )
-            )
-        await self._data_manager_ref.pin.batch(*pin_tasks)
-        infos = await self._data_manager_ref.get_data_info.batch(*get_infos)
-        filtered = [
-            (data_info, data_key)
-            for data_info, data_key in zip(infos, data_keys)
-            if data_info is not None
-        ]
-        if filtered:
-            infos, data_keys = zip(*filtered)
-        else:  # pragma: no cover
-            # no data to be transferred
-            return
-        data_sizes = [info.store_size for info in infos]
-        if level is None:
-            level = infos[0].level
-        is_transferring_list = await receiver_ref.open_writers(
-            session_id, data_keys, data_sizes, level, sub_infos, band_name
-        )
-        to_send_keys = []
-        to_wait_keys = []
-        for data_key, is_transferring in zip(data_keys, is_transferring_list):
-            if is_transferring:
-                to_wait_keys.append(data_key)
-            else:
-                to_send_keys.append(data_key)
+        ] = await self.get_receiver_ref(remote_band[0], remote_band[1])
         if to_send_keys:
             logger.debug("Start sending %s to %s", to_send_keys, receiver_ref.address)
+            block_size = block_size or self._transfer_block_size
             await self._send_data(receiver_ref, session_id, to_send_keys, block_size)
         if to_wait_keys:
             await receiver_ref.wait_transfer_done(session_id, to_wait_keys)
-        unpin_tasks = []
-        for data_key in data_keys:
-            unpin_tasks.append(
-                self._data_manager_ref.unpin.delay(
-                    session_id, [data_key], self._band_name, error="ignore"
-                )
-            )
-        await self._data_manager_ref.unpin.batch(*unpin_tasks)
         logger.debug(
-            "Finish sending data (%s, %s) to %s, total size is %s",
+            "Finish sending data (%s, %s) to %s",
             session_id,
             data_keys,
-            address,
-            sum(data_sizes),
+            remote_band,
         )
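send_batch_data no longer partitions keys itself: the fetching side now passes in to_send_keys (this sender performs the transfer) and to_wait_keys (another transfer of the same key is already in flight, so just wait for it). The rendezvous behind wait_transfer_done is one asyncio.Event per in-flight key, roughly:

import asyncio
from typing import Dict

async def main() -> None:
    events: Dict[str, asyncio.Event] = {}

    async def first_fetcher(key: str) -> None:
        events[key] = asyncio.Event()  # claim the key: we do the transfer
        await asyncio.sleep(0.01)      # stands in for the real data copy
        events[key].set()              # wake every duplicate waiter at once

    async def duplicate_fetcher(key: str) -> str:
        await events[key].wait()       # piggyback on the in-flight transfer
        return f"{key} ready"

    # gather() starts first_fetcher first, so the event exists before anyone waits
    _, result = await asyncio.gather(
        first_fetcher("chunk-0"), duplicate_fetcher("chunk-0")
    )
    print(result)

asyncio.run(main())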
@@ -268,8 +210,2 @@
-    async def __post_create__(self):
-        if self._storage_handler is None:  # for test
-            self._storage_handler = await mo.actor_ref(
-                self.address, StorageHandlerActor.gen_uid("numa-0")
-            )
-
     async def get_buffers(
@@ -316,2 +252,3 @@ self,
     async def handle_transmission_cancellation(self, session_id: str, data_keys: List):
+        data_keys_to_be_deleted = []
         async with self._lock:
@@ -323,8 +260,7 @@ for data_key in data_keys:
                     await self._quota_refs[info.level].release_quota(info.size)
-                    await self._storage_handler.delete(
-                        session_id, data_key, error="ignore"
-                    )
+                    data_keys_to_be_deleted.append(data_key)
                     await info.writer.clean_up()
                     info.event.set()
                     self._decref_writing_key(session_id, data_key)
+        return data_keys_to_be_deleted
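handle_transmission_cancellation now returns the keys whose writers it cleaned up instead of deleting them itself; the caller in fetch_via_transfer issues the deletes through its own storage handler. Collecting work under the lock and acting on it afterwards avoids calling into another actor while the lock is held, a pattern sketched minimally here (all names illustrative):

import asyncio
from typing import Dict, List

async def cancel_and_collect(
    lock: asyncio.Lock, writing: Dict[str, asyncio.Event]
) -> List[str]:
    doomed: List[str] = []
    async with lock:
        for key, event in list(writing.items()):
            event.set()         # release anyone blocked in wait_transfer_done
            del writing[key]
            doomed.append(key)  # defer the storage delete to the caller
    return doomed               # caller deletes after the lock is released

async def main() -> None:
    lock = asyncio.Lock()
    writing = {"k1": asyncio.Event(), "k2": asyncio.Event()}
    for key in await cancel_and_collect(lock, writing):
        print(f"would delete {key} via the storage handler")

asyncio.run(main())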
@@ -340,2 +276,10 @@ @classmethod
+    async def wait_transfer_done(self, session_id, data_keys):
+        await asyncio.gather(
+            *[self._writing_infos[(session_id, key)].event.wait() for key in data_keys]
+        )
+        async with self._lock:
+            for data_key in data_keys:
+                self._decref_writing_key(session_id, data_key)
+
     async def create_writers(
@@ -405,8 +349,34 @@ self,
-    async def wait_transfer_done(self, session_id, data_keys):
-        await asyncio.gather(
-            *[self._writing_infos[(session_id, key)].event.wait() for key in data_keys]
-        )
-        async with self._lock:
-            for data_key in data_keys:
-                self._decref_writing_key(session_id, data_key)
-
+    async def add_in_process_writers(
+        self,
+        session_id: str,
+        data_keys: List[Union[str, tuple]],
+        data_sizes: List[int],
+        sub_infos: List,
+        writers: List[Optional[WrappedStorageFileObject]],
+        level: StorageLevel,
+    ) -> List[bool]:
+        """
+        This method is invoked only when the caller process matches the receiver's process.
+        To prevent deadlocks, the `writers` are opened directly within the caller's storage
+        handler before being passed to this function.
+        """
+        is_transferring: List[bool] = []
+        async with self._lock:
+            for data_key, data_size, sub_info, writer in zip(
+                data_keys, data_sizes, sub_infos, writers
+            ):
+                if (session_id, data_key) not in self._writing_infos:
+                    is_transferring.append(False)
+                    self._writing_infos[(session_id, data_key)] = WritingInfo(
+                        writer, data_size, level, asyncio.Event(), 1
+                    )
+                    if sub_info is not None:
+                        writer._sub_key_infos = sub_info
+                else:
+                    is_transferring.append(True)
+        return is_transferring
+
+    def get_pid(self):
+        return os.getpid()
@@ -11,7 +11,7 @@
 {
- "date": "2024-11-05T14:00:38+0800",
+ "date": "2024-12-05T19:59:56+0800",
  "dirty": false,
  "error": null,
- "full-revisionid": "5bb0211b5656c6d6d38d3ab5ee2452904f9ffede",
- "version": "0.8.0"
+ "full-revisionid": "ed16df7f03c2745dade73431ae597426f8bdd36c",
+ "version": "0.8.1"
 }
@@ -18,0 +18,0 @@ ''' # END VERSION_JSON