# GitHub Actions workflow: continuous pipeline performance tests for the
# otel-arrow dataflow engine, with benchmark publication to GitHub Pages.
name: Pipeline Performance Tests - Continuous

# Default to read-only; the update-benchmarks job elevates its own permissions.
permissions:
  contents: read

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  workflow_dispatch:
jobs:
  pipeline-perf-test:
    runs-on: ubuntu-latest
    steps:
      - name: Harden the runner (Audit all outbound calls)
        uses: step-security/harden-runner@95d9a5deda9de15063e7595e9719c11c38c90ae2 # v2.13.2
        with:
          egress-policy: audit

      - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1

      # Export the host distribution ID/version so the next step can
      # special-case self-hosted Oracle Linux runners.
      - id: detect_os
        name: Detect OS (self-hosted)
        shell: bash
        run: |
          . /etc/os-release
          echo "id=$ID" >> "$GITHUB_OUTPUT"
          echo "version_id=$VERSION_ID" >> "$GITHUB_OUTPUT"

      - name: Install/prepare Python 3.11 on Oracle Linux 8 (self-hosted only)
        if: ${{ runner.os == 'Linux' && steps.detect_os.outputs.id == 'ol' && startsWith(steps.detect_os.outputs.version_id, '8') }}
        shell: bash
        run: |
          set -euxo pipefail
          # Run disk cleanup script
          bash ./.github/workflows/scripts/disk-cleanup.sh
          # Check if Python 3.11 is already installed to avoid unnecessary dnf operations
          if rpm -q python3.11 python3.11-devel &>/dev/null; then
            echo "Python 3.11 packages already installed, skipping repository setup and package installation"
          else
            # Enable repos and refresh metadata (only runs if Python not installed)
            sudo dnf -y install oracle-epel-release-el8 || true
            sudo dnf -y config-manager --set-enabled ol8_codeready_builder || true
            sudo dnf -y makecache --timer
            # Install core packages (ignore subpackages that don't exist on OL8)
            sudo dnf -y install python3.11 python3.11-devel || true
          fi
          # pip may be packaged as part of python3.11 or available via ensurepip;
          # fall back to ensurepip if needed.
          if ! /usr/bin/python3.11 -m pip --version >/dev/null 2>&1; then
            /usr/bin/python3.11 -m ensurepip --upgrade || true
          fi
          # Only upgrade pip/setuptools/wheel if outdated (skip if already at latest)
          /usr/bin/python3.11 -m pip install --upgrade --upgrade-strategy only-if-needed pip setuptools wheel || true
          # Add a 'python' shim so subsequent steps can call `python ...`
          mkdir -p "$HOME/.local/bin"
          ln -sf /usr/bin/python3.11 "$HOME/.local/bin/python"
          echo "$HOME/.local/bin" >> "$GITHUB_PATH"
          # Smoke test
          python --version
          python -m pip --version

      - name: Build dataflow_engine
        run: |
          git submodule init
          git submodule update
          cd rust/otap-dataflow
          docker build --build-context otel-arrow=../../ -f Dockerfile -t df_engine .
          cd ../..

      - name: Install dependencies
        run: |
          python -m pip install --user --upgrade pip
          pip install --user -r tools/pipeline_perf_test/orchestrator/requirements.txt
          pip install --user -r tools/pipeline_perf_test/load_generator/requirements.txt

      - name: Run pipeline performance test log suite
        run: |
          cd tools/pipeline_perf_test
          python orchestrator/run_orchestrator.py --debug --config test_suites/integration/continuous/100klrps-docker.yaml

      - name: Run pipeline performance test - Saturation 1 Core
        run: |
          cd tools/pipeline_perf_test
          python orchestrator/run_orchestrator.py --debug --config test_suites/integration/continuous/saturation-1core.yaml

      - name: Run pipeline performance test - Saturation 2 Cores
        run: |
          cd tools/pipeline_perf_test
          python orchestrator/run_orchestrator.py --debug --config test_suites/integration/continuous/saturation-2cores.yaml

      - name: Run pipeline performance test - Saturation 4 Cores
        run: |
          cd tools/pipeline_perf_test
          python orchestrator/run_orchestrator.py --debug --config test_suites/integration/continuous/saturation-4cores.yaml

      - name: Run pipeline performance test - Saturation 8 Cores
        run: |
          cd tools/pipeline_perf_test
          python orchestrator/run_orchestrator.py --debug --config test_suites/integration/continuous/saturation-8cores.yaml

      - name: Upload benchmark results for processing
        uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
        with:
          name: benchmark-results-pipeline
          path: tools/pipeline_perf_test/results/integration/gh-actions-benchmark/*.json

      - name: Upload benchmark results for processing (Saturation)
        uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
        with:
          name: benchmark-results-saturation
          path: tools/pipeline_perf_test/results/continuous_saturation_*core/gh-actions-benchmark/*.json

      - name: Add benchmark link to job summary
        run: |
          # Quote the env var: unquoted redirection targets are subject to
          # word splitting / globbing in bash.
          echo "### Benchmark Results" >> "$GITHUB_STEP_SUMMARY"
          echo "[View the benchmark results here](https://open-telemetry.github.io/otel-arrow/benchmarks/continuous/)" >> "$GITHUB_STEP_SUMMARY"
| update-benchmarks: | |
| runs-on: ubuntu-24.04 | |
| needs: [pipeline-perf-test] | |
| permissions: | |
| # deployments permission to deploy GitHub pages website | |
| deployments: write | |
| # contents permission to update benchmark contents in gh-pages branch | |
| contents: write | |
| steps: | |
| - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 | |
| - name: Download benchmark artifacts | |
| uses: actions/download-artifact@634f93cb2916e3fdff6788551b99b062d0335ce0 # v5.0.0 | |
| with: | |
| pattern: benchmark-results-pipeline* | |
| merge-multiple: true | |
| path: results-pipeline | |
| - name: Download benchmark artifacts (Saturation) | |
| uses: actions/download-artifact@634f93cb2916e3fdff6788551b99b062d0335ce0 # v5.0.0 | |
| with: | |
| pattern: benchmark-results-saturation* | |
| merge-multiple: true | |
| path: results-saturation | |
| - name: Consolidate pipeline benchmark data | |
| run: | | |
| echo "Consolidating pipeline benchmark JSON files..." | |
| find results-pipeline -name "*.json" -type f | while read file; do | |
| echo "Processing: $file" | |
| cat "$file" | |
| echo | |
| done | |
| # Combine all benchmark JSON files into a single output (find them recursively) | |
| find results-pipeline -name "*.json" -type f -exec cat {} \; | jq -s 'map(.[])' > output-pipeline.json | |
| echo "Consolidated pipeline benchmark data:" | |
| cat output-pipeline.json | |
| - name: Consolidate saturation benchmark data | |
| run: | | |
| echo "Consolidating saturation benchmark JSON files..." | |
| find results-saturation -name "*.json" -type f | while read file; do | |
| echo "Processing: $file" | |
| cat "$file" | |
| echo | |
| done | |
| # Combine all benchmark JSON files into a single output (find them recursively) | |
| find results-saturation -name "*.json" -type f -exec cat {} \; | jq -s 'map(.[])' > output-saturation.json | |
| echo "Consolidated saturation benchmark data:" | |
| cat output-saturation.json | |
| - name: Compute scaling efficiency metrics | |
| run: | | |
| python3 << 'EOF' | |
| import json | |
| with open('output-saturation.json') as f: | |
| data = json.load(f) | |
| # Group by cores and protocol - collect throughput and CPU | |
| throughput = {} | |
| cpu_norm = {} | |
| for entry in data: | |
| extra = entry['extra'] | |
| parts = extra.split(' - ') | |
| if len(parts) < 3: | |
| continue | |
| cores = parts[2].split('/')[0].split()[0] | |
| protocol = extra.split('/')[1].split(' - ')[0] if '/' in extra else 'unknown' | |
| key = f"{cores}core-{protocol}" | |
| if entry['name'] == 'logs_received_rate': | |
| throughput[key] = entry['value'] | |
| elif entry['name'] == 'cpu_percentage_normalized_avg': | |
| cpu_norm[key] = entry['value'] | |
| # Calculate scaling metrics | |
| scaling_metrics = [] | |
| for protocol in ['OTLP-ATTR-OTLP', 'OTAP-ATTR-OTLP']: | |
| baseline = throughput.get(f"1core-{protocol}", 1) | |
| for cores in ['1', '2', '4', '8']: | |
| key = f"{cores}core-{protocol}" | |
| if key in throughput: | |
| cores_int = int(cores) | |
| actual_speedup = throughput[key] / baseline if baseline > 0 else 0 | |
| ideal_speedup = cores_int | |
| efficiency = (actual_speedup / ideal_speedup) * 100 if ideal_speedup > 0 else 0 | |
| per_core = throughput[key] / cores_int | |
| # Add efficiency metric (higher is better) | |
| scaling_metrics.append({ | |
| "name": "scaling_efficiency", | |
| "unit": "%", | |
| "value": efficiency, | |
| "extra": f"Continuous - Saturation - {cores} Core(s)/{protocol} - Scaling Efficiency" | |
| }) | |
| # Add per-core throughput (should remain constant for linear scaling) | |
| scaling_metrics.append({ | |
| "name": "per_core_throughput", | |
| "unit": "logs/sec/core", | |
| "value": per_core, | |
| "extra": f"Continuous - Saturation - {cores} Core(s)/{protocol} - Per-Core Throughput" | |
| }) | |
| # Add speedup metric | |
| scaling_metrics.append({ | |
| "name": "speedup", | |
| "unit": "x", | |
| "value": actual_speedup, | |
| "extra": f"Continuous - Saturation - {cores} Core(s)/{protocol} - Speedup vs 1-core" | |
| }) | |
| # Merge with original data | |
| data.extend(scaling_metrics) | |
| with open('output-saturation.json', 'w') as f: | |
| json.dump(data, f, indent=2) | |
| print(f"Added {len(scaling_metrics)} scaling metrics") | |
| EOF | |
| - name: Generate scaling analysis summary | |
| run: | | |
| python3 << 'EOF' | |
| import json | |
| import os | |
| with open('output-saturation.json') as f: | |
| data = json.load(f) | |
| # Group metrics by configuration | |
| metrics = {} | |
| for entry in data: | |
| extra = entry['extra'] | |
| parts = extra.split(' - ') | |
| if len(parts) < 3: | |
| continue | |
| cores = parts[2].split('/')[0].split()[0] | |
| protocol = extra.split('/')[1].split(' - ')[0] if '/' in extra else 'unknown' | |
| key = f"{cores}core-{protocol}" | |
| if key not in metrics: | |
| metrics[key] = {} | |
| name = entry['name'] | |
| if name == 'logs_received_rate': | |
| metrics[key]['throughput'] = entry['value'] | |
| elif name == 'cpu_percentage_normalized_avg': | |
| metrics[key]['cpu'] = entry['value'] | |
| elif name == 'speedup': | |
| metrics[key]['speedup'] = entry['value'] | |
| elif name == 'scaling_efficiency': | |
| metrics[key]['efficiency'] = entry['value'] | |
| elif name == 'dropped_logs_percentage': | |
| metrics[key]['dropped'] = entry['value'] | |
| # Write to GitHub Step Summary | |
| with open(os.environ['GITHUB_STEP_SUMMARY'], 'a') as f: | |
| f.write("\n## 🚀 Core Scaling Analysis\n\n") | |
| f.write("### OTLP Protocol\n\n") | |
| f.write("| Cores | Throughput (logs/s) | Speedup | Efficiency | CPU % | Dropped % |\n") | |
| f.write("|-------|--------------------:|--------:|-----------:|------:|----------:|\n") | |
| for cores in ['1', '2', '4', '8']: | |
| key = f"{cores}core-OTLP-ATTR-OTLP" | |
| if key in metrics and 'throughput' in metrics[key]: | |
| m = metrics[key] | |
| speedup = m.get('speedup', 0) | |
| efficiency = m.get('efficiency', 0) | |
| cpu = m.get('cpu', 0) | |
| throughput = m.get('throughput', 0) | |
| dropped = m.get('dropped', 0) | |
| # Add status emoji | |
| eff_emoji = "🟢" if efficiency >= 80 else "🟡" if efficiency >= 60 else "🔴" | |
| cpu_emoji = "✅" if cpu >= 90 else "⚠️" if cpu >= 70 else "❌" | |
| f.write(f"| {cores} {eff_emoji} | {throughput:>15,.0f} | {speedup:>5.2f}x | {efficiency:>8.1f}% | {cpu:>4.1f}% {cpu_emoji} | {dropped:>6.2f}% |\n") | |
| f.write("\n### OTAP Protocol\n\n") | |
| f.write("| Cores | Throughput (logs/s) | Speedup | Efficiency | CPU % | Dropped % |\n") | |
| f.write("|-------|--------------------:|--------:|-----------:|------:|----------:|\n") | |
| for cores in ['1', '2', '4', '8']: | |
| key = f"{cores}core-OTAP-ATTR-OTLP" | |
| if key in metrics and 'throughput' in metrics[key]: | |
| m = metrics[key] | |
| speedup = m.get('speedup', 0) | |
| efficiency = m.get('efficiency', 0) | |
| cpu = m.get('cpu', 0) | |
| throughput = m.get('throughput', 0) | |
| dropped = m.get('dropped', 0) | |
| # Add status emoji | |
| eff_emoji = "🟢" if efficiency >= 80 else "🟡" if efficiency >= 60 else "🔴" | |
| cpu_emoji = "✅" if cpu >= 90 else "⚠️" if cpu >= 70 else "❌" | |
| f.write(f"| {cores} {eff_emoji} | {throughput:>15,.0f} | {speedup:>5.2f}x | {efficiency:>8.1f}% | {cpu:>4.1f}% {cpu_emoji} | {dropped:>6.2f}% |\n") | |
| f.write("\n**Legend:**\n") | |
| f.write("- 🟢 Efficiency ≥80% | 🟡 60-80% | 🔴 <60%\n") | |
| f.write("- ✅ CPU ≥90% (saturated) | ⚠️ 70-90% | ❌ <70% (under-utilized)\n") | |
| f.write("- **Ideal Linear Scaling:** Efficiency = 100%, Speedup = # of cores\n\n") | |
| EOF | |
| - name: Update pipeline benchmark data and deploy to GitHub Pages | |
| uses: benchmark-action/github-action-benchmark@d48d326b4ca9ba73ca0cd0d59f108f9e02a381c7 | |
| with: | |
| tool: "customSmallerIsBetter" | |
| output-file-path: output-pipeline.json | |
| gh-pages-branch: benchmarks | |
| max-items-in-chart: 100 | |
| github-token: ${{ secrets.GITHUB_TOKEN }} | |
| benchmark-data-dir-path: "docs/benchmarks/continuous" | |
| auto-push: true | |
| save-data-file: true | |
| - name: Update saturation benchmark data and deploy to GitHub Pages | |
| uses: benchmark-action/github-action-benchmark@d48d326b4ca9ba73ca0cd0d59f108f9e02a381c7 | |
| with: | |
| tool: "customSmallerIsBetter" | |
| output-file-path: output-saturation.json | |
| gh-pages-branch: benchmarks | |
| max-items-in-chart: 100 | |
| github-token: ${{ secrets.GITHUB_TOKEN }} | |
| benchmark-data-dir-path: "docs/benchmarks/continuous-saturation" | |
| auto-push: true | |
| save-data-file: true | |
| - name: Add benchmark link to job summary | |
| run: | | |
| echo "### Benchmark Results" >> $GITHUB_STEP_SUMMARY | |
| echo "[View the pipeline benchmark results here](https://open-telemetry.github.io/otel-arrow/benchmarks/continuous/)" >> $GITHUB_STEP_SUMMARY | |
| echo "[View the saturation benchmark results here](https://open-telemetry.github.io/otel-arrow/benchmarks/continuous-saturation/)" >> $GITHUB_STEP_SUMMARY |