Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 103 additions & 33 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ on:
- reference-workload-1-client
default: "reference-workload-10-client"
primary_version:
description: "Primary driver version. Leave empty for spring-data-valkey (uses branch HEAD)."
description: "Primary driver version (version string or commit ID). Leave empty to use default."
required: false
type: string
default: ""
Expand All @@ -58,6 +58,26 @@ on:
required: false
type: string
default: ""
network_delay:
description: "Network delay with unit, e.g. '1ms' or '500us' (tc netem). Leave empty to disable."
required: false
type: string
default: ""
network_jitter:
description: "Network jitter with unit, e.g. '1ms' or '100us' (tc netem). Requires network_delay."
required: false
type: string
default: ""
network_delay_distribution:
description: "Jitter distribution (tc netem). Requires network_jitter. Options: normal, pareto, paretonormal."
required: false
type: choice
options:
- ""
- normal
- pareto
- paretonormal
default: ""

env:
RESP_BENCH_REPO: "https://github.com/ikolomi/resp-bench.git"
Expand Down Expand Up @@ -95,13 +115,36 @@ jobs:
PRIMARY_VERSION="${{ github.event.inputs.primary_version }}"
SECONDARY_VERSION="${{ github.event.inputs.secondary_version }}"
JOB_ID_PREFIX="${{ github.event.inputs.job_id_prefix }}"
NETWORK_DELAY="${{ github.event.inputs.network_delay }}"
NETWORK_JITTER="${{ github.event.inputs.network_jitter }}"
NETWORK_DELAY_DISTRIBUTION="${{ github.event.inputs.network_delay_distribution }}"

# Validate tc netem input combinations
if [ -n "$NETWORK_DELAY_DISTRIBUTION" ] && [ -z "$NETWORK_JITTER" ]; then
echo "ERROR: network_delay_distribution requires network_jitter"
exit 1
fi
if [ -n "$NETWORK_JITTER" ] && [ -z "$NETWORK_DELAY" ]; then
echo "ERROR: network_jitter requires network_delay"
exit 1
fi
# Validate unit format (whole number immediately followed by ms or us; decimals such as 0.5ms are rejected)
for val in "$NETWORK_DELAY" "$NETWORK_JITTER"; do
if [ -n "$val" ] && ! [[ "$val" =~ ^[0-9]+(ms|us)$ ]]; then
echo "ERROR: invalid tc netem value '$val' — expected format: <number>ms or <number>us"
exit 1
fi
done

echo "primary_driver=$PRIMARY_DRIVER" >> $GITHUB_OUTPUT
echo "secondary_driver=$SECONDARY_DRIVER" >> $GITHUB_OUTPUT
echo "workload=$WORKLOAD" >> $GITHUB_OUTPUT
echo "primary_version=$PRIMARY_VERSION" >> $GITHUB_OUTPUT
echo "secondary_version=$SECONDARY_VERSION" >> $GITHUB_OUTPUT
echo "job_id_prefix=$JOB_ID_PREFIX" >> $GITHUB_OUTPUT
echo "network_delay=$NETWORK_DELAY" >> $GITHUB_OUTPUT
echo "network_jitter=$NETWORK_JITTER" >> $GITHUB_OUTPUT
echo "network_delay_distribution=$NETWORK_DELAY_DISTRIBUTION" >> $GITHUB_OUTPUT

echo "========================================"
echo "Primary driver: $PRIMARY_DRIVER"
Expand All @@ -110,6 +153,7 @@ jobs:
echo "Primary version: '${PRIMARY_VERSION}'"
echo "Secondary version: '${SECONDARY_VERSION}'"
echo "Job ID prefix: '${JOB_ID_PREFIX}'"
echo "Network delay: '${NETWORK_DELAY}' jitter='${NETWORK_JITTER}' distribution='${NETWORK_DELAY_DISTRIBUTION}'"
echo "========================================"

- name: Validate configs exist
Expand Down Expand Up @@ -225,11 +269,20 @@ jobs:
echo "driver_id=$DRIVER_ID" >> $GITHUB_OUTPUT
echo "secondary_driver_id=$SECONDARY_DRIVER_ID" >> $GITHUB_OUTPUT

