Skip to content

Commit a1a4e41

Browse files
wjsiqinxuye
authored andcommitted
Add batch support for broadcasts and local meta setting (#166)
1 parent ea33f73 commit a1a4e41

File tree

3 files changed

+83
-12
lines changed

3 files changed

+83
-12
lines changed

mars/scheduler/chunkmeta.py

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,17 @@ def set_chunk_broadcasts(self, session_id, chunk_key, broadcast_dests):
193193
self._meta_broadcasts[(session_id, chunk_key)] = \
194194
[d for d in broadcast_dests if d != self.address]
195195

196+
def batch_set_chunk_broadcasts(self, session_id, chunk_keys, broadcast_dests):
197+
"""
198+
Configure broadcast destinations in batch
199+
:param session_id: session id
200+
:param chunk_keys: chunk key
201+
:param broadcast_dests:
202+
:return:
203+
"""
204+
for key, dests in zip(chunk_keys, broadcast_dests):
205+
self.set_chunk_broadcasts(session_id, key, dests)
206+
196207
def get_chunk_broadcasts(self, session_id, chunk_key):
197208
"""
198209
Get chunk broadcast addresses, for test only
@@ -201,14 +212,16 @@ def get_chunk_broadcasts(self, session_id, chunk_key):
201212
"""
202213
return self._meta_broadcasts.get((session_id, chunk_key))
203214

204-
def set_chunk_meta(self, session_id, chunk_key, size=None, shape=None, workers=None):
215+
def set_chunk_meta(self, session_id, chunk_key, size=None, shape=None, workers=None,
216+
broadcast=True):
205217
"""
206218
Update chunk meta in current storage
207219
:param session_id: session id
208220
:param chunk_key: chunk key
209221
:param size: size of the chunk
210222
:param shape: shape of the chunk
211223
:param workers: workers holding the chunk
224+
:param broadcast: broadcast meta into registered destinations
212225
"""
213226
query_key = (session_id, chunk_key)
214227
# update input with existing value
@@ -238,24 +251,54 @@ def set_chunk_meta(self, session_id, chunk_key, size=None, shape=None, workers=N
238251

239252
# broadcast into pre-determined destinations
240253
futures = []
241-
if query_key in self._meta_broadcasts:
254+
if broadcast and query_key in self._meta_broadcasts:
242255
for dest in self._meta_broadcasts[query_key]:
243256
futures.append(
244257
self.ctx.actor_ref(self.default_name(), address=dest)
245-
.cache_chunk_meta(session_id, chunk_key, meta, _wait=False, _tell=True)
258+
.batch_cache_chunk_meta(session_id, [chunk_key], [meta], _wait=False, _tell=True)
246259
)
247260
[f.result() for f in futures]
248261

249-
def cache_chunk_meta(self, session_id, chunk_key, meta):
262+
def batch_set_chunk_meta(self, session_id, keys, metas):
263+
"""
264+
Set chunk metas in batch
265+
:param session_id: session id
266+
:param keys: keys to set
267+
:param metas: metas to set
268+
"""
269+
query_dict = defaultdict(lambda: (list(), list()))
270+
271+
for key, meta in zip(keys, metas):
272+
self.set_chunk_meta(session_id, key, size=meta.chunk_size, shape=meta.chunk_shape,
273+
workers=meta.workers, broadcast=False)
274+
try:
275+
dests = self._meta_broadcasts[(session_id, key)]
276+
except KeyError:
277+
continue
278+
279+
for dest in dests:
280+
query_dict[dest][0].append(key)
281+
query_dict[dest][1].append(meta)
282+
283+
futures = []
284+
for dest, (chunk_keys, metas) in query_dict.items():
285+
futures.append(
286+
self.ctx.actor_ref(self.default_name(), address=dest)
287+
.batch_cache_chunk_meta(session_id, chunk_keys, metas, _wait=False, _tell=True)
288+
)
289+
[f.result() for f in futures]
290+
291+
def batch_cache_chunk_meta(self, session_id, chunk_keys, metas):
250292
"""
251293
Receive updates for caching
252294
253295
:param session_id: session id
254-
:param chunk_key: chunk key
255-
:param meta: meta data
296+
:param chunk_keys: chunk keys
297+
:param metas: meta data
256298
"""
257-
query_key = (session_id, chunk_key)
258-
self._meta_cache[query_key] = meta
299+
for chunk_key, meta in zip(chunk_keys, metas):
300+
query_key = (session_id, chunk_key)
301+
self._meta_cache[query_key] = meta
259302

260303
def get_chunk_meta(self, session_id, chunk_key):
261304
"""
@@ -361,6 +404,17 @@ def set_chunk_broadcasts(self, session_id, chunk_key, broadcast_dests):
361404
self.ctx.actor_ref(LocalChunkMetaActor.default_name(), address=addr) \
362405
.set_chunk_broadcasts(session_id, chunk_key, broadcast_dests, _tell=True, _wait=False)
363406

407+
def batch_set_chunk_broadcasts(self, session_id, chunk_keys, broadcast_dests):
408+
query_chunk = defaultdict(lambda: (list(), list()))
409+
for key, dests in zip(chunk_keys, broadcast_dests):
410+
addr = self.get_scheduler((session_id, key))
411+
query_chunk[addr][0].append(key)
412+
query_chunk[addr][1].append(dests)
413+
414+
for addr, (chunk_keys, dest_groups) in query_chunk.items():
415+
self.ctx.actor_ref(LocalChunkMetaActor.default_name(), address=addr) \
416+
.batch_set_chunk_broadcasts(session_id, chunk_keys, dest_groups)
417+
364418
def set_chunk_meta(self, session_id, chunk_key, size=None, shape=None, workers=None):
365419
"""
366420
Update chunk metadata

mars/scheduler/operand.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,9 +430,13 @@ def _handle_worker_accept(self, worker):
430430
broadcast_eps.difference_update({self.address})
431431
broadcast_eps = tuple(broadcast_eps)
432432

433+
chunk_keys, broadcast_ep_groups = [], []
433434
for chunk_key in self._chunks:
434-
self._chunk_meta_ref.set_chunk_broadcasts(
435-
self._session_id, chunk_key, broadcast_eps, _tell=True, _wait=False)
435+
chunk_keys.append(chunk_key)
436+
broadcast_ep_groups.append(broadcast_eps)
437+
438+
self._chunk_meta_ref.batch_set_chunk_broadcasts(
439+
self._session_id, chunk_keys, broadcast_ep_groups, _tell=True, _wait=False)
436440

437441
# submit job
438442
logger.debug('Start running operand %s on %s', self._op_key, worker)

mars/scheduler/tests/test_chunkmeta.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ def _mock_get_scheduler(key):
126126
key1 = str(uuid.uuid4())
127127
key2 = str(uuid.uuid4())
128128
key3 = str(uuid.uuid4())
129-
keys = [key1, key2, key3]
129+
key4 = str(uuid.uuid4())
130+
keys = [key1, key2, key3, key4]
130131
ref1.set_chunk_size(session1, key1, 512)
131132
ref2.set_chunk_size(session1, key2, 1024)
132133
ref2.set_chunk_size(session2, key3, 1024)
@@ -181,6 +182,11 @@ def _mock_get_scheduler(key):
181182
self.assertIsNone(ref1.batch_get_chunk_size(session1, [key1, key2])[1])
182183
self.assertIsNone(ref1.batch_get_workers(session1, [key1, key2])[1])
183184

185+
meta4 = WorkerMeta(chunk_size=512, chunk_shape=(10,) * 2, workers=(endpoints[0],))
186+
loc_ref2.batch_set_chunk_meta(session1, [key4], [meta4])
187+
self.assertEqual(loc_ref2.get_chunk_meta(session1, key4).chunk_size, 512)
188+
self.assertEqual(loc_ref2.get_chunk_meta(session1, key4).chunk_shape, (10,) * 2)
189+
184190
@unittest.skipIf(sys.platform == 'win32', 'Currently not support multiple pools under Windows')
185191
@patch_method(ChunkMetaActor.get_scheduler)
186192
def testChunkBroadcast(self, *_):
@@ -210,7 +216,8 @@ def _mock_get_scheduler(key):
210216

211217
key1 = str(uuid.uuid4())
212218
key2 = str(uuid.uuid4())
213-
keys = [key1, key2]
219+
key3 = str(uuid.uuid4())
220+
keys = [key1, key2, key3]
214221

215222
ref1.set_chunk_broadcasts(session_id, key1, [endpoints[1]])
216223
ref1.set_chunk_size(session_id, key1, 512)
@@ -229,6 +236,12 @@ def _mock_get_scheduler(key):
229236
self.assertEqual(local_ref2.get_chunk_meta(session_id, key1).chunk_shape, (10,) * 2)
230237
self.assertEqual(local_ref2.get_chunk_broadcasts(session_id, key2), [endpoints[0]])
231238

239+
ref1.batch_set_chunk_broadcasts(session_id, [key3], [[endpoints[1]]])
240+
meta3 = WorkerMeta(chunk_size=512, chunk_shape=(10,) * 2, workers=(endpoints[0],))
241+
local_ref1.batch_set_chunk_meta(session_id, [key3], [meta3])
242+
self.assertEqual(local_ref2.get_chunk_meta(session_id, key3).chunk_size, 512)
243+
self.assertEqual(local_ref2.get_chunk_meta(session_id, key3).chunk_shape, (10,) * 2)
244+
232245
ref1.delete_meta(session_id, key1)
233246
pool2.sleep(0.1)
234247

0 commit comments

Comments
 (0)