Skip to content
127 changes: 106 additions & 21 deletions pyiceberg/table/update/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Iterator, Optional
from typing import Iterator, Optional, Set

from pyiceberg.exceptions import ValidationException
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
from pyiceberg.typedef import Record

VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, Operation.OVERWRITE}


def _validation_history(
Expand Down Expand Up @@ -77,6 +79,47 @@ def _validation_history(
return manifests_files, snapshots


def _filter_manifest_entries(
entry: ManifestEntry,
snapshot_ids: set[int],
data_filter: Optional[BooleanExpression],
partition_set: Optional[dict[int, set[Record]]],
entry_status: Optional[ManifestEntryStatus],
schema: Schema,
) -> bool:
"""Filter manifest entries based on data filter and partition set.

Args:
entry: Manifest entry to filter
snapshot_ids: set of snapshot ids to match data files
data_filter: Optional filter to match data files
partition_set: Optional set of partitions to match data files
entry_status: Optional status to match data files
schema: schema for filtering

Returns:
True if the entry should be included, False otherwise
"""
if entry.snapshot_id not in snapshot_ids:
return False

if entry_status is not None and entry.status != entry_status:
return False

if data_filter is not None:
evaluator = _InclusiveMetricsEvaluator(schema, data_filter)
if evaluator.eval(entry.data_file) is ROWS_CANNOT_MATCH:
return False

if partition_set is not None:
partition = entry.data_file.partition
spec_id = entry.data_file.spec_id
if spec_id not in partition_set or partition not in partition_set[spec_id]:
return False

return True


def _deleted_data_files(
table: Table,
starting_snapshot: Snapshot,
Expand Down Expand Up @@ -108,27 +151,12 @@ def _deleted_data_files(
ManifestContent.DATA,
)

if data_filter is not None:
evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval

for manifest in manifests:
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
if entry.snapshot_id not in snapshot_ids:
continue

if entry.status != ManifestEntryStatus.DELETED:
continue

if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH:
continue

if partition_set is not None:
spec_id = entry.data_file.spec_id
partition = entry.data_file.partition
if spec_id not in partition_set or partition not in partition_set[spec_id]:
continue

yield entry
if _filter_manifest_entries(
entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.DELETED, table.schema()
):
yield entry
Comment on lines 154 to +159
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore this comment, but a missed opportunity for some syntactic sugar:

for manifest in manifests:
    yield from (
        entry
        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False)
        if _filter_manifest_entries(
            entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.DELETED, table.schema()
        )
    )



def _validate_deleted_data_files(
Expand All @@ -150,3 +178,60 @@ def _validate_deleted_data_files(
if any(conflicting_entries):
conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries}
raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!")


def _added_data_files(
table: Table,
starting_snapshot: Snapshot,
data_filter: Optional[BooleanExpression],
partition_set: Optional[dict[int, set[Record]]],
parent_snapshot: Optional[Snapshot],
) -> Iterator[ManifestEntry]:
"""Return manifest entries for data files added between the starting snapshot and parent snapshot.

Args:
table: Table to get the history from
starting_snapshot: Starting snapshot to get the history from
data_filter: Optional filter to match data files
partition_set: Optional set of partitions to match data files
parent_snapshot: Parent snapshot to get the history from

Returns:
Iterator of manifest entries for added data files matching the conditions
"""
if parent_snapshot is None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we introduce a doc string for each function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this if makes sense, why not make the parent_snapshot non-optional?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return

manifests, snapshot_ids = _validation_history(
table,
parent_snapshot,
starting_snapshot,
VALIDATE_ADDED_DATA_FILES_OPERATIONS,
ManifestContent.DATA,
)

for manifest in manifests:
for entry in manifest.fetch_manifest_entry(table.io):
if _filter_manifest_entries(entry, snapshot_ids, data_filter, partition_set, None, table.schema()):
yield entry


def _validate_added_data_files(
table: Table,
starting_snapshot: Snapshot,
data_filter: Optional[BooleanExpression],
parent_snapshot: Optional[Snapshot],
) -> None:
"""Validate that no files matching a filter have been added to the table since a starting snapshot.

Args:
table: Table to validate
starting_snapshot: Snapshot current at the start of the operation
data_filter: Expression used to find added data files
parent_snapshot: Ending snapshot on the branch being validated

"""
conflicting_entries = _added_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)
if any(conflicting_entries):
conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")
135 changes: 134 additions & 1 deletion tests/table/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.table.update.validate import _deleted_data_files, _validate_deleted_data_files, _validation_history
from pyiceberg.table.update.validate import (
_added_data_files,
_deleted_data_files,
_validate_added_data_files,
_validate_deleted_data_files,
_validation_history,
)


@pytest.fixture
Expand Down Expand Up @@ -217,3 +223,130 @@ class DummyEntry:
data_filter=None,
parent_snapshot=oldest_snapshot,
)


@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.OVERWRITE])
def test_validate_added_data_files_conflicting_count(
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
operation: Operation,
) -> None:
table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

snapshot_history = 100
snapshots = table.snapshots()
for i in range(1, snapshot_history + 1):
altered_snapshot = snapshots[-i]
altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
snapshots[-i] = altered_snapshot

table.metadata = table.metadata.model_copy(
update={"snapshots": snapshots},
)

oldest_snapshot = table.snapshots()[-snapshot_history]
newest_snapshot = cast(Snapshot, table.current_snapshot())

def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
"""Mock the manifests method to use the snapshot_id for lookup."""
snapshot_id = self.snapshot_id
if snapshot_id in mock_manifests:
return mock_manifests[snapshot_id]
return []

def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
return [
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=self.added_snapshot_id,
)
]

with (
patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
):
result = list(
_added_data_files(
table=table,
starting_snapshot=newest_snapshot,
data_filter=None,
parent_snapshot=oldest_snapshot,
partition_set=None,
)
)

# since we only look at the ManifestContent.Data files
assert len(result) == snapshot_history / 2


@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.REPLACE])
def test_validate_added_data_files_non_conflicting_count(
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
operation: Operation,
) -> None:
table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

snapshot_history = 100
snapshots = table.snapshots()
for i in range(1, snapshot_history + 1):
altered_snapshot = snapshots[-i]
altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
snapshots[-i] = altered_snapshot

table.metadata = table.metadata.model_copy(
update={"snapshots": snapshots},
)

oldest_snapshot = table.snapshots()[-snapshot_history]
newest_snapshot = cast(Snapshot, table.current_snapshot())

def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
"""Mock the manifests method to use the snapshot_id for lookup."""
snapshot_id = self.snapshot_id
if snapshot_id in mock_manifests:
return mock_manifests[snapshot_id]
return []

def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
return [
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=self.added_snapshot_id,
)
]

with (
patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
):
result = list(
_added_data_files(
table=table,
starting_snapshot=newest_snapshot,
data_filter=None,
parent_snapshot=oldest_snapshot,
partition_set=None,
)
)

assert len(result) == 0


def test_validate_added_data_files_raises_on_conflict(
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
table, _ = table_v2_with_extensive_snapshots_and_manifests
oldest_snapshot = table.snapshots()[0]
newest_snapshot = cast(Snapshot, table.current_snapshot())

class DummyEntry:
snapshot_id = 123

with patch("pyiceberg.table.update.validate._added_data_files", return_value=[DummyEntry()]):
with pytest.raises(ValidationException):
_validate_added_data_files(
table=table,
starting_snapshot=newest_snapshot,
data_filter=None,
parent_snapshot=oldest_snapshot,
)