5 changes: 5 additions & 0 deletions core/core_c.h
@@ -16,9 +16,14 @@
 #ifndef CORE_C_H
 #define CORE_C_H
 
+// Guard these includes behind a special define so that the Python
+// CFFI parser does not get confused once these headers are expanded
+// by the preprocessor.
+#ifndef TASK_BENCH_PYTHON_CFFI
 #include <stdbool.h>
 #include <stddef.h>
 #include <stdint.h>
+#endif
 
 #ifdef __cplusplus
 extern "C" {
5 changes: 3 additions & 2 deletions dask/task_bench_core.py
@@ -33,12 +33,13 @@
 root_dir = os.path.dirname(os.path.dirname(__file__))
 core_header = subprocess.check_output(
     [
-        "gcc", "-D", "__attribute__(x)=", "-E", "-P",
+        "gcc", "-D", "__attribute__(x)=",
+        "-D", "TASK_BENCH_PYTHON_CFFI",
+        "-E", "-P",
         os.path.join(root_dir, "core/core_c.h")
     ]).decode("utf-8")
 ffi = cffi.FFI()
 ffi.cdef(core_header)
 c = ffi.dlopen("libcore.so")
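For context on why the extra define is needed: cffi's cdef() parses plain C declarations only (it has no preprocessor and rejects GCC extensions), so the header must arrive fully expanded via -E -P with attributes defined away, and TASK_BENCH_PYTHON_CFFI lets core_c.h drop the standard-library includes whose expanded contents cdef cannot digest. A minimal illustration of the constraint:

    import cffi

    ffi = cffi.FFI()
    ffi.cdef("void f(int n);")        # fine: a plain C declaration
    # ffi.cdef("#include <stdint.h>") # would raise: cdef has no preprocessor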
Contributor:
I don't think you're trying to modify the Dask implementation, and this seems to work in our CI. Do you need this change?



def init_client():
45 changes: 45 additions & 0 deletions experiments/sapling_metg_compute/metg_ray.sh
@@ -0,0 +1,45 @@
#!/bin/bash

cores=40
root_dir="$(dirname "$(dirname "$PWD")")"

export LD_LIBRARY_PATH="$root_dir"/core:"$LD_LIBRARY_PATH"
export PYTHONPATH="$root_dir"/ray:"$PYTHONPATH"
SCHEDULER_PORT=1234

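# Run the Ray task bench driver for one configuration; the first
# argument (the node count) is dropped, and the rest are forwarded as
# task bench flags.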
function launch {
    python3 "$root_dir"/ray/task_bench.py "${@:2}" -skip-graph-validation
}

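# repeat NAME N ARGS...: fill the array nameref NAME with N copies of
# ARGS joined by "-and", so a single driver invocation executes N task
# graphs side by side.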
function repeat {
    local -n result=$1
    local n=$2
    result=()
    for i in $(seq 1 $n); do
        result+=("${@:3}")
        if (( i < n )); then
            result+=("-and")
        fi
    done
}

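# Sweep task granularity: each step s halves the per-task iteration
# count (2^(30-s)), and more replicates run at finer granularities
# (rep <= s), where run-to-run noise matters most.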
function sweep {
    for s in 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18; do
        for rep in 0 1 2 3 4; do
            if [[ $rep -le $s ]]; then
                ray start --head --port $SCHEDULER_PORT --num-cpus $cores > /dev/null
                local args
                repeat args $3 -kernel compute_bound -iter $(( 1 << (30-s) )) -type $4 -radix ${RADIX:-5} -steps ${STEPS:-1000} -width $(( $2 * cores ))
                $1 $2 "${args[@]}"
                ray stop > /dev/null
            fi
        done
    done
}

n=1
for g in ${NGRAPHS:-1}; do
    for t in ${PATTERN:-stencil_1d}; do
        sweep launch $n $g $t > ray_ngraphs_${g}_type_${t}_nodes_${n}.log
    done
done
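All of the knobs (RADIX, STEPS, NGRAPHS, PATTERN) are read from the environment with defaults, so sweeping several graph shapes needs no edits to the script. A hypothetical invocation from the script's own directory:

    NGRAPHS="1 4" PATTERN="stencil_1d fft" ./metg_ray.sh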
52 changes: 52 additions & 0 deletions ray/task_bench.py
@@ -0,0 +1,52 @@
import ray
import sys
import task_bench_core as core
import time

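# Walk the task graph one timestep at a time, submitting one Ray task
# per live point and threading each row's object refs into the next
# row's inputs, so Ray's scheduler sees the same DAG the graph encodes.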
def execute_task_graph(graph):
    graph_array = core.encode_task_graph(graph)

    if graph.scratch_bytes_per_task > 0:
        scratch = [
            core.init_scratch_delayed(graph.scratch_bytes_per_task)
            for _ in range(graph.max_width)
        ]
    else:
        scratch = [None for _ in range(graph.max_width)]

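    # Each row keeps one slot per point (None outside the live range) so
    # dependency indices from the C core line up with plain list indices.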
    outputs = []
    last_row = None
    for timestep in range(0, graph.timesteps):
        offset = core.c.task_graph_offset_at_timestep(graph, timestep)
        width = core.c.task_graph_width_at_timestep(graph, timestep)
        row = []
        for point in range(0, offset):
            row.append(None)
        for point in range(offset, offset + width):
            inputs = []
            for dep in core.task_graph_dependencies(graph, timestep, point):
                inputs.append(last_row[dep])
            output, scratch[point] = core.execute_point_delayed(
                graph_array, timestep, point, scratch[point], *inputs)
            row.append(output)
            outputs.append(output)
        for point in range(offset + width, graph.max_width):
            row.append(None)
        assert len(row) == graph.max_width
        last_row = row
    return outputs

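# Task submission is entirely asynchronous; the ray.get on the
# collected output refs forces every graph to completion before the
# elapsed time is reported.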
def execute_task_bench():
    app = core.app_create(sys.argv)
    task_graphs = core.app_task_graphs(app)
    start_time = time.perf_counter()
    results = []
    for task_graph in task_graphs:
        results.extend(execute_task_graph(task_graph))
    ray.get(results)
    total_time = time.perf_counter() - start_time
    core.c.app_report_timing(app, total_time)

if __name__ == "__main__":
    ray.init(address="auto")
    execute_task_bench()
99 changes: 99 additions & 0 deletions ray/task_bench_core.py
@@ -0,0 +1,99 @@
import cffi
import ray
import numpy as np
import os
import subprocess

# Similarly to the FFI loading in dask/task_bench_core.py, we load
# the CFFI handles in their own module to avoid introspection from
# cloudpickle.
root_dir = os.path.dirname(os.path.dirname(__file__))
core_header = subprocess.check_output(
    [
        "gcc", "-D", "__attribute__(x)=",
        "-D", "TASK_BENCH_PYTHON_CFFI",
        "-E", "-P",
        os.path.join(root_dir, "core/core_c.h")
    ]).decode("utf-8")
ffi = cffi.FFI()
ffi.cdef(core_header)
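# dlopen() resolves libcore.so through the regular dynamic loader
# search path, which is why the experiment script prepends
# $root_dir/core to LD_LIBRARY_PATH.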
c = ffi.dlopen("libcore.so")

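# The c_args list keeps every ffi.new("char []") buffer referenced
# (cffi frees owned memory once its Python handle is collected), so the
# argv strings stay alive across the c.app_create call.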
def app_create(args):
    c_args = []
    c_argv = ffi.new("char *[]", len(args) + 1)
    for i, arg in enumerate(args):
        c_args.append(ffi.new("char []", arg.encode('utf-8')))
        c_argv[i] = c_args[-1]
    c_argv[len(args)] = ffi.NULL
    app = c.app_create(len(args), c_argv)
    c.app_display(app)
    return app

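# The task_graph_t struct is shipped to remote tasks as its raw bytes
# in a numpy array (which Ray serializes cheaply) and re-cast on the
# worker side by decode_task_graph below.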
def encode_task_graph(graph):
    return np.frombuffer(
        ffi.buffer(ffi.addressof(graph), ffi.sizeof(graph)), dtype=np.ubyte)

def app_task_graphs(app):
    result = []
    graphs = c.app_task_graphs(app)
    for i in range(c.task_graph_list_num_task_graphs(graphs)):
        result.append(c.task_graph_list_task_graph(graphs, i))
    return result

def decode_task_graph(graph_array):
    return ffi.cast("task_graph_t *", graph_array.ctypes.data)[0]

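# Yields the points in the previous timestep's row that (timestep,
# point) depends on, clipped to the range actually live at timestep - 1.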
def task_graph_dependencies(graph, timestep, point):
    last_offset = c.task_graph_offset_at_timestep(graph, timestep - 1)
    last_width = c.task_graph_width_at_timestep(graph, timestep - 1)

    if timestep == 0:
        last_offset, last_width = 0, 0

    dset = c.task_graph_dependence_set_at_timestep(graph, timestep)
    ilist = c.task_graph_dependencies(graph, dset, point)
    for i in range(0, c.interval_list_num_intervals(ilist)):
        interval = c.interval_list_interval(ilist, i)
        for dep in range(interval.start, interval.end + 1):
            if last_offset <= dep < last_offset + last_width:
                yield dep

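# Marshal the numpy input/output/scratch buffers into raw pointers and
# invoke the C kernel; this runs inside a Ray worker process, so the
# pointers never cross a process boundary.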
def execute_point_impl(graph_array, timestep, point, scratch, *inputs):
    graph = decode_task_graph(graph_array)

    input_ptrs = ffi.new(
        "char *[]", [ffi.cast("char *", i.ctypes.data) for i in inputs])
    input_sizes = ffi.new("size_t []", [i.shape[0] for i in inputs])

    output = np.empty(graph.output_bytes_per_task, dtype=np.ubyte)
    output_ptr = ffi.cast("char *", output.ctypes.data)

    if scratch is not None:
        scratch_ptr = ffi.cast("char *", scratch.ctypes.data)
        scratch_size = scratch.shape[0]
    else:
        scratch_ptr = ffi.NULL
        scratch_size = 0

    c.task_graph_execute_point_scratch(
        graph, timestep, point, output_ptr, output.shape[0], input_ptrs,
        input_sizes, len(inputs), scratch_ptr, scratch_size)
    return output

# Two object refs come back: the point's output and the scratch buffer,
# so the caller can thread scratch into the next timestep. Without
# num_returns=2 (num_return_vals on older Ray releases), the call site's
# two-way unpack would be handed a single ObjectRef and fail.
@ray.remote(num_returns=2)
def execute_point_scratch(graph_array, timestep, point, scratch, *inputs):
    return execute_point_impl(
        graph_array, timestep, point, scratch, *inputs), scratch

@ray.remote
def execute_point_no_scratch(graph_array, timestep, point, *inputs):
    return execute_point_impl(graph_array, timestep, point, None, *inputs)

def execute_point_delayed(graph_array, timestep, point, scratch, *inputs):
    if scratch is not None:
        return execute_point_scratch.remote(
            graph_array, timestep, point, scratch, *inputs)
    else:
        return execute_point_no_scratch.remote(
            graph_array, timestep, point, *inputs), None
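One loose end worth flagging: ray/task_bench.py calls core.init_scratch_delayed, which does not appear anywhere in this diff. A minimal sketch of what it could look like, assuming scratch is a plain zeroed byte buffer placed in the object store so the first execute_point_scratch call can consume it like any other ref (the helper name init_scratch is this sketch's own invention):

    @ray.remote
    def init_scratch(scratch_bytes):
        # Raw byte buffer; the C kernel treats scratch as untyped memory.
        return np.zeros(scratch_bytes, dtype=np.ubyte)

    def init_scratch_delayed(scratch_bytes):
        return init_scratch.remote(scratch_bytes)

Note that Ray hands numpy array arguments to workers as read-only views, so a real implementation would likely also need to copy scratch inside execute_point_impl before the C kernel writes to it.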