diff --git a/.ciux b/.ciux
new file mode 100644
index 0000000..ef70219
--- /dev/null
+++ b/.ciux
@@ -0,0 +1,14 @@
+apiVersion: v1alpha1
+registry: k8sschool
+sourcePathes:
+ - .ciux
+ - Dockerfile
+ - e2e/rootfs
+dependencies:
+ - package: github.com/k8s-school/ink@v0.0.1-rc5
+ labels:
+ itest: "true"
+ - package: github.com/k8s-school/ktbx@v1.1.4-rc7
+ labels:
+ itest: "true"
+
diff --git a/.github/workflows/build_with_scala_and_python_tests.yml b/.github/workflows/build_with_scala_and_python_tests.yml
index 62658a7..d594808 100644
--- a/.github/workflows/build_with_scala_and_python_tests.yml
+++ b/.github/workflows/build_with_scala_and_python_tests.yml
@@ -2,7 +2,6 @@ name: sparkMeasure CI
on:
push:
- branches: [ master ]
pull_request:
branches: [ master ]
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
new file mode 100644
index 0000000..5875e6b
--- /dev/null
+++ b/.github/workflows/e2e.yml
@@ -0,0 +1,40 @@
+name: "e2e tests"
+on:
+ push:
+ pull_request:
+ branches:
+ - master
+jobs:
+ main:
+ name: Run spark-measure end-to-end tests
+ runs-on: ubuntu-24.04
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v2
+ # Required by ciux
+ with:
+ fetch-depth: 0
+ - name: Stop apparmor
+ run: |
+ sudo /etc/init.d/apparmor stop
+ - uses: actions/setup-go@v3
+ with:
+ go-version: '^1.21.4'
+ - name: Run ciux and create k8s/kind cluster
+ run: |
+ ./e2e/prereq.sh
+ - name: Build spark-measure image
+ run: |
+ ./e2e/build.sh
+ - name: Load spark-measure image into k8s/kind cluster
+ run: |
+ ./e2e/push-image.sh -k -d
+ - name: Run argocd
+ run: |
+ ./e2e/argocd.sh
+ - name: Access prometheus exporter metrics
+ run: |
+ ./e2e/check-metrics.sh
+ # - name: Push image
+ # run: |
+ # ./push-image.sh
diff --git a/.gitignore b/.gitignore
index b6a65f4..984ccdb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,7 @@
+/.ciux.d/
+/.vscode
.idea
target/
project/project/
project/target/
+__pycache__
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..3035efd
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,50 @@
+# Use a Scala SBT base image with Java 17 for building sparkMeasure jar files
+FROM sbtscala/scala-sbt:eclipse-temurin-alpine-17.0.15_6_1.11.3_2.13.16 AS builder
+
+# Set the working directory
+WORKDIR /app
+
+# Copy the SBT configuration file
+COPY build.sbt ./
+# Copy the SBT project configuration directory
+COPY project ./project
+
+# Copy the application source code
+COPY src ./src
+
+# Compile the project with Scala 2.12.18
+ENV SCALA_VERSION=2.12.18
+RUN sbt ++${SCALA_VERSION} package
+
+# Use the official Spark image with Scala 2.12, Java 17, and Python 3 for runtime
+FROM docker.io/library/spark:3.5.6-scala2.12-java17-python3-ubuntu
+
+USER root
+
+# Set up the Prometheus JMX exporter to expose metrics to Prometheus
+ENV JMX_EXPORTER_AGENT_VERSION=1.1.0
+ADD https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_AGENT_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_AGENT_VERSION}.jar /opt/spark/jars
+RUN chmod 644 /opt/spark/jars/jmx_prometheus_javaagent-${JMX_EXPORTER_AGENT_VERSION}.jar
+
+# Add the local sparkMeasure python code to the image
+ADD python /opt/src/python
+
+# Install the local sparkMeasure python package
+RUN pip install /opt/src/python
+
+# Add rootfs filesystem, it contains python scripts for running end-to-end tests
+ADD e2e/rootfs/ /
+
+# Copy the compiled jar from the build stage
+COPY --from=builder /app/target/scala-2.12/*.jar /opt/spark/jars/
+
+# Set the Spark user
+ARG spark_uid=185
+ENV spark_uid=${spark_uid}
+USER ${spark_uid}
+
+# Expose port 4040 for the Spark UI
+EXPOSE 4040
+
+# Set the default entrypoint
+CMD ["/bin/bash"]
\ No newline at end of file
diff --git a/README.md b/README.md
index c102018..c353a61 100644
--- a/README.md
+++ b/README.md
@@ -7,18 +7,18 @@
[](https://pypistats.org/packages/sparkmeasure)
[](docs/Reference_SparkMeasure_API_and_Configs.md)
-SparkMeasure is a tool and a library designed to ease performance measurement and troubleshooting of
-Apache Spark jobs. It focuses on easing the collection and analysis of Spark metrics,
+SparkMeasure is a tool and a library designed to ease performance measurement and troubleshooting of
+Apache Spark jobs. It focuses on easing the collection and analysis of Spark metrics,
making it a practical choice for both developers and data engineers.
With sparkMeasure, users gain a deeper understanding of their Spark job performance,
enabling faster and more reliable data processing workflows.
### ✨ Highlights
-- **Interactive Troubleshooting:** Ideal for real-time analysis of Spark workloads in notebooks
+- **Interactive Troubleshooting:** Ideal for real-time analysis of Spark workloads in notebooks
and spark-shell/pyspark environments.
- **Development & CI/CD Integration:** Facilitates testing, measuring, and comparing execution metrics
of Spark jobs under various configurations or code changes.
-- **Batch Job Analysis:** With Flight Recorder mode sparkMeasure transparently records batch job metrics
+- **Batch Job Analysis:** With Flight Recorder mode sparkMeasure transparently records batch job metrics
for later analysis.
- **Monitoring Capabilities:** Integrates with external systems like Apache Kafka, InfluxDB,
and Prometheus Push Gateway for extensive monitoring.
@@ -28,32 +28,44 @@ and spark-shell/pyspark environments.
of Spark applications.
### 📚 Table of Contents
-- [Getting started with sparkMeasure](#getting-started-with-sparkmeasure)
- - [Demo](#demo)
- - [Examples of sparkMeasure on notebooks](#examples-of-sparkmeasure-on-notebooks)
- - [Examples of sparkMeasure on the CLI](#examples-of-sparkmeasure-on-the-cli)
-- [Setting up SparkMeasure with Spark](#setting-up-sparkmeasure-with-spark)
- - [Version vompatibility for SparkMeasure](#version-compatibility-for-sparkmeasure)
- - [Downloading sparkMeasure](#downloading-sparkmeasure)
- - [Setup examples](#setup-examples)
-- [Notes on Metrics](#notes-on-metrics)
-- [Documentation and API reference](#documentation-api-and-examples)
-- [Architecture diagram](#architecture-diagram)
-- [Concepts and FAQ](#main-concepts-underlying-sparkmeasure-implementation)
+- [SparkMeasure - a performance tool for Apache Spark](#sparkmeasure---a-performance-tool-for-apache-spark)
+  - [✨ Highlights](#-highlights)
+  - [📚 Table of Contents](#-table-of-contents)
+ - [Links to related work on Spark Performance](#links-to-related-work-on-spark-performance)
+  - [🚀 Quick start](#-quick-start)
+ - [Examples of sparkMeasure on notebooks](#examples-of-sparkmeasure-on-notebooks)
+ - [Examples of sparkMeasure on the CLI](#examples-of-sparkmeasure-on-the-cli)
+ - [Python CLI](#python-cli)
+ - [Scala CLI](#scala-cli)
+ - [Memory report](#memory-report)
+ - [CLI example for Task Metrics:](#cli-example-for-task-metrics)
+ - [Setting Up SparkMeasure with Spark](#setting-up-sparkmeasure-with-spark)
+ - [Version Compatibility for SparkMeasure](#version-compatibility-for-sparkmeasure)
+ - [📥 Downloading SparkMeasure](#-downloading-sparkmeasure)
+ - [Setup Examples](#setup-examples)
+ - [Spark 4 with Scala 2.13](#spark-4-with-scala-213)
+ - [Spark 3 with Scala 2.12](#spark-3-with-scala-212)
+ - [Including sparkMeasure in your Spark environment](#including-sparkmeasure-in-your-spark-environment)
+ - [Running unit tests](#running-unit-tests)
+ - [Notes on Spark Metrics](#notes-on-spark-metrics)
+ - [Documentation, API, and examples](#documentation-api-and-examples)
+ - [Architecture diagram](#architecture-diagram)
+ - [Main concepts underlying sparkMeasure implementation](#main-concepts-underlying-sparkmeasure-implementation)
+ - [FAQ:](#faq)
### Links to related work on Spark Performance
-- **[Building an Apache Spark Performance Lab](https://db-blog.web.cern.ch/node/195)**
+- **[Building an Apache Spark Performance Lab](https://db-blog.web.cern.ch/node/195)**
Guide to setting up a Spark performance testing environment.
-- **[TPC-DS Benchmark with PySpark](https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/TPCDS_PySpark)**
+- **[TPC-DS Benchmark with PySpark](https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/TPCDS_PySpark)**
Tool for running TPC-DS with PySpark, instrumented with `sparkMeasure`.
-- **[Spark Monitoring Dashboard](https://github.com/cerndb/spark-dashboard)**
+- **[Spark Monitoring Dashboard](https://github.com/cerndb/spark-dashboard)**
Custom monitoring solution with real-time dashboards for Spark.
-- **[Flamegraphs for Profiling Spark Jobs](https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Tools_Spark_Pyroscope_FlameGraph.md)**
+- **[Flamegraphs for Profiling Spark Jobs](https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Tools_Spark_Pyroscope_FlameGraph.md)**
Guide to profiling Spark with Pyroscope and Flamegraphs.
-- **[Advanced Notes on Apache Spark](https://github.com/LucaCanali/Miscellaneous/tree/master/Spark_Notes)**
+- **[Advanced Notes on Apache Spark](https://github.com/LucaCanali/Miscellaneous/tree/master/Spark_Notes)**
Tips, configuration, and troubleshooting for Spark.
-- **[Introductory Course on Apache Spark](https://sparktraining.web.cern.ch/)**
+- **[Introductory Course on Apache Spark](https://sparktraining.web.cern.ch/)**
Beginner-friendly course on Spark fundamentals.
Main author and contact: Luca.Canali@cern.ch
@@ -68,13 +80,13 @@ Main author and contact: Luca.Canali@cern.ch
- [
Jupyter notebook on Google Colab Research](https://colab.research.google.com/github/LucaCanali/sparkMeasure/blob/master/examples/SparkMeasure_Jupyter_Colab_Example.ipynb)
-- [
Scala notebook on Databricks](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2061385495597958/2910895789597333/442806354506758/latest.html)
-
-- [
Python notebook on Databricks](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2061385495597958/2910895789597316/442806354506758/latest.html)
-
+- [
Scala notebook on Databricks](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2061385495597958/2910895789597333/442806354506758/latest.html)
+
+- [
Python notebook on Databricks](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2061385495597958/2910895789597316/442806354506758/latest.html)
+
- [
Local Python/Jupyter Notebook](examples/SparkMeasure_Jupyter_Python_getting_started.ipynb)
-### Examples of sparkMeasure on the CLI
+### Examples of sparkMeasure on the CLI
- Run locally or on hosted resources
- [](https://codespaces.new/LucaCanali/sparkMeasure)
@@ -90,7 +102,7 @@ Main author and contact: Luca.Canali@cern.ch
stagemetrics = StageMetrics(spark)
# Simple one-liner to run a Spark SQL query and measure its performance
stagemetrics.runandmeasure(globals(), 'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()')
-
+
# Alternatively, you can use the begin() and end() methods to measure performance
# Start measuring
stagemetrics.begin()
@@ -277,9 +289,16 @@ Choose your preferred method:
--conf spark.driver.extraClassPath=/path/to/spark-measure_2.13-0.25.jar
```
+
+--
+## Running unit tests
+
+To ensure the integrity of the sparkMeasure codebase and validate your setup, you can run the built-in unit tests. These tests are designed to verify core functionality.
+[Running sparkMeasure unit tests](docs/Unit-tests.md)
+
---
## Notes on Spark Metrics
-Spark is instrumented with several metrics, collected at task execution, they are described in the documentation:
+Spark is instrumented with several metrics, collected at task execution, they are described in the documentation:
- [Spark Task Metrics docs](https://spark.apache.org/docs/latest/monitoring.html#executor-task-metrics)
Some of the key metrics when looking at a sparkMeasure report are:
@@ -294,32 +313,32 @@ To learn more about the metrics, I advise you set up your lab environment and ru
A good place to start with is [TPCDS PySpark](https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/TPCDS_PySpark) - A tool you can use run TPCDS with PySpark, instrumented with sparkMeasure
---
-## Documentation, API, and examples
+## Documentation, API, and examples
SparkMeasure is one tool for many different use cases, languages, and environments:
- * [](docs/Reference_SparkMeasure_API_and_Configs.md)
+ * [](docs/Reference_SparkMeasure_API_and_Configs.md)
- [SparkMeasure's API and configurations](docs/Reference_SparkMeasure_API_and_Configs.md)
- * **Interactive mode**
+ * **Interactive mode**
Use sparkMeasure to collect and analyze Spark workload metrics in interactive mode when
working with shell or notebook environments, such as `spark-shell` (Scala), `PySpark` (Python) and/or
- from `jupyter notebooks`.
+ from `jupyter notebooks`.
- **[SparkMeasure on PySpark and Jupyter notebooks](docs/Python_shell_and_Jupyter.md)**
- **[SparkMeasure on Scala shell and notebooks](docs/Scala_shell_and_notebooks.md)**
-
-
- * **Batch and code instrumentation**
+
+
+ * **Batch and code instrumentation**
Instrument your code with the sparkMeasure API, for collecting, saving,
and analyzing Spark workload metrics data. Examples and how-to guides:
- **[Instrument Spark-Python code](docs/Instrument_Python_code.md)**
- **[Instrument Spark-Scala code](docs/Instrument_Scala_code.md)**
- See also [Spark_CPU_memory_load_testkit](https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/Spark_CPU_memory_load_testkit)
as an example of how to use sparkMeasure to instrument Spark code for performance testing.
-
+
* **Flight Recorder mode**
- SparkMeasure in flight recorder will collect metrics transparently, without any need for you
- to change your code.
+ SparkMeasure in flight recorder will collect metrics transparently, without any need for you
+ to change your code.
* Metrics can be saved to a file, locally, or to a Hadoop-compliant filesystem
* or you can write metrics in near-realtime to the following sinks: InfluxDB, Apache Kafka, Prometheus PushPushgateway
* More details:
@@ -327,20 +346,20 @@ SparkMeasure is one tool for many different use cases, languages, and environmen
- **[Flight Recorder mode with InfluxDB sink](docs/Flight_recorder_mode_InfluxDBSink.md)**
- **[Flight Recorder mode with Apache Kafka sink](docs/Flight_recorder_mode_KafkaSink.md)**
- **[Flight Recorder mode with Prometheus Pushgateway sink](docs/Flight_recorder_mode_PrometheusPushgatewaySink.md)**
-
+
* **Limitations and known issues**
- * **Support for Spark Connect**
+ * **Support for Spark Connect**
SparkMeasure cannot yet provide full integration with [Spark Connect](https://spark.apache.org/docs/latest/spark-connect-overview.html)
because it needs direct access to the `SparkContext` and its listener interface. You can run sparkMeasure in **Flight Recorder** mode
on the Spark Connect drive* to capture metrics for the entire application, but per-client (Spark Connect client-side)
metrics are not reported.
- * **Single-threaded applications**
+ * **Single-threaded applications**
The sparkMeasure APIs for generating reports using `StageMetric` and `TaskMetric` are best suited
for a single-threaded driver environment. These APIs capture metrics by calculating deltas between
snapshots taken at the start and end of an execution. If multiple Spark actions run concurrently on
the Spark driver, it may result in double-counting of metric values.
-
+
* **Additional documentation and examples**
- Presentations at Spark/Data+AI Summit:
@@ -350,76 +369,76 @@ SparkMeasure is one tool for many different use cases, languages, and environmen
- [2018: SparkMeasure, a tool for performance troubleshooting of Apache Spark workloads](https://db-blog.web.cern.ch/blog/luca-canali/2018-08-sparkmeasure-tool-performance-troubleshooting-apache-spark-workloads),
- [2017: SparkMeasure blog post](http://db-blog.web.cern.ch/blog/luca-canali/2017-03-measuring-apache-spark-workload-metrics-performance-troubleshooting)
- [TODO list and known issues](docs/TODO_and_issues.md)
- - [TPCDS-PySpark](https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/TPCDS_PySpark)
+ - [TPCDS-PySpark](https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/TPCDS_PySpark)
a tool for running the TPCDS benchmark workload with PySpark and instrumented with sparkMeasure
---
-## Architecture diagram
+## Architecture diagram

---
-## Main concepts underlying sparkMeasure implementation
-* The tool is based on the Spark Listener interface. Listeners transport Spark executor
+## Main concepts underlying sparkMeasure implementation
+* The tool is based on the Spark Listener interface. Listeners transport Spark executor
[Task Metrics](https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_TaskMetrics.md)
data from the executor to the driver.
- They are a standard part of Spark instrumentation, used by the Spark Web UI and History Server for example.
+ They are a standard part of Spark instrumentation, used by the Spark Web UI and History Server for example.
* The tool is built on multiple modules implemented as classes
- * metrics collection and processing can be at the Stage-level or Task-level. The user chooses which mode to use with the API.
+ * metrics collection and processing can be at the Stage-level or Task-level. The user chooses which mode to use with the API.
* metrics are can be buffered into memory for real-time reporting, or they can be dumped to an external
system in the "flight recorder mode".
* supported external systems are File Systems supported by the Hadoop API, InfluxDB, Apache Kafka, Prometheus Pushgateway.
* Metrics are flattened and collected into local memory structures in the driver (ListBuffer of a custom case class).
* sparkMeasure in flight recorder mode when using one between the InfluxDB sink, Apache Kafka sink, and Prometheus Pushgateway sink, does not buffer,
- but rather writes the collected metrics directly
+ but rather writes the collected metrics directly
* Metrics processing:
* metrics can be aggregated into a report showing the cumulative values for each metric
* aggregated metrics can also be returned as a Scala Map or Python dictionary
- * metrics can be converted into a Spark DataFrame for custom querying
+ * metrics can be converted into a Spark DataFrame for custom querying
* Metrics data and reports can be saved for offline analysis.
-### FAQ:
+### FAQ:
- Why measuring performance with workload metrics instrumentation rather than just using execution time measurements?
- When measuring just the jobs' elapsed time, you treat your workload as "a black box" and most often this does
- not allow you to understand the root causes of performance regression.
+ not allow you to understand the root causes of performance regression.
With workload metrics you can (attempt to) go further in understanding and perform root cause analysis,
- bottleneck identification, and resource usage measurement.
+ bottleneck identification, and resource usage measurement.
- What are Apache Spark task metrics and what can I use them for?
- Apache Spark measures several details of each task execution, including run time, CPU time,
- information on garbage collection time, shuffle metrics, and task I/O.
- See also Spark documentation for a description of the
+ information on garbage collection time, shuffle metrics, and task I/O.
+ See also Spark documentation for a description of the
[Spark Task Metrics](https://spark.apache.org/docs/latest/monitoring.html#executor-task-metrics)
- How is sparkMeasure different from Web UI/Spark History Server and EventLog?
- sparkMeasure uses the same ListenerBus infrastructure used to collect data for the Web UI and Spark EventLog.
- Spark collects metrics and other execution details and exposes them via the Web UI.
- Notably, Task execution metrics are also available through the [REST API](https://spark.apache.org/docs/latest/monitoring.html#rest-api)
- - In addition, Spark writes all details of the task execution in the EventLog file
+ - In addition, Spark writes all details of the task execution in the EventLog file
(see config of `spark.eventlog.enabled` and `spark.eventLog.dir`)
- The EventLog is used by the Spark History server + other tools and programs that can read and parse
- the EventLog file(s) for workload analysis and performance troubleshooting, see a [proof-of-concept example of reading the EventLog with Spark SQL](https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_EventLog.md)
- - There are key differences that motivate this development:
+ the EventLog file(s) for workload analysis and performance troubleshooting, see a [proof-of-concept example of reading the EventLog with Spark SQL](https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_EventLog.md)
+ - There are key differences that motivate this development:
- sparkMeasure can collect data at the stage completion-level, which is more lightweight than measuring
- all the tasks, in case you only need to compute aggregated performance metrics. When needed,
+ all the tasks, in case you only need to compute aggregated performance metrics. When needed,
sparkMeasure can also collect data at the task granularity level.
- sparkMeasure has an API that makes it simple to add instrumentation/performance measurements
- in notebooks and in application code for Scala, Java, and Python.
- - sparkMeasure collects data in a flat structure, which makes it natural to use Spark SQL for
+ in notebooks and in application code for Scala, Java, and Python.
+ - sparkMeasure collects data in a flat structure, which makes it natural to use Spark SQL for
workload data analysis/
- sparkMeasure can sink metrics data into external systems (Filesystem, InfluxDB, Apache Kafka, Prometheus Pushgateway)
- What are known limitations and gotchas?
- sparkMeasure does not collect all the data available in the EventLog
- See also the [TODO and issues doc](docs/TODO_and_issues.md)
- - The currently available Spark task metrics can give you precious quantitative information on
+ - The currently available Spark task metrics can give you precious quantitative information on
resources used by the executors, however there do not allow to fully perform time-based analysis of
the workload performance, notably they do not expose the time spent doing I/O or network traffic.
- - Metrics are collected on the driver, which could become a bottleneck. This is an issues affecting tools
+ - Metrics are collected on the driver, which could become a bottleneck. This is an issues affecting tools
based on Spark ListenerBus instrumentation, such as the Spark WebUI.
In addition, note that sparkMeasure in the current version buffers all data in the driver memory.
- The notable exception is when using the Flight recorder mode with InfluxDB or
+ The notable exception is when using the Flight recorder mode with InfluxDB or
Apache Kafka or Prometheus Pushgateway sink, in this case metrics are directly sent to InfluxDB/Kafka/Prometheus Pushgateway.
- - Task metrics values are collected by sparkMeasure only for successfully executed tasks. Note that
+ - Task metrics values are collected by sparkMeasure only for successfully executed tasks. Note that
resources used by failed tasks are not collected in the current version. The notable exception is
with the Flight recorder mode with InfluxDB or Apache Kafka or Prometheus Pushgateway sink.
- sparkMeasure collects and processes data in order of stage and/or task completion. This means that
@@ -432,20 +451,20 @@ SparkMeasure is one tool for many different use cases, languages, and environmen
- When should I use Stage-level metrics and when should I use Task-level metrics?
- Use stage metrics whenever possible as they are much more lightweight. Collect metrics at
- the task granularity if you need the extra information, for example if you want to study
+ the task granularity if you need the extra information, for example if you want to study
effects of skew, long tails and task stragglers.
- How can I save/sink the collected metrics?
- You can print metrics data and reports to standard output or save them to files, using
a locally mounted filesystem or a Hadoop compliant filesystem (including HDFS).
- Additionally, you can sink metrics to external systems (such as Prometheus Pushgateway).
- The Flight Recorder mode can sink to InfluxDB, Apache Kafka or Prometheus Pushgateway.
+ Additionally, you can sink metrics to external systems (such as Prometheus Pushgateway).
+ The Flight Recorder mode can sink to InfluxDB, Apache Kafka or Prometheus Pushgateway.
- How can I process metrics data?
- You can use Spark to read the saved metrics data and perform further post-processing and analysis.
See the also [Notes on metrics analysis](docs/Notes_on_metrics_analysis.md).
- How can I contribute to sparkMeasure?
- - SparkMeasure has already profited from users submitting PR contributions. Additional contributions are welcome.
- See the [TODO_and_issues list](docs/TODO_and_issues.md) for a list of known issues and ideas on what
- you can contribute.
+ - SparkMeasure has already profited from users submitting PR contributions. Additional contributions are welcome.
+ See the [TODO_and_issues list](docs/TODO_and_issues.md) for a list of known issues and ideas on what
+ you can contribute.
diff --git a/docs/Instrument_Scala_code.md b/docs/Instrument_Scala_code.md
index d6c55f1..c0e5310 100644
--- a/docs/Instrument_Scala_code.md
+++ b/docs/Instrument_Scala_code.md
@@ -3,11 +3,11 @@
SparkMeasure can be used to instrument parts of your Scala code to measure Apache Spark workload.
Use this for example for performance troubleshooting, application instrumentation, workload studies, etc.
-### Example code
-
-You can find an example of how to instrument a Scala application running Apache Spark jobs at this link:
+### Example code
+
+You can find an example of how to instrument a Scala application running Apache Spark jobs at this link:
[link to example application](../examples/testSparkMeasureScala)
-
+
How to run the example:
```
# build the example jar
@@ -15,7 +15,7 @@ sbt package
bin/spark-submit --master local[*] --packages ch.cern.sparkmeasure:spark-measure_2.13:0.25 --class ch.cern.testSparkMeasure.testSparkMeasure /testsparkmeasurescala_2.13-0.1.jar
```
-
+
### Collect and save Stage Metrics
An example of how to collect task metrics aggregated at the stage execution level.
Some relevant snippets of code are:
@@ -29,7 +29,7 @@ Some relevant snippets of code are:
stageMetrics.runAndMeasure {
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
}
-
+
// print report to standard output
stageMetrics.printReport()
@@ -39,11 +39,11 @@ Some relevant snippets of code are:
// Introduced in sparkMeasure v0.21, memory metrics report:
stageMetrics.printMemoryReport()
-
+
//save session metrics data
val df = stageMetrics.createStageMetricsDF("PerfStageMetrics")
stageMetrics.saveData(df.orderBy("jobId", "stageId"), "/tmp/stagemetrics_test1")
-
+
val aggregatedDF = stageMetrics.aggregateStageMetrics("PerfStageMetrics")
stageMetrics.saveData(aggregatedDF, "/tmp/stagemetrics_report_test2")
```
@@ -64,8 +64,9 @@ to study skew effects, otherwise consider using stagemetrics aggregation as pref
### Export to Prometheus PushGateway
-You have the option to export aggregated stage metrics and/or task metrics to a Prometheus push gateway.
-See details at: [Prometheus Pushgateway](Prometheus.md)
+You have the option to export aggregated stage metrics and/or task metrics to:
+- a Prometheus push gateway, see details at: [Prometheus Pushgateway](Prometheus.md)
+- the JMX Prometheus exporter, see details at: [Prometheus exporter through JMX](Prometheus_through_JMX.md)
### Run sparkMeasure using the packaged version from Maven Central
@@ -89,7 +90,7 @@ See details at: [Prometheus Pushgateway](Prometheus.md)
# Run as in one of these examples:
bin/spark-submit --jars path>/spark-measure_2.13-0.26-SNAPSHOT.jar
-
+
# alternative, set classpath for the driver (it is only needed in the driver)
bin/spark-submit --conf spark.driver.extraClassPath=/spark-measure_2.13-0.26-SNAPSHOT.jar ...
```
diff --git a/docs/Prometheus_through_JMX.md b/docs/Prometheus_through_JMX.md
new file mode 100644
index 0000000..b8a8306
--- /dev/null
+++ b/docs/Prometheus_through_JMX.md
@@ -0,0 +1,57 @@
+## Exporting sparkMeasure Metrics to Prometheus via JMX
+
+`sparkMeasure` collects execution metrics from Spark jobs at the driver or executor level. While it does not expose its metrics directly via JMX, it can be used alongside Spark's JMX metrics system to enable Prometheus-based monitoring.
+
+In a Kubernetes environment using the **Spark Operator**, you can configure the Spark driver and executor to expose their sparkMeasure metrics through the JMX Prometheus exporter and scrape them with Prometheus.
+
+> ✅ This setup has been validated **only** in **Kubernetes environments using the [Spark Operator](https://www.kubeflow.org/docs/components/spark-operator)**.
+
+### Enable the JMX prometheus exporter in Spark
+
+To configure JMX and Prometheus exporter monitoring with Spark on Kubernetes, follow the official Kubeflow Spark Operator documentation:
+
+📖 [Monitoring with JMX and Prometheus — Kubeflow Spark Operator Guide](https://www.kubeflow.org/docs/components/spark-operator/user-guide/monitoring-with-jmx-and-prometheus/)
+
+### Exporting `sparkMeasure` Metrics via JMX in Python
+
+To programmatically export `sparkMeasure` metrics in Python alongside standard JMX metrics, you can leverage the `jmxexport` function from the `sparkmeasure.jmx` module. This enables custom metrics collected during job execution to be exposed through the same Prometheus exporter as native Spark metrics.
+
+#### Example Usage
+
+```python
+from sparkmeasure import StageMetrics
+from sparkmeasure.jmx import jmxexport
+
+stage_metrics = StageMetrics(spark)
+stage_metrics.begin()
+
+# ... run your Spark jobs here ...
+
+stage_metrics.end()
+current_metrics = stage_metrics.aggregate_stagemetrics()
+
+# export the metrics to JMX Prometheus exporter
+jmxexport(spark, current_metrics)
+```
+
+ The `jmxexport()` call updates the current Spark application’s JMX metrics with the `sparkMeasure` results, making them available to any configured Prometheus instance.
+
+See a full implementation example here:
+📄 [How to use the JMX exporter in Python code](../e2e/rootfs/opt/spark/examples/spark-sql.py)
+
+---
+
+### Prometheus Exporter Configuration
+
+In addition to exposing metrics via JMX, you must configure the Prometheus JMX exporter in the Spark driver and executor pods to make the custom `sparkMeasure` metrics queryable by Prometheus. This configuration should be added *on top of* the existing JMX metrics exporter configuration.
+
+Ensure your Spark pod manifest or Helm chart includes a properly configured `ConfigMap` for the JMX exporter. Specifically, you’ll need to add mappings for the custom `sparkMeasure` metrics to the YAML under the `rules` section used by the Prometheus JMX exporter.
+
+A production-ready configuration example is available here:
+📄 [How to configure the Prometheus exporter to expose sparkMeasure metrics](../e2e/charts/spark-demo/templates/jmx-configmap.yaml)
+
+---
+
+By combining Python-based metric collection with a Prometheus-compatible JMX exporter, you can ensure comprehensive observability for Spark applications, including custom performance instrumentation through `sparkMeasure`.
+
+> **Security Tip:** In production environments, ensure that JMX ports are protected using appropriate Kubernetes NetworkPolicies or service mesh configurations. Avoid exposing unauthenticated JMX endpoints externally to mitigate the risk of unauthorized access.
diff --git a/docs/Python_shell_and_Jupyter.md b/docs/Python_shell_and_Jupyter.md
index 3e01f90..e063a74 100644
--- a/docs/Python_shell_and_Jupyter.md
+++ b/docs/Python_shell_and_Jupyter.md
@@ -1,12 +1,12 @@
# sparkMeasure on PySpark
-Notes on how to use sparkMeasure to collect Spark workload metrics when using PySpark from command line
-or from a Jupyter notebook.
+Notes on how to use sparkMeasure to collect Spark workload metrics when using PySpark from command line
+or from a Jupyter notebook.
See also [README](../README.md) for an introduction to sparkMeasure and its architecture.
### Deployment and installation
-- Use PyPi to install the Python wrapper and take the jar from Maven central:
+- Use PyPi to install the Python wrapper and take the jar from Maven central:
```
pip install pyspark # Spark 4
pip install sparkmeasure
@@ -18,32 +18,32 @@ See also [README](../README.md) for an introduction to sparkMeasure and its arch
cd sparkmeasure
sbt +package
ls -l target/scala-2.13/spark-measure*.jar # note location of the compiled and packaged jar
-
+
# Install the Python wrapper package
cd python
pip install .
-
+
# Run as in one of these examples:
bin/pyspark --jars path>/spark-measure_2.13-0.26-SNAPSHOT.jar
-
+
#Alternative:
bin/pyspark --conf spark.driver.extraClassPath=/spark-measure_2.13-0.26-SNAPSHOT.jar
```
-
-
+
+
### PySpark example
1. How to collect and print Spark task stage metrics using sparkMeasure, example in Python:
```python
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
-
+
stagemetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
stagemetrics.end()
stagemetrics.print_report()
stagemetrics.print_memory_report()
-
+
# get metrics as a dictionary
metrics = stagemetrics.aggregate_stage_metrics()
```
@@ -51,18 +51,18 @@ See also [README](../README.md) for an introduction to sparkMeasure and its arch
```python
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
-
+
stagemetrics.runandmeasure(globals(),
'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()')
-
+
stageMetrics.print_memory_report()
```
### Jupyter notebook example
-Jupyter notebooks are a popular way to interact with PySpark for data analysis.
+Jupyter notebooks are a popular way to interact with PySpark for data analysis.
Example Jupyter notebook showing the use of basic sparkMeasure instrumentation:
-
+
[SparkMeasure_Jupyer_Python_getting_started.ipynb](examples/SparkMeasure_Jupyer_Python_getting_started.ipynb)
Note, in particular with Jupyter notebooks it can be handy to write cell magic to wrap the instrumentation,
@@ -84,7 +84,7 @@ def sparkmeasure(line, cell=None):
### Collecting metrics at finer granularity: use Task metrics
Collecting Spark task metrics at the granularity of each task completion has additional overhead
-compare to collecting at the stage completion level, therefore this option should only be used if you need data with
+compared to collecting at the stage completion level, therefore this option should only be used if you need data with
this finer granularity, for example because you want to study skew effects, otherwise consider using
stagemetrics aggregation as preferred choice.
@@ -98,7 +98,7 @@ stagemetrics aggregation as preferred choice.
taskmetrics.end()
taskmetrics.print_report()
```
-
+
```python
from sparkmeasure import TaskMetrics
taskmetrics = TaskMetrics(spark)
@@ -108,18 +108,18 @@ stagemetrics aggregation as preferred choice.
### Exporting metrics data for archiving and/or further analysis
-One simple use case is to make use of the data collected and reported by stagemetrics and taskmetrics
-printReport methods for immediate troubleshooting and workload analysis.
-You also have options to save metrics aggregated as in the printReport output.
+One simple use case is to make use of the data collected and reported by stagemetrics and taskmetrics
+printReport methods for immediate troubleshooting and workload analysis.
+You also have options to save metrics aggregated as in the printReport output.
+
+Another option is to export the metrics to an external system, see details at [Prometheus Pushgateway](Prometheus.md) or [Prometheus exporter through JMX](Prometheus_through_JMX.md).
-Another option is to export the metrics to an external system, see details at [Prometheus Pushgateway](Prometheus.md)
-
- Example on how to export raw Stage metrics data in json format
```python
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
stagemetrics.runandmeasure(globals(), ...your workload here ... )
-
+
df = stagemetrics.create_stagemetrics_DF("PerfStageMetrics")
df.show()
stagemetrics.save_data(df.orderBy("jobId", "stageId"), "stagemetrics_test1", "json")
@@ -156,4 +156,4 @@ Stage 3 JVMHeapMemory maxVal bytes => 279558120 (266.6 MB)
Stage 3 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
```
-
+
diff --git a/docs/Scala_shell_and_notebooks.md b/docs/Scala_shell_and_notebooks.md
index 36e2001..4845a65 100644
--- a/docs/Scala_shell_and_notebooks.md
+++ b/docs/Scala_shell_and_notebooks.md
@@ -3,8 +3,8 @@
Notes on how to use sparkMeasure to collect Spark workload metrics with Scala shell or a Scala notebook.
See also [README](../README.md) for an introduction to sparkMeasure and its architecture.
-
-### Run sparkMeasure using the packaged version from Maven Central
+
+### Run sparkMeasure using the packaged version from Maven Central
- The alternative, see paragraph above, is to build a jar from master.
```
@@ -22,21 +22,21 @@ See also [README](../README.md) for an introduction to sparkMeasure and its arch
cd sparkmeasure
sbt +package
ls -l target/scala-2.13/spark-measure*.jar # location of the compiled jar
-
+
# Run as in one of these examples:
bin/spark-shell --jars /spark-measure_2.13-0.26-SNAPSHOT.jar
-
+
# Alternative, set classpath for the driver (the JAR is only needed in the driver)
bin/spark-shell --conf spark.driver.extraClassPath=/spark-measure_2.11-0.24-SNAPSHOT.jar
```
### Example: collect and print stage metrics with sparkMeasure
-
+
1. Measure metrics at the Stage level, a basic example:
```
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.13:0.25
-
- val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
+
+ val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show)
// get metrics as a Map
@@ -95,7 +95,7 @@ Stage 0 duration => 355 (0.4 s)
Stage 1 duration => 411 (0.4 s)
Stage 3 duration => 98 (98 ms)
```
-
+
- New in sparkMeasure v01: memory metrics report:
```
> stageMetrics.printMemoryReport
@@ -112,11 +112,11 @@ Stage 3 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
2. An alternative way to collect and print metrics:
```scala
- val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
+ val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.begin()
-
+
...execute one or more Spark jobs...
-
+
stageMetrics.end()
stageMetrics.printReport()
```
@@ -124,7 +124,7 @@ Stage 3 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
### Collecting metrics at finer granularity: use Task metrics
Collecting Spark task metrics at the granularity of each task completion has additional overhead
-compare to collecting at the stage completion level, therefore this option should only be used if you need data with
+compared to collecting at the stage completion level, therefore this option should only be used if you need data with
this finer granularity, for example because you want
to study skew effects, otherwise consider using stagemetrics aggregation as preferred choice.
@@ -138,16 +138,16 @@ to study skew effects, otherwise consider using stagemetrics aggregation as pref
### Exporting metrics data for archiving and/or further analysis
-One simple use case is to make use of the data collected and reported by stagemetrics and taskmetrics
-printReport methods for immediate troubleshooting and workload analysis.
-You also have options to save metrics aggregated as in the printReport output.
-Another option is to export the metrics to an external system, such as [Prometheus Pushgateway](Prometheus.md)
-
+One simple use case is to make use of the data collected and reported by stagemetrics and taskmetrics
+printReport methods for immediate troubleshooting and workload analysis.
+You also have options to save metrics aggregated as in the printReport output.
+Another option is to export the metrics to an external system, such as [Prometheus Pushgateway](Prometheus.md) or [Prometheus exporter through JMX](Prometheus_through_JMX.md).
+
- Example on how to export raw "StageMetrics" into a DataFrame and save data in json format
```scala
- val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
+ val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure( ...your workload here ... )
-
+
val df = stageMetrics.createStageMetricsDF("PerfStageMetrics")
df.show()
stageMetrics.saveData(df.orderBy("jobId", "stageId"), "/tmp/stagemetrics_test1")
@@ -156,9 +156,9 @@ Another option is to export the metrics to an external system, such as [Promethe
- Example, save aggregated metrics (as found in the printReport output) in json format
```scala
- val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
+ val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure( ...your workload here ... )
-
+
val df = stageMetrics.createStageMetricsDF("PerfStageMetrics")
val aggregatedDF = stageMetrics.aggregateStageMetrics("PerfStageMetrics")
aggregatedDF.show()
@@ -166,9 +166,9 @@ Another option is to export the metrics to an external system, such as [Promethe
```
Aggregated data in name,value format:
```scala
- val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
+ val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure( ...your workload here ... )
-
+
val df = stageMetrics.aggregateStageMetrics.toList.toDF("name","value")
stageMetrics.saveData(df, "/tmp/stagemetrics_report_test3")
```
diff --git a/docs/Unit-tests.md b/docs/Unit-tests.md
new file mode 100644
index 0000000..c9f7156
--- /dev/null
+++ b/docs/Unit-tests.md
@@ -0,0 +1,62 @@
+## How to Run Unit Tests in sparkMeasure
+
+This guide explains how to run both the **Scala unit tests** (via `sbt`) and the **Python integration tests** for the `sparkMeasure` project.
+
+---
+
+### 1. Run Scala Unit Tests
+
+To execute the built-in unit tests for the core sparkMeasure Scala codebase:
+
+```bash
+sbt test
+```
+
+This will compile the project and run all tests defined under `src/test/scala/`.
+
+---
+
+### 2. Run Python Integration Tests
+
+#### a. Create and Activate a Python Virtual Environment
+
+From the root of the repository:
+
+```bash
+python3 -m venv ~/venv/sparkmeasure
+source ~/venv/sparkmeasure/bin/activate
+```
+
+#### b. Install Python Dependencies
+
+```bash
+pip install -r python/requirements.txt
+```
+
+#### c. Build the sparkMeasure JAR
+
+Python tests require the JAR built from the Scala code:
+
+```bash
+sbt package
+```
+
+This generates the JAR in `target/scala-2.12/` or `target/scala-2.13/`.
+
+#### d. Run the Python Tests
+
+```bash
+pytest python/sparkmeasure -vvv -s
+```
+
+---
+
+### Notes
+
+* Ensure the JAR is up-to-date and present in the expected `target/scala-*/` directory.
+* Python tests require a working Spark installation (`SPARK_HOME` may need to be set).
+* Scala and Python tests are independent; run both for full validation.
+* Test sources:
+
+ * Scala: `src/test/scala/`
+ * Python: `python/sparkmeasure/test_*.py`
\ No newline at end of file
diff --git a/e2e/argocd.sh b/e2e/argocd.sh
new file mode 100755
index 0000000..976937c
--- /dev/null
+++ b/e2e/argocd.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+# Deploy the sparkMeasure e2e test applications with ArgoCD
+
+# @author Fabrice Jammes
+
+set -euxo pipefail
+
+DIR=$(cd "$(dirname "$0")"; pwd -P)
+PROJECT_DIR=$(cd "$DIR/.."; pwd -P)
+
+ciux ignite --selector itest "$PROJECT_DIR"
+
+# Run the CD pipeline
+export CIUXCONFIG=$PROJECT_DIR/.ciux.d/ciux_itest.sh
+. $CIUXCONFIG
+app_name="$CIUX_IMAGE_NAME"
+
+NS=argocd
+
+argocd login --core
+kubectl config set-context --current --namespace="$NS"
+
+# Add support for GitHub PRs, which do not have a branch name in the git repository
+if [ "${GITHUB_EVENT_NAME:-}" = "pull_request" ]; then
+ revision="$GITHUB_HEAD_REF"
+else
+ revision="$SPARKMEASURE_WORKBRANCH"
+fi
+
+argocd app create $app_name --dest-server https://kubernetes.default.svc \
+ --dest-namespace "$app_name" \
+ --repo https://github.com/k8s-school/$app_name \
+ --path e2e/charts/apps --revision "$revision" \
+ -p spec.source.targetRevision.default="$revision"
+
+argocd app sync $app_name
+
+argocd app set spark-jobs -p image.tag="$CIUX_IMAGE_TAG"
+
+argocd app sync -l app.kubernetes.io/part-of=$app_name,app.kubernetes.io/component=operator
+argocd app wait -l app.kubernetes.io/part-of=$app_name,app.kubernetes.io/component=operator
+
+argocd app sync -l app.kubernetes.io/part-of=$app_name
\ No newline at end of file
diff --git a/e2e/build.sh b/e2e/build.sh
new file mode 100755
index 0000000..f1fe8a2
--- /dev/null
+++ b/e2e/build.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+
+
+# Build image containing integration tests for sparkMeasure over Kubernetes
+
+# @author Fabrice Jammes
+
+set -euxo pipefail
+
+DIR=$(cd "$(dirname "$0")"; pwd -P)
+PROJECT_DIR=$(cd "$DIR/.."; pwd -P)
+
+usage() {
+ cat << EOD
+
+Usage: `basename $0` [options]
+
+ Available options:
+ -h this message
+
+Build image containing integration tests for sparkMeasure over Kubernetes
+EOD
+}
+
+# get the options
+while getopts h c ; do
+ case $c in
+ h) usage ; exit 0 ;;
+ \?) usage ; exit 2 ;;
+ esac
+done
+shift `expr $OPTIND - 1`
+
+ciux ignite --selector build $PROJECT_DIR
+. $PROJECT_DIR/.ciux.d/ciux_build.sh
+
+echo "Building Docker image for sparkMeasure integration tests"
+docker image build --tag "$CIUX_IMAGE_URL" "$PROJECT_DIR"
+
+echo "Build successful"
+
diff --git a/e2e/charts/apps/Chart.yaml b/e2e/charts/apps/Chart.yaml
new file mode 100644
index 0000000..976bbe0
--- /dev/null
+++ b/e2e/charts/apps/Chart.yaml
@@ -0,0 +1,23 @@
+apiVersion: v2
+name: demo-sparkmeasure
+description: Deploy a sparkmeasure demo with argoCD
+
+# A chart can be either an 'application' or a 'library' chart.
+#
+# Application charts are a collection of templates that can be packaged into versioned archives
+# to be deployed.
+#
+# Library charts provide useful utilities or functions for the chart developer. They're included as
+# a dependency of application charts to inject those utilities and functions into the rendering
+# pipeline. Library charts do not define any templates and therefore cannot be deployed.
+type: application
+
+# This is the chart version. This version number should be incremented each time you make changes
+# to the chart and its templates, including the app version.
+# Versions are expected to follow Semantic Versioning (https://semver.org/)
+version: 0.1.0
+
+# This is the version number of the application being deployed. This version number should be
+# incremented each time you make changes to the application. Versions are not expected to
+# follow Semantic Versioning. They should reflect the version the application is using.
+appVersion: "1.0"
diff --git a/e2e/charts/apps/templates/prereq-spark-operator.yaml b/e2e/charts/apps/templates/prereq-spark-operator.yaml
new file mode 100644
index 0000000..b0dc95b
--- /dev/null
+++ b/e2e/charts/apps/templates/prereq-spark-operator.yaml
@@ -0,0 +1,7 @@
+# Required by sparkJobNamespaces
+apiVersion: v1
+kind: Namespace
+metadata:
+ name: {{ .Values.spec.sparkAppName }}
+ annotations:
+ argocd.argoproj.io/hook: PreSync
diff --git a/e2e/charts/apps/templates/spark-jobs.yaml b/e2e/charts/apps/templates/spark-jobs.yaml
new file mode 100644
index 0000000..05792e6
--- /dev/null
+++ b/e2e/charts/apps/templates/spark-jobs.yaml
@@ -0,0 +1,25 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Application
+metadata:
+ name: {{ .Values.spec.sparkAppName }}
+ namespace: argocd
+ labels:
+ app.kubernetes.io/name: {{ .Values.spec.sparkAppName }}
+ app.kubernetes.io/part-of: sparkmeasure
+ finalizers:
+ - resources-finalizer.argocd.argoproj.io
+spec:
+ destination:
+ server: {{ .Values.spec.destination.server }}
+ namespace: {{ .Values.spec.sparkAppName }}
+ project: default
+ source:
+ path: e2e/charts/{{ .Values.spec.sparkAppName }}
+ repoURL: {{ .Values.spec.source.repoURL }}
+ targetRevision: {{ .Values.spec.source.targetRevision.default }}
+ helm:
+ releaseName: {{ .Values.spec.sparkAppName }}
+ syncPolicy:
+ syncOptions:
+ - CreateNamespace=true
+ - ApplyOutOfSyncOnly=true
\ No newline at end of file
diff --git a/e2e/charts/apps/templates/spark-operator.yaml b/e2e/charts/apps/templates/spark-operator.yaml
new file mode 100644
index 0000000..69f6121
--- /dev/null
+++ b/e2e/charts/apps/templates/spark-operator.yaml
@@ -0,0 +1,34 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Application
+metadata:
+ name: spark-operator
+ namespace: argocd
+ labels:
+ app.kubernetes.io/name: spark-operator
+ app.kubernetes.io/part-of: sparkmeasure
+ app.kubernetes.io/component: operator
+ finalizers:
+ - resources-finalizer.argocd.argoproj.io
+spec:
+ project: default
+ source:
+ chart: spark-operator
+ repoURL: https://kubeflow.github.io/spark-operator
+ targetRevision: 2.1.0
+ helm:
+ releaseName: spark-operator
+ valuesObject:
+ webhook:
+ enable: true
+ spark:
+ # required by sparkJobNamespaces, to allow to create pods in spark application namespaces
+ jobNamespaces: ["{{.Values.spec.sparkAppName}}"]
+ destination:
+ server: {{ .Values.spec.destination.server }}
+ namespace: spark-operator
+ syncPolicy:
+ syncOptions:
+ - CreateNamespace=true
+ # See https://github.com/argoproj/argo-cd/issues/820 and https://github.com/argoproj/argo-cd/issues/13100
+ - ServerSideApply=true
+ - ApplyOutOfSyncOnly=true
diff --git a/e2e/charts/apps/values.yaml b/e2e/charts/apps/values.yaml
new file mode 100644
index 0000000..aba8d50
--- /dev/null
+++ b/e2e/charts/apps/values.yaml
@@ -0,0 +1,8 @@
+spec:
+ destination:
+ server: https://kubernetes.default.svc
+ sparkAppName: spark-jobs
+ source:
+ repoURL: https://github.com/k8s-school/sparkmeasure
+ targetRevision:
+ default: nil
diff --git a/e2e/charts/spark-jobs/Chart.yaml b/e2e/charts/spark-jobs/Chart.yaml
new file mode 100644
index 0000000..07d53de
--- /dev/null
+++ b/e2e/charts/spark-jobs/Chart.yaml
@@ -0,0 +1,23 @@
+apiVersion: v2
+name: spark-jobs
+description: Deploy spark applications
+
+# A chart can be either an 'application' or a 'library' chart.
+#
+# Application charts are a collection of templates that can be packaged into versioned archives
+# to be deployed.
+#
+# Library charts provide useful utilities or functions for the chart developer. They're included as
+# a dependency of application charts to inject those utilities and functions into the rendering
+# pipeline. Library charts do not define any templates and therefore cannot be deployed.
+type: application
+
+# This is the chart version. This version number should be incremented each time you make changes
+# to the chart and its templates, including the app version.
+# Versions are expected to follow Semantic Versioning (https://semver.org/)
+version: 0.1.0
+
+# This is the version number of the application being deployed. This version number should be
+# incremented each time you make changes to the application. Versions are not expected to
+# follow Semantic Versioning. They should reflect the version the application is using.
+appVersion: "1.0"
diff --git a/e2e/charts/spark-jobs/templates/jmx-configmap.yaml b/e2e/charts/spark-jobs/templates/jmx-configmap.yaml
new file mode 100644
index 0000000..55b8ef5
--- /dev/null
+++ b/e2e/charts/spark-jobs/templates/jmx-configmap.yaml
@@ -0,0 +1,111 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: spark-jmx-config
+data:
+ prometheus.yaml: |2
+ lowercaseOutputName: true
+ attrNameSnakeCase: true
+ rules:
+ # Debugging rule
+ # - pattern: ".*"
+ # name: jmx_debug_all
+ # type: GAUGE
+ # Rules for sparkMeasure
+ - pattern: sparkmeasure\.metrics<>Value
+ name: sparkmeasure_$3_$4
+ type: GAUGE
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ - pattern: sparkmeasure\.metrics<>Count
+ name: sparkmeasure_$3_$4
+ type: COUNTER
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ # Rules from https://github.com/prometheus/jmx_exporter/blob/main/examples/spark.yml
+ - pattern: metrics<>Value
+ name: spark_driver_$3_$4
+ type: GAUGE
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ - pattern: metrics<>Value
+ name: spark_streaming_driver_$4
+ type: GAUGE
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ - pattern: metrics<>Value
+ name: spark_structured_streaming_driver_$4
+ type: GAUGE
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ query_name: "$3"
+ - pattern: metrics<>Value
+ name: spark_executor_$4
+ type: GAUGE
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ executor_id: "$3"
+ - pattern: metrics<>Count
+ name: spark_driver_DAGScheduler_$3_count
+ type: COUNTER
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ - pattern: metrics<>Count
+ name: spark_driver_HiveExternalCatalog_$3_count
+ type: COUNTER
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ - pattern: metrics<>Count
+ name: spark_driver_CodeGenerator_$3_count
+ type: COUNTER
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ - pattern: metrics<>Count
+ name: spark_driver_LiveListenerBus_$3_count
+ type: COUNTER
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ - pattern: metrics<>Value
+ name: spark_driver_LiveListenerBus_$3
+ type: GAUGE
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ - pattern: metrics<>Count
+ name: spark_executor_$4_count
+ type: COUNTER
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ executor_id: "$3"
+ - pattern: metrics<>Value
+ name: spark_executor_$4_$5
+ type: GAUGE
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ executor_id: "$3"
+ - pattern: metrics<>Count
+ name: spark_executor_HiveExternalCatalog_$4_count
+ type: COUNTER
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ executor_id: "$3"
+ - pattern: metrics<>Count
+ name: spark_executor_CodeGenerator_$4_count
+ type: COUNTER
+ labels:
+ app_namespace: "$1"
+ app_id: "$2"
+ executor_id: "$3"
diff --git a/e2e/charts/spark-jobs/templates/spark-pi.yaml b/e2e/charts/spark-jobs/templates/spark-pi.yaml
new file mode 100644
index 0000000..8487942
--- /dev/null
+++ b/e2e/charts/spark-jobs/templates/spark-pi.yaml
@@ -0,0 +1,22 @@
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+ name: spark-pi
+spec:
+ type: Python
+ mode: cluster
+ image: "{{ .Values.image.repository }}/{{ .Values.image.name }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
+ imagePullPolicy: "{{ .Values.image.pullPolicy }}"
+ mainApplicationFile: local:///opt/spark/examples/spark-pi.py
+ arguments:
+ - "10"
+ sparkVersion: "3.5.6"
+ driver:
+ cores: 1
+ coreLimit: "1200m"
+ memory: "512m"
+ serviceAccount: spark-operator-spark
+ executor:
+ cores: 1
+ instances: 1
+ memory: "512m"
\ No newline at end of file
diff --git a/e2e/charts/spark-jobs/templates/spark-sql.yaml b/e2e/charts/spark-jobs/templates/spark-sql.yaml
new file mode 100644
index 0000000..422bc3b
--- /dev/null
+++ b/e2e/charts/spark-jobs/templates/spark-sql.yaml
@@ -0,0 +1,40 @@
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+ name: spark-sql
+spec:
+ type: Python
+ mode: cluster
+ image: "{{ .Values.image.repository }}/{{ .Values.image.name }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
+ imagePullPolicy: "{{ .Values.image.pullPolicy }}"
+ mainApplicationFile: local:///opt/spark/examples/spark-sql.py
+ sparkVersion: "3.5.6"
+ driver:
+ cores: 1
+ coreLimit: "1200m"
+ memory: "512m"
+ serviceAccount: spark-operator-spark
+ volumeMounts:
+ - name: jmx-config
+ mountPath: /config-jmx
+ executor:
+ cores: 1
+ instances: 1
+ memory: "512m"
+ volumeMounts:
+ - name: jmx-config
+ mountPath: /config-jmx
+ volumes:
+ - name: jmx-config
+ configMap:
+ name: spark-jmx-config
+ monitoring:
+ exposeDriverMetrics: true
+ exposeExecutorMetrics: true
+ prometheus:
+ # This is the path to the jmx_exporter jar in the spark container image, added at build time
+ jmxExporterJar: "/opt/spark/jars/jmx_prometheus_javaagent-1.1.0.jar"
+ configFile: /config-jmx/prometheus.yaml
+ port: 8090
+
+
diff --git a/e2e/charts/spark-jobs/values.yaml b/e2e/charts/spark-jobs/values.yaml
new file mode 100644
index 0000000..84c98a2
--- /dev/null
+++ b/e2e/charts/spark-jobs/values.yaml
@@ -0,0 +1,10 @@
+image:
+ pullPolicy: IfNotPresent
+ repository: k8sschool
+ name: sparkmeasure
+ tag: "v0.0.1-rc1"
+jmxConfigMountPath: /opt/jmx-config
+jmxConfig: |
+ rules:
+ - pattern: "org.apache.spark.metrics.MetricsSystem.*"
+ - pattern: ".*"
diff --git a/e2e/check-metrics.sh b/e2e/check-metrics.sh
new file mode 100755
index 0000000..32a3b3a
--- /dev/null
+++ b/e2e/check-metrics.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+set -euxo pipefail
+
+DIR=$(cd "$(dirname "$0")"; pwd -P)
+PROJECT_DIR=$(cd "$DIR/.."; pwd -P)
+export CIUXCONFIG=$PROJECT_DIR/.ciux.d/ciux_itest.sh
+. $CIUXCONFIG
+app_name="$CIUX_IMAGE_NAME"
+
+timeout="480s"
+
+pod="spark-sql-driver"
+namespace="spark-jobs"
+
+while ! kubectl get pods -n $namespace | grep -q $pod; do
+ echo "Waiting for Spark SQL driver pod to be created..."
+ sleep 5
+done
+
+echo "Spark SQL driver pod is created:"
+kubectl get pods -n $namespace
+
+echo "Waiting for Spark SQL driver to be ready..."
+kubectl wait -n $namespace --for=condition=Ready pod/$pod --timeout="$timeout"
+
+while ! kubectl exec -n $namespace $pod -- curl -s http://localhost:8090/metrics; do
+ echo "Waiting for Spark SQL driver metrics to be available..."
+ sleep 5
+done
+
+argocd app wait -l app.kubernetes.io/part-of="$app_name"
\ No newline at end of file
diff --git a/e2e/prereq.sh b/e2e/prereq.sh
new file mode 100755
index 0000000..cf9e14f
--- /dev/null
+++ b/e2e/prereq.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+
+# Install pre-requisites for the sparkMeasure e2e tests (kind cluster, OLM, ArgoCD)
+
+# @author Fabrice Jammes
+
+set -euxo pipefail
+
+DIR=$(cd "$(dirname "$0")"; pwd -P)
+PROJECT_DIR=$(cd "$DIR/.."; pwd -P)
+
+ciux_version="v0.0.5-rc4"
+go install github.com/k8s-school/ciux@"$ciux_version"
+
+echo "Ignite the project using ciux"
+ciux ignite --selector itest $PROJECT_DIR
+
+export CIUXCONFIG=$PROJECT_DIR/.ciux.d/ciux_itest.sh
+cluster_name=$(ciux get clustername $PROJECT_DIR)
+monitoring=false
+
+# Get kind version from option -k
+while getopts mk: flag
+do
+ case "${flag}" in
+ k) kind_version_opt=--kind-version=${OPTARG};;
+ m) monitoring=true;;
+ esac
+done
+
+ktbx install kind
+ktbx install kubectl
+ktbx install helm
+ink "Create kind cluster"
+ktbx create -s --name $cluster_name
+ink "Install OLM"
+ktbx install olm
+ink "Install ArgoCD operator"
+ktbx install argocd
diff --git a/e2e/push-image.sh b/e2e/push-image.sh
new file mode 100755
index 0000000..2f32b50
--- /dev/null
+++ b/e2e/push-image.sh
@@ -0,0 +1,55 @@
+#!/usr/bin/env bash
+
+# Push image to Docker Hub or load it inside kind
+
+# @author Fabrice Jammes, IN2P3
+
+set -euxo pipefail
+
+DIR=$(cd "$(dirname "$0")"; pwd -P)
+PROJECT_DIR=$(cd "$DIR/.."; pwd -P)
+
+
+usage() {
+ cat << EOD
+
+Usage: `basename $0` [options] path host [host ...]
+
+ Available options:
+ -h this message
+ -k development mode: load image in kind
+ -d do not push image to remote registry
+
+Push image to remote registry and/or load it inside kind
+EOD
+}
+
+kind=false
+registry=true
+
+# get the options
+while getopts dhk c ; do
+ case $c in
+ h) usage ; exit 0 ;;
+ k) kind=true ;;
+ d) registry=false ;;
+ \?) usage ; exit 2 ;;
+ esac
+done
+shift `expr $OPTIND - 1`
+
+if [ $# -ne 0 ] ; then
+ usage
+ exit 2
+fi
+
+export CIUXCONFIG=$DIR/../.ciux.d/ciux_build.sh
+$(ciux get image --check $PROJECT_DIR --env)
+
+if [ $kind = true ]; then
+ cluster_name=$(ciux get clustername $PROJECT_DIR)
+ kind load docker-image "$CIUX_IMAGE_URL" --name "$cluster_name"
+fi
+if [ $registry = true ]; then
+ docker push "$CIUX_IMAGE_URL"
+fi
diff --git a/e2e/rootfs/opt/spark/examples/spark-pi.py b/e2e/rootfs/opt/spark/examples/spark-pi.py
new file mode 100644
index 0000000..fb84e3b
--- /dev/null
+++ b/e2e/rootfs/opt/spark/examples/spark-pi.py
@@ -0,0 +1,45 @@
+import sys
+from random import random
+from operator import add
+from time import sleep
+
+from sparkmeasure import StageMetrics
+from sparkmeasure.jmx import jmxexport
+from pyspark.sql import SparkSession
+from py4j.protocol import Py4JJavaError
+
+if __name__ == "__main__":
+ """
+ Usage: pi [partitions]
+ """
+ spark = SparkSession\
+ .builder\
+ .appName("PythonPi")\
+ .getOrCreate()
+
+ partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
+ n = 100000 * partitions
+
+ def f(_: int) -> float:
+ x = random() * 2 - 1
+ y = random() * 2 - 1
+ return 1 if x ** 2 + y ** 2 <= 1 else 0
+
+ stagemetrics = StageMetrics(spark)
+ stagemetrics.begin()
+
+ count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+ print("Pi is roughly %f" % (4.0 * count / n))
+
+ stagemetrics.end()
+
+ stagemetrics.print_report()
+
+ try:
+ stagemetrics.print_memory_report()
+ except Py4JJavaError:
+ print("Memory report failed, retrying (see https://github.com/LucaCanali/sparkMeasure/blob/master/README.md#examples-of-sparkmeasure-on-the-cli)")
+ sleep(5)
+ stagemetrics.print_memory_report()
+
+ spark.stop()
diff --git a/e2e/rootfs/opt/spark/examples/spark-sql.py b/e2e/rootfs/opt/spark/examples/spark-sql.py
new file mode 100644
index 0000000..dbc46da
--- /dev/null
+++ b/e2e/rootfs/opt/spark/examples/spark-sql.py
@@ -0,0 +1,57 @@
+"""
+A simple example demonstrating the use of sparkMeasure to instrument Python code running Apache Spark workloads
+also expose the metrics to a prometheus exporter
+"""
+
+import logging
+
+import time
+from pyspark.sql import SparkSession
+from sparkmeasure import StageMetrics
+from sparkmeasure.jmx import jmxexport
+
+def run_my_workload(spark):
+
+ stagemetrics = StageMetrics(spark)
+
+ stagemetrics.begin()
+ spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
+ stagemetrics.end()
+
+ # print report to standard output
+ stagemetrics.print_report()
+
+ # get metrics data as a dictionary
+ current_metrics = stagemetrics.aggregate_stagemetrics()
+ print(f"metrics elapsedTime = {current_metrics.get('elapsedTime')}")
+
+ # export metrics to JMX Prometheus exporter
+ jmxexport(spark, current_metrics)
+
+ # save session metrics data in json format (default)
+ df = stagemetrics.create_stagemetrics_DF("PerfStageMetrics")
+ stagemetrics.save_data(df.orderBy("jobId", "stageId"), "/tmp/stagemetrics_test1")
+
+ aggregatedDF = stagemetrics.aggregate_stagemetrics_DF("PerfStageMetrics")
+ stagemetrics.save_data(aggregatedDF, "/tmp/stagemetrics_report_test2")
+
+if __name__ == "__main__":
+
+ logging.basicConfig(level=logging.INFO)
+
+    # The Spark session is expected to be already up, created by spark-submit,
+    # which also handles adding the sparkmeasure jar; we just need to get a reference to it
+ spark = (SparkSession
+ .builder
+ .appName("Test sparkmeasure instrumentation of Python/PySpark code")
+ .getOrCreate()
+ )
+ # run Spark workload with instrumentation
+ run_my_workload(spark)
+
+ # Wait for a while to allow metrics to be collected and exported
+ # This is just for demonstration purposes, in a real application you would not need to wait
+ logging.info("Waiting for 1 hour to allow metrics to be collected and exported...")
+ time.sleep(3600)
+
+ spark.stop()
diff --git a/e2e/run-all.sh b/e2e/run-all.sh
new file mode 100755
index 0000000..ff2029d
--- /dev/null
+++ b/e2e/run-all.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+# Run the entire end-to-end test workflow
+
+# @author Fabrice Jammes
+
+set -euxo pipefail
+
+DIR=$(cd "$(dirname "$0")"; pwd -P)
+PROJECT_DIR=$(cd "$DIR/.."; pwd -P)
+
+usage() {
+ cat << EOD
+
+Usage: `basename $0` [options]
+
+ Available options:
+ -h this message
+
+Run the entire end-to-end test workflow for sparkMeasure over Kubernetes
+EOD
+}
+
+# get the options
+while getopts h c ; do
+ case $c in
+ h) usage ; exit 0 ;;
+ \?) usage ; exit 2 ;;
+ esac
+done
+
+# Build spark container image with latest sparkMeasure code
+$DIR/build.sh
+
+# Push the image to the registry (docker registry credentials must be set in the environment)
+$DIR/push-image.sh
+
+# Create and configure the Kubernetes cluster
+$DIR/prereq.sh
+
+# Run continuous deployment with ArgoCD for e2e tests
+$DIR/argocd.sh
+
+# Check sparkMeasure metrics availability through prometheus exporter
+$DIR/check-metrics.sh
+
diff --git a/python/sparkmeasure/jmx.py b/python/sparkmeasure/jmx.py
new file mode 100644
index 0000000..c6f29f8
--- /dev/null
+++ b/python/sparkmeasure/jmx.py
@@ -0,0 +1,25 @@
+from typing import Union, Dict
+
+import logging
+from sparkmeasure import StageMetrics
+import datetime
+
+logger = logging.getLogger(__name__)
+
+def jmxexport(spark_session, metrics: Dict[str, Union[float, int]]):
+ """Publish metrics to Dropwizard via JMX."""
+ try:
+ publish_metrics_count = 0
+ dropwizard = spark_session._jvm.ch.cern.metrics.DropwizardMetrics
+ for key, value in metrics.items():
+            # setMetricAutoType infers counter/gauge/timer from the metric key name
+ is_counter = True
+ dropwizard.setMetricAutoType(key, float(value))
+            # Count the metrics published in this call; exported below as an example counter
+ publish_metrics_count += 1
+
+ dropwizard.setMetricAutoType("metrics_published_total", float(publish_metrics_count))
+
+ logger.info("%d Dropwizard metrics published via JMX", len(metrics))
+ except Exception as e:
+ logger.error("Failed to publish Dropwizard metrics: %s", e)
diff --git a/python/sparkmeasure/test_jmx.py b/python/sparkmeasure/test_jmx.py
new file mode 100644
index 0000000..ca95ac7
--- /dev/null
+++ b/python/sparkmeasure/test_jmx.py
@@ -0,0 +1,75 @@
+# Test for sparkmeasure/jmx.py
+import re
+
+# Note this requires pytest and pyspark to be installed
+from . import StageMetrics
+from .jmx import jmxexport
+from .testutils import setup_sparksession
+
+def test_jmxexport(setup_sparksession):
+ spark = setup_sparksession
+ conf = spark.sparkContext.getConf()
+ conf.set("spark.sql.shuffle.partitions", "2")
+ conf.set("spark.default.parallelism", "2")
+ stagemetrics = StageMetrics(spark)
+
+ stagemetrics.begin()
+ spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
+ stagemetrics.end()
+
+ # print report to standard output
+ stagemetrics.print_report()
+
+ # get metrics data as a dictionary
+ metrics = stagemetrics.aggregate_stagemetrics()
+ print(f"metrics elapsedTime = {metrics.get('elapsedTime')}")
+ assert metrics.get('elapsedTime') > 0
+
+ jmxexport(spark, metrics)
+
+ dropwizard = spark._jvm.ch.cern.metrics.DropwizardMetrics
+
+    # Regex matching the expected output, with a wildcard for the resultSize value
+ expected_regex = (
+ r"Metrics for namespace 'unknown' and pod 'unknown'\n"
+ r"Total gauges: 2\n"
+ r"Gauge: unknown\.unknown\.resultSize = [0-9]+(\.[0-9]+)?\n"
+ r"Gauge: unknown\.unknown\.peakExecutionMemory = 0\.0\n"
+ r"Total counters: 19\n"
+ r"Counter: unknown\.unknown\.bytesRead = 0\n"
+ r"Counter: unknown\.unknown\.numStages = 3\n"
+ r"Counter: unknown\.unknown\.shuffleRecordsWritten = [0-9]+\n"
+ r"Counter: unknown\.unknown\.shuffleRemoteBytesRead = 0\n"
+ r"Counter: unknown\.unknown\.shuffleLocalBlocksFetched = [0-9]+\n"
+ r"Counter: unknown\.unknown\.shuffleTotalBlocksFetched = [0-9]+\n"
+ r"Counter: unknown\.unknown\.memoryBytesSpilled = 0\n"
+ r"Counter: unknown\.unknown\.bytesWritten = 0\n"
+ r"Counter: unknown\.unknown\.numTasks = [0-9]+\n"
+ r"Counter: unknown\.unknown\.recordsWritten = 0\n"
+ r"Counter: unknown\.unknown\.shuffleRecordsRead = [0-9]+\n"
+ r"Counter: unknown\.unknown\.recordsRead = 2000\n"
+ r"Counter: unknown\.unknown\.shuffleLocalBytesRead = [0-9]+\n"
+ r"Counter: unknown\.unknown\.shuffleBytesWritten = [0-9]+\n"
+ r"Counter: unknown\.unknown\.shuffleTotalBytesRead = [0-9]+\n"
+ r"Counter: unknown\.unknown\.metrics_published_total = 30\n"
+ r"Counter: unknown\.unknown\.diskBytesSpilled = 0\n"
+ r"Counter: unknown\.unknown\.shuffleRemoteBytesReadToDisk = 0\n"
+ r"Counter: unknown\.unknown\.shuffleRemoteBlocksFetched = 0\n"
+ r"Total timers: 10\n"
+ r"Timer: unknown.unknown.shuffleWriteTime = count=1\n"
+ r"Timer: unknown.unknown.stageDuration = count=1\n"
+ r"Timer: unknown.unknown.executorCpuTime = count=1\n"
+ r"Timer: unknown.unknown.shuffleFetchWaitTime = count=1\n"
+ r"Timer: unknown.unknown.executorRunTime = count=1\n"
+ r"Timer: unknown.unknown.jvmGCTime = count=1\n"
+ r"Timer: unknown.unknown.elapsedTime = count=1\n"
+ r"Timer: unknown.unknown.executorDeserializeCpuTime = count=1\n"
+ r"Timer: unknown.unknown.resultSerializationTime = count=1\n"
+ r"Timer: unknown.unknown.executorDeserializeTime = count=1\n"
+ )
+
+ output = dropwizard.describeMetrics()
+ print(output)
+ assert re.search(expected_regex, output, re.MULTILINE | re.DOTALL), f"Output does not match expected regex.\nGot:\n{output}\nWant:\n{expected_regex}"
+
+ spark.stop()
diff --git a/python/sparkmeasure/test_stagemetrics.py b/python/sparkmeasure/test_stagemetrics.py
index bd4ad94..77b90ca 100644
--- a/python/sparkmeasure/test_stagemetrics.py
+++ b/python/sparkmeasure/test_stagemetrics.py
@@ -1,22 +1,6 @@
# Test for sparkmeasure/stagemetrics.py
-
-# Note this requires pytest and pyspark to be installed
-from pyspark.sql import SparkSession
from . import StageMetrics
-import pytest
-
-@pytest.fixture
-def setup_sparksession():
- # Note this is supposed to run after sbt package on the sparkmeasure project
- # so that we can get the jar file from the target folder
- SPARKMEASURE_JAR_FOLDER = "target/scala-2.12/"
- spark = (SparkSession.builder
- .appName("Test sparkmeasure instrumentation of Python/PySpark code")
- .master("local[*]")
- .config("spark.jars", SPARKMEASURE_JAR_FOLDER + "*.jar")
- .getOrCreate()
- )
- return spark
+from .testutils import setup_sparksession
def test_stagemetrics(setup_sparksession):
spark = setup_sparksession
diff --git a/python/sparkmeasure/test_taskmetrics.py b/python/sparkmeasure/test_taskmetrics.py
index cff637a..a1d3cda 100644
--- a/python/sparkmeasure/test_taskmetrics.py
+++ b/python/sparkmeasure/test_taskmetrics.py
@@ -1,22 +1,7 @@
# Test for sparkmeasure/stagemetrics.py
-
-# Note this requires pytest and pyspark to be installed
-from pyspark.sql import SparkSession
from . import TaskMetrics
-import pytest
-
-@pytest.fixture
-def setup_sparksession():
- # Note this is supposed to run after sbt package on the sparkmeasure project
- # so that we can get the jar file from the target folder
- SPARKMEASURE_JAR_FOLDER = "target/scala-2.12/"
- spark = (SparkSession.builder
- .appName("Test sparkmeasure instrumentation of Python/PySpark code")
- .master("local[*]")
- .config("spark.jars", SPARKMEASURE_JAR_FOLDER + "*.jar")
- .getOrCreate()
- )
- return spark
+from .testutils import setup_sparksession
+
def test_stagemetrics(setup_sparksession):
spark = setup_sparksession
diff --git a/python/sparkmeasure/testutils.py b/python/sparkmeasure/testutils.py
new file mode 100644
index 0000000..a9f9739
--- /dev/null
+++ b/python/sparkmeasure/testutils.py
@@ -0,0 +1,33 @@
+# Note this requires pytest and pyspark to be installed
+import pytest
+from pyspark.sql import SparkSession
+import glob
+import os
+
+@pytest.fixture
+def setup_sparksession():
+ # Note this is supposed to run after sbt package on the sparkmeasure project
+ # so that we can get the jar file from the target folder
+ SPARKMEASURE_JAR_FOLDER = "target/scala-2.12/"
+
+ # Scan SPARKMEASURE_JAR_FOLDER for the spark-measure jar file
+    # Exactly one matching jar is expected; anything else is an error
+ pattern = 'spark-measure_*.jar' # Change this to your desired pattern
+
+ # Use glob to find files matching the pattern
+ files = glob.glob(os.path.join(SPARKMEASURE_JAR_FOLDER, pattern))
+
+ if len(files) == 0:
+ raise FileNotFoundError(f"No files matching pattern '{pattern}' found in {SPARKMEASURE_JAR_FOLDER}")
+ elif len(files) > 1:
+ raise FileExistsError(f"Multiple files matching pattern '{pattern}' found in {SPARKMEASURE_JAR_FOLDER}: {files}")
+
+ jarfile=files[0]
+
+ spark = (SparkSession.builder
+ .appName("Test sparkmeasure instrumentation of Python/PySpark code")
+ .master("local[*]")
+ .config("spark.jars", jarfile)
+ .getOrCreate()
+ )
+ return spark
\ No newline at end of file
diff --git a/run_pytest.sh b/run_pytest.sh
new file mode 100755
index 0000000..bb1cd40
--- /dev/null
+++ b/run_pytest.sh
@@ -0,0 +1,3 @@
+. ~/venv/fink/bin/activate
+sbt package
+pytest python/sparkmeasure -vvv -s
diff --git a/src/main/scala/ch/cern/sparkmeasure/DropwizardMetrics.scala b/src/main/scala/ch/cern/sparkmeasure/DropwizardMetrics.scala
new file mode 100644
index 0000000..b70deaf
--- /dev/null
+++ b/src/main/scala/ch/cern/sparkmeasure/DropwizardMetrics.scala
@@ -0,0 +1,113 @@
+package ch.cern.metrics
+
+import java.io.File
+import org.slf4j.LoggerFactory
+import scala.io.Source
+
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
+import com.codahale.metrics.jmx.JmxReporter
+
+object DropwizardMetrics {
+ val registry = new MetricRegistry()
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ private def getNamespace(): String = {
+ val path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+ val file = new File(path)
+ if (file.exists && file.canRead)
+ Source.fromFile(file).getLines().mkString.trim
+ else "unknown"
+ }
+
+ private def getPodName(): String = {
+ sys.env.getOrElse("HOSTNAME", "unknown")
+ }
+
+  // Simple local cache to avoid registering the same metric twice
+ private val knownGauges = scala.collection.mutable.Set[String]()
+ private val knownCounters = scala.collection.mutable.Set[String]()
+ private val knownTimers = scala.collection.mutable.Set[String]()
+
+  // Start the JMX reporter, once only
+ private val reporter: JmxReporter = JmxReporter
+ .forRegistry(registry)
+    .inDomain("sparkmeasure.metrics") // JMX domain under which the metrics are exposed
+ .build()
+
+ reporter.start()
+
+def setMetricAutoType(shortname: String, value: Double): Unit = {
+ val lower = shortname.toLowerCase
+
+ val metricType =
+ if (lower.contains("time") || lower.contains("duration")) "timer"
+ else if (lower.contains("peak") || lower.contains("size")) "gauge"
+ else "counter"
+
+ setMetric(shortname, value, metricType)
+}
+
+def setMetric(shortname: String, value: Double, metricType: String): Unit = {
+ val kind = metricType.toLowerCase
+ val name = s"${getNamespace()}.${getPodName()}.$shortname"
+
+ logger.debug(s"[JMX] Setting $kind: $shortname = $value")
+
+ kind match {
+ case "counter" =>
+ if (!knownCounters.contains(name)) {
+ counters(name) = registry.counter(name)
+ knownCounters.add(name)
+ }
+ val current = counters(name).getCount
+ val delta = value.toLong - current
+ if (delta > 0) {
+ counters(name).inc(delta)
+      } // otherwise skip: a Dropwizard counter must never be decremented
+
+ case "gauge" =>
+ if (!knownGauges.contains(name)) {
+ registry.register(name, new Gauge[Double] {
+ override def getValue: Double = gauges.getOrElse(name, 0.0)
+ })
+ knownGauges.add(name)
+ }
+ gauges.update(name, value)
+
+ case "timer" =>
+ if (!knownTimers.contains(name)) {
+ timers(name) = registry.timer(name)
+ knownTimers.add(name)
+ }
+      // value is assumed to be in milliseconds
+ timers(name).update(value.toLong, java.util.concurrent.TimeUnit.MILLISECONDS)
+
+ case _ =>
+ throw new IllegalArgumentException(s"Unknown metric type: $metricType")
+ }
+}
+
+def describeMetrics() : String = {
+ val sb = new StringBuilder
+ sb.append(s"Metrics for namespace '${getNamespace()}' and pod '${getPodName()}'\n")
+ sb.append(s"Total gauges: ${gauges.size}\n")
+ gauges.foreach { case (name, value) =>
+ sb.append(s"Gauge: $name = $value\n")
+ }
+ sb.append(s"Total counters: ${counters.size}\n")
+ counters.foreach { case (name, counter) =>
+ sb.append(s"Counter: $name = ${counter.getCount}\n")
+ }
+ sb.append(s"Total timers: ${timers.size}\n")
+ timers.foreach { case (name, timer) =>
+ val snapshot = timer.getSnapshot
+ sb.append(s"Timer: $name = " +
+ s"count=${timer.getCount}\n")
+ }
+ sb.toString()
+}
+
+ private val gauges = scala.collection.concurrent.TrieMap[String, Double]()
+ private val counters = scala.collection.concurrent.TrieMap[String, Counter]()
+ private val timers = scala.collection.concurrent.TrieMap[String, Timer]()
+}
diff --git a/src/main/scala/ch/cern/sparkmeasure/StageMetrics.scala b/src/main/scala/ch/cern/sparkmeasure/StageMetrics.scala
index 8a9b61b..659dfe3 100644
--- a/src/main/scala/ch/cern/sparkmeasure/StageMetrics.scala
+++ b/src/main/scala/ch/cern/sparkmeasure/StageMetrics.scala
@@ -224,8 +224,8 @@ case class StageMetrics(sparkSession: SparkSession) {
// Legacy transformation of data recorded from the custom Stage listener
// into a DataFrame and register it as a view for querying with SQL
def createStageMetricsDF(nameTempView: String = "PerfStageMetrics"): DataFrame = {
- import sparkSession.implicits._
- val resultDF = listenerStage.stageMetricsData.toSeq.toDF()
+ // Convert the ListBuffer of StageVals into a DataFrame
+ val resultDF = sparkSession.createDataFrame(listenerStage.stageMetricsData.toSeq)
resultDF.createOrReplaceTempView(nameTempView)
logger.warn(s"Stage metrics data refreshed into temp view $nameTempView")
resultDF
diff --git a/src/main/scala/ch/cern/sparkmeasure/TaskMetrics.scala b/src/main/scala/ch/cern/sparkmeasure/TaskMetrics.scala
index 7853725..cd63580 100644
--- a/src/main/scala/ch/cern/sparkmeasure/TaskMetrics.scala
+++ b/src/main/scala/ch/cern/sparkmeasure/TaskMetrics.scala
@@ -131,8 +131,8 @@ case class TaskMetrics(sparkSession: SparkSession) {
// Legacy transformation of data recorded from the custom Stage listener
// into a DataFrame and register it as a view for querying with SQL
def createTaskMetricsDF(nameTempView: String = "PerfTaskMetrics"): DataFrame = {
- import sparkSession.implicits._
- val resultDF = listenerTask.taskMetricsData.toSeq.toDF()
+ // Create a DataFrame from the task metrics data
+ val resultDF = sparkSession.createDataFrame(listenerTask.taskMetricsData.toSeq)
resultDF.createOrReplaceTempView(nameTempView)
logger.warn(s"Stage metrics data refreshed into temp view $nameTempView")
resultDF