Skip to content

Commit 2827f6f

Browse files
committed
Comments and test
1 parent 1e1fdf9 commit 2827f6f

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

distributed/stealing.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -528,9 +528,7 @@ def story(self, *keys_or_ts: str | TaskState) -> list:
528528
out.append(t)
529529
return out
530530

531-
def stealing_objective(
532-
self, ts: TaskState, ws: WorkerState
533-
) -> tuple[float, ...]:
531+
def stealing_objective(self, ts: TaskState, ws: WorkerState) -> tuple[float, ...]:
534532
"""Objective function to determine which worker should get the task
535533
536534
Minimize expected start time. If a tie then break with data storage.
@@ -565,9 +563,7 @@ def _get_thief(
565563
potential_thieves = valid_thieves
566564
elif not ts.loose_restrictions:
567565
return None
568-
return min(
569-
potential_thieves, key=partial(self.stealing_objective, ts)
570-
)
566+
return min(potential_thieves, key=partial(self.stealing_objective, ts))
571567

572568

573569
fast_tasks = {

distributed/tests/test_steal.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1886,8 +1886,7 @@ async def test_trivial_workload_should_not_cause_work_stealing(c, s, *workers):
18861886
config={"distributed.scheduler.worker-saturation": "inf"},
18871887
)
18881888
async def test_stealing_ogjective_accounts_for_in_flight(c, s, a):
1889-
"""Regression test that work-stealing's objective correctly accounts for in-flight data requests
1890-
"""
1889+
"""Regression test that work-stealing's objective correctly accounts for in-flight data requests"""
18911890
in_event = Event()
18921891
block_event = Event()
18931892

@@ -1912,19 +1911,31 @@ def block(i: int, in_event: Event, block_event: Event) -> int:
19121911
ts = next(iter(wsA.processing))
19131912

19141913
# No in-flight requests, so both match
1915-
assert extension.stealing_objective(ts, wsA) == s.worker_objective(ts, wsA)
1916-
assert extension.stealing_objective(ts, wsB) == s.worker_objective(ts, wsB)
1914+
assert extension.stealing_objective(ts, wsA) == s.worker_objective(
1915+
ts, wsA
1916+
)
1917+
assert extension.stealing_objective(ts, wsB) == s.worker_objective(
1918+
ts, wsB
1919+
)
19171920

19181921
extension.balance()
19191922
assert extension.in_flight
19201923
# We move tasks from a to b
1921-
assert extension.stealing_objective(ts, wsA) < s.worker_objective(ts, wsA)
1922-
assert extension.stealing_objective(ts, wsB) > s.worker_objective(ts, wsB)
1924+
assert extension.stealing_objective(ts, wsA) < s.worker_objective(
1925+
ts, wsA
1926+
)
1927+
assert extension.stealing_objective(ts, wsB) > s.worker_objective(
1928+
ts, wsB
1929+
)
19231930

19241931
await async_poll_for(lambda: not extension.in_flight, timeout=5)
19251932
# No in-flight requests, so both match
1926-
assert extension.stealing_objective(ts, wsA) == s.worker_objective(ts, wsA)
1927-
assert extension.stealing_objective(ts, wsB) == s.worker_objective(ts, wsB)
1933+
assert extension.stealing_objective(ts, wsA) == s.worker_objective(
1934+
ts, wsA
1935+
)
1936+
assert extension.stealing_objective(ts, wsB) == s.worker_objective(
1937+
ts, wsB
1938+
)
19281939
finally:
19291940
await block_event.set()
19301941
finally:

0 commit comments

Comments
 (0)