fused
Advanced tools
| """Index H3-indexed datasets for serverless queries. | ||
| This module provides functions to create metadata parquet files that enable | ||
| fast H3 range queries without requiring a server connection. | ||
| """ | ||
| def persist_hex_table_metadata( | ||
| dataset_path: str, | ||
| output_path: str | None = None, | ||
| metadata_path: str | None = None, | ||
| verbose: bool = False, | ||
| row_group_size: int = 1, | ||
| indexed_columns: list[str] | None = None, | ||
| pool_size: int = 10, | ||
| ) -> str: | ||
| """Persist metadata for a hex table to enable serverless queries. | ||
| Scans all parquet files in the dataset, extracts metadata needed for | ||
| fast reconstruction, and writes one .metadata.parquet file per source file. | ||
| Each metadata file contains: | ||
| - Row group metadata (offsets, H3 ranges, etc.) | ||
| - Full metadata_json stored as custom metadata in the parquet schema | ||
| Requires a 'hex' column (or variant like h3, h3_index) in all files. | ||
| Raises ValueError if no hex column is found. | ||
| Args: | ||
| dataset_path: Path to the dataset directory (e.g., "s3://bucket/dataset/") | ||
| output_path: Deprecated - use metadata_path instead | ||
| metadata_path: Directory path where metadata files should be written. | ||
| If None, writes to {source_dir}/.fused/{source_filename}.metadata.parquet | ||
| for each source file. If provided, writes to | ||
| {metadata_path}/{full_source_path}.metadata.parquet using the full | ||
| source path as the filename. This allows storing metadata in a | ||
| different location when you don't have write access to the dataset directory. | ||
| verbose: If True, print timing/progress information. | ||
| row_group_size: Number of rows per row group in output parquet file. | ||
| Default is 1 (one row per row group). Larger values | ||
| reduce file size but may increase memory usage during writes. | ||
| indexed_columns: List of column names to index. If None (default), | ||
| indexes only the first identified indexed column | ||
| (typically the hex column). If an empty list, indexes | ||
| all detected indexed columns. | ||
| pool_size: Number of parallel workers to use for processing files. | ||
| Default is 10 (parallel processing enabled by default). | ||
| Set to 1 for sequential processing. This can significantly speed up | ||
| metadata extraction for large datasets with many files. | ||
| Returns: | ||
| Path to metadata directory (if metadata_path provided) or first metadata file path | ||
| Raises: | ||
| ImportError: If job2 package is not available | ||
| ValueError: If no hex column is found in any file | ||
| FileNotFoundError: If no parquet files are found in the dataset | ||
| Example: | ||
| >>> fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/") | ||
| 's3://my-bucket/my-dataset/.fused/file1.metadata.parquet' | ||
| >>> fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/", metadata_path="s3://my-bucket/metadata/") | ||
| 's3://my-bucket/metadata/' | ||
| >>> fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/", pool_size=4) | ||
| 's3://my-bucket/my-dataset/.fused/file1.metadata.parquet' | ||
| """ | ||
| try: | ||
| from job2.fasttortoise._h3_index import persist_hex_table_metadata_impl | ||
| except ImportError as e: | ||
| raise RuntimeError( | ||
| "The H3 index functionality requires the job2 module. " | ||
| "This function is only available in the Fused execution environment." | ||
| ) from e | ||
| return persist_hex_table_metadata_impl( | ||
| dataset_path=dataset_path, | ||
| metadata_path=metadata_path, | ||
| verbose=verbose, | ||
| row_group_size=row_group_size, | ||
| indexed_columns=indexed_columns, | ||
| pool_size=pool_size, | ||
| ) | ||
| __all__ = [ | ||
| "persist_hex_table_metadata", | ||
| ] |
+1
-0
@@ -169,1 +169,2 @@ cdk.out | ||
| client/public/fasttortoise/FastTortoiseWasm.wasm | ||
| client/public/fasttortoise/fasttortoise-api.js |
@@ -8,3 +8,8 @@ """Fast Tortoise - Efficient parquet row group reconstruction from metadata.""" | ||
| # Keep the API helpers in fused-py since they're just API clients | ||
| from ._api import find_dataset, get_row_groups_for_dataset, register_dataset | ||
| from ._api import ( | ||
| find_dataset, | ||
| get_row_groups_for_dataset, | ||
| get_row_groups_for_dataset_with_metadata, | ||
| register_dataset, | ||
| ) | ||
@@ -235,78 +240,22 @@ | ||
| RuntimeError: If the batch request fails or metadata is still being processed | ||
| This function imports the implementation from job2 at runtime. | ||
| """ | ||
| metadata_url = _get_metadata_url(base_url) | ||
| batch_url = f"{metadata_url}/batch" | ||
| try: | ||
| from job2.fasttortoise._reconstruction import ( | ||
| _parse_row_group_from_metadata, | ||
| _unwrap_api_metadata, | ||
| async_fetch_row_group_metadata_batch as _async_fetch_row_group_metadata_batch, | ||
| ) | ||
| return await _async_fetch_row_group_metadata_batch( | ||
| requests=requests, | ||
| base_url=base_url, | ||
| session=session, | ||
| ) | ||
| except ImportError as e: | ||
| raise RuntimeError( | ||
| "The fasttortoise reconstruction functionality requires the job2 module." | ||
| "The fasttortoise reconstruction functionality requires the job2 module. " | ||
| "This is typically only available in the Fused execution environment." | ||
| ) from e | ||
| import aiohttp | ||
| from fused import context | ||
| from fused.core._realtime_ops import _realtime_raise_for_status_async | ||
| auth_headers = context._get_auth_header(missing_ok=True) | ||
| if session is None: | ||
| from fused.core._realtime_ops import _get_shared_session | ||
| session = await _get_shared_session() | ||
| async with session.post( | ||
| url=batch_url, | ||
| json=requests, | ||
| headers=auth_headers, | ||
| timeout=aiohttp.ClientTimeout(total=OPTIONS.metadata_request_timeout), | ||
| ) as response: | ||
| await _realtime_raise_for_status_async(response) | ||
| if response.status == 202: | ||
| raise RuntimeError("Metadata is still being processed by the server") | ||
| batch_response = await response.json() | ||
| results = batch_response.get("results", []) | ||
| # Process results and convert to same format as individual calls | ||
| processed_results = [] | ||
| for i, result in enumerate(results): | ||
| if "error" in result: | ||
| # Propagate error | ||
| error = result["error"] | ||
| raise RuntimeError( | ||
| f"Metadata fetch failed for row group {requests[i]['row_group_index']} " | ||
| f"in {requests[i]['path']}: {error.get('detail', 'Unknown error')}" | ||
| ) | ||
| # Unwrap and parse metadata (same as individual call) | ||
| api_response = { | ||
| "metadata": result.get("metadata", ""), | ||
| "row_group_bytes": result.get("row_group_bytes", ""), | ||
| } | ||
| metadata = _unwrap_api_metadata(api_response) | ||
| target_row_group, num_rows, start_offset, end_offset = ( | ||
| _parse_row_group_from_metadata(metadata) | ||
| ) | ||
| processed_results.append( | ||
| { | ||
| "path": requests[i]["path"], | ||
| "row_group_index": requests[i]["row_group_index"], | ||
| "start_offset": start_offset, | ||
| "end_offset": end_offset, | ||
| "size": end_offset - start_offset, | ||
| "api_metadata": metadata, | ||
| } | ||
| ) | ||
| return processed_results | ||
| async def async_read_combined_row_groups( | ||
@@ -333,14 +282,16 @@ path: str, | ||
| List of PyArrow Tables, one per row group in the same order as input | ||
| This function imports the implementation from job2 at runtime. | ||
| """ | ||
| import pyarrow.parquet as pq | ||
| if not row_group_metadata_list: | ||
| return [] | ||
| try: | ||
| from job2.fasttortoise._reconstruction import ( | ||
| _async_read_file_range, | ||
| _build_parquet_buffer_from_data, | ||
| _parse_row_group_from_metadata, | ||
| async_read_combined_row_groups as _async_read_combined_row_groups, | ||
| ) | ||
| return await _async_read_combined_row_groups( | ||
| path=path, | ||
| row_group_metadata_list=row_group_metadata_list, | ||
| base_url=base_url, | ||
| columns=columns, | ||
| ) | ||
| except ImportError as e: | ||
@@ -352,46 +303,3 @@ raise RuntimeError( | ||
| # Calculate the combined byte range | ||
| start_offset = min(rg["start_offset"] for rg in row_group_metadata_list) | ||
| end_offset = max(rg["end_offset"] for rg in row_group_metadata_list) | ||
| data_size = end_offset - start_offset | ||
| # Download the combined byte range | ||
| combined_data = await _async_read_file_range(path, start_offset, data_size) | ||
| # Reconstruct each row group from the combined data | ||
| tables = [] | ||
| for rg_meta in row_group_metadata_list: | ||
| metadata = rg_meta["api_metadata"] | ||
| # Parse the row group metadata again to get the thrift object | ||
| target_row_group, num_rows, rg_start, rg_end = _parse_row_group_from_metadata( | ||
| metadata | ||
| ) | ||
| # Extract this row group's data from the combined buffer | ||
| # The offset is relative to our combined download start | ||
| relative_start = rg_start - start_offset | ||
| relative_end = rg_end - start_offset | ||
| row_group_data = combined_data[relative_start:relative_end] | ||
| # Build the minimal parquet file | ||
| parquet_buffer = _build_parquet_buffer_from_data( | ||
| row_group_data, | ||
| target_row_group, | ||
| metadata, | ||
| rg_start, | ||
| num_rows, | ||
| ) | ||
| try: | ||
| # Parse with PyArrow | ||
| parquet_file = pq.ParquetFile(parquet_buffer) | ||
| table = parquet_file.read_row_group(0, columns=columns) | ||
| tables.append(table) | ||
| finally: | ||
| parquet_buffer.close() | ||
| return tables | ||
| __all__ = [ | ||
@@ -398,0 +306,0 @@ "async_fetch_row_group_metadata", |
@@ -272,63 +272,22 @@ """API client helpers for dataset queries.""" | ||
| # ] | ||
| This function imports the implementation from job2 at runtime. | ||
| """ | ||
| # Use current environment's base URL if not specified | ||
| if base_url is None: | ||
| base_url = OPTIONS.base_url | ||
| try: | ||
| from job2.fasttortoise._api import ( | ||
| get_row_groups_for_dataset_with_metadata as _get_row_groups_for_dataset_with_metadata, | ||
| ) | ||
| # Build request body for POST | ||
| body: Dict[str, Any] = { | ||
| "dataset_path": dataset_path, | ||
| "geographical_regions": geographical_regions, | ||
| "include_signed_urls": include_signed_urls, | ||
| "include_full_metadata": include_full_metadata, | ||
| } | ||
| if h3_resolution is not None: | ||
| body["h3_resolution"] = h3_resolution | ||
| # Make API request with auth headers from execution context | ||
| url = f"{base_url}/datasets/items-with-metadata" | ||
| with session_with_retries() as session: | ||
| response = session.post( | ||
| url, | ||
| json=body, | ||
| headers=context._get_auth_header(missing_ok=True), | ||
| return _get_row_groups_for_dataset_with_metadata( | ||
| dataset_path=dataset_path, | ||
| geographical_regions=geographical_regions, | ||
| h3_resolution=h3_resolution, | ||
| base_url=base_url, | ||
| include_signed_urls=include_signed_urls, | ||
| include_full_metadata=include_full_metadata, | ||
| ) | ||
| response.raise_for_status() | ||
| # Parse response | ||
| data = response.json() | ||
| items = data.get("items", []) | ||
| # Flatten into list of {path, row_group_index, start_offset, end_offset, ...} dicts | ||
| result = [] | ||
| for item in items: | ||
| relative_path = item.get("relative_path", "") | ||
| row_groups = item.get("row_groups", []) | ||
| signed_url = item.get("signed_url") | ||
| # Construct full path - handle empty relative_path to avoid trailing slash | ||
| if relative_path: | ||
| full_path = dataset_path.rstrip("/") + "/" + relative_path.lstrip("/") | ||
| else: | ||
| full_path = dataset_path.rstrip("/") | ||
| # Add each row group with its metadata | ||
| for rg in row_groups: | ||
| row_group_dict = { | ||
| "path": full_path, | ||
| "row_group_index": rg.get("row_group_index"), | ||
| "start_offset": rg.get("start_offset"), | ||
| "end_offset": rg.get("end_offset"), | ||
| } | ||
| # Include signed URL if available | ||
| if signed_url: | ||
| row_group_dict["signed_url"] = signed_url | ||
| # Include full metadata if present | ||
| if "metadata_json" in rg: | ||
| row_group_dict["metadata_json"] = rg.get("metadata_json") | ||
| if "row_group_bytes" in rg: | ||
| row_group_dict["row_group_bytes"] = rg.get("row_group_bytes") | ||
| result.append(row_group_dict) | ||
| return result | ||
| except ImportError as e: | ||
| raise RuntimeError( | ||
| "The get_row_groups_for_dataset_with_metadata function requires the job2 module. " | ||
| "This function is only available in the Fused execution environment." | ||
| ) from e |
@@ -0,9 +1,16 @@ | ||
| from .index import persist_hex_table_metadata | ||
| from .ingest import run_ingest_raster_to_h3, run_partition_to_h3 | ||
| from .read import read_hex_table, read_hex_table_slow | ||
| from .read import ( | ||
| read_hex_table, | ||
| read_hex_table_slow, | ||
| read_hex_table_with_persisted_metadata, | ||
| ) | ||
| __all__ = [ | ||
| "persist_hex_table_metadata", | ||
| "read_hex_table", | ||
| "read_hex_table_slow", | ||
| "read_hex_table_with_persisted_metadata", | ||
| "run_ingest_raster_to_h3", | ||
| "run_partition_to_h3", | ||
| ] |
+252
-273
@@ -0,3 +1,6 @@ | ||
| import datetime | ||
| import uuid | ||
| import numpy as np | ||
| import fused | ||
@@ -19,2 +22,3 @@ from fused.core._fs_utils import is_non_empty_dir | ||
| chunk_name: str | None = None, | ||
| src_path_id: int = 0, | ||
| ): | ||
@@ -35,7 +39,9 @@ # define UDF that imports the helper function inside the UDF | ||
| chunk_name=chunk_name, | ||
| src_path_id=src_path_id, | ||
| ) | ||
| def run_extract( | ||
| submit_params: list[dict], | ||
| def run_extract_step( | ||
| src_files: list[str], | ||
| target_chunk_size: int | None, | ||
| tmp_path: str, | ||
@@ -46,5 +52,58 @@ res: int = 11, | ||
| file_res: int = 2, | ||
| **kwargs, | ||
| debug_mode: bool = False, | ||
| **submit_kwargs, | ||
| ): | ||
| params = { | ||
| import rasterio | ||
| if target_chunk_size is None: | ||
| if len(src_files) > 20: | ||
| print( | ||
| "-- processing each file as a single chunk by default (specify " | ||
| "'target_chunk_size' to override this)" | ||
| ) | ||
| target_chunk_size = 0 | ||
| else: | ||
| target_chunk_size = 10_000_000 | ||
| if target_chunk_size > 0: | ||
| # determine number of chunks based on target chunk size | ||
| x_chunks = [] | ||
| y_chunks = [] | ||
| for src_file in src_files: | ||
| with rasterio.open(src_file) as src: | ||
| meta = src.meta | ||
| x_chunks.append(max(round(meta["width"] / np.sqrt(target_chunk_size)), 1)) | ||
| y_chunks.append(max(round(meta["height"] / np.sqrt(target_chunk_size)), 1)) | ||
| else: | ||
| # allow to process each file as a single chunk | ||
| x_chunks = [1] * len(src_files) | ||
| y_chunks = [1] * len(src_files) | ||
| n_chunks_per_file = [x * y for x, y in zip(x_chunks, y_chunks)] | ||
| if debug_mode: | ||
| # avoid creating huge submit_params list in case of tiny target_chunk_size | ||
| src_files = src_files[:1] | ||
| x_chunks = x_chunks[:1] | ||
| y_chunks = y_chunks[:1] | ||
| n_chunks_per_file = [min(n_chunks_per_file[0], 2)] | ||
| # repeat variables per input file chunks | ||
| submit_arg_list = [ | ||
| { | ||
| "src_path": p, | ||
| "x_chunks": x, | ||
| "y_chunks": y, | ||
| "chunk_id": chunk_id, | ||
| "chunk_name": f"{p.split('/')[-1].rsplit('.', maxsplit=1)[0]}_{chunk_id}", | ||
| "src_path_id": i, | ||
| } | ||
| for (i, p), x, y, n in zip( | ||
| enumerate(src_files), x_chunks, y_chunks, n_chunks_per_file | ||
| ) | ||
| for chunk_id in range(n) | ||
| ] | ||
| run_params = { | ||
| "tmp_path": tmp_path, | ||
@@ -57,4 +116,10 @@ "res": res, | ||
| pool = fused.submit(udf_extract, submit_params, **params, collect=False, **kwargs) | ||
| return pool | ||
| # Run the actual extract step | ||
| print(f"-- processing {len(submit_arg_list)} chunks") | ||
| result_extract = _submit_with_fallback( | ||
| "Extract", udf_extract, submit_arg_list, run_params, submit_kwargs | ||
| ) | ||
| _print_batch_jobs(result_extract) | ||
| result_extract.wait() | ||
| return result_extract | ||
@@ -75,2 +140,3 @@ | ||
| include_source_url: bool = True, | ||
| src_path_values: list[str] = None, | ||
| ): | ||
@@ -92,6 +158,7 @@ # define UDF that imports the helper function inside the UDF | ||
| include_source_url=include_source_url, | ||
| src_path_values=src_path_values, | ||
| ) | ||
| def run_partition( | ||
| def run_partition_step( | ||
| tmp_path: str, | ||
@@ -108,4 +175,5 @@ file_ids: list[str], | ||
| include_source_url: bool = True, | ||
| src_path_values: list[str] = None, | ||
| debug_mode: bool = False, | ||
| **kwargs, | ||
| **submit_kwargs, | ||
| ): | ||
@@ -127,3 +195,4 @@ """ | ||
| """ | ||
| params = { | ||
| submit_arg_list = [{"file_id": i} for i in file_ids] | ||
| run_params = { | ||
| "tmp_path": tmp_path, | ||
@@ -140,16 +209,14 @@ "output_path": output_path, | ||
| "include_source_url": include_source_url, | ||
| "src_path_values": src_path_values if include_source_url else None, | ||
| } | ||
| submit_params = [{"file_id": i} for i in file_ids] | ||
| if debug_mode: | ||
| submit_params = submit_params[:2] | ||
| submit_arg_list = submit_arg_list[:2] | ||
| pool = fused.submit( | ||
| udf_partition, | ||
| submit_params, | ||
| **params, | ||
| collect=False, | ||
| **kwargs, | ||
| # Run the actual partition step | ||
| result_partition = _submit_with_fallback( | ||
| "Partition", udf_partition, submit_arg_list, run_params, submit_kwargs | ||
| ) | ||
| return pool | ||
| _print_batch_jobs(result_partition) | ||
| result_partition.wait() | ||
| return result_partition | ||
@@ -172,3 +239,3 @@ | ||
| def run_overview( | ||
| def run_overview_step( | ||
| tmp_path: str, | ||
@@ -179,5 +246,9 @@ output_path: str, | ||
| max_rows_per_chunk: int = 0, | ||
| **kwargs, | ||
| **submit_kwargs, | ||
| ): | ||
| params = { | ||
| submit_arg_list = [ | ||
| {"res": res, "chunk_res": chunk_res} | ||
| for res, chunk_res in zip(overview_res, overview_chunk_res) | ||
| ] | ||
| run_params = { | ||
| "tmp_path": tmp_path, | ||
@@ -187,19 +258,13 @@ "output_path": output_path, | ||
| } | ||
| submit_params = [ | ||
| {"res": res, "chunk_res": chunk_res} | ||
| for res, chunk_res in zip(overview_res, overview_chunk_res) | ||
| ] | ||
| pool = fused.submit( | ||
| udf_overview, | ||
| submit_params, | ||
| **params, | ||
| collect=False, | ||
| **kwargs, | ||
| result_overview = _submit_with_fallback( | ||
| "Overview", udf_overview, submit_arg_list, run_params, submit_kwargs | ||
| ) | ||
| return pool | ||
| _print_batch_jobs(result_overview) | ||
| result_overview.wait() | ||
| return result_overview | ||
| def _submit_with_fallback( | ||
| step: str, run_function, run_kwargs: dict, submit_kwargs: dict, update_callback=None | ||
| step: str, udf, arg_list: list, run_kwargs: dict, submit_kwargs: dict | ||
| ): | ||
@@ -211,5 +276,4 @@ if not ( | ||
| submit_kwargs["instance_type"] = "realtime" | ||
| result = run_function( | ||
| **run_kwargs, | ||
| **submit_kwargs, | ||
| result = fused.submit( | ||
| udf, arg_list, **run_kwargs, collect=False, **submit_kwargs | ||
| ) | ||
@@ -231,13 +295,26 @@ result.wait() | ||
| ) | ||
| update_callback(run_kwargs, errors) | ||
| arg_list = [arg_list[i] for i in errors.keys()] | ||
| submit_kwargs["instance_type"] = "large" | ||
| result = run_function( | ||
| **run_kwargs, | ||
| **submit_kwargs, | ||
| if submit_kwargs["n_processes_per_worker"] == 1: | ||
| print( | ||
| "-- n_processes_per_worker is set 1 by default. You can " | ||
| "potentially increase this for better performance (if the " | ||
| "instance has enough memory)" | ||
| ) | ||
| result = fused.submit( | ||
| udf, arg_list, **run_kwargs, collect=False, **submit_kwargs | ||
| ) | ||
| else: | ||
| if ( | ||
| submit_kwargs.get("instance_type", "realtime") != "realtime" | ||
| and submit_kwargs["n_processes_per_worker"] == 1 | ||
| ): | ||
| print( | ||
| "-- n_processes_per_worker is set 1 by default. You can " | ||
| "potentially increase this for better performance (if the " | ||
| "instance has enough memory)" | ||
| ) | ||
| # otherwise run once with user provided configuration | ||
| result = run_function( | ||
| **run_kwargs, | ||
| **submit_kwargs, | ||
| result = result = fused.submit( | ||
| udf, arg_list, **run_kwargs, collect=False, **submit_kwargs | ||
| ) | ||
@@ -435,2 +512,3 @@ return result | ||
| overwrite: bool = False, | ||
| steps: list[str] | None = None, | ||
| extract_kwargs={}, | ||
@@ -512,2 +590,5 @@ partition_kwargs={}, | ||
| the `output_path` is not empty. | ||
| steps (list of str): The processing steps to run. Can include | ||
| "extract", "partition", "metadata", and "overview". By default, all | ||
| steps are run. | ||
| extract_kwargs (dict): Additional keyword arguments to pass to | ||
@@ -525,3 +606,3 @@ `fused.submit` for the extract step. | ||
| Typical keywords include `engine`, `instance_type`, `max_workers`, | ||
| and `n_processes_per_worker`. | ||
| `n_processes_per_worker` and `max_retry`. | ||
@@ -548,8 +629,9 @@ The extract, partition and overview steps are run in parallel using | ||
| ) | ||
| """ | ||
| import datetime | ||
| import numpy as np | ||
| import rasterio | ||
| In contrast to `fused.submit` itself, the ingestion sets `n_processes_per_worker=1` | ||
| by default to avoid out-of-memory issues on batch instances. You can | ||
| increase this if you know the instance has enough memory to process multiple | ||
| chunks in parallel. | ||
| """ | ||
| try: | ||
@@ -568,2 +650,13 @@ from job2.partition.raster_to_h3 import udf_sample | ||
| # Validate steps to run | ||
| if steps is None: | ||
| steps = ["extract", "partition", "metadata", "overview"] | ||
| else: | ||
| for step in steps: | ||
| if step not in ["extract", "partition", "metadata", "overview"]: | ||
| raise ValueError( | ||
| f"Invalid step '{step}' specified in `steps`. Supported steps " | ||
| "are 'extract', 'partition', 'metadata', and 'overview'." | ||
| ) | ||
| # Validate and preprocess input src path | ||
@@ -597,6 +690,4 @@ if isinstance(src_path, str): | ||
| # Construct path for intermediate results | ||
| skip_extract = False | ||
| if tmp_path is not None: | ||
| print(f"-- Using user-specified temporary path: {tmp_path}") | ||
| skip_extract = True | ||
| else: | ||
@@ -647,63 +738,23 @@ tmp_path = _create_tmp_path(src_files[0], output_path) | ||
| # set some default kwargs for submit | ||
| # - max_retry=0: typical reason the realtime run fails here will be because | ||
| # of time outs or out-of-memory errors, which are unlikely to be resolved | ||
| # by retrying | ||
| # - n_processes_per_worker=1: when running on a (typically large) batch | ||
| # instance, the default to use as many processes as cores will almost | ||
| # always result in out-of-memory issues -> use a conservative default of 1 | ||
| # instead. User can still increase this if they know the instance has enough | ||
| # memory | ||
| kwargs = {"max_retry": 0, "n_processes_per_worker": 1} | kwargs | ||
| ########################################################################### | ||
| # Step one: extracting pixel values and converting to hex divided in chunks | ||
| if not skip_extract: | ||
| if "extract" in steps: | ||
| print("\nRunning extract step") | ||
| start_extract_time = datetime.datetime.now() | ||
| if target_chunk_size is None: | ||
| if len(src_files) > 20: | ||
| print( | ||
| "-- processing each file as a single chunk by default (specify " | ||
| "'target_chunk_size' to override this)" | ||
| ) | ||
| target_chunk_size = 0 | ||
| else: | ||
| target_chunk_size = 10_000_000 | ||
| if target_chunk_size > 0: | ||
| # determine number of chunks based on target chunk size | ||
| x_chunks = [] | ||
| y_chunks = [] | ||
| for src_file in src_files: | ||
| with rasterio.open(src_file) as src: | ||
| meta = src.meta | ||
| x_chunks.append( | ||
| max(round(meta["width"] / np.sqrt(target_chunk_size)), 1) | ||
| ) | ||
| y_chunks.append( | ||
| max(round(meta["height"] / np.sqrt(target_chunk_size)), 1) | ||
| ) | ||
| else: | ||
| # allow to process each file as a single chunk | ||
| x_chunks = [1] * len(src_files) | ||
| y_chunks = [1] * len(src_files) | ||
| n_chunks_per_file = [x * y for x, y in zip(x_chunks, y_chunks)] | ||
| if debug_mode: | ||
| # avoid creating huge submit_params list in case of tiny target_chunk_size | ||
| src_path = src_path[:1] | ||
| x_chunks = x_chunks[:1] | ||
| y_chunks = y_chunks[:1] | ||
| n_chunks_per_file = [min(n_chunks_per_file[0], 2)] | ||
| # repeat variables per input file chunks | ||
| submit_params = [ | ||
| { | ||
| "src_path": p, | ||
| "x_chunks": x, | ||
| "y_chunks": y, | ||
| "chunk_id": chunk_id, | ||
| "chunk_name": f"{p.split('/')[-1].rsplit('.', maxsplit=1)[0]}_{chunk_id}", | ||
| } | ||
| for p, x, y, n in zip(src_files, x_chunks, y_chunks, n_chunks_per_file) | ||
| for chunk_id in range(n) | ||
| ] | ||
| extract_run_kwargs = dict( | ||
| submit_params=submit_params, | ||
| tmp_path=tmp_path, | ||
| result_extract = run_extract_step( | ||
| src_files, | ||
| target_chunk_size, | ||
| tmp_path, | ||
| res=res, | ||
@@ -713,18 +764,5 @@ k_ring=k_ring, | ||
| file_res=file_res, | ||
| debug_mode=debug_mode, | ||
| **(kwargs | extract_kwargs), | ||
| ) | ||
| extract_submit_kwargs = {"max_retry": 0} | kwargs | extract_kwargs | ||
| # Run the actual extract step | ||
| print(f"-- processing {len(submit_params)} chunks") | ||
| result_extract = _submit_with_fallback( | ||
| "Extract", | ||
| run_extract, | ||
| extract_run_kwargs, | ||
| extract_submit_kwargs, | ||
| update_callback=lambda run_kwargs, errors: run_kwargs.update( | ||
| {"submit_params": [submit_params[i] for i in errors.keys()]} | ||
| ), | ||
| ) | ||
| _print_batch_jobs(result_extract) | ||
| result_extract.wait() | ||
| end_extract_time = datetime.datetime.now() | ||
@@ -743,100 +781,83 @@ if not result_extract.all_succeeded(): | ||
| # metadata and overviews | ||
| if "partition" in steps: | ||
| print("\nRunning partition step") | ||
| print("\nRunning partition step") | ||
| # list available file_ids from the previous step | ||
| file_ids = _list_tmp_file_ids(tmp_path) | ||
| print(f"-- processing {len(file_ids)} file_ids") | ||
| # list available file_ids from the previous step | ||
| file_ids = _list_tmp_file_ids(tmp_path) | ||
| print(f"-- processing {len(file_ids)} file_ids") | ||
| result_partition = run_partition_step( | ||
| tmp_path, | ||
| file_ids, | ||
| output_path, | ||
| metrics=metrics, | ||
| chunk_res=chunk_res, | ||
| overview_res=overview_res, | ||
| max_rows_per_chunk=max_rows_per_chunk, | ||
| include_source_url=include_source_url, | ||
| src_path_values=src_files, | ||
| **(kwargs | partition_kwargs), | ||
| ) | ||
| end_partition_time = datetime.datetime.now() | ||
| if not result_partition.all_succeeded(): | ||
| print("\nPartition step failed!") | ||
| _cleanup_tmp_files(tmp_path, remove_tmp_files) | ||
| return result_extract, result_partition | ||
| print(f"-- Done partition! (took {end_partition_time - end_extract_time})") | ||
| else: | ||
| print("\nSkipping partition step") | ||
| end_partition_time = datetime.datetime.now() | ||
| partition_run_kwargs = dict( | ||
| tmp_path=tmp_path, | ||
| file_ids=file_ids, | ||
| output_path=output_path, | ||
| metrics=metrics, | ||
| chunk_res=chunk_res, | ||
| overview_res=overview_res, | ||
| max_rows_per_chunk=max_rows_per_chunk, | ||
| include_source_url=include_source_url, | ||
| ) | ||
| partition_submit_kwargs = {"max_retry": 0} | kwargs | partition_kwargs | ||
| # Run the actual partition step | ||
| result_partition = _submit_with_fallback( | ||
| "Partition", | ||
| run_partition, | ||
| partition_run_kwargs, | ||
| partition_submit_kwargs, | ||
| update_callback=lambda run_kwargs, errors: run_kwargs.update( | ||
| {"file_ids": [file_ids[i] for i in errors.keys()]} | ||
| ), | ||
| ) | ||
| _print_batch_jobs(result_partition) | ||
| result_partition.wait() | ||
| end_partition_time = datetime.datetime.now() | ||
| if not result_partition.all_succeeded(): | ||
| print("\nPartition step failed!") | ||
| _cleanup_tmp_files(tmp_path, remove_tmp_files) | ||
| return result_extract, result_partition | ||
| print(f"-- Done partition! (took {end_partition_time - end_extract_time})") | ||
| ########################################################################### | ||
| # Step 3: combining the metadata and overview tmp files | ||
| print("\nRunning sample step") | ||
| if "metadata" in steps: | ||
| print("\nRunning metadata (_sample) step") | ||
| @fused.udf(cache_max_age=0) | ||
| def udf_sample(tmp_path: str, output_path: str): | ||
| from job2.partition.raster_to_h3 import udf_sample as run_udf_sample | ||
| @fused.udf(cache_max_age=0) | ||
| def udf_sample(tmp_path: str, output_path: str): | ||
| from job2.partition.raster_to_h3 import udf_sample as run_udf_sample | ||
| return run_udf_sample(tmp_path, output_path) | ||
| return run_udf_sample(tmp_path, output_path) | ||
| sample_file = fused.run( | ||
| udf_sample, | ||
| tmp_path=tmp_path, | ||
| output_path=output_path, | ||
| verbose=False, | ||
| engine=kwargs.get("engine", None), | ||
| ) | ||
| end_sample_time = datetime.datetime.now() | ||
| print(f"-- Written: {sample_file}") | ||
| print(f"-- Done sample! (took {end_sample_time - end_partition_time})") | ||
| sample_file = fused.run( | ||
| udf_sample, | ||
| tmp_path=tmp_path, | ||
| output_path=output_path, | ||
| verbose=False, | ||
| engine=kwargs.get("engine", None), | ||
| ) | ||
| end_sample_time = datetime.datetime.now() | ||
| print(f"-- Written: {sample_file}") | ||
| print(f"-- Done metadata! (took {end_sample_time - end_partition_time})") | ||
| else: | ||
| print("\nSkipping metadata (_sample) step") | ||
| end_sample_time = datetime.datetime.now() | ||
| print("\nRunning overview step") | ||
| overview_run_kwargs = dict( | ||
| tmp_path=tmp_path, | ||
| output_path=output_path, | ||
| overview_res=overview_res, | ||
| overview_chunk_res=overview_chunk_res, | ||
| max_rows_per_chunk=max_rows_per_chunk, | ||
| ) | ||
| overview_submit_kwargs = {"max_retry": 0} | kwargs | overview_kwargs | ||
| if "overview" in steps: | ||
| print("\nRunning overview step") | ||
| result_overview = run_overview_step( | ||
| tmp_path, | ||
| output_path, | ||
| overview_res=overview_res, | ||
| overview_chunk_res=overview_chunk_res, | ||
| max_rows_per_chunk=max_rows_per_chunk, | ||
| **(kwargs | overview_kwargs), | ||
| ) | ||
| end_overview_time = datetime.datetime.now() | ||
| if not result_overview.all_succeeded(): | ||
| print("\nOverview step failed!") | ||
| _cleanup_tmp_files(tmp_path, remove_tmp_files) | ||
| return result_extract, result_partition | ||
| result_overview = _submit_with_fallback( | ||
| "Overview", | ||
| run_overview, | ||
| overview_run_kwargs, | ||
| overview_submit_kwargs, | ||
| update_callback=lambda run_kwargs, errors: run_kwargs.update( | ||
| { | ||
| "overview_res": [overview_res[i] for i in errors.keys()], | ||
| "overview_chunk_res": [overview_chunk_res[i] for i in errors.keys()], | ||
| } | ||
| ), | ||
| ) | ||
| _print_batch_jobs(result_overview) | ||
| result_overview.wait() | ||
| for i, overview_file in enumerate(result_overview.results()): | ||
| print( | ||
| f"-- Written: {overview_file} (res={overview_res[i]}, chunk_res={overview_chunk_res[i]})" | ||
| ) | ||
| end_overview_time = datetime.datetime.now() | ||
| if not result_overview.all_succeeded(): | ||
| print("\nOverview step failed!") | ||
| _cleanup_tmp_files(tmp_path, remove_tmp_files) | ||
| return result_extract, result_partition | ||
| print(f"-- Done overview! (took {end_overview_time - end_sample_time})") | ||
| else: | ||
| print("\nSkipping overview step") | ||
| end_overview_time = datetime.datetime.now() | ||
| for i, overview_file in enumerate(result_overview.results()): | ||
| print( | ||
| f"-- Written: {overview_file} (res={overview_res[i]}, chunk_res={overview_chunk_res[i]})" | ||
| ) | ||
| print(f"-- Done overview! (took {end_overview_time - end_sample_time})") | ||
| # remove tmp files | ||
@@ -863,18 +884,17 @@ _cleanup_tmp_files(tmp_path, remove_tmp_files) | ||
| def run_divide(src_files: list[str], tmp_path: str, file_res: int, **kwargs): | ||
| params = { | ||
| def _run_divide_step(src_files: list[str], tmp_path: str, file_res: int, **kwargs): | ||
| run_params = { | ||
| "tmp_path": tmp_path, | ||
| "file_res": file_res, | ||
| } | ||
| submit_params = [ | ||
| submit_arg_list = [ | ||
| {"src_path": p, "chunk_name": str(i)} for i, p in enumerate(src_files) | ||
| ] | ||
| pool = fused.submit( | ||
| udf_divide, | ||
| submit_params, | ||
| **params, | ||
| collect=False, | ||
| **kwargs, | ||
| result_divide = _submit_with_fallback( | ||
| "Extract", udf_divide, submit_arg_list, run_params, kwargs | ||
| ) | ||
| return pool | ||
| _print_batch_jobs(result_divide) | ||
| result_divide.wait() | ||
| return result_divide | ||
@@ -1068,23 +1088,5 @@ | ||
| divide_run_kwargs = dict( | ||
| src_files=files, | ||
| tmp_path=tmp_path, | ||
| file_res=file_res, | ||
| ) | ||
| # Run the actual extract (divide) step | ||
| divide_submit_kwargs = {"max_retry": 0} | kwargs | extract_kwargs | ||
| # Run the actual extract (divide) step | ||
| result_extract = _submit_with_fallback( | ||
| "Extract", | ||
| run_divide, | ||
| divide_run_kwargs, | ||
| divide_submit_kwargs, | ||
| update_callback=lambda run_kwargs, errors: run_kwargs.update( | ||
| { | ||
| "src_path": [files[i] for i in errors.keys()], | ||
| "chunk_name": [str(i) for i in errors.keys()], | ||
| } | ||
| ), | ||
| ) | ||
| result_extract.wait() | ||
| result_extract = _run_divide_step(files, tmp_path, file_res, **divide_submit_kwargs) | ||
| end_extract_time = datetime.datetime.now() | ||
@@ -1110,6 +1112,8 @@ if not result_extract.all_succeeded(): | ||
| partition_run_kwargs = dict( | ||
| tmp_path=tmp_path, | ||
| file_ids=file_ids, | ||
| output_path=output_path, | ||
| # Run the actual partition step | ||
| partition_submit_kwargs = {"max_retry": 0} | kwargs | partition_kwargs | ||
| result_partition = run_partition_step( | ||
| tmp_path, | ||
| file_ids, | ||
| output_path, | ||
| metrics=metrics, | ||
@@ -1122,16 +1126,5 @@ groupby_cols=groupby_cols, | ||
| max_rows_per_chunk=max_rows_per_chunk, | ||
| src_path_values=files, | ||
| **partition_submit_kwargs, | ||
| ) | ||
| partition_submit_kwargs = {"max_retry": 0} | kwargs | partition_kwargs | ||
| # Run the actual partition step | ||
| result_partition = _submit_with_fallback( | ||
| "Partition", | ||
| run_partition, | ||
| partition_run_kwargs, | ||
| partition_submit_kwargs, | ||
| update_callback=lambda run_kwargs, errors: run_kwargs.update( | ||
| {"file_ids": [file_ids[i] for i in errors.keys()]} | ||
| ), | ||
| ) | ||
| result_partition.wait() | ||
| end_partition_time = datetime.datetime.now() | ||
@@ -1167,25 +1160,11 @@ if not result_partition.all_succeeded(): | ||
| print("\nRunning overview step") | ||
| overview_run_kwargs = dict( | ||
| tmp_path=tmp_path, | ||
| output_path=output_path, | ||
| overview_submit_kwargs = {"max_retry": 0} | kwargs | overview_kwargs | ||
| result_overview = run_overview_step( | ||
| tmp_path, | ||
| output_path, | ||
| overview_res=overview_res, | ||
| overview_chunk_res=overview_chunk_res, | ||
| max_rows_per_chunk=max_rows_per_chunk, | ||
| **overview_submit_kwargs, | ||
| ) | ||
| overview_submit_kwargs = {"max_retry": 0} | kwargs | overview_kwargs | ||
| result_overview = _submit_with_fallback( | ||
| "Overview", | ||
| run_overview, | ||
| overview_run_kwargs, | ||
| overview_submit_kwargs, | ||
| update_callback=lambda run_kwargs, errors: run_kwargs.update( | ||
| { | ||
| "overview_res": [overview_res[i] for i in errors.keys()], | ||
| "overview_chunk_res": [overview_chunk_res[i] for i in errors.keys()], | ||
| } | ||
| ), | ||
| ) | ||
| result_overview.wait() | ||
| end_overview_time = datetime.datetime.now() | ||
@@ -1192,0 +1171,0 @@ if not result_overview.all_succeeded(): |
+719
-232
@@ -9,10 +9,2 @@ """Read H3-indexed tables by hex ranges.""" | ||
| from fused._fasttortoise import async_read_combined_row_groups | ||
| from fused._h3._grouping import ( | ||
| DownloadGroup, | ||
| RowGroupMetadata, | ||
| create_size_based_groups, | ||
| group_row_groups_by_file, | ||
| ) | ||
| if TYPE_CHECKING: | ||
@@ -30,2 +22,13 @@ import pyarrow as pa | ||
| def _require_job2(func_name: str) -> None: | ||
| """Raise RuntimeError if job2 is not available.""" | ||
| try: | ||
| import job2 # noqa: F401 | ||
| except ImportError as e: | ||
| raise RuntimeError( | ||
| f"{func_name} requires the job2 module. " | ||
| "This function is only available in the Fused execution environment." | ||
| ) from e | ||
| def read_hex_table_slow( | ||
@@ -126,2 +129,5 @@ dataset_path: str, | ||
| # Run the pipelined fetch and download | ||
| _require_job2("read_hex_table_slow") | ||
| from job2.fasttortoise._h3_read import _fetch_with_combining | ||
| tables, timing_info = _run_async( | ||
@@ -189,4 +195,2 @@ _fetch_with_combining( | ||
| try: | ||
| import asyncio | ||
| from job2.fasttortoise._reconstruction import ( | ||
@@ -203,3 +207,3 @@ _async_s3_client_loop, | ||
| ): | ||
| await _shared_async_s3_client.__aexit__(None, None, None) | ||
| await _shared_async_s3_client.close() | ||
| except Exception: | ||
@@ -286,13 +290,6 @@ pass | ||
| """ | ||
| try: | ||
| from job2.fasttortoise import ( | ||
| read_hex_table as _read_hex_table, | ||
| ) | ||
| except ImportError: | ||
| raise ImportError( | ||
| "read_hex_table requires job2. This function is only available " | ||
| "in the Fused execution environment." | ||
| ) from None | ||
| _require_job2("read_hex_table") | ||
| from job2.fasttortoise import read_hex_table as _job2_read_hex_table | ||
| return _read_hex_table( | ||
| return _job2_read_hex_table( | ||
| dataset_path=dataset_path, | ||
@@ -309,272 +306,762 @@ hex_ranges_list=hex_ranges_list, | ||
| async def _fetch_with_combining( | ||
| row_groups: List[Dict[str, Any]], | ||
| base_url: str, | ||
| columns: Optional[List[str]], | ||
| batch_size: int, | ||
| def read_hex_table_with_persisted_metadata( | ||
| dataset_path: str, | ||
| hex_ranges_list: List[List[int]], | ||
| columns: Optional[List[str]] = None, | ||
| metadata_path: Optional[str] = None, | ||
| verbose: bool = False, | ||
| metadata_batch_size: int = 50, | ||
| ) -> tuple: | ||
| return_timing_info: bool = False, | ||
| batch_size: Optional[int] = None, | ||
| max_concurrent_downloads: Optional[int] = None, | ||
| use_local_cache: bool = False, | ||
| local_cache_dir: Optional[str] = None, | ||
| ) -> "pa.Table" | tuple["pa.Table", Dict[str, Any]]: | ||
| """ | ||
| Fetch row groups with pipelined metadata fetch and combined downloads. | ||
| Read data from an H3-indexed dataset using persisted metadata parquet. | ||
| This implements a producer-consumer pattern: | ||
| 1. Group row groups by file | ||
| 2. For each file, fetch metadata for all row groups in parallel (batched) | ||
| 3. As metadata arrives, form size-based download groups | ||
| 4. Download groups are processed as they become ready | ||
| This function reads from per-file metadata parquet files instead of | ||
| querying a server. Each source parquet file has a corresponding | ||
| .metadata.parquet file stored at: | ||
| {source_dir}/.fused/{source_filename}.metadata.parquet | ||
| Or at the specified metadata_path if provided: | ||
| {metadata_path}/{full_source_path}.metadata.parquet | ||
| Supports subdirectory queries - if dataset_path points to a subdirectory, | ||
| the function will look for metadata files for files in that subdirectory. | ||
| Args: | ||
| row_groups: List of dicts with 'path' and 'row_group_index' keys | ||
| base_url: Base URL for API | ||
| columns: Optional list of column names to read | ||
| batch_size: Target size in bytes for combining row groups | ||
| verbose: If True, print progress information | ||
| metadata_batch_size: Maximum number of row group metadata requests to batch together | ||
| dataset_path: Path to the H3-indexed dataset (e.g., "s3://bucket/dataset/") | ||
| Can also be a subdirectory path for filtering (e.g., "s3://bucket/dataset/year=2024/") | ||
| hex_ranges_list: List of [min_hex, max_hex] pairs as integers. | ||
| Example: [[622236719905341439, 622246719905341439]] | ||
| columns: Optional list of column names to read. If None, reads all columns. | ||
| metadata_path: Directory path where metadata files are stored. | ||
| If None, looks for metadata at {source_dir}/.fused/ for each source file. | ||
| If provided, reads metadata files from this location using full source paths. | ||
| This allows reading metadata from a different location when | ||
| you don't have access to the original dataset directory. | ||
| verbose: If True, print progress information. Default is False. | ||
| return_timing_info: If True, return a tuple of (table, timing_info) instead of just the table. | ||
| Default is False for backward compatibility. | ||
| batch_size: Target size in bytes for combining row groups. If None, uses | ||
| `fused.options.row_group_batch_size` (default: 32KB). | ||
| max_concurrent_downloads: Maximum number of simultaneous download operations. If None, | ||
| uses a default based on the number of files. Default is None. | ||
| use_local_cache: If True, download metadata files to local cache first and read from there. | ||
| This removes S3 download overhead for benchmarking. Default is False. | ||
| local_cache_dir: Directory path for local cache. If None and use_local_cache is True, | ||
| uses a temporary directory. Metadata files are cached by their S3 path. | ||
| Returns: | ||
| Tuple of (List of PyArrow Tables, timing_info dict) | ||
| PyArrow Table containing the concatenated data from all matching row groups. | ||
| If return_timing_info is True, returns a tuple of (table, timing_info dict). | ||
| Raises: | ||
| FileNotFoundError: If the metadata parquet file is not found. | ||
| ValueError: If any row group is missing required metadata. | ||
| Example: | ||
| import fused | ||
| # First, persist metadata (one-time operation) | ||
| fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/") | ||
| # Then read using persisted metadata (no server required) | ||
| table = fused.h3.read_hex_table_with_persisted_metadata( | ||
| dataset_path="s3://my-bucket/my-dataset/", | ||
| hex_ranges_list=[[622236719905341439, 622246719905341439]] | ||
| ) | ||
| df = table.to_pandas() | ||
| # Read from subdirectory (filters by path prefix) | ||
| table = fused.h3.read_hex_table_with_persisted_metadata( | ||
| dataset_path="s3://my-bucket/my-dataset/year=2024/", | ||
| hex_ranges_list=[[622236719905341439, 622246719905341439]] | ||
| ) | ||
| # Read with metadata in alternate location | ||
| table = fused.h3.read_hex_table_with_persisted_metadata( | ||
| dataset_path="s3://my-bucket/my-dataset/", | ||
| metadata_path="s3://my-bucket/metadata/", | ||
| hex_ranges_list=[[622236719905341439, 622246719905341439]] | ||
| ) | ||
| # Benchmark with local cache (removes S3 download overhead) | ||
| table, timing = fused.h3.read_hex_table_with_persisted_metadata( | ||
| dataset_path="s3://my-bucket/my-dataset/", | ||
| hex_ranges_list=[[622236719905341439, 622246719905341439]], | ||
| use_local_cache=True, | ||
| return_timing_info=True, | ||
| ) | ||
| """ | ||
| import aiohttp | ||
| import pyarrow as pa | ||
| import pyarrow.parquet as pq | ||
| from fused._h3._grouping import ( | ||
| find_consecutive_runs, | ||
| ) | ||
| from fused._options import options as OPTIONS | ||
| # Create our own aiohttp session for this operation | ||
| # This ensures proper cleanup and doesn't interfere with shared sessions | ||
| connector = aiohttp.TCPConnector( | ||
| limit=100, # Reasonable limit for concurrent requests | ||
| ttl_dns_cache=300, | ||
| use_dns_cache=True, | ||
| # Lazy import from job2 | ||
| _require_job2("read_hex_table_with_persisted_metadata") | ||
| from job2.fasttortoise._h3_index import ( | ||
| _get_metadata_file_path, | ||
| _list_parquet_files, | ||
| is_single_parquet_file, | ||
| ) | ||
| session = aiohttp.ClientSession( | ||
| connector=connector, | ||
| timeout=aiohttp.ClientTimeout(total=OPTIONS.request_timeout), | ||
| ) | ||
| try: | ||
| # Timing tracking - cumulative (sum of all operations) | ||
| metadata_total_ms = 0.0 | ||
| download_total_ms = 0.0 | ||
| num_groups = 0 | ||
| if not hex_ranges_list: | ||
| return pa.table({}) | ||
| # Wall-clock timing - first start to last end | ||
| metadata_first_start: Optional[float] = None | ||
| metadata_last_end: Optional[float] = None | ||
| download_first_start: Optional[float] = None | ||
| download_last_end: Optional[float] = None | ||
| t0 = time.perf_counter() | ||
| # Track longest individual operations | ||
| longest_metadata_fetch_ms = 0.0 | ||
| longest_download_ms = 0.0 | ||
| import hashlib | ||
| import os | ||
| import tempfile | ||
| from pathlib import Path | ||
| # Group row groups by file path | ||
| by_file = group_row_groups_by_file(row_groups) | ||
| import fsspec | ||
| # Initialize timing breakdown and data size tracking | ||
| timing_breakdown = { | ||
| "file_listing_ms": 0.0, | ||
| "cache_setup_ms": 0.0, | ||
| "s3_exists_check_ms": 0.0, | ||
| "s3_download_ms": 0.0, | ||
| "s3_open_ms": 0.0, | ||
| "parquet_file_init_ms": 0.0, | ||
| "row_group_filter_ms": 0.0, # Time to filter row groups using statistics | ||
| "read_table_ms": 0.0, | ||
| "schema_extract_ms": 0.0, | ||
| "json_decode_ms": 0.0, | ||
| "filter_ms": 0.0, | ||
| "convert_to_row_groups_ms": 0.0, | ||
| "validation_ms": 0.0, | ||
| "download_ms": 0.0, | ||
| "concat_ms": 0.0, | ||
| "row_group_stats": { | ||
| "total": 0, | ||
| "matched": 0, | ||
| }, # Track row group filtering stats | ||
| } | ||
| # Track data sizes | ||
| data_sizes = { | ||
| "metadata_bytes_downloaded": 0, # Total metadata files downloaded (full files) | ||
| "metadata_bytes_read": 0, # Bytes actually read from parquet | ||
| "metadata_bytes_used": 0, # Total metadata bytes actually used (after filtering) | ||
| "data_bytes": 0, # Total row group data downloaded | ||
| "metadata_files_count": 0, | ||
| } | ||
| # Set up local cache if requested | ||
| cache_dir = None | ||
| if use_local_cache: | ||
| t_cache_setup = time.perf_counter() | ||
| if local_cache_dir is None: | ||
| cache_dir = tempfile.mkdtemp(prefix="fused_metadata_cache_") | ||
| else: | ||
| cache_dir = local_cache_dir | ||
| Path(cache_dir).mkdir(parents=True, exist_ok=True) | ||
| timing_breakdown["cache_setup_ms"] = ( | ||
| time.perf_counter() - t_cache_setup | ||
| ) * 1000 | ||
| if verbose: | ||
| print(f" Row groups span {len(by_file)} files") | ||
| print(f"Using local cache directory: {cache_dir}") | ||
| # Queue for download groups ready to be fetched | ||
| download_queue: asyncio.Queue[Optional[DownloadGroup]] = asyncio.Queue() | ||
| t_listing = time.perf_counter() | ||
| # Determine source files to query | ||
| if dataset_split := is_single_parquet_file(dataset_path): | ||
| # Single file case | ||
| dataset_root = dataset_split[0] | ||
| source_files = [dataset_split[1]] | ||
| else: | ||
| # Directory case - list all parquet files | ||
| dataset_root = dataset_path.rstrip("/") | ||
| source_files = _list_parquet_files(dataset_path) | ||
| timing_breakdown["file_listing_ms"] = (time.perf_counter() - t_listing) * 1000 | ||
| # Results collection (protected by lock for thread safety) | ||
| results: List["pa.Table"] = [] | ||
| results_lock = asyncio.Lock() | ||
| if not source_files: | ||
| raise FileNotFoundError(f"No parquet files found in {dataset_path}") | ||
| # Track how many producers are still running | ||
| num_producers = len(by_file) | ||
| producers_done = asyncio.Event() | ||
| if verbose: | ||
| print(f"Found {len(source_files)} source files to query") | ||
| # Track timing | ||
| timing_lock = asyncio.Lock() | ||
| # Read metadata for each source file and collect row groups | ||
| all_row_groups = [] | ||
| metadata_read_time = 0.0 | ||
| async def metadata_producer(file_path: str, rg_indices: List[int]) -> None: | ||
| """Fetch metadata for all row groups in a file and create download groups.""" | ||
| nonlocal num_producers, metadata_total_ms, num_groups | ||
| nonlocal metadata_first_start, metadata_last_end, longest_metadata_fetch_ms | ||
| for rel_path in source_files: | ||
| full_source_path = f"{dataset_root}/{rel_path}" | ||
| try: | ||
| # Find consecutive runs first | ||
| runs = find_consecutive_runs(rg_indices) | ||
| # Determine metadata file path | ||
| metadata_file_path = _get_metadata_file_path( | ||
| full_source_path, dataset_root, metadata_path | ||
| ) | ||
| for run in runs: | ||
| # Batch metadata requests to reduce API overhead | ||
| for batch_start in range(0, len(run), metadata_batch_size): | ||
| batch_indices = run[ | ||
| batch_start : batch_start + metadata_batch_size | ||
| ] | ||
| try: | ||
| # Handle local caching | ||
| t_exists = time.perf_counter() | ||
| original_fs, original_fs_path = fsspec.core.url_to_fs(metadata_file_path) | ||
| t_meta_start = time.perf_counter() | ||
| # If using local cache, check if file is cached or download it | ||
| if use_local_cache and cache_dir: | ||
| # Create a hash-based filename for the cache | ||
| path_hash = hashlib.md5(metadata_file_path.encode()).hexdigest() | ||
| cached_file_path = os.path.join(cache_dir, f"{path_hash}.parquet") | ||
| # Use batched API call | ||
| from fused._fasttortoise import ( | ||
| async_fetch_row_group_metadata_batch, | ||
| if os.path.exists(cached_file_path): | ||
| if verbose: | ||
| print(f" Using cached metadata file: {cached_file_path}") | ||
| # Use local file instead | ||
| fs, fs_path = fsspec.core.url_to_fs(f"file://{cached_file_path}") | ||
| # Track cached file size (downloaded previously) | ||
| try: | ||
| cached_size = os.path.getsize(cached_file_path) | ||
| data_sizes["metadata_bytes_downloaded"] += cached_size | ||
| data_sizes["metadata_files_count"] += 1 | ||
| except Exception: | ||
| pass | ||
| else: | ||
| # Check if original file exists first | ||
| if not original_fs.exists(original_fs_path): | ||
| if verbose: | ||
| print( | ||
| f" Skipping {rel_path} - metadata file not found: {metadata_file_path}" | ||
| ) | ||
| continue | ||
| # Download to cache first | ||
| if verbose: | ||
| print( | ||
| f" Downloading metadata to cache: {metadata_file_path} -> {cached_file_path}" | ||
| ) | ||
| t_download = time.perf_counter() | ||
| metadata_size = 0 | ||
| with original_fs.open(original_fs_path, "rb") as src: | ||
| with open(cached_file_path, "wb") as dst: | ||
| while True: | ||
| chunk = src.read(8192) # 8KB chunks | ||
| if not chunk: | ||
| break | ||
| dst.write(chunk) | ||
| metadata_size += len(chunk) | ||
| timing_breakdown["s3_download_ms"] += ( | ||
| time.perf_counter() - t_download | ||
| ) * 1000 | ||
| data_sizes["metadata_bytes_downloaded"] += metadata_size | ||
| data_sizes["metadata_files_count"] += 1 | ||
| fs, fs_path = fsspec.core.url_to_fs(f"file://{cached_file_path}") | ||
| else: | ||
| # Not using cache - check if file exists | ||
| if not original_fs.exists(original_fs_path): | ||
| if verbose: | ||
| print( | ||
| f" Skipping {rel_path} - metadata file not found: {metadata_file_path}" | ||
| ) | ||
| continue | ||
| fs, fs_path = original_fs, original_fs_path | ||
| # Get metadata file size if available (we download the whole file) | ||
| try: | ||
| if hasattr(fs, "info"): | ||
| file_info = fs.info(fs_path) | ||
| if "size" in file_info: | ||
| data_sizes["metadata_bytes_downloaded"] += file_info["size"] | ||
| data_sizes["metadata_files_count"] += 1 | ||
| except Exception: | ||
| pass # Ignore if we can't get file size | ||
| batch_requests = [ | ||
| {"path": file_path, "row_group_index": idx} | ||
| for idx in batch_indices | ||
| ] | ||
| metadata_results = await async_fetch_row_group_metadata_batch( | ||
| batch_requests, base_url=base_url, session=session | ||
| timing_breakdown["s3_exists_check_ms"] += ( | ||
| time.perf_counter() - t_exists | ||
| ) * 1000 | ||
| # Read metadata file with row group filtering | ||
| read_start = time.perf_counter() | ||
| t_open = time.perf_counter() | ||
| with fs.open(fs_path, "rb") as f: | ||
| timing_breakdown["s3_open_ms"] += (time.perf_counter() - t_open) * 1000 | ||
| t_parquet_init = time.perf_counter() | ||
| parquet_file = pq.ParquetFile(f) | ||
| timing_breakdown["parquet_file_init_ms"] += ( | ||
| time.perf_counter() - t_parquet_init | ||
| ) * 1000 | ||
| # Extract metadata_json from schema metadata | ||
| t_schema = time.perf_counter() | ||
| schema = parquet_file.schema_arrow | ||
| metadata_json_bytes = schema.metadata.get(b"metadata_json") | ||
| if not metadata_json_bytes: | ||
| if verbose: | ||
| print( | ||
| f" Warning: {rel_path} metadata file missing metadata_json in schema" | ||
| ) | ||
| continue | ||
| timing_breakdown["schema_extract_ms"] += ( | ||
| time.perf_counter() - t_schema | ||
| ) * 1000 | ||
| t_meta_end = time.perf_counter() | ||
| t_json = time.perf_counter() | ||
| metadata_json_str = metadata_json_bytes.decode() | ||
| timing_breakdown["json_decode_ms"] += ( | ||
| time.perf_counter() - t_json | ||
| ) * 1000 | ||
| async with timing_lock: | ||
| metadata_fetch_ms = (t_meta_end - t_meta_start) * 1000 | ||
| metadata_total_ms += metadata_fetch_ms | ||
| # Track longest individual metadata fetch | ||
| if metadata_fetch_ms > longest_metadata_fetch_ms: | ||
| longest_metadata_fetch_ms = metadata_fetch_ms | ||
| # Track wall-clock: first start to last end | ||
| if ( | ||
| metadata_first_start is None | ||
| or t_meta_start < metadata_first_start | ||
| ): | ||
| metadata_first_start = t_meta_start | ||
| if ( | ||
| metadata_last_end is None | ||
| or t_meta_end > metadata_last_end | ||
| ): | ||
| metadata_last_end = t_meta_end | ||
| # Filter row groups using statistics before reading | ||
| t_rg_filter = time.perf_counter() | ||
| matching_row_groups = [] | ||
| metadata = parquet_file.metadata | ||
| total_row_groups = metadata.num_row_groups | ||
| # Convert to RowGroupMetadata objects | ||
| rg_metadata_list = [ | ||
| RowGroupMetadata( | ||
| path=m["path"], | ||
| row_group_index=m["row_group_index"], | ||
| start_offset=m["start_offset"], | ||
| end_offset=m["end_offset"], | ||
| api_metadata=m["api_metadata"], | ||
| for rg_idx in range(total_row_groups): | ||
| rg = metadata.row_group(rg_idx) | ||
| # Get h3_min and h3_max statistics from row group | ||
| # Find the column indices for h3_min and h3_max | ||
| h3_min_col_idx = None | ||
| h3_max_col_idx = None | ||
| for col_idx in range(rg.num_columns): | ||
| # Get column name from schema | ||
| col_name = schema.names[col_idx] | ||
| if col_name == "h3_min": | ||
| h3_min_col_idx = col_idx | ||
| elif col_name == "h3_max": | ||
| h3_max_col_idx = col_idx | ||
| # Check if this row group could match any query range | ||
| if h3_min_col_idx is not None and h3_max_col_idx is not None: | ||
| h3_min_col = rg.column(h3_min_col_idx) | ||
| h3_max_col = rg.column(h3_max_col_idx) | ||
| # Get min/max statistics | ||
| rg_h3_min = None | ||
| rg_h3_max = None | ||
| if ( | ||
| h3_min_col.statistics | ||
| and h3_min_col.statistics.min is not None | ||
| ): | ||
| rg_h3_min = ( | ||
| h3_min_col.statistics.min.as_py() | ||
| if hasattr(h3_min_col.statistics.min, "as_py") | ||
| else int(h3_min_col.statistics.min) | ||
| ) | ||
| for m in metadata_results | ||
| ] | ||
| if ( | ||
| h3_max_col.statistics | ||
| and h3_max_col.statistics.max is not None | ||
| ): | ||
| rg_h3_max = ( | ||
| h3_max_col.statistics.max.as_py() | ||
| if hasattr(h3_max_col.statistics.max, "as_py") | ||
| else int(h3_max_col.statistics.max) | ||
| ) | ||
| # Create size-based groups from this batch | ||
| groups = create_size_based_groups(rg_metadata_list, batch_size) | ||
| if rg_h3_min is not None and rg_h3_max is not None: | ||
| # Check if row group range overlaps with any query range | ||
| matches = False | ||
| for q_min, q_max in hex_ranges_list: | ||
| # Range overlap: rg_h3_min <= q_max AND rg_h3_max >= q_min | ||
| if rg_h3_min <= q_max and rg_h3_max >= q_min: | ||
| matches = True | ||
| break | ||
| async with timing_lock: | ||
| num_groups += len(groups) | ||
| if matches: | ||
| matching_row_groups.append(rg_idx) | ||
| else: | ||
| # No statistics available - include this row group to be safe | ||
| matching_row_groups.append(rg_idx) | ||
| else: | ||
| # Can't filter - include this row group to be safe | ||
| matching_row_groups.append(rg_idx) | ||
| # Enqueue each group for download | ||
| for group in groups: | ||
| await download_queue.put(group) | ||
| timing_breakdown["row_group_filter_ms"] += ( | ||
| time.perf_counter() - t_rg_filter | ||
| ) * 1000 | ||
| finally: | ||
| # Mark this producer as done | ||
| num_producers -= 1 | ||
| if num_producers == 0: | ||
| producers_done.set() | ||
| # Store row group filtering stats for verbose output | ||
| matching_row_groups_count = len(matching_row_groups) | ||
| timing_breakdown["row_group_stats"]["total"] += total_row_groups | ||
| timing_breakdown["row_group_stats"]["matched"] += ( | ||
| matching_row_groups_count | ||
| ) | ||
| async def download_consumer() -> None: | ||
| """Download groups from the queue as they become available.""" | ||
| nonlocal download_total_ms | ||
| nonlocal download_first_start, download_last_end, longest_download_ms | ||
| # Read only matching row groups | ||
| t_read_table = time.perf_counter() | ||
| if matching_row_groups: | ||
| metadata_table = parquet_file.read_row_groups(matching_row_groups) | ||
| else: | ||
| # No matching row groups - skip this file | ||
| if verbose: | ||
| print( | ||
| f" No matching row groups in {rel_path} (checked {total_row_groups} row groups)" | ||
| ) | ||
| continue | ||
| while True: | ||
| # Wait for either a group or all producers to be done | ||
| timing_breakdown["read_table_ms"] += ( | ||
| time.perf_counter() - t_read_table | ||
| ) * 1000 | ||
| data_sizes["metadata_bytes_read"] += metadata_table.nbytes | ||
| # Filter by H3 ranges on the read data (in case row group stats weren't precise) | ||
| t_filter = time.perf_counter() | ||
| filtered_table = _filter_by_h3_ranges(metadata_table, hex_ranges_list) | ||
| timing_breakdown["filter_ms"] += (time.perf_counter() - t_filter) * 1000 | ||
| # Store reference table for size calculations | ||
| reference_table = metadata_table | ||
| if len(filtered_table) == 0: | ||
| continue | ||
| metadata_read_time += time.perf_counter() - read_start | ||
| # Track actual bytes used (size of filtered table) | ||
| if len(filtered_table) > 0: | ||
| try: | ||
| # Use a timeout to periodically check if producers are done | ||
| group = await asyncio.wait_for(download_queue.get(), timeout=0.1) | ||
| except asyncio.TimeoutError: | ||
| if producers_done.is_set() and download_queue.empty(): | ||
| break | ||
| continue | ||
| # Calculate size of filtered table in bytes | ||
| filtered_size = filtered_table.nbytes | ||
| data_sizes["metadata_bytes_used"] += filtered_size | ||
| except Exception: | ||
| # Fallback: estimate based on number of rows | ||
| # Approximate size per row (rough estimate) | ||
| if len(reference_table) > 0: | ||
| avg_row_size = reference_table.nbytes / len(reference_table) | ||
| data_sizes["metadata_bytes_used"] += avg_row_size * len( | ||
| filtered_table | ||
| ) | ||
| if group is None: | ||
| break | ||
| if len(filtered_table) == 0: | ||
| continue | ||
| # Download and reconstruct all row groups in this group | ||
| metadata_list = [ | ||
| { | ||
| "path": rg.path, | ||
| "row_group_index": rg.row_group_index, | ||
| "start_offset": rg.start_offset, | ||
| "end_offset": rg.end_offset, | ||
| "api_metadata": rg.api_metadata, | ||
| } | ||
| for rg in group.row_groups | ||
| ] | ||
| # Convert to row group format | ||
| # Add file_path and metadata_json to each row group | ||
| # Vectorized conversion: convert entire columns to lists at once | ||
| t_convert = time.perf_counter() | ||
| row_group_indices = filtered_table["row_group_index"].to_pylist() | ||
| start_offsets = filtered_table["start_offset"].to_pylist() | ||
| end_offsets = filtered_table["end_offset"].to_pylist() | ||
| row_group_bytes_list = filtered_table["row_group_bytes"].to_pylist() | ||
| t_dl_start = time.perf_counter() | ||
| tables = await async_read_combined_row_groups( | ||
| path=group.path, | ||
| row_group_metadata_list=metadata_list, | ||
| base_url=base_url, | ||
| columns=columns, | ||
| # Create all row groups at once using list comprehension | ||
| new_row_groups = [ | ||
| { | ||
| "path": full_source_path, | ||
| "row_group_index": idx, | ||
| "start_offset": start, | ||
| "end_offset": end, | ||
| "metadata_json": metadata_json_str, # Use the JSON string directly | ||
| "row_group_bytes": bytes_str, | ||
| } | ||
| for idx, start, end, bytes_str in zip( | ||
| row_group_indices, start_offsets, end_offsets, row_group_bytes_list | ||
| ) | ||
| t_dl_end = time.perf_counter() | ||
| ] | ||
| all_row_groups.extend(new_row_groups) | ||
| timing_breakdown["convert_to_row_groups_ms"] += ( | ||
| time.perf_counter() - t_convert | ||
| ) * 1000 | ||
| async with timing_lock: | ||
| download_ms = (t_dl_end - t_dl_start) * 1000 | ||
| download_total_ms += download_ms | ||
| # Track longest individual download | ||
| if download_ms > longest_download_ms: | ||
| longest_download_ms = download_ms | ||
| # Track wall-clock: first start to last end | ||
| if ( | ||
| download_first_start is None | ||
| or t_dl_start < download_first_start | ||
| ): | ||
| download_first_start = t_dl_start | ||
| if download_last_end is None or t_dl_end > download_last_end: | ||
| download_last_end = t_dl_end | ||
| except Exception as e: | ||
| if verbose: | ||
| print(f" Warning: Failed to read metadata for {rel_path}: {e}") | ||
| continue | ||
| async with results_lock: | ||
| results.extend(tables) | ||
| # Calculate total data size from row groups | ||
| for rg in all_row_groups: | ||
| start_offset = rg.get("start_offset") | ||
| end_offset = rg.get("end_offset") | ||
| if start_offset is not None and end_offset is not None: | ||
| data_sizes["data_bytes"] += end_offset - start_offset | ||
| download_queue.task_done() | ||
| t_read = time.perf_counter() | ||
| # Start all producers and a consumer | ||
| producer_tasks = [ | ||
| asyncio.create_task(metadata_producer(path, indices)) | ||
| for path, indices in by_file.items() | ||
| ] | ||
| # Calculate download rates | ||
| metadata_download_rate = 0.0 | ||
| if ( | ||
| timing_breakdown["s3_download_ms"] > 0 | ||
| and data_sizes["metadata_bytes_downloaded"] > 0 | ||
| ): | ||
| metadata_download_rate = ( | ||
| data_sizes["metadata_bytes_downloaded"] / 1024 / 1024 | ||
| ) / (timing_breakdown["s3_download_ms"] / 1000) # MB/s | ||
| # Start multiple consumers for better parallelism | ||
| num_consumers = min(len(by_file), 8) # At most 8 concurrent downloads | ||
| consumer_tasks = [ | ||
| asyncio.create_task(download_consumer()) for _ in range(num_consumers) | ||
| ] | ||
| if verbose: | ||
| print( | ||
| f" Read metadata: {(t_read - t0) * 1000:.1f}ms ({len(all_row_groups)} row groups from {len(source_files)} files)" | ||
| ) | ||
| print(" Data sizes:") | ||
| if data_sizes["metadata_bytes_downloaded"] > 0: | ||
| metadata_downloaded_mb = ( | ||
| data_sizes["metadata_bytes_downloaded"] / 1024 / 1024 | ||
| ) | ||
| metadata_read_mb = data_sizes["metadata_bytes_read"] / 1024 / 1024 | ||
| metadata_used_mb = data_sizes["metadata_bytes_used"] / 1024 / 1024 | ||
| print( | ||
| f" Metadata files downloaded: {metadata_downloaded_mb:.2f} MB ({data_sizes['metadata_files_count']} files)" | ||
| ) | ||
| if metadata_read_mb > 0: | ||
| read_efficiency = ( | ||
| (metadata_read_mb / metadata_downloaded_mb * 100) | ||
| if metadata_downloaded_mb > 0 | ||
| else 0 | ||
| ) | ||
| print( | ||
| f" Metadata bytes read (row group filtered): {metadata_read_mb:.2f} MB ({read_efficiency:.1f}% of downloaded)" | ||
| ) | ||
| if metadata_used_mb > 0: | ||
| efficiency = ( | ||
| (metadata_used_mb / metadata_downloaded_mb * 100) | ||
| if metadata_downloaded_mb > 0 | ||
| else 0 | ||
| ) | ||
| print( | ||
| f" Metadata bytes used (after filtering): {metadata_used_mb:.2f} MB ({efficiency:.1f}% of downloaded)" | ||
| ) | ||
| if timing_breakdown["s3_download_ms"] > 0: | ||
| print( | ||
| f" Metadata download rate: {metadata_download_rate:.2f} MB/s" | ||
| ) | ||
| data_mb = data_sizes["data_bytes"] / 1024 / 1024 | ||
| print( | ||
| f" Row group data: {data_mb:.2f} MB ({len(all_row_groups)} row groups)" | ||
| ) | ||
| print(" Timing breakdown:") | ||
| print(f" File listing: {timing_breakdown['file_listing_ms']:.1f}ms") | ||
| if use_local_cache: | ||
| print(f" Cache setup: {timing_breakdown['cache_setup_ms']:.1f}ms") | ||
| print(f" S3 exists check: {timing_breakdown['s3_exists_check_ms']:.1f}ms") | ||
| if use_local_cache: | ||
| print( | ||
| f" S3 download (to cache): {timing_breakdown['s3_download_ms']:.1f}ms" | ||
| ) | ||
| print(f" S3 open: {timing_breakdown['s3_open_ms']:.1f}ms") | ||
| print( | ||
| f" ParquetFile init: {timing_breakdown['parquet_file_init_ms']:.1f}ms" | ||
| ) | ||
| if timing_breakdown["row_group_filter_ms"] > 0: | ||
| # Get row group stats from timing_breakdown if available | ||
| rg_stats = timing_breakdown.get("row_group_stats", {}) | ||
| total_rgs = rg_stats.get("total", 0) | ||
| matched_rgs = rg_stats.get("matched", 0) | ||
| if total_rgs > 0: | ||
| efficiency = (matched_rgs / total_rgs * 100) if total_rgs > 0 else 0 | ||
| skipped_rgs = total_rgs - matched_rgs | ||
| print( | ||
| f" Row group filter (stats): {timing_breakdown['row_group_filter_ms']:.1f}ms" | ||
| ) | ||
| print( | ||
| f" Total row groups: {total_rgs:,}, Matched: {matched_rgs:,}, Skipped: {skipped_rgs:,} ({efficiency:.1f}% match rate)" | ||
| ) | ||
| else: | ||
| print( | ||
| f" Row group filter (stats): {timing_breakdown['row_group_filter_ms']:.1f}ms" | ||
| ) | ||
| print(f" Read table: {timing_breakdown['read_table_ms']:.1f}ms") | ||
| print(f" Schema extract: {timing_breakdown['schema_extract_ms']:.1f}ms") | ||
| print(f" JSON decode: {timing_breakdown['json_decode_ms']:.1f}ms") | ||
| print(f" Filter: {timing_breakdown['filter_ms']:.1f}ms") | ||
| print( | ||
| f" Convert to row groups: {timing_breakdown['convert_to_row_groups_ms']:.1f}ms" | ||
| ) | ||
| # Wait for all producers to finish | ||
| await asyncio.gather(*producer_tasks) | ||
| if len(all_row_groups) == 0: | ||
| if return_timing_info: | ||
| timing_info = { | ||
| "metadata_ms": (t_read - t0) * 1000, | ||
| "download_ms": 0, | ||
| "num_groups": 0, | ||
| "timing_breakdown": timing_breakdown, | ||
| "data_sizes": data_sizes, | ||
| "metadata_download_rate_mb_s": metadata_download_rate, | ||
| "data_download_rate_mb_s": 0.0, | ||
| "data_download_wall_rate_mb_s": 0.0, | ||
| } | ||
| return pa.table({}), timing_info | ||
| return pa.table({}) | ||
| # Wait for all items to be processed | ||
| await download_queue.join() | ||
| # Validate that all row groups have required metadata | ||
| t_validate = time.perf_counter() | ||
| for rg in all_row_groups: | ||
| missing_fields = [] | ||
| if rg.get("start_offset") is None: | ||
| missing_fields.append("start_offset") | ||
| if rg.get("end_offset") is None: | ||
| missing_fields.append("end_offset") | ||
| if not rg.get("metadata_json"): | ||
| missing_fields.append("metadata_json") | ||
| if not rg.get("row_group_bytes"): | ||
| missing_fields.append("row_group_bytes") | ||
| # Signal consumers to stop (they'll exit when queue is empty and producers done) | ||
| # Cancel remaining consumer tasks | ||
| for task in consumer_tasks: | ||
| task.cancel() | ||
| if missing_fields: | ||
| raise ValueError( | ||
| f"Row group {rg.get('row_group_index')} in {rg.get('path')} is missing required metadata: " | ||
| f"{', '.join(missing_fields)}. The dataset needs to be re-indexed with persist_hex_table_metadata()." | ||
| ) | ||
| timing_breakdown["validation_ms"] = (time.perf_counter() - t_validate) * 1000 | ||
| # Wait for consumers to finish (ignore cancellation errors) | ||
| await asyncio.gather(*consumer_tasks, return_exceptions=True) | ||
| # Get the batch size from options if not provided | ||
| if batch_size is None: | ||
| batch_size = OPTIONS.row_group_batch_size | ||
| # Calculate wall-clock times | ||
| metadata_wall_ms = 0.0 | ||
| if metadata_first_start is not None and metadata_last_end is not None: | ||
| metadata_wall_ms = (metadata_last_end - metadata_first_start) * 1000 | ||
| if verbose: | ||
| print(f" Using batch size: {batch_size} bytes") | ||
| print(" Using persisted metadata (no API calls)") | ||
| download_wall_ms = 0.0 | ||
| if download_first_start is not None and download_last_end is not None: | ||
| download_wall_ms = (download_last_end - download_first_start) * 1000 | ||
| # Run the prefetched fetch and download | ||
| async def _run_with_cleanup(): | ||
| """Run coroutine and clean up shared S3 client afterwards.""" | ||
| from job2.fasttortoise._h3_read import _fetch_with_combining_prefetched | ||
| try: | ||
| return await _fetch_with_combining_prefetched( | ||
| all_row_groups, | ||
| "", # base_url not used for persisted metadata | ||
| columns, | ||
| batch_size, | ||
| verbose, | ||
| max_concurrent_downloads, | ||
| ) | ||
| finally: | ||
| # Clean up shared S3 client to prevent "Unclosed" warnings | ||
| try: | ||
| from job2.fasttortoise._reconstruction import _shared_async_s3_client | ||
| if _shared_async_s3_client is not None: | ||
| await _shared_async_s3_client.close() | ||
| except Exception: | ||
| pass | ||
| tables, download_timing_info = _run_async(_run_with_cleanup()) | ||
| t_fetch = time.perf_counter() | ||
| timing_breakdown["download_ms"] = (t_fetch - t_read) * 1000 | ||
| # Calculate data download rate | ||
| data_download_rate = 0.0 | ||
| data_download_wall_rate = 0.0 | ||
| if download_timing_info: | ||
| download_wall_ms = download_timing_info.get("download_wall_ms", 0) | ||
| if download_wall_ms > 0 and data_sizes["data_bytes"] > 0: | ||
| data_download_wall_rate = (data_sizes["data_bytes"] / 1024 / 1024) / ( | ||
| download_wall_ms / 1000 | ||
| ) # MB/s | ||
| download_cumulative_ms = download_timing_info.get("download_ms", 0) | ||
| if download_cumulative_ms > 0 and data_sizes["data_bytes"] > 0: | ||
| data_download_rate = (data_sizes["data_bytes"] / 1024 / 1024) / ( | ||
| download_cumulative_ms / 1000 | ||
| ) # MB/s | ||
| if verbose: | ||
| print(f" Download: {(t_fetch - t_read) * 1000:.1f}ms") | ||
| if data_sizes["data_bytes"] > 0: | ||
| data_mb = data_sizes["data_bytes"] / 1024 / 1024 | ||
| print(f" Data downloaded: {data_mb:.2f} MB") | ||
| if data_download_wall_rate > 0: | ||
| print( | ||
| f" Effective download rate: {data_download_wall_rate:.2f} MB/s (wall-clock)" | ||
| ) | ||
| if data_download_rate > 0 and data_download_rate != data_download_wall_rate: | ||
| print(f" Cumulative download rate: {data_download_rate:.2f} MB/s") | ||
| if download_timing_info: | ||
| print( | ||
| f" Data download: {download_timing_info.get('download_wall_ms', 0):.1f}ms wall-clock, " | ||
| f"{download_timing_info.get('download_ms', 0):.1f}ms cumulative" | ||
| ) | ||
| print(f" Download groups: {download_timing_info.get('num_groups', 0)}") | ||
| # Concatenate all tables into one | ||
| if not tables: | ||
| if return_timing_info: | ||
| timing_info = { | ||
| "metadata_ms": (t_read - t0) * 1000, | ||
| "download_ms": timing_breakdown["download_ms"], | ||
| "num_groups": download_timing_info.get("num_groups", 0) | ||
| if download_timing_info | ||
| else 0, | ||
| "timing_breakdown": timing_breakdown, | ||
| "data_sizes": data_sizes, | ||
| "metadata_download_rate_mb_s": metadata_download_rate, | ||
| "data_download_rate_mb_s": data_download_rate, | ||
| "data_download_wall_rate_mb_s": data_download_wall_rate, | ||
| } | ||
| if download_timing_info: | ||
| timing_info.update(download_timing_info) | ||
| return pa.table({}), timing_info | ||
| return pa.table({}) | ||
| if len(tables) == 1: | ||
| if return_timing_info: | ||
| timing_info = { | ||
| "metadata_ms": (t_read - t0) * 1000, | ||
| "download_ms": timing_breakdown["download_ms"], | ||
| "num_groups": download_timing_info.get("num_groups", 0) | ||
| if download_timing_info | ||
| else 0, | ||
| "timing_breakdown": timing_breakdown, | ||
| "data_sizes": data_sizes, | ||
| "metadata_download_rate_mb_s": metadata_download_rate, | ||
| "data_download_rate_mb_s": data_download_rate, | ||
| "data_download_wall_rate_mb_s": data_download_wall_rate, | ||
| } | ||
| if download_timing_info: | ||
| timing_info.update(download_timing_info) | ||
| return tables[0], timing_info | ||
| return tables[0] | ||
| t_concat_start = time.perf_counter() | ||
| result = pa.concat_tables(tables, promote_options="permissive") | ||
| t_concat = time.perf_counter() | ||
| timing_breakdown["concat_ms"] = (t_concat - t_concat_start) * 1000 | ||
| if verbose: | ||
| print(f" Concat tables: {(t_concat - t_concat_start) * 1000:.1f}ms") | ||
| if return_timing_info: | ||
| timing_info = { | ||
| "metadata_ms": metadata_total_ms, # Cumulative | ||
| "metadata_wall_ms": metadata_wall_ms, # Wall-clock (first to last) | ||
| "longest_metadata_fetch_ms": longest_metadata_fetch_ms, # Longest individual metadata fetch | ||
| "download_ms": download_total_ms, # Cumulative | ||
| "download_wall_ms": download_wall_ms, # Wall-clock (first to last) | ||
| "longest_download_ms": longest_download_ms, # Longest individual download | ||
| "num_groups": num_groups, | ||
| "metadata_ms": (t_read - t0) * 1000, | ||
| "download_ms": timing_breakdown["download_ms"], | ||
| "num_groups": download_timing_info.get("num_groups", 0) | ||
| if download_timing_info | ||
| else 0, | ||
| "timing_breakdown": timing_breakdown, | ||
| "total_ms": (t_concat - t0) * 1000, | ||
| "data_sizes": data_sizes, | ||
| "metadata_download_rate_mb_s": metadata_download_rate, | ||
| "data_download_rate_mb_s": data_download_rate, | ||
| "data_download_wall_rate_mb_s": data_download_wall_rate, | ||
| } | ||
| if download_timing_info: | ||
| timing_info.update(download_timing_info) | ||
| return result, timing_info | ||
| return result | ||
| return results, timing_info | ||
| finally: | ||
| # Always close our session to prevent "Unclosed" warnings | ||
| if not session.closed: | ||
| await session.close() | ||
def _filter_by_h3_ranges(table: "pa.Table", hex_ranges: List[List[int]]) -> "pa.Table":
    """Filter metadata table by H3 range overlap.

    A row group matches a query range [q_min, q_max] when its own range
    overlaps it, i.e. h3_min <= q_max AND h3_max >= q_min. A row group is
    kept if it overlaps ANY of the query ranges.

    Args:
        table: PyArrow table with h3_min and h3_max columns
        hex_ranges: List of [min, max] integer pairs

    Returns:
        Filtered table with only matching row groups
    """
    import pyarrow.compute as pc

    # No ranges means no filtering at all.
    if not hex_ranges:
        return table
    # OR together the per-range overlap masks as we go, instead of
    # collecting them in a list first.
    combined = None
    for lo, hi in hex_ranges:
        # Range overlap: h3_min <= hi AND h3_max >= lo
        overlap = pc.and_(
            pc.less_equal(table["h3_min"], hi),
            pc.greater_equal(table["h3_max"], lo),
        )
        combined = overlap if combined is None else pc.or_(combined, overlap)
    return table.filter(combined)
+6
-0
@@ -133,2 +133,5 @@ import asyncio | ||
| elif isinstance(udf, UdfJobStepConfig): | ||
| return ResolvedLocalJobStepUdf(job_step=udf, udf=udf.udf) | ||
| elif isinstance(udf, Udf): | ||
@@ -176,2 +179,5 @@ job_step = UdfJobStepConfig(udf=udf, name=udf.name) | ||
| elif isinstance(udf, UdfJobStepConfig): | ||
| return ResolvedLocalJobStepUdf(job_step=udf, udf=udf.udf) | ||
| elif isinstance(udf, Udf): | ||
@@ -178,0 +184,0 @@ job_step = UdfJobStepConfig(udf=udf, name=udf.name) |
@@ -1,1 +0,1 @@ | ||
| __version__ = "1.28.0" | ||
| __version__ = "1.29.0" |
+10
-1
@@ -280,3 +280,7 @@ from __future__ import annotations | ||
| def hash_udf(udf: dict[str, Any], valid_extensions: list[str]) -> str: | ||
| def hash_udf( | ||
| udf: dict[str, Any], | ||
| valid_extensions: list[str], | ||
| input_data: list[Any] | None = None, | ||
| ) -> str: | ||
| UNIQUE_FIELDS = ["code", "headers", "parameters", "entrypoint", "type"] | ||
@@ -298,2 +302,7 @@ fields_to_hash = {k: v for k, v in udf.items() if k in UNIQUE_FIELDS} | ||
| fields_to_hash["valid_extensions"] = valid_extensions | ||
| # Include input data in hash if provided (for n_processes_per_worker scenarios) | ||
| if input_data is not None: | ||
| fields_to_hash["input_data"] = input_data | ||
| text = json.dumps(fields_to_hash, sort_keys=True) | ||
@@ -300,0 +309,0 @@ return hashlib.sha256(text.encode("UTF-8")).hexdigest() |
@@ -556,2 +556,3 @@ from __future__ import annotations | ||
| send_status_email: bool | None = None, | ||
| name: str | None = None, | ||
| ) -> RunResponse: | ||
@@ -571,2 +572,3 @@ """Execute this operation | ||
| send_status_email: Whether to send a status email to the user when the job is complete. | ||
| name: The name of the job. Defaults to None for a generated name. | ||
| """ | ||
@@ -599,2 +601,3 @@ to_run = self.model_copy(deep=True) | ||
| send_status_email=send_status_email, | ||
| name=name, | ||
| ) | ||
@@ -601,0 +604,0 @@ |
@@ -30,5 +30,9 @@ from __future__ import annotations | ||
| """Whether the instance has been terminated.""" | ||
| last_heartbeat: datetime | None = None | ||
| """When the last heartbeat was received from the job.""" | ||
| job_status: StrictStr | None = None | ||
| """The status of the job.""" | ||
| job_status_date: datetime | None = None | ||
| """When the job_status was last updated""" | ||
| finished_job_status: StrictBool | None = None | ||
@@ -35,0 +39,0 @@ """Whether the job has finished running.""" |
@@ -5,10 +5,23 @@ from typing import Any, Dict | ||
| class MockUdfInput: | ||
| def __init__(self, data: Any) -> None: | ||
| def __init__(self, data: Any, as_kwargs: bool = False) -> None: | ||
| """ | ||
| Mock input for UDF execution. | ||
| Args: | ||
| data: The input data | ||
| as_kwargs: If True and data is a dict, pass it through as kwargs directly. | ||
| If False (default), wrap data in "bounds" parameter (for tile/bbox). | ||
| """ | ||
| self._data = data | ||
| self._as_kwargs = as_kwargs | ||
| def as_udf_args(self) -> Dict[str, Any]: | ||
| kwargs = {} | ||
| if self._data is not None: | ||
| kwargs["bounds"] = self._data | ||
| return kwargs | ||
| if self._as_kwargs and isinstance(self._data, dict): | ||
| # Pass through the dict as kwargs directly for input list processing | ||
| return self._data | ||
| else: | ||
| # Original behavior for tile/bbox | ||
| kwargs = {} | ||
| if self._data is not None: | ||
| kwargs["bounds"] = self._data | ||
| return kwargs |
+1
-1
| Metadata-Version: 2.4 | ||
| Name: fused | ||
| Version: 1.28.0 | ||
| Version: 1.29.0 | ||
| Project-URL: Homepage, https://www.fused.io | ||
@@ -5,0 +5,0 @@ Project-URL: Documentation, https://docs.fused.io |
| """Size-based grouping logic for row group downloads.""" | ||
| from __future__ import annotations | ||
| from dataclasses import dataclass, field | ||
| from typing import Any, Dict, List, Optional | ||
@dataclass
class RowGroupMetadata:
    """Byte-range metadata for a single parquet row group.

    Attributes:
        path: File path (S3 or HTTP URL)
        row_group_index: Index of the row group in the file (0-based)
        start_offset: First byte of the row group data in the file
        end_offset: One past the last byte of the row group data (exclusive)
        size: Derived size of the row group data (end_offset - start_offset)
        api_metadata: Full metadata dict from API (needed for reconstruction)
    """

    path: str
    row_group_index: int
    start_offset: int
    end_offset: int
    api_metadata: Dict[str, Any]

    @property
    def size(self) -> int:
        """Number of bytes covered by this row group."""
        return self.end_offset - self.start_offset
@dataclass
class DownloadGroup:
    """A batch of consecutive row groups from one file, downloaded together.

    Attributes:
        path: File path (S3 or HTTP URL)
        row_groups: RowGroupMetadata entries with consecutive indices
        start_offset: First byte of the combined download
        end_offset: One past the last byte of the combined download
        total_size: Bytes spanned by the combined download
    """

    path: str
    row_groups: List[RowGroupMetadata] = field(default_factory=list)

    @property
    def start_offset(self) -> int:
        """First byte of the combined download (0 when the group is empty)."""
        offsets = [rg.start_offset for rg in self.row_groups]
        return min(offsets) if offsets else 0

    @property
    def end_offset(self) -> int:
        """One past the last byte of the combined download (0 when empty)."""
        offsets = [rg.end_offset for rg in self.row_groups]
        return max(offsets) if offsets else 0

    @property
    def total_size(self) -> int:
        """Bytes spanned by the combined byte range of this group."""
        return self.end_offset - self.start_offset

    @property
    def row_group_indices(self) -> List[int]:
        """Indices of the row groups contained in this group."""
        return [rg.row_group_index for rg in self.row_groups]

    def add_row_group(self, rg: RowGroupMetadata) -> None:
        """Append a row group, enforcing same-file and consecutiveness.

        Args:
            rg: RowGroupMetadata to add

        Raises:
            ValueError: If the row group belongs to a different file, or its
                index does not directly follow the last one in this group.
        """
        if rg.path != self.path:
            raise ValueError(
                f"Cannot add row group from {rg.path} to group for {self.path}"
            )
        if self.row_groups:
            last_index = self.row_groups[-1].row_group_index
            if rg.row_group_index != last_index + 1:
                raise ValueError(
                    f"Row group {rg.row_group_index} is not consecutive with "
                    f"last index {last_index}"
                )
        self.row_groups.append(rg)
def group_row_groups_by_file(
    row_group_items: List[Dict[str, Any]],
) -> Dict[str, List[int]]:
    """Group row group items by file path.

    Args:
        row_group_items: List of dicts with 'path' and 'row_group_index' keys

    Returns:
        Dict mapping file path to a sorted list of row group indices
    """
    by_file: Dict[str, List[int]] = {}
    for item in row_group_items:
        # setdefault replaces the manual membership check + empty-list insert.
        by_file.setdefault(item["path"], []).append(item["row_group_index"])
    # Sort indices for each file (in place).
    for indices in by_file.values():
        indices.sort()
    return by_file
def find_consecutive_runs(indices: List[int]) -> List[List[int]]:
    """Split a sorted integer list into maximal runs of consecutive values.

    Args:
        indices: Sorted list of integers

    Returns:
        List of lists, where each inner list contains consecutive integers.

    Examples:
        >>> find_consecutive_runs([0, 1, 2, 3])
        [[0, 1, 2, 3]]
        >>> find_consecutive_runs([0, 2, 4, 6])
        [[0], [2], [4], [6]]
        >>> find_consecutive_runs([0, 1, 5, 6, 7, 10])
        [[0, 1], [5, 6, 7], [10]]
        >>> find_consecutive_runs([])
        []
        >>> find_consecutive_runs([5])
        [[5]]
    """
    runs: List[List[int]] = []
    for value in indices:
        if runs and value == runs[-1][-1] + 1:
            # Continues the current run.
            runs[-1].append(value)
        else:
            # First value, or a gap: open a fresh run.
            runs.append([value])
    return runs
def create_size_based_groups(
    row_group_metadata: List[RowGroupMetadata],
    target_size: int,
) -> List[DownloadGroup]:
    """Create download groups from row group metadata based on cumulative size.

    Groups consecutive row groups until the cumulative size reaches the target.
    Row groups that are already larger than the target stay as single groups.
    Gaps in row group indices force new groups.

    Args:
        row_group_metadata: List of RowGroupMetadata (should be from the same file
            and sorted by row_group_index)
        target_size: Target size in bytes for each group

    Returns:
        List of DownloadGroup objects

    Examples:
        With 3 row groups of sizes [10KB, 15KB, 20KB] and target 32KB:
        - Group 1: [rg0, rg1] = 25KB (adding rg2 would exceed 32KB)
        - Group 2: [rg2] = 20KB
    """
    if not row_group_metadata:
        return []
    # All metadata entries are expected to come from the same file.
    path = row_group_metadata[0].path
    ordered = sorted(row_group_metadata, key=lambda rg: rg.row_group_index)
    groups: List[DownloadGroup] = []
    current: Optional[DownloadGroup] = None
    for rg in ordered:
        if current is None:
            needs_new = True
        else:
            prev_index = current.row_groups[-1].row_group_index
            # Split on an index gap, or when adding this row group would
            # push the batch over the target size.
            needs_new = (
                rg.row_group_index != prev_index + 1
                or current.total_size + rg.size > target_size
            )
        if needs_new:
            if current is not None and current.row_groups:
                groups.append(current)
            current = DownloadGroup(path=path)
        current.add_row_group(rg)
    # Flush the trailing group.
    if current is not None and current.row_groups:
        groups.append(current)
    return groups
def create_download_groups_from_metadata(
    metadata_by_file: Dict[str, List[RowGroupMetadata]],
    target_size: int,
) -> List[DownloadGroup]:
    """Create download groups from all files based on cumulative size.

    Args:
        metadata_by_file: Dict mapping file path to list of RowGroupMetadata
        target_size: Target size in bytes for each group

    Returns:
        List of DownloadGroup objects from all files
    """
    all_groups: List[DownloadGroup] = []
    # Only the metadata lists are needed; iterate values() instead of items()
    # with an unused key.
    for metadata_list in metadata_by_file.values():
        all_groups.extend(create_size_based_groups(metadata_list, target_size))
    return all_groups
Sorry, the diff of this file is too big to display
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
880849
4.02%21101
3.47%