This repository was archived by the owner on Feb 8, 2023. It is now read-only.
forked from mars-project/mars
-
Notifications
You must be signed in to change notification settings - Fork 4
This repository was archived by the owner on Feb 8, 2023. It is now read-only.
[ENH] multi-index data accessing #120
Copy link
Copy link
Open
Description
Is your feature request related to a problem? Please describe
>>> import mars.dataframe as md
>>> df = md.DataFrame({"foo": [1, 1, 3], "bar": [4, 5, 6]})
>>> gb = df.groupby("foo").agg({"bar": ["mean", "sum"]}).execute()
>>> gb.bar
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
File ~/Documents/repo/mars/mars/dataframe/core.py:2257, in DataFrame.__getattr__(self, key)
2256 try:
-> 2257 return getattr(self._data, key)
2258 except AttributeError:
AttributeError: 'DataFrameData' object has no attribute 'bar'
During handling of the above exception, another exception occurred:
KeyError Traceback (most recent call last)
Cell In [16], line 1
----> 1 gb.bar
File ~/Documents/repo/mars/mars/dataframe/core.py:2260, in DataFrame.__getattr__(self, key)
2258 except AttributeError:
2259 if key in self.dtypes:
-> 2260 return self[key]
2261 else:
2262 raise
File ~/Documents/repo/mars/mars/dataframe/indexing/getitem.py:617, in dataframe_getitem(df, item)
615 else:
616 if item not in columns_set:
--> 617 raise KeyError(f"{item} not in columns {columns_set}")
618 op = DataFrameIndex(col_names=item)
619 return op(df)
KeyError: "bar not in columns {('bar', 'mean'), ('bar', 'sum')}"
>>> gb[('bar', 'mean')].execute()
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100.00/100 [00:00<00:00, 38757.20it/s]
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
Cell In [17], line 1
----> 1 gb[('bar', 'mean')].execute()
File ~/Documents/repo/mars/mars/core/entity/tileables.py:404, in Tileable.execute(self, session, **kw)
403 def execute(self, session=None, **kw):
--> 404 result = self.data.execute(session=session, **kw)
405 if isinstance(result, TILEABLE_TYPE):
406 return self
File ~/Documents/repo/mars/mars/core/entity/executable.py:148, in _ExecutableMixin.execute(self, session, **kw)
145 from ...deploy.oscar.session import execute
147 session = _get_session(self, session)
--> 148 return execute(self, session=session, **kw)
File ~/Documents/repo/mars/mars/deploy/oscar/session.py:1875, in execute(tileable, session, wait, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
1873 session = get_default_or_create(**(new_session_kwargs or dict()))
1874 session = _ensure_sync(session)
-> 1875 return session.execute(
1876 tileable,
1877 *tileables,
1878 wait=wait,
1879 show_progress=show_progress,
1880 progress_update_interval=progress_update_interval,
1881 **kwargs,
1882 )
File ~/Documents/repo/mars/mars/deploy/oscar/session.py:1669, in SyncSession.execute(self, tileable, show_progress, warn_duplicated_execution, *tileables, **kwargs)
1667 fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
1668 try:
-> 1669 execution_info: ExecutionInfo = fut.result(
1670 timeout=self._isolated_session.timeout
1671 )
1672 except KeyboardInterrupt: # pragma: no cover
1673 logger.warning("Cancelling running task")
File ~/Documents/miniconda3/envs/mars-dev/lib/python3.10/concurrent/futures/_base.py:446, in Future.result(self, timeout)
444 raise CancelledError()
445 elif self._state == FINISHED:
--> 446 return self.__get_result()
447 else:
448 raise TimeoutError()
File ~/Documents/miniconda3/envs/mars-dev/lib/python3.10/concurrent/futures/_base.py:391, in Future.__get_result(self)
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
394 self = None
File ~/Documents/repo/mars/mars/deploy/oscar/session.py:1855, in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
1852 else:
1853 # set cancelled to avoid wait task leak
1854 cancelled.set()
-> 1855 await execution_info
1856 else:
1857 return execution_info
File ~/Documents/repo/mars/mars/deploy/oscar/session.py:106, in ExecutionInfo._ensure_future.<locals>.wait()
105 async def wait():
--> 106 return await self._aio_task
File ~/Documents/repo/mars/mars/deploy/oscar/session.py:954, in _IsolatedSession._run_in_background(self, tileables, task_id, progress, profiling)
948 logger.warning(
949 "Profile task %s execution result:\n%s",
950 task_id,
951 json.dumps(task_result.profiling, indent=4),
952 )
953 if task_result.error:
--> 954 raise task_result.error.with_traceback(task_result.traceback)
955 if cancelled:
956 return
File ~/Documents/repo/mars/mars/services/task/supervisor/processor.py:372, in TaskProcessor.run(self)
367 self._tileable_id_to_tileable = await asyncio.to_thread(
368 self._get_tileable_id_to_tileable, self._preprocessor.tileable_graph
369 )
371 async with self._executor:
--> 372 async for stage_args in self._iter_stage_chunk_graph():
373 await self._process_stage_chunk_graph(*stage_args)
374 except Exception as ex:
File ~/Documents/repo/mars/mars/services/task/supervisor/processor.py:159, in TaskProcessor._iter_stage_chunk_graph(self)
157 with Timer() as stage_timer:
158 with Timer() as timer:
--> 159 chunk_graph = await self._get_next_chunk_graph(chunk_graph_iter)
160 if chunk_graph is None:
161 # tile finished
162 self._preprocessor.done = True
File ~/Documents/repo/mars/mars/services/task/supervisor/processor.py:150, in TaskProcessor._get_next_chunk_graph(chunk_graph_iter)
147 return
149 fut = asyncio.to_thread(next_chunk_graph)
--> 150 chunk_graph = await fut
151 return chunk_graph
File ~/Documents/miniconda3/envs/mars-dev/lib/python3.10/asyncio/threads.py:25, in to_thread(func, *args, **kwargs)
23 ctx = contextvars.copy_context()
24 func_call = functools.partial(ctx.run, func, *args, **kwargs)
---> 25 return await loop.run_in_executor(None, func_call)
File ~/Documents/miniconda3/envs/mars-dev/lib/python3.10/concurrent/futures/thread.py:58, in _WorkItem.run(self)
55 return
57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
59 except BaseException as exc:
60 self.future.set_exception(exc)
File ~/Documents/repo/mars/mars/services/task/supervisor/processor.py:145, in TaskProcessor._get_next_chunk_graph.<locals>.next_chunk_graph()
143 def next_chunk_graph():
144 try:
--> 145 return next(chunk_graph_iter)
146 except StopIteration:
147 return
File ~/Documents/repo/mars/mars/services/task/supervisor/preprocessor.py:200, in TaskPreprocessor.tile(self, tileable_graph)
198 if hasattr(t.op, "logic_key") and t.op.logic_key is None:
199 t.op.logic_key = t.op.get_logic_key()
--> 200 for chunk_graph in chunk_graph_builder.build():
201 if len(chunk_graph) == 0:
202 continue
File ~/Documents/repo/mars/mars/core/graph/builder/chunk.py:440, in ChunkGraphBuilder.build(self)
439 def build(self) -> Generator[Union[TileableGraph, ChunkGraph], None, None]:
--> 440 yield from self._build()
File ~/Documents/repo/mars/mars/core/graph/builder/chunk.py:434, in ChunkGraphBuilder._build(self)
432 try:
433 with enter_mode(build=True, kernel=True):
--> 434 graph = next(tile_iterator)
435 yield graph
436 except StopIteration:
File ~/Documents/repo/mars/mars/services/task/supervisor/preprocessor.py:74, in CancellableTiler._iter_without_check(self)
72 def _iter_without_check(self):
73 while self._tileable_handlers:
---> 74 to_update_tileables = self._iter()
75 if not self.cancelled:
76 yield self._cur_chunk_graph
File ~/Documents/repo/mars/mars/core/graph/builder/chunk.py:317, in Tiler._iter(self)
315 # tile
316 for tile_handler in self._gen_tileable_handlers(next_tileable_handlers):
--> 317 self._tile(
318 chunk_graph,
319 tile_handler.tileable,
320 tile_handler.handler,
321 next_tileable_handlers,
322 to_update_tileables,
323 visited,
324 )
325 self._tileable_handlers = next_tileable_handlers
326 # gen result chunks
File ~/Documents/repo/mars/mars/core/graph/builder/chunk.py:211, in Tiler._tile(self, chunk_graph, tileable, tile_handler, next_tileable_handlers, to_update_tileables, visited)
201 def _tile(
202 self,
203 chunk_graph: ChunkGraph,
(...)
208 visited: Set[EntityType],
209 ):
210 try:
--> 211 need_process = next(tile_handler)
213 if isinstance(need_process, TileStatus):
214 # process tile that returns progress
215 self._tile_context.set_progress(tileable, need_process.progress)
File ~/Documents/repo/mars/mars/core/graph/builder/chunk.py:183, in Tiler._tile_handler(self, tileable)
181 tiled_tileables = [self._get_data(t) for t in tiled_tileables]
182 # start to tile
--> 183 tiled_tileables = yield from handler.tile(tiled_tileables)
184 return tiled_tileables
File ~/Documents/repo/mars/mars/core/entity/tileables.py:79, in OperandTilesHandler.tile(cls, tileables)
73 tile_handler = cls.get_handler(op)
74 if inspect.isgeneratorfunction(tile_handler):
75 # op.tile can be a generator function,
76 # each time an operand yield some chunks,
77 # they will be put into ChunkGraph and executed first.
78 # After execution, resume from the yield place.
---> 79 tiled_result = yield from tile_handler(op)
80 else:
81 # without iterative tiling
82 tiled_result = tile_handler(op)
File ~/Documents/repo/mars/mars/dataframe/indexing/getitem.py:355, in DataFrameIndex.tile(cls, op)
352 @classmethod
353 def tile(cls, op):
354 if op.col_names is not None:
--> 355 return cls.tile_with_columns(op)
356 else:
357 return (yield from cls.tile_with_mask(op))
File ~/Documents/repo/mars/mars/dataframe/indexing/getitem.py:466, in DataFrameIndex.tile_with_columns(cls, op)
463 chunk_meta_lazy = is_chunk_meta_lazy(in_df.chunks[0])
464 if out_df.ndim < 2:
465 # Series
--> 466 column_index = calc_columns_index(col_names, in_df)[0]
467 out_chunks = []
468 dtype = in_df.dtypes[col_names]
IndexError: list index out of range
Metadata
Assignees
Labels
No labels