Commit c6a6c52
feat: add SparkClient API for SparkConnect session management (#225)
* feat(spark): add core types, dataclasses, and constants
  - Add SparkConnectInfo, SparkConnectState, Driver, Executor types
  - Add type tests for validation
  - Add Kubernetes backend constants (CRD group, version, defaults)
* feat(spark): add backend base class and options pattern
  - Add RuntimeBackend abstract base class with session lifecycle methods
  - Add options pattern (Name, Image, Timeout, etc.) aligned with trainer SDK
  - Add validation utilities for connect parameters
  - Add comprehensive option tests
* feat(spark): add KubernetesBackend for SparkConnect CRD operations
  - Implement KubernetesBackend with create/get/list/delete session methods
  - Add port-forward support for out-of-cluster connections
  - Add CRD builder utilities and URL validation
  - Add comprehensive backend and utils tests with parametrized patterns
* feat(spark): add SparkClient API with KEP-107 compliant connect method
  - Implement SparkClient as main user interface for SparkConnect sessions
  - Support connect to existing server (base_url) or auto-create new session
  - Add public exports for SparkClient, Driver, Executor, options
  - Add SparkClient unit tests
* chore(spark): add test infrastructure and package init files
  - Add test common utilities and fixtures
  - Add package __init__ files for test directories
  - Set up test/e2e/spark structure
* feat(spark): add example scripts demonstrating SparkClient usage
  - Add spark_connect_simple.py with 3 usage levels (minimal, simple, advanced)
  - Add spark_advanced_options.py with full configuration examples
  - Add connect_existing_session.py for connecting to existing servers
  - Add demo and test scripts for local development
* docs(spark): add documentation for SparkClient and E2E testing
  - Add examples/spark/README.md with usage guide
  - Add local Spark Connect testing documentation
  - Add E2E test README with CI/CD integration guide
  - Update KEP-107 proposal documentation
* test(spark): add E2E test framework with cluster watcher
  - Add test_spark_examples.py with example validation tests
  - Add cluster_watcher.py for monitoring SparkConnect and pods during tests
  - Add run_in_cluster.py for executing examples as K8s Jobs
* ci(spark): add GitHub Actions workflow and E2E cluster setup
  - Add test-spark-examples.yaml workflow for E2E validation
  - Add e2e-setup-cluster.sh for Kind cluster with Spark Operator
  - Add SparkConnect CRD, Kind config, and E2E runner Dockerfile
  - Update Makefile with E2E setup target
  - Update PR title check for spark prefix
* chore(spark): add pyspark[connect] dependency and update lock file
  - Add spark extra with pyspark[connect]==3.4.1 for grpcio, pandas, pyarrow
  - Update uv.lock with resolved dependencies
  - Update .gitignore for spark-related files
* Update kubeflow/spark/backends/base.py
* refactor(spark): rename backend.connect_session() to connect()
* refactor: move session creation flow from SparkClient to backend.create_and_connect()

---------

Signed-off-by: Shekhar Rajak <shekharrajak@live.com>
Signed-off-by: Shekhar Prasad Rajak <5774448+Shekharrajak@users.noreply.github.com>
Co-authored-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>
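The "options pattern (Name, Image, Timeout, etc.)" mentioned in the commit message can be sketched in isolation. The classes and field names below are illustrative stand-ins only, not the actual `kubeflow.spark` definitions; the real option classes and session spec may differ.

```python
from dataclasses import dataclass


# Hypothetical session spec that options mutate; the real SDK's spec differs.
@dataclass
class SessionSpec:
    name: str = "default"
    image: str = ""
    timeout: int = 120


@dataclass
class Name:
    """Option object that sets the session name."""

    value: str

    def apply(self, spec: SessionSpec) -> None:
        spec.name = self.value


@dataclass
class Timeout:
    """Option object that sets the session timeout in seconds."""

    seconds: int

    def apply(self, spec: SessionSpec) -> None:
        spec.timeout = self.seconds


def build_spec(options) -> SessionSpec:
    """Fold a list of option objects into a spec, as an options pattern does."""
    spec = SessionSpec()
    for opt in options:
        opt.apply(spec)
    return spec


spec = build_spec([Name("my-session"), Timeout(300)])
print(spec.name, spec.timeout)  # my-session 300
```

The appeal of this pattern, and presumably why the commit aligns it with the trainer SDK, is that each option is a small self-describing object, so `connect(options=[Name(...), Timeout(...)])` stays readable as the set of options grows.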
1 parent deed1ce commit c6a6c52

47 files changed: +7698 −10 lines

.github/workflows/check-pr-title.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -39,6 +39,7 @@ jobs:
           scripts
           test
           trainer
+          spark
         requireScope: false
         ignoreLabels: |
           do-not-merge/work-in-progress
```

.github/workflows/test-python.yaml

Lines changed: 4 additions & 0 deletions
```diff
@@ -15,7 +15,11 @@ jobs:
     steps:
       - uses: actions/checkout@v6
       - uses: actions/setup-python@v6
+        with:
+          python-version: "3.11"
       - uses: pre-commit/action@v3.0.1
+        with:
+          extra_args: --all-files

   test:
     runs-on: ubuntu-latest
```
.github/workflows/test-spark-examples.yaml

Lines changed: 100 additions & 0 deletions

```yaml
name: Spark Examples E2E Test

on:
  pull_request:
    paths:
      - 'examples/spark/**'
      - 'kubeflow/spark/**'
      - 'test/e2e/spark/**'
      - 'hack/crds/**'
      - 'hack/e2e-setup-cluster.sh'
      - '.github/workflows/test-spark-examples.yaml'
  workflow_dispatch:

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  example-validation:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        kubernetes-version: ["1.32.0"]

    steps:
      - name: Free disk space (Ubuntu runner)
        run: |
          sudo rm -rf /usr/share/dotnet
          sudo rm -rf /usr/local/share/boost
          sudo apt-get clean
          docker system prune -af || true
          df -h

      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python 3.11
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install uv and dependencies
        uses: astral-sh/setup-uv@v7
        with:
          enable-cache: true

      - name: Install project and test deps
        run: |
          uv sync --extra spark
          uv pip install pytest pytest-timeout

      - name: Install Kind, kubectl, and Helm
        run: |
          curl -Lo ./kind "https://kind.sigs.k8s.io/dl/v0.26.0/kind-linux-amd64" && chmod +x ./kind && sudo mv ./kind /usr/local/bin/kind
          curl -Lo ./kubectl "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" && chmod +x ./kubectl && sudo mv ./kubectl /usr/local/bin/kubectl
          curl -sSfL https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash

      - name: Checkout spark-operator
        uses: actions/checkout@v4
        with:
          repository: kubeflow/spark-operator
          path: spark-operator
          fetch-depth: 1

      - name: Build Spark Operator image
        run: |
          docker build -t ghcr.io/kubeflow/spark-operator/controller:local ./spark-operator
        timeout-minutes: 10

      - name: Setup Kind cluster with Spark Operator
        run: |
          make test-e2e-setup-cluster K8S_VERSION=${{ matrix.kubernetes-version }}
        env:
          SPARK_TEST_CLUSTER: spark-test
          SPARK_TEST_NAMESPACE: spark-test
          SPARK_OPERATOR_IMAGE_TAG: local
        timeout-minutes: 15

      - name: Build and load Spark E2E runner image (in-cluster)
        run: |
          docker build -f hack/Dockerfile.spark-e2e-runner -t spark-e2e-runner:local .
          kind load docker-image spark-e2e-runner:local --name spark-test
        timeout-minutes: 5

      - name: Run example validation tests
        run: uv run pytest test/e2e/spark/test_spark_examples.py -v --tb=short
        env:
          SPARK_TEST_CLUSTER: spark-test
          SPARK_TEST_NAMESPACE: spark-test
          SPARK_E2E_DEBUG: "1"
          SPARK_E2E_RUN_IN_CLUSTER: "1"
          SPARK_E2E_RUNNER_IMAGE: spark-e2e-runner:local
        timeout-minutes: 10

      - name: Collect logs on failure
        if: failure()
        run: |
          kubectl get pods -n spark-test
          kubectl get sparkconnect -n spark-test 2>/dev/null || true
          kubectl logs -n spark-test -l app.kubernetes.io/component=server --tail=100 || true
```

.gitignore

Lines changed: 3 additions & 0 deletions

```diff
@@ -22,3 +22,6 @@ htmlcov/

 # Documentation build
 docs/_build/
+
+# Local E2E: cloned for building Spark Operator image (SPARK_OPERATOR_IMAGE_TAG=local)
+spark-operator/
```

Makefile

Lines changed: 12 additions & 1 deletion

```diff
@@ -96,7 +96,7 @@ release: install-dev
 # make test-python will produce html coverage by default. Run with `make test-python report=xml` to produce xml report.
 .PHONY: test-python
 test-python: uv-venv ## Run Python unit tests
-	@uv sync
+	@uv sync --extra spark
 	@uv run coverage run --source=kubeflow -m pytest ./kubeflow/
 	@uv run coverage report --omit='*_test.py' --skip-covered --skip-empty
 ifeq ($(report),xml)
@@ -105,6 +105,17 @@ else
 	@uv run coverage html
 endif

+##@ E2E Testing
+
+.PHONY: test-e2e-setup-cluster
+test-e2e-setup-cluster: ## Setup Kind cluster for Spark E2E tests
+	@echo "Setting up E2E test cluster..."
+	@K8S_VERSION=$(K8S_VERSION) \
+	SPARK_TEST_CLUSTER=$(SPARK_TEST_CLUSTER) \
+	SPARK_TEST_NAMESPACE=$(SPARK_TEST_NAMESPACE) \
+	SPARK_OPERATOR_VERSION=$(SPARK_OPERATOR_VERSION) \
+	KIND=$(KIND) \
+	./hack/e2e-setup-cluster.sh

 .PHONY: install-dev
 install-dev: uv uv-venv ruff ## Install uv, create .venv, sync deps.
```

docs/proposals/107-spark-client/README.md

Lines changed: 1 addition & 1 deletion

````diff
@@ -159,7 +159,7 @@ resources_per_executor = {
 ### Structured Types

 ```python
-from kubeflow.spark.types import Driver, Executor
+from kubeflow.spark import Driver, Executor

 @dataclass
 class Driver:
````
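The structured-types idea behind `Driver` and `Executor` can be illustrated with plain dataclasses. The field names below (`cpu`, `memory`, `instances`) are assumptions for illustration only; the actual schema is defined in the KEP-107 proposal and in `kubeflow.spark`.

```python
from dataclasses import dataclass


# Illustrative stand-ins for the SDK's structured types; real fields may differ.
@dataclass
class Driver:
    cpu: str = "1"
    memory: str = "1g"


@dataclass
class Executor:
    cpu: str = "1"
    memory: str = "2g"
    instances: int = 2


# Typed objects replace loose dicts like resources_per_executor,
# so typos in resource keys fail at construction time.
driver = Driver(cpu="2", memory="4g")
executor = Executor(instances=3)
```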

examples/spark/README.md

Lines changed: 24 additions & 0 deletions

````markdown
# Spark Examples

This directory contains examples for using the Kubeflow Spark SDK.

## Examples

- **spark_connect_simple.py** - Basic SparkClient usage with simple API
- **spark_advanced_options.py** - Advanced configuration with Driver/Executor objects
- **demo_existing_sparkconnect.py** - Connect to existing SparkConnect cluster
- **test_connect_url.py** - Test URL-based connection to Spark Connect

## Prerequisites

Install spark dependencies:

```bash
uv pip install kubeflow[spark]
```

## Running Examples

```bash
# Run from repository root
uv run python examples/spark/spark_connect_simple.py
```
````
examples/spark/connect_existing_session.py

Lines changed: 123 additions & 0 deletions

```python
#!/usr/bin/env python3
# Copyright 2025 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""E2E Test: Connect to Existing SparkConnect Session (two-client pattern).

This example demonstrates the "bring your own server" use case where:
1. A setup client creates a SparkConnect server
2. A test client connects to the existing server via base_url

This validates the connect(base_url="sc://...") codepath which bypasses
session creation and directly connects to an existing Spark Connect server.

Usage:
    # Run in-cluster only (via K8s Job):
    SPARK_E2E_RUN_IN_CLUSTER=1 python examples/spark/connect_existing_session.py
"""

import os
import sys
import uuid

from kubeflow.common.types import KubernetesBackendConfig
from kubeflow.spark import Name, SparkClient
from kubeflow.spark.backends.kubernetes.utils import build_service_url


def _backend_config():
    """Backend config; uses SPARK_TEST_NAMESPACE in CI."""
    return KubernetesBackendConfig(namespace=os.environ.get("SPARK_TEST_NAMESPACE", "spark-test"))


def _unique_session_name() -> str:
    """Generate unique session name to avoid conflicts in E2E runs."""
    return f"connect-existing-{uuid.uuid4().hex[:8]}"


def test_connect_to_existing_session():
    """Test connect(base_url=...) with two clients.

    Two-client pattern:
    - Setup client: creates SparkConnect server, stops SparkSession (server stays running)
    - Test client: connects via base_url to the existing server
    """
    print("=" * 70)
    print("E2E: Connect to Existing SparkConnect Session")
    print("=" * 70)

    session_name = _unique_session_name()
    setup_client = None
    test_spark = None

    try:
        # Phase 1: Setup client creates SparkConnect server
        print("\n[Phase 1] Creating SparkConnect server...")
        setup_client = SparkClient(backend_config=_backend_config())
        setup_spark = setup_client.connect(options=[Name(session_name)], timeout=180)

        info = setup_client.get_session(session_name)
        service_url = build_service_url(info)
        print(f"  Session: {session_name}")
        print(f"  URL: {service_url}")

        setup_spark.stop()
        print("  Setup SparkSession stopped (server still running)")

        # Phase 2: Test client connects via base_url
        print("\n[Phase 2] Connecting via base_url...")
        test_client = SparkClient(backend_config=_backend_config())
        test_spark = test_client.connect(base_url=service_url)
        print("  Connected successfully!")

        # Phase 3: Validate with Spark operations
        print("\n[Phase 3] Validating...")
        count = test_spark.range(100).count()
        print(f"  spark.range(100).count() = {count}")
        assert count == 100, f"Expected 100, got {count}"

        print("\n[SUCCESS] connect(base_url=...) works correctly!")

    finally:
        # Phase 4: Cleanup
        print("\n[Phase 4] Cleanup...")
        if test_spark:
            try:
                test_spark.stop()
            except Exception as e:
                print(f"  Warning: {e}")
        if setup_client:
            try:
                setup_client.delete_session(session_name)
                print(f"  Deleted {session_name}")
            except Exception as e:
                print(f"  Warning: {e}")


def main():
    """Entry point for E2E test."""
    if os.environ.get("SPARK_E2E_RUN_IN_CLUSTER") != "1":
        print("SKIP: Requires in-cluster execution (SPARK_E2E_RUN_IN_CLUSTER=1)")
        sys.exit(0)

    try:
        test_connect_to_existing_session()
        sys.exit(0)
    except Exception as e:
        print(f"\nFailed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
```
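The `build_service_url` helper used in the example presumably assembles a `sc://` URL from the session's in-cluster Service coordinates. A rough, stdlib-only sketch under assumed names follows; the `SparkConnectInfo` fields, the `-server` Service suffix, and the port default are guesses, and the real implementation in `kubeflow.spark.backends.kubernetes.utils` may differ.

```python
from dataclasses import dataclass


# Hypothetical subset of SparkConnectInfo; the SDK's actual fields may differ.
@dataclass
class SparkConnectInfo:
    name: str
    namespace: str
    port: int = 15002  # Spark Connect's default gRPC port


def build_service_url(info: SparkConnectInfo) -> str:
    """Assumed shape: in-cluster DNS name of the session's server Service."""
    host = f"{info.name}-server.{info.namespace}.svc.cluster.local"
    return f"sc://{host}:{info.port}"


url = build_service_url(SparkConnectInfo("demo", "spark-test"))
print(url)  # sc://demo-server.spark-test.svc.cluster.local:15002
```

A URL of this shape only resolves from inside the cluster, which matches the example's `SPARK_E2E_RUN_IN_CLUSTER=1` guard; out-of-cluster clients would need the port-forward support mentioned in the commit message instead.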
