diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py
index 3bb0268a05..c3aa870977 100644
--- a/pyiceberg/table/inspect.py
+++ b/pyiceberg/table/inspect.py
@@ -20,7 +20,7 @@
 from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
 
 from pyiceberg.conversions import from_bytes
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
+from pyiceberg.manifest import DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.table.snapshots import Snapshot, ancestors_of
 from pyiceberg.types import PrimitiveType
@@ -288,17 +288,51 @@ def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
 
             table_schema = pa.unify_schemas([partitions_schema, table_schema])
 
-        def update_partitions_map(
-            partitions_map: Dict[Tuple[str, Any], Any],
-            file: DataFile,
-            partition_record_dict: Dict[str, Any],
-            snapshot: Optional[Snapshot],
-        ) -> None:
+        snapshot = self._get_snapshot(snapshot_id)
+        executor = ExecutorFactory.get_or_create()
+        local_partitions_maps = executor.map(self._process_manifest, snapshot.manifests(self.tbl.io))
+
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        for local_map in local_partitions_maps:
+            for partition_record_key, partition_row in local_map.items():
+                if partition_record_key not in partitions_map:
+                    partitions_map[partition_record_key] = partition_row
+                else:
+                    existing = partitions_map[partition_record_key]
+                    existing["record_count"] += partition_row["record_count"]
+                    existing["file_count"] += partition_row["file_count"]
+                    existing["total_data_file_size_in_bytes"] += partition_row["total_data_file_size_in_bytes"]
+                    existing["position_delete_record_count"] += partition_row["position_delete_record_count"]
+                    existing["position_delete_file_count"] += partition_row["position_delete_file_count"]
+                    existing["equality_delete_record_count"] += partition_row["equality_delete_record_count"]
+                    existing["equality_delete_file_count"] += partition_row["equality_delete_file_count"]
+
+                    if partition_row["last_updated_at"] and (
+                        not existing["last_updated_at"] or partition_row["last_updated_at"] > existing["last_updated_at"]
+                    ):
+                        existing["last_updated_at"] = partition_row["last_updated_at"]
+                        existing["last_updated_snapshot_id"] = partition_row["last_updated_snapshot_id"]
+
+        return pa.Table.from_pylist(
+            partitions_map.values(),
+            schema=table_schema,
+        )
+
+    def _process_manifest(self, manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]:
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+            partition = entry.data_file.partition
+            partition_record_dict = {
+                field.name: partition[pos]
+                for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+            }
+            entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
+
+            partition_record_key = _convert_to_hashable_type(partition_record_dict)
             if partition_record_key not in partitions_map:
                 partitions_map[partition_record_key] = {
                     "partition": partition_record_dict,
-                    "spec_id": file.spec_id,
+                    "spec_id": entry.data_file.spec_id,
                     "record_count": 0,
                     "file_count": 0,
                     "total_data_file_size_in_bytes": 0,
@@ -306,46 +340,34 @@ def update_partitions_map(
                     "position_delete_file_count": 0,
                     "equality_delete_record_count": 0,
                     "equality_delete_file_count": 0,
-                    "last_updated_at": snapshot.timestamp_ms if snapshot else None,
-                    "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
+                    "last_updated_at": entry_snapshot.timestamp_ms if entry_snapshot else None,
+                    "last_updated_snapshot_id": entry_snapshot.snapshot_id if entry_snapshot else None,
                 }
 
             partition_row = partitions_map[partition_record_key]
 
-            if snapshot is not None:
-                if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
-                    partition_row["last_updated_at"] = snapshot.timestamp_ms
-                    partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id
+            if entry_snapshot is not None:
+                if (
+                    partition_row["last_updated_at"] is None
+                    or partition_row["last_updated_snapshot_id"] < entry_snapshot.timestamp_ms
+                ):
+                    partition_row["last_updated_at"] = entry_snapshot.timestamp_ms
+                    partition_row["last_updated_snapshot_id"] = entry_snapshot.snapshot_id
 
-            if file.content == DataFileContent.DATA:
-                partition_row["record_count"] += file.record_count
+            if entry.data_file.content == DataFileContent.DATA:
+                partition_row["record_count"] += entry.data_file.record_count
                 partition_row["file_count"] += 1
-                partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
-            elif file.content == DataFileContent.POSITION_DELETES:
-                partition_row["position_delete_record_count"] += file.record_count
+                partition_row["total_data_file_size_in_bytes"] += entry.data_file.file_size_in_bytes
+            elif entry.data_file.content == DataFileContent.POSITION_DELETES:
+                partition_row["position_delete_record_count"] += entry.data_file.record_count
                 partition_row["position_delete_file_count"] += 1
-            elif file.content == DataFileContent.EQUALITY_DELETES:
-                partition_row["equality_delete_record_count"] += file.record_count
+            elif entry.data_file.content == DataFileContent.EQUALITY_DELETES:
+                partition_row["equality_delete_record_count"] += entry.data_file.record_count
                 partition_row["equality_delete_file_count"] += 1
             else:
-                raise ValueError(f"Unknown DataFileContent ({file.content})")
-
-        partitions_map: Dict[Tuple[str, Any], Any] = {}
-        snapshot = self._get_snapshot(snapshot_id)
-        for manifest in snapshot.manifests(self.tbl.io):
-            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
-                partition = entry.data_file.partition
-                partition_record_dict = {
-                    field.name: partition[pos]
-                    for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
-                }
-                entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
-                update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
+                raise ValueError(f"Unknown DataFileContent ({entry.data_file.content})")
 
-        return pa.Table.from_pylist(
-            partitions_map.values(),
-            schema=table_schema,
-        )
+        return partitions_map
 
     def _get_manifests_schema(self) -> "pa.Schema":
         import pyarrow as pa