Skip to content

Commit 304ad16

Browse files
aggregate and align the stages
Signed-off-by: Lukasz Cmielowski <lcmielow@redhat.com> Assisted-by: Cursor
1 parent b3cd7ef commit 304ad16

23 files changed

Lines changed: 150 additions & 219 deletions

File tree

components/data_processing/automl/tabular_data_loader/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ load_task = automl_data_loader(
224224

225225
In the tabular training pipeline, this component writes ``component_status.json`` under the
226226
``component_status`` output artifact. The file includes ``component_id`` (``automl_data_loader``),
227-
``started_at``, ``completed_at``, a ``stages`` list (ids such as ``validate_inputs``,
228-
``read_and_sample``, ``cleanse``, ``split``, ``write_outputs``), and optional ``metadata``.
227+
``started_at``, ``completed_at``, a ``stages`` list (ids such as ``prepare_data``,
228+
``split_and_export``), and optional ``metadata``.
229229
Match stage ids to the tabular pipeline entry in ``component_stage_map.json`` from the
230230
``publish-component-stage-map`` task.
231231

components/data_processing/automl/tabular_data_loader/component.py

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,7 @@ def automl_data_loader( # noqa: D417
132132
# Initialize status tracker
133133
status = ComponentStatusTracker(component_status.path, "automl_data_loader")
134134
with status:
135-
# Stage: validate_inputs
136-
status.record("validate_inputs", "started")
137-
# Validation happens inline below
138-
status.record("validate_inputs", "completed")
135+
status.record("prepare_data", "started")
139136

140137
if sampling_method is None:
141138
if task_type in ("binary", "multiclass"):
@@ -324,10 +321,9 @@ def load_data_in_batches(
324321
return _sample_random(text_stream, PANDAS_CHUNK_SIZE, max_size_bytes)
325322
return _sample_first_n_rows(text_stream, PANDAS_CHUNK_SIZE, max_size_bytes)
326323

327-
# Stage: read_and_sample
328324
status.record(
329-
"read_and_sample",
330-
"started",
325+
"prepare_data",
326+
"running",
331327
sampling_method=sampling_method,
332328
source=f"s3://{bucket_name}/{file_key}",
333329
)
@@ -347,10 +343,6 @@ def load_data_in_batches(
347343
f"Available columns: {list(sampled_dataframe.columns)}"
348344
)
349345

350-
status.record("read_and_sample", "completed", rows=len(sampled_dataframe))
351-
352-
# Stage: cleanse
353-
status.record("cleanse", "started")
354346

355347
sampled_dataframe.replace([math.inf, -math.inf], float("nan"), inplace=True)
356348

@@ -400,15 +392,14 @@ def load_data_in_batches(
400392
sampling_method,
401393
)
402394
status.record(
403-
"cleanse",
395+
"prepare_data",
404396
"completed",
405397
rows=n_samples,
406398
duplicates_dropped=n_dup_dropped,
407399
labels_dropped=n_dropped,
408400
)
409401

410-
# Stage: split
411-
status.record("split", "started")
402+
status.record("split_and_export", "started")
412403

413404
# --- Train/test split ---
414405
from pathlib import Path
@@ -465,17 +456,13 @@ def load_data_in_batches(
465456
X_y_test.to_csv(sampled_test_dataset.path, index=False)
466457

467458
status.record(
468-
"split",
459+
"split_and_export",
469460
"completed",
470461
test_size=test_size,
471462
selection_train_size=selection_train_size,
472463
stratify=stratify_effective,
473464
)
474465

475-
# Stage: write_outputs
476-
status.record("write_outputs", "started")
477-
status.record("write_outputs", "completed")
478-
479466
component_status.metadata["display_name"] = "Data Loader Status"
480467

481468
# Sample row for downstream use (JSON string to avoid NaN issues)

components/data_processing/automl/tabular_data_loader/tests/test_component_unit.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,8 @@ def test_writes_component_status_json(self, tmp_path, monkeypatch):
212212
payload = json.loads(status_path.read_text())
213213
assert payload["component_id"] == "automl_data_loader"
214214
stage_ids = [stage["id"] for stage in payload["stages"]]
215-
assert "read_and_sample" in stage_ids
216-
assert "split" in stage_ids
215+
assert "prepare_data" in stage_ids
216+
assert "split_and_export" in stage_ids
217217

218218
@mock.patch.dict("os.environ", mocked_env_variables)
219219
def test_sets_component_status_display_name(self, tmp_path):

components/data_processing/automl/timeseries_data_loader/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,6 @@ def example_pipeline(
104104

105105
In the time series training pipeline, this component writes ``component_status.json`` under the
106106
``component_status`` output artifact. The file includes ``component_id`` (``timeseries_data_loader``),
107-
timestamps, and per-stage status (e.g. ``validate_inputs``, ``read_and_sample``, ``split``,
108-
``write_outputs``). Dashboards align stage ids with ``component_stage_map.json`` from
107+
timestamps, and per-stage status (e.g. ``prepare_data``, ``split_and_export``).
108+
Dashboards align stage ids with ``component_stage_map.json`` from
109109
``publish-component-stage-map``.

components/data_processing/automl/timeseries_data_loader/component.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,7 @@ def timeseries_data_loader(
9696

9797
status = ComponentStatusTracker(component_status.path, "timeseries_data_loader")
9898
with status:
99-
status.record("validate_inputs", "started")
100-
status.record("validate_inputs", "completed")
99+
status.record("prepare_data", "started")
101100

102101
def get_s3_client(verify=True):
103102
"""Create and return an S3 client using credentials from environment variables."""
@@ -289,8 +288,8 @@ def _clean_timeseries_dataframe(data, id_col, ts_col, log):
289288
return out.reset_index(drop=True)
290289

291290
status.record(
292-
"read_and_sample",
293-
"started",
291+
"prepare_data",
292+
"running",
294293
source=f"s3://{bucket_name}/{file_key}",
295294
)
296295
df = load_timeseries_data_truncate(bucket_name, file_key, MAX_SIZE_BYTES, PANDAS_CHUNK_SIZE)
@@ -308,9 +307,6 @@ def _clean_timeseries_dataframe(data, id_col, ts_col, log):
308307
f"with columns {sorted(required_columns)}."
309308
)
310309

311-
status.record("read_and_sample", "completed", rows=len(df))
312-
status.record("cleanse", "started")
313-
314310
df = _clean_timeseries_dataframe(df, id_column, timestamp_column, logger)
315311

316312
n_valid = len(df)
@@ -321,8 +317,8 @@ def _clean_timeseries_dataframe(data, id_col, ts_col, log):
321317
"Provide a larger dataset or fix invalid timestamps, null ids, and duplicate keys."
322318
)
323319

324-
status.record("cleanse", "completed", rows=n_valid)
325-
status.record("split", "started")
320+
status.record("prepare_data", "completed", rows=n_valid)
321+
status.record("split_and_export", "started")
326322

327323
# Create workspace datasets directory
328324
datasets_dir = Path(workspace_path) / "datasets"
@@ -391,7 +387,7 @@ def _concat_sorted(parts: list, sort_by: list) -> pd.DataFrame:
391387
)
392388

393389
status.record(
394-
"split",
390+
"split_and_export",
395391
"completed",
396392
test_size=test_size,
397393
selection_train_size=selection_train_size,
@@ -424,8 +420,6 @@ def _concat_sorted(parts: list, sort_by: list) -> pd.DataFrame:
424420
"selection_train_size": selection_train_size,
425421
}
426422

427-
status.record("write_outputs", "started")
428-
status.record("write_outputs", "completed")
429423
component_status.metadata["display_name"] = "Timeseries Data Loader Status"
430424

431425
# Sample row for downstream use (JSON string to avoid NaN issues)

components/data_processing/autorag/documents_discovery/component.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def get_test_data_docs_names() -> list[str]:
8686
_spec.loader.exec_module(_status_module)
8787
status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "documents_discovery")
8888
with status:
89-
with status.stage("validate_inputs"):
89+
with status.stage("discover_documents"):
9090
s3_creds = {k: os.environ.get(k) for k in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_S3_ENDPOINT"]}
9191
for k, v in s3_creds.items():
9292
if v is None:
@@ -105,7 +105,6 @@ def _make_s3_client(verify=True):
105105
verify=verify,
106106
)
107107

108-
with status.stage("list_and_sample"):
109108
# Use paginator to handle buckets with >1,000 objects
110109
def _list_all_objects(s3_client):
111110
"""List all objects under prefix using pagination."""
@@ -186,7 +185,6 @@ def _list_all_objects(s3_client):
186185
f"enabled_max={sampling_max_size}GB" if sampling_enabled else "disabled",
187186
)
188187

189-
with status.stage("write_descriptor"):
190188
os.makedirs(discovered_documents.path, exist_ok=True)
191189
descriptor_path = os.path.join(discovered_documents.path, DOCUMENTS_DESCRIPTOR_FILENAME)
192190
with open(descriptor_path, "w") as f:

components/data_processing/autorag/test_data_loader/component.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class TestDataLoaderException(Exception):
7878
_spec.loader.exec_module(_status_module)
7979
status = _status_module.bootstrap_status_tracker(embedded_artifact, component_status, "test_data_loader")
8080
with status:
81-
with status.stage("validate_inputs"):
81+
with status.stage("load_benchmark"):
8282
if not test_data_bucket_name:
8383
raise TypeError("test_data_bucket_name must be a non-empty string")
8484

@@ -93,17 +93,16 @@ class TestDataLoaderException(Exception):
9393

9494
s3_creds["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION")
9595

96-
def _make_s3_client(verify=True):
97-
return boto3.client(
98-
"s3",
99-
endpoint_url=s3_creds["AWS_S3_ENDPOINT"],
100-
region_name=s3_creds["AWS_DEFAULT_REGION"],
101-
aws_access_key_id=s3_creds["AWS_ACCESS_KEY_ID"],
102-
aws_secret_access_key=s3_creds["AWS_SECRET_ACCESS_KEY"],
103-
verify=verify,
104-
)
105-
106-
with status.stage("download_and_sample"):
96+
def _make_s3_client(verify=True):
97+
return boto3.client(
98+
"s3",
99+
endpoint_url=s3_creds["AWS_S3_ENDPOINT"],
100+
region_name=s3_creds["AWS_DEFAULT_REGION"],
101+
aws_access_key_id=s3_creds["AWS_ACCESS_KEY_ID"],
102+
aws_secret_access_key=s3_creds["AWS_SECRET_ACCESS_KEY"],
103+
verify=verify,
104+
)
105+
107106
s3_client = _make_s3_client()
108107

109108
logger.info("Fetching test data from S3: bucket='%s', path='%s'.", test_data_bucket_name, test_data_path)
@@ -149,7 +148,6 @@ def _make_s3_client(verify=True):
149148
f"Make sure that each test data records contains following keys: {benchmark_record_keys}."
150149
)
151150

