Skip to content

Commit fd3069a

Browse files
nikolamand-dbcloud-fan
authored andcommitted
[SPARK-49163][SQL] Attempt to create table based on broken parquet partition data results should return user-facing error
### What changes were proposed in this pull request? Create an example parquet table with partitions and insert data in Spark: ``` create table t(col1 string, col2 string, col3 string) using parquet location 'some/path/parquet-test' partitioned by (col1, col2); insert into t (col1, col2, col3) values ('a', 'b', 'c'); ``` Go into the `parquet-test` path in the filesystem and try to copy parquet data file from path `col1=a/col2=b` directory into `col1=a`. After that, try to create new table based on parquet data in Spark: ``` create table broken_table using parquet location 'some/path/parquet-test'; ``` This query errors with internal error. Stack trace excerpts: ``` org.apache.spark.SparkException: [INTERNAL_ERROR] Eagerly executed command failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000 ... Caused by: java.lang.AssertionError: assertion failed: Conflicting partition column names detected: Partition column name list #0: col1 Partition column name list #1: col1, col2For partitioned table directories, data files should only live in leaf directories. And directories at the same level should have the same partition column name. Please check the following directories for unexpected files or inconsistent partition column names: file:some/path/parquet-test/col1=a file:some/path/parquet-test/col1=a/col2=b at scala.Predef$.assert(Predef.scala:279) at org.apache.spark.sql.execution.datasources.PartitioningUtils$.resolvePartitions(PartitioningUtils.scala:391) ... ``` Fix this by changing internal error to user-facing error. ### Why are the changes needed? Replace internal error with user-facing one for valid sequence of Spark SQL operations. ### Does this PR introduce _any_ user-facing change? Yes, it presents the user with regular error instead of internal error. ### How was this patch tested? Added checks to `ParquetPartitionDiscoverySuite` which simulate the described scenario by manually breaking parquet table in the filesystem. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47668 from nikolamand-db/SPARK-49163. Authored-by: Nikola Mandic <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 731a104 commit fd3069a

File tree

5 files changed

+108
-47
lines changed

5 files changed

+108
-47
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,17 @@
625625
],
626626
"sqlState" : "40000"
627627
},
628+
"CONFLICTING_PARTITION_COLUMN_NAMES" : {
629+
"message" : [
630+
"Conflicting partition column names detected:",
631+
"<distinctPartColLists>",
632+
"For partitioned table directories, data files should only live in leaf directories.",
633+
"And directories at the same level should have the same partition column name.",
634+
"Please check the following directories for unexpected files or inconsistent partition column names:",
635+
"<suspiciousPaths>"
636+
],
637+
"sqlState" : "KD009"
638+
},
628639
"CONNECT" : {
629640
"message" : [
630641
"Generic Spark Connect error."

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2837,4 +2837,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
28372837
"parameter" -> toSQLId("unit"),
28382838
"invalidValue" -> s"'$invalidValue'"))
28392839
}
2840+
2841+
def conflictingPartitionColumnNamesError(
2842+
distinctPartColLists: Seq[String],
2843+
suspiciousPaths: Seq[Path]): SparkRuntimeException = {
2844+
new SparkRuntimeException(
2845+
errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
2846+
messageParameters = Map(
2847+
"distinctPartColLists" -> distinctPartColLists.mkString("\n\t", "\n\t", "\n"),
2848+
"suspiciousPaths" -> suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
2849+
)
2850+
)
2851+
}
28402852
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
2929

3030
import org.apache.hadoop.fs.Path
3131

32+
import org.apache.spark.SparkRuntimeException
3233
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
3334
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
3435
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -386,9 +387,9 @@ object PartitioningUtils extends SQLConfHelper {
386387
} else {
387388
pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase()))
388389
}
389-
assert(
390-
partColNames.distinct.size == 1,
391-
listConflictingPartitionColumns(pathsWithPartitionValues))
390+
if (partColNames.distinct.size != 1) {
391+
throw conflictingPartitionColumnsError(pathsWithPartitionValues)
392+
}
392393

393394
// Resolves possible type conflicts for each column
394395
val values = pathsWithPartitionValues.map(_._2)
@@ -404,8 +405,8 @@ object PartitioningUtils extends SQLConfHelper {
404405
}
405406
}
406407

