Update packaging and documentation #8

Merged
merged 3 commits into from
Oct 2, 2024

Conversation

austin362667
Contributor

Rationale for this change

Update the packaging and documentation to reflect the new project name and purpose (for example, it is no longer a research project). Also do some housekeeping, such as renaming raysql to datafusion_ray.

Test

I can run TPC-H (SF=1) on a single machine (Mac M2 with 8 physical cores) with ray_shuffle disabled.

Below are the full logs for query 1:

austin@Machine ~/D/datafusion-ray (update_docs) [1]> python ./datafusion_ray/main.py                               (python_312)
2024-10-02 18:28:34,528	INFO worker.py:1786 -- Started a local Ray instance.
Using 8 workers
Planning select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
        l_shipdate <= date '1998-09-02'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;
Query stage #0:
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 8))
  AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
    ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_1, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
      CoalesceBatchesExec: target_batch_size=16384
        FilterExec: l_shipdate@6 <= 1998-09-02
          ParquetExec: file_groups={8 groups: [[Users/austin/Documents/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:0..20168276], [Users/austin/Documents/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:20168276..20309983, Users/austin/Documents/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:0..20026569], [Users/austin/Documents/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:20026569..20192373, Users/austin/Documents/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:0..20002472], [Users/austin/Documents/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:20002472..20137926, Users/austin/Documents/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-3.parquet:0..20032822], [Users/austin/Documents/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-3.parquet:20032822..20144160, Users/austin/Documents/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-4.parquet:0..20056938], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], predicate=l_shipdate@10 <= 1998-09-02, pruning_predicate=CASE WHEN l_shipdate_null_count@1 = l_shipdate_row_count@2 THEN false ELSE l_shipdate_min@0 <= 1998-09-02 END, required_guarantees=[]

Query stage #1:
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 8))
  SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true]
    ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(*)@9 as count_order]
      AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
        CoalesceBatchesExec: target_batch_size=16384
          ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 8))

Query stage #2:
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
  ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 8))

(execute_query_stage pid=30805) Forcing reduce stage concurrency from 8 to 1
(execute_query_stage pid=30805) Scheduling query stage #2 with 1 input partitions and 1 output partitions




(execute_query_partition pid=30807) {"cat": "0-4", "name": "0-4", "pid": "127.0.0.1", "tid": 30807, "ts": 1727864915721560, "dur": 7611107, "ph": "X"},
(execute_query_stage pid=30806) Scheduling query stage #0 with 8 input partitions and 8 output partitions [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)
query,1,8.801308291993337
(execute_query_partition pid=30802) {"cat": "1-4", "name": "1-4", "pid": "127.0.0.1", "tid": 30802, "ts": 1727864923930683, "dur": 6819, "ph": "X"},:task_name:execute_query_partition
(execute_query_partition pid=30812) {"cat": "1-1", "name": "1-1", "pid": "127.0.0.1", "tid": 30812, "ts": 1727864923928007, "dur": 7047, "ph": "X"}, [repeated 15x across cluster]
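
For anyone who wants to compare per-task timings from a run like the one above, here is a minimal sketch that pulls the JSON records out of the Ray worker output. It assumes the records follow the Chrome trace event format, where "ph": "X" marks a complete event and "ts" and "dur" are in microseconds, and that "name" encodes stage and partition (e.g. "0-4"); the latter is an inference from the logs, not something confirmed by the project. The helper names `parse_trace_events` and `duration_seconds` are hypothetical and not part of datafusion_ray.

```python
import json
import re

# Matches the first flat {...} JSON object on a log line.
# The records in the logs above contain no nested braces.
EVENT_RE = re.compile(r"\{.*?\}")


def parse_trace_events(lines):
    """Return the complete ("ph": "X") trace events found in the log lines."""
    events = []
    for line in lines:
        match = EVENT_RE.search(line)
        if match is None:
            continue  # e.g. "query,1,8.80..." or scheduler messages
        try:
            event = json.loads(match.group(0))
        except json.JSONDecodeError:
            continue  # braces that are not a JSON record
        if event.get("ph") == "X":
            events.append(event)
    return events


def duration_seconds(event):
    """Convert the event duration from microseconds (assumed) to seconds."""
    return event["dur"] / 1_000_000


# Two records copied from the logs above, plus one non-JSON line.
sample = [
    '(execute_query_partition pid=30807) {"cat": "0-4", "name": "0-4", '
    '"pid": "127.0.0.1", "tid": 30807, "ts": 1727864915721560, '
    '"dur": 7611107, "ph": "X"},',
    "query,1,8.801308291993337",
    '(execute_query_partition pid=30802) {"cat": "1-4", "name": "1-4", '
    '"pid": "127.0.0.1", "tid": 30802, "ts": 1727864923930683, '
    '"dur": 6819, "ph": "X"},:task_name:execute_query_partition',
]

events = parse_trace_events(sample)
for event in events:
    print(event["name"], f"{duration_seconds(event):.6f} s")
```

Under those assumptions, the "0-4" task accounts for roughly 7.6 s of the 8.8 s reported for the query, while the "1-4" task takes only a few milliseconds.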

Signed-off-by: Austin Liu <[email protected]>
Signed-off-by: Austin Liu <[email protected]>

Format

Signed-off-by: Austin Liu <[email protected]>
Signed-off-by: Austin Liu <[email protected]>
Member

@andygrove andygrove left a comment


Thanks @austin362667

@andygrove andygrove merged commit fe86387 into apache:main Oct 2, 2024
1 check passed