Skip to content
This repository was archived by the owner on Feb 8, 2023. It is now read-only.

Commit de560e8

Browse files
committed
FEAT: Collect subtask runtime info
1 parent a99b5a1 commit de560e8

File tree

15 files changed

+555
-55
lines changed

15 files changed

+555
-55
lines changed

mars/services/scheduling/worker/execution.py

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
import operator
1919
import pprint
2020
import sys
21+
import time
2122
from collections import defaultdict
2223
from dataclasses import dataclass, field
2324
from typing import Dict, Optional
@@ -35,6 +36,7 @@
3536
from ...meta import MetaAPI
3637
from ...storage import StorageAPI
3738
from ...subtask import Subtask, SubtaskAPI, SubtaskResult, SubtaskStatus
39+
from ...task.task_info_collector import TaskInfoCollector
3840
from .workerslot import BandSlotManagerActor
3941
from .quota import QuotaActor
4042

@@ -353,6 +355,7 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str):
353355
)
354356
try:
355357
logger.debug("Preparing data for subtask %s", subtask.subtask_id)
358+
fetch_start = time.time()
356359
prepare_data_task = asyncio.create_task(
357360
_retry_run(
358361
subtask, subtask_info, self._prepare_input_data, subtask, band_name
@@ -361,6 +364,14 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str):
361364
await asyncio.wait_for(
362365
prepare_data_task, timeout=self._data_prepare_timeout
363366
)
367+
fetch_end = time.time()
368+
collect_task_info = subtask.extra_config and subtask.extra_config.get(
369+
"collect_task_info", False
370+
)
371+
task_info_collector = TaskInfoCollector(self.address, collect_task_info)
372+
await task_info_collector.collect_fetch_time(
373+
subtask, fetch_start, fetch_end
374+
)
364375

365376
input_sizes = await self._collect_input_sizes(
366377
subtask, subtask_info.supervisor_address, band_name

mars/services/session/supervisor/core.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -190,7 +190,7 @@ async def create_services(self):
190190
)
191191
if "task" in self._service_config["services"]:
192192
self._task_api = await TaskAPI.create(
193-
session_id=self._session_id, address=self.address
193+
session_id=self._session_id, supervisor_address=self.address
194194
)
195195

196196
async def get_last_idle_time(self):

mars/services/subtask/worker/manager.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@ async def _create_band_runner_actors(self, band_name: str, n_slots: int):
4848
SubtaskRunnerActor,
4949
band,
5050
worker_address=self._worker_address,
51+
slot_id=slot_id,
5152
subtask_processor_cls=self._subtask_processor_cls,
5253
uid=SubtaskRunnerActor.gen_uid(band_name, slot_id),
5354
address=self.address,

mars/services/subtask/worker/processor.py

Lines changed: 62 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -32,12 +32,13 @@
3232
from ....optimization.physical import optimize
3333
from ....serialization import AioSerializer
3434
from ....typing import BandType, ChunkType
35-
from ....utils import get_chunk_key_to_data_keys, calc_data_size
35+
from ....utils import get_chunk_key_to_data_keys, calc_data_size, Timer
3636
from ...context import ThreadedServiceContext
3737
from ...meta.api import MetaAPI, WorkerMetaAPI
3838
from ...session import SessionAPI
3939
from ...storage import StorageAPI
4040
from ...task import TaskAPI, task_options
41+
from ...task.task_info_collector import TaskInfoCollector
4142
from ..core import Subtask, SubtaskStatus, SubtaskResult
4243
from ..utils import iter_input_data_keys, iter_output_data, get_mapper_data_keys
4344

@@ -77,6 +78,7 @@ def __init__(
7778
meta_api: MetaAPI,
7879
worker_meta_api: WorkerMetaAPI,
7980
band: BandType,
81+
slot_id: int,
8082
supervisor_address: str,
8183
engines: List[str] = None,
8284
):
@@ -91,6 +93,7 @@ def __init__(
9193
]
9294
)
9395
self._band = band
96+
self._slot_id = slot_id
9497
self._supervisor_address = supervisor_address
9598
self._engines = engines if engines is not None else task_options.runtime_engines
9699

