RecordBatchTransformer: Handle schema migration and column re-ordering in table scans #602

Merged: 10 commits (Oct 11, 2024)
feat: more performant handling of case where only schema transform is required but columns can remain unmodified
sdd committed Oct 3, 2024
commit 0ed57212fcff716ea500f3b3e55e14a7ef2b87df
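
The idea behind this commit, sketched standalone: when the file's Arrow schema matches the table's snapshot schema in every column's data type and nullability, and differs only in column names, each incoming RecordBatch can keep its Arc-ed column arrays untouched and only the schema handle needs swapping. The snippet below is a minimal illustration in plain arrow-rs with invented column names, not this crate's code; the commit itself goes a step further and calls RecordBatch::with_schema so that even the column Vec is reused rather than rebuilt.

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Schema as written in an older data file.
    let file_schema = Arc::new(Schema::new(vec![Field::new("row_id", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(file_schema, vec![column])?;

    // Current table schema: same data type and nullability, renamed column.
    let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

    // Cloning the column slice only bumps Arc refcounts; no row data is copied.
    let migrated = RecordBatch::try_new(table_schema, batch.columns().to_vec())?;
    assert_eq!(migrated.schema().field(0).name(), "id");
    Ok(())
}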
103 changes: 90 additions & 13 deletions crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -90,6 +90,21 @@ enum BatchTransform {
         // Indicates how each column in the target schema is derived.
         operations: Vec<ColumnSource>,
     },
+
+    // Sometimes only the schema will need modifying, for example when
+    // the column names have changed vs. the file, but not the column types.
+    // We can avoid a heap allocation per RecordBatch in this case by
+    // retaining the existing column Vec.
+    ModifySchema {
+        target_schema: Arc<ArrowSchema>,
+    },
 }
+
+#[derive(Debug)]
+enum SchemaComparison {
+    Equivalent,
+    NameChangesOnly,
+    Different,
+}
 
 #[derive(Debug)]
@@ -134,7 +149,7 @@ impl RecordBatchTransformer {
         &mut self,
         record_batch: RecordBatch,
     ) -> Result<RecordBatch> {
-        Ok(match self.batch_transform {
+        Ok(match &self.batch_transform {
             Some(BatchTransform::PassThrough) => record_batch,
             Some(BatchTransform::Modify {
                 ref target_schema,
@@ -143,6 +158,9 @@ impl RecordBatchTransformer {
                 target_schema.clone(),
                 self.transform_columns(record_batch.columns(), operations)?,
             )?,
+            Some(BatchTransform::ModifySchema { target_schema }) => {
+                record_batch.with_schema(target_schema.clone())?
+            }
             None => {
                 self.batch_transform = Some(Self::generate_batch_transform(
                     record_batch.schema_ref(),
@@ -168,8 +186,6 @@ impl RecordBatchTransformer {
         projected_iceberg_field_ids: &[i32],
     ) -> Result<BatchTransform> {
         let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
-        let field_id_to_source_schema_map =
-            Self::build_field_id_to_arrow_schema_map(source_schema)?;
         let field_id_to_mapped_schema_map =
             Self::build_field_id_to_arrow_schema_map(&mapped_unprojected_arrow_schema)?;
 
@@ -186,12 +202,78 @@ impl RecordBatchTransformer {
             })
             .collect();
 
-        let target_schema = ArrowSchema::new(fields?);
-        if target_schema == *source_schema.as_ref() {
-            return Ok(BatchTransform::PassThrough);
-        }
+        let target_schema = Arc::new(ArrowSchema::new(fields?));
+
+        match Self::compare_schemas(source_schema, &target_schema) {
+            SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
+            SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }),
+            SchemaComparison::Different => Ok(BatchTransform::Modify {
+                operations: Self::generate_transform_operations(
+                    source_schema,
+                    snapshot_schema,
+                    projected_iceberg_field_ids,
+                    field_id_to_mapped_schema_map,
+                )?,
+                target_schema,
+            }),
+        }
+    }
+
+    /// Compares the source and target schemas and determines whether they
+    /// differ in any meaningful way:
+    /// * If they have different numbers of fields, then we need to modify
+    ///   the incoming RecordBatch's schema AND columns
+    /// * If they have the same number of fields, but some of them differ in
+    ///   either data type or nullability, then we need to modify the
+    ///   incoming RecordBatch's schema AND columns
+    /// * If the schemas differ only in column names, then we need to modify
+    ///   the RecordBatch's schema BUT we can keep the original column data
+    ///   unmodified
+    /// * If the schemas are identical (or differ only in inconsequential
+    ///   ways) then we can pass through the original RecordBatch unmodified
+    fn compare_schemas(
+        source_schema: &ArrowSchemaRef,
+        target_schema: &ArrowSchemaRef,
+    ) -> SchemaComparison {
+        if source_schema.fields().len() != target_schema.fields().len() {
+            return SchemaComparison::Different;
+        }
+
+        let mut names_changed = false;
+
+        for (source_field, target_field) in source_schema
+            .fields()
+            .iter()
+            .zip(target_schema.fields().iter())
+        {
+            if source_field.data_type() != target_field.data_type()
+                || source_field.is_nullable() != target_field.is_nullable()
+            {
+                return SchemaComparison::Different;
+            }
+
+            if source_field.name() != target_field.name() {
+                names_changed = true;
+            }
+        }
+
+        if names_changed {
+            SchemaComparison::NameChangesOnly
+        } else {
+            SchemaComparison::Equivalent
+        }
+    }
+
-        let operations: Result<Vec<_>> = projected_iceberg_field_ids.iter().map(|field_id|{
+    fn generate_transform_operations(
+        source_schema: &ArrowSchemaRef,
+        snapshot_schema: &IcebergSchema,
+        projected_iceberg_field_ids: &[i32],
+        field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+    ) -> Result<Vec<ColumnSource>> {
+        let field_id_to_source_schema_map =
+            Self::build_field_id_to_arrow_schema_map(source_schema)?;
+
+        projected_iceberg_field_ids.iter().map(|field_id|{
             let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
                 Error::new(ErrorKind::Unexpected, "could not find field in schema")
             )?;
@@ -237,12 +319,7 @@ impl RecordBatchTransformer {
                     target_type: target_type.clone(),
                 }
             })
-        }).collect();
-
-        Ok(BatchTransform::Modify {
-            operations: operations?,
-            target_schema: Arc::new(target_schema),
-        })
+        }).collect()
     }
 
     fn build_field_id_to_arrow_schema_map(
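
For reference, the three-way classification the new code introduces can be exercised on its own. The following is a minimal sketch that mirrors the compare_schemas logic above using plain arrow-schema types, with example schemas invented for illustration:

use arrow_schema::{DataType, Field, Schema};

#[derive(Debug, PartialEq)]
enum SchemaComparison {
    Equivalent,
    NameChangesOnly,
    Different,
}

// Data types and nullability decide between Different and the two cheap cases;
// a name mismatch alone only calls for a schema swap, not a column rebuild.
fn compare(source: &Schema, target: &Schema) -> SchemaComparison {
    if source.fields().len() != target.fields().len() {
        return SchemaComparison::Different;
    }

    let mut names_changed = false;
    for (s, t) in source.fields().iter().zip(target.fields().iter()) {
        if s.data_type() != t.data_type() || s.is_nullable() != t.is_nullable() {
            return SchemaComparison::Different;
        }
        if s.name() != t.name() {
            names_changed = true;
        }
    }

    if names_changed {
        SchemaComparison::NameChangesOnly
    } else {
        SchemaComparison::Equivalent
    }
}

fn main() {
    let source = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let renamed = Schema::new(vec![Field::new("b", DataType::Int64, true)]);
    let retyped = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);

    assert_eq!(compare(&source, &renamed), SchemaComparison::NameChangesOnly);
    assert_eq!(compare(&source, &retyped), SchemaComparison::Different);
    assert_eq!(compare(&source, &source), SchemaComparison::Equivalent);
}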
1 change: 1 addition & 0 deletions crates/iceberg/src/lib.rs
@@ -55,6 +55,7 @@
 
 #[macro_use]
 extern crate derive_builder;
+extern crate core;
 
 mod error;
 pub use error::{Error, ErrorKind, Result};