Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<spark.version>2.2.0</spark.version>
<scala.version>2.12.0</scala.version>
<scala.compat.version>2.12</scala.compat.version>
<spark.version>3.0.1</spark.version>
</properties>

<profiles>
Expand All @@ -42,6 +42,12 @@
<spark.version>2.2.0</spark.version>
</properties>
</profile>
<profile>
<id>spark-3.0.1</id>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the 2.X profiles still work, or should we remove them?

<properties>
<spark.version>3.0.1</spark.version>
</properties>
</profile>
</profiles>


Expand All @@ -53,19 +59,19 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_2.11</artifactId>
<version>3.2.11</version>
<artifactId>json4s-jackson_2.12</artifactId>
<version>3.5.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-client</artifactId>
<version>1.2-incubating-SNAPSHOT</version>
<version>1.3-incubating-SNAPSHOT</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we depend on a non-snapshot version? Or are there changes that require the latest and greatest? (Since we do not push the SNAPSHOT to Maven Central, depending on it would require compiling the crail source locally.)

</dependency>

<!-- Test -->
Expand All @@ -78,13 +84,13 @@
<dependency>
<groupId>org.specs2</groupId>
<artifactId>specs2-core_${scala.compat.version}</artifactId>
<version>2.4.16</version>
<version>2.4.17</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.compat.version}</artifactId>
<version>2.2.4</version>
<version>3.2.2</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.common._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.serializer.CrailSerializer
import org.apache.spark.shuffle._
import org.apache.spark.storage.{CrailDispatcher, ShuffleBlockId}
import org.apache.spark.storage.{BlockId, CrailDispatcher, ShuffleBlockId}
import org.apache.spark.util.Utils


Expand All @@ -47,12 +47,10 @@ private[spark] class CrailShuffleManager(conf: SparkConf) extends ShuffleManager
/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  // Spark 3.x dropped the numMaps parameter from this callback; the
  // dispatcher only needs the number of reduce partitions.
  CrailDispatcher.get.registerShuffle(shuffleId, dependency.partitioner.numPartitions)
  // BaseShuffleHandle in Spark 3.x is likewise constructed without numMaps.
  new BaseShuffleHandle(shuffleId, dependency)
}
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
Expand All @@ -61,21 +59,20 @@ private[spark] class CrailShuffleManager(conf: SparkConf) extends ShuffleManager
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K,C] = {
if (intialized.compareAndSet(false, true)){
logInfo("loading shuffler sorter " + shuffleSorterClass)
}
new CrailShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
startPartition, endPartition, context, shuffleSorter)
}

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long, // Spark 3.x widened the map id from Int to Long
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  // NOTE(review): the metrics reporter is accepted but never forwarded to
  // CrailShuffleWriter, so shuffle-write metrics are not reported — confirm
  // whether that is intentional.
  new CrailShuffleWriter(shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]],
    mapId, context)
}

/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
CrailDispatcher.get.unregisterShuffle(shuffleId)
Expand All @@ -91,19 +88,36 @@ private[spark] class CrailShuffleManager(conf: SparkConf) extends ShuffleManager
logInfo("shutting down crail shuffle manager")
CrailDispatcher.put
}



/**
 * Spark 3.x entry point for reading a range of map outputs.
 *
 * NOTE(review): startMapIndex/endMapIndex are ignored — CrailShuffleReader
 * has no way to restrict a read to a map-index range, so this always reads
 * all map outputs for the requested reduce partitions. Confirm this is
 * acceptable (it returns a superset of the requested data).
 */
override def getReaderForRange[K, C](
    handle: org.apache.spark.shuffle.ShuffleHandle,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    context: org.apache.spark.TaskContext,
    metrics: org.apache.spark.shuffle.ShuffleReadMetricsReporter): org.apache.spark.shuffle.ShuffleReader[K, C] = {
  // Log the configured sorter exactly once, on first use.
  if (intialized.compareAndSet(false, true)) {
    logInfo("loading shuffler sorter " + shuffleSorterClass)
  }
  new CrailShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
    startPartition, endPartition, context, shuffleSorter)
}

}


// Companion-style holder for CrailShuffleManager; currently empty, kept as
// an anchor for shared logging and future manager-level constants.
private object CrailShuffleManager extends Logging {

}

/**
 * Block resolver stub for the Crail shuffle plugin.
 *
 * Shuffle data is served through Crail's own store (via CrailDispatcher
 * streams), not through Spark's standard block-transfer path, so this
 * resolver never hands out block data.
 */
class CrailShuffleBlockResolver (conf: SparkConf)
  extends ShuffleBlockResolver with Logging {

  // Spark 3.x signature: takes a generic BlockId plus optional local dirs.
  // Always returns null because blocks are never fetched through this path.
  override def getBlockData(blockId: BlockId, dirs: Option[Array[String]]): ManagedBuffer = {
    null
  }

  // Nothing to release: this resolver holds no files or buffers.
  override def stop() {
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class CrailShuffleReader[K, C](

/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val multiStream = CrailDispatcher.get.getMultiStream(handle.shuffleId, startPartition, handle.numMaps)
val multiStream = CrailDispatcher.get.getMultiStream(handle.shuffleId, startPartition)
val deserializationStream = serializerInstance.deserializeCrailStream(multiStream)
dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.storage._
class CrailShuffleWriter[K, V](
shuffleBlockManager: CrailShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, _],
mapId: Int,
mapId: Long,
context: TaskContext)
extends ShuffleWriter[K, V] with Logging {

Expand Down Expand Up @@ -82,7 +82,7 @@ class CrailShuffleWriter[K, V](
initRatio = runTime/initTime
overhead = 100/initRatio
logInfo("shuffler writer: initTime " + initTime + ", runTime " + runTime + ", initRatio " + initRatio + ", overhead " + overhead)
return Some(MapStatus(blockManager.shuffleServerId, sizes))
return Some(MapStatus(blockManager.shuffleServerId, sizes, context.taskAttemptId()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclear what a good value would be. It also works with a constant value.

} else {
return None
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/org/apache/spark/storage/CrailDispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ class CrailDispatcher () extends Logging {
//---------------------------------------

/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
def registerShuffle(shuffleId: Int, numMaps: Int, partitions: Int) : Unit = {
def registerShuffle(shuffleId: Int, partitions: Int) : Unit = {
//logInfo("registering shuffle " + shuffleId + ", time " + ", cacheSize " + fs.getCacheSize)
val shuffleStore = new CrailShuffleStore
val oldStore = shuffleCache.putIfAbsent(shuffleId, shuffleStore)
Expand Down Expand Up @@ -501,7 +501,7 @@ class CrailDispatcher () extends Logging {
streamGroupCloseStats.incrementAndGet()
}

def getMultiStream(shuffleId: Int, reduceId: Int, numMaps:Int) : CrailBufferedInputStream = {
def getMultiStream(shuffleId: Int, reduceId: Int) : CrailBufferedInputStream = {
if (debug){
//request by map task, if first (still in reduce state) then print reduce stats
isMap.synchronized(
Expand Down