# Check if we need to build spring-data-valkey locally - this is always the case when
# running a spring-data-valkey benchmark
if [ "$DRIVER_ID" = "spring-data-valkey" ]; then
echo "build_sdv=true" >> $GITHUB_OUTPUT
# Check if primary version is a commit ID (7-40 hex chars) or version number
PRIMARY_VERSION="${{ steps.inputs.outputs.primary_version }}"
if [ "$DRIVER_ID" = "spring-data-valkey" ] && [ -n "$PRIMARY_VERSION" ]; then
if [[ "$PRIMARY_VERSION" =~ ^[0-9a-fA-F]{7,40}$ ]]; then
echo "primary_is_commit=true" >> $GITHUB_OUTPUT
echo "build_sdv=true" >> $GITHUB_OUTPUT
echo "Detected: primary_version '$PRIMARY_VERSION' is a commit ID"
else
echo "primary_is_commit=false" >> $GITHUB_OUTPUT
echo "build_sdv=false" >> $GITHUB_OUTPUT
echo "Detected: primary_version '$PRIMARY_VERSION' is a release version"
fi
else
echo "primary_is_commit=false" >> $GITHUB_OUTPUT
echo "build_sdv=false" >> $GITHUB_OUTPUT
fi

Expand Down Expand Up @@ -262,44 +315,36 @@ jobs:
DRIVER_ID="${{ steps.driver-info.outputs.driver_id }}"
SECONDARY_DRIVER_ID="${{ steps.driver-info.outputs.secondary_driver_id }}"

# spring-data-valkey must NOT have explicit primary version (uses branch HEAD)
if [ "$DRIVER_ID" = "spring-data-valkey" ] && [ -n "$PRIMARY_VERSION" ]; then
echo "ERROR: primary_version must be empty for spring-data-valkey (uses branch HEAD automatically)"
exit 1
fi

# Cannot provide secondary version if driver has no secondary driver
if [ -z "$SECONDARY_DRIVER_ID" ] && [ -n "$SECONDARY_VERSION" ]; then
echo "ERROR: secondary_version provided but driver has no secondary driver"
exit 1
fi

# Resolve primary version for spring-data-valkey
if [ "$DRIVER_ID" = "spring-data-valkey" ]; then
PRIMARY_VERSION="${{ github.sha }}"
fi

echo "primary_version=$PRIMARY_VERSION" >> $GITHUB_OUTPUT
echo "secondary_version=$SECONDARY_VERSION" >> $GITHUB_OUTPUT

- name: Build spring-data-valkey
- name: Build spring-data-valkey from source
id: build-sdv
if: steps.driver-info.outputs.build_sdv == 'true'
run: |
echo "Building spring-data-valkey from current branch..."
COMMIT_ID="${{ steps.versions.outputs.primary_version }}"
echo "Building spring-data-valkey from commit: $COMMIT_ID"

# Get base version and short commit ID
BASE_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
COMMIT_SHORT="${GITHUB_SHA:0:7}"
SDV_VERSION="${BASE_VERSION}-${COMMIT_SHORT}"
# Clone to a separate directory so the main workspace (with orchestrator,
# configs, etc.) is preserved — allows building commits older than the benchmark infra
git clone ${{ github.server_url }}/${{ github.repository }}.git /tmp/spring-data-valkey
cd /tmp/spring-data-valkey
git checkout "$COMMIT_ID"

echo "Base version: $BASE_VERSION"
echo "Commit: $COMMIT_SHORT"
# Use the first 8 characters of the commit ID plus "-SNAPSHOT" as the version for the local Maven build
SDV_VERSION="${COMMIT_ID:0:8}-SNAPSHOT"
echo "Build version: $SDV_VERSION"

# Set version to include commit ID
mvn versions:set -DnewVersion="$SDV_VERSION" -DgenerateBackupPoms=false -q

# Build and install
# Build and install to local Maven repository
mvn install -DskipTests -Dmaven.compiler.source=17 -Dmaven.compiler.target=17 -Dgpg.skip=true -q

echo "✓ Built spring-data-valkey version: $SDV_VERSION"
Expand Down Expand Up @@ -369,11 +414,20 @@ jobs:
SECONDARY_VERSION="${{ steps.versions.outputs.secondary_version }}"
SECONDARY_IS_COMMIT="${{ steps.driver-info.outputs.secondary_is_commit }}"

# Update spring-data-valkey version if it was built locally
if [ "$DRIVER_ID" = "spring-data-valkey" ]; then
SDV_VERSION="${{ steps.build-sdv.outputs.version }}"
echo "Updating spring-data-valkey to $SDV_VERSION"
sed -i "s|<spring-data-valkey.version>.*</spring-data-valkey.version>|<spring-data-valkey.version>$SDV_VERSION</spring-data-valkey.version>|" java/pom.xml
# Update spring-data-valkey version based on primary_version input
PRIMARY_VERSION="${{ steps.versions.outputs.primary_version }}"
PRIMARY_IS_COMMIT="${{ steps.driver-info.outputs.primary_is_commit }}"

