1414
1515from __future__ import annotations
1616
17+ import base64
1718import json
1819import logging
1920from collections .abc import Iterator
3233STATUS_FAILED = "failed"
3334
3435
36+ class ComponentStatusEncoder (json .JSONEncoder ):
37+ """JSON encoder for component status metadata (Path, bytes, set, datetime)."""
38+
39+ def default (self , obj : Any ) -> Any :
40+ """Convert non-serializable objects to JSON-compatible types."""
41+ if isinstance (obj , datetime ):
42+ return obj .isoformat ().replace ("+00:00" , "Z" ) if obj .tzinfo is not None else obj .isoformat ()
43+ if isinstance (obj , Path ):
44+ return str (obj )
45+ if isinstance (obj , bytes ):
46+ return base64 .b64encode (obj ).decode ("ascii" )
47+ if isinstance (obj , set ):
48+ return sorted (obj , key = str )
49+ return super ().default (obj )
50+
51+
52+ def utc_now_z () -> str :
53+ """Return current UTC time as an ISO-8601 string with ``Z`` suffix."""
54+ return datetime .now (UTC ).isoformat ().replace ("+00:00" , "Z" )
55+
56+
3557class ComponentStatusTracker :
3658 """Track stage-level progress within a single component.
3759
3860 Publishes component-local status to an artifact without requiring workspace.
3961 Each component independently tracks its stages and metadata.
4062 """
4163
42- def __init__ (self , artifact_path : str , component_id : str ) -> None :
64+ def __init__ (self , artifact_path : str | None , component_id : str ) -> None :
4365 """Initialize the status tracker.
4466
4567 Args:
4668 artifact_path: Path to the KFP artifact directory where status.json will be written.
69+ When ``None``, tracking is disabled (e.g. unit tests without a mock artifact).
4770 component_id: Unique component identifier (e.g., "autogluon_models_training").
4871 """
49- self .artifact_path = Path (artifact_path )
72+ self ._enabled = artifact_path is not None
73+ self .artifact_path = Path (artifact_path ) if self ._enabled else Path ("." )
5074 self .component_id = component_id
5175 self .stages : list [dict [str , Any ]] = []
52- self .started_at = self . _utc_now_iso ()
76+ self .started_at = utc_now_z ()
5377 self .metadata : dict [str , Any ] = {}
5478
55- @staticmethod
56- def _utc_now_iso () -> str :
57- """Return current UTC timestamp in ISO format."""
58- return datetime .now (UTC ).isoformat ().replace ("+00:00" , "Z" )
59-
6079 def record (self , stage_id : str , status : str , ** metadata : Any ) -> None :
6180 """Record or update a stage's status.
6281
@@ -73,21 +92,18 @@ def record(self, stage_id: str, status: str, **metadata: Any) -> None:
7392 steps=["feature_eng", "training", "stacking"],
7493 models_trained=15)
7594 """
76- # Find existing stage or create new one
7795 existing_idx = next ((i for i , s in enumerate (self .stages ) if s ["id" ] == stage_id ), None )
7896
7997 stage_data = {
8098 "id" : stage_id ,
8199 "status" : status ,
82- "timestamp" : self . _utc_now_iso (),
100+ "timestamp" : utc_now_z (),
83101 ** metadata ,
84102 }
85103
86104 if existing_idx is not None :
87- # Update existing stage, preserving previously recorded metadata
88105 self .stages [existing_idx ].update (stage_data )
89106 else :
90- # Append new stage
91107 self .stages .append (stage_data )
92108
93109 logger .info (
@@ -115,19 +131,22 @@ def save(self) -> None:
115131 Creates the artifact directory if needed and writes component_status.json
116132 with all recorded stages and metadata.
117133 """
134+ if not self ._enabled :
135+ return
136+
118137 self .artifact_path .mkdir (parents = True , exist_ok = True )
119138
120139 data = {
121140 "component_id" : self .component_id ,
122141 "started_at" : self .started_at ,
123- "completed_at" : self . _utc_now_iso (),
142+ "completed_at" : utc_now_z (),
124143 "stages" : self .stages ,
125144 "metadata" : self .metadata ,
126145 }
127146
128147 output_file = self .artifact_path / COMPONENT_STATUS_FILENAME
129148 with output_file .open ("w" , encoding = "utf-8" ) as f :
130- json .dump (data , f , indent = 2 )
149+ json .dump (data , f , indent = 2 , cls = ComponentStatusEncoder )
131150
132151 logger .info (
133152 "COMPONENT_STATUS component=%s saved status with %d stages to %s" ,
@@ -146,31 +165,39 @@ def save_best_effort(self) -> None:
146165 self .component_id ,
147166 )
148167
149- def mark_active_failed (self , error : str ) -> None :
168+ def mark_active_failed (self , error : str | BaseException ) -> None :
150169 """Mark the in-progress stage as failed, or the last open stage if none is active."""
170+ if isinstance (error , BaseException ):
171+ error_msg = f"{ type (error ).__name__ } : { error } " if str (error ) else type (error ).__name__
172+ else :
173+ error_msg = error
174+
151175 active_statuses = (STATUS_STARTED , STATUS_RUNNING )
152176 for stage in reversed (self .stages ):
153177 if stage .get ("status" ) in active_statuses :
154- self .record (stage ["id" ], STATUS_FAILED , error = error )
178+ self .record (stage ["id" ], STATUS_FAILED , error = error_msg )
155179 return
156180
157181 if self .stages and self .stages [- 1 ].get ("status" ) != STATUS_COMPLETED :
158- self .record (self .stages [- 1 ]["id" ], STATUS_FAILED , error = error )
182+ self .record (self .stages [- 1 ]["id" ], STATUS_FAILED , error = error_msg )
159183 return
160184
161- self .set_metadata (status = STATUS_FAILED , error = error )
185+ self .set_metadata (status = STATUS_FAILED , error = error_msg )
162186
163187 @contextmanager
164188 def stage (self , stage_id : str , ** start_metadata : Any ) -> Iterator [None ]:
165189 """Record stage started/completed, or failed when an exception escapes the block."""
166190 self .record (stage_id , STATUS_STARTED , ** start_metadata )
167191 try :
168192 yield
169- except Exception as exc :
170- self .record (stage_id , STATUS_FAILED , error = str (exc ))
193+ except BaseException as exc :
194+ error_msg = f"{ type (exc ).__name__ } : { exc } " if str (exc ) else type (exc ).__name__
195+ self .record (stage_id , STATUS_FAILED , error = error_msg )
171196 raise
172197 else :
173- self .record (stage_id , STATUS_COMPLETED )
198+ latest = next ((s for s in reversed (self .stages ) if s ["id" ] == stage_id ), None )
199+ if latest is None or latest .get ("status" ) not in (STATUS_COMPLETED , STATUS_FAILED ):
200+ self .record (stage_id , STATUS_COMPLETED )
174201
175202 def __enter__ (self ) -> ComponentStatusTracker :
176203 """Enter context: return this tracker."""
@@ -179,7 +206,7 @@ def __enter__(self) -> ComponentStatusTracker:
179206 def __exit__ (self , exc_type : type [BaseException ] | None , exc : BaseException | None , tb : Any ) -> bool :
180207 """On exit, mark active stage failed and save status best-effort."""
181208 if exc is not None :
182- self .mark_active_failed (str ( exc ) )
209+ self .mark_active_failed (exc )
183210 self .save_best_effort ()
184211 return False
185212
@@ -194,7 +221,7 @@ def load_component_status(artifact_path: str) -> dict[str, Any]:
194221
195222 Returns:
196223 Dict containing component_id, started_at, completed_at, stages, and metadata.
197- Returns empty dict if file doesn't exist.
224+ Returns empty dict if file doesn't exist or is unreadable .
198225
199226 Example:
200227 status = load_component_status("/path/to/artifact")
@@ -204,5 +231,9 @@ def load_component_status(artifact_path: str) -> dict[str, Any]:
204231 if not status_file .exists ():
205232 return {}
206233
207- with status_file .open ("r" , encoding = "utf-8" ) as f :
208- return json .load (f )
234+ try :
235+ with status_file .open ("r" , encoding = "utf-8" ) as f :
236+ return json .load (f )
237+ except (json .JSONDecodeError , OSError ) as e :
238+ logger .warning ("Failed to load status from %s: %s" , status_file , e )
239+ return {}
0 commit comments