@@ -19,6 +19,9 @@ package org.apache.spark.sql.raydp
 
 import com.intel.raydp.shims.SparkShimLoader
 import io.ray.api.{ActorHandle, ObjectRef, Ray}
+import io.ray.api.PyActorHandle
+import io.ray.api.call.PyActorTaskCaller
+import io.ray.api.function.PyActorMethod
 import io.ray.runtime.AbstractRayRuntime
 import java.io.ByteArrayOutputStream
 import java.util.{List, Optional, UUID}
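These three imports are the JVM half of Ray's cross-language API: PyActorHandle references a named Python actor, PyActorMethod.of binds a Python method name to the Java return type the caller expects, and PyActorTaskCaller is the intermediate caller that task(...) yields before remote() is invoked. A minimal sketch of the pattern, separate from this patch (the actor name "py_owner" and its echo method are hypothetical):

    import io.ray.api.{ObjectRef, PyActorHandle, Ray}
    import io.ray.api.function.PyActorMethod

    // Look up a named Python actor and call one of its methods from Scala.
    // classOf[AnyRef] tells Ray to surface the Python return value as a
    // generic Object on the JVM side.
    val handle = Ray.getActor("py_owner").get().asInstanceOf[PyActorHandle]
    val ref: ObjectRef[AnyRef] =
      handle.task(PyActorMethod.of("echo", classOf[AnyRef]), "ping").remote()
    val echoed = Ray.get(ref) // blocks until the Python actor replies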
@@ -65,12 +68,33 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
       ownerName: String): RecordBatch = {
 
     // NOTE: We intentionally do NOT pass an owner argument to Ray.put anymore.
-    // The default JVM path puts the serialized Arrow batch into Ray's object store
-    // from the Spark executor JVM process.
     //
-    // Ownership transfer to a long-lived Python actor is implemented on the Python side
-    // by "adopting" (re-putting) these ObjectRefs inside the target actor.
-    val objectRef: ObjectRef[Array[Byte]] = Ray.put(data)
+    // - When ownerName is empty, keep the default path: put the serialized
+    //   Arrow batch into Ray's object store directly from the executor JVM.
+    // - When ownerName names a Python actor (e.g. RayDPSparkMaster), invoke
+    //   that actor's put_data(data) method via Ray's cross-language calls so
+    //   that the Python actor becomes the owner of the created object.
+    val objectRef: ObjectRef[_] =
+      if (ownerName == "") {
+        Ray.put(data)
+      } else {
+        // Ray.getActor(String) returns a raw java.util.Optional in Ray's Java
+        // API. Without an explicit reference type here, Scala may infer
+        // Optional[Nothing] and insert an invalid cast at runtime.
+        val opt = Ray.getActor(ownerName).asInstanceOf[Optional[AnyRef]]
+        if (!opt.isPresent) {
+          throw new RayDPException(s"Actor $ownerName not found when putting dataset block.")
+        }
+        val handleAny: AnyRef = opt.get()
+        if (!handleAny.isInstanceOf[PyActorHandle]) {
+          throw new RayDPException(
+            s"Actor $ownerName is not a Python actor; cannot invoke put_data.")
+        }
+        val pyHandle = handleAny.asInstanceOf[PyActorHandle]
+        val method = PyActorMethod.of("put_data", classOf[AnyRef])
+        // The resulting ref resolves to put_data's return value ("ref of ref").
+        pyHandle.task(method, data).remote()
+      }
 
     // add the objectRef to the objectRefHolder to avoid reference GC
     queue.add(objectRef)
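The explicit asInstanceOf[Optional[AnyRef]] is the point of the inference comment in the hunk above, and the failure mode it guards against is worth a standalone illustration. A sketch, per that comment (illustrative only, not part of the patch):

    // With a raw java.util.Optional coming back from Java, Scala has no
    // element type to work with and may infer Optional[Nothing]:
    //   val inferred = Ray.getActor(ownerName)   // Optional[Nothing]
    //   val handle = inferred.get()              // ClassCastException at runtime
    // Pinning the element type before get() keeps the generated cast valid:
    val pinned = Ray.getActor(ownerName).asInstanceOf[Optional[AnyRef]]
    val handle: AnyRef = pinned.get()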
@@ -171,7 +195,7 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
   /**
    * For test.
    */
-  def getRandomRef(): List[Array[Byte]] = {
+  def getRandomRef(): List[_] = {
 
     df.queryExecution.toRdd.mapPartitions { _ =>
       Iterator(ObjectRefHolder.getRandom(uuid))
@@ -270,7 +294,7 @@ object ObjectStoreWriter
 }
 
 object ObjectRefHolder {
-  type Queue = ConcurrentLinkedQueue[ObjectRef[Array[Byte]]]
+  type Queue = ConcurrentLinkedQueue[ObjectRef[_]]
   private val dfToQueue = new ConcurrentHashMap[UUID, Queue]()
 
   def getQueue(df: UUID): Queue = {
@@ -295,7 +319,7 @@ object ObjectRefHolder
     queue.size()
   }
 
-  def getRandom(df: UUID): Array[Byte] = {
+  def getRandom(df: UUID): Any = {
     val queue = checkQueueExists(df)
     val ref = RayDPUtils.convert(queue.peek())
     ref.get()
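Because the queue may now hold either a JVM-put ObjectRef[Array[Byte]] or the cross-language ref returned by put_data, dereferencing in getRandom can produce different runtime types, hence the widening to Any. A test-side sketch of consuming both shapes (the payload of the second case depends on what put_data returns on the Python side, which is assumed here):

    ObjectRefHolder.getRandom(uuid) match {
      case bytes: Array[Byte] =>
        // JVM path: the serialized Arrow batch that Ray.put stored directly
        println(s"batch of ${bytes.length} bytes")
      case other =>
        // Python-owner path: whatever the actor's put_data returned
        println(s"cross-language value: $other")
    }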