
Commit c42c3fc

yhuai authored and liancheng committed

[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <[email protected]>

This patch had conflicts when merged, resolved by
Committer: Cheng Lian <[email protected]>

Closes apache#5339 from yhuai/parquetRelationCache and squashes the following commits:

b0e1a42 [Yin Huai] Address comments.
83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

1 parent 440ea31 commit c42c3fc

File tree

3 files changed: +23 -12 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 20 additions & 8 deletions

@@ -116,8 +116,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }

   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    // refresh table does not eagerly reload the cache. It just invalidates the cache.
+    // refreshTable does not eagerly reload the cache. It just invalidates the cache.
     // Next time when we use the table, it will be populated in the cache.
+    // Since we also cache ParquetRelations converted from Hive Parquet tables and
+    // adding converted ParquetRelations into the cache is not defined in the load function
+    // of the cache (instead, we add the cache entry in convertToParquetRelation),
+    // it is better to invalidate the cache here to avoid confusing warning logs from the
+    // cache loader (e.g. cannot find data source provider, which is only defined for
+    // data source tables).
     invalidateTable(databaseName, tableName)
   }

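For context, the getIfPresent/invalidate calls in the next hunk are Guava's Cache API, so cachedDataSourceTables is a Guava cache whose entries for converted Parquet relations are put in manually (in convertToParquetRelation) rather than by the cache's load function. Below is a minimal sketch of that pattern, not Spark's actual code; TableCache, loadFromMetastore, and the size bound are made up for illustration. It shows why refreshTable can only invalidate: an eager reload through load() would not know how to rebuild entries that were put() in out-of-band.

    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    case class QualifiedTableName(database: String, name: String)

    // Hypothetical wrapper; Spark's real cache lives in HiveMetastoreCatalog.
    class TableCache(loadFromMetastore: QualifiedTableName => AnyRef) {
      private val cache: LoadingCache[QualifiedTableName, AnyRef] =
        CacheBuilder.newBuilder()
          .maximumSize(1000) // assumed bound, not Spark's setting
          .build(new CacheLoader[QualifiedTableName, AnyRef] {
            override def load(key: QualifiedTableName): AnyRef = loadFromMetastore(key)
          })

      // Converted Parquet relations are added out-of-band, bypassing load().
      def cacheConverted(key: QualifiedTableName, relation: AnyRef): Unit =
        cache.put(key, relation)

      // refreshTable only invalidates; the next lookup repopulates the entry
      // lazily, either via load() or via a manual put().
      def refreshTable(key: QualifiedTableName): Unit =
        cache.invalidate(key)
    }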

@@ -242,21 +248,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)

     def getCached(
-      tableIdentifier: QualifiedTableName,
-      pathsInMetastore: Seq[String],
-      schemaInMetastore: StructType,
-      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
-            parquetRelation.paths == pathsInMetastore &&
+            parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
             parquetRelation.maybePartitionSpec == partitionSpecInMetastore

-          if (useCached) Some(logical) else None
+          if (useCached) {
+            Some(logical)
+          } else {
+            // If the cached relation is not updated, we invalidate it right away.
+            cachedDataSourceTables.invalidate(tableIdentifier)
+            None
+          }
         case other =>
           logWarning(
             s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 3 additions & 2 deletions

@@ -58,12 +58,13 @@ case class DropTable(
     try {
       hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName))
     } catch {
-      // This table's metadata is not in
+      // This table's metadata is not in Hive metastore (e.g. the table does not exist).
       case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+      case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException =>
       // Other Throwables can be caused by users providing wrong parameters in OPTIONS
       // (e.g. invalid paths). We catch it and log a warning message.
       // Users should be able to drop such kinds of tables regardless if there is an error.
-      case e: Throwable => log.warn(s"${e.getMessage}")
+      case e: Throwable => log.warn(s"${e.getMessage}", e)
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
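Two details in this hunk: the new case arm silently swallows NoSuchTableException, mirroring the InvalidTableException arm above it, so DROP TABLE still proceeds for tables that are not in the metastore; and passing the exception as the second argument to log.warn keeps the stack trace, which the message-only overload discards. A minimal sketch of the pattern, using NoSuchElementException as a stand-in for the two not-found exception types:

    import org.slf4j.{Logger, LoggerFactory}

    object DropTableSketch {
      private val log: Logger = LoggerFactory.getLogger(getClass)

      def tryUncache(uncache: () => Unit): Unit =
        try uncache() catch {
          // Table not found: nothing was cached, so ignore and keep dropping.
          case _: NoSuchElementException =>
          // Anything else: warn (with stack trace), but still let the drop proceed.
          case e: Throwable => log.warn(e.getMessage, e) // two-arg warn keeps the trace
        }
    }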

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 0 additions & 2 deletions

@@ -473,15 +473,13 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
     // So, we expect it is not cached.
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
         |PARTITION (date='2015-04-02')
         |select a, b from jt
       """.stripMargin)
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")

     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
