fused.h3
The fused.h3 module provides functions for working with H3-indexed datasets, including ingestion, reading, and metadata management.
import fused
# Ingest raster data to H3
fused.h3.run_ingest_raster_to_h3(input_path="s3://...", output_path="s3://...")
# Read H3-indexed data
table = fused.h3.read_hex_table(...)
persist_hex_table_metadata
persist_hex_table_metadata(
dataset_path: str,
output_path: str | None = None,
metadata_path: str | None = None,
verbose: bool = False,
row_group_size: int = 500,
indexed_columns: list[str] | None = None,
pool_size: int = 10,
metadata_compression_level: int | None = None,
) -> str
Persist metadata for a hex table to enable serverless queries.
Scans all parquet files in the dataset, extracts metadata needed for fast reconstruction, and writes one .metadata.parquet file per source file.
Each metadata file contains:
- Row group metadata (offsets, H3 ranges, etc.)
- Full metadata_json stored as custom metadata in the parquet schema
Requires a hex column (or variant like h3, h3_index) in all files.
Raises ValueError if no hex column is found.
Parameters:
- dataset_path (str) – Path to the dataset directory (e.g., "s3://bucket/dataset/").
- output_path (str | None) – Deprecated; use metadata_path instead.
- metadata_path (str | None) – Where to write metadata files. If None, writes to {source_dir}/.fused/{source_filename}.metadata.parquet per source file. If provided, writes to {metadata_path}/{full_source_path}.metadata.parquet (allows storing metadata elsewhere when you don't have write access to the dataset directory).
- verbose (bool) – If True, print timing/progress information.
- row_group_size (int) – Rows per row group in the output. Default 500.
- indexed_columns (list[str] | None) – Column names to index. If None, indexes only the first identified indexed column; if [], indexes all detected indexed columns.
- pool_size (int) – Number of parallel workers. Default 10; set to 1 for sequential processing.
- metadata_compression_level (int | None) – Optional zstd compression level for the metadata parquet files (e.g., 1–22).
Returns:
str – Path to the metadata directory (if metadata_path is provided) or the path of the first metadata file.
Raises:
ImportError (job2 required), ValueError (no hex column), FileNotFoundError (no parquet files).
Example:
>>> fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/")
's3://my-bucket/my-dataset/.fused/file1.metadata.parquet'
>>> fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/", metadata_path="s3://my-bucket/metadata/")
's3://my-bucket/metadata/'
>>> fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/", pool_size=4)
's3://my-bucket/my-dataset/.fused/file1.metadata.parquet'
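For read-only dataset directories, a minimal sketch (bucket names are hypothetical) that writes metadata to a separate prefix and tunes the documented options:
import fused
# Write metadata under a separate prefix instead of {source_dir}/.fused/
fused.h3.persist_hex_table_metadata(
    "s3://read-only-bucket/my-dataset/",                 # hypothetical dataset path
    metadata_path="s3://my-writable-bucket/metadata/",   # hypothetical writable location
    indexed_columns=[],             # index all detected indexed columns
    pool_size=4,                    # fewer parallel workers
    metadata_compression_level=3,   # zstd-compress the metadata files
    verbose=True,
)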
read_hex_table
read_hex_table(
dataset_path: str,
hex_ranges_list: list[list[int]],
columns: list[str] | None = None,
base_url: str | None = None,
verbose: bool = False,
return_timing_info: bool = False,
batch_size: int | None = None,
max_concurrent_downloads: int | None = None,
) -> pa.Table | tuple[pa.Table, dict[str, Any]]
Read data from an H3-indexed dataset by querying hex ranges.
This is an optimized version that assumes the server always provides full metadata (start_offset, end_offset, metadata, and row_group_bytes) for all row groups. If any row group is missing required metadata, this function will raise an error indicating that the dataset needs to be re-indexed.
This function eliminates all metadata API calls by using prefetched metadata from the /datasets/items-with-metadata endpoint.
Parameters:
- dataset_path (str) – Path to the H3-indexed dataset (e.g., "s3://bucket/dataset/").
- hex_ranges_list (list[list[int]]) – List of [min_hex, max_hex] pairs as integers. Example: [[622236719905341439, 622246719905341439]].
- columns (list[str] | None) – Optional list of column names to read. If None, reads all columns.
- base_url (str | None) – Base URL for the API. If None, uses the current environment.
- verbose (bool) – If True, print progress information. Default is False.
- return_timing_info (bool) – If True, return a tuple of (table, timing_info) instead of just the table. Default is False for backward compatibility.
- batch_size (int | None) – Target size in bytes for combining row groups. If None, uses fused.options.row_group_batch_size (default: 32KB).
- max_concurrent_downloads (int | None) – Maximum number of simultaneous download operations. If None, uses a default based on the number of files.
Returns:
- PyArrow Table containing the concatenated data from all matching row groups. If return_timing_info is True, returns a tuple of (table, timing_info dict).
Raises: ValueError: If any row group is missing required metadata (start_offset, end_offset, metadata, or row_group_bytes). This indicates the dataset needs to be re-indexed.
Example:
import fused
# Read data for a specific H3 hex range
table = fused.h3.read_hex_table(
dataset_path="s3://my-bucket/my-h3-dataset/",
hex_ranges_list=[[622236719905341439, 622246719905341439]]
)
df = table.to_pandas()
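Since the ranges are plain integers, one way to build them from H3 cells is sketched below, assuming h3-py v4 and that a degenerate [cell, cell] pair selects exactly that cell:
import fused
import h3  # h3-py v4
# Convert H3 cell strings to the integer form expected by hex_ranges_list
cells = [h3.latlng_to_cell(37.77, -122.42, 10)]  # hypothetical point and resolution
hex_ranges = [[h3.str_to_int(c), h3.str_to_int(c)] for c in cells]
table = fused.h3.read_hex_table(
    dataset_path="s3://my-bucket/my-h3-dataset/",
    hex_ranges_list=hex_ranges,
)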
read_hex_table_slow
read_hex_table_slow(
dataset_path: str,
hex_ranges_list: list[list[int]],
columns: list[str] | None = None,
base_url: str | None = None,
verbose: bool = False,
return_timing_info: bool = False,
metadata_batch_size: int = 50,
) -> pa.Table | tuple[pa.Table, dict[str, Any]]
Read data from an H3-indexed dataset by querying hex ranges.
This function queries the dataset index for row groups that match the given H3 hex ranges, downloads them in parallel with optimized batching, and returns a concatenated table.
Adjacent row groups from the same file are combined into single downloads for better S3 performance. The batch size is controlled by fused.options.row_group_batch_size (default: 32KB).
Parameters:
- dataset_path (str) – Path to the H3-indexed dataset (e.g., "s3://bucket/dataset/").
- hex_ranges_list (list[list[int]]) – List of [min_hex, max_hex] pairs as integers. Example: [[622236719905341439, 622246719905341439]].
- columns (list[str] | None) – Optional list of column names to read. If None, reads all columns.
- base_url (str | None) – Base URL for the API. If None, uses the current environment.
- verbose (bool) – If True, print progress information. Default is False.
- return_timing_info (bool) – If True, return a tuple of (table, timing_info) instead of just the table. Default is False for backward compatibility.
- metadata_batch_size (int) – Maximum number of row group metadata requests to batch together in a single API call. Larger batches reduce API overhead. Default is 50. Consider MongoDB's 16KB document limit when adjusting this value.
Returns:
- PyArrow Table containing the concatenated data from all matching row groups. If return_timing_info is True, returns a tuple of (table, timing_info dict).
Example:
import fused
# Read data for a specific H3 hex range
table = fused.h3.read_hex_table_slow(
dataset_path="s3://my-bucket/my-h3-dataset/",
hex_ranges_list=[[622236719905341439, 622246719905341439]]
)
df = table.to_pandas()
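To inspect where time is spent, return_timing_info returns a dict alongside the table; the exact keys are implementation-defined:
import fused
# Request timing information alongside the data
table, timing_info = fused.h3.read_hex_table_slow(
    dataset_path="s3://my-bucket/my-h3-dataset/",
    hex_ranges_list=[[622236719905341439, 622246719905341439]],
    return_timing_info=True,
)
print(timing_info)  # timing details for the query and downloads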
read_hex_table_with_persisted_metadata
read_hex_table_with_persisted_metadata(
dataset_path: str,
hex_ranges_list: list[list[int]],
columns: list[str] | None = None,
metadata_path: str | None = None,
verbose: bool = False,
return_timing_info: bool = False,
batch_size: int | None = None,
max_concurrent_downloads: int | None = None,
max_concurrent_metadata_reads: int | None = None,
) -> pa.Table | tuple[pa.Table, dict[str, Any]]
Read data from an H3-indexed dataset using persisted metadata parquet files (no server).
Reads from per-file .metadata.parquet files instead of querying a server. Metadata location: {source_dir}/.fused/{source_filename}.metadata.parquet, or when metadata_path is provided, {metadata_path}/{full_source_path}.metadata.parquet. Supports subdirectory queries: if dataset_path is a subdirectory, metadata is looked up for files in that subdirectory.
Parameters:
- dataset_path (str) – Path to the H3-indexed dataset (e.g., "s3://bucket/dataset/") or a subdirectory for filtering.
- hex_ranges_list (list[list[int]]) – List of [min_hex, max_hex] pairs as integers.
- columns (list[str] | None) – Columns to read. If None, reads all.
- metadata_path (str | None) – Where metadata files are stored. If None, uses {source_dir}/.fused/ per source file; if provided, reads from this location using full source paths (e.g., when you don't have access to the dataset directory).
- verbose (bool) – If True, print progress. Default False.
- return_timing_info (bool) – If True, return (table, timing_info) instead of just the table.
- batch_size (int | None) – Target size in bytes for combining row groups. If None, uses fused.options.row_group_batch_size.
- max_concurrent_downloads (int | None) – Maximum simultaneous downloads. If None, a default based on the number of files.
- max_concurrent_metadata_reads (int | None) – Maximum simultaneous metadata file reads. If None, defaults to min(10, len(source_files)).
Returns:
pa.Table – PyArrow Table (or (table, timing_info) if return_timing_info is True).
Raises: FileNotFoundError (metadata file not found), ValueError (row group missing required metadata).
Example:
import fused
# First, persist metadata
fused.h3.persist_hex_table_metadata("s3://my-bucket/my-dataset/")
# Then read using persisted metadata (no server needed)
table = fused.h3.read_hex_table_with_persisted_metadata(
dataset_path="s3://my-bucket/my-dataset/",
hex_ranges_list=[[622236719905341439, 622246719905341439]]
)
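When metadata lives outside the dataset directory, pass the same metadata_path to both functions (a sketch with hypothetical bucket names):
import fused
# Persist metadata to a separate, writable location...
fused.h3.persist_hex_table_metadata(
    "s3://my-bucket/my-dataset/",
    metadata_path="s3://my-bucket/metadata/",
)
# ...then point the reader at the same location
table = fused.h3.read_hex_table_with_persisted_metadata(
    dataset_path="s3://my-bucket/my-dataset/",
    hex_ranges_list=[[622236719905341439, 622246719905341439]],
    metadata_path="s3://my-bucket/metadata/",
)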
run_ingest_raster_to_h3
run_ingest_raster_to_h3(
input_path: str | list[str],
output_path: str,
metrics: str | list[str] = "cnt",
res: int | None = None,
k_ring: int = 1,
res_offset: int = 1,
chunk_res: int | None = None,
file_res: int | None = None,
overview_res: list[int] | None = None,
overview_chunk_res: int | list[int] | None = None,
max_rows_per_chunk: int = 0,
include_source_url: bool = True,
target_chunk_size: int | None = None,
nodata: int | float | None = None,
debug_mode: bool = False,
remove_tmp_files: bool = True,
tmp_path: str | None = None,
overwrite: bool = False,
steps: list[str] | None = None,
extract_instance_type: str | None = None,
extract_max_workers: int = 256,
partition_instance_type: str | None = None,
partition_max_workers: int = 256,
overview_instance_type: str | None = None,
overview_max_workers: int = 256,
**kwargs,
)
Run the raster to H3 ingestion process.
This process involves multiple steps:
- extract pixel values and assign them to H3 cells in chunks (extract step)
- combine the chunks per partition (file) and prepare metadata (partition step)
- create the metadata, _sample file, and overview files
Parameters:
- input_path (str | list[str]) – Path(s) to the input raster data. When a single path, the file is chunked for processing based on target_chunk_size. When a list of paths, each file is processed as one chunk.
- output_path (str) – Path for the resulting Parquet dataset.
- metrics (str | list[str]) – The metrics to compute per H3 cell. Supported: "cnt" or a list containing any of "avg", "min", "max", "stddev", "mode" (most common value), and "sum".
- res (int | None) – The resolution of the H3 cells in the output data. Pixel values are assigned to H3 cells at resolution res + res_offset and then aggregated to res. By default, inferred from the input data so the H3 cell size is close to the pixel size (e.g., a 30×30 m raster → resolution 11).
- k_ring (int) – The k-ring distance at resolution res + res_offset to which the pixel value is assigned (in addition to the center cell). Defaults to 1.
- res_offset (int) – Offset to the child resolution (relative to res) at which the raw pixel values are assigned to H3 cells.
- file_res (int | None) – The H3 resolution used to chunk the resulting files of the Parquet dataset. By default, inferred from res. Use file_res=-1 for a single output file.
- chunk_res (int | None) – The H3 resolution used to chunk the row groups within each file (ignored when max_rows_per_chunk is specified). By default, inferred from res.
- overview_res (list[int] | None) – The H3 resolutions for which to create overview files. By default, resolutions 3 to 7 (or capped at a lower value if the output res is lower).
- overview_chunk_res (int | list[int] | None) – The H3 resolution(s) used to chunk the row groups within each overview file. By default, each overview file is chunked at the overview resolution minus 5 (clamped between 0 and the output res).
- max_rows_per_chunk (int) – Maximum number of rows per chunk in the resulting data and overview files. If 0 (default), chunk_res and overview_chunk_res are used.
- include_source_url (bool) – If True, include a "source_url" column in the output with the list of source URLs that contributed data to each H3 cell. Defaults to True.
- target_chunk_size (int | None) – Approximate number of pixel values to process per chunk in the extract step. Defaults to 10,000,000 for a single file or a few files. If ingesting more than 20 files, each file is processed as a single chunk by default; override with a specific value, or use target_chunk_size=0 to always process each file as one chunk.
- nodata (int | float | None) – By default, the "nodata" value from the raster metadata is used to ignore pixels when ingesting. If it is missing or wrong in the raster metadata, override it with this keyword.
- debug_mode (bool) – If True, run only the first two chunks for debugging. Defaults to False.
- remove_tmp_files (bool) – If True, remove temporary files after ingestion. Defaults to True.
- tmp_path (str | None) – Optional path for temporary files. If specified, the extract step is skipped and temporary files are assumed to already exist at this path.
- overwrite (bool) – If True, overwrite the output path if it already exists (by first removing existing content). Defaults to False.
- steps (list[str] | None) – The processing steps to run. Can include "extract", "partition", "metadata", and "overview". By default, all steps run.
- extract_instance_type (str | None) – Instance type for the extract step. By default, tries "realtime" and then falls back to "large".
- extract_max_workers (int) – Maximum number of workers for the extract step. Defaults to 256.
- partition_instance_type (str | None) – Instance type for the partition step. By default, tries "realtime" and then falls back to "large".
- partition_max_workers (int) – Maximum number of workers for the partition step. Defaults to 256.
- overview_instance_type (str | None) – Instance type for the overview step. By default, tries "realtime" and then falls back to "large".
- overview_max_workers (int) – Maximum number of workers for the overview step. Defaults to 256.
- **kwargs – Additional keyword arguments for fused.submit for the extract, partition, and overview steps (e.g., engine, instance_type, max_workers, n_processes_per_worker, max_retry). To run locally: run_ingest_raster_to_h3(..., engine="local").
The extract, partition, and overview steps are run in parallel using fused.submit(). By default, the function first attempts to run them on "realtime" instances and retries any failed runs on "large" instances. You can override this behavior by passing engine, instance_type, max_workers, n_processes_per_worker, etc. as additional keyword arguments to this function (**kwargs). To configure those per step, use extract_instance_type, partition_instance_type, etc. For example, to run everything locally on the same machine where this function runs, use:
run_ingest_raster_to_h3(..., engine="local")
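A minimal end-to-end sketch using only the documented parameters (paths, resolution, and metrics are hypothetical):
import fused
fused.h3.run_ingest_raster_to_h3(
    input_path="s3://my-bucket/rasters/elevation.tif",   # hypothetical input raster
    output_path="s3://my-bucket/h3/elevation/",          # hypothetical output dataset
    metrics=["avg", "min", "max"],   # aggregate pixel values per H3 cell
    res=9,                           # target H3 resolution (otherwise inferred)
    overwrite=True,                  # replace any existing output
    engine="local",                  # run all steps on this machine
)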
run_partition_to_h3
run_partition_to_h3(
input_path: str | list[str],
output_path: str,
metrics: str | list[str] = "cnt",
groupby_cols: list[str] = ["hex", "data"],
window_cols: list[str] = ["hex"],
additional_cols: list[str] = [],
res: int | None = None,
chunk_res: int | None = None,
file_res: int | None = None,
overview_res: list[int] | None = None,
overview_chunk_res: int | list[int] | None = None,
max_rows_per_chunk: int = 0,
remove_tmp_files: bool = True,
overwrite: bool = False,
extract_kwargs: dict = {},
partition_kwargs: dict = {},
overview_kwargs: dict = {},
**kwargs
)
Run the H3 partitioning process.
This pipeline assumes that the input data already has a "hex" column with H3 cell IDs at the target resolution. It will then repartition the data spatially and add overview files. See run_ingest_raster_to_h3 for more on the processing steps and execution options.
Parameters:
- input_path (str | list[str]) – Path(s) to the input data (file, directory, or list of file paths). The first step is parallelized per input file.
- output_path (str) – Path for the resulting Parquet dataset.
- metrics (str | list[str]) – Per H3 cell: "cnt" or a list of "avg", "min", "max", "stddev", "sum".
- groupby_cols (list[str]) – Columns defining groups for aggregated metrics. Default ["hex", "data"]; must include "hex". Only for the "cnt" metric.
- window_cols (list[str]) – Columns for window total counts (adds one output column per column). Default ["hex"]. Only for the "cnt" metric.
- additional_cols (list[str]) – Extra columns to include; assumed unique per groupby_cols group. Default []. Only for the "cnt" metric.
- res (int | None) – H3 resolution; must be specified.
- file_res (int | None) – H3 resolution used to chunk output files. Default inferred from res. Use file_res=-1 for a single file.
- chunk_res (int | None) – H3 resolution used to chunk row groups within each file (ignored when max_rows_per_chunk is set). Default inferred from res.
- overview_res (list[int] | None) – Resolutions for overview files. Default 3–7 (capped by res).
- overview_chunk_res (int | list[int] | None) – Row-group chunk resolution(s) for overview files. Default: overview resolution minus 5 (clamped to 0–res).
- max_rows_per_chunk (int) – Maximum rows per chunk; 0 (default) uses chunk_res and overview_chunk_res.
- remove_tmp_files (bool) – Remove temporary files after the run. Default True.
- overwrite (bool) – Overwrite the output path if it exists. Default False.
- extract_kwargs (dict) – Extra arguments for fused.submit for the extract step.
- partition_kwargs (dict) – Extra arguments for fused.submit for the partition step.
- overview_kwargs (dict) – Extra arguments for fused.submit for the overview step.
- **kwargs – Extra arguments applied to all steps; overridden by extract_kwargs, partition_kwargs, and overview_kwargs.
See the docstring of run_ingest_raster_to_h3 for more details on the processing steps and how to configure the execution.
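A minimal sketch for data that already carries a "hex" column (paths and resolution are hypothetical):
import fused
fused.h3.run_partition_to_h3(
    input_path="s3://my-bucket/hex-parquet/",      # hypothetical input with a "hex" column
    output_path="s3://my-bucket/hex-partitioned/", # hypothetical output dataset
    metrics="cnt",
    res=10,            # resolution of the existing hex column; must be specified
    overwrite=True,
)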