Skip to content

Commit 1c29aa6

Browse files
JustinRush80ion-elgreco
authored andcommitted
added gc valid check
Signed-off-by: JustinRush80 <[email protected]>
1 parent a99e502 commit 1c29aa6

File tree

3 files changed

+64
-32
lines changed

3 files changed

+64
-32
lines changed

crates/core/src/operations/merge/mod.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ use crate::operations::cdc::*;
8484
use crate::operations::merge::barrier::find_node;
8585
use crate::operations::write::execution::write_execution_plan_v2;
8686
use crate::operations::write::generated_columns::{
87-
add_generated_columns, add_missing_generated_columns,
87+
able_to_gc, add_generated_columns, add_missing_generated_columns,
8888
};
8989
use crate::operations::write::WriterStatsConfig;
9090
use crate::protocol::{DeltaOperation, MergePredicate};
@@ -722,7 +722,7 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner {
722722
#[allow(clippy::too_many_arguments)]
723723
async fn execute(
724724
predicate: Expression,
725-
source: DataFrame,
725+
mut source: DataFrame,
726726
log_store: LogStoreRef,
727727
snapshot: DeltaTableState,
728728
_state: SessionState,
@@ -779,13 +779,18 @@ async fn execute(
779779
None => TableReference::bare(UNNAMED_TABLE),
780780
};
781781

782-
let generated_col_expressions = snapshot
783-
.schema()
784-
.get_generated_columns()
785-
.unwrap_or_default();
782+
let mut generated_col_exp = None;
783+
let mut missing_generated_col = None;
784+
785+
if able_to_gc(&snapshot)? {
786+
let generated_col_expressions = snapshot.schema().get_generated_columns()?;
787+
let (source_with_gc, missing_generated_columns) =
788+
add_missing_generated_columns(source, &generated_col_expressions)?;
786789

787-
let (source, missing_generated_columns) =
788-
add_missing_generated_columns(source, &generated_col_expressions)?;
790+
source = source_with_gc;
791+
generated_col_exp = Some(generated_col_expressions);
792+
missing_generated_col = Some(missing_generated_columns);
793+
}
789794
// This is only done to provide the source columns with a correct table reference. Just renaming the columns does not work
790795
let source = LogicalPlanBuilder::scan(
791796
source_name.clone(),
@@ -1345,12 +1350,16 @@ async fn execute(
13451350
.select(write_projection)?
13461351
};
13471352

1348-
projected = add_generated_columns(
1349-
projected,
1350-
&generated_col_expressions,
1351-
&missing_generated_columns,
1352-
&state,
1353-
)?;
1353+
if let Some(generated_col_expressions) = generated_col_exp {
1354+
if let Some(missing_generated_columns) = missing_generated_col {
1355+
projected = add_generated_columns(
1356+
projected,
1357+
&generated_col_expressions,
1358+
&missing_generated_columns,
1359+
&state,
1360+
)?;
1361+
}
1362+
}
13541363

13551364
let merge_final = &projected.into_unoptimized_plan();
13561365
let write = state.create_physical_plan(merge_final).await?;

crates/core/src/operations/write/generated_columns.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,26 @@
1+
use crate::table::state::DeltaTableState;
12
use datafusion::{execution::SessionState, prelude::DataFrame};
23
use datafusion_common::ScalarValue;
34
use datafusion_expr::{col, when, Expr, ExprSchemable};
45
use tracing::debug;
56

67
use crate::{kernel::DataCheck, table::GeneratedColumn, DeltaResult};
78

9+
/// check if the writer version is able to write generated columns
10+
pub fn able_to_gc(snapshot: &DeltaTableState) -> DeltaResult<bool> {
11+
if let Some(features) = &snapshot.protocol().writer_features {
12+
if snapshot.protocol().min_writer_version < 4 {
13+
return Ok(false);
14+
}
15+
if snapshot.protocol().min_writer_version == 7
16+
&& !features.contains(&delta_kernel::table_features::WriterFeature::GeneratedColumns)
17+
{
18+
return Ok(false);
19+
}
20+
}
21+
Ok(true)
22+
}
23+
824
/// Add generated column expressions to a dataframe
925
pub fn add_missing_generated_columns(
1026
mut df: DataFrame,

crates/core/src/operations/write/mod.rs

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub mod writer;
3434
use arrow_schema::Schema;
3535
pub use configs::WriterStatsConfig;
3636
use datafusion::execution::SessionStateBuilder;
37-
use generated_columns::{add_generated_columns, add_missing_generated_columns};
37+
use generated_columns::{able_to_gc, add_generated_columns, add_missing_generated_columns};
3838
use metrics::{WriteMetricExtensionPlanner, SOURCE_COUNT_ID, SOURCE_COUNT_METRIC};
3939
use std::collections::HashMap;
4040
use std::str::FromStr;
@@ -445,18 +445,21 @@ impl std::future::IntoFuture for WriteBuilder {
445445
state
446446
}
447447
};
448-
let generated_col_expressions = this
449-
.snapshot
450-
.as_ref()
451-
.map(|v| v.schema().get_generated_columns().unwrap_or_default())
452-
.unwrap_or_default();
453-
454448
let mut schema_drift = false;
455-
let source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone());
456-
457-
// Add missing generated columns to source_df
458-
let (mut source, missing_generated_columns) =
459-
add_missing_generated_columns(source, &generated_col_expressions)?;
449+
let mut generated_col_exp = None;
450+
let mut missing_gen_col = None;
451+
let mut source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone());
452+
if let Some(snapshot) = &this.snapshot {
453+
if able_to_gc(snapshot)? {
454+
let generated_col_expressions = snapshot.schema().get_generated_columns()?;
455+
// Add missing generated columns to source_df
456+
let (source_with_gc, missing_generated_columns) =
457+
add_missing_generated_columns(source, &generated_col_expressions)?;
458+
source = source_with_gc;
459+
missing_gen_col = Some(missing_generated_columns);
460+
generated_col_exp = Some(generated_col_expressions);
461+
}
462+
}
460463

461464
let source_schema: Arc<Schema> = Arc::new(source.schema().as_arrow().clone());
462465

@@ -527,12 +530,16 @@ impl std::future::IntoFuture for WriteBuilder {
527530
source = source.select(schema_evolution_projection)?;
528531
}
529532

530-
source = add_generated_columns(
531-
source,
532-
&generated_col_expressions,
533-
&missing_generated_columns,
534-
&state,
535-
)?;
533+
if let Some(generated_columns_exp) = generated_col_exp {
534+
if let Some(missing_generated_col) = missing_gen_col {
535+
source = add_generated_columns(
536+
source,
537+
&generated_columns_exp,
538+
&missing_generated_col,
539+
&state,
540+
)?;
541+
}
542+
}
536543

537544
let source = LogicalPlan::Extension(Extension {
538545
node: Arc::new(MetricObserver {

0 commit comments

Comments
 (0)