Commit c214d9e

committed
Updated the Celeborn docs
1 parent ba12446 commit c214d9e

File tree

1 file changed: +109 -161 lines

website/docs/bestpractices/analytics/celeborn.md

Lines changed: 109 additions & 161 deletions
@@ -23,7 +23,7 @@ Validated with **Celeborn 0.6.2** on **Amazon EKS 1.30+**. TPC-DS 10 TB benchmar
 | 6 | **Ports** | Set all 4 worker ports to fixed values (9091 to 9094) | Dynamic `port=0` triggers `AssertionError` on every graceful shutdown. |
 | 7 | **Graceful shutdown** | `celeborn.worker.graceful.shutdown.enabled: "true"` | Without it, abrupt worker exit causes Spark jobs to hang. |
 | 8 | **Local shuffle reader** | `spark.sql.adaptive.localShuffleReader.enabled: "false"` on every job | If true, Spark reads from executor local disks where Celeborn data does not exist. Jobs fail with `FileNotFoundException`. |
-| 9 | **terminationGracePeriodSeconds** | Set to at least 600s for EBS workers, 3600s for NVMe | Kubernetes default is 30s. At 30s, SIGKILL fires before graceful shutdown can flush in-flight writes, which corrupts data and causes job failures. |
+| 9 | **terminationGracePeriodSeconds** | Set to at least 720s (600s graceful shutdown + 120s buffer) | Kubernetes default is 30s. If too short, SIGKILL fires before graceful shutdown completes, corrupting data and causing job failures. Must exceed `celeborn.worker.graceful.shutdown.timeout`. |
 | 10 | **DNS registration** | `celeborn.network.bind.preferIpAddress: "false"` | Workers register with pod IPs by default. Pod IPs change on restart, so the master ends up with stale mappings and clients can't reconnect. DNS names are stable. |
 | 11 | **Rolling restarts** | `kubectl delete pod` with 120s delay between workers | SIGTERM triggers graceful shutdown (requires rows 7 and 9 above). Replication covers the ~70s restart window. Zero job failures validated on TPC-DS 10 TB. |
 | 12 | **Decommission API** | Optional. Use for 100+ worker clusters, not required for correctness | It stops new writes to a worker but does not migrate existing shuffle data. Fetch errors still happen (20-30 per worker) and are handled by Spark retries. Simple pod delete with replication achieves the same data safety outcome. |
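Row 9's sizing rule is plain arithmetic. A minimal shell sketch (values illustrative, using the 600s default for `celeborn.worker.graceful.shutdown.timeout`) makes the relationship explicit:

```shell
# Hypothetical sketch: derive terminationGracePeriodSeconds per row 9.
# 600s is the celeborn.worker.graceful.shutdown.timeout default; the
# 120s buffer covers the decommission API call and cleanup.
GRACEFUL_SHUTDOWN_TIMEOUT=600
BUFFER=120
TERMINATION_GRACE=$((GRACEFUL_SHUTDOWN_TIMEOUT + BUFFER))
echo "terminationGracePeriodSeconds: ${TERMINATION_GRACE}"
# → terminationGracePeriodSeconds: 720
```

If you raise the shutdown timeout, the grace period must grow with it so SIGKILL never preempts the flush.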
@@ -265,26 +265,21 @@ With NVMe, a node failure means permanent data loss. Highly recommended to enabl
 ```yaml
 sparkConf:
   spark.celeborn.client.push.replicate.enabled: "true"
-  spark.celeborn.client.reserveSlots.rackAware.enabled: "true"  # puts replicas on different nodes
 ```
 
-:::danger
-Without replication on NVMe, any node termination from Karpenter consolidation, spot interruption, or hardware failure causes immediate job failure. This is not a theoretical risk.
-:::
-
-**4. Rotate one node at a time, never two at once**
+**4. Rotate one node (one worker pod per node) at a time, never two at once**
 
-With replication factor 1, each shuffle partition exists on exactly 2 workers. If you restart 2 workers at the same time there is a window where some partitions have zero live copies. Always rotate sequentially: decommission, drain, wait for re-registration, then move to the next worker.
+With replication enabled, each shuffle partition exists on 2 workers. If you restart 2 workers at the same time, there is a window where some partitions have zero live copies. Always rotate sequentially: decommission, drain, wait for re-registration, then move to the next worker.
 
-**5. Set `terminationGracePeriodSeconds: 3600`**
+**5. Set `terminationGracePeriodSeconds` appropriately**
 
-Karpenter's default drain timeout is 30 seconds. NVMe workers can take up to 10 minutes to drain active shuffle slots. Without an extended grace period Kubernetes sends SIGKILL before graceful shutdown finishes:
+The `terminationGracePeriodSeconds` must be longer than `celeborn.worker.graceful.shutdown.timeout` (600s by default) to allow graceful shutdown to complete before Kubernetes sends SIGKILL.
 
 ```yaml
 spec:
   template:
     spec:
-      terminationGracePeriodSeconds: 3600
+      terminationGracePeriodSeconds: 720  # must exceed graceful shutdown timeout (600s) + buffer
       containers:
       - name: celeborn-worker
         lifecycle:
@@ -293,7 +288,12 @@ spec:
           command: ["/bin/sh", "-c", "/opt/celeborn/sbin/decommission-worker.sh"]
 ```
 
-Schedule NVMe maintenance during off-peak hours. Because each worker can take minutes to drain, rolling restarts on NVMe clusters take significantly longer than on EBS clusters.
+**Why 720s (12 minutes)?**
+- Celeborn graceful shutdown timeout: 600s (10 minutes)
+- Buffer for decommission API call and cleanup: 120s (2 minutes)
+- If `terminationGracePeriodSeconds` is too short, Kubernetes sends SIGKILL before graceful shutdown completes, causing data corruption
+
+**For NVMe specifically:** While decommission typically drains in 0-5 seconds (it only waits for in-flight writes), the graceful shutdown process still needs the full 600s to flush buffers and save metadata to RocksDB. The longer grace period ensures this completes even under heavy load.
 
 ---
 
@@ -392,18 +392,31 @@ sparkConf:
 3. Worker ports set to `0` (dynamic) — `AssertionError` on every graceful shutdown
 :::
 
-### NVMe at Large Scale: Increase Retries
+### Large Cluster Tuning (100+ Workers)
+
+#### Master Sizing
 
-When workers are draining active NVMe shuffle slots, which can take 2 to 5 minutes, the executors need more time before giving up:
+Scale masters vertically as worker count grows:
+
+| Workers | Masters | Instance | Heap |
+|---------|---------|----------|------|
+| 1–50 | 3 | r8g.xlarge | 8 GB |
+| 51–100 | 3 | r8g.2xlarge | 16 GB |
+| 101–200 | 5 | r8g.4xlarge | 32 GB |
+| 200–500 | 5–7 | r8g.8xlarge | 64 GB |
 
 ```yaml
-sparkConf:
-  spark.celeborn.client.fetch.maxRetriesForEachReplica: "10"
-  spark.celeborn.data.io.retryWait: "30s"
-  spark.celeborn.client.rpc.maxRetries: "10"
+# JVM tuning for masters
+master:
+  jvmOptions:
+    - "-XX:+UseG1GC"
+    - "-XX:MaxGCPauseMillis=200"
+    - "-XX:G1HeapRegionSize=32m"
+    - "-Xms32g"
+    - "-Xmx64g"  # match to the master sizing table above
 ```
 
-### Large Cluster Tuning (100+ Workers)
+#### Worker Tuning
 
 ```yaml
 celeborn:
@@ -424,14 +437,6 @@ celeborn:
   celeborn.master.estimatedPartitionSize.update.interval: "10s"
   celeborn.master.estimatedPartitionSize.initialSize: "64mb"
 
-# JVM tuning for masters
-master:
-  jvmOptions:
-    - "-XX:+UseG1GC"
-    - "-XX:MaxGCPauseMillis=200"
-    - "-XX:G1HeapRegionSize=32m"
-    - "-Xms32g"
-    - "-Xmx64g"  # match to the master sizing table below
 ```
 
 ---
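The master sizing table in this hunk is effectively a step function of worker count. A hedged shell sketch (the `master_sizing` helper and its thresholds are ours, lifted directly from the table) shows how an automation script might consume it:

```shell
# Hypothetical helper: map worker count to the suggested master layout
# from the sizing table (1-50, 51-100, 101-200, 200-500 workers).
master_sizing() {
  workers=$1
  if   [ "$workers" -le 50 ];  then echo "3 masters, r8g.xlarge, 8 GB heap"
  elif [ "$workers" -le 100 ]; then echo "3 masters, r8g.2xlarge, 16 GB heap"
  elif [ "$workers" -le 200 ]; then echo "5 masters, r8g.4xlarge, 32 GB heap"
  else                              echo "5-7 masters, r8g.8xlarge, 64 GB heap"
  fi
}

master_sizing 120  # → 5 masters, r8g.4xlarge, 32 GB heap
```

Remember to keep `-Xmx` in the master `jvmOptions` in step with whichever row applies.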
@@ -459,14 +464,18 @@ master:
 | **Test 1** | Simple pod delete | 20 to 30 "file not found" | Zero failures | ~70s | ~13 min |
 | **Test 2** | Decommission API first | 62 errors (worker-5), 0 (worker-4) | Zero failures | ~70s | ~13 min |
 
-**What we learned from Test 2:** The decommission API did **not** eliminate errors. Worker-5 had 62 "file not found" errors even though decommission completed in 0 seconds. The API stops the worker from accepting new writes but it does not migrate existing shuffle data. It relies on replication the same way a simple restart does.
+#### Simple Restart with Replication
 
-Why decommission drained in 0 seconds: the API only waits for in-flight *writes* to finish, not for any kind of data migration. Worker-5 had 66 active slots holding 261.2 GiB of data and still drained instantly.
+:::caution Not Recommended for Production
+While our tests showed that simple pod restarts with replication enabled can work (zero job failures, zero executor losses), we **do not recommend this approach for production environments**.
 
-#### Recommended Approach: Simple Restart with Replication
+**Production recommendation:** Always use the decommission API when performing rolling restarts or pod updates. The decommission API provides explicit coordination with the master, cleaner shutdown signals, and better observability, all critical for production operations.
+
+The simple restart approach documented below is useful for understanding how Celeborn's replication and retry mechanisms work, but production deployments should use the decommission-based approach described in the next section.
+:::
 
 ```bash
-# Rolling restart: validated approach
+# Rolling restart: validated approach (testing/development only)
 cd data-stacks/spark-on-eks/benchmarks/celeborn-benchmarks
 ./rolling-restart-celeborn.sh 120  # 120s pause between workers
 ```
@@ -491,149 +500,46 @@ cd data-stacks/spark-on-eks/benchmarks/celeborn-benchmarks
 
 Doing a rolling restart without graceful shutdown is not safe. GitHub issue [#3539](https://github.com/apache/celeborn/issues/3539) documents the failure mode: abrupt worker termination causes Spark jobs to hang with `"CommitManager: Worker shutdown, commit all its partition locations"`.
 
-#### When to Use the Decommission API
-
-The decommission API is worth using in a few specific situations:
-- Large clusters of 100 or more workers where you want explicit coordination with the master
-- Automated operations pipelines that benefit from clear lifecycle hooks
-- When you want cleaner shutdown signals in your logs and dashboards
+#### Production Approach: Decommission API
 
-It does **not** migrate shuffle data, eliminate "file not found" errors, speed up restarts, or let you skip replication.
+**This is the recommended approach for production environments.** The decommission API provides explicit coordination with the master and cleaner operational semantics.
 
 ```bash
-# Decommission-based restart: optional, for large clusters
+# Decommission-based restart: production recommended
 cd data-stacks/spark-on-eks/benchmarks/celeborn-benchmarks
 ./rolling-restart-celeborn-with-decommission.py --namespace celeborn --release celeborn
 ```
 
-#### Pre-Restart Checklist
+**Why use decommission for production:**
+- ✅ Explicit coordination with the master before shutdown
+- ✅ Cleaner shutdown signals in logs and monitoring
+- ✅ Better observability for automated operations
+- ✅ Recommended by Apache Celeborn for production deployments
 
-**For EBS-backed workers:**
-```bash
-# 1. Verify replication is enabled in all running jobs
-kubectl logs <driver-pod> | grep "push.replicate.enabled"
-
-# 2. Check disk usage (keep below 70%)
-bash benchmarks/celeborn-benchmarks/check-celeborn-disk-usage.sh
-
-# 3. Run rolling restart
-bash benchmarks/celeborn-benchmarks/rolling-restart-celeborn.sh 120
-
-# 4. Watch pod re-registration
-kubectl get pods -n celeborn -w
-```
+**Decommission workflow:**
+1. Send decommission request to worker API
+2. Worker stops accepting new shuffle writes
+3. Master redirects new writes to other workers
+4. Worker drains in-flight operations (typically 0-5 seconds)
+5. Delete pod, wait for re-registration
+6. 120s stability wait before next worker
 
-**For NVMe-backed workers:**
-- Use the decommission API since drain takes longer with active slots
-- Increase the delay to 180 to 300 seconds between restarts
-- Never restart two workers at the same time
-- Schedule during off-peak hours
+:::tip
+Decommission drain time is typically 0-5 seconds because it only waits for in-flight writes to complete, not for data migration. Existing shuffle files remain on disk and rely on replicas for availability during the restart.
+:::
 
 
 ### Storage Vertical Scaling
 
-EBS volumes can be resized **online** with no pod downtime. You only need a rolling restart to apply the updated configuration values.
-
-```bash
-# Step 1: confirm the StorageClass allows expansion
-kubectl get storageclass <name> -o jsonpath='{.allowVolumeExpansion}'  # must be true
-
-# Step 2: resize all 4 PVCs per worker
-NEW_SIZE="2000Gi"
-REPLICAS=$(kubectl get statefulset celeborn-worker -n celeborn -o jsonpath='{.spec.replicas}')
-for i in $(seq 0 $((REPLICAS - 1))); do
-  for disk in 1 2 3 4; do
-    kubectl patch pvc "data-disk${disk}-celeborn-worker-${i}" -n celeborn \
-      -p "{\"spec\":{\"resources\":{\"requests\":{\"storage\":\"${NEW_SIZE}\"}}}}"
-  done
-done
-kubectl get pvc -n celeborn -w  # wait for Bound status
-
-# Step 3: update the capacity values in your Helm values file, then upgrade
-# celeborn.worker.storage.dirs capacity must match the new disk size
-helm upgrade celeborn apache-celeborn/celeborn --namespace celeborn -f your-values.yaml
-```
-
-:::warning
-`volumeClaimTemplates` is immutable in StatefulSets. This procedure patches existing PVCs only and does not change the StatefulSet template. For new PVCs when scaling out workers, use the blue-green approach instead.
-:::
+EBS volumes backing Celeborn workers can be resized online without pod restarts or data movement. Patch the existing PVCs directly and update the Helm values to match — Kubernetes handles the underlying volume expansion transparently.
 
 ### Blue-Green Worker Pool Upgrade
 
-Use this when you need to change the instance type, storage type, or any immutable StatefulSet field.
-
-```bash
-# 1. Deploy a new Karpenter NodePool for the target instance type
-kubectl apply -f celeborn-nodepool-v2.yaml
-
-# 2. Deploy the new worker StatefulSet pointing to the new NodePool
-kubectl apply -f celeborn-worker-v2.yaml
-
-# 3. Confirm both old and new workers are registered with the masters
-kubectl port-forward -n celeborn svc/celeborn-master-svc 9098:9098 &
-curl -s http://localhost:9098/api/v1/workers | jq '.registeredWorkers | length'
-
-# 4. Send decommission to each old worker (worker HTTP port is 9096)
-REPLICAS=$(kubectl get statefulset celeborn-worker -n celeborn -o jsonpath='{.spec.replicas}')
-for i in $(seq 0 $((REPLICAS - 1))); do
-  kubectl exec -n celeborn "celeborn-worker-$i" -- \
-    curl -sf -X POST -H "Content-Type: application/json" \
-    -d '{"type":"DECOMMISSION"}' "http://localhost:9096/api/v1/workers/exit"
-done
-
-# 5. Wait for all old workers to finish draining
-while true; do
-  DECOMM=$(curl -s http://localhost:9098/api/v1/workers | jq '.decommissioningWorkers | length')
-  [ "$DECOMM" -eq 0 ] && break
-  echo "$DECOMM workers still draining..."; sleep 30
-done
-
-# 6. Remove the old pool
-kubectl delete statefulset celeborn-worker -n celeborn
-kubectl delete nodepool celeborn-workers-v1
-```
-
-If something goes wrong, keep the old StatefulSet running until the new pool is confirmed healthy. Scale the old pool back up and decommission the new one to roll back.
-
-### EKS and AMI Upgrades
-
-Always use AL2023 for Celeborn nodes. AL2 is end-of-life.
-
-```yaml
-apiVersion: karpenter.k8s.aws/v1
-kind: EC2NodeClass
-metadata:
-  name: celeborn-node-class
-spec:
-  amiFamily: AL2023
-  amiSelectorTerms:
-  - alias: al2023@latest
-  role: KarpenterNodeRole
-  subnetSelectorTerms:
-  - tags:
-      karpenter.sh/discovery: "<cluster-name>"
-  securityGroupSelectorTerms:
-  - tags:
-      karpenter.sh/discovery: "<cluster-name>"
-```
-
-**Per-node upgrade procedure:**
-```bash
-kubectl cordon <node-name>
+Two patterns exist for upgrading Celeborn workers, and the right choice depends on what is changing and how your deployment is managed.
 
-# Decommission the worker before draining the node
-WORKER=$(kubectl get pods -n celeborn --field-selector spec.nodeName=<node-name> -o name | head -1)
-kubectl exec -n celeborn $WORKER -- \
-  curl -sf -X POST -H "Content-Type: application/json" \
-  -d '{"type":"DECOMMISSION"}' "http://localhost:9096/api/v1/workers/exit"
+**Worker pool replacement against shared masters** suits changes to immutable StatefulSet fields — instance type, storage class, node selectors — where only the workers need to change. Deploy a second worker StatefulSet with a new name pointing at the same existing masters, wait for the new workers to register and be confirmed healthy, then gracefully decommission the old workers via the Celeborn REST API so in-flight partitions drain completely before any pods terminate. The masters remain untouched, running Spark jobs never need to update their endpoint configuration, and rollback is as simple as scaling the old StatefulSet back up since EBS PVCs are preserved. Teams using ArgoCD should manage the second worker StatefulSet as a separate ArgoCD Application outside the primary Helm release, and remove it explicitly once migration is confirmed stable to avoid drift.
 
-# Poll master until the worker finishes draining, then drain the node
-kubectl drain <node-name> --ignore-daemonsets --delete-emptydir-data
-```
-
-:::danger
-Never drain a Celeborn node without decommissioning the worker first. Abrupt termination with active shuffle slots causes retry storms. On NVMe clusters, draining two nodes at the same time can cause permanent data loss if a partition's primary and replica both happen to be on the two nodes being drained.
-:::
+**Full blue-green cluster deployment** suits major version upgrades that change master-worker wire compatibility, changes to the master layer itself, or any scenario requiring complete isolation before cutover. A complete second Celeborn cluster — masters and workers — is deployed alongside the existing one, and cutover happens by updating `spark.celeborn.master.endpoints` in Spark job configurations. This is the natural pattern for teams using GitOps tooling like ArgoCD or Flux, where both clusters are declarative manifests in Git and the cutover is a single config change that can be promoted across environments with approval gates. The tradeoff is managing two complete HA master quorums simultaneously until all in-flight jobs on the old cluster drain. When using ArgoCD, model each cluster as a separate Application or ApplicationSet pointing at versioned Helm values — the green cluster is promoted by updating the master endpoints in your Spark job values files, which ArgoCD then syncs automatically.
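For the full blue-green pattern, the cutover itself is one Spark configuration change. A hedged sketch (the master hostnames and port 9097 are hypothetical placeholders; `spark.celeborn.master.endpoints` is the conf key named above) shows the flag that would be flipped from the blue to the green quorum:

```shell
# Hypothetical cutover: point Spark jobs at the green cluster's masters.
# Hostnames and port are placeholders for your actual green quorum.
GREEN_MASTERS="celeborn-master-green-0.celeborn:9097,celeborn-master-green-1.celeborn:9097,celeborn-master-green-2.celeborn:9097"
SPARK_CONF="--conf spark.celeborn.master.endpoints=${GREEN_MASTERS}"
echo "$SPARK_CONF"
```

With GitOps, this value lives in the versioned Spark job values file, so promoting green is a reviewed commit rather than an imperative change.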
 
 ### Node Rotation with Karpenter
 
@@ -643,7 +549,9 @@ When Karpenter drains a node for consolidation, expiry, or drift, a `preStop` ho
 spec:
   template:
     spec:
-      terminationGracePeriodSeconds: 3600  # 600s is enough for EBS; use 3600s for NVMe
+      # Must be longer than celeborn.worker.graceful.shutdown.timeout (600s)
+      # to allow graceful shutdown to complete before Kubernetes sends SIGKILL
+      terminationGracePeriodSeconds: 720  # 600s graceful shutdown + 120s buffer
       containers:
       - name: celeborn-worker
         lifecycle:
@@ -652,16 +560,56 @@ spec:
           command: ["/bin/sh", "-c", "/opt/celeborn/sbin/decommission-worker.sh"]
 ```
 
-To prevent Karpenter from consolidating Celeborn nodes while jobs are running, add `karpenter.sh/do-not-disrupt: "true"` to worker pods and set a conservative disruption policy on the NodePool:
+### Karpenter Disruption Policy
+
+**For production Celeborn clusters, disable automatic consolidation** and use controlled rolling restarts instead:
 
 ```yaml
+# Karpenter NodePool for Celeborn workers
+apiVersion: karpenter.sh/v1
+kind: NodePool
+metadata:
+  name: celeborn-workers
 spec:
   disruption:
-    consolidationPolicy: WhenEmpty  # only consolidate nodes that have no pods at all
-    expireAfter: 720h  # force rotation every 30 days to pick up AMI updates
+    consolidationPolicy: WhenEmpty  # Only consolidate completely empty nodes
+    budgets:
+    - nodes: "0"  # Prevent any automatic disruption
+      reasons:
+      - Underutilized
+      - Drifted
+    - nodes: "1"  # Allow one node at a time for empty node cleanup
+      reasons:
+      - Empty
 ```
 
----
+**Why disable automatic consolidation?**
+- Celeborn workers hold shuffle data that must be gracefully drained
+- Automatic consolidation can disrupt multiple workers simultaneously
+- Controlled rolling restarts (documented above) provide safer, predictable maintenance windows
+- The decommission API provides explicit coordination that automatic consolidation cannot guarantee
+
+**For AMI updates and node rotation:**
+- Use the manual rolling restart procedures documented above
+- Schedule during off-peak hours
+- Control the pace (120s between workers)
+- Monitor for issues before proceeding
+
+**And add a PodDisruptionBudget for additional safety:**
+
+```yaml
+apiVersion: policy/v1
+kind: PodDisruptionBudget
+metadata:
+  name: celeborn-worker-pdb
+  namespace: celeborn
+spec:
+  maxUnavailable: 1
+  selector:
+    matchLabels:
+      app.kubernetes.io/name: celeborn
+      app.kubernetes.io/component: worker
+```
 
 ## Monitoring
 