@@ -23,8 +23,8 @@
 import pyarrow as pa
 import ray
 
-import raysql
-from raysql import Context, ExecutionGraph, QueryStage
+import datafusion_ray
+from datafusion_ray import Context, ExecutionGraph, QueryStage
 from typing import List
 
 def schedule_execution(
@@ -73,7 +73,7 @@ def _get_worker_inputs(
         return ids, futures
 
     # schedule the actual execution workers
-    plan_bytes = raysql.serialize_execution_plan(stage.get_execution_plan())
+    plan_bytes = datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
     futures = []
     opt = {}
     opt["resources"] = {"worker": 1e-3}
@@ -153,7 +153,7 @@ def _get_worker_inputs(
     ray.get([f for _, lst in child_outputs for f in lst])
 
     # schedule the actual execution workers
-    plan_bytes = raysql.serialize_execution_plan(stage.get_execution_plan())
+    plan_bytes = datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
     futures = []
     opt = {}
     opt["resources"] = {"worker": 1e-3}
@@ -179,7 +179,7 @@ def execute_query_partition(
     *input_partitions: list[pa.RecordBatch],
 ) -> Iterable[pa.RecordBatch]:
     start_time = time.time()
-    plan = raysql.deserialize_execution_plan(plan_bytes)
+    plan = datafusion_ray.deserialize_execution_plan(plan_bytes)
     # print(
     #     "Worker executing plan {} partition #{} with shuffle inputs {}".format(
     #         plan.display(),
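The worker side above rebuilds the plan from the bytes it was handed, so `serialize_execution_plan` and `deserialize_execution_plan` must round-trip. A small sketch of that invariant, using only names that appear in this file; comparing `display()` output as an equality proxy is an assumption, not an API guarantee.

import datafusion_ray
from datafusion_ray import QueryStage

def plan_roundtrips(stage: QueryStage) -> bool:
    """Serialize a stage's plan and read it back, as a worker task would."""
    plan = stage.get_execution_plan()
    plan_bytes = datafusion_ray.serialize_execution_plan(plan)
    restored = datafusion_ray.deserialize_execution_plan(plan_bytes)
    # display() renders the physical plan as text; used here only as a
    # cheap proxy for structural equality of the two plans.
    return restored.display() == plan.display()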
@@ -193,7 +193,7 @@ def execute_query_partition(
     # This is delegating to DataFusion for execution, but this would be a good place
     # to plug in other execution engines by translating the plan into another engine's plan
     # (perhaps via Substrait, once DataFusion supports converting a physical plan to Substrait)
-    ret = raysql.execute_partition(plan, part, partitions)
+    ret = datafusion_ray.execute_partition(plan, part, partitions)
     duration = time.time() - start_time
     event = {
         "cat": f"{stage_id}-{part}",
@@ -238,7 +238,7 @@ def sql(self, sql: str) -> pa.RecordBatch:
         else:
             # serialize the query stages and store in Ray object store
             query_stages = [
-                raysql.serialize_execution_plan(
+                datafusion_ray.serialize_execution_plan(
                     graph.get_query_stage(i).get_execution_plan()
                 )
                 for i in range(final_stage_id + 1)
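The serialized stages built above end up in the Ray object store. A rough sketch of why that helps, with illustrative byte strings standing in for the real plans: `ray.put` stores one shared copy per node, and tasks receive the resolved bytes when handed an `ObjectRef`, instead of re-pickling the plan into every task submission.

import ray

ray.init(ignore_reinit_error=True)

# Stand-ins for the serialized stage plans built in sql() above.
query_stages = [b"stage-0-plan", b"stage-1-plan"]

# One ray.put per stage: the bytes live once in each node's object store.
stage_refs = [ray.put(b) for b in query_stages]

@ray.remote
def consume(plan_bytes: bytes) -> int:
    # Ray resolves ObjectRef arguments to their values before the task runs.
    return len(plan_bytes)

print(ray.get([consume.remote(ref) for ref in stage_refs]))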