@@ -120,6 +123,10 @@ def __init__(
120123
self._storage_api = storage_api
121124
self._meta_api = meta_api
122125
self._worker_meta_api = worker_meta_api
126+
collect_task_info = subtask.extra_config and subtask.extra_config.get(
127+
"collect_task_info", False
128+
)
129+
self._task_info_collector = TaskInfoCollector(self._band[0], collect_task_info)
123130

124131
# add metrics
125132
self._subtask_execution_time = Metrics.gauge(
@@ -234,35 +241,37 @@ def cb(fut):
234241
to_wait.set_result(fut.result())
235242

236243
future.add_done_callback(cb)
237-
238-
try:
239-
await to_wait
240-
logger.debug(
241-
"Finish executing operand: %s, chunk: %s, subtask id: %s",
242-
chunk.op,
243-
chunk,
244-
self.subtask.subtask_id,
245-
)
246-
except asyncio.CancelledError:
247-
logger.debug(
248-
"Receive cancel instruction for operand: %s,"
249-
"chunk: %s, subtask id: %s",
250-
chunk.op,
251-
chunk,
252-
self.subtask.subtask_id,
253-
)
254-
# wait for this computation to finish
255-
await future
256-
# if cancelled, stop next computation
257-
logger.debug(
258-
"Cancelled operand: %s, chunk: %s, subtask id: %s",
259-
chunk.op,
260-
chunk,
261-
self.subtask.subtask_id,
262-
)
263-
self.result.status = SubtaskStatus.cancelled
264-
raise
265-
244+
with Timer() as timer:
245+
try:
246+
await to_wait
247+
logger.debug(
248+
"Finish executing operand: %s, chunk: %s, subtask id: %s",
249+
chunk.op,
250+
chunk,
251+
self.subtask.subtask_id,
252+
)
253+
except asyncio.CancelledError: # pragma: no cover
254+
logger.debug(
255+
"Receive cancel instruction for operand: %s,"
256+
"chunk: %s, subtask id: %s",
257+
chunk.op,
258+
chunk,
259+
self.subtask.subtask_id,
260+
)
261+
# wait for this computation to finish
262+
await future
263+
# if cancelled, stop next computation
264+
logger.debug(
265+
"Cancelled operand: %s, chunk: %s, subtask id: %s",
266+
chunk.op,
267+
chunk,
268+
self.subtask.subtask_id,
269+
)
270+
self.result.status = SubtaskStatus.cancelled
271+
raise
272+
await self._task_info_collector.collect_runtime_operand_info(
273+
self.subtask, timer.duration, chunk, self._processor_context
274+
)
266275
self.set_op_progress(chunk.op.key, 1.0)
267276

268277
for inp in chunk_graph.iter_predecessors(chunk):
@@ -541,6 +550,7 @@ async def done(self):
541550
self.is_done.set()
542551

543552
async def run(self):
553+
cost_times = defaultdict(dict)
544554
self.result.status = SubtaskStatus.running
545555
input_keys = None
546556
unpinned = False
@@ -561,29 +571,49 @@ async def run(self):
561571
}
562572

563573
# load inputs data
574+
cost_times["load_data_time"]["start_time"] = time.time()
564575
input_keys = await self._load_input_data()
576+
cost_times["load_data_time"]["end_time"] = time.time()
565577
try:
566578
# execute chunk graph
579+
cost_times["execute_time"]["start_time"] = time.time()
567580
await self._execute_graph(chunk_graph)
581+
cost_times["execute_time"]["end_time"] = time.time()
568582
finally:
569583
# unpin inputs data
570584
unpinned = True
585+
cost_times["unpin_time"]["start_time"] = time.time()
571586
await self._unpin_data(input_keys)
587+
cost_times["unpin_time"]["end_time"] = time.time()
572588
# store results data
589+
cost_times["store_result_time"]["start_time"] = time.time()
573590
(
574591
stored_keys,
575592
store_sizes,
576593
memory_sizes,
577594
data_key_to_object_id,
578595
) = await self._store_data(chunk_graph)
596+
cost_times["store_result_time"]["end_time"] = time.time()
579597
# store meta
598+
cost_times["store_meta_time"]["start_time"] = time.time()
580599
await self._store_meta(
581600
chunk_graph,
582601
store_sizes,
583602
memory_sizes,
584603
data_key_to_object_id,
585604
update_meta_chunks,
586605
)
606+
cost_times["store_meta_time"]["end_time"] = time.time()
607+
await self._task_info_collector.collect_runtime_subtask_info(
608+
self.subtask,
609+
self._band,
610+
self._slot_id,
611+
stored_keys,
612+
store_sizes,
613+
memory_sizes,
614+
cost_times,
615+
)
616+
587617
except asyncio.CancelledError:
588618
self.result.status = SubtaskStatus.cancelled
589619
self.result.progress = 1.0
@@ -708,7 +738,7 @@ async def _init_context(self, session_id: str) -> ThreadedServiceContext:
708738
await context.init()
709739
return context
710740

711-
async def run(self, subtask: Subtask):
741+
async def run(self, subtask: Subtask, slot_id: int):
712742
logger.info(
713743
"Start to run subtask: %r on %s. chunk graph contains %s",
714744
subtask,
@@ -728,6 +758,7 @@ async def run(self, subtask: Subtask):
728758
self._meta_api,
729759
self._worker_meta_api,
730760
self._band,
761+
slot_id,
731762
self._supervisor_address,
732763
)
733764
self._processor = self._last_processor = processor

mars/services/subtask/worker/runner.py

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -41,9 +41,14 @@ def gen_uid(cls, band_name: str, slot_id: int):
4141
return f"slot_{band_name}_{slot_id}_subtask_runner"
4242

4343
def __init__(
44-
self, band: BandType, worker_address: str, subtask_processor_cls: Type = None
44+
self,
45+
band: BandType,
46+
worker_address: str,
47+
slot_id: int,
48+
subtask_processor_cls: Type = None,
4549
):
4650
self._band = band
51+
self._slot_id = slot_id
4752
self._worker_address = worker_address
4853
self._subtask_processor_cls = self._get_subtask_processor_cls(
4954
subtask_processor_cls
@@ -122,7 +127,7 @@ async def run_subtask(self, subtask: Subtask):
122127
processor = self._session_id_to_processors[session_id]
123128
try:
124129
self._running_processor = self._last_processor = processor
125-
result = yield self._running_processor.run(subtask)
130+
result = yield self._running_processor.run(subtask, self._slot_id)
126131
finally:
127132
self._running_processor = None
128133
raise mo.Return(result)

mars/services/task/api/core.py

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
from abc import ABC, abstractmethod
16-
from typing import List, Union
16+
from typing import List, Union, Dict
1717

1818
from ....core import Tileable
1919
from ..core import TileableGraph, TaskResult
@@ -145,3 +145,8 @@ async def get_last_idle_time(self) -> Union[float, None]:
145145
last_idle_time: float
146146
The last idle time if the task manager is idle else None.
147147
"""
148+
149+
async def save_task_info(self, task_info: Dict, path: str):
150+
"""
151+
Save task information using yaml format.
152+
"""

mars/services/task/api/oscar.py

Lines changed: 31 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -12,46 +12,65 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import List, Union
15+
from typing import List, Union, Dict
1616

1717
from .... import oscar as mo
1818
from ....core import Tileable
1919
from ....lib.aio import alru_cache
2020
from ...subtask import SubtaskResult
2121
from ..core import TileableGraph, TaskResult, MapReduceInfo
2222
from ..supervisor.manager import TaskManagerActor
23+
from ..task_info_collector import TaskInfoCollectorActor
2324
from .core import AbstractTaskAPI
2425

2526

2627
class TaskAPI(AbstractTaskAPI):
2728
def __init__(
28-
self, session_id: str, task_manager_ref: mo.ActorRefType[TaskManagerActor]
29+
self,
30+
session_id: str,
31+
task_info_collector_ref: mo.ActorRefType[TaskInfoCollectorActor],
32+
task_manager_ref: mo.ActorRefType[TaskManagerActor],
2933
):
3034
self._session_id = session_id
35+
self._task_info_collector_ref = task_info_collector_ref
3136
self._task_manager_ref = task_manager_ref
3237

3338
@classmethod
3439
@alru_cache(cache_exceptions=False)
35-
async def create(cls, session_id: str, address: str) -> "TaskAPI":
40+
async def create(
41+
cls, session_id: str, supervisor_address: str = None, local_address: str = None
42+
) -> "TaskAPI":
3643
"""
3744
Create Task API.
3845
3946
Parameters
4047
----------
4148
session_id : str
42-
Session ID
43-
address : str
49+
Session ID.
50+
supervisor_address : str
4451
Supervisor address.
52+
local_address : str
53+
Local address.
4554
4655
Returns
4756
-------
4857
task_api
4958
Task API.
5059
"""
51-
task_manager_ref = await mo.actor_ref(
52-
address, TaskManagerActor.gen_uid(session_id)
53-
)
54-
return TaskAPI(session_id, task_manager_ref)
60+
try:
61+
task_manager_ref = await mo.actor_ref(
62+
supervisor_address, TaskManagerActor.gen_uid(session_id)
63+
)
64+
except (mo.ActorNotExist, ValueError):
65+
task_manager_ref = None
66+
try:
67+
task_info_collector_ref = await mo.actor_ref(
68+
local_address, TaskInfoCollectorActor.default_uid()
69+
)
70+
except (mo.ActorNotExist, ValueError):
71+
task_info_collector_ref = None
72+
73+
return TaskAPI(session_id, task_info_collector_ref, task_manager_ref)
5574

5675
async def get_task_results(self, progress: bool = False) -> List[TaskResult]:
5776
return await self._task_manager_ref.get_task_results(progress)
@@ -110,3 +129,6 @@ async def get_map_reduce_info(
110129
self, task_id: str, map_reduce_id: int
111130
) -> MapReduceInfo:
112131
return await self._task_manager_ref.get_map_reduce_info(task_id, map_reduce_id)
132+
133+
async def save_task_info(self, task_info: Dict, path: str):
134+
await self._task_info_collector_ref.save_task_info(task_info, path)

mars/services/task/supervisor/graph_visualizer.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,6 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
1514
import itertools
1615
from io import StringIO
1716
from typing import Dict, List

0 commit comments

Comments
 (0)