Merged
Changes from 3 commits
14 changes: 14 additions & 0 deletions .ciux
@@ -0,0 +1,14 @@
apiVersion: v1alpha1
registry: k8sschool
sourcePathes:
- .ciux
- Dockerfile
- e2e/rootfs
dependencies:
- package: github.com/k8s-school/ink@v0.0.1-rc5
labels:
itest: "true"
- package: github.com/k8s-school/ktbx@v1.1.4-rc7
labels:
itest: "true"

1 change: 0 additions & 1 deletion .github/workflows/build_with_scala_and_python_tests.yml
@@ -2,7 +2,6 @@ name: sparkMeasure CI

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

40 changes: 40 additions & 0 deletions .github/workflows/e2e.yml
@@ -0,0 +1,40 @@
name: "e2e tests"
on:
push:
pull_request:
branches:
- master
jobs:
main:
name: Run spark-measure end-to-end tests
runs-on: ubuntu-24.04
steps:
- name: Checkout code
uses: actions/checkout@v2
# Required by ciux
with:
fetch-depth: 0
- name: Stop apparmor
run: |
sudo /etc/init.d/apparmor stop
- uses: actions/setup-go@v3
with:
go-version: '^1.21.4'
- name: Run ciux and create k8s/kind cluster
run: |
./e2e/prereq.sh
- name: Build spark-measure image
run: |
./e2e/build.sh
- name: Load spark-measure image into k8s/kind cluster
run: |
./e2e/push-image.sh -k -d
- name: Run argocd
run: |
./e2e/argocd.sh
- name: Access prometheus exporter metrics
run: |
./e2e/check-metrics.sh
# - name: Push image
# run: |
# ./push-image.sh
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,4 +1,7 @@
/.ciux.d/
/.vscode
.idea
target/
project/project/
project/target/
__pycache__
63 changes: 63 additions & 0 deletions Dockerfile
@@ -0,0 +1,63 @@
FROM sbtscala/scala-sbt:eclipse-temurin-alpine-17.0.15_6_1.11.3_2.13.16 AS builder


# Set the working directory
WORKDIR /app

# Copy the SBT build files
COPY build.sbt ./
COPY project ./project

# Copy the source code
COPY src ./src

# Build the project with the Scala version matching the Spark base image
# ENV SCALA_VERSION=2.13.8
ENV SCALA_VERSION=2.12.18

RUN sbt ++${SCALA_VERSION} package

# Only the compiled JAR is copied into the runtime image below



FROM docker.io/library/spark:3.5.6-scala2.12-java17-python3-ubuntu

USER root

# Setup for the Prometheus JMX exporter.
# Add the Prometheus JMX exporter Java agent jar for exposing metrics sent to the JmxSink to Prometheus.
ENV JMX_EXPORTER_AGENT_VERSION=1.1.0
ADD https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_AGENT_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_AGENT_VERSION}.jar /opt/spark/jars
RUN chmod 644 /opt/spark/jars/jmx_prometheus_javaagent-${JMX_EXPORTER_AGENT_VERSION}.jar

RUN apt-get update -y && apt-get install apt-transport-https curl gnupg -yqq && \
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | tee /etc/apt/sources.list.d/sbt.list && \
echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | tee /etc/apt/sources.list.d/sbt_old.list && \
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import && \
chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg && \
apt-get update && \
apt-get install sbt -y && \
rm -rf /var/lib/apt/lists/*

ADD python /opt/src/python

RUN pip install /opt/src/python

ADD e2e/rootfs/ /

RUN mkdir -p /opt/spark/jars
COPY --from=builder /app/target/scala-2.12/*.jar /opt/spark/jars

ARG spark_uid=185
ENV spark_uid=${spark_uid}
USER ${spark_uid}

ENV PYTHONPATH=/opt/spark/python

# Expose the Spark UI port
EXPOSE 4040

# Set the default command
CMD ["/bin/bash"]
163 changes: 91 additions & 72 deletions README.md

Large diffs are not rendered by default.

23 changes: 12 additions & 11 deletions docs/Instrument_Scala_code.md
@@ -3,19 +3,19 @@
SparkMeasure can be used to instrument parts of your Scala code to measure Apache Spark workload.
Use this for example for performance troubleshooting, application instrumentation, workload studies, etc.

### Example code

You can find an example of how to instrument a Scala application running Apache Spark jobs at this link:
[link to example application](../examples/testSparkMeasureScala)

How to run the example:
```
# build the example jar
sbt package

bin/spark-submit --master local[*] --packages ch.cern.sparkmeasure:spark-measure_2.13:0.25 --class ch.cern.testSparkMeasure.testSparkMeasure <path_to_the_example_jar>/testsparkmeasurescala_2.13-0.1.jar
```

### Collect and save Stage Metrics
An example of how to collect task metrics aggregated at the stage execution level.
Some relevant snippets of code are:
@@ -29,7 +29,7 @@ Some relevant snippets of code are:
stageMetrics.runAndMeasure {
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
}

// print report to standard output
stageMetrics.printReport()

@@ -39,11 +39,11 @@ Some relevant snippets of code are:

// Introduced in sparkMeasure v0.21, memory metrics report:
stageMetrics.printMemoryReport()

//save session metrics data
val df = stageMetrics.createStageMetricsDF("PerfStageMetrics")
stageMetrics.saveData(df.orderBy("jobId", "stageId"), "/tmp/stagemetrics_test1")

val aggregatedDF = stageMetrics.aggregateStageMetrics("PerfStageMetrics")
stageMetrics.saveData(aggregatedDF, "/tmp/stagemetrics_report_test2")
```
@@ -64,8 +64,9 @@ to study skew effects, otherwise consider using stagemetrics aggregation as pref

### Export to Prometheus PushGateway

You have the option to export aggregated stage metrics and/or task metrics to:
- a Prometheus push gateway, see details at: [Prometheus Pushgateway](Prometheus.md)
- the JMX Prometheus exporter, see details at: [Prometheus exporter through JMX](Prometheus_through_JMX.md)

### Run sparkMeasure using the packaged version from Maven Central

@@ -89,7 +90,7 @@ See details at: [Prometheus Pushgateway](Prometheus.md)

# Run as in one of these examples:
bin/spark-submit --jars <path>/spark-measure_2.13-0.26-SNAPSHOT.jar

# alternative, set classpath for the driver (it is only needed in the driver)
bin/spark-submit --conf spark.driver.extraClassPath=<path>/spark-measure_2.13-0.26-SNAPSHOT.jar ...
```
57 changes: 57 additions & 0 deletions docs/Prometheus_through_JMX.md
@@ -0,0 +1,57 @@
## Exporting sparkMeasure Metrics to Prometheus via JMX

`sparkMeasure` collects execution metrics from Spark jobs at the driver or executor level. While it does not expose its metrics directly via JMX, it can be used alongside Spark's JMX metrics system to enable Prometheus-based monitoring.

In a Kubernetes environment using the **Spark Operator**, you can configure the Spark driver and executor to expose their sparkMeasure metrics through JMX Prometheus exporter and scrape them with Prometheus.

> ✅ This setup has been validated **only** in **Kubernetes environments using the [Spark Operator](https://www.kubeflow.org/docs/components/spark-operator)**.

### Enable the JMX Prometheus exporter in Spark

To configure JMX and Prometheus exporter monitoring with Spark on Kubernetes, follow the official Kubeflow Spark Operator documentation:

📖 [Monitoring with JMX and Prometheus — Kubeflow Spark Operator Guide](https://www.kubeflow.org/docs/components/spark-operator/user-guide/monitoring-with-jmx-and-prometheus/)

### Exporting `sparkMeasure` Metrics via JMX in Python

To programmatically export `sparkMeasure` metrics in Python alongside standard JMX metrics, you can leverage the `jmxexport` function from the `sparkmeasure.jmx` module. This enables custom metrics collected during job execution to be exposed through the same Prometheus exporter as native Spark metrics.

#### Example Usage

```python
from sparkmeasure import StageMetrics
from sparkmeasure.jmx import jmxexport

stage_metrics = StageMetrics(spark)
stage_metrics.begin()

# ... run your Spark jobs here ...

stage_metrics.end()
current_metrics = stage_metrics.aggregate_stagemetrics()

# export the metrics to JMX Prometheus exporter
jmxexport(spark, current_metrics)
```

The `jmxexport()` call updates the current Spark application’s JMX metrics with the `sparkMeasure` results, making them available to any configured Prometheus instance.

See a full implementation example here:
📄 [How to use the JMX exporter in Python code](../e2e/rootfs/opt/spark/examples/spark-sql.py)

---

### Prometheus Exporter Configuration

In addition to exposing metrics via JMX, you must configure the Prometheus JMX exporter in the Spark driver and executor pods to make the custom `sparkMeasure` metrics queryable by Prometheus. This configuration should be added *on top of* the existing JMX metrics exporter configuration.

Ensure your Spark pod manifest or Helm chart includes a properly configured `ConfigMap` for the JMX exporter. Specifically, you’ll need to add mappings for the custom `sparkMeasure` metrics to the YAML under the `rules` section used by the Prometheus JMX exporter.

A production-ready configuration example is available here:
📄 [How to configure the Prometheus exporter to expose sparkMeasure metrics](../e2e/charts/spark-demo/templates/jmx-configmap.yaml)
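For orientation, here is a minimal, illustrative sketch of such a `ConfigMap`. The ConfigMap name, the MBean name pattern, and the resulting metric names are assumptions for illustration only; adapt them to the object names your Spark metrics namespace actually produces, using the linked production example as the authoritative reference.

```yaml
# Illustrative only: adjust the pattern to the JMX object names
# produced by your Spark metrics configuration.
apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-jmx-exporter-config   # hypothetical name
data:
  config.yaml: |
    lowercaseOutputName: true
    rules:
      # Map sparkMeasure gauges published through jmxexport()
      # (the "sparkmeasure" path segment is an assumption)
      - pattern: 'metrics<name=(\S+)\.driver\.sparkmeasure\.(\S+)><>Value'
        name: spark_sparkmeasure_$2
        type: GAUGE
      # Fallback: expose the remaining Spark metrics as-is
      - pattern: 'metrics<name=(\S+)><>Value'
        name: spark_$1
        type: GAUGE
```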

---

By combining Python-based metric collection with a Prometheus-compatible JMX exporter, you can ensure comprehensive observability for Spark applications, including custom performance instrumentation through `sparkMeasure`.

> **Security Tip:** In production environments, ensure that JMX ports are protected using appropriate Kubernetes NetworkPolicies or service mesh configurations. Avoid exposing unauthenticated JMX endpoints externally to mitigate the risk of unauthorized access.
46 changes: 23 additions & 23 deletions docs/Python_shell_and_Jupyter.md
@@ -1,12 +1,12 @@
# sparkMeasure on PySpark

Notes on how to use sparkMeasure to collect Spark workload metrics when using PySpark from the command line
or from a Jupyter notebook.
See also [README](../README.md) for an introduction to sparkMeasure and its architecture.

### Deployment and installation

- Use PyPI to install the Python wrapper and take the jar from Maven Central:
```
pip install pyspark # Spark 4
pip install sparkmeasure
@@ -18,51 +18,51 @@ See also [README](../README.md) for an introduction to sparkMeasure and its arch
cd sparkmeasure
sbt +package
ls -l target/scala-2.13/spark-measure*.jar # note location of the compiled and packaged jar

# Install the Python wrapper package
cd python
pip install .

# Run as in one of these examples:
bin/pyspark --jars <path>/spark-measure_2.13-0.26-SNAPSHOT.jar

# Alternative:
bin/pyspark --conf spark.driver.extraClassPath=<path>/spark-measure_2.13-0.26-SNAPSHOT.jar
```


### PySpark example
1. How to collect and print Spark task stage metrics using sparkMeasure, example in Python:
```python
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)

stagemetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
stagemetrics.end()

stagemetrics.print_report()
stagemetrics.print_memory_report()

# get metrics as a dictionary
metrics = stagemetrics.aggregate_stage_metrics()
```
2. Similar to example 1, but with a shortcut to run code and measure it with one command line:
```python
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)

stagemetrics.runandmeasure(globals(),
'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()')

stagemetrics.print_memory_report()
```
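The aggregation call in example 1 returns a plain Python dictionary, which can be post-processed like any mapping. A minimal sketch with made-up values follows; the key names (`elapsedTime`, `executorRunTime`, etc.) and their millisecond units are assumptions here, so check the keys of the dictionary returned in your own session:

```python
# Post-process an aggregated-metrics dictionary as returned by
# sparkMeasure's stage-metrics aggregation. The sample values
# below are invented for illustration.
def summarize_stage_metrics(metrics):
    """Return a short human-readable summary of selected metrics."""
    elapsed_s = metrics["elapsedTime"] / 1000.0       # ms -> s
    run_time_s = metrics["executorRunTime"] / 1000.0  # ms -> s
    return {
        "numStages": metrics["numStages"],
        "numTasks": metrics["numTasks"],
        "elapsedSeconds": round(elapsed_s, 2),
        "executorRunSeconds": round(run_time_s, 2),
    }

# Illustrative input (in a real session this comes from sparkMeasure)
sample = {
    "numStages": 3,
    "numTasks": 17,
    "elapsedTime": 1533,        # ms
    "executorRunTime": 4180,    # ms
}
print(summarize_stage_metrics(sample))
```

This kind of post-processing is useful when shipping selected metrics to logging or monitoring systems rather than printing the full report.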

### Jupyter notebook example

Jupyter notebooks are a popular way to interact with PySpark for data analysis.
Example Jupyter notebook showing the use of basic sparkMeasure instrumentation:

[SparkMeasure_Jupyer_Python_getting_started.ipynb](examples/SparkMeasure_Jupyer_Python_getting_started.ipynb)

Note, in particular with Jupyter notebooks it can be handy to write cell magic to wrap the instrumentation,
@@ -84,7 +84,7 @@ def sparkmeasure(line, cell=None):
### Collecting metrics at finer granularity: use Task metrics

Collecting Spark task metrics at the granularity of each task completion has additional overhead
compared to collecting at the stage completion level, therefore this option should only be used if you need data with
this finer granularity, for example because you want to study skew effects; otherwise consider using
stagemetrics aggregation as the preferred choice.

@@ -98,7 +98,7 @@ stagemetrics aggregation as preferred choice.
taskmetrics.end()
taskmetrics.print_report()
```

```python
from sparkmeasure import TaskMetrics
taskmetrics = TaskMetrics(spark)
@@ -108,18 +108,18 @@ stagemetrics aggregation as preferred choice.

### Exporting metrics data for archiving and/or further analysis

One simple use case is to make use of the data collected and reported by stagemetrics and taskmetrics
printReport methods for immediate troubleshooting and workload analysis.
You also have options to save metrics aggregated as in the printReport output.

Another option is to export the metrics to an external system, see details at [Prometheus Pushgateway](Prometheus.md) or [Prometheus exporter through JMX](Prometheus_through_JMX.md).

- Example on how to export raw Stage metrics data in json format
```python
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
stagemetrics.runandmeasure(globals(), ...your workload here ... )

df = stagemetrics.create_stagemetrics_DF("PerfStageMetrics")
df.show()
stagemetrics.save_data(df.orderBy("jobId", "stageId"), "stagemetrics_test1", "json")
@@ -156,4 +156,4 @@ Stage 3 JVMHeapMemory maxVal bytes => 279558120 (266.6 MB)
Stage 3 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
```

