Commit 4fbfd7c

implement single owner

1 parent d81633a commit 4fbfd7c
File tree

7 files changed: +104 additions, -296 deletions

7 files changed

+104
-296
lines changed

core/raydp-main/src/main/java/org/apache/spark/raydp/RayExecutorUtils.java

Lines changed: 0 additions & 28 deletions
@@ -25,7 +25,6 @@
 import io.ray.runtime.object.ObjectRefImpl;
 import java.util.List;
 import java.util.Map;
-import org.apache.spark.executor.PutRDDPartitionToBlockStoreArgs;
 import org.apache.spark.executor.RayDPExecutor;
 
 public class RayExecutorUtils {
@@ -87,33 +86,6 @@ public static ObjectRef<byte[]> getRDDPartition(
         .remote();
   }
 
-  public static ObjectRef<Boolean> putRDDPartitionToBlockStoreViaRegistry(
-      ActorHandle<RayDPExecutor> handle,
-      int rddId,
-      int partitionId,
-      String schema,
-      String driverAgentUrl,
-      String registryActorName,
-      String blockStoreActorName,
-      String batchKey,
-      double numCpus,
-      double memory,
-      double nodeAffinity) {
-    PutRDDPartitionToBlockStoreArgs args =
-        new PutRDDPartitionToBlockStoreArgs(
-            rddId,
-            partitionId,
-            schema,
-            driverAgentUrl,
-            registryActorName,
-            blockStoreActorName,
-            batchKey,
-            numCpus,
-            memory,
-            nodeAffinity);
-    return handle.task(RayDPExecutor::putRDDPartitionToBlockStoreViaRegistry, args).remote();
-  }
-
   public static void exitExecutor(ActorHandle<RayDPExecutor> handle) {
     handle.task(RayDPExecutor::stop).remote();
   }

core/raydp-main/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java

Lines changed: 0 additions & 13 deletions
@@ -10,19 +10,6 @@ public class SparkOnRayConfigs {
   public static final String SPARK_MASTER_ACTOR_RESOURCE_PREFIX =
       "spark.ray.raydp_spark_master.actor.resource";
 
-  public static final String BLOCKSTORE_ACTOR_RESOURCE_CPU =
-      "spark.ray.raydp_blockstore.actor.resource.CPU";
-  public static final String BLOCKSTORE_ACTOR_RESOURCE_MEMORY =
-      "spark.ray.raydp_blockstore.actor.resource.memory";
-  /**
-   * Node affinity resource fraction used to pin per-executor BlockStore actors to the executor
-   * node via the special "node:<ip>" resource. Ray provides 1.0 of this resource per node.
-   *
-   * Defaults to 0.001 (allowing up to ~1000 such actors per node).
-   */
-  public static final String BLOCKSTORE_ACTOR_NODE_AFFINITY_RESOURCE =
-      "spark.ray.raydp_blockstore.actor.resource.node_affinity";
-
   /**
    * Extra JVM options for the RayDP AppMaster actor and gateway process.
    * This is useful for passing JDK 17+ --add-opens flags.

core/raydp-main/src/main/scala/org/apache/spark/executor/RayDPExecutor.scala

Lines changed: 27 additions & 59 deletions
@@ -20,16 +20,13 @@ package org.apache.spark.executor
 import java.io.{ByteArrayOutputStream, File}
 import java.nio.channels.Channels
 import java.nio.file.Paths
-import java.util.Optional
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.reflect.classTag
 
 import com.intel.raydp.shims.SparkShimLoader
-import io.ray.api.PyActorHandle
 import io.ray.api.Ray
-import io.ray.api.call.PyActorTaskCaller
-import io.ray.api.function.PyActorMethod
 import io.ray.runtime.config.RayConfig
 import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel}
 import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
@@ -272,6 +269,22 @@
     Ray.exitActor
   }
 
+  /**
+   * Pop (remove and return) a previously stored Arrow IPC stream by key.
+   *
+   * This method is intended to be called from a Python "owner/registry" actor via Ray
+   * cross-language actor calls. Since the Python actor is the caller, Ray will assign
+   * ownership of the returned object to that Python actor.
+   */
+  def popArrowIPC(batchKey: String): Array[Byte] = {
+    val bytes = RayDPExecutor.popArrowIPC(batchKey)
+    if (bytes == null) {
+      throw new RayDPException(
+        s"Missing Arrow IPC bytes for batchKey=$batchKey on executorId=$executorId.")
+    }
+    bytes
+  }
+
   def getBlockLocations(rddId: Int, numPartitions: Int): Array[String] = {
     val env = SparkEnv.get
     val blockIds = (0 until numPartitions).map(i =>
@@ -353,63 +366,18 @@
     byteOut.close
     result
   }
+}
 
-  /**
-   * For recoverable Spark->Ray Dataset conversion:
-   * read cached Arrow IPC bytes from Spark block manager, then push them into a Python BlockStore
-   * actor created via a Python registry actor.
-   */
-  def putRDDPartitionToBlockStoreViaRegistry(
-      args: PutRDDPartitionToBlockStoreArgs): Boolean = {
-    val bytes = getRDDPartition(args.rddId, args.partitionId, args.schemaStr, args.driverAgentUrl)
+object RayDPExecutor {
+  // Per-executor in-memory buffer for Arrow IPC streams produced by Spark tasks.
+  // Stored in the executor (Ray actor) process; entries are removed by popArrowIPC.
+  private val arrowIpcByKey = new ConcurrentHashMap[String, Array[Byte]]()
 
-    val registryOpt = Ray.getActor(args.registryActorName).asInstanceOf[Optional[AnyRef]]
-    if (!registryOpt.isPresent) {
-      throw new RayDPException(s"Registry actor ${args.registryActorName} not found.")
-    }
-    val regAny: AnyRef = registryOpt.get()
-    if (!regAny.isInstanceOf[PyActorHandle]) {
-      throw new RayDPException(s"Registry actor ${args.registryActorName} is not a Python actor.")
-    }
-    val registryHandle = regAny.asInstanceOf[PyActorHandle]
-
-    val getActorMethod =
-      PyActorMethod.of("get_or_create_blockstore_actor", classOf[java.lang.Boolean])
-    val createArgs: Array[AnyRef] = Array(
-      args.blockStoreActorName,
-      nodeIp,
-      Double.box(args.numCpus),
-      Double.box(args.memory),
-      Double.box(args.nodeAffinity)
-    )
-    new PyActorTaskCaller(registryHandle, getActorMethod, createArgs).remote().get()
-
-    val bsOpt = Ray.getActor(args.blockStoreActorName).asInstanceOf[Optional[AnyRef]]
-    if (!bsOpt.isPresent) {
-      throw new RayDPException(s"BlockStore actor ${args.blockStoreActorName} not found.")
-    }
-    val bsAny: AnyRef = bsOpt.get()
-    if (!bsAny.isInstanceOf[PyActorHandle]) {
-      throw new RayDPException(
-        s"BlockStore actor ${args.blockStoreActorName} is not a Python actor.")
-    }
-    val bsHandle = bsAny.asInstanceOf[PyActorHandle]
+  def putArrowIPC(batchKey: String, bytes: Array[Byte]): Unit = {
+    arrowIpcByKey.put(batchKey, bytes)
+  }
 
-    val putMethod = PyActorMethod.of("put_arrow_ipc", classOf[java.lang.Boolean])
-    val putArgs: Array[AnyRef] = Array(args.batchKey, bytes.asInstanceOf[AnyRef])
-    new PyActorTaskCaller(bsHandle, putMethod, putArgs).remote().get()
-    true
+  def popArrowIPC(batchKey: String): Array[Byte] = {
+    arrowIpcByKey.remove(batchKey)
   }
 }
-
-case class PutRDDPartitionToBlockStoreArgs(
-    rddId: Int,
-    partitionId: Int,
-    schemaStr: String,
-    driverAgentUrl: String,
-    registryActorName: String,
-    blockStoreActorName: String,
-    batchKey: String,
-    numCpus: Double,
-    memory: Double,
-    nodeAffinity: Double) extends Serializable
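
The Python counterpart that consumes popArrowIPC is not included in this diff. Below is a minimal sketch of how an owner-side Python actor could pop and decode one buffered batch, assuming the RayDPExecutor actor is registered under a name of the form raydp-executor-<executorId> (as constructed in ObjectStoreWriter below) and can be reached via ray.get_actor and Ray's cross-language actor calls; the concrete actor name and batch key here are illustrative, not the commit's actual code.

import pyarrow as pa
import ray

# Illustrative only: pop one buffered Arrow IPC stream from the Java executor actor
# and decode it into a pyarrow.Table. Because this runs inside the Python owner actor,
# the ObjectRefs created here are owned by that actor.
executor = ray.get_actor("raydp-executor-1")                   # hypothetical executor actor name
ipc_bytes = ray.get(executor.popArrowIPC.remote("some-batch-key"))  # cross-language actor call
table = pa.ipc.open_stream(ipc_bytes).read_all()               # Arrow IPC stream -> pyarrow.Table
block_ref = ray.put(table)                                     # ref owned by the calling (owner) actor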

core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala

Lines changed: 19 additions & 82 deletions
@@ -19,12 +19,9 @@ package org.apache.spark.sql.raydp
 
 import com.intel.raydp.shims.SparkShimLoader
 import io.ray.api.{ActorHandle, ObjectRef, Ray}
-import io.ray.api.PyActorHandle
-import io.ray.api.function.PyActorMethod
 import io.ray.runtime.AbstractRayRuntime
-import io.ray.runtime.config.RayConfig
 import java.io.ByteArrayOutputStream
-import java.util.{List, Optional, UUID}
+import java.util.{List, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
 import java.util.function.{Function => JFunction}
 import org.apache.arrow.vector.VectorSchemaRoot
@@ -39,7 +36,6 @@ import org.apache.spark.deploy.raydp._
 import org.apache.spark.executor.RayDPExecutor
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.raydp.{RayDPUtils, RayExecutorUtils}
-import org.apache.spark.raydp.SparkOnRayConfigs
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.arrow.ArrowWriter
 import org.apache.spark.sql.execution.python.BatchIterator
@@ -66,76 +62,33 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
   def writeToRay(
       data: Array[Byte],
       numRecords: Int,
-      queue: ObjectRefHolder.Queue,
       ownerName: String): RecordBatch = {
 
-    // Owner-transfer only implementation:
-    // - ownerName must always be provided (non-empty) and refer to a Python actor
-    //   implemented RayDPBlockStoreActorRegistry.
-    // - JVM never creates/handles Ray ObjectRefs for the dataset blocks.
-    // - JVM returns only a per-batch key encoded in RecordBatch.objectId (bytes),
-    //   and Python will fetch the real ObjectRefs from the owner actor by key.
-
+    // Single-owner implementation:
+    // - Spark executor JVM actor produces Arrow IPC bytes (data).
+    // - Bytes are buffered inside the executor actor process keyed by batchKey.
+    // - JVM returns (executorActorName, batchKey) to Python.
+    // - A Python single owner actor later calls executorActor.popArrowIPC(batchKey),
+    //   decodes into pyarrow.Table and becomes the Ray owner of the resulting Dataset blocks.
+    //
+    // We keep ownerName non-empty for API consistency and to avoid accidental usage without
+    // a dedicated owner actor on the Python side.
     if (ownerName == null || ownerName.isEmpty) {
       throw new RayDPException("ownerName must be set for Spark->Ray conversion.")
     }
 
-    val registryActorOptional = Ray.getActor(ownerName).asInstanceOf[Optional[AnyRef]]
-    if (!registryActorOptional.isPresent) {
-      throw new RayDPException(s"Blobstore registry actor $ownerName not found.")
-    }
-    val registryActorHandle: AnyRef = registryActorOptional.get()
-    if (!registryActorHandle.isInstanceOf[PyActorHandle]) {
-      throw new RayDPException(
-        s"Blobstore registry actor $ownerName is not a Python actor.")
-    }
-
-    val appName = SparkEnv.get.conf.get("spark.app.name", "raydp")
-    val blockStoreActorName =
-      ObjectStoreWriter.getBlockStoreActorName(appName, SparkEnv.get.executorId)
-    val pyHandle = registryActorHandle.asInstanceOf[PyActorHandle]
-    val getActorMethod = PyActorMethod.of(
-      "get_or_create_blockstore_actor", classOf[java.lang.Boolean])
-
-    // Get config inside to retain backward compatibility since this is a public API.
-    val nodeIp = RayConfig.create().nodeIp
-    val cpuOpt =
-      SparkEnv.get.conf.getOption(SparkOnRayConfigs.BLOCKSTORE_ACTOR_RESOURCE_CPU)
-    val memOpt =
-      SparkEnv.get.conf.getOption(SparkOnRayConfigs.BLOCKSTORE_ACTOR_RESOURCE_MEMORY)
-    val nodeAffinityOpt =
-      SparkEnv.get.conf.getOption(SparkOnRayConfigs.BLOCKSTORE_ACTOR_NODE_AFFINITY_RESOURCE)
-    val numCpus = cpuOpt.map(_.toDouble).getOrElse(0.0)
-    val memory = memOpt.map(ObjectStoreWriter.parseMemoryBytes).getOrElse(0.0)
-    val nodeAffinity = nodeAffinityOpt.map(_.toDouble).getOrElse(0.001)
-
-    pyHandle
-      .task(
-        getActorMethod,
-        blockStoreActorName,
-        nodeIp,
-        Double.box(numCpus),
-        Double.box(memory),
-        Double.box(nodeAffinity))
-      .remote()
-      .get()
-    val blockStorageActorHandleOpt =
-      Ray.getActor(blockStoreActorName).asInstanceOf[Optional[PyActorHandle]]
-    if (!blockStorageActorHandleOpt.isPresent) {
-      throw new RayDPException(s"Actor $blockStoreActorName not found when putting dataset block.")
-    }
-    val blockStorageActorHandle = blockStorageActorHandleOpt.get()
-
+    val executorId = SparkEnv.get.executorId
+    val executorActorName = s"raydp-executor-${executorId}"
     val batchKey = UUID.randomUUID().toString
 
-    // put_arrow_ipc(batchKey, arrowBytes) -> boolean ack
-    val putArrowIPCMethod = PyActorMethod.of("put_arrow_ipc", classOf[java.lang.Boolean])
-    blockStorageActorHandle.task(putArrowIPCMethod, batchKey, data).remote().get()
+    // Buffer bytes in the executor actor process. The Python owner actor will pop them via
+    // cross-language actor call later.
+    RayDPExecutor.putArrowIPC(batchKey, data)
 
-    // RecordBatch payload is an application-level locator (not Ray object metadata):
-    // - ownerAddress encodes the BlockStore actor name (UTF-8)
+    // RecordBatch payload:
+    // - ownerAddress encodes the RayDPExecutor actor name (UTF-8)
     // - objectId encodes the batch key (UTF-8)
-    RecordBatch(blockStoreActorName.getBytes("UTF-8"), batchKey.getBytes("UTF-8"), numRecords)
+    RecordBatch(executorActorName.getBytes("UTF-8"), batchKey.getBytes("UTF-8"), numRecords)
   }
 
   /**
@@ -151,8 +104,6 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
     val schema = df.schema
 
     val objectIds = df.queryExecution.toRdd.mapPartitions{ iter =>
-      val queue = ObjectRefHolder.getQueue(uuid)
-
       // DO NOT use iter.grouped(). See BatchIterator.
       val batchIter = if (batchSize > 0) {
         new BatchIterator(iter, batchSize)
@@ -196,7 +147,7 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
 
         // get the wrote ByteArray and save to Ray ObjectStore
         val byteArray = byteOut.toByteArray
-        results += writeToRay(byteArray, numRecords, queue, ownerName)
+        results += writeToRay(byteArray, numRecords, ownerName)
         // end writes footer to the output stream and doesn't clean any resources.
         // It could throw exception if the output stream is closed, so it should be
        // in the try block.
@@ -267,20 +218,6 @@ object ObjectStoreWriter {
     }
   }
 
-  private def sanitizeActorName(name: String): String = {
-    if (name == null || name.isEmpty) {
-      "raydp"
-    } else {
-      // Ray named actor names should be reasonably simple; normalize to [A-Za-z0-9_].
-      name.replaceAll("[^A-Za-z0-9_]", "_")
-    }
-  }
-
-  private[spark] def getBlockStoreActorName(appName: String, executorId: String): String = {
-    val safeAppName = sanitizeActorName(appName)
-    s"${safeAppName}_BLOCKSTORE_${executorId}"
-  }
-
   def getAddress(): Array[Byte] = {
     if (address == null) {
       val objectRef = Ray.put(1)

python/raydp/spark/dataset.py

Lines changed: 8 additions & 14 deletions
@@ -150,25 +150,19 @@ def _save_spark_df_to_object_store(df: sql.DataFrame, use_batch: bool = True,
     records = object_store_writer.save(use_batch, actor_owner_name)
 
     # JVM returns List[RecordBatch] where:
-    # - record.ownerAddress() is UTF-8 bytes of the BlockStore actor name
+    # - record.ownerAddress() is UTF-8 bytes of the Spark executor (RayDPExecutor) actor name
    # - record.objectId() is UTF-8 bytes of the batch_key
     actor_names = [bytes(record.ownerAddress()).decode("utf-8") for record in records]
     batch_keys = [bytes(record.objectId()).decode("utf-8") for record in records]
     block_sizes = [record.numRecords() for record in records]
 
-    # Group by BlockStore actor, fetch refs, then restore original order.
-    keys_by_actor = {}
-    for actor_name, key in zip(actor_names, batch_keys):
-        keys_by_actor.setdefault(actor_name, []).append(key)
-
-    refs_by_key = {}
-    for actor_name, keys in keys_by_actor.items():
-        blockstore_actor = ray.get_actor(actor_name)
-        refs = ray.get(blockstore_actor.get_block_refs.remote(keys))
-        for k, ref in zip(keys, refs):
-            refs_by_key[k] = ref
-
-    blocks = [refs_by_key[k] for k in batch_keys]
+    # Materialize blocks via the owner actor so the owner actor becomes the Ray owner
+    # of the returned Dataset blocks (ObjectRefs), while the blocks are still produced
+    # on (and typically stored on) executor nodes for locality.
+    owner_actor = ray.get_actor(actor_owner_name)
+    blocks = ray.get(owner_actor.fetch_block_refs.remote(actor_names, batch_keys))
+    # Keep refs in owner actor state to prevent owner-side GC from releasing ownership.
+    owner.set_reference_as_state(owner_actor, blocks)
     return blocks, block_sizes
 
 def spark_dataframe_to_ray_dataset(df: sql.DataFrame,
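
The owner-side helpers referenced above (fetch_block_refs and owner.set_reference_as_state) live outside this diff. Below is a rough sketch of what such a single-owner actor could look like, under the same cross-language assumptions as the earlier example; the class and method names are illustrative rather than the commit's actual implementation.

import pyarrow as pa
import ray


@ray.remote
class SingleOwnerActor:
    """Illustrative single-owner actor sketch (not from this commit)."""

    def __init__(self):
        # Holding refs as actor state keeps this actor, the owner, from letting
        # the blocks go out of scope.
        self._held_refs = []

    def fetch_block_refs(self, actor_names, batch_keys):
        blocks = []
        for actor_name, batch_key in zip(actor_names, batch_keys):
            # Cross-language call into the Java RayDPExecutor actor that buffered the bytes.
            executor = ray.get_actor(actor_name)
            ipc_bytes = ray.get(executor.popArrowIPC.remote(batch_key))
            table = pa.ipc.open_stream(ipc_bytes).read_all()
            # ray.put from inside this actor makes this actor the owner of the block.
            blocks.append(ray.put(table))
        return blocks

    def set_reference(self, refs):
        self._held_refs.extend(refs)

In the dataset.py flow above, the driver would call fetch_block_refs once for all (actor_name, batch_key) pairs and then park the returned refs in the owner actor's state so they outlive the Spark job.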
