Skip to content

Commit fa9c89e

Browse files
committed
initial version of spark client connecting to k8s spark cluster
Signed-off-by: shekharrajak <shekharrajak@live.com>
1 parent 49a5087 commit fa9c89e

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

42 files changed

+10903
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ bin
1515

1616
coverage.xml
1717
htmlcov/
18+
uv.lock
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Title: Hello Spark - Calculate Pi
4+
Level: 1 (Beginner)
5+
Target Audience: Data Scientists new to Spark
6+
Time to Run: ~2-3 minutes
7+
8+
Description:
9+
Your first Spark job! This example demonstrates how to submit a simple PySpark application
10+
that calculates Pi using the Monte Carlo method - a classic distributed computing example
11+
that shows how Spark distributes work across executors.
12+
13+
Prerequisites:
14+
- Kind cluster with Spark Operator (run ./setup_test_environment.sh)
15+
- Default namespace with 'spark-operator-spark' service account
16+
17+
What You'll Learn:
18+
- How to create a SparkClient
19+
- Submit a PySpark application
20+
- Wait for job completion
21+
- Retrieve and parse job logs
22+
- Clean up resources
23+
24+
Real-World Use Case:
25+
Distributed computation, parallel processing, Monte Carlo simulations.
26+
"""
27+
28+
from datetime import datetime
29+
import os
30+
import sys
31+
32+
# Add SDK to path for development mode
33+
sdk_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
34+
if sdk_path not in sys.path:
35+
sys.path.insert(0, sdk_path)
36+
37+
from kubeflow.spark import ( # noqa: E402
38+
ApplicationState,
39+
OperatorBackendConfig,
40+
SparkClient,
41+
)
42+
43+
44+
def main():
    """Run the hello-world Spark example end to end.

    Walks through the full SDK lifecycle: create a :class:`SparkClient`,
    submit a Pi-calculation PySpark application, wait for it to reach a
    terminal state, extract the result from the driver logs, and delete
    the SparkApplication resource.

    Exits the process with status 1 when submission fails, monitoring
    fails, or the job times out. Log-retrieval and cleanup errors are
    downgraded to warnings, because the job itself may still have
    completed successfully.
    """
    print("=" * 80)
    print("EXAMPLE 01: Hello Spark - Calculate Pi")
    print("=" * 80)
    print()
    print("This example demonstrates:")
    print(" 1. Creating a Spark client")
    print(" 2. Submitting a PySpark application (Calculate Pi)")
    print(" 3. Monitoring job progress")
    print(" 4. Retrieving results from logs")
    print()

    # Step 1: Create SparkClient with configuration.
    # Environment variables let the same script target a different
    # namespace or kube context without editing the code.
    print("Step 1: Creating Spark client...")
    config = OperatorBackendConfig(
        namespace=os.getenv("SPARK_NAMESPACE", "default"),
        service_account="spark-operator-spark",
        default_spark_image="docker.io/library/spark",
        context=os.getenv("KUBE_CONTEXT", "kind-spark-test"),
        enable_monitoring=False,  # Keep it simple for beginners
        enable_ui=False,  # We'll enable this in later examples
    )
    client = SparkClient(backend_config=config)
    print(" Client created successfully")
    print()

    # Step 2: Prepare the application.
    # Use a timestamp suffix so each run gets a unique resource name and
    # does not collide with a previous SparkApplication.
    timestamp = datetime.now().strftime("%H%M%S")
    app_name = f"hello-spark-{timestamp}"

    print("Step 2: Configuring Spark application...")
    print(f" App name: {app_name}")
    print(" Spark version: 4.0.0")
    print(" Resources: 1 driver + 2 executors")
    print(" Memory: 512m per container")
    print(" Example: Calculate Pi using Monte Carlo method")
    print()

    # Step 3: Submit the application.
    print("Step 3: Submitting application to Kubernetes...")

    try:
        response = client.submit_application(
            # Application metadata
            app_name=app_name,
            main_application_file="local:///opt/spark/examples/src/main/python/pi.py",
            # Spark configuration
            spark_version="4.0.0",
            app_type="Python",
            # Resource allocation (small for demo)
            driver_cores=1,
            driver_memory="512m",
            executor_cores=1,
            executor_memory="512m",
            num_executors=2,
            # Arguments for pi calculation (number of samples)
            arguments=["10"],  # Calculate Pi with 10 partitions
            # Required for Spark 4.0
            spark_conf={
                "spark.kubernetes.file.upload.path": "/tmp",
            },
        )

        print(" Application submitted successfully!")
        print(f" Submission ID: {response.submission_id}")
        print(f" Status: {response.status}")
        print()

    except Exception as e:
        print(f" ERROR: Submission failed: {e}")
        sys.exit(1)

    # Step 4: Monitor the application until it reaches a terminal state.
    print("Step 4: Monitoring application (this may take 1-2 minutes)...")
    print(" Waiting for pods to start...")

    try:
        # NOTE(review): app_name is passed as the submission_id while the
        # submit response also carries response.submission_id — presumably
        # they are the same value; confirm against the SDK.
        final_status = client.wait_for_completion(
            submission_id=app_name,
            timeout=300,  # 5 minutes max
            polling_interval=5,  # Check every 5 seconds
        )

        print(" Application completed!")
        print(f" Final state: {final_status.state.value}")
        print()

        # A terminal state other than COMPLETED (e.g. FAILED) still falls
        # through to the log step so the user can see what went wrong.
        if final_status.state != ApplicationState.COMPLETED:
            print(
                f" WARNING: Application did not complete successfully: {final_status.state.value}"
            )  # noqa: E501
            print(" Check logs below for details.")

    except TimeoutError:
        print(" ERROR: Application did not complete within 5 minutes")
        # BUG FIX: the original string lacked the f-prefix, so the literal
        # text "{app_name}" was printed instead of the application name.
        print(f" You can check status later with: client.get_status('{app_name}')")
        sys.exit(1)
    except Exception as e:
        print(f" ERROR: Error monitoring application: {e}")
        sys.exit(1)

    # Step 5: Retrieve results from the driver logs.
    print("Step 5: Retrieving application logs and results...")
    print()

    try:
        logs = list(client.get_logs(app_name))

        # Parse and display results
        print("=" * 80)
        print("CALCULATION RESULTS:")
        print("=" * 80)

        # Spark's bundled pi.py prints "Pi is roughly <value>" on success;
        # surface the first such line.
        pi_found = False
        for line in logs:
            if "Pi is roughly" in line:
                print(f"\n{line}\n")
                pi_found = True
                break

        if not pi_found:
            # Show last 20 lines if Pi result not found
            print("Recent log lines:")
            for line in logs[-20:]:
                print(line)

        print("=" * 80)

    except Exception as e:
        # Best-effort: log collection may lag behind job completion, so a
        # failure here is a warning rather than a hard error.
        print(f" WARNING: Could not retrieve logs: {e}")
        print(" The job may have completed but logs are not yet available")

    # Step 6: Delete the SparkApplication resource.
    print()
    print("Step 6: Cleaning up resources...")
    try:
        client.delete_application(app_name)
        print(f" Application '{app_name}' deleted")
    except Exception as e:
        # Best-effort cleanup: tell the user how to delete manually.
        print(f" WARNING: Cleanup warning: {e}")
        print(f" You can manually delete with: kubectl delete sparkapplication {app_name}")

    print()
    print("=" * 80)
    print("EXAMPLE COMPLETED SUCCESSFULLY!")
    print("=" * 80)
    print()
    print("What you learned:")
    print(" - How to create a SparkClient")
    print(" - How to submit a PySpark application")
    print(" - How to wait for completion")
    print(" - How to retrieve logs")
    print(" - How to clean up resources")
    print()
    print("Key SDK Methods:")
    print(" - SparkClient(backend_config=config) - Create client")
    print(" - client.submit_application(...) - Submit Spark job")
    print(" - client.wait_for_completion(...) - Monitor job")
    print(" - client.get_logs(...) - Retrieve logs")
    print(" - client.delete_application(...) - Cleanup")
    print()
    print("Next steps:")
    print(" - Try example 02: CSV data analysis")
    print(" - Try example 03: Interactive DataFrame exploration")
    print(" - Modify driver/executor resources")
    print(" - Try with different Spark versions")
    print()
if __name__ == "__main__":
220+
main()

0 commit comments

Comments
 (0)