Skip to content

Add support for Bodo DataFrame #2167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1523,6 +1523,52 @@ print(ray_dataset.take(2))
]
```

### Bodo

PyIceberg interfaces closely with Bodo Dataframes (see [Bodo Iceberg Quick Start](https://docs.bodo.ai/latest/quick_start/quickstart_local_iceberg/)),
which provides a drop-in replacement for Pandas that applies query, compiler and HPC optimizations automatically.
Bodo accelerates and scales Python code from single laptops to large clusters without code rewrites.

<!-- prettier-ignore-start -->

!!! note "Requirements"
This requires [`bodo` to be installed](index.md).

```python
pip install pyiceberg['bodo']
```
<!-- prettier-ignore-end -->

A table can be read easily into a Bodo Dataframe to perform Pandas operations:

```python
df = table.to_bodo() # equivalent to `bodo.pandas.read_iceberg_table(table)`
df = df[df["trip_distance"] >= 10.0]
df = df[["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"]]
print(df)
```

This creates a lazy query, optimizes it, and runs it on all available cores (print triggers execution):

```python
VendorID tpep_pickup_datetime tpep_dropoff_datetime
0 2 2023-01-01 00:27:12 2023-01-01 00:49:56
1 2 2023-01-01 00:09:29 2023-01-01 00:29:23
2 1 2023-01-01 00:13:30 2023-01-01 00:44:00
3 2 2023-01-01 00:41:41 2023-01-01 01:19:32
4 2 2023-01-01 00:22:39 2023-01-01 01:30:45
... ... ... ...
245478 2 2023-01-31 22:32:57 2023-01-31 23:01:48
245479 2 2023-01-31 22:03:26 2023-01-31 22:46:13
245480 2 2023-01-31 23:25:56 2023-02-01 00:05:42
245481 2 2023-01-31 23:18:00 2023-01-31 23:46:00
245482 2 2023-01-31 23:18:00 2023-01-31 23:41:00

