Skip to content

Commit deed1ce

Browse files
authored
feat(trainer): add dataset and model initializer support to container backend (#188)
* feat(trainer): add dataset and model initializer support to container backend Add support for dataset and model initializers in the container backend to bring it to feature parity with the Kubernetes backend. Changes: - Add utility functions for building initializer commands and environment variables - Implement _run_initializers() and _run_single_initializer() methods in ContainerBackend - Run initializers sequentially before training containers start - Download datasets to /workspace/dataset and models to /workspace/model - Track initializer containers as separate steps in TrainJob - Support all initializer types: HuggingFace, S3, and DataCache - Add comprehensive unit tests for all initializer configurations - Handle initializer failures with proper cleanup and error messages Fixes #171 Signed-off-by: HKanoje <hrithik.kanoje@gmail.com> * feat(trainer): address reviewer feedback for initializer support - Make initializer image configurable via ContainerBackendConfig - Make initializer timeout configurable (default 600 seconds) - Implement wait API in adapters instead of polling - Clean up successful initializer containers after completion - Clean up network on initializer failure - Raise ValueError for unsupported initializer types (no datacache fallback) All tests passing (173/173). Addresses all feedback from PR #188. Signed-off-by: HKanoje <hrithik.kanoje@gmail.com> * chore(trainer): add cleanup helper to reduce duplication Add _cleanup_container_resources() helper method to consolidate duplicated cleanup logic for stopping/removing containers and deleting networks. Refactor 5 locations across train(), initializer handlers, and delete_job() to use this helper. Signed-off-by: HKanoje <hrithik.kanoje@gmail.com> * fix(trainer): use correct initializer images and working directory Address feedback for initializer support in container backend: - Use separate images for dataset/model initializers: - kubeflow/dataset-initializer:latest for datasets - kubeflow/model-initializer:latest for models (instead of kubeflow/training-operator:latest) - Update python commands to use pkg.initializers module: - python -m pkg.initializers.dataset (for dataset) - python -m pkg.initializers.model (for model) - Change initializer working_dir from /workspace to /app per Dockerfile convention Refs: https://github.com/kubeflow/trainer/tree/master/cmd/initializers Signed-off-by: HKanoje <hrithik.kanoje@gmail.com> * fix(container): address PR review comments for initializer support - Use GHCR images as default for dataset/model initializers - Replace suppress with try-except blocks - Refactor initializer utils with ContainerInitializer dataclass - Add get_dataset_initializer and get_model_initializer functions - Remove DataCache support (unsupported in container backend) - Merge initializer tests into test_train() and test_get_job_logs() - Remove duplicate test functions Signed-off-by: HKanoje <hrithik.kanoje@gmail.com> * fix(container): add name field to ContainerInitializer and remove init_type - Add name field to ContainerInitializer dataclass - Set name='dataset-initializer' and name='model-initializer' in utils - Remove init_type parameter from _run_single_initializer() - Use container_init.name for labels and log messages Signed-off-by: HKanoje <hrithik.kanoje@gmail.com> --------- Signed-off-by: HKanoje <hrithik.kanoje@gmail.com>
1 parent 36e2282 commit deed1ce

File tree

7 files changed

+688
-48
lines changed

7 files changed

+688
-48
lines changed

kubeflow/trainer/backends/container/adapters/base.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,22 @@ def get_network(self, network_id: str) -> Optional[dict]:
193193
Dictionary with network info including labels, or None if not found
194194
"""
195195
raise NotImplementedError()
196+
197+
@abc.abstractmethod
198+
def wait_for_container(self, container_id: str, timeout: Optional[int] = None) -> int:
199+
"""
200+
Wait for a container to exit and return its exit code.
201+
202+
This is a blocking call that waits until the container stops.
203+
204+
Args:
205+
container_id: Container ID
206+
timeout: Maximum time to wait in seconds, or None to wait indefinitely
207+
208+
Returns:
209+
Container exit code
210+
211+
Raises:
212+
TimeoutError: If timeout is reached before container exits
213+
"""
214+
raise NotImplementedError()

kubeflow/trainer/backends/container/adapters/docker.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,3 +227,31 @@ def get_network(self, network_id: str) -> Optional[dict]:
227227
}
228228
except Exception:
229229
return None
230+
231+
def wait_for_container(self, container_id: str, timeout: Optional[int] = None) -> int:
232+
"""
233+
Wait for a Docker container to exit and return its exit code.
234+
235+
Args:
236+
container_id: Container ID
237+
timeout: Maximum time to wait in seconds, or None to wait indefinitely
238+
239+
Returns:
240+
Container exit code
241+
242+
Raises:
243+
TimeoutError: If timeout is reached before container exits
244+
"""
245+
try:
246+
container = self.get_container(container_id)
247+
result = container.wait(timeout=timeout)
248+
# Docker wait() returns a dict with 'StatusCode' key
249+
if isinstance(result, dict):
250+
return result.get("StatusCode", 0)
251+
return int(result)
252+
except Exception as e:
253+
if "timeout" in str(e).lower():
254+
raise TimeoutError(
255+
f"Container {container_id} did not exit within {timeout} seconds"
256+
) from e
257+
raise

kubeflow/trainer/backends/container/adapters/podman.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,3 +254,29 @@ def get_network(self, network_id: str) -> Optional[dict]:
254254
}
255255
except Exception:
256256
return None
257+
258+
def wait_for_container(self, container_id: str, timeout: Optional[int] = None) -> int:
259+
"""
260+
Wait for a Podman container to exit and return its exit code.
261+
262+
Args:
263+
container_id: Container ID
264+
timeout: Maximum time to wait in seconds, or None to wait indefinitely
265+
266+
Returns:
267+
Container exit code
268+
269+
Raises:
270+
TimeoutError: If timeout is reached before container exits
271+
"""
272+
try:
273+
container = self.get_container(container_id)
274+
result = container.wait(timeout=timeout)
275+
# Podman wait() returns exit code directly
276+
return int(result)
277+
except Exception as e:
278+
if "timeout" in str(e).lower():
279+
raise TimeoutError(
280+
f"Container {container_id} did not exit within {timeout} seconds"
281+
) from e
282+
raise

0 commit comments

Comments
 (0)