diff --git a/components/data_processing/automl/tabular_data_loader/README.md b/components/data_processing/automl/tabular_data_loader/README.md index efc0e1372..a469df598 100644 --- a/components/data_processing/automl/tabular_data_loader/README.md +++ b/components/data_processing/automl/tabular_data_loader/README.md @@ -224,8 +224,8 @@ load_task = automl_data_loader( In the tabular training pipeline, this component writes ``component_status.json`` under the ``component_status`` output artifact. The file includes ``component_id`` (``automl_data_loader``), -``started_at``, ``completed_at``, a ``stages`` list (ids such as ``validate_inputs``, -``read_and_sample``, ``cleanse``, ``split``, ``write_outputs``), and optional ``metadata``. +``started_at``, ``completed_at``, a ``stages`` list (ids such as ``prepare_data``, +``split_and_export``), and optional ``metadata``. Match stage ids to the tabular pipeline entry in ``component_stage_map.json`` from the ``publish-component-stage-map`` task. diff --git a/components/data_processing/automl/tabular_data_loader/component.py b/components/data_processing/automl/tabular_data_loader/component.py index b4ab5dadf..185fe8d22 100644 --- a/components/data_processing/automl/tabular_data_loader/component.py +++ b/components/data_processing/automl/tabular_data_loader/component.py @@ -132,10 +132,9 @@ def automl_data_loader( # noqa: D417 # Initialize status tracker status = ComponentStatusTracker(component_status.path, "automl_data_loader") with status: - # Stage: validate_inputs - status.record("validate_inputs", "started") - # Validation happens inline below - status.record("validate_inputs", "completed") + status.set_metadata(display_name="Data Loader Status") + component_status.metadata["display_name"] = "Data Loader Status" + status.record("prepare_data", "started") if sampling_method is None: if task_type in ("binary", "multiclass"): @@ -324,10 +323,9 @@ def load_data_in_batches( return _sample_random(text_stream, PANDAS_CHUNK_SIZE, max_size_bytes) return _sample_first_n_rows(text_stream, PANDAS_CHUNK_SIZE, max_size_bytes) - # Stage: read_and_sample status.record( - "read_and_sample", - "started", + "prepare_data", + "running", sampling_method=sampling_method, source=f"s3://{bucket_name}/{file_key}", ) @@ -347,11 +345,6 @@ def load_data_in_batches( f"Available columns: {list(sampled_dataframe.columns)}" ) - status.record("read_and_sample", "completed", rows=len(sampled_dataframe)) - - # Stage: cleanse - status.record("cleanse", "started") - sampled_dataframe.replace([math.inf, -math.inf], float("nan"), inplace=True) n_before_dedup = len(sampled_dataframe) @@ -400,15 +393,14 @@ def load_data_in_batches( sampling_method, ) status.record( - "cleanse", + "prepare_data", "completed", rows=n_samples, duplicates_dropped=n_dup_dropped, labels_dropped=n_dropped, ) - # Stage: split - status.record("split", "started") + status.record("split_and_export", "started") # --- Train/test split --- from pathlib import Path @@ -465,19 +457,13 @@ def load_data_in_batches( X_y_test.to_csv(sampled_test_dataset.path, index=False) status.record( - "split", + "split_and_export", "completed", test_size=test_size, selection_train_size=selection_train_size, stratify=stratify_effective, ) - # Stage: write_outputs - status.record("write_outputs", "started") - status.record("write_outputs", "completed") - - component_status.metadata["display_name"] = "Data Loader Status" - # Sample row for downstream use (JSON string to avoid NaN issues) sample_row = X_y_test.head(1).to_json(orient="records") diff --git a/components/data_processing/automl/tabular_data_loader/tests/test_component_unit.py b/components/data_processing/automl/tabular_data_loader/tests/test_component_unit.py index 1c3013d04..8cf2a3732 100644 --- a/components/data_processing/automl/tabular_data_loader/tests/test_component_unit.py +++ b/components/data_processing/automl/tabular_data_loader/tests/test_component_unit.py @@ -212,8 +212,8 @@ def test_writes_component_status_json(self, tmp_path, monkeypatch): payload = json.loads(status_path.read_text()) assert payload["component_id"] == "automl_data_loader" stage_ids = [stage["id"] for stage in payload["stages"]] - assert "read_and_sample" in stage_ids - assert "split" in stage_ids + assert "prepare_data" in stage_ids + assert "split_and_export" in stage_ids @mock.patch.dict("os.environ", mocked_env_variables) def test_sets_component_status_display_name(self, tmp_path): diff --git a/components/data_processing/automl/timeseries_data_loader/README.md b/components/data_processing/automl/timeseries_data_loader/README.md index 0d53d47f6..2911788a9 100644 --- a/components/data_processing/automl/timeseries_data_loader/README.md +++ b/components/data_processing/automl/timeseries_data_loader/README.md @@ -104,6 +104,6 @@ def example_pipeline( In the time series training pipeline, this component writes ``component_status.json`` under the ``component_status`` output artifact. The file includes ``component_id`` (``timeseries_data_loader``), -timestamps, and per-stage status (e.g. ``validate_inputs``, ``read_and_sample``, ``split``, -``write_outputs``). Dashboards align stage ids with ``component_stage_map.json`` from +timestamps, and per-stage status (e.g. ``prepare_data``, ``split_and_export``). +Dashboards align stage ids with ``component_stage_map.json`` from ``publish-component-stage-map``. diff --git a/components/data_processing/automl/timeseries_data_loader/component.py b/components/data_processing/automl/timeseries_data_loader/component.py index 1579bf315..a26504dd0 100644 --- a/components/data_processing/automl/timeseries_data_loader/component.py +++ b/components/data_processing/automl/timeseries_data_loader/component.py @@ -97,8 +97,9 @@ def timeseries_data_loader( status = ComponentStatusTracker(component_status.path, "timeseries_data_loader") with status: - status.record("validate_inputs", "started") - status.record("validate_inputs", "completed") + status.set_metadata(display_name="Timeseries Data Loader Status") + component_status.metadata["display_name"] = "Timeseries Data Loader Status" + status.record("prepare_data", "started") def get_s3_client(verify=True): """Create and return an S3 client using credentials from environment variables.""" @@ -290,8 +291,8 @@ def _clean_timeseries_dataframe(data, id_col, ts_col, log): return out.reset_index(drop=True) status.record( - "read_and_sample", - "started", + "prepare_data", + "running", source=f"s3://{bucket_name}/{file_key}", ) df = load_timeseries_data_truncate(bucket_name, file_key, MAX_SIZE_BYTES, PANDAS_CHUNK_SIZE) @@ -309,9 +310,6 @@ def _clean_timeseries_dataframe(data, id_col, ts_col, log): f"with columns {sorted(required_columns)}." ) - status.record("read_and_sample", "completed", rows=len(df)) - status.record("cleanse", "started") - df = _clean_timeseries_dataframe(df, id_column, timestamp_column, logger) n_valid = len(df) @@ -322,8 +320,8 @@ def _clean_timeseries_dataframe(data, id_col, ts_col, log): "Provide a larger dataset or fix invalid timestamps, null ids, and duplicate keys." ) - status.record("cleanse", "completed", rows=n_valid) - status.record("split", "started") + status.record("prepare_data", "completed", rows=n_valid) + status.record("split_and_export", "started") # Create workspace datasets directory datasets_dir = Path(workspace_path) / "datasets" @@ -391,13 +389,6 @@ def _concat_sorted(parts: list, sort_by: list) -> pd.DataFrame: "each series has enough train rows for the selection segment." ) - status.record( - "split", - "completed", - test_size=test_size, - selection_train_size=selection_train_size, - ) - # Save test dataset to artifact test_df.to_csv(sampled_test_dataset.path, index=False) @@ -407,6 +398,13 @@ def _concat_sorted(parts: list, sort_by: list) -> pd.DataFrame: selection_train_df.to_csv(selection_path, index=False) extra_train_df.to_csv(extra_path, index=False) + status.record( + "split_and_export", + "completed", + test_size=test_size, + selection_train_size=selection_train_size, + ) + logger.info( "Timeseries loader: %s rows from s3://%s/%s; split selection=%s extra=%s test=%s", len(df), @@ -425,10 +423,6 @@ def _concat_sorted(parts: list, sort_by: list) -> pd.DataFrame: "selection_train_size": selection_train_size, } - status.record("write_outputs", "started") - status.record("write_outputs", "completed") - component_status.metadata["display_name"] = "Timeseries Data Loader Status" - # Sample rows for downstream use (ISO timestamps when supported; JSON string to avoid NaN issues) sample_tail = test_df.tail(min(5, len(test_df))) if hasattr(sample_tail, "to_dict"): diff --git a/components/data_processing/autorag/documents_discovery/README.md b/components/data_processing/autorag/documents_discovery/README.md index 8603debd6..b5a55667c 100644 --- a/components/data_processing/autorag/documents_discovery/README.md +++ b/components/data_processing/autorag/documents_discovery/README.md @@ -13,12 +13,12 @@ Lists available documents from S3, performs sampling if applied and writes a JSO | Parameter | Type | Default | Description | | --------- | ---- | ------- | ----------- | | `input_data_bucket_name` | `str` | `None` | S3 (or compatible) bucket containing input data. | +| `component_status` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing stage-level progress tracking. | | `input_data_path` | `str` | `""` | Path to folder with input documents within the bucket. | | `test_data` | `dsl.Input[dsl.Artifact]` | `None` | Optional input artifact containing test data for sampling. | | `sampling_enabled` | `bool` | `True` | Whether to enable sampling or not. | | `sampling_max_size` | `float` | `1` | Maximum size of sampled documents (in gigabytes). | | `discovered_documents` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing the documents descriptor JSON file. | -| `component_status` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing stage-level progress tracking. | | `embedded_artifact` | `dsl.EmbeddedInput[dsl.Dataset]` | `None` | Embedded ``autorag.shared`` helpers injected by KFP at runtime. | ## Usage Examples ๐Ÿงช diff --git a/components/data_processing/autorag/documents_discovery/component.py b/components/data_processing/autorag/documents_discovery/component.py index fb11730cb..b7f0d3ff3 100644 --- a/components/data_processing/autorag/documents_discovery/component.py +++ b/components/data_processing/autorag/documents_discovery/component.py @@ -13,12 +13,12 @@ ) def documents_discovery( input_data_bucket_name: str, + component_status: dsl.Output[dsl.Artifact], input_data_path: str = "", test_data: dsl.Input[dsl.Artifact] = None, sampling_enabled: bool = True, sampling_max_size: float = 1, discovered_documents: dsl.Output[dsl.Artifact] = None, - component_status: dsl.Output[dsl.Artifact] = None, embedded_artifact: dsl.EmbeddedInput[dsl.Dataset] = None, ): """Documents discovery component. @@ -86,7 +86,9 @@ def get_test_data_docs_names() -> list[str]: _spec.loader.exec_module(_status_module) status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "documents_discovery") with status: - with status.stage("validate_inputs"): + status.set_metadata(display_name="Documents Discovery Status") + component_status.metadata["display_name"] = "Documents Discovery Status" + with status.stage("discover_documents"): s3_creds = {k: os.environ.get(k) for k in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_S3_ENDPOINT"]} for k, v in s3_creds.items(): if v is None: @@ -105,7 +107,6 @@ def _make_s3_client(verify=True): verify=verify, ) - with status.stage("list_and_sample"): # Use paginator to handle buckets with >1,000 objects def _list_all_objects(s3_client): """List all objects under prefix using pagination.""" @@ -186,7 +187,6 @@ def _list_all_objects(s3_client): f"enabled_max={sampling_max_size}GB" if sampling_enabled else "disabled", ) - with status.stage("write_descriptor"): os.makedirs(discovered_documents.path, exist_ok=True) descriptor_path = os.path.join(discovered_documents.path, DOCUMENTS_DESCRIPTOR_FILENAME) with open(descriptor_path, "w") as f: diff --git a/components/data_processing/autorag/test_data_loader/README.md b/components/data_processing/autorag/test_data_loader/README.md index d0385380f..1a77dba20 100644 --- a/components/data_processing/autorag/test_data_loader/README.md +++ b/components/data_processing/autorag/test_data_loader/README.md @@ -14,9 +14,9 @@ The component reads S3-compatible credentials from environment variables (inject | --------- | ---- | ------- | ----------- | | `test_data_bucket_name` | `str` | `None` | S3 (or compatible) bucket that contains the test data file. | | `test_data_path` | `str` | `None` | S3 object key to the JSON test data file. | +| `component_status` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing stage-level progress tracking. | | `benchmark_sample_size` | `int` | `25` | Maximum number of records to keep from the test data. When the dataset exceeds this limit, a reproducible random sample is drawn (seed 42). Set to 0 to disable sampling and keep all records. | | `test_data` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact that receives the (possibly sampled) file. | -| `component_status` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing stage-level progress tracking. | | `embedded_artifact` | `dsl.EmbeddedInput[dsl.Dataset]` | `None` | Embedded ``autorag.shared`` helpers injected by KFP at runtime. | ## Usage Examples ๐Ÿงช diff --git a/components/data_processing/autorag/test_data_loader/component.py b/components/data_processing/autorag/test_data_loader/component.py index 8d8840e1f..8441ba23e 100644 --- a/components/data_processing/autorag/test_data_loader/component.py +++ b/components/data_processing/autorag/test_data_loader/component.py @@ -15,9 +15,9 @@ def test_data_loader( test_data_bucket_name: str, test_data_path: str, + component_status: dsl.Output[dsl.Artifact], benchmark_sample_size: int = 25, test_data: dsl.Output[dsl.Artifact] = None, - component_status: dsl.Output[dsl.Artifact] = None, embedded_artifact: dsl.EmbeddedInput[dsl.Dataset] = None, ): """Download test data JSON from S3 and sample it for benchmarking. @@ -78,7 +78,9 @@ class TestDataLoaderException(Exception): _spec.loader.exec_module(_status_module) status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "test_data_loader") with status: - with status.stage("validate_inputs"): + status.set_metadata(display_name="Test Data Loader Status") + component_status.metadata["display_name"] = "Test Data Loader Status" + with status.stage("load_benchmark"): if not test_data_bucket_name: raise TypeError("test_data_bucket_name must be a non-empty string") @@ -93,17 +95,16 @@ class TestDataLoaderException(Exception): s3_creds["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION") - def _make_s3_client(verify=True): - return boto3.client( - "s3", - endpoint_url=s3_creds["AWS_S3_ENDPOINT"], - region_name=s3_creds["AWS_DEFAULT_REGION"], - aws_access_key_id=s3_creds["AWS_ACCESS_KEY_ID"], - aws_secret_access_key=s3_creds["AWS_SECRET_ACCESS_KEY"], - verify=verify, - ) - - with status.stage("download_and_sample"): + def _make_s3_client(verify=True): + return boto3.client( + "s3", + endpoint_url=s3_creds["AWS_S3_ENDPOINT"], + region_name=s3_creds["AWS_DEFAULT_REGION"], + aws_access_key_id=s3_creds["AWS_ACCESS_KEY_ID"], + aws_secret_access_key=s3_creds["AWS_SECRET_ACCESS_KEY"], + verify=verify, + ) + s3_client = _make_s3_client() logger.info("Fetching test data from S3: bucket='%s', path='%s'.", test_data_bucket_name, test_data_path) @@ -149,7 +150,6 @@ def _make_s3_client(verify=True): f"Make sure that each test data records contains following keys: {benchmark_record_keys}." ) - with status.stage("write_output"): if 0 < benchmark_sample_size < len(benchmark_data) and isinstance(benchmark_data, list): import random diff --git a/components/data_processing/autorag/text_extraction/component.py b/components/data_processing/autorag/text_extraction/component.py index 493aa57f7..d1e6f4b18 100644 --- a/components/data_processing/autorag/text_extraction/component.py +++ b/components/data_processing/autorag/text_extraction/component.py @@ -15,7 +15,7 @@ def text_extraction( documents_descriptor: dsl.Input[dsl.Artifact], extracted_text: dsl.Output[dsl.Artifact], - component_status: dsl.Output[dsl.Artifact] = None, + component_status: dsl.Output[dsl.Artifact], embedded_artifact: dsl.EmbeddedInput[dsl.Dataset] = None, error_tolerance: Optional[float] = None, max_extraction_workers: Optional[int] = None, @@ -357,9 +357,11 @@ def raise_if_threshold_exceeded(error_details: list, total_docs: int, tolerance: _spec.loader.exec_module(_status_module) status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "text_extraction") with status: + status.set_metadata(display_name="Text Extraction Status") + component_status.metadata["display_name"] = "Text Extraction Status" descriptor_path = Path(documents_descriptor.path) / DOCUMENTS_DESCRIPTOR_FILENAME - with status.stage("load_descriptor"): + with status.stage("extract_documents"): if not descriptor_path.exists(): raise FileNotFoundError(f"documents_descriptor.json not found at {descriptor_path}") @@ -390,7 +392,6 @@ def raise_if_threshold_exceeded(error_details: list, total_docs: int, tolerance: logger.info("No documents to process.") return - with status.stage("extract_documents"): documents = sorted(documents, key=lambda d: d.get("size_bytes", 0), reverse=True) if max_extraction_workers is not None: diff --git a/components/training/automl/autogluon_leaderboard_evaluation/component.py b/components/training/automl/autogluon_leaderboard_evaluation/component.py index 4c6247e92..8fe014e7a 100644 --- a/components/training/automl/autogluon_leaderboard_evaluation/component.py +++ b/components/training/automl/autogluon_leaderboard_evaluation/component.py @@ -75,6 +75,8 @@ def evaluation_pipeline(models_artifact): # Initialize status tracker status = ComponentStatusTracker(component_status.path, "leaderboard_evaluation") with status: + status.set_metadata(display_name="Leaderboard Evaluation Status") + component_status.metadata["display_name"] = "Leaderboard Evaluation Status" logger = logging.getLogger(__name__) # Stage: build_leaderboard @@ -152,7 +154,6 @@ def evaluation_pipeline(models_artifact): best_model=best_model_name, model_count=len(leaderboard_df), ) - component_status.metadata["display_name"] = "Leaderboard Evaluation Status" return NamedTuple("outputs", best_model=str)(best_model=best_model_name) diff --git a/components/training/automl/autogluon_models_training/README.md b/components/training/automl/autogluon_models_training/README.md index c3e46b626..eefe344be 100644 --- a/components/training/automl/autogluon_models_training/README.md +++ b/components/training/automl/autogluon_models_training/README.md @@ -66,7 +66,7 @@ mutates predictor state. All artifacts are written under a single output artifac Writes ``component_status.json`` under the ``component_status`` output artifact with ``component_id`` ``autogluon_models_training`` and stages such as ``load_data``, ``model_selection`` (optional ``steps`` -when completed), ``refit_full``, and ``evaluate_models``. Artifact metadata display name: +when completed), ``refit_and_evaluate``. Artifact metadata display name: **Models Training Status**. ## Usage Examples ๐Ÿ’ก diff --git a/components/training/automl/autogluon_models_training/component.py b/components/training/automl/autogluon_models_training/component.py index 9ef867787..efdc73394 100644 --- a/components/training/automl/autogluon_models_training/component.py +++ b/components/training/automl/autogluon_models_training/component.py @@ -162,6 +162,8 @@ def _coerce_positive_class(value: Optional[str]) -> str | int | None: # Initialize status tracker status = ComponentStatusTracker(component_status.path, "autogluon_models_training") with status: + status.set_metadata(display_name="Models Training Status") + component_status.metadata["display_name"] = "Models Training Status" # Stage: load_data status.record("load_data", "started") @@ -254,7 +256,7 @@ def _coerce_positive_class(value: Optional[str]) -> str | int | None: "completed", top_n=top_n, selected_models=top_models, - steps=["feature_engineering", "model_training", "stacking", "model_evaluation"], + steps=["feature_engineering", "model_training", "stacking", "evaluation"], ) model_config = { @@ -299,9 +301,8 @@ def retrieve_pipeline_name(name: str) -> str: predictor_clone = predictor.clone(path=work_path, return_clone=True, dirs_exist_ok=True) # Refit all top models in a single call: AutoGluon resolves stacking dependencies internally. - status.record("refit_full", "started") + status.record("refit_and_evaluate", "started") predictor_clone.refit_full(model=top_models, train_data_extra=extra_train_df) - status.record("refit_full", "completed", model_count=len(model_names_full)) def replace_placeholder_in_notebook(notebook, replacements): for cell in notebook.get("cells", []): @@ -657,8 +658,12 @@ def _process_model(model_name_full: str) -> tuple[str, dict]: "models": models_metadata, } - status.record("evaluate_models", "completed", eval_metric=str(predictor.eval_metric)) - component_status.metadata["display_name"] = "Models Training Status" + status.record( + "refit_and_evaluate", + "completed", + model_count=len(model_names_full), + eval_metric=str(predictor.eval_metric), + ) return NamedTuple("outputs", eval_metric=str)(eval_metric=eval_metric) diff --git a/components/training/automl/autogluon_timeseries_models_training/README.md b/components/training/automl/autogluon_timeseries_models_training/README.md index 407ee4f8a..402bcf368 100644 --- a/components/training/automl/autogluon_timeseries_models_training/README.md +++ b/components/training/automl/autogluon_timeseries_models_training/README.md @@ -125,7 +125,10 @@ def example_pipeline( ### Component status artifact -Writes ``component_status.json`` under ``component_status`` with ``component_id`` ``autogluon_timeseries_models_training`` and training stages (``load_data``, ``model_selection``, ``refit_full``, ``evaluate_models``). Artifact metadata display name: **Timeseries Models Training Status**. +Writes ``component_status.json`` under ``component_status`` with ``component_id`` +``autogluon_timeseries_models_training`` and training stages (``load_data``, ``model_selection`` with +steps ``feature_engineering``, ``model_training``, ``stacking``, ``evaluation``, ``refit_and_evaluate``). +Artifact metadata display name: **Timeseries Models Training Status**. Inference notebooks are loaded from ``shared/notebook_templates/timeseries_notebook.ipynb`` at runtime (same shared package data as tabular training). diff --git a/components/training/automl/autogluon_timeseries_models_training/component.py b/components/training/automl/autogluon_timeseries_models_training/component.py index 2802c35ec..07197684c 100644 --- a/components/training/automl/autogluon_timeseries_models_training/component.py +++ b/components/training/automl/autogluon_timeseries_models_training/component.py @@ -93,6 +93,8 @@ def autogluon_timeseries_models_training( status = ComponentStatusTracker(component_status.path, "autogluon_timeseries_models_training") with status: + status.set_metadata(display_name="Timeseries Models Training Status") + component_status.metadata["display_name"] = "Timeseries Models Training Status" TOP_N_MAX = 7 VALID_PRESETS = {"speed", "balanced"} PRESET_AG_NAMES = {"speed": "fast_training", "balanced": "medium_quality"} @@ -227,7 +229,7 @@ def autogluon_timeseries_models_training( "completed", top_n=top_n, selected_models=top_models, - steps=["model_training", "holdout_evaluation"], + steps=["feature_engineering", "model_training", "stacking", "evaluation"], ) logger.info( "Timeseries selection done: top_%s=%s best_score_test=%s", @@ -255,7 +257,6 @@ def autogluon_timeseries_models_training( "Skipping combined full-refit stage; missing models_artifact or extra_train_data_path. " "Returning selection-only outputs for backward compatibility." ) - component_status.metadata["display_name"] = "Timeseries Models Training Status" outputs = NamedTuple( "outputs", top_models=List[str], @@ -299,7 +300,7 @@ def retrieve_pipeline_name(name: str) -> str: models_metadata = [] failed_models = [] - status.record("refit_full", "started") + status.record("refit_and_evaluate", "started") def replace_placeholder_in_notebook(notebook, replacements): for cell in notebook.get("cells", []): @@ -449,9 +450,12 @@ def replace_placeholder_in_notebook(notebook, replacements): if not model_names_full: raise RuntimeError("All models failed refit. No artifacts written.") - status.record("refit_full", "completed", model_count=len(model_names_full)) - status.record("evaluate_models", "completed", eval_metric=eval_metric) - component_status.metadata["display_name"] = "Timeseries Models Training Status" + status.record( + "refit_and_evaluate", + "completed", + model_count=len(model_names_full), + eval_metric=eval_metric, + ) models_artifact.metadata["model_names"] = json.dumps(model_names_full) models_artifact.metadata["context"] = { diff --git a/components/training/automl/shared/component_status.py b/components/training/automl/shared/component_status.py index 7e7cb5e93..825b68b18 100644 --- a/components/training/automl/shared/component_status.py +++ b/components/training/automl/shared/component_status.py @@ -14,6 +14,7 @@ from __future__ import annotations +import base64 import json import logging from collections.abc import Iterator @@ -32,6 +33,27 @@ STATUS_FAILED = "failed" +class ComponentStatusEncoder(json.JSONEncoder): + """JSON encoder for component status metadata (Path, bytes, set, datetime).""" + + def default(self, obj: Any) -> Any: + """Convert non-serializable objects to JSON-compatible types.""" + if isinstance(obj, datetime): + return obj.isoformat().replace("+00:00", "Z") if obj.tzinfo is not None else obj.isoformat() + if isinstance(obj, Path): + return str(obj) + if isinstance(obj, bytes): + return base64.b64encode(obj).decode("ascii") + if isinstance(obj, set): + return sorted(obj, key=str) + return super().default(obj) + + +def utc_now_z() -> str: + """Return current UTC time as an ISO-8601 string with ``Z`` suffix.""" + return datetime.now(UTC).isoformat().replace("+00:00", "Z") + + class ComponentStatusTracker: """Track stage-level progress within a single component. @@ -43,20 +65,15 @@ def __init__(self, artifact_path: str, component_id: str) -> None: """Initialize the status tracker. Args: - artifact_path: Path to the KFP artifact directory where status.json will be written. + artifact_path: Path to the KFP artifact directory where component_status.json will be written. component_id: Unique component identifier (e.g., "autogluon_models_training"). """ self.artifact_path = Path(artifact_path) self.component_id = component_id self.stages: list[dict[str, Any]] = [] - self.started_at = self._utc_now_iso() + self.started_at = utc_now_z() self.metadata: dict[str, Any] = {} - @staticmethod - def _utc_now_iso() -> str: - """Return current UTC timestamp in ISO format.""" - return datetime.now(UTC).isoformat().replace("+00:00", "Z") - def record(self, stage_id: str, status: str, **metadata: Any) -> None: """Record or update a stage's status. @@ -73,21 +90,18 @@ def record(self, stage_id: str, status: str, **metadata: Any) -> None: steps=["feature_eng", "training", "stacking"], models_trained=15) """ - # Find existing stage or create new one existing_idx = next((i for i, s in enumerate(self.stages) if s["id"] == stage_id), None) stage_data = { "id": stage_id, "status": status, - "timestamp": self._utc_now_iso(), + "timestamp": utc_now_z(), **metadata, } if existing_idx is not None: - # Update existing stage, preserving previously recorded metadata self.stages[existing_idx].update(stage_data) else: - # Append new stage self.stages.append(stage_data) logger.info( @@ -120,14 +134,14 @@ def save(self) -> None: data = { "component_id": self.component_id, "started_at": self.started_at, - "completed_at": self._utc_now_iso(), + "completed_at": utc_now_z(), "stages": self.stages, "metadata": self.metadata, } output_file = self.artifact_path / COMPONENT_STATUS_FILENAME with output_file.open("w", encoding="utf-8") as f: - json.dump(data, f, indent=2) + json.dump(data, f, indent=2, cls=ComponentStatusEncoder) logger.info( "COMPONENT_STATUS component=%s saved status with %d stages to %s", @@ -146,19 +160,24 @@ def save_best_effort(self) -> None: self.component_id, ) - def mark_active_failed(self, error: str) -> None: + def mark_active_failed(self, error: str | BaseException) -> None: """Mark the in-progress stage as failed, or the last open stage if none is active.""" + if isinstance(error, BaseException): + error_msg = f"{type(error).__name__}: {error}" if str(error) else type(error).__name__ + else: + error_msg = error + active_statuses = (STATUS_STARTED, STATUS_RUNNING) for stage in reversed(self.stages): if stage.get("status") in active_statuses: - self.record(stage["id"], STATUS_FAILED, error=error) + self.record(stage["id"], STATUS_FAILED, error=error_msg) return if self.stages and self.stages[-1].get("status") != STATUS_COMPLETED: - self.record(self.stages[-1]["id"], STATUS_FAILED, error=error) + self.record(self.stages[-1]["id"], STATUS_FAILED, error=error_msg) return - self.set_metadata(status=STATUS_FAILED, error=error) + self.set_metadata(status=STATUS_FAILED, error=error_msg) @contextmanager def stage(self, stage_id: str, **start_metadata: Any) -> Iterator[None]: @@ -166,11 +185,14 @@ def stage(self, stage_id: str, **start_metadata: Any) -> Iterator[None]: self.record(stage_id, STATUS_STARTED, **start_metadata) try: yield - except Exception as exc: - self.record(stage_id, STATUS_FAILED, error=str(exc)) + except BaseException as exc: + error_msg = f"{type(exc).__name__}: {exc}" if str(exc) else type(exc).__name__ + self.record(stage_id, STATUS_FAILED, error=error_msg) raise else: - self.record(stage_id, STATUS_COMPLETED) + latest = next((s for s in reversed(self.stages) if s["id"] == stage_id), None) + if latest is None or latest.get("status") not in (STATUS_COMPLETED, STATUS_FAILED): + self.record(stage_id, STATUS_COMPLETED) def __enter__(self) -> ComponentStatusTracker: """Enter context: return this tracker.""" @@ -179,7 +201,7 @@ def __enter__(self) -> ComponentStatusTracker: def __exit__(self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: Any) -> bool: """On exit, mark active stage failed and save status best-effort.""" if exc is not None: - self.mark_active_failed(str(exc)) + self.mark_active_failed(exc) self.save_best_effort() return False @@ -194,7 +216,7 @@ def load_component_status(artifact_path: str) -> dict[str, Any]: Returns: Dict containing component_id, started_at, completed_at, stages, and metadata. - Returns empty dict if file doesn't exist. + Returns empty dict if file doesn't exist or is unreadable. Example: status = load_component_status("/path/to/artifact") @@ -204,5 +226,9 @@ def load_component_status(artifact_path: str) -> dict[str, Any]: if not status_file.exists(): return {} - with status_file.open("r", encoding="utf-8") as f: - return json.load(f) + try: + with status_file.open("r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError) as e: + logger.warning("Failed to load status from %s: %s", status_file, e) + return {} diff --git a/components/training/automl/shared/run_status_templates/pipelines/autogluon-tabular-training-pipeline.json b/components/training/automl/shared/run_status_templates/pipelines/autogluon-tabular-training-pipeline.json index 22410a12a..ce5e8e90a 100644 --- a/components/training/automl/shared/run_status_templates/pipelines/autogluon-tabular-training-pipeline.json +++ b/components/training/automl/shared/run_status_templates/pipelines/autogluon-tabular-training-pipeline.json @@ -7,24 +7,12 @@ "description": "Load tabular data, sample, cleanse, split, and write train/validation outputs.", "stages": [ { - "id": "validate_inputs", - "description": "Validate pipeline inputs, split config, and sampling parameters." + "id": "prepare_data", + "description": "Validate inputs, read source data, apply sampling, and cleanse the dataset." }, { - "id": "read_and_sample", - "description": "Read source data and apply sampling strategy." - }, - { - "id": "cleanse", - "description": "Apply cleansing rules to the sampled dataset." - }, - { - "id": "split", - "description": "Split data into selection train/validation sets." - }, - { - "id": "write_outputs", - "description": "Write train/validation CSVs and metadata to the workspace." + "id": "split_and_export", + "description": "Split into selection train/validation sets and write output CSVs." } ] }, @@ -43,16 +31,12 @@ "feature_engineering", "model_training", "stacking", - "model_evaluation" + "evaluation" ] }, { - "id": "refit_full", - "description": "Refit the best models on combined train+validation data." - }, - { - "id": "evaluate_models", - "description": "Evaluate refit models and write metrics artifacts." + "id": "refit_and_evaluate", + "description": "Refit best models on combined data and write evaluation metrics." } ] }, diff --git a/components/training/automl/shared/run_status_templates/pipelines/autogluon-timeseries-training-pipeline.json b/components/training/automl/shared/run_status_templates/pipelines/autogluon-timeseries-training-pipeline.json index 234160576..6723cb21f 100644 --- a/components/training/automl/shared/run_status_templates/pipelines/autogluon-timeseries-training-pipeline.json +++ b/components/training/automl/shared/run_status_templates/pipelines/autogluon-timeseries-training-pipeline.json @@ -7,24 +7,12 @@ "description": "Load time series data from S3, cleanse, and write per-series train/test splits.", "stages": [ { - "id": "validate_inputs", - "description": "Validate pipeline inputs and split parameters." + "id": "prepare_data", + "description": "Validate inputs, read panel data from S3, parse timestamps, and deduplicate series keys." }, { - "id": "read_and_sample", - "description": "Read source data from S3 (truncated to size limit)." - }, - { - "id": "cleanse", - "description": "Parse timestamps, validate ids, and deduplicate panel keys." - }, - { - "id": "split", - "description": "Apply per-series temporal train/test and selection/extra splits." - }, - { - "id": "write_outputs", - "description": "Write train/validation CSVs and test artifact." + "id": "split_and_export", + "description": "Apply per-series temporal splits and write train/validation/test CSVs." } ] }, @@ -39,15 +27,16 @@ { "id": "model_selection", "description": "Run TimeSeriesPredictor fit on selection train and rank on test.", - "steps": ["model_training", "holdout_evaluation"] - }, - { - "id": "refit_full", - "description": "Refit each top model on selection plus extra train data." + "steps": [ + "feature_engineering", + "model_training", + "stacking", + "evaluation" + ] }, { - "id": "evaluate_models", - "description": "Evaluate refit models; write metrics, back_testing.json, and notebooks." + "id": "refit_and_evaluate", + "description": "Refit top models, evaluate on holdout, and write metrics, back_testing.json, and notebooks." } ] }, diff --git a/components/training/automl/shared/tests/test_component_status.py b/components/training/automl/shared/tests/test_component_status.py index ba027cd21..53911c393 100644 --- a/components/training/automl/shared/tests/test_component_status.py +++ b/components/training/automl/shared/tests/test_component_status.py @@ -8,8 +8,10 @@ import pytest from kfp_components.components.training.automl.shared.component_status import ( COMPONENT_STATUS_FILENAME, + ComponentStatusEncoder, ComponentStatusTracker, load_component_status, + utc_now_z, ) @@ -19,8 +21,8 @@ class TestComponentStatusTracker: def test_record_and_save(self, tmp_path: Path) -> None: """save() writes stages and component_id to component_status.json.""" tracker = ComponentStatusTracker(str(tmp_path), "automl_data_loader") - tracker.record("validate_inputs", "started", rows=5) - tracker.record("validate_inputs", "completed") + tracker.record("prepare_data", "started", rows=5) + tracker.record("prepare_data", "completed") tracker.save() data = json.loads((tmp_path / COMPONENT_STATUS_FILENAME).read_text(encoding="utf-8")) @@ -56,7 +58,7 @@ def test_context_manager_marks_failed_and_saves(self, tmp_path: Path) -> None: """Context manager marks active stage failed and persists status on exception.""" with pytest.raises(RuntimeError, match="boom"): with ComponentStatusTracker(str(tmp_path), "automl_data_loader") as status: - status.record("read_and_sample", "started") + status.record("prepare_data", "started") raise RuntimeError("boom") data = load_component_status(str(tmp_path)) @@ -66,29 +68,78 @@ def test_context_manager_marks_failed_and_saves(self, tmp_path: Path) -> None: def test_context_manager_saves_on_success(self, tmp_path: Path) -> None: """Context manager persists status when the block completes normally.""" with ComponentStatusTracker(str(tmp_path), "automl_data_loader") as status: - status.record("write_outputs", "completed") + status.record("split_and_export", "completed") assert (tmp_path / COMPONENT_STATUS_FILENAME).exists() def test_stage_context_manager_records_completed(self, tmp_path: Path) -> None: """stage() records started then completed when no exception is raised.""" tracker = ComponentStatusTracker(str(tmp_path), "automl_data_loader") - with tracker.stage("split"): + with tracker.stage("split_and_export"): pass tracker.save() data = json.loads((tmp_path / COMPONENT_STATUS_FILENAME).read_text(encoding="utf-8")) - assert data["stages"][-1]["id"] == "split" + assert data["stages"][-1]["id"] == "split_and_export" assert data["stages"][-1]["status"] == "completed" def test_stage_context_manager_records_failed(self, tmp_path: Path) -> None: """stage() records failed when an exception escapes the block.""" tracker = ComponentStatusTracker(str(tmp_path), "automl_data_loader") with pytest.raises(ValueError, match="bad split"): - with tracker.stage("split"): + with tracker.stage("split_and_export"): raise ValueError("bad split") tracker.save() data = json.loads((tmp_path / COMPONENT_STATUS_FILENAME).read_text(encoding="utf-8")) assert data["stages"][-1]["status"] == "failed" assert "bad split" in data["stages"][-1]["error"] + + def test_stage_skips_auto_complete_when_completed_inside_block(self, tmp_path: Path) -> None: + """stage() does not overwrite a completed record written inside the block.""" + tracker = ComponentStatusTracker(str(tmp_path), "autogluon_models_training") + with tracker.stage("model_selection", steps=["feature_engineering"]): + tracker.record("model_selection", "completed", top_n=3) + tracker.save() + + data = load_component_status(str(tmp_path)) + model_stage = next(stage for stage in data["stages"] if stage["id"] == "model_selection") + assert model_stage["status"] == "completed" + assert model_stage["top_n"] == 3 + + def test_utc_now_z_ends_with_z(self) -> None: + """Timestamps use UTC ISO-8601 with Z suffix.""" + assert utc_now_z().endswith("Z") + + +class TestComponentStatusEncoder: + """Tests for JSON encoding of status metadata values.""" + + def test_encodes_datetime_path_bytes_and_set(self) -> None: + """Known non-JSON types are converted for serialization.""" + from datetime import UTC, datetime + + encoder = ComponentStatusEncoder() + assert encoder.default(Path("/tmp/out")) == "/tmp/out" + assert encoder.default(b"abc") == "YWJj" + assert encoder.default({1, 2}) == [1, 2] + encoded = encoder.default(datetime(2026, 6, 10, 12, 0, 0, tzinfo=UTC)) + assert encoded.endswith("Z") + + def test_unknown_type_raises_type_error(self) -> None: + """Unsupported metadata types fail fast instead of being stringified.""" + encoder = ComponentStatusEncoder() + with pytest.raises(TypeError): + encoder.default(object()) + + +class TestLoadComponentStatus: + """Tests for load_component_status edge cases.""" + + def test_corrupt_json_returns_empty_dict(self, tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None: + """Corrupt status files return {} and log a warning.""" + status_file = tmp_path / COMPONENT_STATUS_FILENAME + status_file.write_text("{not json", encoding="utf-8") + with caplog.at_level("WARNING"): + assert load_component_status(str(tmp_path)) == {} + assert "Failed to load status" in caplog.text diff --git a/components/training/automl/shared/tests/test_run_status.py b/components/training/automl/shared/tests/test_run_status.py index dbbcbfda7..76cbc8729 100644 --- a/components/training/automl/shared/tests/test_run_status.py +++ b/components/training/automl/shared/tests/test_run_status.py @@ -78,6 +78,27 @@ def test_timeseries_pipeline_manifest_covers_all_components(): ] +def test_timeseries_model_selection_steps_match_tabular(): + """Timeseries and tabular training share the same model_selection sub-steps.""" + tabular = load_component_stage_catalog(COMPONENT_MODELS_TRAINING, pipeline_id=PIPELINE_TABULAR_TRAINING) + timeseries = load_component_stage_catalog( + COMPONENT_TIMESERIES_MODELS_TRAINING, + pipeline_id=PIPELINE_TIMESERIES_TRAINING, + ) + tabular_steps = next(s for s in tabular["stages"] if s["id"] == "model_selection")["steps"] + timeseries_steps = next(s for s in timeseries["stages"] if s["id"] == "model_selection")["steps"] + assert ( + timeseries_steps + == tabular_steps + == [ + "feature_engineering", + "model_training", + "stacking", + "evaluation", + ] + ) + + def test_init_seeds_full_pipeline_as_pending(tmp_path): """Test that init seeds full pipeline as pending.""" ws = str(tmp_path) @@ -97,11 +118,8 @@ def test_init_seeds_full_pipeline_as_pending(tmp_path): assert _component_by_id(doc, COMPONENT_MODELS_TRAINING)["state"] == STATUS_PENDING loader_stages = {s["id"]: s["status"] for s in _component_by_id(doc, COMPONENT_DATA_LOADER)["stages"]} assert loader_stages == { - "validate_inputs": STATUS_PENDING, - "read_and_sample": STATUS_PENDING, - "cleanse": STATUS_PENDING, - "split": STATUS_PENDING, - "write_outputs": STATUS_PENDING, + "prepare_data": STATUS_PENDING, + "split_and_export": STATUS_PENDING, } @@ -118,10 +136,10 @@ def test_init_copies_catalog_metadata_from_manifest(tmp_path): loader = _component_by_id(doc, COMPONENT_DATA_LOADER) assert "order" not in loader assert "description" in loader - validate_inputs = next(s for s in loader["stages"] if s["id"] == "validate_inputs") - assert validate_inputs["status"] == STATUS_PENDING - assert "description" in validate_inputs - assert "steps" not in validate_inputs + prepare_data = next(s for s in loader["stages"] if s["id"] == "prepare_data") + assert prepare_data["status"] == STATUS_PENDING + assert "description" in prepare_data + assert "steps" not in prepare_data training = _component_by_id(doc, COMPONENT_MODELS_TRAINING) model_selection = next(s for s in training["stages"] if s["id"] == "model_selection") @@ -129,7 +147,7 @@ def test_init_copies_catalog_metadata_from_manifest(tmp_path): "feature_engineering", "model_training", "stacking", - "model_evaluation", + "evaluation", ] @@ -142,7 +160,7 @@ def test_ensure_pipeline_plan_preserves_progress(tmp_path): pipeline_name="p1", run_status_pipeline_id=PIPELINE_TABULAR_TRAINING, ) - record_stage(ws, COMPONENT_DATA_LOADER, "validate_inputs", STATUS_COMPLETED) + record_stage(ws, COMPONENT_DATA_LOADER, "prepare_data", STATUS_COMPLETED) ensure_pipeline_plan(ws) doc = load_run_status(ws) assert _component_by_id(doc, COMPONENT_MODELS_TRAINING)["state"] == STATUS_PENDING @@ -172,15 +190,13 @@ def test_record_stage_autofills_steps_from_manifest_on_completed(tmp_path): "feature_engineering", "model_training", "stacking", - "model_evaluation", + "evaluation", ] assert model_selection["top_n"] == 2 assert "description" in model_selection - record_stage(ws, COMPONENT_DATA_LOADER, "validate_inputs", STATUS_COMPLETED) + record_stage(ws, COMPONENT_DATA_LOADER, "prepare_data", STATUS_COMPLETED) loader_stage = next( - s - for s in _component_by_id(load_run_status(ws), COMPONENT_DATA_LOADER)["stages"] - if s["id"] == "validate_inputs" + s for s in _component_by_id(load_run_status(ws), COMPONENT_DATA_LOADER)["stages"] if s["id"] == "prepare_data" ) assert "steps" not in loader_stage @@ -195,7 +211,7 @@ def test_init_and_stages(tmp_path): run_status_pipeline_id=PIPELINE_TABULAR_TRAINING, ) begin_component(ws, COMPONENT_DATA_LOADER) - record_stage(ws, COMPONENT_DATA_LOADER, "read_and_sample", "completed", rows=100) + record_stage(ws, COMPONENT_DATA_LOADER, "prepare_data", "completed", rows=100) complete_component(ws, COMPONENT_DATA_LOADER) doc = json.loads(run_status_file_path(ws).read_text()) @@ -203,8 +219,8 @@ def test_init_and_stages(tmp_path): assert doc[DOCUMENT_PIPELINE_ID_FIELD] == PIPELINE_TABULAR_TRAINING assert _component_by_id(doc, COMPONENT_DATA_LOADER)["state"] == STATUS_COMPLETED assert _component_by_id(doc, COMPONENT_MODELS_TRAINING)["state"] == STATUS_PENDING - read_stage = next(s for s in _component_by_id(doc, COMPONENT_DATA_LOADER)["stages"] if s["id"] == "read_and_sample") - assert read_stage["rows"] == 100 + prepare_stage = next(s for s in _component_by_id(doc, COMPONENT_DATA_LOADER)["stages"] if s["id"] == "prepare_data") + assert prepare_stage["rows"] == 100 def test_run_status_recorder(tmp_path): @@ -218,7 +234,7 @@ def test_run_status_recorder(tmp_path): ) recorder = RunStatusRecorder(ws, COMPONENT_DATA_LOADER) recorder.begin() - recorder.record("validate_inputs", "completed") + recorder.record("prepare_data", "completed") recorder.complete() doc = recorder.publish_artifact(str(tmp_path / "artifact")) assert _component_by_id(doc, COMPONENT_DATA_LOADER)["state"] == STATUS_COMPLETED @@ -255,7 +271,7 @@ def test_validate_component_stages_warns_on_missing(caplog): "components": [ { "id": COMPONENT_DATA_LOADER, - "stages": [{"id": "validate_inputs", "status": "completed"}], + "stages": [{"id": "prepare_data", "status": "completed"}], } ], } diff --git a/components/training/autorag/leaderboard_evaluation/component.py b/components/training/autorag/leaderboard_evaluation/component.py index 42b928812..0e99af02d 100644 --- a/components/training/autorag/leaderboard_evaluation/component.py +++ b/components/training/autorag/leaderboard_evaluation/component.py @@ -14,7 +14,7 @@ def leaderboard_evaluation( rag_patterns: dsl.InputPath(dsl.Artifact), html_artifact: dsl.Output[dsl.HTML], - component_status: dsl.Output[dsl.Artifact] = None, + component_status: dsl.Output[dsl.Artifact], embedded_artifact: dsl.EmbeddedInput[dsl.Dataset] = None, optimization_metric: str = "faithfulness", ): @@ -338,6 +338,8 @@ def _build_leaderboard_html( _spec.loader.exec_module(_status_module) status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "leaderboard_evaluation") with status: + status.set_metadata(display_name="Leaderboard Evaluation Status") + component_status.metadata["display_name"] = "Leaderboard Evaluation Status" with status.stage("build_leaderboard"): if not rag_patterns_dir.is_dir(): raise FileNotFoundError("rag_patterns path is not a directory: %s" % rag_patterns_dir) diff --git a/components/training/autorag/rag_templates_optimization/README.md b/components/training/autorag/rag_templates_optimization/README.md index 857af7451..38922c1f4 100644 --- a/components/training/autorag/rag_templates_optimization/README.md +++ b/components/training/autorag/rag_templates_optimization/README.md @@ -18,8 +18,8 @@ Carries out the iterative RAG optimization process. | `rag_patterns` | `dsl.Output[dsl.Artifact]` | `None` | kfp-enforced argument specifying an output artifact. Provided by kfp backend automatically. | | `test_data_key` | `Optional[str]` | `None` | Path to the benchmark JSON file in object storage used by generated notebooks. | | `vector_io_provider_id` | `str` | `None` | Vector I/O provider identifier as registered in OGX. | -| `embedded_artifact` | `dsl.EmbeddedInput[dsl.Dataset]` | `None` | Embedded ``autorag.shared`` helpers injected by KFP at runtime. | | `component_status` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing stage-level progress tracking. | +| `embedded_artifact` | `dsl.EmbeddedInput[dsl.Dataset]` | `None` | Embedded ``autorag.shared`` helpers injected by KFP at runtime. | | `optimization_settings` | `Optional[dict]` | `None` | Additional settings customising the experiment. | | `input_data_key` | `Optional[str]` | `""` | A path to documents dir within a bucket used as an input to AI4RAG experiment. | diff --git a/components/training/autorag/rag_templates_optimization/component.py b/components/training/autorag/rag_templates_optimization/component.py index 39d3a63d9..0c8bb7af2 100644 --- a/components/training/autorag/rag_templates_optimization/component.py +++ b/components/training/autorag/rag_templates_optimization/component.py @@ -19,8 +19,8 @@ def rag_templates_optimization( rag_patterns: dsl.Output[dsl.Artifact], test_data_key: Optional[str], vector_io_provider_id: str, + component_status: dsl.Output[dsl.Artifact], embedded_artifact: dsl.EmbeddedInput[dsl.Dataset] = None, - component_status: dsl.Output[dsl.Artifact] = None, optimization_settings: Optional[dict] = None, input_data_key: Optional[str] = "", ): @@ -538,22 +538,22 @@ def load_as_langchain_doc(path: str | Path) -> list[Document]: _status_module = importlib.util.module_from_spec(_spec) _spec.loader.exec_module(_status_module) status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "rag_templates_optimization") - if component_status is not None: - component_status.metadata["display_name"] = "RAG Templates Optimization Status" - run_optimization_steps = ["chunking", "embedding", "retrieval", "generation", "evaluation"] + optimize_templates_steps = ["chunking", "embedding", "retrieval", "generation", "evaluation"] class OptimizationEventHandler(BaseEventHandler): """Updates component status with ai4rag optimization sub-step progress.""" def on_status_change(self, level: LogLevel, message: str, step: str | None = None) -> None: if step: - status.record("run_optimization", "running", current_step=step) + status.record("optimize_templates", "running", current_step=step) def on_pattern_creation(self, payload: dict, evaluation_results: list, **kwargs) -> None: pass with status: - with status.stage("validate_inputs"): + status.set_metadata(display_name="RAG Templates Optimization Status") + component_status.metadata["display_name"] = "RAG Templates Optimization Status" + with status.stage("optimize_templates", steps=optimize_templates_steps): if not ogx_client_base_url or not ogx_client_api_key: raise ValueError( "OGX_CLIENT_BASE_URL and OGX_CLIENT_API_KEY environment variables must be set to non-empty values." @@ -645,7 +645,6 @@ def construct_model_instance(loader, node: yml.MappingNode) -> BaseEmbeddingMode explicit_instruction, ) - with status.stage("run_optimization", steps=run_optimization_steps): event_handler = OptimizationEventHandler() rag_exp = AI4RAGExperiment( client=client, @@ -661,15 +660,13 @@ def construct_model_instance(loader, node: yml.MappingNode) -> BaseEmbeddingMode rag_exp.search() selected_patterns = [getattr(ev, "pattern_name", "") for ev in rag_exp.results.evaluations] status.record( - "run_optimization", + "optimize_templates", "completed", max_rag_patterns=max_rag_patterns, selected_patterns=selected_patterns, - steps=run_optimization_steps, + steps=optimize_templates_steps, ) - with status.stage("write_patterns"): - def _evaluation_result_fallback(eval_data_list, evaluation_result): """Build evaluation_results.json-style list when question_scores missing or incomplete.""" out = [] diff --git a/components/training/autorag/rag_templates_optimization/tests/test_component_unit.py b/components/training/autorag/rag_templates_optimization/tests/test_component_unit.py index 0c6892da9..4ed6d2ff7 100644 --- a/components/training/autorag/rag_templates_optimization/tests/test_component_unit.py +++ b/components/training/autorag/rag_templates_optimization/tests/test_component_unit.py @@ -645,7 +645,7 @@ def test_detected_language_popped_before_search_space_construction(self, tmp_pat def _make_evaluation(pattern_name: str): - """Minimal ai4rag evaluation result for run_optimization status tests.""" + """Minimal ai4rag evaluation result for optimize_templates status tests.""" evaluation = mock.MagicMock() evaluation.pattern_name = pattern_name evaluation.indexing_params = {} @@ -657,8 +657,8 @@ def _make_evaluation(pattern_name: str): return evaluation -class TestRunOptimizationStatus: - """Tests for run_optimization stage progress in component_status.""" +class TestOptimizeTemplatesStatus: + """Tests for optimize_templates stage progress in component_status.""" @mock.patch.dict( "os.environ", @@ -667,8 +667,8 @@ class TestRunOptimizationStatus: "OGX_CLIENT_API_KEY": "test-api-key", }, ) - def test_run_optimization_records_max_rag_patterns_and_selected_patterns(self, tmp_path): - """run_optimization completed stage records max_rag_patterns and selected_patterns.""" + def test_optimize_templates_records_max_rag_patterns_and_selected_patterns(self, tmp_path): + """optimize_templates completed stage records max_rag_patterns and selected_patterns.""" mocks = _make_all_mocks() ogx_mod = _make_ogx_client_module() mock_ogx = mock.MagicMock() @@ -720,7 +720,7 @@ def test_run_optimization_records_max_rag_patterns_and_selected_patterns(self, t status_file = tmp_path / "component_status_out" / "component_status.json" status_data = json.loads(status_file.read_text()) - run_stage = next(stage for stage in status_data["stages"] if stage["id"] == "run_optimization") + run_stage = next(stage for stage in status_data["stages"] if stage["id"] == "optimize_templates") assert run_stage["status"] == "completed" assert run_stage["max_rag_patterns"] == 8 assert run_stage["selected_patterns"] == ["pattern_alpha", "pattern_beta"] @@ -740,8 +740,8 @@ def test_run_optimization_records_max_rag_patterns_and_selected_patterns(self, t "OGX_CLIENT_API_KEY": "test-api-key", }, ) - def test_run_optimization_search_failure_marks_stage_failed(self, tmp_path): - """run_optimization stage is marked failed when rag_exp.search() raises.""" + def test_optimize_templates_search_failure_marks_stage_failed(self, tmp_path): + """optimize_templates stage is marked failed when rag_exp.search() raises.""" mocks = _make_all_mocks() ogx_mod = _make_ogx_client_module() mock_ogx = mock.MagicMock() @@ -783,6 +783,6 @@ def test_run_optimization_search_failure_marks_stage_failed(self, tmp_path): ) status_data = json.loads((tmp_path / "component_status_out" / "component_status.json").read_text()) - run_stage = next(stage for stage in status_data["stages"] if stage["id"] == "run_optimization") + run_stage = next(stage for stage in status_data["stages"] if stage["id"] == "optimize_templates") assert run_stage["status"] == "failed" assert "search failed" in run_stage["error"] diff --git a/components/training/autorag/search_space_preparation/component.py b/components/training/autorag/search_space_preparation/component.py index 37678de2e..b7c90c4fb 100644 --- a/components/training/autorag/search_space_preparation/component.py +++ b/components/training/autorag/search_space_preparation/component.py @@ -17,7 +17,7 @@ def search_space_preparation( test_data: dsl.Input[dsl.Artifact], extracted_text: dsl.Input[dsl.Artifact], search_space_prep_report: dsl.Output[dsl.Artifact], - component_status: dsl.Output[dsl.Artifact] = None, + component_status: dsl.Output[dsl.Artifact], embedded_artifact: dsl.EmbeddedInput[dsl.Dataset] = None, embedding_models: Optional[List] = None, generation_models: Optional[List] = None, @@ -336,13 +336,14 @@ def represent_model_instance(dumper, model: BaseFoundationModel | BaseEmbeddingM _spec.loader.exec_module(_status_module) status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "search_space_preparation") with status: - with status.stage("validate_inputs"): + status.set_metadata(display_name="Search Space Preparation Status") + component_status.metadata["display_name"] = "Search Space Preparation Status" + with status.stage("prepare_search_space"): if not ogx_client_base_url or not ogx_client_api_key: raise ValueError("OGX_CLIENT_BASE_URL and OGX_CLIENT_API_KEY environment variables must be set.") client = _create_ogx_client(base_url=ogx_client_base_url, api_key=ogx_client_api_key) - with status.stage("prepare_search_space"): search_space = prepare_ai4rag_search_space() benchmark_df = pd.read_json(Path(test_data.path)) @@ -387,7 +388,6 @@ def represent_model_instance(dumper, model: BaseFoundationModel | BaseEmbeddingM if detected_language: verbose_search_space_repr["detected_language"] = detected_language - with status.stage("write_report"): with open(search_space_prep_report.path, "w") as report_file: yml.safe_dump(verbose_search_space_repr, report_file) diff --git a/components/training/autorag/shared/component_status.py b/components/training/autorag/shared/component_status.py index 2321779bb..7da251543 100644 --- a/components/training/autorag/shared/component_status.py +++ b/components/training/autorag/shared/component_status.py @@ -10,7 +10,7 @@ status = component_status_tracker(component_status, "test_data_loader") with status: - with status.stage("download_and_sample"): + with status.stage("load_benchmark"): ... """ @@ -29,6 +29,11 @@ logger = logging.getLogger(__name__) +def utc_now_z() -> str: + """Return current UTC time as an ISO-8601 string with ``Z`` suffix.""" + return datetime.now(UTC).isoformat().replace("+00:00", "Z") + + class ComponentStatusEncoder(json.JSONEncoder): """Custom JSON encoder for component status data. @@ -39,7 +44,7 @@ class ComponentStatusEncoder(json.JSONEncoder): def default(self, obj: Any) -> Any: """Convert non-serializable objects to JSON-compatible types.""" if isinstance(obj, datetime): - return obj.isoformat() + return obj.isoformat().replace("+00:00", "Z") if obj.tzinfo is not None else obj.isoformat() if isinstance(obj, Path): return str(obj) if isinstance(obj, bytes): @@ -60,26 +65,19 @@ def default(self, obj: Any) -> Any: class ComponentStatusTracker: """Track stage-level progress within a single AutoRAG component.""" - def __init__(self, artifact_path: str | None, component_id: str) -> None: + def __init__(self, artifact_path: str, component_id: str) -> None: """Initialize the status tracker. Args: - artifact_path: Path to the KFP artifact directory where status.json will be written. - When ``None``, tracking is disabled (e.g. direct unit-test invocations without a mock artifact). + artifact_path: Path to the KFP artifact directory where component_status.json will be written. component_id: Unique component identifier (e.g., "test_data_loader"). """ - self._enabled = artifact_path is not None - self.artifact_path = Path(artifact_path) if self._enabled else Path(".") + self.artifact_path = Path(artifact_path) self.component_id = component_id self.stages: list[dict[str, Any]] = [] - self.started_at = self._utc_now_iso() + self.started_at = utc_now_z() self.metadata: dict[str, Any] = {} - @staticmethod - def _utc_now_iso() -> str: - """Return current UTC timestamp in ISO format.""" - return datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") - def record(self, stage_id: str, status: str, **metadata: Any) -> None: """Record or update a stage's status.""" existing_idx = next((i for i, s in enumerate(self.stages) if s["id"] == stage_id), None) @@ -87,7 +85,7 @@ def record(self, stage_id: str, status: str, **metadata: Any) -> None: stage_data = { "id": stage_id, "status": status, - "timestamp": self._utc_now_iso(), + "timestamp": utc_now_z(), **metadata, } @@ -110,15 +108,12 @@ def set_metadata(self, **metadata: Any) -> None: def save(self) -> None: """Write the final status to the artifact.""" - if not self._enabled: - return - self.artifact_path.mkdir(parents=True, exist_ok=True) data = { "component_id": self.component_id, "started_at": self.started_at, - "completed_at": self._utc_now_iso(), + "completed_at": utc_now_z(), "stages": self.stages, "metadata": self.metadata, } @@ -258,9 +253,16 @@ def bootstrap_status_tracker( def component_status_tracker(component_status: Any, component_id: str) -> ComponentStatusTracker: - """Build a tracker from an optional KFP ``component_status`` output artifact.""" - artifact_path = component_status.path if component_status is not None else None - return ComponentStatusTracker(artifact_path, component_id) + """Build a tracker from a KFP component_status output artifact. + + Args: + component_status: KFP output artifact for component status tracking. + component_id: Unique component identifier. + + Returns: + Configured ComponentStatusTracker instance. + """ + return ComponentStatusTracker(component_status.path, component_id) def load_component_status(artifact_path: str) -> dict[str, Any]: diff --git a/components/training/autorag/shared/run_status_templates/pipelines/documents-rag-optimization-pipeline.json b/components/training/autorag/shared/run_status_templates/pipelines/documents-rag-optimization-pipeline.json index 4729d5894..c4308f2fd 100644 --- a/components/training/autorag/shared/run_status_templates/pipelines/documents-rag-optimization-pipeline.json +++ b/components/training/autorag/shared/run_status_templates/pipelines/documents-rag-optimization-pipeline.json @@ -7,16 +7,8 @@ "description": "Download benchmark test data from S3 and sample records for evaluation.", "stages": [ { - "id": "validate_inputs", - "description": "Validate bucket/path inputs and S3 credentials." - }, - { - "id": "download_and_sample", - "description": "Download test JSON from S3 and validate benchmark schema." - }, - { - "id": "write_output", - "description": "Write sampled test data to the output artifact." + "id": "load_benchmark", + "description": "Validate S3 access, download benchmark JSON, sample records, and write test data artifact." } ] }, @@ -25,16 +17,8 @@ "description": "List input documents in S3 and write a documents descriptor manifest.", "stages": [ { - "id": "validate_inputs", - "description": "Validate S3 credentials and input parameters." - }, - { - "id": "list_and_sample", - "description": "List objects in the input prefix and apply sampling rules." - }, - { - "id": "write_descriptor", - "description": "Write documents_descriptor.json to the output artifact." + "id": "discover_documents", + "description": "List and sample source documents in S3 and write documents_descriptor.json." } ] }, @@ -42,13 +26,9 @@ "id": "text_extraction", "description": "Download listed documents and extract text with docling.", "stages": [ - { - "id": "load_descriptor", - "description": "Load documents_descriptor.json and validate S3 credentials." - }, { "id": "extract_documents", - "description": "Download source documents, run docling extraction, and write output artifacts." + "description": "Load descriptor, download sources, run docling extraction, and write text artifacts." } ] }, @@ -56,17 +36,9 @@ "id": "search_space_preparation", "description": "Prepare the AutoRAG search space and model pre-selection report.", "stages": [ - { - "id": "validate_inputs", - "description": "Validate inputs and OGX client configuration." - }, { "id": "prepare_search_space", - "description": "Run search-space preparation and model pre-selection." - }, - { - "id": "write_report", - "description": "Write search_space_prep_report.yml to the output artifact." + "description": "Validate OGX config, run search-space preparation, model pre-selection, and write the report." } ] }, @@ -75,12 +47,8 @@ "description": "Run ai4rag optimization and emit RAG pattern artifacts.", "stages": [ { - "id": "validate_inputs", - "description": "Validate optimization settings and input artifacts." - }, - { - "id": "run_optimization", - "description": "Execute RAG template optimization with ai4rag.", + "id": "optimize_templates", + "description": "Validate settings, run ai4rag optimization, and write pattern directories, metrics, and notebooks.", "steps": [ "chunking", "embedding", @@ -88,10 +56,6 @@ "generation", "evaluation" ] - }, - { - "id": "write_patterns", - "description": "Write pattern directories, metrics, and notebooks." } ] }, diff --git a/components/training/autorag/shared/tests/test_component_status.py b/components/training/autorag/shared/tests/test_component_status.py index a6a733bb0..e8be401e4 100644 --- a/components/training/autorag/shared/tests/test_component_status.py +++ b/components/training/autorag/shared/tests/test_component_status.py @@ -11,7 +11,6 @@ ComponentStatusEncoder, ComponentStatusTracker, bootstrap_status_tracker, - component_status_tracker, load_component_status, load_embedded_component_status_module, ) @@ -23,8 +22,8 @@ class TestComponentStatusTracker: def test_record_and_save(self, tmp_path: Path) -> None: """save() writes stages and component_id to component_status.json.""" tracker = ComponentStatusTracker(str(tmp_path), "test_data_loader") - tracker.record("validate_inputs", "started", rows=5) - tracker.record("validate_inputs", "completed") + tracker.record("load_benchmark", "started", rows=5) + tracker.record("load_benchmark", "completed") tracker.save() data = json.loads((tmp_path / COMPONENT_STATUS_FILENAME).read_text(encoding="utf-8")) @@ -33,20 +32,6 @@ def test_record_and_save(self, tmp_path: Path) -> None: assert data["stages"][0]["status"] == "completed" assert data["stages"][0]["rows"] == 5 - def test_disabled_tracker_skips_save(self, tmp_path: Path) -> None: - """When artifact_path is None, save() is a no-op.""" - tracker = ComponentStatusTracker(None, "test_data_loader") - tracker.record("validate_inputs", "completed") - tracker.save() - assert not (tmp_path / COMPONENT_STATUS_FILENAME).exists() - - def test_component_status_tracker_from_none(self, tmp_path: Path) -> None: - """component_status_tracker() accepts a missing artifact for unit tests.""" - tracker = component_status_tracker(None, "documents_discovery") - tracker.record("validate_inputs", "completed") - tracker.save() - assert not (tmp_path / COMPONENT_STATUS_FILENAME).exists() - def test_context_manager_marks_failed_and_saves(self, tmp_path: Path) -> None: """Context manager marks active stage failed and persists status on exception.""" with pytest.raises(RuntimeError, match="boom"): @@ -62,8 +47,8 @@ def test_record_completed_with_steps(self, tmp_path: Path) -> None: """Completed stages can include manifest step ids for dashboard display.""" tracker = ComponentStatusTracker(str(tmp_path), "rag_templates_optimization") steps = ["chunking", "embedding", "retrieval", "generation", "evaluation"] - tracker.record("run_optimization", "started") - tracker.record("run_optimization", "completed", steps=steps) + tracker.record("optimize_templates", "started") + tracker.record("optimize_templates", "completed", steps=steps) tracker.save() data = load_component_status(str(tmp_path)) @@ -72,15 +57,21 @@ def test_record_completed_with_steps(self, tmp_path: Path) -> None: def test_stage_skips_auto_complete_when_completed_inside_block(self, tmp_path: Path) -> None: """stage() does not overwrite a completed record written inside the block.""" tracker = ComponentStatusTracker(str(tmp_path), "rag_templates_optimization") - with tracker.stage("run_optimization", steps=["chunking"]): - tracker.record("run_optimization", "completed", max_rag_patterns=8) + with tracker.stage("optimize_templates", steps=["chunking"]): + tracker.record("optimize_templates", "completed", max_rag_patterns=8) tracker.save() data = load_component_status(str(tmp_path)) - run_stage = next(stage for stage in data["stages"] if stage["id"] == "run_optimization") + run_stage = next(stage for stage in data["stages"] if stage["id"] == "optimize_templates") assert run_stage["status"] == "completed" assert run_stage["max_rag_patterns"] == 8 + def test_utc_now_z_ends_with_z(self) -> None: + """Timestamps use UTC ISO-8601 with Z suffix.""" + from kfp_components.components.training.autorag.shared.component_status import utc_now_z + + assert utc_now_z().endswith("Z") + class TestComponentStatusEncoder: """Tests for JSON encoding of status metadata values.""" @@ -107,7 +98,7 @@ def test_bootstrap_status_tracker_from_shared_dir(self, tmp_path: Path) -> None: shared_dir = Path(__file__).resolve().parents[1] embedded = type("Embedded", (), {"path": str(shared_dir)})() status = bootstrap_status_tracker(embedded, type("Status", (), {"path": str(tmp_path)})(), "test_data_loader") - status.record("validate_inputs", "completed") + status.record("load_benchmark", "completed") status.save() assert (tmp_path / COMPONENT_STATUS_FILENAME).is_file() diff --git a/pipelines/training/automl/autogluon_timeseries_training_pipeline/README.md b/pipelines/training/automl/autogluon_timeseries_training_pipeline/README.md index e9bbd95d2..16ceb2632 100644 --- a/pipelines/training/automl/autogluon_timeseries_training_pipeline/README.md +++ b/pipelines/training/automl/autogluon_timeseries_training_pipeline/README.md @@ -28,35 +28,21 @@ to a single combined ``models_artifact``. 3. **Leaderboard** (``leaderboard_evaluation``): Builds an HTML leaderboard from the combined refitted-model artifact using the training stage's evaluation metric. -Args: train_data_secret_name: Kubernetes secret name containing S3 credentials (e.g. AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_S3_ENDPOINT, AWS_DEFAULT_REGION). train_data_bucket_name: S3-compatible bucket name containing the time series data file. train_data_file_key: S3 object key of the data -file (CSV or Parquet). File must include columns for item_id, timestamp, and target; optional columns for known covariates. target: Name of the column containing the numeric values to forecast. Corresponds to :attr:`~autogluon.timeseries.TimeSeriesDataFrame` target column. id_column: Name of the -column that identifies each time series (e.g. product_id, store_id). Passed as ``id_column`` when constructing TimeSeriesDataFrame; result uses ``item_id``. timestamp_column: Name of the column containing the timestamp/datetime for each observation. Passed as ``timestamp_column`` when constructing -TimeSeriesDataFrame; result uses ``timestamp`` as the second index level. known_covariates_names: Optional list of column names known in advance for the forecast horizon (e.g. holidays, promotions). See :attr:`~autogluon.timeseries.TimeSeriesPredictor.known_covariates_names`. prediction_length: -Number of time steps to forecast (horizon length). Positive integer (default: 1). top_n: Number of top models to select for the leaderboard and output (default: 3). eval_metric: Metric for model ranking in acronym (e.g. ``"MASE"``, ``"WQL"``) or snake_case form. Defaults to ``"MASE"``. preset: -Training quality tier. ``"speed"`` (default, 4 vCPU / 16 GiB) or ``"balanced"`` (may run more than 2x longer, 8 vCPU / 32 GiB). - -Returns: This pipeline wires task outputs between components; compiled runs expose the combined models artifact (per-model predictor, metrics, notebook paths) and leaderboard evaluation artifact (HTML + aggregated metrics), subject to Kubeflow Pipelines UI and artifact configuration. - -Raises: Component and runtime failures propagate from the underlying steps (for example: S3 access or empty data from the loader, invalid inputs, AutoGluon training or evaluation errors, or resource limits in the cluster). - -Example: pipeline = autogluon_timeseries_training_pipeline( train_data_secret_name="my-s3-secret", train_data_bucket_name="my-bucket", train_data_file_key="ts/sales.csv", target="sales", id_column="product_id", timestamp_column="date", known_covariates_names=["is_holiday", "promo"], -prediction_length=14, top_n=3, ) - ## Inputs ๐Ÿ“ฅ | Parameter | Type | Default | Description | | --------- | ---- | ------- | ----------- | -| `train_data_secret_name` | `str` | `None` | | -| `train_data_bucket_name` | `str` | `None` | | -| `train_data_file_key` | `str` | `None` | | -| `target` | `str` | `None` | | -| `id_column` | `str` | `None` | | -| `timestamp_column` | `str` | `None` | | -| `known_covariates_names` | `Optional[List[str]]` | `None` | | -| `prediction_length` | `int` | `1` | | -| `top_n` | `int` | `3` | | -| `eval_metric` | `str` | `MASE` | | -| `preset` | `str` | `speed` | | +| `train_data_secret_name` | `str` | `None` | Kubernetes secret name containing S3 credentials (e.g. AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_S3_ENDPOINT, AWS_DEFAULT_REGION). | +| `train_data_bucket_name` | `str` | `None` | S3-compatible bucket name containing the time series data file. | +| `train_data_file_key` | `str` | `None` | S3 object key of the data file (CSV or Parquet). File must include columns for item_id, timestamp, and target; optional columns for known covariates. | +| `target` | `str` | `None` | Name of the column containing the numeric values to forecast. Corresponds to :attr:`~autogluon.timeseries.TimeSeriesDataFrame` target column. | +| `id_column` | `str` | `None` | Name of the column that identifies each time series (e.g. product_id, store_id). Passed as ``id_column`` when constructing TimeSeriesDataFrame; result uses ``item_id``. | +| `timestamp_column` | `str` | `None` | Name of the column containing the timestamp/datetime for each observation. Passed as ``timestamp_column`` when constructing TimeSeriesDataFrame; result uses ``timestamp`` as the second index level. | +| `known_covariates_names` | `Optional[List[str]]` | `None` | Optional list of column names known in advance for the forecast horizon (e.g. holidays, promotions). See :attr:`~autogluon.timeseries.TimeSeriesPredictor.known_covariates_names`. | +| `prediction_length` | `int` | `1` | Number of time steps to forecast (horizon length). Positive integer (default: 1). | +| `top_n` | `int` | `3` | Number of top models to select for the leaderboard and output (default: 3). | +| `eval_metric` | `str` | `MASE` | Metric for model ranking in acronym (e.g. ``"MASE"``, ``"WQL"``) or snake_case form. Defaults to ``"MASE"``. | +| `preset` | `str` | `speed` | Training quality tier. ``"speed"`` (default, 4 vCPU / 16 GiB) or ``"balanced"`` (may run more than 2x longer, 8 vCPU / 32 GiB). | ## Metadata ๐Ÿ—‚๏ธ diff --git a/pipelines/training/automl/autogluon_timeseries_training_pipeline/pipeline.py b/pipelines/training/automl/autogluon_timeseries_training_pipeline/pipeline.py index b16f0d402..0d1edb41b 100644 --- a/pipelines/training/automl/autogluon_timeseries_training_pipeline/pipeline.py +++ b/pipelines/training/automl/autogluon_timeseries_training_pipeline/pipeline.py @@ -115,9 +115,9 @@ def autogluon_timeseries_training_pipeline( metrics), subject to Kubeflow Pipelines UI and artifact configuration. Raises: - Component and runtime failures propagate from the underlying steps (for example: S3 access or - empty data from the loader, invalid inputs, AutoGluon training or evaluation errors, or - resource limits in the cluster). + FileNotFoundError: If the S3 file cannot be found or accessed. + ValueError: If required columns are missing, temporal splits fail, or inputs are invalid. + RuntimeError: If AutoGluon training or evaluation fails, or cluster resource limits are exceeded. Example: pipeline = autogluon_timeseries_training_pipeline( diff --git a/pipelines/training/autorag/documents_rag_optimization_pipeline/README.md b/pipelines/training/autorag/documents_rag_optimization_pipeline/README.md index f972b6df4..777bbb2c9 100644 --- a/pipelines/training/autorag/documents_rag_optimization_pipeline/README.md +++ b/pipelines/training/autorag/documents_rag_optimization_pipeline/README.md @@ -63,6 +63,53 @@ The system integrates with OGX API for inference and vector database operations, +### Progress and dashboard artifacts + +Besides RAG pattern and data artifacts below, each run publishes: + +| KFP task | Output | File | Purpose | +| -------- | ------ | ---- | ------- | +| `publish-component-stage-map` | `component_stage_map` | `component_stage_map.json` | Static component-to-stage-to-step catalog for the RAG pipeline (published once at run start). | +| `test-data-loader` | `component_status` | `component_status.json` | Stage progress for benchmark test data download and sampling. | +| `documents-discovery` | `component_status` | `component_status.json` | Stage progress for listing and sampling source documents. | +| `text-extraction` | `component_status` | `component_status.json` | Stage progress for docling text extraction. | +| `search-space-preparation` | `component_status` | `component_status.json` | Stage progress for search-space preparation and model pre-selection. | +| `rag-templates-optimization` | `component_status` | `component_status.json` | Stage progress for RAG template optimization (including sub-steps). | +| `leaderboard-evaluation` | `component_status` | `component_status.json` | Stage progress for leaderboard generation. | + +Example artifact-store layout (task folder names are kebab-case): + +```text +// +โ”œโ”€โ”€ publish-component-stage-map//component_stage_map/component_stage_map.json +โ”œโ”€โ”€ test-data-loader//component_status/component_status.json +โ”œโ”€โ”€ documents-discovery//component_status/component_status.json +โ”œโ”€โ”€ text-extraction//component_status/component_status.json +โ”œโ”€โ”€ search-space-preparation//component_status/component_status.json +โ”œโ”€โ”€ rag-templates-optimization//component_status/component_status.json +โ””โ”€โ”€ leaderboard-evaluation//component_status/component_status.json +``` + +See [AutoRAG training components README](../../../components/training/autorag/README.md) for JSON field details. + +#### Dashboard join keys + +Dashboards join the static map (`component_stage_map.json`) to live progress (`component_status.json`) using **snake_case component ids**, not KFP task names: + +| Layer | Naming | Test data loader example | +| ----- | ------ | ------------------------ | +| Template `components[].id` | snake_case | `test_data_loader` | +| Runtime `component_status.json` โ†’ `component_id` | snake_case | `test_data_loader` | +| KFP root DAG task id (compiled YAML) | kebab-case | `test-data-loader` | +| KFP output parameter | snake_case | `component_status` | +| Artifact file | snake_case | `component_status.json` | + +Use `component_id` (and stage `id` fields inside each file) to correlate artifacts. KFP task names are only for locating artifact paths in the store. + +Canonical component ids are defined in the pipeline JSON templates under +[`run_status_templates/pipelines/`](../../../components/training/autorag/shared/run_status_templates/pipelines/) +(e.g. `documents-rag-optimization-pipeline.json`). + ## Optimization Engine: ai4rag ๐Ÿš€ The pipeline uses [ai4rag](https://github.com/IBM/ai4rag), a RAG Templates Optimization Engine that