feat(spark): implement batch job support via SparkApplication CRD (KEP-107 Phase 1) #386

ghazariann wants to merge 13 commits into kubeflow:main
Conversation
Signed-off-by: Vahagn <vghazaryan@cloudlinux.com>
Signed-off-by: ghazariann <vahagn.ghazayan@gmail.com>
Pull request overview
Adds Spark batch job support to the Kubeflow Spark SDK by submitting SparkApplication (v1beta2) CRDs, including image build/load helpers, public SparkClient APIs, examples, and e2e coverage updates.
Changes:
- Introduces `SparkJob`/`SparkJobStatus` types and batch-job CRUD + wait + logs methods on the Kubernetes backend and `SparkClient`.
- Adds an `ImageLoader` abstraction with a `KindImageLoader` implementation to load locally-built images into Kind.
- Adds batch-job examples (`spark_job_simple.py`, `wordcount.py`), a new e2e test, workflow deps, and RBAC updates.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| test/e2e/spark/test_spark_examples.py | Adds e2e coverage for the new batch-job example. |
| kubeflow/trainer/types/types.py | Fixes ignore-pattern docstring to match expected glob format. |
| kubeflow/trainer/constants/constants.py | Fixes default ignore patterns to use *.pt/*.pth globs. |
| kubeflow/spark/types/types.py | Adds batch job dataclass + status enum for SparkApplication jobs. |
| kubeflow/spark/image/loaders.py | Implements image loading abstraction + Kind loader (others stubbed). |
| kubeflow/spark/image/__init__.py | Exposes image-loader utilities as a package API. |
| kubeflow/spark/backends/kubernetes/utils.py | Adds job naming, image build helper, SparkApplication CR builder + CR→SDK conversion. |
| kubeflow/spark/backends/kubernetes/constants.py | Adds SparkApplication API constants and job/log-related constants. |
| kubeflow/spark/backends/kubernetes/backend.py | Implements submit/get/list/delete/logs/wait for SparkApplication batch jobs. |
| kubeflow/spark/backends/base.py | Extends backend interface with batch-job abstract methods. |
| kubeflow/spark/api/spark_client.py | Adds public SparkClient batch-job APIs. |
| kubeflow/spark/__init__.py | Re-exports batch job types and image loader types. |
| hack/e2e-setup-cluster.sh | Extends RBAC permissions to cover SparkApplications and renames the role. |
| examples/spark/wordcount.py | Adds a simple PySpark script used by the batch-job example. |
| examples/spark/spark_job_simple.py | Adds a batch job example demonstrating submit/wait/logs/delete. |
| .github/workflows/test-spark-examples.yaml | Installs docker extra for CI runs that build images. |
```python
try:
    docker_sdk.from_env().images.get(image_tag)
    logger.info("Image '%s' already exists for same file content, reusing it", image_tag)
    return image_tag
except Exception:
    pass  # Image not found locally, build it
```
```python
def test_spark_job_simple_example(self):
    """EX04: Validate spark_job_simple.py submits and completes batch jobs."""
    namespace = os.environ.get("SPARK_TEST_NAMESPACE", "spark-test")
    returncode, stdout, stderr, _ = _run_example_with_watcher(
        EXAMPLES_DIR / "spark_job_simple.py",
        namespace,
        timeout_sec=EXAMPLE_TIMEOUT_SEC,
    )
```
```python
except multiprocessing.TimeoutError:
    yield f"[error] Timeout reading logs for {self.namespace}/{name}"
except Exception as e:
    yield f"[error] Failed to read pod logs for {self.namespace}/{name}: {e}"
```
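The handlers above turn failures into `[error]` log lines rather than raising mid-stream, so callers iterating logs always get a terminating message. A self-contained sketch of that pattern (using the builtin `TimeoutError` and a standalone function for illustration):

```python
def stream_logs(read_lines, namespace: str, name: str):
    # Wrap a log-line source so errors surface as "[error] ..." lines
    # instead of propagating to the consumer mid-iteration.
    try:
        yield from read_lines()
    except TimeoutError:
        yield f"[error] Timeout reading logs for {namespace}/{name}"
    except Exception as e:
        yield f"[error] Failed to read pod logs for {namespace}/{name}: {e}"
```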
```python
def get_job_logs(self, name: str, container: str = "spark-kubernetes-driver", follow: bool = False) -> Iterator[str]:
    """Get logs from a batch Spark job pod.

    Args:
        name: Job name.
        container: Container name to read logs from. Default ``"spark-kubernetes-driver"``.
        follow: If True, stream logs continuously.
    """
```
```python
def submit_job(
    self,
    main_file: str,
    name: str | None = None,
    arguments: list[str] | None = None,
    loader: ImageLoader | None = None,
) -> SparkJob:
```
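`name` defaults to `None`, and the file-summary table notes that `utils.py` "adds job naming", so the backend presumably generates a default. A hypothetical sketch of such a helper (name and truncation limits assumed, not taken from the PR):

```python
import re
import uuid
from pathlib import Path


def generate_job_name(main_file: str) -> str:
    # Derive a DNS-1123-safe name from the script filename and append a
    # short random suffix so repeated submissions don't collide.
    stem = re.sub(r"[^a-z0-9-]+", "-", Path(main_file).stem.lower()).strip("-") or "spark-job"
    return f"{stem[:50]}-{uuid.uuid4().hex[:6]}"
```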
```python
def submit_job(
    self,
    func: Callable[[SparkSession], Any] | None = None,
    func_args: dict[str, Any] | None = None,
    main_file: str | None = None,
    main_class: str | None = None,
    arguments: list[str] | None = None,
    name: str | None = None,
) -> str:
    """Submit a batch Spark job.

    Supports two modes based on parameters:

    - Function mode: Pass `func` to submit a Python function with Spark transformations.
    - File mode: Pass `main_file` to submit an existing Python/Jar file.

    Args:
        func: Python function that receives SparkSession (function mode).
        func_args: Arguments to pass to the function.
        main_file: Path to Python/Jar file (file mode).
        main_class: Main class for Jar files.
        arguments: Command-line arguments for the job.
        name: Job name; auto-generated if omitted.
    """
```
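Since the two modes are driven purely by which parameter is set, the method has to reject calls that set both or neither. A hypothetical sketch of that dispatch check (standalone function for illustration; not the PR's actual code):

```python
def resolve_mode(func=None, main_file=None) -> str:
    # Exactly one of `func` (function mode) or `main_file` (file mode)
    # must be provided; anything else is ambiguous.
    if (func is None) == (main_file is None):
        raise ValueError("Provide exactly one of `func` (function mode) or `main_file` (file mode)")
    return "function" if func is not None else "file"
```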
```python
def get_job_logs(
    self,
    name: str,
    container: str = "spark-kubernetes-driver",
    follow: bool = False,
) -> Iterator[str]:
    """Get logs from a Spark job (driver or executor)."""
    return self.backend.get_job_logs(name, container=container, follow=follow)
```
```python
job = self.get_job(name)
actual_container = constants.SPARK_CONTAINER_NAME_MAP.get(container, container)

if job.error_message:
    yield f"[operator] {job.error_message}"

if not job.driver_pod_name:
    return
```
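`constants.SPARK_CONTAINER_NAME_MAP` lets callers pass friendly aliases like `"driver"` while unknown names pass through unchanged via `.get(container, container)`. A sketch of its assumed shape (the driver entry matches the default seen in this PR; the executor entry is an assumption based on the container names Spark on Kubernetes uses):

```python
# Assumed shape of constants.SPARK_CONTAINER_NAME_MAP.
SPARK_CONTAINER_NAME_MAP = {
    "driver": "spark-kubernetes-driver",
    "executor": "spark-kubernetes-executor",
}


def resolve_container(container: str) -> str:
    # Unknown names fall through unchanged, so exact container names
    # still work alongside the short aliases.
    return SPARK_CONTAINER_NAME_MAP.get(container, container)
```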
feat(spark): implement batch job support via SparkApplication CRD (KEP-107 Phase 1)
Summary
Implements the batch job portion of Phase 1 from the Spark Client KEP. The `connect()` API (Spark Connect sessions) was already merged in #225. This PR adds the batch job layer on top of the same `SparkClient` and `KubernetesBackend`.

What's added
- `SparkJob` dataclass and `SparkJobStatus` enum
- `ImageLoader` abstraction with `KindImageLoader` for Kind clusters
- `KubernetesBackend` batch methods: `submit_job`, `get_job`, `list_jobs`, `delete_job`, `get_job_logs`, `wait_for_job`
- `SparkClient` exposes all batch methods as public API
- Examples: `spark_job_simple.py` + `wordcount.py`
- RBAC updated to cover `sparkapplications`

Intentionally deferred (Phase 2)
- More `submit_job` parameters: driver/executor resources, spark conf, image, service account, etc. (consistent with the `connect()` API)
- Remote dependencies (`s3://`, `gs://`, etc.)
- `submit_job(func=...)`: function mode

How it works

Under the hood, `submit_job`:
- …client API (discussed in #107)
- submits a `SparkApplication` CR to the Spark Operator v1beta2 API

Test plan
- Local: `SPARK_TEST_CLUSTER=spark-test SPARK_TEST_NAMESPACE=spark-test uv run pytest test/e2e/spark/test_spark_examples.py::TestSparkJobExamples -v`
- CI: `.github/workflows/test-spark-examples.yaml` runs the full file, including the new `TestSparkJobExamples`
KEP: docs/proposals/107-spark-client/README.md