
Commit 97078ae

Author: Sumedh Wale
Use disk to store and transport large results
- push results from CachedDataFrame to disk in case the result exceeds a limit (4MB by default)
- to reduce the number of disk files, keep appending to the same file till a higher limit of 8X the initial size (i.e. 32MB by default)
- use a separately generated broadcast ID to generate the BroadcastBlockIds used for registering these disk blocks against Spark's BlockManager; the broadcast ID starts at Long.Max/2 to avoid any overlap with the real broadcast blocks
- each compressed block (4MB by default) is a separate one with its size written at the start, so that the reader can decompress those blocks independently, since there is no clean way to switch streams from underneath a compressed stream or to append to existing compressed data (see the framing sketch below)
- cleanup is done in four places: a) after the end reader consumes a disk block, b) in case there is an exception while generating the results, c) registered with SnappySession to be cleared with session clear() and in its finalizer, d) in case there is a top-level failure in CachedDataFrame.collectWithHandler execution
- increase the default stack depth shown in the UI to 50 from 20
- refactored and cleaned up CompressionUtils
- updated the docs to correct the defaults mentioned for storageMaxFraction and other related properties, and expanded the help text on those properties a bit
- updated the docs about VSD and where to find it
1 parent 2abaae3 commit 97078ae
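
As a rough illustration of the size-prefixed framing described in the commit message: each block is compressed on its own and written as [4-byte length][compressed bytes], so the reader can pull blocks back out one at a time without having to switch streams underneath a compressed stream. This is only a sketch under that description; BlockFraming, writeBlock and readBlock are hypothetical names, and plain java.util.zip is used here instead of the actual CompressionUtils codecs.

    import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import java.util.zip.{Deflater, Inflater}

    // Hypothetical sketch of size-prefixed block framing (not the actual
    // CachedDataFrame/CompressionUtils code): each block is compressed
    // independently and prefixed with its length so any block can be
    // decompressed without touching the others.
    object BlockFraming {

      def writeBlock(out: DataOutputStream, block: Array[Byte]): Unit = {
        val deflater = new Deflater()
        deflater.setInput(block)
        deflater.finish()
        val compressed = new ByteArrayOutputStream()
        val buffer = new Array[Byte](8192)
        while (!deflater.finished()) {
          val n = deflater.deflate(buffer)
          compressed.write(buffer, 0, n)
        }
        deflater.end()
        val bytes = compressed.toByteArray
        out.writeInt(bytes.length) // length header enables independent decompression
        out.write(bytes)
      }

      def readBlock(in: DataInputStream): Array[Byte] = {
        val size = in.readInt()
        val compressed = new Array[Byte](size)
        in.readFully(compressed)
        val inflater = new Inflater()
        inflater.setInput(compressed)
        val result = new ByteArrayOutputStream()
        val buffer = new Array[Byte](8192)
        while (!inflater.finished()) {
          val n = inflater.inflate(buffer)
          result.write(buffer, 0, n)
        }
        inflater.end()
        result.toByteArray
      }
    }

Per the commit message, a writer can keep appending such framed blocks to the same file until the 8X limit is reached, after which a new file is started.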

19 files changed (+426, -237 lines)


cluster/src/main/scala/io/snappydata/gemxd/SparkSQLExecuteImpl.scala

Lines changed: 16 additions & 8 deletions

@@ -44,8 +44,9 @@ import io.snappydata.{Constant, Property, QueryHint}
 import org.apache.spark.serializer.{KryoSerializerPool, StructTypeSerializer}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.collection.Utils
+import org.apache.spark.sql.collection.{GenerateFlatIterator, Utils}
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.RDDBlockId
 import org.apache.spark.util.SnappyUtils
@@ -129,13 +130,12 @@
     // shipping/compilation etc and lack of proper BlockManager usage in
     // messaging + server-side final processing, so do it selectively)
     val partitionBlocks = df match {
-      case cdf: CachedDataFrame => cdf.collectWithHandler(CachedDataFrame,
-        CachedDataFrame.localBlockStoreResultHandler(rddId, bm),
+      case cdf: CachedDataFrame => cdf.collectWithHandler(CachedDataFrame(session),
+        CachedDataFrame.localBlockStoreResultHandler(rddId, bm, session),
         CachedDataFrame.localBlockStoreDecoder(querySchema.length, bm))
-      case dataFrame: DataFrame => {
+      case dataFrame: DataFrame =>
         Iterator(CachedDataFrame(null,
-          dataFrame.queryExecution.executedPlan.executeCollect().iterator)._1)
-      }
+          dataFrame.queryExecution.executedPlan.executeCollect().iterator, -1L)._1)
     }
     hdos.clearForReuse()
     SparkSQLExecuteImpl.writeMetaData(srh, hdos, tableNames, nullability, getColumnNames,
@@ -146,12 +146,14 @@
       block match {
         case null => // skip but still id has to be incremented
         case data: Array[Byte] => if (data.length > 0) {
+          hdos.writeInt(data.length)
           hdos.write(data)
         }
         case p: RDDBlockId =>
           val partitionData = Utils.getPartitionData(p, bm)
           // remove the block once a local handle to it has been obtained
           bm.removeBlock(p, tellMaster = false)
+          hdos.writeInt(partitionData.remaining())
           hdos.write(partitionData)
       }
       logTrace(s"Writing data for partition ID = $id: $block")
@@ -408,8 +410,14 @@
     }
     val execRow = new ValueRow(dvds)
     val numFields = types.length
-    val unsafeRows = CachedDataFrame.decodeUnsafeRows(numFields,
-      input.array(), input.position(), input.available())
+    val unsafeRows = new GenerateFlatIterator[UnsafeRow, ByteArrayDataInput](input => {
+      if (input.available() > 0) {
+        val size = input.readInt()
+        val pos = input.position()
+        input.setPosition(pos + size)
+        (CachedDataFrame.decodeUnsafeRows(numFields, input.array(), pos, size), input)
+      } else (GenerateFlatIterator.TERMINATE, input)
+    }, input)
     unsafeRows.map { row =>
       var index = 0
       var refTypeIndex = 0
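
GenerateFlatIterator and ByteArrayDataInput above are SnappyData/GemFire utility classes. A rough standalone equivalent of the same size-prefixed read loop, using only java.nio (illustrative only, hypothetical names), could look like:

    import java.nio.ByteBuffer

    // Illustrative stand-in for the read loop above: each call to next() reads
    // a 4-byte length header and returns that many bytes; the caller would
    // decode UnsafeRows from the returned chunk.
    object SizePrefixedChunks {
      def iterator(data: Array[Byte]): Iterator[Array[Byte]] = {
        val input = ByteBuffer.wrap(data)
        new Iterator[Array[Byte]] {
          override def hasNext: Boolean = input.remaining() > 0
          override def next(): Array[Byte] = {
            val size = input.getInt()  // length written by hdos.writeInt(...) on the sender
            val chunk = new Array[Byte](size)
            input.get(chunk)           // advances the position past this block
            chunk
          }
        }
      }
    }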

cluster/src/main/scala/io/snappydata/remote/interpreter/SnappyInterpreterExecute.scala

Lines changed: 1 addition & 1 deletion

@@ -224,7 +224,7 @@ object SnappyInterpreterExecute {
         "scala code execution", "", "ComputeDB", "Cluster")
     }
   }
-  val id: Long = session.getUniqueIdForExecScala()
+  val id: Long = session.getUniqueIdForExecScala
   val intpHelper = SnappyInterpreterExecute.getOrCreateStateHolder(id, user, authToken, group)
   try {
     intpHelper.interpret(code.split("\n"), options) match {

cluster/src/main/scala/org/apache/spark/executor/SnappyExecutor.scala

Lines changed: 12 additions & 11 deletions

@@ -22,11 +22,13 @@ import java.util.concurrent.ThreadFactory
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable
+
 import com.gemstone.gemfire.internal.tcp.ConnectionTable
 import com.gemstone.gemfire.{CancelException, SystemFailure}
 import com.google.common.cache.{CacheBuilder, CacheLoader}
 import com.pivotal.gemfirexd.internal.engine.Misc
 import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils
+
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.serializer.KryoSerializerPool
 import org.apache.spark.sql.internal.ContextJarUtils
@@ -42,7 +44,6 @@ class SnappyExecutor(
     isLocal: Boolean = false)
     extends Executor(executorId, executorHostname, env, userClassPath, isLocal) {
 
-
   {
     // set a thread-factory for the thread pool for cleanup
     val threadGroup: ThreadGroup = Thread.currentThread().getThreadGroup
@@ -76,12 +77,12 @@
     Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
   }
 
-  private val classLoaderCache = {
+  private[this] val classLoaderCache = {
     val loader = new CacheLoader[ClassLoaderKey, ClassLoader]() {
       override def load(key: ClassLoaderKey): ClassLoader = {
         if (key.isReplPath) return mutableLoaderWithRepl(key)
         logInfo(s"Creating ClassLoader for key = $key" +
-          s" with appTime = ${key.appTime} and appName = ${key.appName}")
+            s" with appTime = ${key.appTime} and appName = ${key.appName}")
         val appName = key.appName // appName = "schemaname.functionname"
         val appNameAndJars = key.appNameAndJars
         lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
@@ -134,11 +135,10 @@
   private def mutableLoaderWithRepl(key: ClassLoaderKey): ClassLoader = {
     try {
       val klass = Utils.classForName("org.apache.spark.repl.SnappyExecutorClassLoader")
-        .asInstanceOf[Class[_ <: ClassLoader]]
+          .asInstanceOf[Class[_ <: ClassLoader]]
       val constructor = klass.getConstructor(classOf[SparkConf], classOf[SparkEnv],
         classOf[String], classOf[ClassLoader], classOf[Boolean])
-      val cl = constructor.newInstance(conf, env, key.appName, replClassLoader, java.lang.Boolean.TRUE)
-      cl
+      constructor.newInstance(conf, env, key.appName, replClassLoader, java.lang.Boolean.TRUE)
     } catch {
       case _: ClassNotFoundException =>
         logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
@@ -148,7 +148,7 @@
       }
     }
     // Keeping 500 as cache size. Can revisit the number
-    CacheBuilder.newBuilder().maximumSize(500).build(loader)
+    CacheBuilder.newBuilder().maximumSize(500).build[ClassLoaderKey, ClassLoader](loader)
   }
 
   class ClassLoaderKey(val appName: String,
@@ -170,7 +170,8 @@
     }
 
     var userClasspathFirst: Boolean = false
-    def setUserClassPathFirst(userClassPathFirst: Boolean) = {
+
+    def setUserClassPathFirst(userClassPathFirst: Boolean): Unit = {
       this.userClasspathFirst = userClassPathFirst
     }
   }
@@ -270,10 +271,10 @@
     Utils.getLocalDir(conf)
   }
 
-  def invalidateReplLoader(replDir: String) = try {
+  def invalidateReplLoader(replDir: String): Unit = try {
     classLoaderCache.invalidate(replDir)
   } catch {
-    case npe: NullPointerException => // ignore
+    case _: NullPointerException => // ignore
   }
 }
 
@@ -305,7 +306,7 @@
     val executorBackend: SnappyCoarseGrainedExecutorBackend)
   extends Thread.UncaughtExceptionHandler with Logging {
 
-  override def uncaughtException(thread: Thread, exception: Throwable) {
+  override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
     try {
       // Make it explicit that uncaught exceptions are thrown when container is shutting down.
       // It will help users when they analyze the executor logs
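
For reference, the Guava loading-cache pattern behind classLoaderCache is sketched below with illustrative key/value types (String and URLClassLoader instead of the real ClassLoaderKey/ClassLoader); the explicit type parameters on build mirror the change in the diff, which presumably pins down the LoadingCache[K, V] type for the Scala compiler.

    import java.net.{URL, URLClassLoader}

    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    // Minimal sketch of a bounded Guava loading cache: entries are built
    // lazily by the loader on first access and evicted once the cache grows
    // past maximumSize. Key/value types here are illustrative only.
    object LoaderCacheSketch {
      private val loader = new CacheLoader[String, URLClassLoader]() {
        override def load(key: String): URLClassLoader =
          new URLClassLoader(Array.empty[URL], getClass.getClassLoader)
      }

      val cache: LoadingCache[String, URLClassLoader] =
        CacheBuilder.newBuilder()
          .maximumSize(500)                       // bounded cache, same size as in the diff
          .build[String, URLClassLoader](loader)  // explicit type parameters

      def main(args: Array[String]): Unit = {
        val cl = cache.get("app1") // load() runs on first access; later calls hit the cache
        println(cl.getClass.getName)
      }
    }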

cluster/src/main/scala/org/apache/spark/memory/SnappyUnifiedMemoryManager.scala

Lines changed: 6 additions & 6 deletions

@@ -793,10 +793,9 @@
       numBytes: Long,
       memoryMode: MemoryMode): Unit = {
     // if UMM lock is already held, then release inline else enqueue and be done with it
-    if (Thread.holdsLock(this)) synchronized {
-      releaseStorageMemoryForObject_(objectName, numBytes, memoryMode)
-    } else {
-      pendingStorageMemoryReleases.put((objectName, numBytes, memoryMode))
+    if (Thread.holdsLock(this) || !pendingStorageMemoryReleases.offer(
+        (objectName, numBytes, memoryMode), 15, TimeUnit.SECONDS)) {
+      synchronized(releaseStorageMemoryForObject_(objectName, numBytes, memoryMode))
     }
   }
 
@@ -962,13 +961,14 @@ object SnappyUnifiedMemoryManager extends Logging {
         val clazz = Utils.classForName(
           "com.gemstone.gemfire.internal.cache.store.ManagedDirectBufferAllocator")
         clazz.getDeclaredMethod("instance").invoke(null)
+        size
       } catch {
         case NonFatal(e) =>
           logError("Failed to load managed buffer allocator in SnappyData OSS." +
             s"Temporary scan buffers will be unaccounted DirectByteBuffers: $e")
+          0
       }
-    }
-    size
+    } else 0
   }
   if (memorySize > 0) {
     // set Spark's off-heap properties
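
The first hunk above switches from an unconditional put to a bounded offer with a timeout: the release is queued when possible, but performed inline under the lock when the calling thread already holds the manager's monitor or the queue stays full for 15 seconds. A standalone sketch of that pattern (hypothetical class, not the real SnappyUnifiedMemoryManager):

    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

    // Sketch of the queue-or-release-inline pattern: enqueue the release for a
    // background drainer, falling back to an inline synchronized release when
    // the lock is already held or the bounded queue cannot accept the entry.
    class ReleaseQueueSketch {
      private val pending = new LinkedBlockingQueue[(String, Long)](1024)

      private def releaseInline(name: String, bytes: Long): Unit = {
        // actual bookkeeping would adjust the storage pool here
        println(s"released $bytes bytes for $name")
      }

      def release(name: String, bytes: Long): Unit = {
        if (Thread.holdsLock(this) || !pending.offer((name, bytes), 15, TimeUnit.SECONDS)) {
          synchronized(releaseInline(name, bytes))
        }
      }

      /** Drained by a background task while holding the lock. */
      def drainPending(): Unit = synchronized {
        var entry = pending.poll()
        while (entry != null) {
          releaseInline(entry._1, entry._2)
          entry = pending.poll()
        }
      }
    }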

cluster/src/test/scala/org/apache/spark/memory/SnappyMemoryAccountingSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -615,7 +615,7 @@ class SnappyMemoryAccountingSuite extends MemoryFunSuite {
     val taskContext =
       new TaskContextImpl(0, 0, taskAttemptId = 1, 0, taskMemoryManager, new Properties, null)
     try {
-      CachedDataFrame(taskContext, Seq(unsafeRow).iterator)
+      CachedDataFrame(taskContext, Seq(unsafeRow).iterator, -1L)
       assert(false , "Should not have obtained memory")
     } catch {
       case lme : LowMemoryException => // Success

core/src/main/scala/io/snappydata/Literals.scala

Lines changed: 11 additions & 0 deletions

@@ -161,6 +161,17 @@ object Property extends Enumeration {
     "configuration level or as system property (latter on all the servers) and cannot be " +
       "changed at the session level. Default is 0.2", Some(0.2))
 
+  val MaxMemoryResultSize: SparkValue[String] = Val[String](
+    s"${Constant.SPARK_PREFIX}sql.maxMemoryResultSize",
+    "Maximum size of results from a JDBC/ODBC query in a partition that will be held " +
+      "in memory beyond which the results will be written to disk " +
+      "(value in bytes or k/m/g suffixes for unit, min 1k). Default is 4MB.", Some("4m"))
+
+  val ResultPersistenceTimeout: SparkValue[Long] = Val[Long](
+    s"${Constant.SPARK_PREFIX}sql.ResultPersistenceTimeout",
+    s"Maximum duration in seconds for which results larger than ${MaxMemoryResultSize.name}" +
+      s"are held on disk after which they are cleaned up. Default is 3600s (1h).", Some(3600L))
+
   val DisableHashJoin: SQLValue[Boolean] = SQLVal[Boolean](
     s"${Constant.PROPERTY_PREFIX}sql.disableHashJoin",
     "Disable hash joins completely including those for replicated row tables. Default is false.",

core/src/main/scala/io/snappydata/util/ServiceUtils.scala

Lines changed: 2 additions & 0 deletions

@@ -97,6 +97,8 @@ object ServiceUtils {
       for ((hiveVar, dirName) <- HiveClientUtil.HIVE_DEFAULT_SETTINGS) {
         sysProps.putIfAbsent(hiveVar.varname, dirName)
       }
+      // increase call stack depth
+      sysProps.putIfAbsent("spark.callstack.depth", "50")
     }
     // set default member-timeout higher for GC pauses (SNAP-1777)
     storeProps.putIfAbsent(DistributionConfig.MEMBER_TIMEOUT_NAME, "30000")
