
Commit 2498104

align everything to use from_spark_recoverable

1 parent 3ef3b92 commit 2498104

File tree

11 files changed: +124 additions, -818 deletions


README.md

Lines changed: 9 additions & 16 deletions

@@ -153,32 +153,25 @@ Please refer to [NYC Taxi PyTorch Estimator](./examples/pytorch_nyctaxi.py) and
 
 ***Fault Tolerance***
 
-The ray dataset converted from spark dataframe like above is not fault-tolerant. This is because we implement it using `Ray.put` combined with spark `mapPartitions`. Objects created by `Ray.put` is not recoverable in Ray.
+RayDP now converts Spark DataFrames to Ray Datasets using a recoverable pipeline by default. This makes the resulting Ray Dataset resilient to Spark executor loss (the Arrow IPC bytes are cached in Spark and fetched via Ray tasks with lineage).
+
+The recoverable conversion is also available directly via `raydp.spark.from_spark_recoverable`, and it persists (caches) the Spark DataFrame. You can provide the storage level through the `storage_level` keyword parameter.
 
-RayDP now supports converting data in a way such that the resulting ray dataset is fault-tolerant. This feature is currently *experimental*. Here is how to use it:
 ```python
 import ray
 import raydp
 
-# Fault tolerance requires cross language support:
-# https://docs.ray.io/en/latest/ray-core/cross-language.html
-# set job_config to trigger load-code-from-local
-ray.init(address="auto",
-         job_config=JobConfig(code_search_path=[os.getcwd()]))
-# set fault_tolerance_mode to True to enable the feature
-# this will connect pyspark driver to ray cluster
+ray.init(address="auto")
 spark = raydp.init_spark(app_name="RayDP Example",
                          num_executors=2,
                          executor_cores=2,
-                         executor_memory="4GB",
-                         fault_tolerance_mode=True)
-# df should be large enough so that result will be put into plasma
+                         executor_memory="4GB")
+
 df = spark.range(100000)
-# use this API instead of ray.data.from_spark
-ds = raydp.spark.from_spark_recoverable(df)
-# ds is now fault-tolerant.
+ds = raydp.spark.from_spark_recoverable(df)  # fault-tolerant
 ```
-Notice that `from_spark_recoverable` will persist the converted dataframe. You can provide the storage level through keyword parameter `storage_level`. In addition, this feature is not available in ray client mode. If you need to use ray client, please wrap your application in a ray actor, as described in the ray client chapter.
+
+Note: recoverable conversion is not available in Ray client mode. If you need to use Ray client, wrap your application in a Ray actor as described in the Ray client docs.
 
 
 ## Getting Involved
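A short usage sketch of the `storage_level` keyword that the updated README mentions. The README only says the cache level is configurable through this keyword; passing a `pyspark.StorageLevel` value here is an assumption for illustration, not something this commit confirms.

```python
import ray
import raydp
from pyspark import StorageLevel

ray.init(address="auto")
spark = raydp.init_spark(app_name="RayDP storage level example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")

df = spark.range(100000)
# Assumption: storage_level accepts a pyspark StorageLevel; the README only
# states that the persistence level can be set via this keyword.
ds = raydp.spark.from_spark_recoverable(
    df, storage_level=StorageLevel.MEMORY_AND_DISK)
print(ds.count())
```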

core/raydp-main/src/main/scala/org/apache/spark/executor/RayDPExecutor.scala

Lines changed: 0 additions & 31 deletions

@@ -20,7 +20,6 @@ package org.apache.spark.executor
 import java.io.{ByteArrayOutputStream, File}
 import java.nio.channels.Channels
 import java.nio.file.Paths
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.reflect.classTag
@@ -269,22 +268,6 @@ class RayDPExecutor(
     Ray.exitActor
   }
 
-  /**
-   * Pop (remove and return) a previously stored Arrow IPC stream by key.
-   *
-   * This method is intended to be called from a Python "owner/registry" actor via Ray
-   * cross-language actor calls. Since the Python actor is the caller, Ray will assign
-   * ownership of the returned object to that Python actor.
-   */
-  def popArrowIPC(batchKey: String): Array[Byte] = {
-    val bytes = RayDPExecutor.popArrowIPC(batchKey)
-    if (bytes == null) {
-      throw new RayDPException(
-        s"Missing Arrow IPC bytes for batchKey=$batchKey on executorId=$executorId.")
-    }
-    bytes
-  }
-
   def getBlockLocations(rddId: Int, numPartitions: Int): Array[String] = {
     val env = SparkEnv.get
     val blockIds = (0 until numPartitions).map(i =>
@@ -367,17 +350,3 @@ class RayDPExecutor(
     result
   }
 }
-
-object RayDPExecutor {
-  // Per-executor in-memory buffer for Arrow IPC streams produced by Spark tasks.
-  // Stored in the executor (Ray actor) process; entries are removed by popArrowIPC.
-  private val arrowIpcByKey = new ConcurrentHashMap[String, Array[Byte]]()
-
-  def putArrowIPC(batchKey: String, bytes: Array[Byte]): Unit = {
-    arrowIpcByKey.put(batchKey, bytes)
-  }
-
-  def popArrowIPC(batchKey: String): Array[Byte] = {
-    arrowIpcByKey.remove(batchKey)
-  }
-}
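The deleted `popArrowIPC` path leaned on a general Ray rule: the process that issues the `.remote()` call owns the returned object, which is why the Python owner actor, as the caller, would have owned the fetched Arrow bytes. A minimal Python sketch of that ownership rule, using toy actor names that are illustrative only and not RayDP code:

```python
import ray

@ray.remote
class Producer:
    # Stands in for the executor-side process that holds serialized bytes.
    def make_bytes(self) -> bytes:
        return b"arrow-ipc-stream-bytes"

@ray.remote
class Owner:
    # The actor that issues the .remote() call becomes the owner of the
    # returned object, which is the property the removed javadoc relied on.
    def fetch(self, producer) -> int:
        ref = producer.make_bytes.remote()  # Owner is the caller -> Owner owns ref
        data = ray.get(ref)
        return len(data)

ray.init()
producer = Producer.remote()
owner = Owner.remote()
print(ray.get(owner.fetch.remote(producer)))
```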

core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala

Lines changed: 1 addition & 137 deletions

@@ -20,162 +20,26 @@ package org.apache.spark.sql.raydp
 import com.intel.raydp.shims.SparkShimLoader
 import io.ray.api.{ActorHandle, ObjectRef, Ray}
 import io.ray.runtime.AbstractRayRuntime
-import java.io.ByteArrayOutputStream
 import java.util.{List, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
 import java.util.function.{Function => JFunction}
-import org.apache.arrow.vector.VectorSchemaRoot
-import org.apache.arrow.vector.ipc.ArrowStreamWriter
 import org.apache.arrow.vector.types.pojo.Schema
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{RayDPException, SparkContext, SparkEnv}
+import org.apache.spark.{RayDPException, SparkContext}
 import org.apache.spark.deploy.raydp._
 import org.apache.spark.executor.RayDPExecutor
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.raydp.{RayDPUtils, RayExecutorUtils}
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.execution.arrow.ArrowWriter
-import org.apache.spark.sql.execution.python.BatchIterator
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
-
-/**
- * A batch of record that has been wrote into Ray object store.
- * @param ownerAddress the owner address of the ray worker
- * @param objectId the ObjectId for the stored data
- * @param numRecords the number of records for the stored data
- */
-case class RecordBatch(
-    ownerAddress: Array[Byte],
-    objectId: Array[Byte],
-    numRecords: Int)
 
 class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
 
   val uuid: UUID = ObjectStoreWriter.dfToId.getOrElseUpdate(df, UUID.randomUUID())
 
-  def writeToRay(
-      data: Array[Byte],
-      numRecords: Int,
-      ownerName: String): RecordBatch = {
-
-    // Single-owner implementation:
-    // - Spark executor JVM actor produces Arrow IPC bytes (data).
-    // - Bytes are buffered inside the executor actor process keyed by batchKey.
-    // - JVM returns (executorActorName, batchKey) to Python.
-    // - A Python single owner actor later calls executorActor.popArrowIPC(batchKey),
-    //   decodes into pyarrow.Table and becomes the Ray owner of the resulting Dataset blocks.
-    //
-    // We keep ownerName non-empty for API consistency and to avoid accidental usage without
-    // a dedicated owner actor on the Python side.
-    if (ownerName == null || ownerName.isEmpty) {
-      throw new RayDPException("ownerName must be set for Spark->Ray conversion.")
-    }
-
-    val executorId = SparkEnv.get.executorId
-    val executorActorName = s"raydp-executor-${executorId}"
-    val batchKey = UUID.randomUUID().toString
-
-    // Buffer bytes in the executor actor process. The Python owner actor will pop them via
-    // cross-language actor call later.
-    RayDPExecutor.putArrowIPC(batchKey, data)
-
-    // RecordBatch payload:
-    // - ownerAddress encodes the RayDPExecutor actor name (UTF-8)
-    // - objectId encodes the batch key (UTF-8)
-    RecordBatch(executorActorName.getBytes("UTF-8"), batchKey.getBytes("UTF-8"), numRecords)
-  }
-
-  /**
-   * Save the DataFrame to Ray object store with Apache Arrow format.
-   */
-  def save(useBatch: Boolean, ownerName: String): List[RecordBatch] = {
-    val conf = df.queryExecution.sparkSession.sessionState.conf
-    val timeZoneId = conf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
-    var batchSize = conf.getConf(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)
-    if (!useBatch) {
-      batchSize = 0
-    }
-    val schema = df.schema
-
-    val objectIds = df.queryExecution.toRdd.mapPartitions{ iter =>
-      // DO NOT use iter.grouped(). See BatchIterator.
-      val batchIter = if (batchSize > 0) {
-        new BatchIterator(iter, batchSize)
-      } else {
-        Iterator(iter)
-      }
-
-      val arrowSchema = SparkShimLoader.getSparkShims.toArrowSchema(schema, timeZoneId)
-      val allocator = ArrowUtils.rootAllocator.newChildAllocator(
-        s"ray object store writer", 0, Long.MaxValue)
-      val root = VectorSchemaRoot.create(arrowSchema, allocator)
-      val results = new ArrayBuffer[RecordBatch]()
-
-      val byteOut = new ByteArrayOutputStream()
-      val arrowWriter = ArrowWriter.create(root)
-      var numRecords: Int = 0
-
-      Utils.tryWithSafeFinally {
-        while (batchIter.hasNext) {
-          // reset the state
-          numRecords = 0
-          byteOut.reset()
-          arrowWriter.reset()
-
-          // write out the schema meta data
-          val writer = new ArrowStreamWriter(root, null, byteOut)
-          writer.start()
-
-          // get the next record batch
-          val nextBatch = batchIter.next()
-
-          while (nextBatch.hasNext) {
-            numRecords += 1
-            arrowWriter.write(nextBatch.next())
-          }
-
-          // set the write record count
-          arrowWriter.finish()
-          // write out the record batch to the underlying out
-          writer.writeBatch()
-
-          // get the wrote ByteArray and save to Ray ObjectStore
-          val byteArray = byteOut.toByteArray
-          results += writeToRay(byteArray, numRecords, ownerName)
-          // end writes footer to the output stream and doesn't clean any resources.
-          // It could throw exception if the output stream is closed, so it should be
-          // in the try block.
-          writer.end()
-        }
-        arrowWriter.reset()
-        byteOut.close()
-      } {
-        // If we close root and allocator in TaskCompletionListener, there could be a race
-        // condition where the writer thread keeps writing to the VectorSchemaRoot while
-        // it's being closed by the TaskCompletion listener.
-        // Closing root and allocator here is cleaner because root and allocator is owned
-        // by the writer thread and is only visible to the writer thread.
-        //
-        // If the writer thread is interrupted by TaskCompletionListener, it should either
-        // (1) in the try block, in which case it will get an InterruptedException when
-        // performing io, and goes into the finally block or (2) in the finally block,
-        // in which case it will ignore the interruption and close the resources.
-
-        root.close()
-        allocator.close()
-      }
-
-      results.toIterator
-    }.collect()
-    objectIds.toSeq.asJava
-  }
-
   /**
    * For test.
   */
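The deleted `save`/`writeToRay` path serialized each Spark partition into an Arrow IPC stream (schema header plus record batches) before handing the bytes to the Python side, which the removed comment describes decoding into a `pyarrow.Table`. A minimal pyarrow round trip of that wire format, purely as an illustration and not RayDP code:

```python
import pyarrow as pa

# A tiny table standing in for one Spark partition's rows.
table = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# Serialize to Arrow IPC stream bytes, analogous to what the removed Scala
# writer emitted per batch via ArrowStreamWriter.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
ipc_bytes = sink.getvalue().to_pybytes()

# Decode on the consumer side back into a pyarrow.Table, as the removed
# comment described the Python owner actor doing.
decoded = pa.ipc.open_stream(pa.BufferReader(ipc_bytes)).read_all()
assert decoded.equals(table)
```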

python/raydp/context.py

Lines changed: 6 additions & 11 deletions

@@ -21,6 +21,7 @@
 from typing import Dict, List, Union, Optional
 
 import ray
+import ray.util.client as ray_client
 from pyspark.sql import SparkSession
 
 from ray.util.placement_group import PlacementGroup
@@ -62,6 +63,8 @@ class _SparkContext(ContextDecorator):
         please install the corresponding spark version first, set ENV SPARK_HOME,
         configure spark-env.sh HADOOP_CONF_DIR in spark conf, and copy hive-site.xml
         and hdfs-site.xml to ${SPARK_HOME}/ conf
+    :param fault_tolerant_mode: enable recoverable Spark->Ray conversion by default.
+        Not supported in Ray client mode.
     :param placement_group_strategy: RayDP will create a placement group according to the
         strategy and the configured resources for executors.
         If this parameter is specified, the next two
@@ -181,7 +184,7 @@ def init_spark(app_name: str,
               executor_cores: int,
               executor_memory: Union[str, int],
               enable_hive: bool = False,
-              fault_tolerant_mode = False,
+              fault_tolerant_mode = True,
               placement_group_strategy: Optional[str] = None,
               placement_group: Optional[PlacementGroup] = None,
               placement_group_bundle_indexes: Optional[List[int]] = None,
@@ -213,16 +216,8 @@ def init_spark(app_name: str,
         # ray has not initialized, init local
         ray.init()
 
-    if fault_tolerant_mode:
-        print(
-            '''
-            Caution: Fault-tolerant mode is now experimental!
-            This mode CANNOT be used in ray client mode.
-            Use raydp.spark.from_spark_recoverable instead of ray.data.from_spark
-            to make your data recoverable.
-            The spark dataframe converted this way will be cached.
-            '''
-        )
+    if fault_tolerant_mode and ray_client.ray.is_connected():
+        raise Exception("fault_tolerant_mode is not supported in Ray client mode.")
 
     with _spark_context_lock:
         global _global_spark_context
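A sketch of what this change means for callers, assuming only what the diff shows: `fault_tolerant_mode` now defaults to `True`, and `init_spark` raises when Ray client mode is detected.

```python
import ray
import raydp

# With the new default, a plain call is equivalent to fault_tolerant_mode=True,
# so the recoverable Spark->Ray conversion path is used without extra flags.
ray.init(address="auto")
spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")

# In Ray client mode (e.g. ray.init("ray://<head>:10001")) the same call is
# expected to raise, per the new check:
#   Exception: fault_tolerant_mode is not supported in Ray client mode.
# Opting out explicitly keeps client-mode workflows running:
# spark = raydp.init_spark(..., fault_tolerant_mode=False)
```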
