Add asv benchmarks for Azure Storage #2389

Open
wants to merge 5 commits into master
3 changes: 3 additions & 0 deletions .github/workflows/analysis_workflow.yml
@@ -170,6 +170,7 @@ jobs:
        uses: mamba-org/setup-micromamba@v2
        with:
          micromamba-version: 2.1.0-0
          init-shell: bash cmd.exe

      - name: Install libmambapy
        shell: bash -el {0}
@@ -185,10 +186,12 @@ jobs:
          asv machine --yes

      - name: Build project for ASV
        shell: bash -el {0}
        run: |
          python -m pip install -ve .[Testing]

      - name: Run ASV Tests Check script
        shell: bash -el {0}
        run: |
          python python/utils/asv_checks.py
        continue-on-error: false
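For quick iteration outside CI, the two new steps can be reproduced locally. A rough sketch, assuming it is run from the repository root; the commands themselves are taken verbatim from the workflow:

import subprocess
import sys

# Local equivalent of "Build project for ASV" followed by "Run ASV Tests Check script".
subprocess.run([sys.executable, "-m", "pip", "install", "-ve", ".[Testing]"], check=True)
subprocess.run([sys.executable, "python/utils/asv_checks.py"], check=True)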
9 changes: 9 additions & 0 deletions python/arcticdb/util/environment_setup.py
@@ -24,6 +24,7 @@
## Amazon s3 storage bucket dedicated for ASV performance tests
AWS_S3_DEFAULT_BUCKET = 'arcticdb-asv-real-storage'
GCP_S3_DEFAULT_BUCKET = 'arcticdb-asv-real-storage'
AZURE_BLOB_STORAGE_DEFAULT_CONTAINER = 'arcticdb-asv-real-storage'


class GitHubSanitizingHandler(logging.StreamHandler):
@@ -85,6 +86,7 @@ class Storage(Enum):
    AMAZON = 1
    LMDB = 2
    GOOGLE = 3
    AZURE = 4


class StorageSpace(Enum):
@@ -138,6 +140,11 @@ def __new__(cls, *args, **kwargs):
        cls._gcp_secret = os.getenv("ARCTICDB_REAL_GCP_SECRET_KEY")
        cls._gcp_access = os.getenv("ARCTICDB_REAL_GCP_ACCESS_KEY")
        cls._gcp_bucket = GCP_S3_DEFAULT_BUCKET

        # Azure initialization
        cls._azure_account_name = os.getenv("ARCTICDB_REAL_AZURE_ACCOUNT_NAME")
        cls._azure_account_key = os.getenv("ARCTICDB_REAL_AZURE_ACCOUNT_KEY")
        cls._azure_container = AZURE_BLOB_STORAGE_DEFAULT_CONTAINER

    @classmethod
    def get_machine_id(cls):
@@ -183,6 +190,8 @@ def get_arctic_uri(cls, storage: Storage, storage_space: StorageSpace, add_to_pr
            s = cls._gcp_secret
            a = cls._gcp_access
            return f"gcpxml://storage.googleapis.com:{cls._gcp_bucket}?access={a}&secret={s}&path_prefix={prefix}"
        elif storage == Storage.AZURE:
            return f"azure://{AZURE_BLOB_STORAGE_DEFAULT_CONTAINER}?account_name={cls._azure_account_name}&account_key={cls._azure_account_key}&path_prefix={prefix}"
        else:
            raise Exception("Unsupported storage type :", storage)
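The new Storage.AZURE branch composes the Arctic URI from the two ARCTICDB_REAL_AZURE_* environment variables plus the shared container name. Below is a minimal sketch (not part of this diff) of exercising that branch directly; the path_prefix value is illustrative, and since ArcticDB's documented azure:// scheme uses semicolon-separated connection-string fields (Container=..., BlobEndpoint=..., and so on), the query-parameter form used here may need confirming against the URI parser:

import os

from arcticdb import Arctic

# Same inputs the setup code above reads; assumed to be exported beforehand.
account_name = os.getenv("ARCTICDB_REAL_AZURE_ACCOUNT_NAME")
account_key = os.getenv("ARCTICDB_REAL_AZURE_ACCOUNT_KEY")
container = "arcticdb-asv-real-storage"
prefix = "asv_smoke_test"  # hypothetical prefix, not taken from the diff

# URI composed exactly as in the Storage.AZURE branch above.
uri = f"azure://{container}?account_name={account_name}&account_key={account_key}&path_prefix={prefix}"
ac = Arctic(uri)
print(ac.list_libraries())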

175 changes: 175 additions & 0 deletions python/benchmarks/real_azure_read_write.py
@@ -0,0 +1,175 @@
"""
Copyright 2024 Man Group Operations Limited

Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.

As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""

import time
from arcticdb.util.utils import DFGenerator
from arcticdb.util.environment_setup import TestLibraryManager, LibraryPopulationPolicy, LibraryType, Storage, populate_library
from arcticdb.util.utils import DataRangeUtils
from benchmarks.common import AsvBase


class AzureReadWrite(AsvBase):
    """
    General read/write benchmarks against Azure Blob Storage.
    """
    rounds = 1
    number = 3  # each timing sample invokes the benchmark 3 times between setup and teardown
    repeat = 1  # number of timing samples collected per measurement
    min_run_count = 1
    warmup_time = 0

    timeout = 1200

    library_manager = TestLibraryManager(storage=Storage.AZURE, name_benchmark="READ_WRITE")
    library_type = LibraryType.PERSISTENT

    param_names = ["num_rows"]
    params = [10_000_000]  # 10M rows

    number_columns = 100  # 100 columns

    def get_library_manager(self) -> TestLibraryManager:
        return AzureReadWrite.library_manager

    def get_population_policy(self) -> LibraryPopulationPolicy:
        lpp = LibraryPopulationPolicy(self.get_logger())
        lpp.set_parameters(AzureReadWrite.params, [AzureReadWrite.number_columns])
        return lpp

    def setup_cache(self):
        # ASV does not propagate instance attributes set in setup_cache to the
        # benchmark processes, so all per-run state is created in setup().
        pass

    def setup(self, num_rows):
        self.symbol = "test_symbol"
        self.lib = self.get_library_manager().get_library(AzureReadWrite.library_type, 1)
        # Keep the population policy around: get_last_x_percent_date_range
        # reads its DataFrame generator parameters.
        self.population_policy = self.get_population_policy()

        # Generate test data with mixed types including strings
        df_generator = DFGenerator(num_rows, [AzureReadWrite.number_columns])
        self.to_write_df = df_generator.generate_dataframe()

        # Add some string columns
        string_cols = [f"string_{i}" for i in range(10)]  # 10 string columns
        for col in string_cols:
            self.to_write_df[col] = [f"string_value_{i}" for i in range(num_rows)]

        # Write the data
        self.lib.write(self.symbol, self.to_write_df)

        # Calculate the date range covering the last 20% of rows
        self.last_20 = self.get_last_x_percent_date_range(num_rows, 20)

    def time_read(self, num_rows):
        self.lib.read(self.symbol)

    def peakmem_read(self, num_rows):
        self.lib.read(self.symbol)

    def time_write(self, num_rows):
        self.lib.write(self.symbol, self.to_write_df)

    def peakmem_write(self, num_rows):
        self.lib.write(self.symbol, self.to_write_df)

    def time_read_with_column_float(self, num_rows):
        COLS = ["float2"]
        self.lib.read(symbol=self.symbol, columns=COLS).data

    def peakmem_read_with_column_float(self, num_rows):
        COLS = ["float2"]
        self.lib.read(symbol=self.symbol, columns=COLS).data

    def time_read_with_columns_all_types(self, num_rows):
        COLS = ["float2", "string_0", "bool", "int64", "uint64"]
        self.lib.read(symbol=self.symbol, columns=COLS).data

    def peakmem_read_with_columns_all_types(self, num_rows):
        COLS = ["float2", "string_0", "bool", "int64", "uint64"]
        self.lib.read(symbol=self.symbol, columns=COLS).data

    def time_write_staged(self, num_rows):
        lib = self.lib
        lib.write(self.symbol, self.to_write_df, staged=True)
        lib._nvs.compact_incomplete(self.symbol, False, False)

    def peakmem_write_staged(self, num_rows):
        lib = self.lib
        lib.write(self.symbol, self.to_write_df, staged=True)
        lib._nvs.compact_incomplete(self.symbol, False, False)

    def time_read_with_date_ranges_last20_percent_rows(self, num_rows):
        self.lib.read(symbol=self.symbol, date_range=self.last_20).data

    def peakmem_read_with_date_ranges_last20_percent_rows(self, num_rows):
        self.lib.read(symbol=self.symbol, date_range=self.last_20).data

    def get_last_x_percent_date_range(self, num_rows, percents):
        df_generator = self.population_policy.df_generator
        freq = df_generator.freq
        return DataRangeUtils.get_last_x_percent_date_range(
            initial_timestamp=df_generator.initial_timestamp,
            freq=freq,
            num_rows=num_rows,
            percents=percents
        )
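For local verification outside the ASV runner, a minimal sketch (not part of this diff) that drives the AzureReadWrite lifecycle by hand, assuming the ARCTICDB_REAL_AZURE_* credentials are exported and the container is reachable; it only calls methods defined on the class above:

from benchmarks.real_azure_read_write import AzureReadWrite  # module added by this PR

bench = AzureReadWrite()
num_rows = AzureReadWrite.params[0]  # 10M rows as parametrised; shrink params locally for a faster check
bench.setup(num_rows)                # writes the test symbol into the persistent Azure library
bench.time_read(num_rows)            # exercise a couple of the timing targets manually
bench.peakmem_read_with_column_float(num_rows)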


class AzureListVersions(AsvBase):
    """
    Benchmarks list_versions performance on Azure Blob Storage.
    """
    rounds = 1
    number = 3
    repeat = 1
    min_run_count = 1
    warmup_time = 0

    timeout = 1200

    library_manager = TestLibraryManager(storage=Storage.AZURE, name_benchmark="LIST_VERSIONS")
    library_type = LibraryType.PERSISTENT

    param_names = ["num_symbols"]
    params = [10_000]  # 10k symbols

    def get_library_manager(self) -> TestLibraryManager:
        return AzureListVersions.library_manager

    def get_population_policy(self) -> LibraryPopulationPolicy:
        lpp = LibraryPopulationPolicy(self.get_logger())
        lpp.set_parameters([1000] * AzureListVersions.params[0], [10])  # 1000 rows per symbol, 10 columns
        return lpp

    def setup_cache(self):
        # ASV does not propagate instance attributes set in setup_cache to the
        # benchmark processes, so the counter is initialised in setup() instead.
        pass

    def setup(self, num_symbols):
        self.test_counter = 1
        self.lib = self.get_library_manager().get_library(AzureListVersions.library_type, num_symbols)

        # Generate and write test data
        start = time.time()
        policy = self.get_population_policy()
        policy.set_parameters([1000] * num_symbols, [10])
        if not self.library_manager.has_library(AzureListVersions.library_type, num_symbols):
            populate_library(self.library_manager, policy, AzureListVersions.library_type, num_symbols)
            self.get_logger().info(f"Generated {num_symbols} symbols with 1000 rows each in {time.time() - start:.2f}s")
        else:
            self.get_logger().info("Library already exists, population skipped")

        # Clear the symbol list cache so list_versions hits actual storage
        self.lib._nvs.version_store._clear_symbol_list_keys()

    def time_list_versions(self, num_symbols):
        # Guard: this benchmark assumes a single invocation per setup/teardown cycle.
        assert self.test_counter == 1, "Benchmark expected to run once per setup/teardown cycle"
        self.lib.list_versions()
        self.test_counter += 1

    def peakmem_list_versions(self, num_symbols):
        assert self.test_counter == 1, "Benchmark expected to run once per setup/teardown cycle"
        self.lib.list_versions()
        self.test_counter += 1
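A closing note on the ASV lifecycle both classes rely on: setup_cache runs once per environment in a separate process and only its return value is shared, which is why the per-run state above (library handle, symbol, counter) is created in setup. A minimal illustration of that contract with a hypothetical benchmark class (not part of this diff):

class ExampleCachePattern:
    param_names = ["num_rows"]
    params = [1_000]

    def setup_cache(self):
        # Runs once; only the returned value reaches setup() and the benchmark methods.
        return {"symbol": "test_symbol"}

    def setup(self, cache, num_rows):
        # The cached value is passed first, followed by the parameters.
        self.symbol = cache["symbol"]

    def time_noop(self, cache, num_rows):
        pass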