Latest Threat Research: SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains. Details
Socket
Book a Demo · Install · Sign in
Socket

fused

Package Overview
Dependencies
Maintainers
2
Versions
104
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

fused - npm Package Compare versions

Comparing version
1.28.0
to
1.29.0
+87
fused/_h3/index.py
"""Index H3-indexed datasets for serverless queries.
This module provides functions to create metadata parquet files that enable
fast H3 range queries without requiring a server connection.
"""
def persist_hex_table_metadata(
dataset_path: str,
output_path: str | None = None,
metadata_path: str | None = None,
verbose: bool = False,
row_group_size: int = 1,
indexed_columns: list[str] | None = None,
pool_size: int = 10,
) -> str:
"""Persist metadata for a hex table to enable serverless queries.
Scans all parquet files in the dataset, extracts metadata needed for
fast reconstruction, and writes one .metadata.parquet file per source file.
Each metadata file contains:
- Row group metadata (offsets, H3 ranges, etc.)
- Full metadata_json stored as custom metadata in the parquet schema
Requires a 'hex' column (or variant like h3, h3_index) in all files.
Raises ValueError if no hex column is found.
Args:
dataset_path: Path to the dataset directory (e.g., "s3://bucket/dataset/")
output_path: Deprecated - use metadata_path instead
metadata_path: Directory path where metadata files should be written.
If None, writes to {source_dir}/.fused/{source_filename}.metadata.parquet
for each source file. If provided, writes to
{metadata_path}/{full_source_path}.metadata.parquet using the full
source path as the filename. This allows storing metadata in a
different location when you don't have write access to the dataset directory.
verbose: If True, print timing/progress information.
row_group_size: Number of rows per row group in output parquet file.
Default is 1 (one row per row group). Larger values
reduce file size but may increase memory usage during writes.
indexed_columns: List of column names to index. If None (default),
indexes only the first identified indexed column
(typically the hex column). If an empty list, indexes
all detected indexed columns.
pool_size: Number of parallel workers to use for processing files.
Default is 10 (parallel processing enabled by default).
Set to 1 for sequential processing. This can significantly speed up
metadata extraction for large datasets with many files.
Returns:
Path to metadata directory (if metadata_path provided) or first metadata file path
Raises:
ImportError: If job2 package is not available
ValueError: If no hex column is found in any file
FileNotFoundError: If no parquet files are found in the dataset
Example:
>>> fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/")
's3://my-bucket/my-dataset/.fused/file1.metadata.parquet'
>>> fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/", metadata_path="s3://my-bucket/metadata/")
's3://my-bucket/metadata/'
>>> fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/", pool_size=4)
's3://my-bucket/my-dataset/.fused/file1.metadata.parquet'
"""
try:
from job2.fasttortoise._h3_index import persist_hex_table_metadata_impl
except ImportError as e:
raise RuntimeError(
"The H3 index functionality requires the job2 module. "
"This function is only available in the Fused execution environment."
) from e
return persist_hex_table_metadata_impl(
dataset_path=dataset_path,
metadata_path=metadata_path,
verbose=verbose,
row_group_size=row_group_size,
indexed_columns=indexed_columns,
pool_size=pool_size,
)
# Public API of this module: a single entry point.
__all__ = ["persist_hex_table_metadata"]
+1
-0

@@ -169,1 +169,2 @@ cdk.out

client/public/fasttortoise/FastTortoiseWasm.wasm
client/public/fasttortoise/fasttortoise-api.js
+27
-119

@@ -8,3 +8,8 @@ """Fast Tortoise - Efficient parquet row group reconstruction from metadata."""

# Keep the API helpers in fused-py since they're just API clients
from ._api import find_dataset, get_row_groups_for_dataset, register_dataset
from ._api import (
find_dataset,
get_row_groups_for_dataset,
get_row_groups_for_dataset_with_metadata,
register_dataset,
)

@@ -235,78 +240,22 @@

RuntimeError: If the batch request fails or metadata is still being processed
This function imports the implementation from job2 at runtime.
"""
metadata_url = _get_metadata_url(base_url)
batch_url = f"{metadata_url}/batch"
try:
from job2.fasttortoise._reconstruction import (
_parse_row_group_from_metadata,
_unwrap_api_metadata,
async_fetch_row_group_metadata_batch as _async_fetch_row_group_metadata_batch,
)
return await _async_fetch_row_group_metadata_batch(
requests=requests,
base_url=base_url,
session=session,
)
except ImportError as e:
raise RuntimeError(
"The fasttortoise reconstruction functionality requires the job2 module."
"The fasttortoise reconstruction functionality requires the job2 module. "
"This is typically only available in the Fused execution environment."
) from e
import aiohttp
from fused import context
from fused.core._realtime_ops import _realtime_raise_for_status_async
auth_headers = context._get_auth_header(missing_ok=True)
if session is None:
from fused.core._realtime_ops import _get_shared_session
session = await _get_shared_session()
async with session.post(
url=batch_url,
json=requests,
headers=auth_headers,
timeout=aiohttp.ClientTimeout(total=OPTIONS.metadata_request_timeout),
) as response:
await _realtime_raise_for_status_async(response)
if response.status == 202:
raise RuntimeError("Metadata is still being processed by the server")
batch_response = await response.json()
results = batch_response.get("results", [])
# Process results and convert to same format as individual calls
processed_results = []
for i, result in enumerate(results):
if "error" in result:
# Propagate error
error = result["error"]
raise RuntimeError(
f"Metadata fetch failed for row group {requests[i]['row_group_index']} "
f"in {requests[i]['path']}: {error.get('detail', 'Unknown error')}"
)
# Unwrap and parse metadata (same as individual call)
api_response = {
"metadata": result.get("metadata", ""),
"row_group_bytes": result.get("row_group_bytes", ""),
}
metadata = _unwrap_api_metadata(api_response)
target_row_group, num_rows, start_offset, end_offset = (
_parse_row_group_from_metadata(metadata)
)
processed_results.append(
{
"path": requests[i]["path"],
"row_group_index": requests[i]["row_group_index"],
"start_offset": start_offset,
"end_offset": end_offset,
"size": end_offset - start_offset,
"api_metadata": metadata,
}
)
return processed_results
async def async_read_combined_row_groups(

@@ -333,14 +282,16 @@ path: str,

List of PyArrow Tables, one per row group in the same order as input
This function imports the implementation from job2 at runtime.
"""
import pyarrow.parquet as pq
if not row_group_metadata_list:
return []
try:
from job2.fasttortoise._reconstruction import (
_async_read_file_range,
_build_parquet_buffer_from_data,
_parse_row_group_from_metadata,
async_read_combined_row_groups as _async_read_combined_row_groups,
)
return await _async_read_combined_row_groups(
path=path,
row_group_metadata_list=row_group_metadata_list,
base_url=base_url,
columns=columns,
)
except ImportError as e:

