diff --git a/pom.xml b/pom.xml
index fd0c5f1..22d899a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,9 +18,9 @@
     1.6
     1.6
     UTF-8
-    2.11.8
-    2.11
-    2.2.0
+    2.12.0
+    2.12
+    3.0.1
@@ -42,6 +42,12 @@
     2.2.0
+
+      spark-3.0.1
+
+        3.0.1
+
+
@@ -53,19 +59,19 @@
     org.apache.spark
-    spark-core_2.11
+    spark-core_2.12
     ${spark.version}
     org.json4s
-    json4s-jackson_2.11
-    3.2.11
+    json4s-jackson_2.12
+    3.5.3
     compile
     org.apache.crail
     crail-client
-    1.2-incubating-SNAPSHOT
+    1.3-incubating-SNAPSHOT
@@ -78,13 +84,13 @@
     org.specs2
     specs2-core_${scala.compat.version}
-    2.4.16
+    2.4.17
     test
     org.scalatest
     scalatest_${scala.compat.version}
-    2.2.4
+    3.2.2
     test
diff --git a/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleManager.scala b/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleManager.scala
index 98b1447..4acd8c1 100644
--- a/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleManager.scala
+++ b/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleManager.scala
@@ -28,7 +28,7 @@ import org.apache.spark.common._
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.serializer.CrailSerializer
 import org.apache.spark.shuffle._
-import org.apache.spark.storage.{CrailDispatcher, ShuffleBlockId}
+import org.apache.spark.storage.{BlockId, CrailDispatcher, ShuffleBlockId}
 import org.apache.spark.util.Utils
@@ -47,12 +47,10 @@ private[spark] class CrailShuffleManager(conf: SparkConf) extends ShuffleManager
   /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
   override def registerShuffle[K, V, C](
       shuffleId: Int,
-      numMaps: Int,
       dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
-    CrailDispatcher.get.registerShuffle(shuffleId, numMaps, dependency.partitioner.numPartitions)
-    new BaseShuffleHandle(shuffleId, numMaps, dependency)
-  }
-
+    CrailDispatcher.get.registerShuffle(shuffleId, dependency.partitioner.numPartitions)
+    new BaseShuffleHandle(shuffleId, dependency)
+  }
   /**
    * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
    * Called on executors by reduce tasks.
@@ -61,21 +59,20 @@ private[spark] class CrailShuffleManager(conf: SparkConf) extends ShuffleManager
       handle: ShuffleHandle,
       startPartition: Int,
       endPartition: Int,
-      context: TaskContext): ShuffleReader[K, C] = {
+      context: TaskContext,
+      metrics: ShuffleReadMetricsReporter): ShuffleReader[K,C] = {
     if (intialized.compareAndSet(false, true)){
       logInfo("loading shuffler sorter " + shuffleSorterClass)
     }
     new CrailShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
       startPartition, endPartition, context, shuffleSorter)
   }

-  /** Get a writer for a given partition. Called on executors by map tasks. */
-  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Long, context: TaskContext, metrics: ShuffleWriteMetricsReporter)
       : ShuffleWriter[K, V] = {
     new CrailShuffleWriter(shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
   }

-  /** Remove a shuffle's metadata from the ShuffleManager. */
   override def unregisterShuffle(shuffleId: Int): Boolean = {
     CrailDispatcher.get.unregisterShuffle(shuffleId)
@@ -91,19 +88,36 @@ private[spark] class CrailShuffleManager(conf: SparkConf) extends ShuffleManager
     logInfo("shutting down crail shuffle manager")
     CrailDispatcher.put
   }
+
+
+
+  override def getReaderForRange[K, C](
+      handle: org.apache.spark.shuffle.ShuffleHandle,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      startPartition: Int,
+      endPartition: Int,
+      context: org.apache.spark.TaskContext,
+      metrics: org.apache.spark.shuffle.ShuffleReadMetricsReporter): org.apache.spark.shuffle.ShuffleReader[K,C] = {
+    if (intialized.compareAndSet(false, true)){
+      logInfo("loading shuffler sorter " + shuffleSorterClass)
+    }
+    new CrailShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
+      startPartition, endPartition, context, shuffleSorter)
+  }
+
 }
+
 private object CrailShuffleManager extends Logging {
 }

 class CrailShuffleBlockResolver (conf: SparkConf) extends ShuffleBlockResolver with Logging {
-
-  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+  override def getBlockData(blockId: BlockId, dirs: Option[Array[String]]): ManagedBuffer = {
     null
   }
-
   override def stop() { }
diff --git a/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleReader.scala b/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleReader.scala
index 953ec35..4a3ce4b 100644
--- a/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleReader.scala
+++ b/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleReader.scala
@@ -46,7 +46,7 @@ class CrailShuffleReader[K, C](
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
-    val multiStream = CrailDispatcher.get.getMultiStream(handle.shuffleId, startPartition, handle.numMaps)
+    val multiStream = CrailDispatcher.get.getMultiStream(handle.shuffleId, startPartition)
     val deserializationStream = serializerInstance.deserializeCrailStream(multiStream)
     dep.keyOrdering match {
       case Some(keyOrd: Ordering[K]) =>
diff --git a/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleWriter.scala b/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleWriter.scala
index 19ede0b..ce47d0f 100644
--- a/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleWriter.scala
+++ b/src/main/scala/org/apache/spark/shuffle/crail/CrailShuffleWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.storage._
 class CrailShuffleWriter[K, V](
     shuffleBlockManager: CrailShuffleBlockResolver,
     handle: BaseShuffleHandle[K, V, _],
-    mapId: Int,
+    mapId: Long,
     context: TaskContext)
   extends ShuffleWriter[K, V] with Logging {
@@ -82,7 +82,7 @@ class CrailShuffleWriter[K, V](
       initRatio = runTime/initTime
       overhead = 100/initRatio
       logInfo("shuffler writer: initTime " + initTime + ", runTime " + runTime + ", initRatio " + initRatio + ", overhead " + overhead)
-      return Some(MapStatus(blockManager.shuffleServerId, sizes))
+      return Some(MapStatus(blockManager.shuffleServerId, sizes, context.taskAttemptId()))
     } else {
       return None
     }
diff --git a/src/main/scala/org/apache/spark/storage/CrailDispatcher.scala b/src/main/scala/org/apache/spark/storage/CrailDispatcher.scala
index 629b610..c5a21c3 100644
--- a/src/main/scala/org/apache/spark/storage/CrailDispatcher.scala
+++ b/src/main/scala/org/apache/spark/storage/CrailDispatcher.scala
@@ -420,7 +420,7 @@ class CrailDispatcher () extends Logging {
   //---------------------------------------

   /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
-  def registerShuffle(shuffleId: Int, numMaps: Int, partitions: Int) : Unit = {
+  def registerShuffle(shuffleId: Int, partitions: Int) : Unit = {
     //logInfo("registering shuffle " + shuffleId + ", time " + ", cacheSize " + fs.getCacheSize)
     val shuffleStore = new CrailShuffleStore
     val oldStore = shuffleCache.putIfAbsent(shuffleId, shuffleStore)
@@ -501,7 +501,7 @@ class CrailDispatcher () extends Logging {
     streamGroupCloseStats.incrementAndGet()
   }

-  def getMultiStream(shuffleId: Int, reduceId: Int, numMaps:Int) : CrailBufferedInputStream = {
+  def getMultiStream(shuffleId: Int, reduceId: Int) : CrailBufferedInputStream = {
     if (debug){
       //request by map task, if first (still in reduce state) then print reduce stats
       isMap.synchronized(