Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/check-pr-title.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ jobs:
scripts
test
trainer
spark
requireScope: false
ignoreLabels: |
do-not-merge/work-in-progress
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/test-python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ jobs:
steps:
- uses: actions/checkout@v6
- uses: actions/setup-python@v6
with:
python-version: "3.11"
- uses: pre-commit/action@v3.0.1
with:
extra_args: --all-files

test:
runs-on: ubuntu-latest
Expand Down
100 changes: 100 additions & 0 deletions .github/workflows/test-spark-examples.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
name: Spark Examples E2E Test

on:
  pull_request:
    paths:
      - 'examples/spark/**'
      - 'kubeflow/spark/**'
      - 'test/e2e/spark/**'
      - 'hack/crds/**'
      - 'hack/e2e-setup-cluster.sh'
      - '.github/workflows/test-spark-examples.yaml'
  workflow_dispatch:

# Cancel superseded runs for the same ref to save runner time.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  example-validation:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        kubernetes-version: ["1.32.0"]

    steps:
      # Kind + Spark images are large; reclaim space on the stock runner first.
      - name: Free disk space (Ubuntu runner)
        run: |
          sudo rm -rf /usr/share/dotnet
          sudo rm -rf /usr/local/share/boost
          sudo apt-get clean
          docker system prune -af || true
          df -h

      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python 3.11
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install uv and dependencies
        uses: astral-sh/setup-uv@v7
        with:
          enable-cache: true

      - name: Install project and test deps
        run: |
          uv sync --extra spark
          uv pip install pytest pytest-timeout

      # Pin installer scripts to fixed refs so runs are reproducible and not
      # exposed to unreviewed changes on upstream default branches.
      - name: Install Kind, kubectl, and Helm
        run: |
          curl -Lo ./kind "https://kind.sigs.k8s.io/dl/v0.26.0/kind-linux-amd64" && chmod +x ./kind && sudo mv ./kind /usr/local/bin/kind
          curl -Lo ./kubectl "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" && chmod +x ./kubectl && sudo mv ./kubectl /usr/local/bin/kubectl
          curl -sSfL https://raw.githubusercontent.com/helm/helm/v3.16.4/scripts/get-helm-3 | bash

      - name: Checkout spark-operator
        uses: actions/checkout@v4
        with:
          repository: kubeflow/spark-operator
          path: spark-operator
          fetch-depth: 1

      - name: Build Spark Operator image
        run: |
          docker build -t ghcr.io/kubeflow/spark-operator/controller:local ./spark-operator
        timeout-minutes: 10

      - name: Setup Kind cluster with Spark Operator
        run: |
          make test-e2e-setup-cluster K8S_VERSION=${{ matrix.kubernetes-version }}
        env:
          SPARK_TEST_CLUSTER: spark-test
          SPARK_TEST_NAMESPACE: spark-test
          SPARK_OPERATOR_IMAGE_TAG: local
        timeout-minutes: 15

      # E2E flow: pytest (outside the cluster) creates a Job; the Job pod runs a
      # Python example; SparkClient creates a SparkConnect CR; the Spark Operator
      # starts server + executors; the backend waits for READY and builds the
      # in-cluster URL; the example connects, runs DataFrame ops, and exits; the
      # test collects logs and cleans up.
      - name: Build and load Spark E2E runner image (in-cluster)
        run: |
          docker build -f hack/Dockerfile.spark-e2e-runner -t spark-e2e-runner:local .
          kind load docker-image spark-e2e-runner:local --name spark-test
        timeout-minutes: 5

      - name: Run example validation tests
        run: uv run pytest test/e2e/spark/test_spark_examples.py -v --tb=short
        env:
          SPARK_TEST_CLUSTER: spark-test
          SPARK_TEST_NAMESPACE: spark-test
          SPARK_E2E_DEBUG: "1"
          SPARK_E2E_RUN_IN_CLUSTER: "1"
          SPARK_E2E_RUNNER_IMAGE: spark-e2e-runner:local
        timeout-minutes: 10

      - name: Collect logs on failure
        if: failure()
        run: |
          kubectl get pods -n spark-test
          kubectl get sparkconnect -n spark-test 2>/dev/null || true
          kubectl logs -n spark-test -l app.kubernetes.io/component=server --tail=100 || true
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ htmlcov/