@@ -352,46 +303,3 @@ raise RuntimeError(

# Calculate the combined byte range
start_offset = min(rg["start_offset"] for rg in row_group_metadata_list)
end_offset = max(rg["end_offset"] for rg in row_group_metadata_list)
data_size = end_offset - start_offset
# Download the combined byte range
combined_data = await _async_read_file_range(path, start_offset, data_size)
# Reconstruct each row group from the combined data
tables = []
for rg_meta in row_group_metadata_list:
metadata = rg_meta["api_metadata"]
# Parse the row group metadata again to get the thrift object
target_row_group, num_rows, rg_start, rg_end = _parse_row_group_from_metadata(
metadata
)
# Extract this row group's data from the combined buffer
# The offset is relative to our combined download start
relative_start = rg_start - start_offset
relative_end = rg_end - start_offset
row_group_data = combined_data[relative_start:relative_end]
# Build the minimal parquet file
parquet_buffer = _build_parquet_buffer_from_data(
row_group_data,
target_row_group,
metadata,
rg_start,
num_rows,
)
try:
# Parse with PyArrow
parquet_file = pq.ParquetFile(parquet_buffer)
table = parquet_file.read_row_group(0, columns=columns)
tables.append(table)
finally:
parquet_buffer.close()
return tables
__all__ = [

@@ -398,0 +306,0 @@ "async_fetch_row_group_metadata",

@@ -272,63 +272,22 @@ """API client helpers for dataset queries."""

# ]
This function imports the implementation from job2 at runtime.
"""
# Use current environment's base URL if not specified
if base_url is None:
base_url = OPTIONS.base_url
try:
from job2.fasttortoise._api import (
get_row_groups_for_dataset_with_metadata as _get_row_groups_for_dataset_with_metadata,
)
# Build request body for POST
body: Dict[str, Any] = {
"dataset_path": dataset_path,
"geographical_regions": geographical_regions,
"include_signed_urls": include_signed_urls,
"include_full_metadata": include_full_metadata,
}
if h3_resolution is not None:
body["h3_resolution"] = h3_resolution
# Make API request with auth headers from execution context
url = f"{base_url}/datasets/items-with-metadata"
with session_with_retries() as session:
response = session.post(
url,
json=body,
headers=context._get_auth_header(missing_ok=True),
return _get_row_groups_for_dataset_with_metadata(
dataset_path=dataset_path,
geographical_regions=geographical_regions,
h3_resolution=h3_resolution,
base_url=base_url,
include_signed_urls=include_signed_urls,
include_full_metadata=include_full_metadata,
)
response.raise_for_status()
# Parse response
data = response.json()
items = data.get("items", [])
# Flatten into list of {path, row_group_index, start_offset, end_offset, ...} dicts
result = []
for item in items:
relative_path = item.get("relative_path", "")
row_groups = item.get("row_groups", [])
signed_url = item.get("signed_url")
# Construct full path - handle empty relative_path to avoid trailing slash
if relative_path:
full_path = dataset_path.rstrip("/") + "/" + relative_path.lstrip("/")
else:
full_path = dataset_path.rstrip("/")
# Add each row group with its metadata
for rg in row_groups:
row_group_dict = {
"path": full_path,
"row_group_index": rg.get("row_group_index"),
"start_offset": rg.get("start_offset"),
"end_offset": rg.get("end_offset"),
}
# Include signed URL if available
if signed_url:
row_group_dict["signed_url"] = signed_url
# Include full metadata if present
if "metadata_json" in rg:
row_group_dict["metadata_json"] = rg.get("metadata_json")
if "row_group_bytes" in rg:
row_group_dict["row_group_bytes"] = rg.get("row_group_bytes")
result.append(row_group_dict)
return result
except ImportError as e:
raise RuntimeError(
"The get_row_groups_for_dataset_with_metadata function requires the job2 module. "
"This function is only available in the Fused execution environment."
) from e

@@ -0,9 +1,16 @@

from .index import persist_hex_table_metadata
from .ingest import run_ingest_raster_to_h3, run_partition_to_h3
from .read import read_hex_table, read_hex_table_slow
from .read import (
read_hex_table,
read_hex_table_slow,
read_hex_table_with_persisted_metadata,
)
__all__ = [
"persist_hex_table_metadata",
"read_hex_table",
"read_hex_table_slow",
"read_hex_table_with_persisted_metadata",
"run_ingest_raster_to_h3",
"run_partition_to_h3",
]

@@ -0,3 +1,6 @@

import datetime
import uuid
import numpy as np
import fused

@@ -19,2 +22,3 @@ from fused.core._fs_utils import is_non_empty_dir

chunk_name: str | None = None,
src_path_id: int = 0,
):

@@ -35,7 +39,9 @@ # define UDF that imports the helper function inside the UDF

chunk_name=chunk_name,
src_path_id=src_path_id,
)
def run_extract(
submit_params: list[dict],
def run_extract_step(
src_files: list[str],
target_chunk_size: int | None,
tmp_path: str,

@@ -46,5 +52,58 @@ res: int = 11,

file_res: int = 2,
**kwargs,
debug_mode: bool = False,
**submit_kwargs,
):
params = {
import rasterio
if target_chunk_size is None:
if len(src_files) > 20:
print(
"-- processing each file as a single chunk by default (specify "
"'target_chunk_size' to override this)"
)
target_chunk_size = 0
else:
target_chunk_size = 10_000_000
if target_chunk_size > 0:
# determine number of chunks based on target chunk size
x_chunks = []
y_chunks = []
for src_file in src_files:
with rasterio.open(src_file) as src:
meta = src.meta
x_chunks.append(max(round(meta["width"] / np.sqrt(target_chunk_size)), 1))
y_chunks.append(max(round(meta["height"] / np.sqrt(target_chunk_size)), 1))
else:
# allow to process each file as a single chunk
x_chunks = [1] * len(src_files)
y_chunks = [1] * len(src_files)
n_chunks_per_file = [x * y for x, y in zip(x_chunks, y_chunks)]
if debug_mode:
# avoid creating huge submit_params list in case of tiny target_chunk_size
src_files = src_files[:1]
x_chunks = x_chunks[:1]
y_chunks = y_chunks[:1]
n_chunks_per_file = [min(n_chunks_per_file[0], 2)]
# repeat variables per input file chunks
submit_arg_list = [
{
"src_path": p,
"x_chunks": x,
"y_chunks": y,
"chunk_id": chunk_id,
"chunk_name": f"{p.split('/')[-1].rsplit('.', maxsplit=1)[0]}_{chunk_id}",
"src_path_id": i,
}
for (i, p), x, y, n in zip(
enumerate(src_files), x_chunks, y_chunks, n_chunks_per_file
)
for chunk_id in range(n)
]
run_params = {
"tmp_path": tmp_path,

@@ -57,4 +116,10 @@ "res": res,

pool = fused.submit(udf_extract, submit_params, **params, collect=False, **kwargs)
return pool
# Run the actual extract step
print(f"-- processing {len(submit_arg_list)} chunks")
result_extract = _submit_with_fallback(
"Extract", udf_extract, submit_arg_list, run_params, submit_kwargs
)
_print_batch_jobs(result_extract)
result_extract.wait()
return result_extract

@@ -75,2 +140,3 @@

include_source_url: bool = True,
src_path_values: list[str] = None,
):

@@ -92,6 +158,7 @@ # define UDF that imports the helper function inside the UDF

include_source_url=include_source_url,
src_path_values=src_path_values,
)
def run_partition(
def run_partition_step(
tmp_path: str,

@@ -108,4 +175,5 @@ file_ids: list[str],

include_source_url: bool = True,
src_path_values: list[str] = None,
debug_mode: bool = False,
**kwargs,
**submit_kwargs,
):

@@ -127,3 +195,4 @@ """

"""
params = {
submit_arg_list = [{"file_id": i} for i in file_ids]
run_params = {
"tmp_path": tmp_path,

@@ -140,16 +209,14 @@ "output_path": output_path,

"include_source_url": include_source_url,
"src_path_values": src_path_values if include_source_url else None,
}
submit_params = [{"file_id": i} for i in file_ids]
if debug_mode:
submit_params = submit_params[:2]
submit_arg_list = submit_arg_list[:2]
pool = fused.submit(
udf_partition,
submit_params,
**params,
collect=False,
**kwargs,
# Run the actual partition step
result_partition = _submit_with_fallback(
"Partition", udf_partition, submit_arg_list, run_params, submit_kwargs
)
return pool
_print_batch_jobs(result_partition)
result_partition.wait()
return result_partition

@@ -172,3 +239,3 @@

def run_overview(
def run_overview_step(
tmp_path: str,

@@ -179,5 +246,9 @@ output_path: str,

max_rows_per_chunk: int = 0,
**kwargs,
**submit_kwargs,
):
params = {
submit_arg_list = [
{"res": res, "chunk_res": chunk_res}
for res, chunk_res in zip(overview_res, overview_chunk_res)
]
run_params = {
"tmp_path": tmp_path,

@@ -187,19 +258,13 @@ "output_path": output_path,

}
submit_params = [
{"res": res, "chunk_res": chunk_res}
for res, chunk_res in zip(overview_res, overview_chunk_res)
]
pool = fused.submit(
udf_overview,
submit_params,
**params,
collect=False,
**kwargs,
result_overview = _submit_with_fallback(
"Overview", udf_overview, submit_arg_list, run_params, submit_kwargs
)
return pool
_print_batch_jobs(result_overview)
result_overview.wait()
return result_overview
def _submit_with_fallback(
step: str, run_function, run_kwargs: dict, submit_kwargs: dict, update_callback=None
step: str, udf, arg_list: list, run_kwargs: dict, submit_kwargs: dict
):

@@ -211,5 +276,4 @@ if not (

submit_kwargs["instance_type"] = "realtime"
result = run_function(
**run_kwargs,
**submit_kwargs,
result = fused.submit(
udf, arg_list, **run_kwargs, collect=False, **submit_kwargs
)

@@ -231,13 +295,26 @@ result.wait()

)
update_callback(run_kwargs, errors)
arg_list = [arg_list[i] for i in errors.keys()]
submit_kwargs["instance_type"] = "large"
result = run_function(
**run_kwargs,
**submit_kwargs,
if submit_kwargs["n_processes_per_worker"] == 1:
print(
"-- n_processes_per_worker is set 1 by default. You can "
"potentially increase this for better performance (if the "
"instance has enough memory)"
)
result = fused.submit(
udf, arg_list, **run_kwargs, collect=False, **submit_kwargs
)
else:
if (
submit_kwargs.get("instance_type", "realtime") != "realtime"
and submit_kwargs["n_processes_per_worker"] == 1
):
print(
"-- n_processes_per_worker is set 1 by default. You can "
"potentially increase this for better performance (if the "
"instance has enough memory)"
)
# otherwise run once with user provided configuration
result = run_function(
**run_kwargs,
**submit_kwargs,
result = result = fused.submit(
udf, arg_list, **run_kwargs, collect=False, **submit_kwargs
)

@@ -435,2 +512,3 @@ return result

overwrite: bool = False,
steps: list[str] | None = None,
extract_kwargs={},

@@ -512,2 +590,5 @@ partition_kwargs={},

the `output_path` is not empty.
steps (list of str): The processing steps to run. Can include
"extract", "partition", "metadata", and "overview". By default, all
steps are run.
extract_kwargs (dict): Additional keyword arguments to pass to

@@ -525,3 +606,3 @@ `fused.submit` for the extract step.

Typical keywords include `engine`, `instance_type`, `max_workers`,
and `n_processes_per_worker`.
`n_processes_per_worker` and `max_retry`.

@@ -548,8 +629,9 @@ The extract, partition and overview steps are run in parallel using

)
"""
import datetime
import numpy as np
import rasterio
In contrast to `fused.submit` itself, the ingestion sets `n_processes_per_worker=1`
by default to avoid out-of-memory issues on batch instances. You can
increase this if you know the instance has enough memory to process multiple
chunks in parallel.
"""
try:

@@ -568,2 +650,13 @@ from job2.partition.raster_to_h3 import udf_sample

# Validate steps to run
if steps is None:
steps = ["extract", "partition", "metadata", "overview"]
else:
for step in steps:
if step not in ["extract", "partition", "metadata", "overview"]:
raise ValueError(
f"Invalid step '{step}' specified in `steps`. Supported steps "
"are 'extract', 'partition', 'metadata', and 'overview'."
)
# Validate and preprocess input src path

@@ -597,6 +690,4 @@ if isinstance(src_path, str):

# Construct path for intermediate results
skip_extract = False
if tmp_path is not None:
print(f"-- Using user-specified temporary path: {tmp_path}")
skip_extract = True
else:

@@ -647,63 +738,23 @@ tmp_path = _create_tmp_path(src_files[0], output_path)

# set some default kwargs for submit
# - max_retry=0: typical reason the realtime run fails here will be because
# of time outs or out-of-memory errors, which are unlikely to be resolved
# by retrying
# - n_processes_per_worker=1: when running on a (typically large) batch
# instance, the default to use as many processes as cores will almost
# always result in out-of-memory issues -> use a conservative default of 1
# instead. User can still increase this if they know the instance has enough
# memory
kwargs = {"max_retry": 0, "n_processes_per_worker": 1} | kwargs
###########################################################################
# Step one: extracting pixel values and converting to hex divided in chunks
if not skip_extract:
if "extract" in steps:
print("\nRunning extract step")
start_extract_time = datetime.datetime.now()
if target_chunk_size is None:
if len(src_files) > 20:
print(
"-- processing each file as a single chunk by default (specify "
"'target_chunk_size' to override this)"
)
target_chunk_size = 0
else:
target_chunk_size = 10_000_000
if target_chunk_size > 0:
# determine number of chunks based on target chunk size
x_chunks = []
y_chunks = []
for src_file in src_files:
with rasterio.open(src_file) as src:
meta = src.meta
x_chunks.append(
max(round(meta["width"] / np.sqrt(target_chunk_size)), 1)
)
y_chunks.append(
max(round(meta["height"] / np.sqrt(target_chunk_size)), 1)
)
else:
# allow to process each file as a single chunk
x_chunks = [1] * len(src_files)
y_chunks = [1] * len(src_files)
n_chunks_per_file = [x * y for x, y in zip(x_chunks, y_chunks)]
if debug_mode:
# avoid creating huge submit_params list in case of tiny target_chunk_size
src_path = src_path[:1]
x_chunks = x_chunks[:1]
y_chunks = y_chunks[:1]
n_chunks_per_file = [min(n_chunks_per_file[0], 2)]
# repeat variables per input file chunks
submit_params = [
{
"src_path": p,
"x_chunks": x,
"y_chunks": y,
"chunk_id": chunk_id,
"chunk_name": f"{p.split('/')[-1].rsplit('.', maxsplit=1)[0]}_{chunk_id}",
}
for p, x, y, n in zip(src_files, x_chunks, y_chunks, n_chunks_per_file)
for chunk_id in range(n)
]
extract_run_kwargs = dict(
submit_params=submit_params,
tmp_path=tmp_path,
result_extract = run_extract_step(
src_files,
target_chunk_size,
tmp_path,
res=res,

@@ -713,18 +764,5 @@ k_ring=k_ring,

file_res=file_res,
debug_mode=debug_mode,
**(kwargs | extract_kwargs),
)
extract_submit_kwargs = {"max_retry": 0} | kwargs | extract_kwargs
# Run the actual extract step
print(f"-- processing {len(submit_params)} chunks")
result_extract = _submit_with_fallback(
"Extract",
run_extract,
extract_run_kwargs,
extract_submit_kwargs,
update_callback=lambda run_kwargs, errors: run_kwargs.update(
{"submit_params": [submit_params[i] for i in errors.keys()]}
),
)
_print_batch_jobs(result_extract)
result_extract.wait()
end_extract_time = datetime.datetime.now()

@@ -743,100 +781,83 @@ if not result_extract.all_succeeded():

# metadata and overviews
if "partition" in steps:
print("\nRunning partition step")
print("\nRunning partition step")
# list available file_ids from the previous step
file_ids = _list_tmp_file_ids(tmp_path)
print(f"-- processing {len(file_ids)} file_ids")
# list available file_ids from the previous step
file_ids = _list_tmp_file_ids(tmp_path)
print(f"-- processing {len(file_ids)} file_ids")
result_partition = run_partition_step(
tmp_path,
file_ids,
output_path,
metrics=metrics,
chunk_res=chunk_res,
overview_res=overview_res,
max_rows_per_chunk=max_rows_per_chunk,
include_source_url=include_source_url,
src_path_values=src_files,
**(kwargs | partition_kwargs),
)
end_partition_time = datetime.datetime.now()
if not result_partition.all_succeeded():
print("\nPartition step failed!")
_cleanup_tmp_files(tmp_path, remove_tmp_files)
return result_extract, result_partition
print(f"-- Done partition! (took {end_partition_time - end_extract_time})")
else:
print("\nSkipping partition step")
end_partition_time = datetime.datetime.now()
partition_run_kwargs = dict(
tmp_path=tmp_path,
file_ids=file_ids,
output_path=output_path,
metrics=metrics,
chunk_res=chunk_res,
overview_res=overview_res,
max_rows_per_chunk=max_rows_per_chunk,
include_source_url=include_source_url,
)
partition_submit_kwargs = {"max_retry": 0} | kwargs | partition_kwargs
# Run the actual partition step
result_partition = _submit_with_fallback(
"Partition",
run_partition,
partition_run_kwargs,
partition_submit_kwargs,
update_callback=lambda run_kwargs, errors: run_kwargs.update(
{"file_ids": [file_ids[i] for i in errors.keys()]}
),
)
_print_batch_jobs(result_partition)
result_partition.wait()
end_partition_time = datetime.datetime.now()
if not result_partition.all_succeeded():
print("\nPartition step failed!")
_cleanup_tmp_files(tmp_path, remove_tmp_files)
return result_extract, result_partition
print(f"-- Done partition! (took {end_partition_time - end_extract_time})")
###########################################################################
# Step 3: combining the metadata and overview tmp files
print("\nRunning sample step")
if "metadata" in steps:
print("\nRunning metadata (_sample) step")
@fused.udf(cache_max_age=0)
def udf_sample(tmp_path: str, output_path: str):
from job2.partition.raster_to_h3 import udf_sample as run_udf_sample
@fused.udf(cache_max_age=0)
def udf_sample(tmp_path: str, output_path: str):
from job2.partition.raster_to_h3 import udf_sample as run_udf_sample
return run_udf_sample(tmp_path, output_path)
return run_udf_sample(tmp_path, output_path)
sample_file = fused.run(
udf_sample,
tmp_path=tmp_path,
output_path=output_path,
verbose=False,
engine=kwargs.get("engine", None),
)
end_sample_time = datetime.datetime.now()
print(f"-- Written: {sample_file}")
print(f"-- Done sample! (took {end_sample_time - end_partition_time})")
sample_file = fused.run(
udf_sample,
tmp_path=tmp_path,
output_path=output_path,
verbose=False,
engine=kwargs.get("engine", None),
)
end_sample_time = datetime.datetime.now()
print(f"-- Written: {sample_file}")
print(f"-- Done metadata! (took {end_sample_time - end_partition_time})")
else:
print("\nSkipping metadata (_sample) step")
end_sample_time = datetime.datetime.now()
print("\nRunning overview step")
overview_run_kwargs = dict(
tmp_path=tmp_path,
output_path=output_path,
overview_res=overview_res,
overview_chunk_res=overview_chunk_res,
max_rows_per_chunk=max_rows_per_chunk,
)
overview_submit_kwargs = {"max_retry": 0} | kwargs | overview_kwargs
if "overview" in steps:
print("\nRunning overview step")
result_overview = run_overview_step(
tmp_path,
output_path,
overview_res=overview_res,
overview_chunk_res=overview_chunk_res,
max_rows_per_chunk=max_rows_per_chunk,
**(kwargs | overview_kwargs),
)
end_overview_time = datetime.datetime.now()
if not result_overview.all_succeeded():
print("\nOverview step failed!")
_cleanup_tmp_files(tmp_path, remove_tmp_files)
return result_extract, result_partition
result_overview = _submit_with_fallback(
"Overview",
run_overview,
overview_run_kwargs,
overview_submit_kwargs,
update_callback=lambda run_kwargs, errors: run_kwargs.update(
{
"overview_res": [overview_res[i] for i in errors.keys()],
"overview_chunk_res": [overview_chunk_res[i] for i in errors.keys()],
}
),
)
_print_batch_jobs(result_overview)
result_overview.wait()
for i, overview_file in enumerate(result_overview.results()):
print(
f"-- Written: {overview_file} (res={overview_res[i]}, chunk_res={overview_chunk_res[i]})"
)
end_overview_time = datetime.datetime.now()
if not result_overview.all_succeeded():
print("\nOverview step failed!")
_cleanup_tmp_files(tmp_path, remove_tmp_files)
return result_extract, result_partition
print(f"-- Done overview! (took {end_overview_time - end_sample_time})")
else:
print("\nSkipping overview step")
end_overview_time = datetime.datetime.now()
for i, overview_file in enumerate(result_overview.results()):
print(
f"-- Written: {overview_file} (res={overview_res[i]}, chunk_res={overview_chunk_res[i]})"
)
print(f"-- Done overview! (took {end_overview_time - end_sample_time})")
# remove tmp files

@@ -863,18 +884,17 @@ _cleanup_tmp_files(tmp_path, remove_tmp_files)

def run_divide(src_files: list[str], tmp_path: str, file_res: int, **kwargs):
params = {
def _run_divide_step(src_files: list[str], tmp_path: str, file_res: int, **kwargs):
run_params = {
"tmp_path": tmp_path,
"file_res": file_res,
}
submit_params = [
submit_arg_list = [
{"src_path": p, "chunk_name": str(i)} for i, p in enumerate(src_files)
]
pool = fused.submit(
udf_divide,
submit_params,
**params,
collect=False,
**kwargs,
result_divide = _submit_with_fallback(
"Extract", udf_divide, submit_arg_list, run_params, kwargs
)
return pool
_print_batch_jobs(result_divide)
result_divide.wait()
return result_divide

@@ -1068,23 +1088,5 @@

divide_run_kwargs = dict(
src_files=files,
tmp_path=tmp_path,
file_res=file_res,
)
# Run the actual extract (divide) step
divide_submit_kwargs = {"max_retry": 0} | kwargs | extract_kwargs
# Run the actual extract (divide) step
result_extract = _submit_with_fallback(
"Extract",
run_divide,
divide_run_kwargs,
divide_submit_kwargs,
update_callback=lambda run_kwargs, errors: run_kwargs.update(
{
"src_path": [files[i] for i in errors.keys()],
"chunk_name": [str(i) for i in errors.keys()],
}
),
)
result_extract.wait()
result_extract = _run_divide_step(files, tmp_path, file_res, **divide_submit_kwargs)
end_extract_time = datetime.datetime.now()

@@ -1110,6 +1112,8 @@ if not result_extract.all_succeeded():

partition_run_kwargs = dict(
tmp_path=tmp_path,
file_ids=file_ids,
output_path=output_path,
# Run the actual partition step
partition_submit_kwargs = {"max_retry": 0} | kwargs | partition_kwargs
result_partition = run_partition_step(
tmp_path,
file_ids,
output_path,
metrics=metrics,

@@ -1122,16 +1126,5 @@ groupby_cols=groupby_cols,

max_rows_per_chunk=max_rows_per_chunk,
src_path_values=files,
**partition_submit_kwargs,
)
partition_submit_kwargs = {"max_retry": 0} | kwargs | partition_kwargs
# Run the actual partition step
result_partition = _submit_with_fallback(
"Partition",
run_partition,
partition_run_kwargs,
partition_submit_kwargs,
update_callback=lambda run_kwargs, errors: run_kwargs.update(
{"file_ids": [file_ids[i] for i in errors.keys()]}
),
)
result_partition.wait()
end_partition_time = datetime.datetime.now()

@@ -1167,25 +1160,11 @@ if not result_partition.all_succeeded():

print("\nRunning overview step")
overview_run_kwargs = dict(
tmp_path=tmp_path,
output_path=output_path,
overview_submit_kwargs = {"max_retry": 0} | kwargs | overview_kwargs
result_overview = run_overview_step(
tmp_path,
output_path,
overview_res=overview_res,
overview_chunk_res=overview_chunk_res,
max_rows_per_chunk=max_rows_per_chunk,
**overview_submit_kwargs,
)
overview_submit_kwargs = {"max_retry": 0} | kwargs | overview_kwargs
result_overview = _submit_with_fallback(
"Overview",
run_overview,
overview_run_kwargs,
overview_submit_kwargs,
update_callback=lambda run_kwargs, errors: run_kwargs.update(
{
"overview_res": [overview_res[i] for i in errors.keys()],
"overview_chunk_res": [overview_chunk_res[i] for i in errors.keys()],
}
),
)
result_overview.wait()
end_overview_time = datetime.datetime.now()

@@ -1192,0 +1171,0 @@ if not result_overview.all_succeeded():

@@ -9,10 +9,2 @@ """Read H3-indexed tables by hex ranges."""

from fused._fasttortoise import async_read_combined_row_groups
from fused._h3._grouping import (
DownloadGroup,
RowGroupMetadata,
create_size_based_groups,
group_row_groups_by_file,
)
if TYPE_CHECKING:

@@ -30,2 +22,13 @@ import pyarrow as pa

def _require_job2(func_name: str) -> None:
"""Raise RuntimeError if job2 is not available."""
try:
import job2 # noqa: F401
except ImportError as e:
raise RuntimeError(
f"{func_name} requires the job2 module. "
"This function is only available in the Fused execution environment."
) from e
def read_hex_table_slow(

@@ -126,2 +129,5 @@ dataset_path: str,

# Run the pipelined fetch and download
_require_job2("read_hex_table_slow")
from job2.fasttortoise._h3_read import _fetch_with_combining
tables, timing_info = _run_async(

@@ -189,4 +195,2 @@ _fetch_with_combining(

try:
import asyncio
from job2.fasttortoise._reconstruction import (

@@ -203,3 +207,3 @@ _async_s3_client_loop,

):
await _shared_async_s3_client.__aexit__(None, None, None)
await _shared_async_s3_client.close()
except Exception:

@@ -286,13 +290,6 @@ pass

"""
try:
from job2.fasttortoise import (
read_hex_table as _read_hex_table,
)
except ImportError:
raise ImportError(
"read_hex_table requires job2. This function is only available "
"in the Fused execution environment."
) from None
_require_job2("read_hex_table")
from job2.fasttortoise import read_hex_table as _job2_read_hex_table
return _read_hex_table(
return _job2_read_hex_table(
dataset_path=dataset_path,

@@ -309,272 +306,762 @@ hex_ranges_list=hex_ranges_list,

async def _fetch_with_combining(
row_groups: List[Dict[str, Any]],
base_url: str,
columns: Optional[List[str]],
batch_size: int,
def read_hex_table_with_persisted_metadata(
dataset_path: str,
hex_ranges_list: List[List[int]],
columns: Optional[List[str]] = None,
metadata_path: Optional[str] = None,
verbose: bool = False,
metadata_batch_size: int = 50,
) -> tuple:
return_timing_info: bool = False,
batch_size: Optional[int] = None,
max_concurrent_downloads: Optional[int] = None,
use_local_cache: bool = False,
local_cache_dir: Optional[str] = None,
) -> "pa.Table" | tuple["pa.Table", Dict[str, Any]]:
"""
Fetch row groups with pipelined metadata fetch and combined downloads.
Read data from an H3-indexed dataset using persisted metadata parquet.
This implements a producer-consumer pattern:
1. Group row groups by file
2. For each file, fetch metadata for all row groups in parallel (batched)
3. As metadata arrives, form size-based download groups
4. Download groups are processed as they become ready
This function reads from per-file metadata parquet files instead of
querying a server. Each source parquet file has a corresponding
.metadata.parquet file stored at:
{source_dir}/.fused/{source_filename}.metadata.parquet
Or at the specified metadata_path if provided:
{metadata_path}/{full_source_path}.metadata.parquet
Supports subdirectory queries - if dataset_path points to a subdirectory,
the function will look for metadata files for files in that subdirectory.
Args:
row_groups: List of dicts with 'path' and 'row_group_index' keys
base_url: Base URL for API
columns: Optional list of column names to read
batch_size: Target size in bytes for combining row groups
verbose: If True, print progress information
metadata_batch_size: Maximum number of row group metadata requests to batch together
dataset_path: Path to the H3-indexed dataset (e.g., "s3://bucket/dataset/")
Can also be a subdirectory path for filtering (e.g., "s3://bucket/dataset/year=2024/")
hex_ranges_list: List of [min_hex, max_hex] pairs as integers.
Example: [[622236719905341439, 622246719905341439]]
columns: Optional list of column names to read. If None, reads all columns.
metadata_path: Directory path where metadata files are stored.
If None, looks for metadata at {source_dir}/.fused/ for each source file.
If provided, reads metadata files from this location using full source paths.
This allows reading metadata from a different location when
you don't have access to the original dataset directory.
verbose: If True, print progress information. Default is False.
return_timing_info: If True, return a tuple of (table, timing_info) instead of just the table.
Default is False for backward compatibility.
batch_size: Target size in bytes for combining row groups. If None, uses
`fused.options.row_group_batch_size` (default: 32KB).
max_concurrent_downloads: Maximum number of simultaneous download operations. If None,
uses a default based on the number of files. Default is None.
use_local_cache: If True, download metadata files to local cache first and read from there.
This removes S3 download overhead for benchmarking. Default is False.
local_cache_dir: Directory path for local cache. If None and use_local_cache is True,
uses a temporary directory. Metadata files are cached by their S3 path.
Returns:
Tuple of (List of PyArrow Tables, timing_info dict)
PyArrow Table containing the concatenated data from all matching row groups.
If return_timing_info is True, returns a tuple of (table, timing_info dict).
Raises:
FileNotFoundError: If the metadata parquet file is not found.
ValueError: If any row group is missing required metadata.
Example:
import fused
# First, persist metadata (one-time operation)
fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/")
# Then read using persisted metadata (no server required)
table = fused.h3.read_hex_table_with_persisted_metadata(
dataset_path="s3://my-bucket/my-dataset/",
hex_ranges_list=[[622236719905341439, 622246719905341439]]
)
df = table.to_pandas()
# Read from subdirectory (filters by path prefix)
table = fused.h3.read_hex_table_with_persisted_metadata(
dataset_path="s3://my-bucket/my-dataset/year=2024/",
hex_ranges_list=[[622236719905341439, 622246719905341439]]
)
# Read with metadata in alternate location
table = fused.h3.read_hex_table_with_persisted_metadata(
dataset_path="s3://my-bucket/my-dataset/",
metadata_path="s3://my-bucket/metadata/",
hex_ranges_list=[[622236719905341439, 622246719905341439]]
)
# Benchmark with local cache (removes S3 download overhead)
table, timing = fused.h3.read_hex_table_with_persisted_metadata(
dataset_path="s3://my-bucket/my-dataset/",
hex_ranges_list=[[622236719905341439, 622246719905341439]],
use_local_cache=True,
return_timing_info=True,
)
"""
import aiohttp
import pyarrow as pa
import pyarrow.parquet as pq
from fused._h3._grouping import (
find_consecutive_runs,
)
from fused._options import options as OPTIONS
# Create our own aiohttp session for this operation
# This ensures proper cleanup and doesn't interfere with shared sessions
connector = aiohttp.TCPConnector(
limit=100, # Reasonable limit for concurrent requests
ttl_dns_cache=300,
use_dns_cache=True,
# Lazy import from job2
_require_job2("read_hex_table_with_persisted_metadata")
from job2.fasttortoise._h3_index import (
_get_metadata_file_path,
_list_parquet_files,
is_single_parquet_file,
)
session = aiohttp.ClientSession(
connector=connector,
timeout=aiohttp.ClientTimeout(total=OPTIONS.request_timeout),
)
try:
# Timing tracking - cumulative (sum of all operations)
metadata_total_ms = 0.0
download_total_ms = 0.0
num_groups = 0
if not hex_ranges_list:
return pa.table({})
# Wall-clock timing - first start to last end
metadata_first_start: Optional[float] = None
metadata_last_end: Optional[float] = None
download_first_start: Optional[float] = None
download_last_end: Optional[float] = None
t0 = time.perf_counter()
# Track longest individual operations
longest_metadata_fetch_ms = 0.0
longest_download_ms = 0.0
import hashlib
import os
import tempfile
from pathlib import Path
# Group row groups by file path
by_file = group_row_groups_by_file(row_groups)
import fsspec
# Initialize timing breakdown and data size tracking
timing_breakdown = {
"file_listing_ms": 0.0,
"cache_setup_ms": 0.0,
"s3_exists_check_ms": 0.0,
"s3_download_ms": 0.0,
"s3_open_ms": 0.0,
"parquet_file_init_ms": 0.0,
"row_group_filter_ms": 0.0, # Time to filter row groups using statistics
"read_table_ms": 0.0,
"schema_extract_ms": 0.0,
"json_decode_ms": 0.0,
"filter_ms": 0.0,
"convert_to_row_groups_ms": 0.0,
"validation_ms": 0.0,
"download_ms": 0.0,
"concat_ms": 0.0,
"row_group_stats": {
"total": 0,
"matched": 0,
}, # Track row group filtering stats
}
# Track data sizes
data_sizes = {
"metadata_bytes_downloaded": 0, # Total metadata files downloaded (full files)
"metadata_bytes_read": 0, # Bytes actually read from parquet
"metadata_bytes_used": 0, # Total metadata bytes actually used (after filtering)
"data_bytes": 0, # Total row group data downloaded
"metadata_files_count": 0,
}
# Set up local cache if requested
cache_dir = None
if use_local_cache:
t_cache_setup = time.perf_counter()
if local_cache_dir is None:
cache_dir = tempfile.mkdtemp(prefix="fused_metadata_cache_")
else:
cache_dir = local_cache_dir
Path(cache_dir).mkdir(parents=True, exist_ok=True)
timing_breakdown["cache_setup_ms"] = (
time.perf_counter() - t_cache_setup
) * 1000
if verbose:
print(f" Row groups span {len(by_file)} files")
print(f"Using local cache directory: {cache_dir}")
# Queue for download groups ready to be fetched
download_queue: asyncio.Queue[Optional[DownloadGroup]] = asyncio.Queue()
t_listing = time.perf_counter()
# Determine source files to query
if dataset_split := is_single_parquet_file(dataset_path):
# Single file case
dataset_root = dataset_split[0]
source_files = [dataset_split[1]]
else:
# Directory case - list all parquet files
dataset_root = dataset_path.rstrip("/")
source_files = _list_parquet_files(dataset_path)
timing_breakdown["file_listing_ms"] = (time.perf_counter() - t_listing) * 1000
# Results collection (protected by lock for thread safety)
results: List["pa.Table"] = []
results_lock = asyncio.Lock()
if not source_files:
raise FileNotFoundError(f"No parquet files found in {dataset_path}")
# Track how many producers are still running
num_producers = len(by_file)
producers_done = asyncio.Event()
if verbose:
print(f"Found {len(source_files)} source files to query")
# Track timing
timing_lock = asyncio.Lock()
# Read metadata for each source file and collect row groups
all_row_groups = []
metadata_read_time = 0.0
async def metadata_producer(file_path: str, rg_indices: List[int]) -> None:
"""Fetch metadata for all row groups in a file and create download groups."""
nonlocal num_producers, metadata_total_ms, num_groups
nonlocal metadata_first_start, metadata_last_end, longest_metadata_fetch_ms
for rel_path in source_files:
full_source_path = f"{dataset_root}/{rel_path}"
try:
# Find consecutive runs first
runs = find_consecutive_runs(rg_indices)
# Determine metadata file path
metadata_file_path = _get_metadata_file_path(
full_source_path, dataset_root, metadata_path
)
for run in runs:
# Batch metadata requests to reduce API overhead
for batch_start in range(0, len(run), metadata_batch_size):
batch_indices = run[
batch_start : batch_start + metadata_batch_size
]
try:
# Handle local caching
t_exists = time.perf_counter()
original_fs, original_fs_path = fsspec.core.url_to_fs(metadata_file_path)
t_meta_start = time.perf_counter()
# If using local cache, check if file is cached or download it
if use_local_cache and cache_dir:
# Create a hash-based filename for the cache
path_hash = hashlib.md5(metadata_file_path.encode()).hexdigest()
cached_file_path = os.path.join(cache_dir, f"{path_hash}.parquet")
# Use batched API call
from fused._fasttortoise import (
async_fetch_row_group_metadata_batch,
if os.path.exists(cached_file_path):
if verbose:
print(f" Using cached metadata file: {cached_file_path}")
# Use local file instead
fs, fs_path = fsspec.core.url_to_fs(f"file://{cached_file_path}")
# Track cached file size (downloaded previously)
try:
cached_size = os.path.getsize(cached_file_path)
data_sizes["metadata_bytes_downloaded"] += cached_size
data_sizes["metadata_files_count"] += 1
except Exception:
pass
else:
# Check if original file exists first
if not original_fs.exists(original_fs_path):
if verbose:
print(
f" Skipping {rel_path} - metadata file not found: {metadata_file_path}"
)
continue
# Download to cache first
if verbose:
print(
f" Downloading metadata to cache: {metadata_file_path} -> {cached_file_path}"
)
t_download = time.perf_counter()
metadata_size = 0
with original_fs.open(original_fs_path, "rb") as src:
with open(cached_file_path, "wb") as dst:
while True:
chunk = src.read(8192) # 8KB chunks
if not chunk:
break
dst.write(chunk)
metadata_size += len(chunk)
timing_breakdown["s3_download_ms"] += (
time.perf_counter() - t_download
) * 1000
data_sizes["metadata_bytes_downloaded"] += metadata_size
data_sizes["metadata_files_count"] += 1
fs, fs_path = fsspec.core.url_to_fs(f"file://{cached_file_path}")
else:
# Not using cache - check if file exists
if not original_fs.exists(original_fs_path):
if verbose:
print(
f" Skipping {rel_path} - metadata file not found: {metadata_file_path}"
)
continue
fs, fs_path = original_fs, original_fs_path
# Get metadata file size if available (we download the whole file)
try:
if hasattr(fs, "info"):
file_info = fs.info(fs_path)
if "size" in file_info:
data_sizes["metadata_bytes_downloaded"] += file_info["size"]
data_sizes["metadata_files_count"] += 1
except Exception:
pass # Ignore if we can't get file size
batch_requests = [
{"path": file_path, "row_group_index": idx}
for idx in batch_indices
]
metadata_results = await async_fetch_row_group_metadata_batch(
batch_requests, base_url=base_url, session=session
timing_breakdown["s3_exists_check_ms"] += (
time.perf_counter() - t_exists
) * 1000
# Read metadata file with row group filtering
read_start = time.perf_counter()
t_open = time.perf_counter()
with fs.open(fs_path, "rb") as f:
timing_breakdown["s3_open_ms"] += (time.perf_counter() - t_open) * 1000
t_parquet_init = time.perf_counter()
parquet_file = pq.ParquetFile(f)
timing_breakdown["parquet_file_init_ms"] += (
time.perf_counter() - t_parquet_init
) * 1000
# Extract metadata_json from schema metadata
t_schema = time.perf_counter()
schema = parquet_file.schema_arrow
metadata_json_bytes = schema.metadata.get(b"metadata_json")
if not metadata_json_bytes:
if verbose:
print(
f" Warning: {rel_path} metadata file missing metadata_json in schema"
)
continue
timing_breakdown["schema_extract_ms"] += (
time.perf_counter() - t_schema
) * 1000
t_meta_end = time.perf_counter()
t_json = time.perf_counter()
metadata_json_str = metadata_json_bytes.decode()
timing_breakdown["json_decode_ms"] += (
time.perf_counter() - t_json
) * 1000
async with timing_lock:
metadata_fetch_ms = (t_meta_end - t_meta_start) * 1000
metadata_total_ms += metadata_fetch_ms
# Track longest individual metadata fetch
if metadata_fetch_ms > longest_metadata_fetch_ms:
longest_metadata_fetch_ms = metadata_fetch_ms
# Track wall-clock: first start to last end
if (
metadata_first_start is None
or t_meta_start < metadata_first_start
):
metadata_first_start = t_meta_start
if (
metadata_last_end is None
or t_meta_end > metadata_last_end
):
metadata_last_end = t_meta_end
# Filter row groups using statistics before reading
t_rg_filter = time.perf_counter()
matching_row_groups = []
metadata = parquet_file.metadata
total_row_groups = metadata.num_row_groups
# Convert to RowGroupMetadata objects
rg_metadata_list = [
RowGroupMetadata(
path=m["path"],
row_group_index=m["row_group_index"],
start_offset=m["start_offset"],
end_offset=m["end_offset"],
api_metadata=m["api_metadata"],
for rg_idx in range(total_row_groups):
rg = metadata.row_group(rg_idx)
# Get h3_min and h3_max statistics from row group
# Find the column indices for h3_min and h3_max
h3_min_col_idx = None
h3_max_col_idx = None
for col_idx in range(rg.num_columns):
# Get column name from schema
col_name = schema.names[col_idx]
if col_name == "h3_min":
h3_min_col_idx = col_idx
elif col_name == "h3_max":
h3_max_col_idx = col_idx
# Check if this row group could match any query range
if h3_min_col_idx is not None and h3_max_col_idx is not None:
h3_min_col = rg.column(h3_min_col_idx)
h3_max_col = rg.column(h3_max_col_idx)
# Get min/max statistics
rg_h3_min = None
rg_h3_max = None
if (
h3_min_col.statistics
and h3_min_col.statistics.min is not None
):
rg_h3_min = (
h3_min_col.statistics.min.as_py()
if hasattr(h3_min_col.statistics.min, "as_py")
else int(h3_min_col.statistics.min)
)
for m in metadata_results
]
if (
h3_max_col.statistics
and h3_max_col.statistics.max is not None
):
rg_h3_max = (
h3_max_col.statistics.max.as_py()
if hasattr(h3_max_col.statistics.max, "as_py")
else int(h3_max_col.statistics.max)
)
# Create size-based groups from this batch
groups = create_size_based_groups(rg_metadata_list, batch_size)
if rg_h3_min is not None and rg_h3_max is not None:
# Check if row group range overlaps with any query range
matches = False
for q_min, q_max in hex_ranges_list:
# Range overlap: rg_h3_min <= q_max AND rg_h3_max >= q_min
if rg_h3_min <= q_max and rg_h3_max >= q_min:
matches = True
break
async with timing_lock:
num_groups += len(groups)
if matches:
matching_row_groups.append(rg_idx)
else:
# No statistics available - include this row group to be safe
matching_row_groups.append(rg_idx)
else:
# Can't filter - include this row group to be safe
matching_row_groups.append(rg_idx)
# Enqueue each group for download
for group in groups:
await download_queue.put(group)
timing_breakdown["row_group_filter_ms"] += (
time.perf_counter() - t_rg_filter
) * 1000
finally:
# Mark this producer as done
num_producers -= 1
if num_producers == 0:
producers_done.set()
# Store row group filtering stats for verbose output
matching_row_groups_count = len(matching_row_groups)
timing_breakdown["row_group_stats"]["total"] += total_row_groups
timing_breakdown["row_group_stats"]["matched"] += (
matching_row_groups_count
)
async def download_consumer() -> None:
"""Download groups from the queue as they become available."""
nonlocal download_total_ms
nonlocal download_first_start, download_last_end, longest_download_ms
# Read only matching row groups
t_read_table = time.perf_counter()
if matching_row_groups:
metadata_table = parquet_file.read_row_groups(matching_row_groups)
else:
# No matching row groups - skip this file
if verbose:
print(
f" No matching row groups in {rel_path} (checked {total_row_groups} row groups)"
)
continue
while True:
# Wait for either a group or all producers to be done
timing_breakdown["read_table_ms"] += (
time.perf_counter() - t_read_table
) * 1000
data_sizes["metadata_bytes_read"] += metadata_table.nbytes
# Filter by H3 ranges on the read data (in case row group stats weren't precise)
t_filter = time.perf_counter()
filtered_table = _filter_by_h3_ranges(metadata_table, hex_ranges_list)
timing_breakdown["filter_ms"] += (time.perf_counter() - t_filter) * 1000
# Store reference table for size calculations
reference_table = metadata_table
if len(filtered_table) == 0:
continue
metadata_read_time += time.perf_counter() - read_start
# Track actual bytes used (size of filtered table)
if len(filtered_table) > 0:
try:
# Use a timeout to periodically check if producers are done
group = await asyncio.wait_for(download_queue.get(), timeout=0.1)
except asyncio.TimeoutError:
if producers_done.is_set() and download_queue.empty():
break
continue
# Calculate size of filtered table in bytes
filtered_size = filtered_table.nbytes
data_sizes["metadata_bytes_used"] += filtered_size
except Exception:
# Fallback: estimate based on number of rows
# Approximate size per row (rough estimate)
if len(reference_table) > 0:
avg_row_size = reference_table.nbytes / len(reference_table)
data_sizes["metadata_bytes_used"] += avg_row_size * len(
filtered_table
)
if group is None:
break
if len(filtered_table) == 0:
continue
# Download and reconstruct all row groups in this group
metadata_list = [
{
"path": rg.path,
"row_group_index": rg.row_group_index,
"start_offset": rg.start_offset,
"end_offset": rg.end_offset,
"api_metadata": rg.api_metadata,
}
for rg in group.row_groups
]
# Convert to row group format
# Add file_path and metadata_json to each row group
# Vectorized conversion: convert entire columns to lists at once
t_convert = time.perf_counter()
row_group_indices = filtered_table["row_group_index"].to_pylist()
start_offsets = filtered_table["start_offset"].to_pylist()
end_offsets = filtered_table["end_offset"].to_pylist()
row_group_bytes_list = filtered_table["row_group_bytes"].to_pylist()
t_dl_start = time.perf_counter()
tables = await async_read_combined_row_groups(
path=group.path,
row_group_metadata_list=metadata_list,
base_url=base_url,
columns=columns,
# Create all row groups at once using list comprehension
new_row_groups = [
{
"path": full_source_path,
"row_group_index": idx,
"start_offset": start,
"end_offset": end,
"metadata_json": metadata_json_str, # Use the JSON string directly
"row_group_bytes": bytes_str,
}
for idx, start, end, bytes_str in zip(
row_group_indices, start_offsets, end_offsets, row_group_bytes_list
)
t_dl_end = time.perf_counter()
]
all_row_groups.extend(new_row_groups)
timing_breakdown["convert_to_row_groups_ms"] += (
time.perf_counter() - t_convert
) * 1000
async with timing_lock:
download_ms = (t_dl_end - t_dl_start) * 1000
download_total_ms += download_ms
# Track longest individual download
if download_ms > longest_download_ms:
longest_download_ms = download_ms
# Track wall-clock: first start to last end
if (
download_first_start is None
or t_dl_start < download_first_start
):
download_first_start = t_dl_start
if download_last_end is None or t_dl_end > download_last_end:
download_last_end = t_dl_end
except Exception as e:
if verbose:
print(f" Warning: Failed to read metadata for {rel_path}: {e}")
continue
async with results_lock:
results.extend(tables)
# Calculate total data size from row groups
for rg in all_row_groups:
start_offset = rg.get("start_offset")
end_offset = rg.get("end_offset")
if start_offset is not None and end_offset is not None:
data_sizes["data_bytes"] += end_offset - start_offset
download_queue.task_done()
t_read = time.perf_counter()
# Start all producers and a consumer
producer_tasks = [
asyncio.create_task(metadata_producer(path, indices))
for path, indices in by_file.items()
]
# Calculate download rates
metadata_download_rate = 0.0
if (
timing_breakdown["s3_download_ms"] > 0
and data_sizes["metadata_bytes_downloaded"] > 0
):
metadata_download_rate = (
data_sizes["metadata_bytes_downloaded"] / 1024 / 1024
) / (timing_breakdown["s3_download_ms"] / 1000) # MB/s
# Start multiple consumers for better parallelism
num_consumers = min(len(by_file), 8) # At most 8 concurrent downloads
consumer_tasks = [
asyncio.create_task(download_consumer()) for _ in range(num_consumers)
]
if verbose:
print(
f" Read metadata: {(t_read - t0) * 1000:.1f}ms ({len(all_row_groups)} row groups from {len(source_files)} files)"
)
print(" Data sizes:")
if data_sizes["metadata_bytes_downloaded"] > 0:
metadata_downloaded_mb = (
data_sizes["metadata_bytes_downloaded"] / 1024 / 1024
)
metadata_read_mb = data_sizes["metadata_bytes_read"] / 1024 / 1024
metadata_used_mb = data_sizes["metadata_bytes_used"] / 1024 / 1024
print(
f" Metadata files downloaded: {metadata_downloaded_mb:.2f} MB ({data_sizes['metadata_files_count']} files)"
)
if metadata_read_mb > 0:
read_efficiency = (
(metadata_read_mb / metadata_downloaded_mb * 100)
if metadata_downloaded_mb > 0
else 0
)
print(
f" Metadata bytes read (row group filtered): {metadata_read_mb:.2f} MB ({read_efficiency:.1f}% of downloaded)"
)
if metadata_used_mb > 0:
efficiency = (
(metadata_used_mb / metadata_downloaded_mb * 100)
if metadata_downloaded_mb > 0
else 0
)
print(
f" Metadata bytes used (after filtering): {metadata_used_mb:.2f} MB ({efficiency:.1f}% of downloaded)"
)
if timing_breakdown["s3_download_ms"] > 0:
print(
f" Metadata download rate: {metadata_download_rate:.2f} MB/s"
)
data_mb = data_sizes["data_bytes"] / 1024 / 1024
print(
f" Row group data: {data_mb:.2f} MB ({len(all_row_groups)} row groups)"
)
print(" Timing breakdown:")
print(f" File listing: {timing_breakdown['file_listing_ms']:.1f}ms")
if use_local_cache:
print(f" Cache setup: {timing_breakdown['cache_setup_ms']:.1f}ms")
print(f" S3 exists check: {timing_breakdown['s3_exists_check_ms']:.1f}ms")
if use_local_cache:
print(
f" S3 download (to cache): {timing_breakdown['s3_download_ms']:.1f}ms"
)
print(f" S3 open: {timing_breakdown['s3_open_ms']:.1f}ms")
print(
f" ParquetFile init: {timing_breakdown['parquet_file_init_ms']:.1f}ms"
)
if timing_breakdown["row_group_filter_ms"] > 0:
# Get row group stats from timing_breakdown if available
rg_stats = timing_breakdown.get("row_group_stats", {})
total_rgs = rg_stats.get("total", 0)
matched_rgs = rg_stats.get("matched", 0)
if total_rgs > 0:
efficiency = (matched_rgs / total_rgs * 100) if total_rgs > 0 else 0
skipped_rgs = total_rgs - matched_rgs
print(
f" Row group filter (stats): {timing_breakdown['row_group_filter_ms']:.1f}ms"
)
print(
f" Total row groups: {total_rgs:,}, Matched: {matched_rgs:,}, Skipped: {skipped_rgs:,} ({efficiency:.1f}% match rate)"
)
else:
print(
f" Row group filter (stats): {timing_breakdown['row_group_filter_ms']:.1f}ms"
)
print(f" Read table: {timing_breakdown['read_table_ms']:.1f}ms")
print(f" Schema extract: {timing_breakdown['schema_extract_ms']:.1f}ms")
print(f" JSON decode: {timing_breakdown['json_decode_ms']:.1f}ms")
print(f" Filter: {timing_breakdown['filter_ms']:.1f}ms")
print(
f" Convert to row groups: {timing_breakdown['convert_to_row_groups_ms']:.1f}ms"
)
# Wait for all producers to finish
await asyncio.gather(*producer_tasks)
if len(all_row_groups) == 0:
if return_timing_info:
timing_info = {
"metadata_ms": (t_read - t0) * 1000,
"download_ms": 0,
"num_groups": 0,
"timing_breakdown": timing_breakdown,
"data_sizes": data_sizes,
"metadata_download_rate_mb_s": metadata_download_rate,
"data_download_rate_mb_s": 0.0,
"data_download_wall_rate_mb_s": 0.0,
}
return pa.table({}), timing_info
return pa.table({})
# Wait for all items to be processed
await download_queue.join()
# Validate that all row groups have required metadata
t_validate = time.perf_counter()
for rg in all_row_groups:
missing_fields = []
if rg.get("start_offset") is None:
missing_fields.append("start_offset")
if rg.get("end_offset") is None:
missing_fields.append("end_offset")
if not rg.get("metadata_json"):
missing_fields.append("metadata_json")
if not rg.get("row_group_bytes"):
missing_fields.append("row_group_bytes")
# Signal consumers to stop (they'll exit when queue is empty and producers done)
# Cancel remaining consumer tasks
for task in consumer_tasks:
task.cancel()
if missing_fields:
raise ValueError(
f"Row group {rg.get('row_group_index')} in {rg.get('path')} is missing required metadata: "
f"{', '.join(missing_fields)}. The dataset needs to be re-indexed with persist_hex_table_metadata()."
)
timing_breakdown["validation_ms"] = (time.perf_counter() - t_validate) * 1000
# Wait for consumers to finish (ignore cancellation errors)
await asyncio.gather(*consumer_tasks, return_exceptions=True)
# Get the batch size from options if not provided
if batch_size is None:
batch_size = OPTIONS.row_group_batch_size
# Calculate wall-clock times
metadata_wall_ms = 0.0
if metadata_first_start is not None and metadata_last_end is not None:
metadata_wall_ms = (metadata_last_end - metadata_first_start) * 1000
if verbose:
print(f" Using batch size: {batch_size} bytes")
print(" Using persisted metadata (no API calls)")
download_wall_ms = 0.0
if download_first_start is not None and download_last_end is not None:
download_wall_ms = (download_last_end - download_first_start) * 1000
# Run the prefetched fetch and download
async def _run_with_cleanup():
"""Run coroutine and clean up shared S3 client afterwards."""
from job2.fasttortoise._h3_read import _fetch_with_combining_prefetched
try:
return await _fetch_with_combining_prefetched(
all_row_groups,
"", # base_url not used for persisted metadata
columns,
batch_size,
verbose,
max_concurrent_downloads,
)
finally:
# Clean up shared S3 client to prevent "Unclosed" warnings
try:
from job2.fasttortoise._reconstruction import _shared_async_s3_client
if _shared_async_s3_client is not None:
await _shared_async_s3_client.close()
except Exception:
pass
tables, download_timing_info = _run_async(_run_with_cleanup())
t_fetch = time.perf_counter()
timing_breakdown["download_ms"] = (t_fetch - t_read) * 1000
# Calculate data download rate
data_download_rate = 0.0
data_download_wall_rate = 0.0
if download_timing_info:
download_wall_ms = download_timing_info.get("download_wall_ms", 0)
if download_wall_ms > 0 and data_sizes["data_bytes"] > 0:
data_download_wall_rate = (data_sizes["data_bytes"] / 1024 / 1024) / (
download_wall_ms / 1000
) # MB/s
download_cumulative_ms = download_timing_info.get("download_ms", 0)
if download_cumulative_ms > 0 and data_sizes["data_bytes"] > 0:
data_download_rate = (data_sizes["data_bytes"] / 1024 / 1024) / (
download_cumulative_ms / 1000
) # MB/s
if verbose:
print(f" Download: {(t_fetch - t_read) * 1000:.1f}ms")
if data_sizes["data_bytes"] > 0:
data_mb = data_sizes["data_bytes"] / 1024 / 1024
print(f" Data downloaded: {data_mb:.2f} MB")
if data_download_wall_rate > 0:
print(
f" Effective download rate: {data_download_wall_rate:.2f} MB/s (wall-clock)"
)
if data_download_rate > 0 and data_download_rate != data_download_wall_rate:
print(f" Cumulative download rate: {data_download_rate:.2f} MB/s")
if download_timing_info:
print(
f" Data download: {download_timing_info.get('download_wall_ms', 0):.1f}ms wall-clock, "
f"{download_timing_info.get('download_ms', 0):.1f}ms cumulative"
)
print(f" Download groups: {download_timing_info.get('num_groups', 0)}")
# Concatenate all tables into one
if not tables:
if return_timing_info:
timing_info = {
"metadata_ms": (t_read - t0) * 1000,
"download_ms": timing_breakdown["download_ms"],
"num_groups": download_timing_info.get("num_groups", 0)
if download_timing_info
else 0,
"timing_breakdown": timing_breakdown,
"data_sizes": data_sizes,
"metadata_download_rate_mb_s": metadata_download_rate,
"data_download_rate_mb_s": data_download_rate,
"data_download_wall_rate_mb_s": data_download_wall_rate,
}
if download_timing_info:
timing_info.update(download_timing_info)
return pa.table({}), timing_info
return pa.table({})
if len(tables) == 1:
if return_timing_info:
timing_info = {
"metadata_ms": (t_read - t0) * 1000,
"download_ms": timing_breakdown["download_ms"],
"num_groups": download_timing_info.get("num_groups", 0)
if download_timing_info
else 0,
"timing_breakdown": timing_breakdown,
"data_sizes": data_sizes,
"metadata_download_rate_mb_s": metadata_download_rate,
"data_download_rate_mb_s": data_download_rate,
"data_download_wall_rate_mb_s": data_download_wall_rate,
}
if download_timing_info:
timing_info.update(download_timing_info)
return tables[0], timing_info
return tables[0]
t_concat_start = time.perf_counter()
result = pa.concat_tables(tables, promote_options="permissive")
t_concat = time.perf_counter()
timing_breakdown["concat_ms"] = (t_concat - t_concat_start) * 1000
if verbose:
print(f" Concat tables: {(t_concat - t_concat_start) * 1000:.1f}ms")
if return_timing_info:
timing_info = {
"metadata_ms": metadata_total_ms, # Cumulative
"metadata_wall_ms": metadata_wall_ms, # Wall-clock (first to last)
"longest_metadata_fetch_ms": longest_metadata_fetch_ms, # Longest individual metadata fetch
"download_ms": download_total_ms, # Cumulative
"download_wall_ms": download_wall_ms, # Wall-clock (first to last)
"longest_download_ms": longest_download_ms, # Longest individual download
"num_groups": num_groups,
"metadata_ms": (t_read - t0) * 1000,
"download_ms": timing_breakdown["download_ms"],
"num_groups": download_timing_info.get("num_groups", 0)
if download_timing_info
else 0,
"timing_breakdown": timing_breakdown,
"total_ms": (t_concat - t0) * 1000,
"data_sizes": data_sizes,
"metadata_download_rate_mb_s": metadata_download_rate,
"data_download_rate_mb_s": data_download_rate,
"data_download_wall_rate_mb_s": data_download_wall_rate,
}
if download_timing_info:
timing_info.update(download_timing_info)
return result, timing_info
return result
return results, timing_info
finally:
# Always close our session to prevent "Unclosed" warnings
if not session.closed:
await session.close()
def _filter_by_h3_ranges(table: "pa.Table", hex_ranges: List[List[int]]) -> "pa.Table":
"""Filter metadata table by H3 range overlap.
For each query range [q_min, q_max], finds rows where:
h3_min <= q_max AND h3_max >= q_min (range overlap)
Args:
table: PyArrow table with h3_min and h3_max columns
hex_ranges: List of [min, max] integer pairs
Returns:
Filtered table with only matching row groups
"""
import pyarrow.compute as pc
if not hex_ranges:
return table
masks = []
for q_min, q_max in hex_ranges:
# Range overlap: h3_min <= q_max AND h3_max >= q_min
mask = pc.and_(
pc.less_equal(table["h3_min"], q_max),
pc.greater_equal(table["h3_max"], q_min),
)
masks.append(mask)
# Combine all masks with OR
combined = masks[0]
for m in masks[1:]:
combined = pc.or_(combined, m)
return table.filter(combined)

@@ -133,2 +133,5 @@ import asyncio

elif isinstance(udf, UdfJobStepConfig):
return ResolvedLocalJobStepUdf(job_step=udf, udf=udf.udf)
elif isinstance(udf, Udf):

@@ -176,2 +179,5 @@ job_step = UdfJobStepConfig(udf=udf, name=udf.name)

elif isinstance(udf, UdfJobStepConfig):
return ResolvedLocalJobStepUdf(job_step=udf, udf=udf.udf)
elif isinstance(udf, Udf):

@@ -178,0 +184,0 @@ job_step = UdfJobStepConfig(udf=udf, name=udf.name)

@@ -1,1 +0,1 @@

__version__ = "1.28.0"
__version__ = "1.29.0"

@@ -280,3 +280,7 @@ from __future__ import annotations

def hash_udf(udf: dict[str, Any], valid_extensions: list[str]) -> str:
def hash_udf(
udf: dict[str, Any],
valid_extensions: list[str],
input_data: list[Any] | None = None,
) -> str:
UNIQUE_FIELDS = ["code", "headers", "parameters", "entrypoint", "type"]

@@ -298,2 +302,7 @@ fields_to_hash = {k: v for k, v in udf.items() if k in UNIQUE_FIELDS}

fields_to_hash["valid_extensions"] = valid_extensions
# Include input data in hash if provided (for n_processes_per_worker scenarios)
if input_data is not None:
fields_to_hash["input_data"] = input_data
text = json.dumps(fields_to_hash, sort_keys=True)

@@ -300,0 +309,0 @@ return hashlib.sha256(text.encode("UTF-8")).hexdigest()

@@ -556,2 +556,3 @@ from __future__ import annotations

send_status_email: bool | None = None,
name: str | None = None,
) -> RunResponse:

@@ -571,2 +572,3 @@ """Execute this operation

send_status_email: Whether to send a status email to the user when the job is complete.
name: The name of the job. Defaults to None for a generated name.
"""

@@ -599,2 +601,3 @@ to_run = self.model_copy(deep=True)

send_status_email=send_status_email,
name=name,
)

@@ -601,0 +604,0 @@

@@ -30,5 +30,9 @@ from __future__ import annotations

"""Whether the instance has been terminated."""
last_heartbeat: datetime | None = None
"""When the last heartbeat was received from the job."""
job_status: StrictStr | None = None
"""The status of the job."""
job_status_date: datetime | None = None
"""When the job_status was last updated"""
finished_job_status: StrictBool | None = None

@@ -35,0 +39,0 @@ """Whether the job has finished running."""

@@ -5,10 +5,23 @@ from typing import Any, Dict

class MockUdfInput:
def __init__(self, data: Any) -> None:
def __init__(self, data: Any, as_kwargs: bool = False) -> None:
"""
Mock input for UDF execution.
Args:
data: The input data
as_kwargs: If True and data is a dict, pass it through as kwargs directly.
If False (default), wrap data in "bounds" parameter (for tile/bbox).
"""
self._data = data
self._as_kwargs = as_kwargs
def as_udf_args(self) -> Dict[str, Any]:
kwargs = {}
if self._data is not None:
kwargs["bounds"] = self._data
return kwargs
if self._as_kwargs and isinstance(self._data, dict):
# Pass through the dict as kwargs directly for input list processing
return self._data
else:
# Original behavior for tile/bbox
kwargs = {}
if self._data is not None:
kwargs["bounds"] = self._data
return kwargs
Metadata-Version: 2.4
Name: fused
Version: 1.28.0
Version: 1.29.0
Project-URL: Homepage, https://www.fused.io

@@ -5,0 +5,0 @@ Project-URL: Documentation, https://docs.fused.io

"""Size-based grouping logic for row group downloads."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class RowGroupMetadata:
    """One parquet row group, located by absolute byte offsets in its file.

    Carries everything needed to fetch and reconstruct the row group
    without re-reading the file footer.
    """

    path: str  # file path (S3 or HTTP URL)
    row_group_index: int  # 0-based index of the row group within the file
    start_offset: int  # first byte of the row group's data
    end_offset: int  # one past the last byte of the row group's data (exclusive)
    api_metadata: Dict[str, Any]  # full metadata dict from the API, needed for reconstruction

    @property
    def size(self) -> int:
        """Byte length of the row group data (``end_offset - start_offset``)."""
        return self.end_offset - self.start_offset
@dataclass
class DownloadGroup:
    """Consecutive row groups from one file, fetched as a single byte range.

    Offsets are derived from the member row groups, so the group is always
    internally consistent; an empty group reports offsets of 0.
    """

    path: str  # file path (S3 or HTTP URL)
    row_groups: List[RowGroupMetadata] = field(default_factory=list)  # consecutive members

    @property
    def start_offset(self) -> int:
        """First byte of the combined download (0 when the group is empty)."""
        starts = [rg.start_offset for rg in self.row_groups]
        return min(starts) if starts else 0

    @property
    def end_offset(self) -> int:
        """One past the last byte of the combined download (0 when empty)."""
        ends = [rg.end_offset for rg in self.row_groups]
        return max(ends) if ends else 0

    @property
    def total_size(self) -> int:
        """Byte length of the combined download range."""
        return self.end_offset - self.start_offset

    @property
    def row_group_indices(self) -> List[int]:
        """Indices of the member row groups, in insertion order."""
        return [rg.row_group_index for rg in self.row_groups]

    def add_row_group(self, rg: RowGroupMetadata) -> None:
        """Append a row group, enforcing same-file and consecutive-index rules.

        Args:
            rg: RowGroupMetadata to add.

        Raises:
            ValueError: If ``rg`` belongs to a different file, or its index
                does not directly follow the last member's index.
        """
        if rg.path != self.path:
            raise ValueError(
                f"Cannot add row group from {rg.path} to group for {self.path}"
            )
        if self.row_groups:
            tail = self.row_groups[-1].row_group_index
            if rg.row_group_index != tail + 1:
                raise ValueError(
                    f"Row group {rg.row_group_index} is not consecutive with "
                    f"last index {tail}"
                )
        self.row_groups.append(rg)
def group_row_groups_by_file(
    row_group_items: List[Dict[str, Any]],
) -> Dict[str, List[int]]:
    """Group row group items by file path.

    Args:
        row_group_items: List of dicts with 'path' and 'row_group_index' keys.

    Returns:
        Dict mapping file path to list of row group indices (sorted ascending).
    """
    by_file: Dict[str, List[int]] = {}
    for item in row_group_items:
        # setdefault replaces the manual "if path not in by_file" dance
        by_file.setdefault(item["path"], []).append(item["row_group_index"])
    # Sort indices for each file so callers can rely on ascending order
    for indices in by_file.values():
        indices.sort()
    return by_file
def find_consecutive_runs(indices: List[int]) -> List[List[int]]:
    """Split a sorted list of integers into maximal runs of consecutive values.

    Args:
        indices: Sorted list of integers.

    Returns:
        List of lists, where each inner list contains consecutive integers.

    Examples:
        >>> find_consecutive_runs([0, 1, 2, 3])
        [[0, 1, 2, 3]]
        >>> find_consecutive_runs([0, 2, 4, 6])
        [[0], [2], [4], [6]]
        >>> find_consecutive_runs([0, 1, 5, 6, 7, 10])
        [[0, 1], [5, 6, 7], [10]]
        >>> find_consecutive_runs([])
        []
        >>> find_consecutive_runs([5])
        [[5]]
    """
    runs: List[List[int]] = []
    previous: Optional[int] = None
    for value in indices:
        if previous is not None and value == previous + 1:
            # Still consecutive: extend the run in progress.
            runs[-1].append(value)
        else:
            # First value, or a gap: open a new run.
            runs.append([value])
        previous = value
    return runs
def create_size_based_groups(
    row_group_metadata: List[RowGroupMetadata],
    target_size: int,
) -> List[DownloadGroup]:
    """Batch one file's row groups into size-bounded download groups.

    Walks the row groups in index order, appending to the open group until
    either (a) the next index is not consecutive, or (b) adding the next row
    group would push the group past ``target_size``.  A row group already
    larger than the target therefore ends up alone in its own group.

    Args:
        row_group_metadata: List of RowGroupMetadata (should be from the same
            file and sorted by row_group_index).
        target_size: Target size in bytes for each group.

    Returns:
        List of DownloadGroup objects.

    Examples:
        With 3 row groups of sizes [10KB, 15KB, 20KB] and target 32KB:
        - Group 1: [rg0, rg1] = 25KB (adding rg2 would exceed 32KB)
        - Group 2: [rg2] = 20KB
    """
    if not row_group_metadata:
        return []

    # All metadata should be for the same file.
    file_path = row_group_metadata[0].path
    ordered = sorted(row_group_metadata, key=lambda item: item.row_group_index)

    def _must_start_fresh(group: Optional[DownloadGroup], rg: RowGroupMetadata) -> bool:
        # Decide whether ``rg`` can join ``group`` or needs a new one.
        if group is None:
            return True
        if not group.row_groups:
            return False
        tail_index = group.row_groups[-1].row_group_index
        # A gap in indices forces a new group; so does exceeding the target.
        if rg.row_group_index != tail_index + 1:
            return True
        return group.total_size + rg.size > target_size

    groups: List[DownloadGroup] = []
    current: Optional[DownloadGroup] = None
    for rg in ordered:
        if _must_start_fresh(current, rg):
            if current is not None and current.row_groups:
                groups.append(current)
            current = DownloadGroup(path=file_path)
        current.add_row_group(rg)

    # Flush the final (non-empty) group.
    if current is not None and current.row_groups:
        groups.append(current)
    return groups
def create_download_groups_from_metadata(
    metadata_by_file: Dict[str, List[RowGroupMetadata]],
    target_size: int,
) -> List[DownloadGroup]:
    """Create download groups from all files based on cumulative size.

    Args:
        metadata_by_file: Dict mapping file path to list of RowGroupMetadata.
        target_size: Target size in bytes for each group.

    Returns:
        List of DownloadGroup objects from all files, in dict iteration order.
    """
    all_groups: List[DownloadGroup] = []
    # Keys are unused here (each RowGroupMetadata carries its own path),
    # so iterate values directly rather than .items().
    for metadata_list in metadata_by_file.values():
        all_groups.extend(create_size_based_groups(metadata_list, target_size))
    return all_groups

Sorry, the diff of this file is too big to display