if [ "$DRIVER_ID" = "spring-data-valkey" ] && [ -n "$PRIMARY_VERSION" ]; then
if [ "$PRIMARY_IS_COMMIT" = "true" ]; then
# Use the version from the local build
SDV_VERSION="${{ steps.build-sdv.outputs.version }}"
echo "Updating spring-data-valkey to locally built $SDV_VERSION"
sed -i "s|<spring-data-valkey.version>.*</spring-data-valkey.version>|<spring-data-valkey.version>$SDV_VERSION</spring-data-valkey.version>|" java/pom.xml
else
echo "Updating spring-data-valkey to release version $PRIMARY_VERSION"
sed -i "s|<spring-data-valkey.version>.*</spring-data-valkey.version>|<spring-data-valkey.version>$PRIMARY_VERSION</spring-data-valkey.version>|" java/pom.xml
fi
fi

# Update secondary driver version if a version/commit-id for it was supplied. Otherwise use defaults.
Expand Down Expand Up @@ -442,6 +496,15 @@ jobs:
PREFIX_ARG=""
[ -n "$JOB_ID_PREFIX" ] && PREFIX_ARG="--job-id-prefix $JOB_ID_PREFIX"

NETWORK_DELAY="${{ steps.inputs.outputs.network_delay }}"
NETWORK_JITTER="${{ steps.inputs.outputs.network_jitter }}"
NETWORK_DELAY_DISTRIBUTION="${{ steps.inputs.outputs.network_delay_distribution }}"

TC_ARGS=""
[ -n "$NETWORK_DELAY" ] && TC_ARGS="$TC_ARGS --network-delay $NETWORK_DELAY"
[ -n "$NETWORK_JITTER" ] && TC_ARGS="$TC_ARGS --network-jitter $NETWORK_JITTER"
[ -n "$NETWORK_DELAY_DISTRIBUTION" ] && TC_ARGS="$TC_ARGS --network-delay-distribution $NETWORK_DELAY_DISTRIBUTION"

python3 -u .github/workflows/benchmark_orchestrator.py \
--output "benchmark_results_${DRIVER}_${WORKLOAD}.json" \
--workload-config "$WORKLOAD_CONFIG" \
Expand All @@ -451,7 +514,7 @@ jobs:
--s3-bucket "${{ secrets.BENCHMARK_S3_BUCKET }}" \
--pg-host "${{ secrets.BENCHMARK_PG_HOST }}" \
--pg-secret-name "${{ secrets.BENCHMARK_PG_SECRET_NAME }}" \
$PREFIX_ARG
$PREFIX_ARG $TC_ARGS

- name: Display results summary
if: always()
Expand Down Expand Up @@ -530,7 +593,9 @@ jobs:
driver = data["config"]["driver"]["driver_id"]
workload_profile = data["config"]["workload"]["benchmark_profile"]["name"]
elapsed = data["results"]["elapsed_ms"]
network_delay = data["results"].get("network_delay_ms", 1)
network_delay = data["results"].get("network_delay", "")
network_jitter = data["results"].get("network_jitter", "")
network_distribution = data["results"].get("network_delay_distribution", "")
pv = versions.get("primary_driver_version") or "N/A"
sv = versions.get("secondary_driver_version")
sid = versions.get("secondary_driver_id")
Expand All @@ -545,7 +610,12 @@ jobs:
print(f"**resp-bench:** \`{commit_id}\`")
print(f"**Workload config:** \`${WORKLOAD}\` ({workload_profile})")
print(f"**Driver config:** \`${DRIVER}\`")
print(f"**Simulated network delay:** {network_delay}ms")
delay_str = network_delay or "disabled"
if network_jitter:
delay_str += f" jitter {network_jitter}"
if network_distribution:
delay_str += f" distribution {network_distribution}"
print(f"**Simulated network delay:** {delay_str}")
print(f"**Elapsed:** {elapsed}ms")
print()

Expand Down
56 changes: 42 additions & 14 deletions .github/workflows/benchmark_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,13 @@ def __init__(self):
self.nmi_watchdog_original = None
self.smt_original = None

def setup(self, network_delay_ms: int = 1):
def setup(self, network_delay: str = "", network_jitter: str = "",
network_delay_distribution: str = ""):
print("Setting up variance control...")
self._disable_smt()
self._disable_turbo_boost()
self._setup_network_delay(network_delay_ms)
self._setup_network_delay(network_delay, network_jitter,
network_delay_distribution)
self._disable_nmi_watchdog()
self._set_perf_permissions()
print("Variance control setup complete")
Expand Down Expand Up @@ -279,17 +281,28 @@ def _restore_smt(self):
except Exception as e:
print(f" ⚠ Could not restore SMT: {e}")

