Skip to content

Support functionalities to enhance task traceability with metadata for dependency search. #450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
79a2881
WIP: End to implement the logic to gather the required task output path.
Mar 4, 2025
0cfe7ee
WIP: success to add output path in nest mode, but some other case sho…
Mar 4, 2025
3eee422
WIP: no ci apply.
Mar 5, 2025
ec3bf4f
feat: fix to pass labels and has_seen_keys.
Mar 5, 2025
22a69d0
feat: fix conflicts
Mar 5, 2025
08e3f59
CI: apply ruff and mypy
Mar 5, 2025
9b19a1c
feat: add implementation of nest mode.
Mar 5, 2025
accbf1d
feat: deal with kitagry comments.
Mar 6, 2025
6719f4d
feat: Remove CLI dependencies.
Mar 6, 2025
0bcc16c
feat: remove redundant statements.
Mar 6, 2025
5c41035
feat: change serialization expression for single FlattenableItems[Req…
Mar 6, 2025
0b951ab
CI: fix test and apply CI.
Mar 6, 2025
10795a2
feat: fix mypy error.
Mar 6, 2025
32b4343
feat: refactoring make _list_flatten inner function.
Mar 6, 2025
6f70a41
feat: fix nits miss and add __ prefix to avoid conflicts.
Mar 6, 2025
637f5da
feat: rename _list_flatten
Mar 7, 2025
b607926
Merge: fix conflicts.
TlexCypher Mar 15, 2025
a8059a1
Merge: fix conflicts.
TlexCypher Mar 15, 2025
27b1abd
feat: convert map object to list, any iterable objects that would be …
TlexCypher Apr 17, 2025
5ac1c4d
Merge remote-tracking branch 'origin/master' into feat/nestmode
TlexCypher Apr 17, 2025
f4479da
Merge remote-tracking branch 'origin/feat/nestmode' into feat/nestmode
TlexCypher Apr 17, 2025
e71833b
feat: add new line to end of param.ini
TlexCypher Apr 22, 2025
46aabcf
feat: remove redundant expressions
TlexCypher Apr 22, 2025
7bde3b0
Merge branch 'master' into feat/nestmode
hirosassa Apr 24, 2025
4c44cea
feat: use yiled to make memory efficient and use functools.reduce to …
TlexCypher Apr 28, 2025
dd6a629
Merge remote-tracking branch 'origin/feat/nestmode' into feat/nestmode
TlexCypher Apr 28, 2025
d884c79
Merge branch 'master' into feat/nestmode
hirosassa Apr 28, 2025
f1418f8
feat: fix type of normalized_labeles_list
TlexCypher Apr 28, 2025
6a1c4c2
Merge remote-tracking branch 'origin/feat/nestmode' into feat/nestmode
TlexCypher Apr 28, 2025
0b06455
chore: change custom_labels type
kitagry Apr 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions gokart/gcs_obj_metadata_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from googleapiclient.model import makepatch

from gokart.gcs_config import GCSConfig
from gokart.utils import FlattenableItems

logger = getLogger(__name__)

Expand All @@ -32,7 +33,12 @@ def _path_to_bucket_and_key(path: str) -> tuple[str, str]:
return netloc, path_without_initial_slash

@staticmethod
def add_task_state_labels(path: str, task_params: dict[str, str] | None = None, custom_labels: dict[str, Any] | None = None) -> None:
def add_task_state_labels(
path: str,
task_params: dict[str, str] | None = None,
custom_labels: dict[str, Any] | None = None,
required_task_outputs: FlattenableItems[str] | None = None,
) -> None:
if GCSObjectMetadataClient._is_log_related_path(path):
return
# In gokart/object_storage.get_time_stamp, could find same call.
Expand All @@ -42,20 +48,17 @@ def add_task_state_labels(path: str, task_params: dict[str, str] | None = None,
if _response is None:
logger.error(f'failed to get object from GCS bucket {bucket} and object {obj}.')
return

response: dict[str, Any] = dict(_response)
original_metadata: dict[Any, Any] = {}
if 'metadata' in response.keys():
_metadata = response.get('metadata')
if _metadata is not None:
original_metadata = dict(_metadata)

patched_metadata = GCSObjectMetadataClient._get_patched_obj_metadata(
copy.deepcopy(original_metadata),
task_params,
custom_labels,
)

if original_metadata != patched_metadata:
# If we use update api, existing object metadata are removed, so should use patch api.
# See the official document descriptions.
Expand All @@ -71,7 +74,6 @@ def add_task_state_labels(path: str, task_params: dict[str, str] | None = None,
)
.execute()
)

if update_response is None:
logger.error(f'failed to patch object {obj} in bucket {bucket} and object {obj}.')

Expand All @@ -90,7 +92,6 @@ def _get_patched_obj_metadata(
if not isinstance(metadata, dict):
logger.warning(f'metadata is not a dict: {metadata}, something wrong was happened when getting response when get bucket and object information.')
return metadata

if not task_params and not custom_labels:
return metadata
# Maximum size of metadata for each object is 8 KiB.
Expand Down Expand Up @@ -132,10 +133,8 @@ def _get_label_size(label_name: str, label_value: str) -> int:
8 * 1024,
sum(_get_label_size(label_name, label_value) for label_name, label_value in labels.items()),
)

if current_total_metadata_size <= max_gcs_metadata_size:
return labels

for label_name, label_value in reversed(labels.items()):
size = _get_label_size(label_name, label_value)
del labels[label_name]
Expand Down
9 changes: 8 additions & 1 deletion gokart/in_memory/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from gokart.in_memory.repository import InMemoryCacheRepository
from gokart.target import TargetOnKart, TaskLockParams
from gokart.utils import FlattenableItems

_repository = InMemoryCacheRepository()

Expand All @@ -26,7 +27,13 @@ def _get_task_lock_params(self) -> TaskLockParams:
def _load(self) -> Any:
return _repository.get_value(self._data_key)

def _dump(self, obj: Any, task_params: dict[str, str] | None = None, custom_labels: dict[str, Any] | None = None) -> None:
def _dump(
self,
obj: Any,
task_params: dict[str, str] | None = None,
custom_labels: dict[str, Any] | None = None,
required_task_outputs: FlattenableItems[str] | None = None,
) -> None:
return _repository.set_value(self._data_key, obj)

def _remove(self) -> None:
Expand Down
49 changes: 41 additions & 8 deletions gokart/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from gokart.file_processor import FileProcessor, make_file_processor
from gokart.gcs_obj_metadata_client import GCSObjectMetadataClient
from gokart.object_storage import ObjectStorage
from gokart.utils import FlattenableItems
from gokart.zip_client_util import make_zip_client

logger = getLogger(__name__)
Expand All @@ -30,13 +31,23 @@ def exists(self) -> bool:
def load(self) -> Any:
return wrap_load_with_lock(func=self._load, task_lock_params=self._get_task_lock_params())()

def dump(self, obj, lock_at_dump: bool = True, task_params: dict[str, str] | None = None, custom_labels: dict[str, Any] | None = None) -> None:
def dump(
self,
obj,
lock_at_dump: bool = True,
task_params: dict[str, str] | None = None,
custom_labels: dict[str, Any] | None = None,
required_task_outputs: FlattenableItems | None = None,
) -> None:
if lock_at_dump:
wrap_dump_with_lock(func=self._dump, task_lock_params=self._get_task_lock_params(), exist_check=self.exists)(
obj=obj, task_params=task_params, custom_labels=custom_labels
obj=obj,
task_params=task_params,
custom_labels=custom_labels,
required_task_outputs=required_task_outputs,
)
else:
self._dump(obj=obj, task_params=task_params, custom_labels=custom_labels)
self._dump(obj=obj, task_params=task_params, custom_labels=custom_labels, required_task_outputs=required_task_outputs)

def remove(self) -> None:
if self.exists():
Expand All @@ -61,7 +72,13 @@ def _load(self) -> Any:
pass

@abstractmethod
def _dump(self, obj, task_params: dict[str, str] | None = None, custom_labels: dict[str, Any] | None = None) -> None:
def _dump(
self,
obj,
task_params: Optional[dict[str, str]] = None,
custom_labels: dict[str, Any] | None = None,
required_task_outputs: FlattenableItems[str] | None = None,
) -> None:
pass

@abstractmethod
Expand Down Expand Up @@ -98,11 +115,19 @@ def _load(self) -> Any:
with self._target.open('r') as f:
return self._processor.load(f)

def _dump(self, obj, task_params: dict[str, str] | None = None, custom_labels: dict[str, Any] | None = None) -> None:
def _dump(
self,
obj,
task_params: dict[str, str] | None = None,
custom_labels: dict[str, Any] | None = None,
required_task_outputs: FlattenableItems[str] | None = None,
) -> None:
with self._target.open('w') as f:
self._processor.dump(obj, f)
if self.path().startswith('gs://'):
GCSObjectMetadataClient.add_task_state_labels(path=self.path(), task_params=task_params, custom_labels=custom_labels)
GCSObjectMetadataClient.add_task_state_labels(
path=self.path(), task_params=task_params, custom_labels=custom_labels, required_task_outputs=required_task_outputs
)

def _remove(self) -> None:
self._target.remove()
Expand Down Expand Up @@ -142,10 +167,18 @@ def _load(self) -> Any:
self._remove_temporary_directory()
return model

def _dump(self, obj, task_params: dict[str, str] | None = None, custom_labels: dict[str, Any] | None = None) -> None:
def _dump(
self,
obj,
task_params: dict[str, str] | None = None,
custom_labels: dict[str, Any] | None = None,
required_task_outputs: FlattenableItems[str] | None = None,
) -> None:
self._make_temporary_directory()
self._save_function(obj, self._model_path())
make_target(self._load_function_path()).dump(self._load_function, task_params=task_params)
make_target(self._load_function_path()).dump(
self._load_function, task_params=task_params, custom_labels=custom_labels, required_task_outputs=required_task_outputs
)
self._zip_client.make_archive()
self._remove_temporary_directory()

Expand Down
6 changes: 6 additions & 0 deletions gokart/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from logging import getLogger
from typing import Any, Callable, Dict, Generator, Generic, Iterable, List, Optional, Set, TypeVar, Union, overload

from gokart.utils import map_flattenable_items

if sys.version_info < (3, 13):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this part is not needed?

from typing_extensions import deprecated
else:
Expand Down Expand Up @@ -367,6 +369,10 @@ def dump(self, obj: Any, target: Union[None, str, TargetOnKart] = None, custom_l
lock_at_dump=self._lock_at_dump,
task_params=super().to_str_params(only_significant=True, only_public=True),
custom_labels=custom_labels,
required_task_outputs=map_flattenable_items(
self.requires(),
func=lambda task: map_flattenable_items(task.output(), func=lambda output: output.path()),
),
)

@staticmethod
Expand Down
15 changes: 14 additions & 1 deletion gokart/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import sys
from io import BytesIO
from typing import Any, Iterable, Protocol, TypeVar, Union
from typing import Any, Callable, Iterable, Protocol, TypeVar, Union

import dill
import luigi
Expand Down Expand Up @@ -71,6 +71,19 @@ def flatten(targets: FlattenableItems[T]) -> list[T]:
return flat


K = TypeVar('K')


def map_flattenable_items(items: FlattenableItems[T], func: Callable[[T], K]) -> FlattenableItems[K]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can use both Generics and isinstance at the same time, code would be below.

def map_flattenable_items(items: FlattenableItems[T], func: Callable[[T], K]) -> FlattenableItems[K]:
    if isinstance(items, dict):
        return {k: map_flattenable_items(v, func) for k, v in items.items()}
    if isinstance(str):
        return items
    if isinstance(items, Iterable[T]):
        return [map_flattenable_items(i, func) for i in items]
    return func(items)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.python.org/3.13/library/functions.html#map

python original map define map(function, iterable), so you must suit python's manner.

Suggested change
def map_flattenable_items(items: FlattenableItems[T], func: Callable[[T], K]) -> FlattenableItems[K]:
def map_flattenable_items(func: Callable[[T], K], items: FlattenableItems[T]) -> FlattenableItems[K]:

if isinstance(items, dict):
return {k: map_flattenable_items(v, func) for k, v in items.items()}
if isinstance(items, str):
return items # type: ignore
Copy link
Member

@kitagry kitagry Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, T means `str, so you should apply func for this.

Suggested change
if isinstance(items, str):
return items # type: ignore
if isinstance(items, str):
return func(items) # type: ignore

if isinstance(items, Iterable):
return [map_flattenable_items(i, func) for i in items]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When pass tuple[T], it should returns tuple[K]. But, in this implementation, this case is not cared for.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, could you add testcase?

return func(items)


def load_dill_with_pandas_backward_compatibility(file: Union[FileLike, BytesIO]) -> Any:
"""Load binary dumped by dill with pandas backward compatibility.
pd.read_pickle can load binary dumped in backward pandas version, and also any objects dumped by pickle.
Expand Down
5 changes: 4 additions & 1 deletion test/test_gcs_obj_metadata_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ def test_mock_target_on_kart(self, mock_get_output_target):

task = _DummyTaskOnKart()
task.dump({'key': 'value'}, mock_target)
mock_target.dump.assert_called_once_with({'key': 'value'}, lock_at_dump=task._lock_at_dump, task_params={}, custom_labels=None)

mock_target.dump.assert_called_once_with(
{'key': 'value'}, lock_at_dump=task._lock_at_dump, task_params={}, custom_labels=None, required_task_outputs=[]
)


if __name__ == '__main__':
Expand Down
11 changes: 10 additions & 1 deletion test/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import unittest

from gokart.utils import flatten
from gokart.utils import flatten, map_flattenable_items


class TestFlatten(unittest.TestCase):
Expand All @@ -18,3 +18,12 @@ def test_flatten_int(self):

def test_flatten_none(self):
self.assertEqual(flatten(None), [])


class TestMapFlatten(unittest.TestCase):
def test_map_flattenable_items(self):
self.assertEqual(map_flattenable_items({'a': 1, 'b': 2}, func=lambda x: str(x)), {'a': '1', 'b': '2'})
self.assertEqual(
map_flattenable_items({'a': [1, 2, 3, '4'], 'b': {'c': True, 'd': {'e': 5}}}, func=lambda x: str(x)),
{'a': ['1', '2', '3', '4'], 'b': {'c': 'True', 'd': {'e': '5'}}},
)