# Documentation build
docs/_build/

# Local E2E: cloned for building Spark Operator image (SPARK_OPERATOR_IMAGE_TAG=local)
spark-operator/
13 changes: 12 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ release: install-dev
# make test-python will produce html coverage by default. Run with `make test-python report=xml` to produce xml report.
.PHONY: test-python
test-python: uv-venv ## Run Python unit tests
@uv sync
@uv sync --extra spark
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to add pyspark dependency

@uv run coverage run --source=kubeflow -m pytest ./kubeflow/
@uv run coverage report --omit='*_test.py' --skip-covered --skip-empty
ifeq ($(report),xml)
Expand All @@ -105,6 +105,17 @@ else
@uv run coverage html
endif

##@ E2E Testing

# Create a local Kind cluster preconfigured for the Spark E2E suite.
# Delegates to hack/e2e-setup-cluster.sh, forwarding the knobs below as
# environment variables; each $(VAR) may be overridden on the make command
# line (e.g. `make test-e2e-setup-cluster K8S_VERSION=1.32.0`).
.PHONY: test-e2e-setup-cluster
test-e2e-setup-cluster: ## Setup Kind cluster for Spark E2E tests
	@echo "Setting up E2E test cluster..."
	@K8S_VERSION=$(K8S_VERSION) \
	SPARK_TEST_CLUSTER=$(SPARK_TEST_CLUSTER) \
	SPARK_TEST_NAMESPACE=$(SPARK_TEST_NAMESPACE) \
	SPARK_OPERATOR_VERSION=$(SPARK_OPERATOR_VERSION) \
	KIND=$(KIND) \
	./hack/e2e-setup-cluster.sh

