pyiceberg/table/inspect.py (98 changes: 60 additions & 38 deletions)
@@ -20,7 +20,7 @@
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple

from pyiceberg.conversions import from_bytes
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
+from pyiceberg.manifest import DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table.snapshots import Snapshot, ancestors_of
from pyiceberg.types import PrimitiveType
@@ -288,64 +288,86 @@ def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":

table_schema = pa.unify_schemas([partitions_schema, table_schema])

-        def update_partitions_map(
-            partitions_map: Dict[Tuple[str, Any], Any],
-            file: DataFile,
-            partition_record_dict: Dict[str, Any],
-            snapshot: Optional[Snapshot],
-        ) -> None:
+        snapshot = self._get_snapshot(snapshot_id)
+        executor = ExecutorFactory.get_or_create()
+        local_partitions_maps = executor.map(self._process_manifest, snapshot.manifests(self.tbl.io))
+
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        for local_map in local_partitions_maps:
+            for partition_record_key, partition_row in local_map.items():
+                if partition_record_key not in partitions_map:
+                    partitions_map[partition_record_key] = partition_row
+                else:
+                    existing = partitions_map[partition_record_key]
+                    existing["record_count"] += partition_row["record_count"]
+                    existing["file_count"] += partition_row["file_count"]
+                    existing["total_data_file_size_in_bytes"] += partition_row["total_data_file_size_in_bytes"]
+                    existing["position_delete_record_count"] += partition_row["position_delete_record_count"]
+                    existing["position_delete_file_count"] += partition_row["position_delete_file_count"]
+                    existing["equality_delete_record_count"] += partition_row["equality_delete_record_count"]
+                    existing["equality_delete_file_count"] += partition_row["equality_delete_file_count"]
+
+                    if partition_row["last_updated_at"] and (
+                        not existing["last_updated_at"] or partition_row["last_updated_at"] > existing["last_updated_at"]
+                    ):
+                        existing["last_updated_at"] = partition_row["last_updated_at"]
+                        existing["last_updated_snapshot_id"] = partition_row["last_updated_snapshot_id"]
+
+        return pa.Table.from_pylist(
+            partitions_map.values(),
+            schema=table_schema,
+        )
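In the new partitions() body above, ExecutorFactory.get_or_create() hands back pyiceberg's shared worker pool (a thread pool in current releases, as far as I can tell sized by the PYICEBERG_MAX_WORKERS environment variable), each manifest is summarized independently, and the per-manifest dictionaries are folded back into a single map. The sketch below shows only that scatter/gather shape with plain concurrent.futures; the manifest data and helper names are made up for illustration and are not part of the PR.

# Illustration only: same fan-out/merge pattern as partitions(), with hypothetical data.
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

def summarize_manifest(partition_keys: List[str]) -> Dict[str, Counter]:
    # Worker: build an independent map for one (fake) manifest.
    local: Dict[str, Counter] = {}
    for key in partition_keys:
        local.setdefault(key, Counter())["file_count"] += 1
    return local

fake_manifests = [["dt=2024-01-01", "dt=2024-01-02"], ["dt=2024-01-01"]]

with ThreadPoolExecutor() as executor:
    local_maps = list(executor.map(summarize_manifest, fake_manifests))

# Gather: merge per-manifest maps key by key, summing counts,
# just as partitions() folds local_partitions_maps together.
merged: Dict[str, Counter] = {}
for local_map in local_maps:
    for key, row in local_map.items():
        merged.setdefault(key, Counter()).update(row)

print(merged["dt=2024-01-01"]["file_count"])  # 2

Merging on the caller's side keeps the workers free of shared state, which is what makes the executor.map call safe.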

+    def _process_manifest(self, manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]:
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+            partition = entry.data_file.partition
+            partition_record_dict = {
+                field.name: partition[pos]
+                for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+            }
+            entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None

partition_record_key = _convert_to_hashable_type(partition_record_dict)
if partition_record_key not in partitions_map:
partitions_map[partition_record_key] = {
"partition": partition_record_dict,
"spec_id": file.spec_id,
"spec_id": entry.data_file.spec_id,
"record_count": 0,
"file_count": 0,
"total_data_file_size_in_bytes": 0,
"position_delete_record_count": 0,
"position_delete_file_count": 0,
"equality_delete_record_count": 0,
"equality_delete_file_count": 0,
"last_updated_at": snapshot.timestamp_ms if snapshot else None,
"last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
"last_updated_at": entry_snapshot.timestamp_ms if entry_snapshot else None,
"last_updated_snapshot_id": entry_snapshot.snapshot_id if entry_snapshot else None,
}

partition_row = partitions_map[partition_record_key]

-            if snapshot is not None:
-                if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
-                    partition_row["last_updated_at"] = snapshot.timestamp_ms
-                    partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id
+            if entry_snapshot is not None:
+                if (
+                    partition_row["last_updated_at"] is None
+                    or partition_row["last_updated_snapshot_id"] < entry_snapshot.timestamp_ms
+                ):
+                    partition_row["last_updated_at"] = entry_snapshot.timestamp_ms
+                    partition_row["last_updated_snapshot_id"] = entry_snapshot.snapshot_id

-            if file.content == DataFileContent.DATA:
-                partition_row["record_count"] += file.record_count
+            if entry.data_file.content == DataFileContent.DATA:
+                partition_row["record_count"] += entry.data_file.record_count
partition_row["file_count"] += 1
partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
elif file.content == DataFileContent.POSITION_DELETES:
partition_row["position_delete_record_count"] += file.record_count
partition_row["total_data_file_size_in_bytes"] += entry.data_file.file_size_in_bytes
elif entry.data_file.content == DataFileContent.POSITION_DELETES:
partition_row["position_delete_record_count"] += entry.data_file.record_count
partition_row["position_delete_file_count"] += 1
-            elif file.content == DataFileContent.EQUALITY_DELETES:
-                partition_row["equality_delete_record_count"] += file.record_count
+            elif entry.data_file.content == DataFileContent.EQUALITY_DELETES:
+                partition_row["equality_delete_record_count"] += entry.data_file.record_count
partition_row["equality_delete_file_count"] += 1
else:
raise ValueError(f"Unknown DataFileContent ({file.content})")

partitions_map: Dict[Tuple[str, Any], Any] = {}
snapshot = self._get_snapshot(snapshot_id)
for manifest in snapshot.manifests(self.tbl.io):
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
partition = entry.data_file.partition
partition_record_dict = {
field.name: partition[pos]
for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
}
entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
raise ValueError(f"Unknown DataFileContent ({entry.data_file.content})")

-        return pa.Table.from_pylist(
-            partitions_map.values(),
-            schema=table_schema,
-        )
+        return partitions_map
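Because _process_manifest now returns one plain dictionary per manifest, the public behaviour of the partitions metadata table is unchanged: callers still receive a single PyArrow table. A rough usage sketch follows; the catalog name and table identifier are placeholders, not anything referenced in this PR.

# Hypothetical usage; "default" and "db.events" are placeholder names.
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")       # assumes a catalog configured under this name
tbl = catalog.load_table("db.events")   # placeholder table identifier

partitions = tbl.inspect.partitions()   # the method reworked in this diff
# One row per partition, with the columns built above: partition, spec_id,
# record_count, file_count, total_data_file_size_in_bytes, the delete
# counts, last_updated_at and last_updated_snapshot_id.
print(partitions.to_pandas())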

def _get_manifests_schema(self) -> "pa.Schema":
import pyarrow as pa