Skip to content

Commit 34b18a9

Browse files
Improve differentiation between incoming/outgoing connections and transfers (#6933)
1 parent 6a1b089 commit 34b18a9

File tree

12 files changed

+253
-137
lines changed

12 files changed

+253
-137
lines changed

distributed/dashboard/components/worker.py

+33-19
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def update(self):
9191
"Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)],
9292
"Ready": [len(w.state.ready)],
9393
"Waiting": [w.state.waiting_for_data_count],
94-
"Connections": [len(w.state.in_flight_workers)],
94+
"Connections": [w.state.transfer_incoming_count],
9595
"Serving": [len(w._comms)],
9696
}
9797
update(self.source, d)
@@ -114,8 +114,8 @@ def __init__(self, worker, height=300, **kwargs):
114114
"total",
115115
]
116116

117-
self.incoming = ColumnDataSource({name: [] for name in names})
118-
self.outgoing = ColumnDataSource({name: [] for name in names})
117+
self.transfer_incoming = ColumnDataSource({name: [] for name in names})
118+
self.transfer_outgoing = ColumnDataSource({name: [] for name in names})
119119

120120
x_range = DataRange1d(range_padding=0)
121121
y_range = DataRange1d(range_padding=0)
@@ -131,7 +131,7 @@ def __init__(self, worker, height=300, **kwargs):
131131
)
132132

