Skip to content

Commit 4470cc1

Browse files
authored
Merge pull request #447 from itamarst/446-dask-process
#446: dask.persist() support.
2 parents 21765e8 + 9bf62d9 commit 4470cc1

File tree

5 files changed

+181
-29
lines changed

5 files changed

+181
-29
lines changed

docs/source/news.rst

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
What's New
22
==========
33

4+
1.12.0
5+
^^^^^^
6+
7+
Features:
8+
9+
* Dask support now includes support for tracing logging of ``dask.persist()``, via wrapper API ``eliot.dask.persist_with_trace()``.
10+
11+
Bug fixes:
12+
13+
* Dask edge cases that previously weren't handled correctly should work better.
14+
415
1.11.0
516
^^^^^^
617

docs/source/scientific-computing.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ In order to do this you will need to:
4444
* Ensure all worker processes write the Eliot logs to disk (if you're using the ``multiprocessing`` or ``distributed`` backends).
4545
* If you're using multiple worker machines, aggregate all log files into a single place, so you can more easily analyze them with e.g. `eliot-tree <https://github.com/jonathanj/eliottree>`_.
4646
* Replace ``dask.compute()`` with ``eliot.dask.compute_with_trace()``.
47+
* Replace ``dask.persist()`` with ``eliot.dask.persist_with_trace()``.
4748

48-
In the following example, you can see how this works for a Dask run using ``distributed``, the recommended Dask scheduler.
49+
In the following example, you can see how this works for a Dask run using ``distributed``, the recommended Dask scheduler for more sophisticated use cases.
4950
We'll be using multiple worker processes, but only use a single machine:
5051

5152
.. literalinclude:: ../../examples/dask_eliot.py

eliot/dask.py

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,17 @@
22

33
from pyrsistent import PClass, field
44

5-
from dask import compute, optimize
6-
from dask.core import toposort, get_dependencies
5+
from dask import compute, optimize, persist
6+
7+
try:
8+
from dask.distributed import Future
9+
except:
10+
11+
class Future(object):
12+
pass
13+
14+
15+
from dask.core import toposort, get_dependencies, ishashable
716
from . import start_action, current_action, Action
817

918

@@ -75,6 +84,22 @@ def compute_with_trace(*args):
7584
return compute(*optimized, optimize_graph=False)
7685

7786

87+
def persist_with_trace(*args):
88+
"""Do Dask persist(), but with added Eliot tracing.
89+
90+
Known issues:
91+
92+
1. Retries will confuse Eliot. Probably need different
93+
distributed-tree mechanism within Eliot to solve that.
94+
"""
95+
# 1. Create top-level Eliot Action:
96+
with start_action(action_type="dask:persist"):
97+
# In order to reduce logging verbosity, add logging to the already
98+
# optimized graph:
99+
optimized = optimize(*args, optimizations=[_add_logging])
100+
return persist(*optimized, optimize_graph=False)
101+
102+
78103
def _add_logging(dsk, ignore=None):
79104
"""
80105
Add logging to a Dask graph.
@@ -101,34 +126,43 @@ def simplify(k):
101126
key_names = {}
102127
for key in keys:
103128
value = dsk[key]
104-
if not callable(value) and value in keys:
129+
if not callable(value) and ishashable(value) and value in keys:
105130
# It's an alias for another key:
106131
key_names[key] = key_names[value]
107132
else:
108133
key_names[key] = simplify(key)
109134

110-
# 2. Create Eliot child Actions for each key, in topological order:
111-
key_to_action_id = {key: str(ctx.serialize_task_id(), "utf-8") for key in keys}
135+
# Values in the graph can be either:
136+
#
137+
# 1. A list of other values.
138+
# 2. A tuple, where first value might be a callable, aka a task.
139+
# 3. A literal of some sort.
140+
def maybe_wrap(key, value):
141+
if isinstance(value, list):
142+
return [maybe_wrap(key, v) for v in value]
143+
elif isinstance(value, tuple):
144+
func = value[0]
145+
args = value[1:]
146+
if not callable(func):
147+
# Not a callable, so nothing to wrap.
148+
return value
149+
wrapped_func = _RunWithEliotContext(
150+
task_id=str(ctx.serialize_task_id(), "utf-8"),
151+
func=func,
152+
key=key_names[key],
153+
dependencies=[key_names[k] for k in get_dependencies(dsk, key)],
154+
)
155+
return (wrapped_func,) + args
156+
else:
157+
return value
112158

113-
# 3. Replace function with wrapper that logs appropriate Action:
159+
# Replace function with wrapper that logs appropriate Action; iterate in
160+
# topological order so action task levels are in reasonable order.
114161
for key in keys:
115-
func = dsk[key][0]
116-
args = dsk[key][1:]
117-
if not callable(func):
118-
# This key is just an alias for another key, no need to add
119-
# logging:
120-
result[key] = dsk[key]
121-
continue
122-
wrapped_func = _RunWithEliotContext(
123-
task_id=key_to_action_id[key],
124-
func=func,
125-
key=key_names[key],
126-
dependencies=[key_names[k] for k in get_dependencies(dsk, key)],
127-
)
128-
result[key] = (wrapped_func,) + tuple(args)
162+
result[key] = maybe_wrap(key, dsk[key])
129163

130164
assert set(result.keys()) == set(dsk.keys())
131165
return result
132166

133167

134-
__all__ = ["compute_with_trace"]
168+
__all__ = ["compute_with_trace", "persist_with_trace"]

eliot/tests/test_dask.py

Lines changed: 110 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,23 @@
33
from unittest import TestCase, skipUnless
44

55
from ..testing import capture_logging, LoggedAction, LoggedMessage
6-
from .. import start_action, Message
6+
from .. import start_action, log_message
77

88
try:
99
import dask
1010
from dask.bag import from_sequence
11+
from dask.distributed import Client
12+
import dask.dataframe as dd
13+
import pandas as pd
1114
except ImportError:
1215
dask = None
1316
else:
14-
from ..dask import compute_with_trace, _RunWithEliotContext, _add_logging
17+
from ..dask import (
18+
compute_with_trace,
19+
_RunWithEliotContext,
20+
_add_logging,
21+
persist_with_trace,
22+
)
1523

1624

1725
@skipUnless(dask, "Dask not available.")
@@ -28,30 +36,74 @@ def test_compute(self):
2836
bag = bag.fold(lambda x, y: x + y)
2937
self.assertEqual(dask.compute(bag), compute_with_trace(bag))
3038

39+
def test_future(self):
40+
"""compute_with_trace() can handle Futures."""
41+
client = Client(processes=False)
42+
self.addCleanup(client.shutdown)
43+
[bag] = dask.persist(from_sequence([1, 2, 3]))
44+
bag = bag.map(lambda x: x * 5)
45+
result = dask.compute(bag)
46+
self.assertEqual(result, ([5, 10, 15],))
47+
self.assertEqual(result, compute_with_trace(bag))
48+
49+
def test_persist_result(self):
50+
"""persist_with_trace() runs the same logic as persist()."""
51+
client = Client(processes=False)
52+
self.addCleanup(client.shutdown)
53+
bag = from_sequence([1, 2, 3])
54+
bag = bag.map(lambda x: x * 7)
55+
self.assertEqual(
56+
[b.compute() for b in dask.persist(bag)],
57+
[b.compute() for b in persist_with_trace(bag)],
58+
)
59+
60+
def test_persist_pandas(self):
61+
"""persist_with_trace() with a Pandas dataframe.
62+
63+
This ensures we don't blow up, as used to happen.
64+
"""
65+
df = pd.DataFrame()
66+
df = dd.from_pandas(df, npartitions=1)
67+
persist_with_trace(df)
68+
3169
@capture_logging(None)
32-
def test_logging(self, logger):
70+
def test_persist_logging(self, logger):
71+
"""persist_with_trace() preserves Eliot context."""
72+
73+
def persister(bag):
74+
[bag] = persist_with_trace(bag)
75+
return dask.compute(bag)
76+
77+
self.assert_logging(logger, persister, "dask:persist")
78+
79+
@capture_logging(None)
80+
def test_compute_logging(self, logger):
3381
"""compute_with_trace() preserves Eliot context."""
82+
self.assert_logging(logger, compute_with_trace, "dask:compute")
83+
84+
def assert_logging(self, logger, run_with_trace, top_action_name):
85+
"""Utility function for _with_trace() logging tests."""
3486

3587
def mult(x):
36-
Message.log(message_type="mult")
88+
log_message(message_type="mult")
3789
return x * 4
3890

3991
def summer(x, y):
40-
Message.log(message_type="finally")
92+
log_message(message_type="finally")
4193
return x + y
4294

4395
bag = from_sequence([1, 2])
4496
bag = bag.map(mult).fold(summer)
4597
with start_action(action_type="act1"):
46-
compute_with_trace(bag)
98+
run_with_trace(bag)
4799

48100
[logged_action] = LoggedAction.ofType(logger.messages, "act1")
49101
self.assertEqual(
50102
logged_action.type_tree(),
51103
{
52104
"act1": [
53105
{
54-
"dask:compute": [
106+
top_action_name: [
55107
{"eliot:remote_task": ["dask:task", "mult"]},
56108
{"eliot:remote_task": ["dask:task", "mult"]},
57109
{"eliot:remote_task": ["dask:task", "finally"]},
@@ -83,6 +135,8 @@ def summer(x, y):
83135
class AddLoggingTests(TestCase):
84136
"""Tests for _add_logging()."""
85137

138+
maxDiff = None
139+
86140
def test_add_logging_to_full_graph(self):
87141
"""_add_logging() recreates Dask graph with wrappers."""
88142
bag = from_sequence([1, 2, 3])
@@ -104,3 +158,52 @@ def test_add_logging_to_full_graph(self):
104158
logging_removed[key] = value
105159

106160
self.assertEqual(logging_removed, graph)
161+
162+
def test_add_logging_explicit(self):
163+
"""_add_logging() on more edge cases of the graph."""
164+
165+
def add(s):
166+
return s + "s"
167+
168+
def add2(s):
169+
return s + "s"
170+
171+
# b runs first, then d, then a and c.
172+
graph = {
173+
"a": "d",
174+
"d": [1, 2, (add, "b")],
175+
("b", 0): 1,
176+
"c": (add2, "d"),
177+
}
178+
179+
with start_action(action_type="bleh") as action:
180+
task_id = action.task_uuid
181+
self.assertEqual(
182+
_add_logging(graph),
183+
{
184+
"d": [
185+
1,
186+
2,
187+
(
188+
_RunWithEliotContext(
189+
task_id=task_id + "@/2",
190+
func=add,
191+
key="d",
192+
dependencies=["b"],
193+
),
194+
"b",
195+
),
196+
],
197+
"a": "d",
198+
("b", 0): 1,
199+
"c": (
200+
_RunWithEliotContext(
201+
task_id=task_id + "@/3",
202+
func=add2,
203+
key="c",
204+
dependencies=["d"],
205+
),
206+
"d",
207+
),
208+
},
209+
)

tox.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ deps = cffi
3232
basepython = python3.7
3333
deps = cffi
3434
dask[bag]
35+
dask[distributed]
36+
dask[pandas]
37+
pandas
3538

3639
[testenv:py38]
3740
basepython = python3.8

0 commit comments

Comments
 (0)