Skip to content

Commit 7bca62f

Browse files
viirya authored and liancheng committed
[SPARK-6607][SQL] Check invalid characters for Parquet schema and show error messages
'(' and ')' are special characters used in Parquet schema for type annotation. When we run an aggregation query, we obtain attribute names such as "MAX(a)". If we directly store the generated DataFrame as a Parquet file, it causes a failure when reading and parsing the stored schema string. Several methods can be adopted to solve this. This PR uses the simplest one, which is to just replace attribute names before generating the Parquet schema based on these attributes. Another possible method might be modifying all aggregation expression names from "func(column)" to "func[column]". Author: Liang-Chi Hsieh <[email protected]> Closes apache#5263 from viirya/parquet_aggregation_name and squashes the following commits: 2d70542 [Liang-Chi Hsieh] Address comment. 463dff4 [Liang-Chi Hsieh] Instead of replacing special chars, showing error message to user to suggest using Alias. 1de001d [Liang-Chi Hsieh] Replace special characters '(' and ')' of Parquet schema.
1 parent da25c86 commit 7bca62f

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
390390

391391
def convertFromAttributes(attributes: Seq[Attribute],
392392
toThriftSchemaNames: Boolean = false): MessageType = {
393+
checkSpecialCharacters(attributes)
393394
val fields = attributes.map(
394395
attribute =>
395396
fromDataType(attribute.dataType, attribute.name, attribute.nullable,
@@ -404,7 +405,20 @@ private[parquet] object ParquetTypesConverter extends Logging {
404405
}
405406
}
406407

408+
/**
 * Fails fast when any attribute name contains a character that is special in a
 * Parquet schema string.
 *
 * @param schema attributes whose names are validated
 * @throws RuntimeException (raised through `sys.error`) for the first offending name,
 *                          with a message suggesting that the column be aliased
 */
private def checkSpecialCharacters(schema: Seq[Attribute]): Unit = {
  // ,;{}()\n\t= and space character are special characters in Parquet schema
  val specialChars = " ,;{}()\n\t="
  schema.map(_.name).foreach { name =>
    // Check character by character rather than with a `matches` regex such as
    // ".*[...].*": `.` does not match line terminators, so a name holding several
    // line breaks (or a '\r') could never be matched in full and would pass unnoticed.
    if (name.exists(c => specialChars.indexOf(c) >= 0)) {
      // The s-interpolator processes escapes even inside triple quotes, so the
      // backslashes are doubled to print a literal `\n\t` instead of raw whitespace.
      sys.error(
        s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=". """ +
          "Please use alias to rename it.")
    }
  }
}
419+
407420
/**
 * Renders the given attributes as a JSON schema string.
 *
 * The attribute names are validated with `checkSpecialCharacters` first, so an
 * invalid name aborts the conversion before any JSON is produced.
 *
 * @param schema attributes to serialize
 * @return the JSON form of the `StructType` built from `schema`
 */
def convertToString(schema: Seq[Attribute]): String = {
  checkSpecialCharacters(schema)
  val structType = StructType.fromAttributes(schema)
  structType.json
}
410424

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,22 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
688688

689689
sql("DROP TABLE alwaysNullable")
690690
}
691+
692+
test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
  // Two separate targets: one for the save that must fail, one for the save that must succeed.
  val baseDir = Utils.createTempDir()
  val rejectedPath = new File(baseDir, "testParquet").getCanonicalPath
  val acceptedPath = new File(baseDir, "testParquet2").getCanonicalPath

  val source = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")

  // Self-join followed by an aggregation; the DataFrame is saved without renaming its columns.
  val joined = source.as('x).join(source.as('y), $"x.str" === $"y.str")
  val aggregated = joined.groupBy("y.str").max("y.int")
  intercept[RuntimeException] {
    aggregated.saveAsParquetFile(rejectedPath)
  }

  // After renaming the columns the same data must round-trip through Parquet.
  val renamed = aggregated.toDF("str", "max_int")
  renamed.saveAsParquetFile(acceptedPath)
  val reloaded = parquetFile(acceptedPath)
  checkAnswer(reloaded, Seq(Row("1", 1), Row("2", 2), Row("3", 3)))
  assert(reloaded.columns === Array("str", "max_int"))
}
691707
}
692708

693709
class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {

0 commit comments

Comments
 (0)