.PHONY: install-dev
install-dev: uv uv-venv ruff ## Install uv, create .venv, sync deps.
Expand Down
2 changes: 1 addition & 1 deletion docs/proposals/107-spark-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ resources_per_executor = {
### Structured Types

```python
from kubeflow.spark.types import Driver, Executor
from kubeflow.spark import Driver, Executor

@dataclass
class Driver:
Expand Down
24 changes: 24 additions & 0 deletions examples/spark/README.md
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also update docs in Kubeflow SDK website to include SparkClient https://sdk.kubeflow.org/en/latest/

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Spark Examples

This directory contains examples for using the Kubeflow Spark SDK.

## Examples

- **spark_connect_simple.py** - Basic SparkClient usage with simple API
- **spark_advanced_options.py** - Advanced configuration with Driver/Executor objects
- **demo_existing_sparkconnect.py** - Connect to existing SparkConnect cluster
- **test_connect_url.py** - Test URL-based connection to Spark Connect

## Prerequisites

Install spark dependencies:
```bash
uv pip install "kubeflow[spark]"
```

## Running Examples

```bash
# Run from repository root
uv run python examples/spark/spark_connect_simple.py
```
123 changes: 123 additions & 0 deletions examples/spark/connect_existing_session.py
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to move those examples to the website https://sdk.kubeflow.org/en/latest/examples.html
That could be done in a separate PR though

Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#!/usr/bin/env python3
# Copyright 2025 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""E2E Test: Connect to Existing SparkConnect Session (two-client pattern).

This example demonstrates the "bring your own server" use case where:
1. A setup client creates a SparkConnect server
2. A test client connects to the existing server via base_url

This validates the connect(base_url="sc://...") codepath which bypasses
session creation and directly connects to an existing Spark Connect server.

Usage:
# Run in-cluster only (via K8s Job):
SPARK_E2E_RUN_IN_CLUSTER=1 python examples/spark/connect_existing_session.py
"""

import os
import sys
import uuid

from kubeflow.common.types import KubernetesBackendConfig
from kubeflow.spark import Name, SparkClient
from kubeflow.spark.backends.kubernetes.utils import build_service_url


def _backend_config():
    """Build a Kubernetes backend config targeting the E2E test namespace.

    The namespace is taken from SPARK_TEST_NAMESPACE (set by CI), falling
    back to "spark-test" for local runs.
    """
    target_namespace = os.environ.get("SPARK_TEST_NAMESPACE", "spark-test")
    return KubernetesBackendConfig(namespace=target_namespace)


def _unique_session_name() -> str:
"""Generate unique session name to avoid conflicts in E2E runs."""
return f"connect-existing-{uuid.uuid4().hex[:8]}"


def test_connect_to_existing_session():
    """Test connect(base_url=...) with two clients.

    Two-client pattern:
    - Setup client: creates SparkConnect server, stops SparkSession (server stays running)
    - Test client: connects via base_url to the existing server

    Raises:
        AssertionError: if the count computed over the existing server is wrong.
        Exception: propagated from client/server setup failures; cleanup in the
            finally block still runs in that case.
    """
    print("=" * 70)
    print("E2E: Connect to Existing SparkConnect Session")
    print("=" * 70)

    # Pre-declare so the finally block can safely test which phase was reached.
    session_name = _unique_session_name()
    setup_client = None
    test_spark = None

    try:
        # Phase 1: Setup client creates SparkConnect server
        # NOTE(review): timeout=180 — presumably seconds to wait for the server
        # to become READY; confirm against SparkClient.connect docs.
        print("\n[Phase 1] Creating SparkConnect server...")
        setup_client = SparkClient(backend_config=_backend_config())
        setup_spark = setup_client.connect(options=[Name(session_name)], timeout=180)

        # Derive the in-cluster sc:// URL from the session's service metadata.
        info = setup_client.get_session(session_name)
        service_url = build_service_url(info)
        print(f"  Session: {session_name}")
        print(f"  URL: {service_url}")

        # Stopping the SparkSession drops only this client's connection; the
        # server (SparkConnect CR) keeps running for the second client.
        setup_spark.stop()
        print("  Setup SparkSession stopped (server still running)")

        # Phase 2: Test client connects via base_url — this is the codepath
        # under test: it bypasses session creation entirely.
        print("\n[Phase 2] Connecting via base_url...")
        test_client = SparkClient(backend_config=_backend_config())
        test_spark = test_client.connect(base_url=service_url)
        print("  Connected successfully!")

        # Phase 3: Validate with Spark operations
        print("\n[Phase 3] Validating...")
        count = test_spark.range(100).count()
        print(f"  spark.range(100).count() = {count}")
        assert count == 100, f"Expected 100, got {count}"

        print("\n[SUCCESS] connect(base_url=...) works correctly!")

    finally:
        # Phase 4: Cleanup — best-effort: failures here are logged, not raised,
        # so they never mask the test result.
        print("\n[Phase 4] Cleanup...")
        if test_spark:
            try:
                test_spark.stop()
            except Exception as e:
                print(f"  Warning: {e}")
        if setup_client:
            try:
                # Deleting the session tears down the server created in Phase 1.
                setup_client.delete_session(session_name)
                print(f"  Deleted {session_name}")
            except Exception as e:
                print(f"  Warning: {e}")


def main():
    """Entry point: run the E2E scenario and map its outcome to an exit code.

    Exits 0 when skipped (not in-cluster) or when the test passes; exits 1
    on any test failure.
    """
    in_cluster = os.environ.get("SPARK_E2E_RUN_IN_CLUSTER") == "1"
    if not in_cluster:
        print("SKIP: Requires in-cluster execution (SPARK_E2E_RUN_IN_CLUSTER=1)")
        sys.exit(0)

    try:
        test_connect_to_existing_session()
    except Exception as e:
        print(f"\nFailed: {e}")
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()
Loading