Skip to content

fix: if field contains space in constraint expression, checks will fail #3374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 68 additions & 5 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1418,15 +1418,16 @@ impl DeltaDataChecker {
return Ok(());
}
let table = MemTable::try_new(record_batch.schema(), vec![vec![record_batch.clone()]])?;

let schema = table.schema();
// Use a random table name to avoid clashes when running multiple parallel tasks, e.g. when using a partitioned table
let table_name: String = uuid::Uuid::new_v4().to_string();
self.ctx.register_table(&table_name, Arc::new(table))?;

let mut violations: Vec<String> = Vec::new();

for check in checks {
if check.get_name().contains('.') {
let check_name = check.get_name();
if check_name.contains('.') {
return Err(DeltaTableError::Generic(
"Support for nested columns is not supported.".to_string(),
));
Expand All @@ -1435,12 +1436,22 @@ impl DeltaDataChecker {
let field_to_select = if check.as_any().is::<Constraint>() {
"*"
} else {
check.get_name()
check_name
};

// Loop through schema to find the matching field. If the field has a whitespace, we
// need to backtick it, since the expression is an unquoted string
let mut expression = check.get_expression().to_string();
for field in schema.fields() {
if expression.contains(field.name()) {
expression =
expression.replace(field.name(), format!("`{}` ", field.name()).as_str());
break;
}
}
let sql = format!(
"SELECT {} FROM `{table_name}` WHERE NOT ({}) LIMIT 1",
field_to_select,
check.get_expression()
field_to_select, expression
);

let dfs: Vec<RecordBatch> = self.ctx.sql(&sql).await?.collect().await?;
Expand Down Expand Up @@ -2236,6 +2247,58 @@ mod tests {
assert!(result.is_err());
}

/// Ensure that constraints when there are spaces in the field name still work
///
/// See <https://github.com/delta-io/delta-rs/pull/3374>
#[tokio::test]
async fn test_constraints_with_spacey_fields() -> DeltaResult<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", ArrowDataType::Utf8, false),
Field::new("b bop", ArrowDataType::Int32, false),
]));
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from(vec![
"a", "b bop", "c", "d",
])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
],
)?;

// Valid invariants return Ok(())
let constraints = vec![
Constraint::new("custom a", "a is not null"),
Constraint::new("custom_b", "b bop < 1000"),
];
assert!(DeltaDataChecker::new_with_constraints(constraints)
.check_batch(&batch)
.await
.is_ok());

// Violated invariants returns an error with list of violations
let constraints = vec![
Constraint::new("custom_a", "a is null"),
Constraint::new("custom_B", "b bop < 100"),
];
let result = DeltaDataChecker::new_with_constraints(constraints)
.check_batch(&batch)
.await;
assert!(result.is_err());
assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
if let Err(DeltaTableError::InvalidData { violations }) = result {
assert_eq!(violations.len(), 2);
}

// Irrelevant constraints return a different error
let constraints = vec![Constraint::new("custom_c", "c > 2000")];
let result = DeltaDataChecker::new_with_constraints(constraints)
.check_batch(&batch)
.await;
assert!(result.is_err());
Ok(())
}

#[test]
fn roundtrip_test_delta_exec_plan() {
let ctx = SessionContext::new();
Expand Down
Loading