def _setup_network_delay(self, delay_ms: int):
def _setup_network_delay(self, delay: str, jitter: str = "",
distribution: str = ""):
if not delay:
return
try:
subprocess.run(["sudo", "tc", "qdisc", "del", "dev", "lo", "root"],
capture_output=True)
result = subprocess.run(
["sudo", "tc", "qdisc", "add", "dev", "lo", "root", "netem",
"delay", f"{delay_ms}ms"],
capture_output=True, text=True)
cmd = ["sudo", "tc", "qdisc", "add", "dev", "lo", "root", "netem",
"delay", delay]
if jitter:
cmd.append(jitter)
if distribution:
cmd.extend(["distribution", distribution])
desc = delay
if jitter:
desc += f" jitter {jitter}"
if distribution:
desc += f" distribution {distribution}"
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
self.tc_configured = True
print(f" ✓ Network delay configured: {delay_ms}ms on loopback")
print(f" ✓ Network delay configured: {desc} on loopback")
else:
print(f" ⚠ Could not configure network delay: {result.stderr}")
except Exception as e:
Expand Down Expand Up @@ -891,7 +904,8 @@ def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Pa
workload_config_path: Path, driver_config_path: Path,
s3_bucket: str,
job_id_prefix: str = "",
skip_infra: bool = False, network_delay_ms: int = 1,
skip_infra: bool = False, network_delay: str = "",
network_jitter: str = "", network_delay_distribution: str = "",
publish_to_db: bool = True, pg_host: str = None,
pg_port: int = 5432, pg_database: str = "postgres",
pg_secret_name: str = None):
Expand All @@ -903,7 +917,9 @@ def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Pa
self.workload_config = load_json_config(workload_config_path)
self.driver_config = load_json_config(driver_config_path)
self.skip_infra = skip_infra
self.network_delay_ms = network_delay_ms
self.network_delay = network_delay
self.network_jitter = network_jitter
self.network_delay_distribution = network_delay_distribution
self.publish_to_db = publish_to_db
self.job_id = generate_job_id(prefix=job_id_prefix)
self.timestamp = get_timestamp()
Expand All @@ -916,7 +932,10 @@ def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Pa

# Apply variance control
self.variance_control = VarianceControl()
self.variance_control.setup(network_delay_ms=network_delay_ms)
self.variance_control.setup(
network_delay=network_delay,
network_jitter=network_jitter,
network_delay_distribution=network_delay_distribution)

# Detect NUMA topology and allocate cores accordingly
self._setup_numa_aware_cores()
Expand Down Expand Up @@ -1295,7 +1314,9 @@ def run(self):

results = {
"elapsed_ms": elapsed_ms,
"network_delay_ms": self.network_delay_ms,
"network_delay": self.network_delay,
"network_jitter": self.network_jitter,
"network_delay_distribution": self.network_delay_distribution,
"phases": all_phases,
"perf": {
"counters": perf_counters,
Expand Down Expand Up @@ -1342,7 +1363,12 @@ def main():
parser.add_argument("--resp-bench-commit", type=str, required=True,
help="Git commit ID of the resp-bench repository")
parser.add_argument("--skip-infra", action="store_true")
parser.add_argument("--network-delay-ms", type=int, default=0)
parser.add_argument("--network-delay", type=str, default="",
help="Network delay with unit, e.g. '1ms' or '500us'")
parser.add_argument("--network-jitter", type=str, default="",
help="Network jitter with unit, e.g. '1ms' or '100us'")
parser.add_argument("--network-delay-distribution", type=str, default="",
choices=["", "normal", "pareto", "paretonormal"])
parser.add_argument("--job-id-prefix", type=str, default="",
help="Optional prefix for the job ID "
"(e.g., 'regression', 'nightly', 'pr-123')")
Expand Down Expand Up @@ -1376,7 +1402,9 @@ def main():
s3_bucket=args.s3_bucket,
job_id_prefix=args.job_id_prefix,
skip_infra=args.skip_infra,
network_delay_ms=args.network_delay_ms,
network_delay=args.network_delay,
network_jitter=args.network_jitter,
network_delay_distribution=args.network_delay_distribution,
publish_to_db=not args.no_publish,
pg_host=args.pg_host,
pg_port=args.pg_port,
Expand Down
Loading