Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ load_task = automl_data_loader(

In the tabular training pipeline, this component writes ``component_status.json`` under the
``component_status`` output artifact. The file includes ``component_id`` (``automl_data_loader``),
``started_at``, ``completed_at``, a ``stages`` list (ids such as ``validate_inputs``,
``read_and_sample``, ``cleanse``, ``split``, ``write_outputs``), and optional ``metadata``.
``started_at``, ``completed_at``, a ``stages`` list (ids such as ``prepare_data``,
``split_and_export``), and optional ``metadata``.
Match stage ids to the tabular pipeline entry in ``component_stage_map.json`` from the
``publish-component-stage-map`` task.

Expand Down
29 changes: 7 additions & 22 deletions components/data_processing/automl/tabular_data_loader/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,8 @@ def automl_data_loader( # noqa: D417
# Initialize status tracker
status = ComponentStatusTracker(component_status.path, "automl_data_loader")
with status:
# Stage: validate_inputs
status.record("validate_inputs", "started")
# Validation happens inline below
status.record("validate_inputs", "completed")
component_status.metadata["display_name"] = "Data Loader Status"
status.record("prepare_data", "started")

if sampling_method is None:
if task_type in ("binary", "multiclass"):
Expand Down Expand Up @@ -324,10 +322,9 @@ def load_data_in_batches(
return _sample_random(text_stream, PANDAS_CHUNK_SIZE, max_size_bytes)
return _sample_first_n_rows(text_stream, PANDAS_CHUNK_SIZE, max_size_bytes)

# Stage: read_and_sample
status.record(
"read_and_sample",
"started",
"prepare_data",
"running",
sampling_method=sampling_method,
source=f"s3://{bucket_name}/{file_key}",
)
Expand All @@ -347,11 +344,6 @@ def load_data_in_batches(
f"Available columns: {list(sampled_dataframe.columns)}"
)

status.record("read_and_sample", "completed", rows=len(sampled_dataframe))

# Stage: cleanse
status.record("cleanse", "started")

sampled_dataframe.replace([math.inf, -math.inf], float("nan"), inplace=True)

n_before_dedup = len(sampled_dataframe)
Expand Down Expand Up @@ -400,15 +392,14 @@ def load_data_in_batches(
sampling_method,
)
status.record(
"cleanse",
"prepare_data",
"completed",
rows=n_samples,
duplicates_dropped=n_dup_dropped,
labels_dropped=n_dropped,
)

# Stage: split
status.record("split", "started")
status.record("split_and_export", "started")

# --- Train/test split ---
from pathlib import Path
Expand Down Expand Up @@ -465,19 +456,13 @@ def load_data_in_batches(
X_y_test.to_csv(sampled_test_dataset.path, index=False)

status.record(
"split",
"split_and_export",
"completed",
test_size=test_size,
selection_train_size=selection_train_size,
stratify=stratify_effective,
)

# Stage: write_outputs
status.record("write_outputs", "started")
status.record("write_outputs", "completed")

component_status.metadata["display_name"] = "Data Loader Status"

# Sample row for downstream use (JSON string to avoid NaN issues)
sample_row = X_y_test.head(1).to_json(orient="records")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ def test_writes_component_status_json(self, tmp_path, monkeypatch):
payload = json.loads(status_path.read_text())
assert payload["component_id"] == "automl_data_loader"
stage_ids = [stage["id"] for stage in payload["stages"]]
assert "read_and_sample" in stage_ids
assert "split" in stage_ids
assert "prepare_data" in stage_ids
assert "split_and_export" in stage_ids

@mock.patch.dict("os.environ", mocked_env_variables)
def test_sets_component_status_display_name(self, tmp_path):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,6 @@ def example_pipeline(

In the time series training pipeline, this component writes ``component_status.json`` under the
``component_status`` output artifact. The file includes ``component_id`` (``timeseries_data_loader``),
timestamps, and per-stage status (e.g. ``validate_inputs``, ``read_and_sample``, ``split``,
``write_outputs``). Dashboards align stage ids with ``component_stage_map.json`` from
timestamps, and per-stage status (e.g. ``prepare_data``, ``split_and_export``).
Dashboards align stage ids with ``component_stage_map.json`` from
``publish-component-stage-map``.
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ def timeseries_data_loader(

status = ComponentStatusTracker(component_status.path, "timeseries_data_loader")
with status:
status.record("validate_inputs", "started")
status.record("validate_inputs", "completed")
component_status.metadata["display_name"] = "Timeseries Data Loader Status"
status.record("prepare_data", "started")

def get_s3_client(verify=True):
"""Create and return an S3 client using credentials from environment variables."""
Expand Down Expand Up @@ -290,8 +290,8 @@ def _clean_timeseries_dataframe(data, id_col, ts_col, log):
return out.reset_index(drop=True)

status.record(
"read_and_sample",
"started",
"prepare_data",
"running",
source=f"s3://{bucket_name}/{file_key}",
)
df = load_timeseries_data_truncate(bucket_name, file_key, MAX_SIZE_BYTES, PANDAS_CHUNK_SIZE)
Expand All @@ -309,9 +309,6 @@ def _clean_timeseries_dataframe(data, id_col, ts_col, log):
f"with columns {sorted(required_columns)}."
)

status.record("read_and_sample", "completed", rows=len(df))
status.record("cleanse", "started")

df = _clean_timeseries_dataframe(df, id_column, timestamp_column, logger)

n_valid = len(df)
Expand All @@ -322,8 +319,8 @@ def _clean_timeseries_dataframe(data, id_col, ts_col, log):
"Provide a larger dataset or fix invalid timestamps, null ids, and duplicate keys."
)

status.record("cleanse", "completed", rows=n_valid)
status.record("split", "started")
status.record("prepare_data", "completed", rows=n_valid)
status.record("split_and_export", "started")

# Create workspace datasets directory
datasets_dir = Path(workspace_path) / "datasets"
Expand Down Expand Up @@ -392,7 +389,7 @@ def _concat_sorted(parts: list, sort_by: list) -> pd.DataFrame:
)

status.record(
"split",
"split_and_export",
"completed",
test_size=test_size,
selection_train_size=selection_train_size,
Expand Down Expand Up @@ -427,7 +424,6 @@ def _concat_sorted(parts: list, sort_by: list) -> pd.DataFrame:

status.record("write_outputs", "started")
status.record("write_outputs", "completed")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
component_status.metadata["display_name"] = "Timeseries Data Loader Status"

# Sample rows for downstream use (ISO timestamps when supported; JSON string to avoid NaN issues)
sample_tail = test_df.tail(min(5, len(test_df)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ Lists available documents from S3, performs sampling if applied and writes a JSO
| Parameter | Type | Default | Description |
| --------- | ---- | ------- | ----------- |
| `input_data_bucket_name` | `str` | `None` | S3 (or compatible) bucket containing input data. |
| `component_status` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing stage-level progress tracking. |
| `input_data_path` | `str` | `""` | Path to folder with input documents within the bucket. |
| `test_data` | `dsl.Input[dsl.Artifact]` | `None` | Optional input artifact containing test data for sampling. |
| `sampling_enabled` | `bool` | `True` | Whether to enable sampling or not. |
| `sampling_max_size` | `float` | `1` | Maximum size of sampled documents (in gigabytes). |
| `discovered_documents` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing the documents descriptor JSON file. |
| `component_status` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing stage-level progress tracking. |
| `embedded_artifact` | `dsl.EmbeddedInput[dsl.Dataset]` | `None` | Embedded ``autorag.shared`` helpers injected by KFP at runtime. |

## Usage Examples 🧪
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
)
def documents_discovery(
input_data_bucket_name: str,
component_status: dsl.Output[dsl.Artifact],
input_data_path: str = "",
test_data: dsl.Input[dsl.Artifact] = None,
sampling_enabled: bool = True,
sampling_max_size: float = 1,
discovered_documents: dsl.Output[dsl.Artifact] = None,
component_status: dsl.Output[dsl.Artifact] = None,
embedded_artifact: dsl.EmbeddedInput[dsl.Dataset] = None,
):
"""Documents discovery component.
Expand Down Expand Up @@ -86,7 +86,8 @@ def get_test_data_docs_names() -> list[str]:
_spec.loader.exec_module(_status_module)
status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "documents_discovery")
with status:
with status.stage("validate_inputs"):
component_status.metadata["display_name"] = "Documents Discovery Status"
with status.stage("discover_documents"):
s3_creds = {k: os.environ.get(k) for k in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_S3_ENDPOINT"]}
for k, v in s3_creds.items():
if v is None:
Expand All @@ -105,7 +106,6 @@ def _make_s3_client(verify=True):
verify=verify,
)

with status.stage("list_and_sample"):
# Use paginator to handle buckets with >1,000 objects
def _list_all_objects(s3_client):
"""List all objects under prefix using pagination."""
Expand Down Expand Up @@ -186,7 +186,6 @@ def _list_all_objects(s3_client):
f"enabled_max={sampling_max_size}GB" if sampling_enabled else "disabled",
)

with status.stage("write_descriptor"):
os.makedirs(discovered_documents.path, exist_ok=True)
descriptor_path = os.path.join(discovered_documents.path, DOCUMENTS_DESCRIPTOR_FILENAME)
with open(descriptor_path, "w") as f:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ The component reads S3-compatible credentials from environment variables (inject
| --------- | ---- | ------- | ----------- |
| `test_data_bucket_name` | `str` | `None` | S3 (or compatible) bucket that contains the test data file. |
| `test_data_path` | `str` | `None` | S3 object key to the JSON test data file. |
| `component_status` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing stage-level progress tracking. |
| `benchmark_sample_size` | `int` | `25` | Maximum number of records to keep from the test data. When the dataset exceeds this limit, a reproducible random sample is drawn (seed 42). Set to 0 to disable sampling and keep all records. |
| `test_data` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact that receives the (possibly sampled) file. |
| `component_status` | `dsl.Output[dsl.Artifact]` | `None` | Output artifact containing stage-level progress tracking. |
| `embedded_artifact` | `dsl.EmbeddedInput[dsl.Dataset]` | `None` | Embedded ``autorag.shared`` helpers injected by KFP at runtime. |

## Usage Examples 🧪
Expand Down
27 changes: 13 additions & 14 deletions components/data_processing/autorag/test_data_loader/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
def test_data_loader(
test_data_bucket_name: str,
test_data_path: str,
component_status: dsl.Output[dsl.Artifact],
benchmark_sample_size: int = 25,
test_data: dsl.Output[dsl.Artifact] = None,
component_status: dsl.Output[dsl.Artifact] = None,
embedded_artifact: dsl.EmbeddedInput[dsl.Dataset] = None,
):
"""Download test data JSON from S3 and sample it for benchmarking.
Expand Down Expand Up @@ -78,7 +78,8 @@ class TestDataLoaderException(Exception):
_spec.loader.exec_module(_status_module)
status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "test_data_loader")
with status:
with status.stage("validate_inputs"):
component_status.metadata["display_name"] = "Test Data Loader Status"
with status.stage("load_benchmark"):
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if not test_data_bucket_name:
raise TypeError("test_data_bucket_name must be a non-empty string")

Expand All @@ -93,17 +94,16 @@ class TestDataLoaderException(Exception):

s3_creds["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION")

def _make_s3_client(verify=True):
return boto3.client(
"s3",
endpoint_url=s3_creds["AWS_S3_ENDPOINT"],
region_name=s3_creds["AWS_DEFAULT_REGION"],
aws_access_key_id=s3_creds["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=s3_creds["AWS_SECRET_ACCESS_KEY"],
verify=verify,
)

with status.stage("download_and_sample"):
def _make_s3_client(verify=True):
return boto3.client(
"s3",
endpoint_url=s3_creds["AWS_S3_ENDPOINT"],
region_name=s3_creds["AWS_DEFAULT_REGION"],
aws_access_key_id=s3_creds["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=s3_creds["AWS_SECRET_ACCESS_KEY"],
verify=verify,
)

s3_client = _make_s3_client()

logger.info("Fetching test data from S3: bucket='%s', path='%s'.", test_data_bucket_name, test_data_path)
Expand Down Expand Up @@ -149,7 +149,6 @@ def _make_s3_client(verify=True):
f"Make sure that each test data records contains following keys: {benchmark_record_keys}."
)

with status.stage("write_output"):
if 0 < benchmark_sample_size < len(benchmark_data) and isinstance(benchmark_data, list):
import random

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
def text_extraction(
documents_descriptor: dsl.Input[dsl.Artifact],
extracted_text: dsl.Output[dsl.Artifact],
component_status: dsl.Output[dsl.Artifact] = None,
component_status: dsl.Output[dsl.Artifact],
embedded_artifact: dsl.EmbeddedInput[dsl.Dataset] = None,
error_tolerance: Optional[float] = None,
max_extraction_workers: Optional[int] = None,
Expand Down Expand Up @@ -357,9 +357,10 @@ def raise_if_threshold_exceeded(error_details: list, total_docs: int, tolerance:
_spec.loader.exec_module(_status_module)
status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "text_extraction")
with status:
component_status.metadata["display_name"] = "Text Extraction Status"
descriptor_path = Path(documents_descriptor.path) / DOCUMENTS_DESCRIPTOR_FILENAME

with status.stage("load_descriptor"):
with status.stage("extract_documents"):
if not descriptor_path.exists():
raise FileNotFoundError(f"documents_descriptor.json not found at {descriptor_path}")

Expand Down Expand Up @@ -390,7 +391,6 @@ def raise_if_threshold_exceeded(error_details: list, total_docs: int, tolerance:
logger.info("No documents to process.")
return

with status.stage("extract_documents"):
documents = sorted(documents, key=lambda d: d.get("size_bytes", 0), reverse=True)

if max_extraction_workers is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def evaluation_pipeline(models_artifact):
# Initialize status tracker
status = ComponentStatusTracker(component_status.path, "leaderboard_evaluation")
with status:
component_status.metadata["display_name"] = "Leaderboard Evaluation Status"
logger = logging.getLogger(__name__)

# Stage: build_leaderboard
Expand Down Expand Up @@ -152,7 +153,6 @@ def evaluation_pipeline(models_artifact):
best_model=best_model_name,
model_count=len(leaderboard_df),
)
component_status.metadata["display_name"] = "Leaderboard Evaluation Status"
return NamedTuple("outputs", best_model=str)(best_model=best_model_name)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ mutates predictor state. All artifacts are written under a single output artifac

Writes ``component_status.json`` under the ``component_status`` output artifact with ``component_id``
``autogluon_models_training`` and stages such as ``load_data``, ``model_selection`` (optional ``steps``
when completed), ``refit_full``, and ``evaluate_models``. Artifact metadata display name:
when completed), and ``refit_and_evaluate``. Artifact metadata display name:
**Models Training Status**.

## Usage Examples 💡
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def _coerce_positive_class(value: Optional[str]) -> str | int | None:
# Initialize status tracker
status = ComponentStatusTracker(component_status.path, "autogluon_models_training")
with status:
component_status.metadata["display_name"] = "Models Training Status"
# Stage: load_data
status.record("load_data", "started")

Expand Down Expand Up @@ -254,7 +255,7 @@ def _coerce_positive_class(value: Optional[str]) -> str | int | None:
"completed",
top_n=top_n,
selected_models=top_models,
steps=["feature_engineering", "model_training", "stacking", "model_evaluation"],
steps=["feature_engineering", "model_training", "stacking", "evaluation"],
)

model_config = {
Expand Down Expand Up @@ -299,9 +300,8 @@ def retrieve_pipeline_name(name: str) -> str:
predictor_clone = predictor.clone(path=work_path, return_clone=True, dirs_exist_ok=True)

# Refit all top models in a single call: AutoGluon resolves stacking dependencies internally.
status.record("refit_full", "started")
status.record("refit_and_evaluate", "started")
predictor_clone.refit_full(model=top_models, train_data_extra=extra_train_df)
status.record("refit_full", "completed", model_count=len(model_names_full))

def replace_placeholder_in_notebook(notebook, replacements):
for cell in notebook.get("cells", []):
Expand Down Expand Up @@ -657,8 +657,12 @@ def _process_model(model_name_full: str) -> tuple[str, dict]:
"models": models_metadata,
}

status.record("evaluate_models", "completed", eval_metric=str(predictor.eval_metric))
component_status.metadata["display_name"] = "Models Training Status"
status.record(
"refit_and_evaluate",
"completed",
model_count=len(model_names_full),
eval_metric=str(predictor.eval_metric),
)

return NamedTuple("outputs", eval_metric=str)(eval_metric=eval_metric)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ def example_pipeline(

### Component status artifact

Writes ``component_status.json`` under ``component_status`` with ``component_id`` ``autogluon_timeseries_models_training`` and training stages (``load_data``, ``model_selection``, ``refit_full``, ``evaluate_models``). Artifact metadata display name: **Timeseries Models Training Status**.
Writes ``component_status.json`` under ``component_status`` with ``component_id``
``autogluon_timeseries_models_training`` and training stages ``load_data``, ``model_selection`` (with
steps ``feature_engineering``, ``model_training``, ``stacking``, ``evaluation``), and ``refit_and_evaluate``.
Artifact metadata display name: **Timeseries Models Training Status**.

Inference notebooks are loaded from ``shared/notebook_templates/timeseries_notebook.ipynb`` at runtime (same shared package data as tabular training).

Expand Down
Loading
Loading