Skip to content

Commit 924d6c5

Browse files
committed
Introduce getters and a builder for PartitionSpec (fields made private)
1 parent f95a5af commit 924d6c5

File tree

12 files changed

+49
-36
lines changed

12 files changed

+49
-36
lines changed

datafusion_iceberg/src/pruning_statistics.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,12 @@ impl<'table, 'manifests> PruneManifests<'table, 'manifests> {
4040

4141
impl<'table, 'manifests> PruningStatistics for PruneManifests<'table, 'manifests> {
4242
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
43-
let partition_spec = &self.table.metadata().default_partition_spec().ok()?.fields;
43+
let partition_spec = &self
44+
.table
45+
.metadata()
46+
.default_partition_spec()
47+
.ok()?
48+
.fields();
4449
let schema = self.table.current_schema(None).ok()?;
4550
let (index, partition_field) = partition_spec
4651
.iter()
@@ -67,7 +72,7 @@ impl<'table, 'manifests> PruningStatistics for PruneManifests<'table, 'manifests
6772
let partition_spec = self.table.metadata().default_partition_spec().ok()?;
6873
let schema = self.table.current_schema(None).ok()?;
6974
let (index, partition_field) = partition_spec
70-
.fields
75+
.fields()
7176
.iter()
7277
.enumerate()
7378
.find(|(_, partition_field)| partition_field.name() == &column.name)?;
@@ -94,7 +99,7 @@ impl<'table, 'manifests> PruningStatistics for PruneManifests<'table, 'manifests
9499
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
95100
let partition_spec = self.table.metadata().default_partition_spec().ok()?;
96101
let (index, _) = partition_spec
97-
.fields
102+
.fields()
98103
.iter()
99104
.enumerate()
100105
.find(|(_, partition_field)| partition_field.name() == &column.name)?;

datafusion_iceberg/src/table.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ async fn table_scan(
283283
.metadata()
284284
.default_partition_spec()
285285
.map_err(Error::from)?
286-
.fields
286+
.fields()
287287
.iter()
288288
.map(|x| {
289289
Ok(schema
@@ -450,7 +450,7 @@ async fn table_scan(
450450
.metadata()
451451
.default_partition_spec()
452452
.map_err(Into::<Error>::into)?
453-
.fields
453+
.fields()
454454
.iter()
455455
.map(|field| {
456456
let struct_field = schema.fields().get(*field.source_id() as usize).unwrap();
@@ -473,7 +473,7 @@ async fn table_scan(
473473
for field in schema.fields().iter() {
474474
schema_builder.with_struct_field(field.clone());
475475
}
476-
for partition_field in &table.metadata().default_partition_spec().unwrap().fields {
476+
for partition_field in table.metadata().default_partition_spec().unwrap().fields() {
477477
schema_builder.with_struct_field(StructField {
478478
id: *partition_field.field_id(),
479479
name: partition_field.name().clone(),

iceberg-rust-spec/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,7 @@ pub enum Error {
8282
/// struct type builder
8383
#[error(transparent)]
8484
StructTypeBuilder(#[from] crate::spec::types::StructTypeBuilderError),
85+
/// partition spec builder
86+
#[error(transparent)]
87+
PartitionSpec(#[from] crate::spec::partition::PartitionSpecBuilderError),
8588
}

iceberg-rust-spec/src/spec/manifest.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,10 @@ impl<'a, R: Read> ManifestReader<'a, R> {
9696
.transpose()?
9797
.unwrap_or("0".to_string())
9898
.parse()?;
99-
let partition_spec = PartitionSpec {
100-
spec_id,
101-
fields: partition_fields,
102-
};
99+
let partition_spec = PartitionSpec::builder()
100+
.with_spec_id(spec_id)
101+
.with_fields(partition_fields)
102+
.build()?;
103103
Ok(Self {
104104
reader: reader
105105
.zip(repeat(Arc::new((schema, partition_spec, format_version))))
@@ -155,12 +155,12 @@ impl<'a, W: std::io::Write> ManifestWriter<'a, W> {
155155

156156
avro_writer.add_user_metadata(
157157
"partition-spec".to_string(),
158-
serde_json::to_string(&table_metadata.default_partition_spec()?.fields)?,
158+
serde_json::to_string(&table_metadata.default_partition_spec()?.fields())?,
159159
)?;
160160

161161
avro_writer.add_user_metadata(
162162
"partition-spec-id".to_string(),
163-
serde_json::to_string(&table_metadata.default_partition_spec()?.spec_id)?,
163+
serde_json::to_string(&table_metadata.default_partition_spec()?.spec_id())?,
164164
)?;
165165

166166
Ok(ManifestWriter(avro_writer))
@@ -684,7 +684,7 @@ impl DataFile {
684684
file_format: value.file_format,
685685
partition: value
686686
.partition
687-
.cast(schema.fields(), &partition_spec.fields)?,
687+
.cast(schema.fields(), &partition_spec.fields())?,
688688
record_count: value.record_count,
689689
file_size_in_bytes: value.file_size_in_bytes,
690690
column_sizes: value.column_sizes,
@@ -718,7 +718,7 @@ impl DataFile {
718718
file_format: value.file_format,
719719
partition: value
720720
.partition
721-
.cast(schema.fields(), &partition_spec.fields)?,
721+
.cast(schema.fields(), &partition_spec.fields())?,
722722
record_count: value.record_count,
723723
file_size_in_bytes: value.file_size_in_bytes,
724724
column_sizes: value.column_sizes,
@@ -1576,7 +1576,7 @@ mod tests {
15761576
};
15771577

15781578
let partition_schema = partition_value_schema(
1579-
&table_metadata.default_partition_spec().unwrap().fields,
1579+
&table_metadata.default_partition_spec().unwrap().fields(),
15801580
table_metadata.current_schema(None).unwrap(),
15811581
)
15821582
.unwrap();
@@ -1706,7 +1706,7 @@ mod tests {
17061706
};
17071707

17081708
let partition_schema = partition_value_schema(
1709-
&table_metadata.default_partition_spec().unwrap().fields,
1709+
&table_metadata.default_partition_spec().unwrap().fields(),
17101710
table_metadata.current_schema(None).unwrap(),
17111711
)
17121712
.unwrap();
@@ -1795,7 +1795,7 @@ mod tests {
17951795
.unwrap();
17961796

17971797
let raw_schema =
1798-
partition_value_schema(&spec.fields, &table_schema.try_into().unwrap()).unwrap();
1798+
partition_value_schema(&spec.fields(), &table_schema.try_into().unwrap()).unwrap();
17991799

18001800
let schema = apache_avro::Schema::parse_str(&raw_schema).unwrap();
18011801

iceberg-rust-spec/src/spec/partition.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,19 +136,23 @@ impl PartitionField {
136136
}
137137
}
138138

139-
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
139+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder, Getters)]
140140
#[serde(rename_all = "kebab-case")]
141141
#[builder(setter(prefix = "with"))]
142142
/// Partition spec that defines how to produce a tuple of partition values from a record.
143143
pub struct PartitionSpec {
144144
/// Identifier for PartitionSpec
145-
pub spec_id: i32,
145+
spec_id: i32,
146146
/// Details of the partition spec
147147
#[builder(setter(each(name = "with_partition_field")))]
148-
pub fields: Vec<PartitionField>,
148+
fields: Vec<PartitionField>,
149149
}
150150

151151
impl PartitionSpec {
152+
/// Create partition spec builder
153+
pub fn builder() -> PartitionSpecBuilder {
154+
PartitionSpecBuilder::default()
155+
}
152156
/// Get datatypes of partition fields
153157
pub fn data_types(&self, schema: &StructType) -> Result<Vec<Type>, Error> {
154158
self.fields

iceberg-rust-spec/src/spec/table_metadata.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ mod _serde {
465465
}?,
466466
schemas,
467467
partition_specs: HashMap::from_iter(
468-
value.partition_specs.into_iter().map(|x| (x.spec_id, x)),
468+
value.partition_specs.into_iter().map(|x| (*x.spec_id(), x)),
469469
),
470470
default_spec_id: value.default_spec_id,
471471
last_partition_id: value.last_partition_id,
@@ -517,13 +517,14 @@ mod _serde {
517517
value
518518
.partition_specs
519519
.unwrap_or_else(|| {
520-
vec![PartitionSpec {
521-
spec_id: DEFAULT_SPEC_ID,
522-
fields: value.partition_spec,
523-
}]
520+
vec![PartitionSpec::builder()
521+
.with_spec_id(DEFAULT_SPEC_ID)
522+
.with_fields(value.partition_spec)
523+
.build()
524+
.unwrap()]
524525
})
525526
.into_iter()
526-
.map(|x| (x.spec_id, x)),
527+
.map(|x| (*x.spec_id(), x)),
527528
);
528529
Ok(TableMetadata {
529530
format_version: FormatVersion::V1,
@@ -630,7 +631,7 @@ mod _serde {
630631
partition_spec: v
631632
.partition_specs
632633
.get(&v.default_spec_id)
633-
.map(|x| x.fields.clone())
634+
.map(|x| x.fields().clone())
634635
.unwrap_or_default(),
635636
partition_specs: Some(v.partition_specs.into_values().collect()),
636637
default_spec_id: Some(v.default_spec_id),

iceberg-rust/src/arrow/partition.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub async fn partition_record_batches(
5454
let partition_sender = partition_sender.clone();
5555
async move {
5656
let partition_columns: Vec<ArrayRef> = partition_spec
57-
.fields
57+
.fields()
5858
.iter()
5959
.map(|field| {
6060
let column_name = &schema

iceberg-rust/src/arrow/write.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ async fn write_parquet_files(
166166
size,
167167
&metadata,
168168
schema,
169-
&partition_spec.fields,
169+
&partition_spec.fields(),
170170
)?)
171171
}
172172
})

iceberg-rust/src/catalog/commit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ pub fn apply_table_updates(
325325
metadata.current_schema_id = schema_id;
326326
}
327327
TableUpdate::AddPartitionSpec { spec } => {
328-
metadata.partition_specs.insert(spec.spec_id, spec);
328+
metadata.partition_specs.insert(*spec.spec_id(), spec);
329329
}
330330
TableUpdate::SetDefaultSpec { spec_id } => {
331331
metadata.default_spec_id = spec_id;

iceberg-rust/src/materialized_view/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl MaterializedView {
182182
let partition_columns = Arc::new(
183183
old_storage_table_metadata
184184
.default_partition_spec()?
185-
.fields
185+
.fields()
186186
.iter()
187187
.map(|x| schema.fields().get(*x.source_id() as usize))
188188
.collect::<Option<Vec<_>>>()

0 commit comments

Comments
 (0)