133133
fig.rect(
134-
source=self.incoming,
134+
source=self.transfer_incoming,
135135
x="middle",
136136
y="y",
137137
width="duration",
@@ -140,7 +140,7 @@ def __init__(self, worker, height=300, **kwargs):
140140
alpha="alpha",
141141
)
142142
fig.rect(
143-
source=self.outgoing,
143+
source=self.transfer_outgoing,
144144
x="middle",
145145
y="y",
146146
width="duration",
@@ -159,26 +159,40 @@ def __init__(self, worker, height=300, **kwargs):
159159

160160
self.root = fig
161161

162-
self.last_incoming = 0
163-
self.last_outgoing = 0
162+
self.last_transfer_incoming_count_total = 0
163+
self.last_transfer_outgoing_count_total = 0
164164
self.who = dict()
165165

166166
@without_property_validation
167167
@log_errors
168168
def update(self):
169-
outgoing = self.worker.outgoing_transfer_log
170-
n = self.worker.outgoing_count - self.last_outgoing
171-
outgoing = [outgoing[-i].copy() for i in range(1, n + 1)]
172-
self.last_outgoing = self.worker.outgoing_count
169+
transfer_outgoing_log = self.worker.transfer_outgoing_log
170+
n = (
171+
self.worker.transfer_outgoing_count_total
172+
- self.last_transfer_outgoing_count_total
173+
)
174+
transfer_outgoing_log = [
175+
transfer_outgoing_log[-i].copy() for i in range(1, n + 1)
176+
]
177+
self.last_transfer_outgoing_count_total = (
178+
self.worker.transfer_outgoing_count_total
179+
)
173180

174-
incoming = self.worker.incoming_transfer_log
175-
n = self.worker.incoming_count - self.last_incoming
176-
incoming = [incoming[-i].copy() for i in range(1, n + 1)]
177-
self.last_incoming = self.worker.incoming_count
181+
transfer_incoming_log = self.worker.transfer_incoming_log
182+
n = (
183+
self.worker.state.transfer_incoming_count_total
184+
- self.last_transfer_incoming_count_total
185+
)
186+
transfer_incoming_log = [
187+
transfer_incoming_log[-i].copy() for i in range(1, n + 1)
188+
]
189+
self.last_transfer_incoming_count_total = (
190+
self.worker.state.transfer_incoming_count_total
191+
)
178192

179193
for [msgs, source] in [
180-
[incoming, self.incoming],
181-
[outgoing, self.outgoing],
194+
[transfer_incoming_log, self.transfer_incoming],
195+
[transfer_outgoing_log, self.transfer_outgoing],
182196
]:
183197

184198
for msg in msgs:
@@ -225,7 +239,7 @@ def __init__(self, worker, **kwargs):
225239
fig = figure(
226240
title="Communication History",
227241
x_axis_type="datetime",
228-
y_range=[-0.1, worker.state.total_out_connections + 0.5],
242+
y_range=[-0.1, worker.state.transfer_incoming_count_limit + 0.5],
229243
height=150,
230244
tools="",
231245
x_range=x_range,
@@ -247,7 +261,7 @@ def update(self):
247261
{
248262
"x": [time() * 1000],
249263
"out": [len(self.worker._comms)],
250-
"in": [len(self.worker.state.in_flight_workers)],
264+
"in": [self.worker.state.transfer_incoming_count],
251265
},
252266
10000,
253267
)

distributed/dashboard/tests/test_worker_bokeh.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,11 @@ async def test_CommunicatingStream(c, s, a, b):
142142
aa.update()
143143
bb.update()
144144

145-
assert len(first(aa.outgoing.data.values())) and len(
146-
first(bb.outgoing.data.values())
145+
assert len(first(aa.transfer_outgoing.data.values())) and len(
146+
first(bb.transfer_outgoing.data.values())
147147
)
148-
assert len(first(aa.incoming.data.values())) and len(
149-
first(bb.incoming.data.values())
148+
assert len(first(aa.transfer_incoming.data.values())) and len(
149+
first(bb.transfer_incoming.data.values())
150150
)
151151

152152

distributed/http/worker/prometheus/core.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def collect(self):
3737
yield GaugeMetricFamily(
3838
self.build_name("concurrent_fetch_requests"),
3939
"Number of open fetch requests to other workers.",
40-
value=len(self.server.state.in_flight_workers),
40+
value=self.server.state.transfer_incoming_count,
4141
)
4242

4343
yield GaugeMetricFamily(

distributed/tests/test_client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ async def test_gather_skip(c, s, a):
647647
async def test_limit_concurrent_gathering(c, s, a, b):
648648
futures = c.map(inc, range(100))
649649
await c.gather(futures)
650-
assert len(a.outgoing_transfer_log) + len(b.outgoing_transfer_log) < 100
650+
assert len(a.transfer_outgoing_log) + len(b.transfer_outgoing_log) < 100
651651

652652

653653
@gen_cluster(client=True)

distributed/tests/test_nanny.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ async def test_nanny_timeout(c, s, a):
232232
clean_kwargs={"threads": False},
233233
config={"distributed.worker.memory.pause": False},
234234
)
235-
async def test_throttle_outgoing_connections(c, s, a, *other_workers):
235+
async def test_throttle_outgoing_transfers(c, s, a, *other_workers):
236236
# Put a bunch of small data on worker a
237237
logging.getLogger("distributed.worker").setLevel(logging.DEBUG)
238238
remote_data = c.map(
@@ -241,7 +241,7 @@ async def test_throttle_outgoing_connections(c, s, a, *other_workers):
241241
await wait(remote_data)
242242

243243
a.status = Status.paused
244-
a.outgoing_current_count = 2
244+
a.transfer_outgoing_count = 2
245245

246246
requests = [
247247
await a.get_data(await w.rpc.connect(w.address), keys=[f.key], who=w.address)

distributed/tests/test_scheduler.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ def random(**kwargs):
227227
# Check that there were few transfers
228228
unexpected_transfers = []
229229
for worker in workers:
230-
for log in worker.incoming_transfer_log:
230+
for log in worker.transfer_incoming_log:
231231
keys = log["keys"]
232232
# The root-ish tasks should never be transferred
233233
assert not any(k.startswith("random") for k in keys), keys

distributed/tests/test_worker.py

+52-31
Original file line numberDiff line numberDiff line change
@@ -673,19 +673,19 @@ async def test_clean(c, s, a, b):
673673
@gen_cluster(client=True)
674674
async def test_message_breakup(c, s, a, b):
675675
n = 100_000
676-
a.state.target_message_size = 10 * n
677-
b.state.target_message_size = 10 * n
676+
a.state.transfer_message_target_bytes = 10 * n
677+
b.state.transfer_message_target_bytes = 10 * n
678678
xs = [
679679
c.submit(mul, b"%d" % i, n, key=f"x{i}", workers=[a.address]) for i in range(30)
680680
]
681681
y = c.submit(lambda _: None, xs, key="y", workers=[b.address])
682682
await y
683683

684-
assert 2 <= len(b.incoming_transfer_log) <= 20
685-
assert 2 <= len(a.outgoing_transfer_log) <= 20
684+
assert 2 <= len(b.transfer_incoming_log) <= 20
685+
assert 2 <= len(a.transfer_outgoing_log) <= 20
686686

687-
assert all(msg["who"] == b.address for msg in a.outgoing_transfer_log)
688-
assert all(msg["who"] == a.address for msg in a.incoming_transfer_log)
687+
assert all(msg["who"] == b.address for msg in a.transfer_outgoing_log)
688+
assert all(msg["who"] == a.address for msg in a.transfer_incoming_log)
689689

690690

691691
@gen_cluster(client=True)
@@ -764,7 +764,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
764764
concurrent outgoing connections. If multiple small fetches from the same worker are
765765
scheduled all at once, they will result in a single call to gather_dep.
766766
"""
767-
a.state.total_out_connections = 2
767+
a.state.transfer_incoming_count_limit = 2
768768
futures = await c.scatter(
769769
{f"x{i}": i for i in range(100)},
770770
workers=[w.address for w in snd_workers],
@@ -779,7 +779,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
779779
while len(a.data) < 100:
780780
await asyncio.sleep(0.01)
781781

782-
assert a.state.comm_nbytes == 0
782+
assert a.state.transfer_incoming_bytes == 0
783783

784784
story = a.state.story("request-dep", "receive-dep")
785785
assert len(story) == 40 # 1 request-dep + 1 receive-dep per sender worker
@@ -808,27 +808,31 @@ async def test_multiple_transfers(c, s, w1, w2, w3):
808808
@pytest.mark.xfail(reason="very high flakiness")
809809
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
810810
async def test_share_communication(c, s, w1, w2, w3):
811-
x = c.submit(mul, b"1", int(w3.target_message_size + 1), workers=w1.address)
812-
y = c.submit(mul, b"2", int(w3.target_message_size + 1), workers=w2.address)
811+
x = c.submit(
812+
mul, b"1", int(w3.transfer_message_target_bytes + 1), workers=w1.address
813+
)
814+
y = c.submit(
815+
mul, b"2", int(w3.transfer_message_target_bytes + 1), workers=w2.address
816+
)
813817
await wait([x, y])
814818
await c._replicate([x, y], workers=[w1.address, w2.address])
815819
z = c.submit(add, x, y, workers=w3.address)
816820
await wait(z)
817-
assert len(w3.incoming_transfer_log) == 2
818-
assert w1.outgoing_transfer_log
819-
assert w2.outgoing_transfer_log
821+
assert len(w3.transfer_incoming_log) == 2
822+
assert w1.transfer_outgoing_log
823+
assert w2.transfer_outgoing_log
820824

821825

822826
@pytest.mark.xfail(reason="very high flakiness")
823827
@gen_cluster(client=True)
824828
async def test_dont_overlap_communications_to_same_worker(c, s, a, b):
825-
x = c.submit(mul, b"1", int(b.target_message_size + 1), workers=a.address)
826-
y = c.submit(mul, b"2", int(b.target_message_size + 1), workers=a.address)
829+
x = c.submit(mul, b"1", int(b.transfer_message_target_bytes + 1), workers=a.address)
830+
y = c.submit(mul, b"2", int(b.transfer_message_target_bytes + 1), workers=a.address)
827831
await wait([x, y])
828832
z = c.submit(add, x, y, workers=b.address)
829833
await wait(z)
830-
assert len(b.incoming_transfer_log) == 2
831-
l1, l2 = b.incoming_transfer_log
834+
assert len(b.transfer_incoming_log) == 2
835+
l1, l2 = b.transfer_incoming_log
832836

833837
assert l1["stop"] < l2["start"]
834838

@@ -1244,8 +1248,8 @@ async def test_wait_for_outgoing(c, s, a, b):
12441248
y = c.submit(inc, future, workers=b.address)
12451249
await wait(y)
12461250

1247-
assert len(b.incoming_transfer_log) == len(a.outgoing_transfer_log) == 1
1248-
bb = b.incoming_transfer_log[0]["duration"]
1251+
assert len(b.transfer_incoming_log) == len(a.transfer_outgoing_log) == 1
1252+
bb = b.transfer_incoming_log[0]["duration"]
12491253
aa = a.outgoing_transfer_log[0]["duration"]
12501254
ratio = aa / bb
12511255

@@ -1262,8 +1266,8 @@ async def test_prefer_gather_from_local_address(c, s, w1, w2, w3):
12621266
y = c.submit(inc, x, workers=[w2.address])
12631267
await wait(y)
12641268

1265-
assert any(d["who"] == w2.address for d in w1.outgoing_transfer_log)
1266-
assert not any(d["who"] == w2.address for d in w3.outgoing_transfer_log)
1269+
assert any(d["who"] == w2.address for d in w1.transfer_outgoing_log)
1270+
assert not any(d["who"] == w2.address for d in w3.transfer_outgoing_log)
12671271

12681272

12691273
@gen_cluster(
@@ -1281,10 +1285,10 @@ async def test_avoid_oversubscription(c, s, *workers):
12811285
await wait(futures)
12821286

12831287
# Original worker not responsible for all transfers
1284-
assert len(workers[0].outgoing_transfer_log) < len(workers) - 2
1288+
assert len(workers[0].transfer_outgoing_log) < len(workers) - 2
12851289

12861290
# Some other workers did some work
1287-
assert len([w for w in workers if len(w.outgoing_transfer_log) > 0]) >= 3
1291+
assert len([w for w in workers if len(w.transfer_outgoing_log) > 0]) >= 3
12881292

12891293

12901294
@gen_cluster(client=True, worker_kwargs={"metrics": {"my_port": lambda w: w.port}})
@@ -1959,7 +1963,7 @@ async def test_gather_dep_one_worker_always_busy(c, s, a, b):
19591963
# We will block A for any outgoing communication. This simulates an
19601964
# overloaded worker which will always return "busy" for get_data requests,
19611965
# effectively blocking H indefinitely
1962-
a.outgoing_current_count = 10000000
1966+
a.transfer_outgoing_count = 10000000
19631967

19641968
h = c.submit(add, f, g, key="h", workers=[b.address])
19651969

@@ -2021,7 +2025,7 @@ async def test_gather_dep_from_remote_workers_if_all_local_workers_are_busy(
20212025
)
20222026
)["f"]
20232027
for w in lws:
2024-
w.outgoing_current_count = 10000000
2028+
w.transfer_outgoing_count = 10000000
20252029

20262030
g = c.submit(inc, f, key="g", workers=[a.address])
20272031
assert await g == 2
@@ -2718,7 +2722,7 @@ async def test_acquire_replicas_same_channel(c, s, a, b):
27182722
("request-dep", a.address, {fut.key}),
27192723
],
27202724
)
2721-
assert any(fut.key in msg["keys"] for msg in b.incoming_transfer_log)
2725+
assert any(fut.key in msg["keys"] for msg in b.transfer_incoming_log)
27222726

27232727

27242728
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
@@ -2997,9 +3001,9 @@ async def test_acquire_replicas_with_no_priority(c, s, a, b):
29973001
@gen_cluster(client=True, nthreads=[("", 1)])
29983002
async def test_acquire_replicas_large_data(c, s, a):
29993003
"""When acquire-replicas is used to acquire multiple sizeable tasks, it respects
3000-
target_message_size and acquires them over multiple iterations.
3004+
transfer_message_target_bytes and acquires them over multiple iterations.
30013005
"""
3002-
size = a.state.target_message_size // 5 - 10_000
3006+
size = a.state.transfer_message_target_bytes // 5 - 10_000
30033007

30043008
class C:
30053009
def __sizeof__(self):
@@ -3026,7 +3030,7 @@ async def test_missing_released_zombie_tasks(c, s, a, b):
30263030
Ensure that no fetch/flight tasks are left in the task dict of a
30273031
worker after everything was released
30283032
"""
3029-
a.total_in_connections = 0
3033+
a.transfer_outgoing_count_limit = 0
30303034
f1 = c.submit(inc, 1, key="f1", workers=[a.address])
30313035
f2 = c.submit(inc, f1, key="f2", workers=[b.address])
30323036
key = f1.key
@@ -3310,8 +3314,8 @@ async def test_Worker__to_dict(c, s, a):
33103314
"thread_id",
33113315
"logs",
33123316
"config",
3313-
"incoming_transfer_log",
3314-
"outgoing_transfer_log",
3317+
"transfer_incoming_log",
3318+
"transfer_outgoing_log",
33153319
# Attributes of WorkerMemoryManager
33163320
"data",
33173321
"max_spill",
@@ -3551,3 +3555,20 @@ async def test_execute_preamble_abort_retirement(c, s):
35513555

35523556
# Test that y does not get stuck.
35533557
assert await y == 2
3558+
3559+
3560+
@gen_cluster()
3561+
async def test_deprecation_of_renamed_worker_attributes(s, a, b):
3562+
msg = (
3563+
"The `Worker.outgoing_count` attribute has been renamed to "
3564+
"`Worker.transfer_outgoing_count_total`"
3565+
)
3566+
with pytest.warns(DeprecationWarning, match=msg):
3567+
assert a.outgoing_count == a.transfer_outgoing_count_total
3568+
3569+
msg = (
3570+
"The `Worker.outgoing_current_count` attribute has been renamed to "
3571+
"`Worker.transfer_outgoing_count`"
3572+
)
3573+
with pytest.warns(DeprecationWarning, match=msg):
3574+
assert a.outgoing_current_count == a.transfer_outgoing_count

0 commit comments

Comments
 (0)