15 changes: 15 additions & 0 deletions distributed-micro-benchmark/helpers/__init__.py
@@ -0,0 +1,15 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Distributed micro-benchmarking helpers
103 changes: 103 additions & 0 deletions distributed-micro-benchmark/helpers/gcloud_utils.py
@@ -0,0 +1,103 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unified gcloud command execution utilities"""

import subprocess
import time


def run_gcloud_command(cmd, retries=1, retry_delay=2, check=False, capture_output=True, text=True, **kwargs):
"""Execute a gcloud command with optional retry logic.

Args:
cmd: List of command components (e.g., ['gcloud', 'storage', 'cp', ...])
retries: Number of retry attempts (1 = no retry, 3 = try 3 times)
retry_delay: Seconds to wait between retries
check: If True, raise CalledProcessError on non-zero exit code
capture_output: If True, capture stdout/stderr
text: If True, decode output as text
**kwargs: Additional arguments passed to subprocess.run()

Returns:
subprocess.CompletedProcess object

    Raises:
        subprocess.CalledProcessError: If the command still fails after all
            retries and check=True.
"""
for attempt in range(retries):
result = subprocess.run(cmd, capture_output=capture_output, text=text, **kwargs)

if result.returncode == 0:
return result

if attempt < retries - 1:
time.sleep(retry_delay)

if check:
raise subprocess.CalledProcessError(result.returncode, cmd, output=result.stdout, stderr=result.stderr)

return result


def gcloud_storage_cp(source, dest, recursive=False, retries=3, check=True):
"""Copy files to/from GCS"""
cmd = ['gcloud', 'storage', 'cp']
if recursive:
cmd.append('-r')
cmd.extend([source, dest])

return run_gcloud_command(cmd, retries=retries, check=check)


def gcloud_storage_ls(pattern, check=False):
"""List GCS objects matching a pattern"""
cmd = ['gcloud', 'storage', 'ls', pattern]
return run_gcloud_command(cmd, retries=1, check=check)


def gcloud_compute_ssh(vm_name, zone, project, command=None, internal_ip=True, check=True, **kwargs):
"""SSH to a compute instance"""
cmd = ['gcloud', 'compute', 'ssh', vm_name, f'--zone={zone}', f'--project={project}']
if internal_ip:
cmd.append('--internal-ip')
if command:
cmd.extend(['--command', command])

return run_gcloud_command(cmd, retries=1, check=check, **kwargs)


def gcloud_compute_scp(source, dest, zone, project, internal_ip=True, check=True):
"""Copy files to/from a compute instance"""
cmd = ['gcloud', 'compute', 'scp', source, dest, f'--zone={zone}', f'--project={project}']
if internal_ip:
cmd.append('--internal-ip')

return run_gcloud_command(cmd, retries=1, check=check)


def gcloud_compute_instance_group_list(instance_group, zone, project, filter_status='RUNNING'):
"""List VM names in a managed instance group"""
cmd = [
'gcloud', 'compute', 'instance-groups', 'managed', 'list-instances',
instance_group,
f'--zone={zone}',
f'--project={project}',
f'--filter=STATUS={filter_status}',
'--format=value(NAME)'
]

result = run_gcloud_command(cmd, retries=1, check=True)
vms = [vm.strip() for vm in result.stdout.strip().split('\n') if vm.strip()]
return vms
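
A minimal usage sketch for these helpers; the project, zone, VM name, and bucket below are placeholders for illustration, not values taken from this change:

# Illustrative only: project, zone, VM name, and bucket are placeholders.
from helpers import gcloud_utils

# Copy a local file to GCS, retrying up to three times on failure.
gcloud_utils.gcloud_storage_cp('results.json', 'gs://example-bucket/results.json', retries=3)

# Run a command on a VM over its internal IP and inspect the output.
result = gcloud_utils.gcloud_compute_ssh(
    'bench-vm-1', zone='us-central1-a', project='example-project',
    command='uname -r')
print(result.stdout)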
85 changes: 85 additions & 0 deletions distributed-micro-benchmark/helpers/gcs.py
@@ -0,0 +1,85 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""GCS operations for distributed benchmarking"""

import os
import json
import tempfile
from . import gcloud_utils


def upload_json(data, gcs_path):
"""Upload JSON data to GCS with retry on failure"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=True) as f:
json.dump(data, f, indent=2)
f.flush()

try:
gcloud_utils.gcloud_storage_cp(f.name, gcs_path, retries=3, check=True)
except Exception as e:
            raise RuntimeError(f"Failed to upload to {gcs_path} after 3 attempts: {e}") from e


def download_json(gcs_path):
"""Download and parse JSON from GCS"""
with tempfile.NamedTemporaryFile(mode='r', suffix='.json', delete=True) as f:
result = gcloud_utils.gcloud_storage_cp(gcs_path, f.name, retries=1, check=False)

if result.returncode != 0:
return None

with open(f.name, 'r') as rf:
return json.load(rf)


def upload_test_cases(csv_path, base_path):
"""Upload test cases CSV to GCS"""
if not os.path.exists(csv_path):
raise FileNotFoundError(f"Test cases file not found: {csv_path}")

dest = f"{base_path}/test-cases.csv"
gcloud_utils.gcloud_storage_cp(csv_path, dest, retries=1, check=True)


def upload_fio_job_file(fio_path, base_path):
"""Upload FIO job template to GCS"""
if not os.path.exists(fio_path):
raise FileNotFoundError(f"FIO job file not found: {fio_path}")

dest = f"{base_path}/jobfile.fio"
gcloud_utils.gcloud_storage_cp(fio_path, dest, retries=1, check=True)


def list_manifests(benchmark_id, artifacts_bucket):
"""List all manifest files for a benchmark"""
pattern = f"gs://{artifacts_bucket}/{benchmark_id}/results/*/manifest.json"
result = gcloud_utils.gcloud_storage_ls(pattern, check=False)

if result.returncode != 0:
return []

return [line.strip() for line in result.stdout.strip().split('\n') if line.strip()]


def download_directory(gcs_path, local_path):
"""Download a directory from GCS"""
gcloud_utils.gcloud_storage_cp(gcs_path, local_path, recursive=True, retries=1, check=True)


def check_cancellation(benchmark_id, artifacts_bucket):
"""Check if cancellation flag exists in GCS"""
cancel_path = f"gs://{artifacts_bucket}/{benchmark_id}/cancel"
result = gcloud_utils.gcloud_storage_ls(cancel_path, check=False)
return result.returncode == 0
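
A small round-trip sketch of the JSON helpers, assuming a purely illustrative bucket name and benchmark ID:

# Illustrative only: bucket name and benchmark ID are placeholders.
from helpers import gcs

status = {'vm': 'bench-vm-1', 'state': 'done'}
gcs.upload_json(status, 'gs://example-artifacts/bench-001/status/bench-vm-1.json')

# download_json returns None when the object does not exist yet,
# so callers can poll without handling exceptions.
loaded = gcs.download_json('gs://example-artifacts/bench-001/status/bench-vm-1.json')
if loaded is not None:
    print(loaded['state'])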
102 changes: 102 additions & 0 deletions distributed-micro-benchmark/helpers/job_generator.py
@@ -0,0 +1,102 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Job specification generation and test distribution"""

import csv


def _load_csv_with_id(csv_path, id_field):
"""Generic CSV loader that adds sequential ID field"""
items = []
with open(csv_path, 'r') as f:
reader = csv.DictReader(f)
for i, row in enumerate(reader, start=1):
row[id_field] = i
items.append(row)
return items


def load_test_cases(csv_path):
"""Load test cases from CSV file"""
return _load_csv_with_id(csv_path, 'test_id')


def load_configs(csv_path):
"""Load config variations from CSV file"""
return _load_csv_with_id(csv_path, 'config_id')


def generate_test_matrix(test_cases, configs):
"""Generate cartesian product of configs × test_cases"""
test_matrix = []
matrix_id = 1

for config in configs:
for test_case in test_cases:
            # Spread test_case first, then add the matrix- and config-level fields
matrix_entry = {
**test_case,
'matrix_id': matrix_id,
'config_id': config['config_id'],
'test_id': test_case['test_id'],
'commit': config['commit'],
'mount_args': config['mount_args'],
'config_label': config['label']
}
test_matrix.append(matrix_entry)
matrix_id += 1

return test_matrix


def distribute_tests(test_cases, vms, is_matrix=False):
    """Distribute test cases evenly across VMs."""
    # TODO: handle the is_matrix case (the is_matrix parameter is currently unused).
num_vms = len(vms)
tests_per_vm = len(test_cases) // num_vms
remaining = len(test_cases) % num_vms

distribution = {}
start_idx = 0

for i, vm in enumerate(vms):
count = tests_per_vm + (1 if i < remaining else 0)
end_idx = start_idx + count
distribution[vm] = test_cases[start_idx:end_idx]
start_idx = end_idx

return distribution


def create_job_spec(vm_name, benchmark_id, test_entries, bucket, artifacts_bucket, iterations, mode="single-config"):
"""Create job specification for a VM"""
total_tests = len(test_entries)

job_spec = {
"vm_name": vm_name,
"benchmark_id": benchmark_id,
"bucket": bucket,
"artifacts_bucket": artifacts_bucket,
"iterations": iterations,
"total_tests": total_tests,
"total_runs": total_tests * iterations,
}

if mode == "single-config":
job_spec['test_ids'] = [entry['test_id'] for entry in test_entries]
else:
job_spec['test_entries'] = test_entries

return job_spec
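
Pulling the pieces together, a sketch of how the generator functions might be wired for the single-config path; the CSV path, VM names, buckets, and benchmark ID are assumed for illustration:

# Illustrative only: CSV path, VM names, buckets, and benchmark ID are placeholders.
from helpers import job_generator, gcs

test_cases = job_generator.load_test_cases('test-cases.csv')
vms = ['bench-vm-1', 'bench-vm-2']

# Split the tests evenly across the VMs, then emit one job spec per VM.
distribution = job_generator.distribute_tests(test_cases, vms)
for vm, entries in distribution.items():
    spec = job_generator.create_job_spec(
        vm, 'bench-001', entries,
        bucket='example-data-bucket',
        artifacts_bucket='example-artifacts',
        iterations=3)
    gcs.upload_json(spec, f'gs://example-artifacts/bench-001/jobs/{vm}.json')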