407-
private[datasources] def listConflictingPartitionColumns(
408-
pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
408+
private[datasources] def conflictingPartitionColumnsError(
409+
pathWithPartitionValues: Seq[(Path, PartitionValues)]): SparkRuntimeException = {
409410
val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
410411

411412
def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
@@ -423,13 +424,8 @@ object PartitioningUtils extends SQLConfHelper {
423424
// Lists out those non-leaf partition directories that also contain files
424425
val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
425426

426-
s"Conflicting partition column names detected:\n" +
427-
distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
428-
"For partitioned table directories, data files should only live in leaf directories.\n" +
429-
"And directories at the same level should have the same partition column name.\n" +
430-
"Please check the following directories for unexpected files or " +
431-
"inconsistent partition column names:\n" +
432-
suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
427+
QueryExecutionErrors.conflictingPartitionColumnNamesError(
428+
distinctPartColLists, suspiciousPaths)
433429
}
434430

435431
// scalastyle:off line.size.limit

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class FileIndexSuite extends SharedSparkSession {
112112
}
113113

114114
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
115-
val msg = intercept[AssertionError] {
115+
val msg = intercept[SparkRuntimeException] {
116116
val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
117117
fileIndex.partitionSpec()
118118
}.getMessage

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 76 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import com.google.common.io.Files
2727
import org.apache.hadoop.fs.Path
2828
import org.apache.parquet.hadoop.ParquetOutputFormat
2929

30-
import org.apache.spark.SparkConf
30+
import org.apache.spark.{SparkConf, SparkRuntimeException}
3131
import org.apache.spark.sql._
3232
import org.apache.spark.sql.catalyst.InternalRow
3333
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
@@ -958,54 +958,58 @@ abstract class ParquetPartitionDiscoverySuite
958958
}
959959
}
960960

961-
test("listConflictingPartitionColumns") {
962-
def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
963-
val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
964-
s"\tPartition column name list #$index: $list"
965-
}.mkString("\n", "\n", "\n")
966-
967-
// scalastyle:off
968-
s"""Conflicting partition column names detected:
969-
|$conflictingColNameLists
970-
|For partitioned table directories, data files should only live in leaf directories.
971-
|And directories at the same level should have the same partition column name.
972-
|Please check the following directories for unexpected files or inconsistent partition column names:
973-
|${paths.map("\t" + _).mkString("\n", "\n", "")}
974-
""".stripMargin.trim
975-
// scalastyle:on
976-
}
977-
978-
assert(
979-
listConflictingPartitionColumns(
961+
test("conflictingPartitionColumnsError") {
962+
checkError(
963+
exception = conflictingPartitionColumnsError(
980964
Seq(
981965
(new Path("file:/tmp/foo/a=1"),
982966
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
983967
(new Path("file:/tmp/foo/b=1"),
984-
PartitionValues(Seq("b"), Seq(TypedPartValue("1", IntegerType)))))).trim ===
985-
makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1")))
968+
PartitionValues(Seq("b"), Seq(TypedPartValue("1", IntegerType))))
969+
)
970+
),
971+
errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
972+
parameters = Map(
973+
"distinctPartColLists" ->
974+
"\n\tPartition column name list #0: a\n\tPartition column name list #1: b\n",
975+
"suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/b=1"
976+
)
977+
)
986978

987-
assert(
988-
listConflictingPartitionColumns(
979+
checkError(
980+
exception = conflictingPartitionColumnsError(
989981
Seq(
990982
(new Path("file:/tmp/foo/a=1/_temporary"),
991983
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
992984
(new Path("file:/tmp/foo/a=1"),
993-
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))))).trim ===
994-
makeExpectedMessage(
995-
Seq("a"),
996-
Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
985+
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType))))
986+
)
987+
),
988+
errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
989+
parameters = Map(
990+
"distinctPartColLists" ->
991+
"\n\tPartition column name list #0: a\n",
992+
"suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1/_temporary\n\tfile:/tmp/foo/a=1"
993+
)
994+
)
997995

998-
assert(
999-
listConflictingPartitionColumns(
996+
checkError(
997+
exception = conflictingPartitionColumnsError(
1000998
Seq(
1001999
(new Path("file:/tmp/foo/a=1"),
10021000
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
10031001
(new Path("file:/tmp/foo/a=1/b=foo"),
10041002
PartitionValues(Seq("a", "b"),
1005-
Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo", StringType)))))).trim ===
1006-
makeExpectedMessage(
1007-
Seq("a", "a, b"),
1008-
Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
1003+
Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo", StringType))))
1004+
)
1005+
),
1006+
errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
1007+
parameters = Map(
1008+
"distinctPartColLists" ->
1009+
"\n\tPartition column name list #0: a\n\tPartition column name list #1: a, b\n",
1010+
"suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/a=1/b=foo"
1011+
)
1012+
)
10091013
}
10101014

10111015
test("Parallel partition discovery") {
@@ -1145,6 +1149,44 @@ abstract class ParquetPartitionDiscoverySuite
11451149
checkAnswer(res, Seq(Row(1, 2, 3, 4.0f)))
11461150
}
11471151
}
1152+
1153+
test("SPARK-49163: Attempt to create table based on broken parquet partition data") {
1154+
withTempDir { dir =>
1155+
val data = Seq[(String, String, String)](("a", "b", "c"))
1156+
data
1157+
.toDF("col1", "col2", "col3")
1158+
.write
1159+
.mode("overwrite")
1160+
.partitionBy("col1", "col2")
1161+
.parquet(dir.getCanonicalPath)
1162+
1163+
// Structure of parquet table in filesystem:
1164+
// <base>
1165+
// +- col1=a
1166+
// +- col2=b
1167+
// |- part-00000.parquet
1168+
1169+
val partition = new File(dir, "col1=a")
1170+
val dummyData = new File(partition, "dummy")
1171+
dummyData.createNewFile()
1172+
1173+
// Structure of parquet table in filesystem is now corrupt:
1174+
// <base>
1175+
// +- col1=a
1176+
// |- dummy
1177+
// +- col2=b
1178+
// |- part-00000.parquet
1179+
1180+
val exception = intercept[SparkRuntimeException] {
1181+
spark.read.parquet(dir.toString)
1182+
}
1183+
val msg = exception.getMessage
1184+
assert(exception.getErrorClass === "CONFLICTING_PARTITION_COLUMN_NAMES")
1185+
// Partitions inside the error message can be presented in any order
1186+
assert("Partition column name list #[0-1]: col1".r.findFirstIn(msg).isDefined)
1187+
assert("Partition column name list #[0-1]: col1, col2".r.findFirstIn(msg).isDefined)
1188+
}
1189+
}
11481190
}
11491191

11501192
class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {

0 commit comments

Comments
 (0)