Commit 8b43301
Author: Sumedh Wale (authored and committed)

Add caching of resolved relations in SnappySessionCatalog

- Resolving relations can take a long time for cases like external file-based
  tables with a large number of partitions, due to metadata gathering and
  processing, so add a cache of resolved relations in SnappySessionCatalog.
- Invalidate this cache whenever the ExternalCatalog is invalidated. In
  addition, check whether the CatalogTable looked up from the ExternalCatalog
  matches the one cached previously and, if not, invalidate and re-fetch --
  this handles cases where the table was invalidated from another session.
- Also invalidate on inserts into hadoop/hive tables, since those result in
  new files not present in the cached metadata and can also create new
  partitions.
- Add a dunit test for the above, i.e. correct results after adding new
  data/partitions from a different session.

1 parent 97078ae, commit 8b43301
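The commit message describes the caching scheme only in prose. The following is a minimal, hypothetical Scala sketch of that idea; the names ResolvedRelationCache, lookupCatalogTable and resolveRelation are invented here for illustration, and the actual SnappySessionCatalog code in this commit differs:

import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical sketch: cache resolved relations keyed by table identifier and
// validate every hit against the current CatalogTable from the external
// catalog, so invalidation done by another session is noticed on next lookup.
class ResolvedRelationCache(
    lookupCatalogTable: TableIdentifier => CatalogTable,
    resolveRelation: TableIdentifier => LogicalPlan) {

  private val cache =
    new ConcurrentHashMap[TableIdentifier, (CatalogTable, LogicalPlan)]()

  def getOrResolve(table: TableIdentifier): LogicalPlan = {
    val current = lookupCatalogTable(table)
    val cached = cache.get(table)
    if (cached != null && cached._1 == current) cached._2
    else {
      // expensive path: metadata gather/process, e.g. for many partitions
      val resolved = resolveRelation(table)
      cache.put(table, current -> resolved)
      resolved
    }
  }

  // invoked when the ExternalCatalog is invalidated, and after inserts into
  // hadoop/hive tables that add new files or partitions
  def invalidate(table: TableIdentifier): Unit = cache.remove(table)

  def invalidateAll(): Unit = cache.clear()
}

Comparing the freshly looked-up CatalogTable with the cached copy is what lets one session detect DDL or invalidation performed by a different session without any cross-session notification channel.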

File tree: 18 files changed, +151 -36 lines changed

cluster/src/dunit/scala/org/apache/spark/sql/ColumnBatchAndExternalTableDUnitTest.scala
Lines changed: 51 additions & 0 deletions

@@ -17,11 +17,14 @@
 package org.apache.spark.sql
 
 
+import java.io.File
+
 import com.pivotal.gemfirexd.internal.engine.Misc
 import io.snappydata.Property
 import io.snappydata.cluster.ClusterManagerTestBase
 import io.snappydata.test.dunit.{AvailablePortHelper, SerializableCallable}
 import io.snappydata.util.TestUtils
+import org.apache.commons.io.FileUtils
 import org.scalatest.Assertions
 
 import org.apache.spark.internal.Logging
@@ -411,6 +414,54 @@ class ColumnBatchAndExternalTableDUnitTest(s: String) extends ClusterManagerTest
     assert(expected - max <= TestUtils.defaultCores,
       s"Lower limit of concurrent tasks = $expected, actual = $max")
   }
+
+  def testExternalTableMetadataCacheWithInserts(): Unit = {
+    val dataDir = new File("extern1")
+    FileUtils.deleteQuietly(dataDir)
+    dataDir.mkdir()
+    // create external parquet table and insert some data
+    val session = new SnappySession(sc)
+    session.sql("create table extern1 (id long, data string, stat string) using parquet " +
+      s"options (path '${dataDir.getAbsolutePath}') partitioned by (stat)")
+    session.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 10) " +
+      "from range(100000)")
+
+    // check results
+    assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 10000)
+    assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 10000)
+
+    // insert more data from another session
+    val session2 = new SnappySession(sc)
+    session2.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 10) " +
+      "from range(10000)")
+
+    // check results
+    assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 11000)
+    assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 11000)
+    assert(session.sql("select * from extern1 where stat = 'stat3'").collect().length === 11000)
+    assert(session.sql("select * from extern1 where stat = 'stat11'").collect().length === 0)
+
+    // insert more data with new partitions
+    session2.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 20) " +
+      "from range(10000)")
+
+    // check results
+    assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 11500)
+    assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 11500)
+    assert(session.sql("select * from extern1 where stat = 'stat3'").collect().length === 11500)
+    assert(session.sql("select * from extern1 where stat = 'stat11'").collect().length === 500)
+
+    assert(session2.sql("select * from extern1 where stat = 'stat1'").collect().length === 11500)
+    assert(session2.sql("select * from extern1 where stat = 'stat2'").collect().length === 11500)
+    assert(session2.sql("select * from extern1 where stat = 'stat3'").collect().length === 11500)
+    assert(session2.sql("select * from extern1 where stat = 'stat11'").collect().length === 500)
+
+    session.sql("drop table extern1")
+    session.clear()
+    session2.clear()
+
+    FileUtils.deleteDirectory(dataDir)
+  }
 }
 
 case class AirlineData(year: Int, month: Int, dayOfMonth: Int,

cluster/src/dunit/scala/org/apache/spark/sql/SmartConnectorFunctions.scala
Lines changed: 1 addition & 1 deletion

@@ -76,7 +76,7 @@ object SmartConnectorFunctions {
 
     val sc = SparkContext.getOrCreate(conf)
     val snc = SnappyContext(sc)
-    snc.snappySession.externalCatalog.invalidateAll()
+    snc.snappySession.sessionCatalog.invalidateAll()
    val sqlContext = new SparkSession(sc).sqlContext
     val pw = new PrintWriter(new FileOutputStream(
       new File(s"ValidateNWQueries_$tableType.out"), true))

cluster/src/main/scala/org/apache/spark/memory/SnappyUnifiedMemoryManager.scala
Lines changed: 4 additions & 3 deletions

@@ -793,9 +793,10 @@ class SnappyUnifiedMemoryManager private[memory](
       numBytes: Long,
       memoryMode: MemoryMode): Unit = {
     // if UMM lock is already held, then release inline else enqueue and be done with it
-    if (Thread.holdsLock(this) || !pendingStorageMemoryReleases.offer(
-        (objectName, numBytes, memoryMode), 15, TimeUnit.SECONDS)) {
-      synchronized(releaseStorageMemoryForObject_(objectName, numBytes, memoryMode))
+    if (Thread.holdsLock(this)) synchronized {
+      releaseStorageMemoryForObject_(objectName, numBytes, memoryMode)
+    } else {
+      pendingStorageMemoryReleases.put((objectName, numBytes, memoryMode))
     }
   }
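The rewritten release path above drops the old timed offer plus locked fallback: a thread that already holds the manager's monitor releases inline (the monitor is re-entrant), while any other thread simply enqueues and lets a drainer thread do the locked work. A hedged, self-contained sketch of that pattern (ReleaseQueue and its members are invented names, not the actual manager code):

import java.util.concurrent.LinkedBlockingQueue

// Hypothetical sketch of the pattern in the hunk above: a thread that already
// holds this object's monitor releases inline (synchronized is re-entrant),
// while any other thread enqueues the work for a background drainer thread.
class ReleaseQueue(doRelease: (String, Long) => Unit) {

  private val pending = new LinkedBlockingQueue[(String, Long)]()

  def release(objectName: String, numBytes: Long): Unit = {
    if (Thread.holdsLock(this)) synchronized {
      doRelease(objectName, numBytes) // inline: lock is already held
    } else {
      // unbounded blocking put: no timed offer that can fail and force the
      // caller to contend for the monitor itself
      pending.put((objectName, numBytes))
    }
  }

  // run by a dedicated drainer thread
  def drainOne(): Unit = {
    val (objectName, numBytes) = pending.take()
    synchronized(doRelease(objectName, numBytes))
  }
}

The design choice mirrors the diff: the previous code could time out on the queue and then block on the monitor from an arbitrary caller; the new shape guarantees a caller either already owns the lock or never takes it.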

cluster/src/test/scala/org/apache/spark/sql/execution/SnappyTableMutableAPISuite.scala
Lines changed: 6 additions & 3 deletions

@@ -824,7 +824,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before
     val message = intercept[AnalysisException] {
       df2.write.deleteFrom("col_table")
     }.getMessage
-    assert(message.contains("column `pk3` cannot be resolved on the right side of the operation."))
+    assert(message.contains("column `pk3` cannot be resolved on the right side of the operation.")
+      || message.contains("WHERE clause of the DELETE FROM statement must have all the key"))
   }
 
   test("Bug - SNAP-2157") {
@@ -908,7 +909,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before
     }.getMessage
 
     assert(message.contains("DeleteFrom operation requires " +
-      "key columns(s) or primary key defined on table."))
+      "key columns(s) or primary key defined on table.") ||
+      message.contains("WHERE clause of the DELETE FROM statement must have all the key"))
   }
 
 
@@ -930,7 +932,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before
       df2.write.deleteFrom("row_table")
     }.getMessage
 
-    assert(message.contains("column `pk3` cannot be resolved on the right side of the operation."))
+    assert(message.contains("column `pk3` cannot be resolved on the right side of the operation.")
+      || message.contains("WHERE clause of the DELETE FROM statement must have all the key"))
   }
 
   test("Delete From SQL using JDBC: row tables") {

compatibilityTests/src/test/scala/org/apache/spark/sql/hive/TestHiveSnappySession.scala
Lines changed: 1 addition & 1 deletion

@@ -72,7 +72,7 @@ class TestHiveSnappySession(@transient protected val sc: SparkContext,
     sharedState.cacheManager.clearCache()
     loadedTables.clear()
     sessionCatalog.clearTempTables()
-    sessionCatalog.externalCatalog.invalidateAll()
+    sessionCatalog.invalidateAll()
 
     FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
       foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }

core/src/main/scala/io/snappydata/Literals.scala
Lines changed: 2 additions & 2 deletions

@@ -168,9 +168,9 @@ object Property extends Enumeration {
     "(value in bytes or k/m/g suffixes for unit, min 1k). Default is 4MB.", Some("4m"))
 
   val ResultPersistenceTimeout: SparkValue[Long] = Val[Long](
-    s"${Constant.SPARK_PREFIX}sql.ResultPersistenceTimeout",
+    s"${Constant.SPARK_PREFIX}sql.resultPersistenceTimeout",
     s"Maximum duration in seconds for which results larger than ${MaxMemoryResultSize.name}" +
-      s"are held on disk after which they are cleaned up. Default is 3600s (1h).", Some(3600L))
+      s"are held on disk after which they are cleaned up. Default is 7200 (2h).", Some(7200L))
 
   val DisableHashJoin: SQLValue[Boolean] = SQLVal[Boolean](
     s"${Constant.PROPERTY_PREFIX}sql.disableHashJoin",

core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala
Lines changed: 2 additions & 2 deletions

@@ -415,7 +415,7 @@ class CachedDataFrame(snappySession: SnappySession, queryExecution: QueryExecuti
     } catch {
       case t: Throwable
           if CachedDataFrame.isConnectorCatalogStaleException(t, snappySession) =>
-        snappySession.externalCatalog.invalidateAll()
+        snappySession.sessionCatalog.invalidateAll()
         SnappySession.clearAllCache()
         val execution =
           snappySession.getContextObject[() => QueryExecution](SnappySession.ExecutionKey)
@@ -1005,7 +1005,7 @@ object CachedDataFrame
     } catch {
       case t: Throwable
           if CachedDataFrame.isConnectorCatalogStaleException(t, snappySession) =>
-        snappySession.externalCatalog.invalidateAll()
+        snappySession.sessionCatalog.invalidateAll()
         SnappySession.clearAllCache()
         if (attempts < retryCount) {
           Thread.sleep(attempts*100)
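The second hunk sits inside a retry loop that sleeps attempts * 100 ms between tries. The shape of that loop, as a hedged standalone sketch (withCatalogRetry and invalidateAll are hypothetical names; the real guard is isConnectorCatalogStaleException):

// Hypothetical sketch of the retry shape around the second hunk: invalidate
// caches on a stale-catalog failure, back off linearly, then retry.
def withCatalogRetry[T](retryCount: Int, invalidateAll: () => Unit)
    (action: => T): T = {
  var attempts = 0
  var result: Option[T] = None
  while (result.isEmpty) {
    try {
      result = Some(action)
    } catch {
      // the real code only retries when the stale-catalog check matches
      case t: Throwable if attempts < retryCount =>
        invalidateAll()
        attempts += 1
        Thread.sleep(attempts * 100L) // linear backoff, as in the diff
    }
  }
  result.get
}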

core/src/main/scala/org/apache/spark/sql/SnappySession.scala
Lines changed: 5 additions & 4 deletions

@@ -245,12 +245,11 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
         val relations = plan.collect {
           case _: Command => hasCommand = true; null
           case u: UnresolvedRelation =>
-            val tableIdent = sessionCatalog.resolveTableIdentifier(u.tableIdentifier)
-            tableIdent.database.get -> tableIdent.table
+            sessionCatalog.resolveTableIdentifier(u.tableIdentifier)
         }
-        if (hasCommand) externalCatalog.invalidateAll()
+        if (hasCommand) sessionCatalog.invalidateAll()
         else if (relations.nonEmpty) {
-          relations.foreach(externalCatalog.invalidate)
+          relations.foreach(sessionCatalog.invalidate(_))
         }
         throw e
       case _ =>
@@ -332,6 +331,7 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
   @transient
   private var sqlWarnings: SQLWarning = _
 
+  private[sql] var catalogInitialized: Boolean = _
   private[sql] var hiveInitializing: Boolean = _
 
   private[sql] def isHiveSupportEnabled(v: String): Boolean = Utils.toLowerCase(v) match {
@@ -1700,6 +1700,7 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
     plan match {
       case LogicalRelation(rls: RowLevelSecurityRelation, _, _) =>
         rls.enableOrDisableRowLevelSecurity(tableIdent, enableRls)
+        sessionCatalog.invalidate(tableIdent)
         externalCatalog.invalidateCaches(tableIdent.database.get -> tableIdent.table :: Nil)
       case _ =>
         throw new AnalysisException("ALTER TABLE enable/disable Row Level Security " +

core/src/main/scala/org/apache/spark/sql/execution/CodegenSparkFallback.scala
Lines changed: 2 additions & 2 deletions

@@ -110,7 +110,7 @@ case class CodegenSparkFallback(var child: SparkPlan,
       result
     } catch {
       case t: Throwable if CachedDataFrame.isConnectorCatalogStaleException(t, session) =>
-        session.externalCatalog.invalidateAll()
+        session.sessionCatalog.invalidateAll()
         SnappySession.clearAllCache()
         throw CachedDataFrame.catalogStaleFailure(t, session)
     } finally {
@@ -125,7 +125,7 @@ case class CodegenSparkFallback(var child: SparkPlan,
   }
 
   private def handleStaleCatalogException[T](f: SparkPlan => T, plan: SparkPlan, t: Throwable) = {
-    session.externalCatalog.invalidateAll()
+    session.sessionCatalog.invalidateAll()
     SnappySession.clearAllCache()
     // fail immediate for insert/update/delete, else retry entire query
     val action = plan.find {
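The comment in the last hunk captures the policy: after flushing the caches, a mutation must not be silently re-run because it may have partially applied, while a read-only query can be retried wholesale. A minimal sketch of that decision, with invented names (the real code derives the mutation check from plan.find over the physical plan):

// Hypothetical sketch of the fail-fast-vs-retry policy for stale catalogs:
// mutations may have partially applied, so they fail immediately, while
// read-only queries are re-executed from scratch.
def onStaleCatalog[T](isMutation: Boolean, failure: Throwable,
    invalidateAll: () => Unit)(retry: => T): T = {
  invalidateAll() // cached plans may reference dropped or recreated tables
  if (isMutation) throw failure // insert/update/delete: fail immediately
  else retry // plain query: safe to re-run the entire query
}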