[245483 rows x 3 columns]
```

Bodo is optimized to take advantage of Iceberg features such as hidden partitioning and various statistics for efficient reads.

### Daft

PyIceberg interfaces closely with Daft Dataframes (see also: [Daft integration with Iceberg](https://docs.daft.ai/en/stable/io/iceberg/)) which provides a full lazily optimized query engine interface on top of PyIceberg tables.
Expand Down
1 change: 1 addition & 0 deletions mkdocs/docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ You can mix and match optional dependencies depending on your needs:
| pandas | Installs both PyArrow and Pandas |
| duckdb | Installs both PyArrow and DuckDB |
| ray | Installs PyArrow, Pandas, and Ray |
| bodo | Installs Bodo |
| daft | Installs Daft |
| polars | Installs Polars |
| s3fs | S3FS as a FileIO implementation to interact with the object store |
Expand Down
1,174 changes: 694 additions & 480 deletions poetry.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
from pyiceberg.utils.properties import property_as_bool

if TYPE_CHECKING:
import bodo.pandas as bd
import daft
import pandas as pd
import polars as pl
Expand Down Expand Up @@ -1485,6 +1486,16 @@ def to_daft(self) -> daft.DataFrame:

return daft.read_iceberg(self)

def to_bodo(self) -> bd.DataFrame:
"""Read a bodo DataFrame lazily from this Iceberg table.

Returns:
bd.DataFrame: Unmaterialized Bodo Dataframe created from the Iceberg table
"""
import bodo.pandas as bd

return bd.read_iceberg_table(self)

def to_polars(self) -> pl.LazyFrame:
"""Lazily read from this Apache Iceberg table.

Expand Down
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ gcsfs = { version = ">=2023.1.0", optional = true }
huggingface-hub = { version = ">=0.24.0", optional = true }
psycopg2-binary = { version = ">=2.9.6", optional = true }
sqlalchemy = { version = "^2.0.18", optional = true }
bodo = { version = ">=2025.7.4", optional = true }
daft = { version = ">=0.5.0", optional = true }
cachetools = ">=5.5,<7.0"
pyiceberg-core = { version = "^0.5.1", optional = true }
Expand Down Expand Up @@ -298,6 +299,7 @@ pyarrow = ["pyarrow", "pyiceberg-core"]
pandas = ["pandas", "pyarrow"]
duckdb = ["duckdb", "pyarrow"]
ray = ["ray", "pyarrow", "pandas"]
bodo = ["bodo"]
daft = ["daft"]
polars = ["polars"]
snappy = ["python-snappy"]
Expand Down Expand Up @@ -483,6 +485,10 @@ ignore_missing_imports = true
module = "daft.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "bodo.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "pyparsing.*"
ignore_missing_imports = true
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,16 @@ def test_daft_nan_rewritten(catalog: Catalog) -> None:
assert math.isnan(df.to_pydict()["col_numeric"][0])


@pytest.mark.integration
@pytest.mark.filterwarnings("ignore")
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_bodo_nan(catalog: Catalog) -> None:
table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten")
df = table_test_null_nan_rewritten.to_bodo()
assert len(df) == 3
assert math.isnan(df.col_numeric.iloc[0])


@pytest.mark.integration
@pytest.mark.filterwarnings("ignore")
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
Expand Down
13 changes: 9 additions & 4 deletions tests/integration/test_writes/test_partitioned_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ def test_dynamic_partition_overwrite_unpartitioned_evolve_to_identity_transform(

@pytest.mark.integration
def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
import pyarrow
from packaging import version

under_20_arrow = version.parse(pyarrow.__version__) < version.parse("20.0.0")

Comment on lines +457 to +458
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should find another way to make these tests pass instead of branching on pyarrow version

Copy link
Author

@ehsantn ehsantn Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any ideas? Maybe use a range of "safe" values instead of a single file size value? I'd be happy to open another PR if there is more work for this.

Bodo is currently pinned to Arrow 19 since the current release version of PyIceberg supports up to Arrow 19. Bodo uses Arrow C++, which currently requires pinning to a single version for pip wheels to work (conda-forge builds against 4 latest Arrow versions in this case but pip doesn't support this yet). It'd be great if PyIceberg wouldn't set an upper version for Arrow if possible.

identifier = "default.arrow_table_summaries"

try:
Expand Down Expand Up @@ -547,14 +552,14 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro
"total-records": "6",
}
assert summaries[5] == {
"removed-files-size": "16174",
"removed-files-size": "15774" if under_20_arrow else "16174",
"changed-partition-count": "2",
"total-equality-deletes": "0",
"deleted-data-files": "4",
"total-position-deletes": "0",
"total-delete-files": "0",
"deleted-records": "4",
"total-files-size": "8884",
"total-files-size": "8684" if under_20_arrow else "8884",
"total-data-files": "2",
"total-records": "2",
}
Expand All @@ -564,9 +569,9 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro
"total-equality-deletes": "0",
"added-records": "2",
"total-position-deletes": "0",
"added-files-size": "8087",
"added-files-size": "7887" if under_20_arrow else "8087",
"total-delete-files": "0",
"total-files-size": "16971",
"total-files-size": "16571" if under_20_arrow else "16971",
"total-data-files": "4",
"total-records": "4",
}
Expand Down
15 changes: 10 additions & 5 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi

@pytest.mark.integration
def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catalog) -> None:
import pyarrow
from packaging import version

under_20_arrow = version.parse(pyarrow.__version__) < version.parse("20.0.0")

identifier = "default.test_summaries_partial_overwrite"
TEST_DATA = {
"id": [1, 2, 3, 1, 1],
Expand Down Expand Up @@ -311,13 +316,13 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal
# APPEND
assert summaries[0] == {
"added-data-files": "3",
"added-files-size": "2618",
"added-files-size": "2570" if under_20_arrow else "2618",
"added-records": "5",
"changed-partition-count": "3",
"total-data-files": "3",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "2618",
"total-files-size": "2570" if under_20_arrow else "2618",
"total-position-deletes": "0",
"total-records": "5",
}
Expand Down Expand Up @@ -346,16 +351,16 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal
assert len(files) == 3
assert summaries[1] == {
"added-data-files": "1",
"added-files-size": "875",
"added-files-size": "859" if under_20_arrow else "875",
"added-records": "2",
"changed-partition-count": "1",
"deleted-data-files": "1",
"deleted-records": "3",
"removed-files-size": "882",
"removed-files-size": "866" if under_20_arrow else "882",
"total-data-files": "3",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "2611",
"total-files-size": "2563" if under_20_arrow else "2611",
"total-position-deletes": "0",
"total-records": "4",
}
Expand Down