Skip to content

Commit ae5568e

Browse files
committed
[SPARK-34963][SQL][2.4] Fix nested column pruning for extracting case-insensitive struct field from array of struct
### What changes were proposed in this pull request? This patch proposes a fix of nested column pruning for extracting case-insensitive struct field from array of struct. This is the backport of apache#32059 to branch-2.4. ### Why are the changes needed? Under case-insensitive mode, nested column pruning rule cannot correctly push down extractor of a struct field of an array of struct, e.g., ```scala val query = spark.table("contacts").select("friends.First", "friends.MiDDle") ``` Error stack: ``` [info] java.lang.IllegalArgumentException: Field "First" does not exist. [info] Available fields: [info] at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:274) [info] at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:274) [info] at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) [info] at scala.collection.AbstractMap.getOrElse(Map.scala:59) [info] at org.apache.spark.sql.types.StructType.apply(StructType.scala:273) [info] at org.apache.spark.sql.execution.ProjectionOverSchema$$anonfun$getProjection$3.apply(ProjectionOverSchema.scala:44) [info] at org.apache.spark.sql.execution.ProjectionOverSchema$$anonfun$getProjection$3.apply(ProjectionOverSchema.scala:41) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes apache#32112 from viirya/fix-array-nested-pruning-2.4. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Liang-Chi Hsieh <[email protected]>
1 parent b4d9d4a commit ae5568e

File tree

3 files changed

+65
-12
lines changed

3 files changed

+65
-12
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,14 @@ private[execution] case class ProjectionOverSchema(schema: StructType) {
4040
case a: GetArrayStructFields =>
4141
getProjection(a.child).map(p => (p, p.dataType)).map {
4242
case (projection, ArrayType(projSchema @ StructType(_), _)) =>
43+
// For case-sensitivity aware field resolution, we should take `ordinal` which
44+
// points to correct struct field.
45+
val selectedField = a.child.dataType.asInstanceOf[ArrayType]
46+
.elementType.asInstanceOf[StructType](a.ordinal)
47+
val prunedField = projSchema(selectedField.name)
4348
GetArrayStructFields(projection,
44-
projSchema(a.field.name),
45-
projSchema.fieldIndex(a.field.name),
49+
prunedField.copy(name = a.field.name),
50+
projSchema.fieldIndex(selectedField.name),
4651
projSchema.size,
4752
a.containsNull)
4853
case (_, projSchema) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,25 @@ private[execution] object SelectedField {
8181
case GetArrayItem(child, _) =>
8282
selectField(child, fieldOpt)
8383
// Handles case "expr0.field.subfield", where "expr0" and "expr0.field" are of array type.
84-
case GetArrayStructFields(child: GetArrayStructFields,
85-
field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
86-
val childField = fieldOpt.map(field => StructField(name,
87-
wrapStructType(dataType, field),
88-
nullable, metadata)).orElse(Some(field))
84+
case GetArrayStructFields(child: GetArrayStructFields, _, ordinal, _, _) =>
85+
// For case-sensitivity aware field resolution, we should take `ordinal` which
86+
// points to correct struct field.
87+
val selectedField = child.dataType.asInstanceOf[ArrayType]
88+
.elementType.asInstanceOf[StructType](ordinal)
89+
val childField = fieldOpt.map(field => StructField(selectedField.name,
90+
wrapStructType(selectedField.dataType, field),
91+
selectedField.nullable, selectedField.metadata)).orElse(Some(selectedField))
8992
selectField(child, childField)
9093
// Handles case "expr0.field", where "expr0" is of array type.
91-
case GetArrayStructFields(child,
92-
field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
94+
case GetArrayStructFields(child, _, ordinal, _, _) =>
95+
// For case-sensitivity aware field resolution, we should take `ordinal` which
96+
// points to correct struct field.
97+
val selectedField = child.dataType.asInstanceOf[ArrayType]
98+
.elementType.asInstanceOf[StructType](ordinal)
9399
val childField =
94-
fieldOpt.map(field => StructField(name,
95-
wrapStructType(dataType, field),
96-
nullable, metadata)).orElse(Some(field))
100+
fieldOpt.map(field => StructField(selectedField.name,
101+
wrapStructType(selectedField.dataType, field),
102+
selectedField.nullable, selectedField.metadata)).orElse(Some(selectedField))
97103
selectField(child, childField)
98104
// Handles case "expr0.field[key]", where "expr0" is of struct type and "expr0.field" is of
99105
// map type.

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,4 +416,46 @@ class ParquetSchemaPruningSuite
416416
assert(scanSchema === expectedScanSchema)
417417
}
418418
}
419+
420+
testSchemaPruning("SPARK-34963: extract case-insensitive struct field from array") {
421+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
422+
val query1 = spark.table("contacts")
423+
.select("friends.First", "friends.MiDDle")
424+
checkScan(query1, "struct<friends:array<struct<first:string,middle:string>>>")
425+
checkAnswer(query1,
426+
Row(Array.empty[String], Array.empty[String]) ::
427+
Row(Array("Susan"), Array("Z.")) ::
428+
Row(null, null) ::
429+
Row(null, null) :: Nil)
430+
431+
val query2 = spark.table("contacts")
432+
.where("friends.First is not null")
433+
.select("friends.First", "friends.MiDDle")
434+
checkScan(query2, "struct<friends:array<struct<first:string,middle:string>>>")
435+
checkAnswer(query2,
436+
Row(Array.empty[String], Array.empty[String]) ::
437+
Row(Array("Susan"), Array("Z.")) :: Nil)
438+
}
439+
}
440+
441+
testSchemaPruning("SPARK-34963: extract case-insensitive struct field from struct") {
442+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
443+
val query1 = spark.table("contacts")
444+
.select("Name.First", "NAME.MiDDle")
445+
checkScan(query1, "struct<name:struct<first:string,middle:string>>")
446+
checkAnswer(query1,
447+
Row("Jane", "X.") ::
448+
Row("Janet", null) ::
449+
Row("Jim", null) ::
450+
Row("John", "Y.") :: Nil)
451+
452+
val query2 = spark.table("contacts")
453+
.where("Name.MIDDLE is not null")
454+
.select("Name.First", "NAME.MiDDle")
455+
checkScan(query2, "struct<name:struct<first:string,middle:string>>")
456+
checkAnswer(query2,
457+
Row("Jane", "X.") ::
458+
Row("John", "Y.") :: Nil)
459+
}
460+
}
419461
}

0 commit comments

Comments
 (0)