152-
with status.stage("write_output"):
153151
if 0 < benchmark_sample_size < len(benchmark_data) and isinstance(benchmark_data, list):
154152
import random
155153

components/data_processing/autorag/text_extraction/component.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ def raise_if_threshold_exceeded(error_details: list, total_docs: int, tolerance:
359359
with status:
360360
descriptor_path = Path(documents_descriptor.path) / DOCUMENTS_DESCRIPTOR_FILENAME
361361

362-
with status.stage("load_descriptor"):
362+
with status.stage("extract_documents"):
363363
if not descriptor_path.exists():
364364
raise FileNotFoundError(f"documents_descriptor.json not found at {descriptor_path}")
365365

@@ -390,7 +390,6 @@ def raise_if_threshold_exceeded(error_details: list, total_docs: int, tolerance:
390390
logger.info("No documents to process.")
391391
return
392392

393-
with status.stage("extract_documents"):
394393
documents = sorted(documents, key=lambda d: d.get("size_bytes", 0), reverse=True)
395394

396395
if max_extraction_workers is not None:

components/training/automl/autogluon_models_training/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ mutates predictor state. All artifacts are written under a single output artifac
6666

6767
Writes ``component_status.json`` under the ``component_status`` output artifact with ``component_id``
6868
``autogluon_models_training`` and stages such as ``load_data``, ``model_selection`` (optional ``steps``
69-
when completed), ``refit_full``, and ``evaluate_models``. Artifact metadata display name:
69+
when completed), ``refit_and_evaluate``. Artifact metadata display name:
7070
**Models Training Status**.
7171

7272
## Usage Examples 💡

components/training/automl/autogluon_models_training/component.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,8 @@ def retrieve_pipeline_name(name: str) -> str:
299299
predictor_clone = predictor.clone(path=work_path, return_clone=True, dirs_exist_ok=True)
300300

301301
# Refit all top models in a single call: AutoGluon resolves stacking dependencies internally.
302-
status.record("refit_full", "started")
302+
status.record("refit_and_evaluate", "started")
303303
predictor_clone.refit_full(model=top_models, train_data_extra=extra_train_df)
304-
status.record("refit_full", "completed", model_count=len(model_names_full))
305304

306305
def replace_placeholder_in_notebook(notebook, replacements):
307306
for cell in notebook.get("cells", []):
@@ -657,7 +656,12 @@ def _process_model(model_name_full: str) -> tuple[str, dict]:
657656
"models": models_metadata,
658657
}
659658

660-
status.record("evaluate_models", "completed", eval_metric=str(predictor.eval_metric))
659+
status.record(
660+
"refit_and_evaluate",
661+
"completed",
662+
model_count=len(model_names_full),
663+
eval_metric=str(predictor.eval_metric),
664+
)
661665
component_status.metadata["display_name"] = "Models Training Status"
662666

663667
return NamedTuple("outputs", eval_metric=str)(eval_metric=eval_metric)

0 commit comments

Comments
 (0)