
Asv v2 s3 tests (Refactored) #2249

Open · wants to merge 48 commits into master

Conversation

grusev (Collaborator) commented Mar 17, 2025

Reference Issues/PRs

Contains a refactored framework for setting up shared storages, plus tests for AWS S3 storage.

Merged 3 PRs into one:

Log from execution of all tests: https://github.com/man-group/ArcticDB/actions/runs/13900462771/job/38890832599
Another one: https://github.com/man-group/ArcticDB/actions/runs/13919594637/job/38949623574

What does this implement or fix?

Any other comments?

Checklist

Checklist for code changes...
  • Have you updated the relevant docstrings, documentation and copyright notice?
  • Is this contribution tested against all ArcticDB's features?
  • Do all exceptions introduced raise appropriate error messages?
  • Are API changes highlighted in the PR description?
  • Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

@grusev added the `patch` (Small change, should increase patch version) label on Mar 17, 2025
MODIFIABLE = "MODIFIABLE"


class StorageSetup:
Collaborator:

The StorageSetup class can easily be refactored to be more readable like this:

import os
import socket
import tempfile

def aws_default_factory() -> BaseS3StorageFixtureFactory:
    return real_s3_from_environment_variables(shared_path=True)

def get_machine_id() -> str:
    """
    Returns the machine id, or the id specified through an environment variable (for GitHub).
    """
    return os.getenv("ARCTICDB_PERSISTENT_STORAGE_SHARED_PATH_PREFIX", socket.gethostname())


def create_prefix(storage_space: StorageSpace, add_to_prefix: str) -> str:
    def is_valid_string(s: str) -> bool:
        return bool(s and s.strip())

    mandatory_part = storage_space.value
    optional = add_to_prefix if is_valid_string(add_to_prefix) else ''
    return f"{mandatory_part}/{optional}" if optional else mandatory_part


def check_persistence_access(storage_space: StorageSpace, confirm_persistent_storage_need: bool = False):
    assert aws_default_factory(), "Environment variables not initialized (ARCTICDB_REAL_S3_ACCESS_KEY,ARCTICDB_REAL_S3_SECRET_KEY)"
    if storage_space == StorageSpace.PERSISTENT:
        assert confirm_persistent_storage_need, "Use of persistent store not confirmed!"


def get_arctic_uri(storage: Storage, storage_space: StorageSpace, add_to_prefix: str = None, confirm_persistent_storage_need: bool = False) -> str:
    check_persistence_access(storage_space, confirm_persistent_storage_need)
    prefix = create_prefix(storage_space, add_to_prefix)
    if storage == Storage.AMAZON:
        factory = aws_default_factory()
        factory.default_prefix = prefix
        return factory.create_fixture().arctic_uri
    elif storage == Storage.LMDB:
        return f"lmdb://{tempfile.gettempdir()}/benchmarks_{prefix}"
    else:
        raise ValueError(f"Unsupported storage type: {storage}")
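
Usage would then be a one-liner in each benchmark's setup; a sketch assuming the Storage/StorageSpace enums above and the arcticdb Arctic client:

from arcticdb import Arctic

# Hypothetical call site: build a URI for the modifiable space,
# namespaced by machine id, then open a client against it.
uri = get_arctic_uri(Storage.AMAZON, StorageSpace.MODIFIABLE, add_to_prefix=get_machine_id())
ac = Arctic(uri)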

Collaborator Author:

I do not agree. There is value in separating responsibilities, and TestLibraryManager works.

It provides better isolation and management, so I disagree with making those changes.



class StorageInfo:
class LibraryManager:
Collaborator:

As we discussed yesterday, this LibraryManager is unnecessary because it duplicates a lot of the logic of Arctic's internal LibraryManager.
The only needed functionality is:

  • having a function to create the persistent/modifiable Arctic client with the correct URIs
  • having a function that constructs the correct names for the libraries
  • some helper function for cleaning up modifiable libraries, which can just iterate over the libraries in the modifiable Arctic client

Everything else here can easily be handled through the Arctic clients directly; see the sketch below.
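
A minimal sketch of those three helpers, reusing get_arctic_uri/Storage/StorageSpace from the earlier comment; the names get_arctic_client, get_library_name and clear_modifiable_libraries are hypothetical, not the PR's actual API:

from arcticdb import Arctic

def get_arctic_client(storage_space: StorageSpace, confirm_persistent_storage_need: bool = False) -> Arctic:
    # Build a client for either the persistent or the modifiable space.
    return Arctic(get_arctic_uri(Storage.AMAZON, storage_space,
                                 confirm_persistent_storage_need=confirm_persistent_storage_need))

def get_library_name(storage_space: StorageSpace, test_name: str, suffix: str = "") -> str:
    # Construct a deterministic library name per test and storage space.
    return f"{storage_space.value}_{test_name}" + (f"_{suffix}" if suffix else "")

def clear_modifiable_libraries() -> None:
    # Cleanup helper: just iterate over the libraries in the modifiable client.
    ac = get_arctic_client(StorageSpace.MODIFIABLE)
    for lib in ac.list_libraries():
        ac.delete_library(lib)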

Collaborator Author:

Again: all tests work and pass, and the abstraction isolates the user very well from ASV etc. The points you make are perhaps valid, but they constitute a totally different approach. As I disagree with that approach on practical grounds, I cannot make those changes.

TestLibraryManager isolates within itself everything the user needs to understand about the structure, and lets the person who writes the test write any type of test using the requested libraries.

It gives the author full freedom to do what is needed to achieve the best result.

It provides a way of working that eliminates the need for the test author to know ASV internals, and thus protects against problems that would arise during test execution.

It also makes it possible to change the structure of the storage spaces without changing test code.

All of that is well tested, and the framework itself is covered with tests that can be extended.

There is no point in making any changes; it would just waste more resources on this.

Collaborator Author:

BTW, it is TestLibraryManager now.


WIDE_DATAFRAME_NUM_COLS = 30_000

class LargeAppendDataModifyCache:
Collaborator:

This name is not very descriptive, and the comment seems a bit misleading.
AFAICS this is a cache for the expected results throughout the run.
The name/comment should reflect that.
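
For example, something along these lines (a hypothetical rename, not a concrete proposal from the thread):

class ExpectedResultsCache:
    """
    Caches the dataframes and expected results computed once during setup
    and reused throughout the benchmark run.
    """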

Collaborator Author:

Will make the necessary changes.

def get_population_policy(self):
pass

def get_index_info(self):
Collaborator:

Should be renamed to something like `index_start`; then it doesn't need the comment.

Collaborator Author:

I think the naming is OK. It returns both the start timestamp and the index frequency, hence the name get_index_info(); the docstring there says as much:

def get_index_info(self):
    """
    Returns initial timestamp and index frequency
    """
    return (pd.Timestamp("2-2-1986"), 's')

def initialize_cache(self, warmup_time, params, num_cols, num_sequential_dataframes):
# warmup will execute the tests an additional time, and we do not want that at all for write,
# update and append tests. We want the exact specified `number` of executions between
assert warmup_time == 0, "warm up must be 0"
Collaborator:

If it can only ever be one value, why do we even have it as a parameter?

Collaborator Author:

The parameters of the tests are defined as such in each test case. Thus test case A's parameters are members of class A, not of the instance, and test case B's parameters belong to test case B. The function checks parameters of different classes:

class AWSLargeAppendTests(AsvBase)
class AWS30kColsWideDFLargeAppendTests(AWSLargeAppendTests)
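
For context, ASV reads benchmark configuration from class attributes, so each subclass carries its own values. A minimal illustration (class names and parameter values here are made up, not the PR's):

class LargeAppendBase:
    warmup_time = 0   # ASV attribute: disable warm-up runs entirely
    number = 1        # run the timed function exactly once per repeat
    params = [1_000_000]
    param_names = ["num_rows"]

    def setup(self, num_rows):
        # Checks the class-level value, so each subclass is validated
        # against its own configuration.
        assert type(self).warmup_time == 0, "warm up must be 0"

class WideDFLargeAppend(LargeAppendBase):
    params = [100_000]  # subclass overrides the parameter space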

Collaborator Author:

The test code in some tests does contain assertions about certain ASV parameter values. Those assertions are needed because without that specific value the tests either do not work at all, or work but produce false results.

That is an additional thing added in the new implementation of the tests. They check two things:

  • validity of the preconditions (usually ASV parameters, or setup)
  • validity of the test operations (see, for instance, the asserts for batch operations in the batch tests, which are silent by default, i.e. they do not fail loudly on error; see the sketch below)
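
A sketch of that "silent by default" pattern (a hypothetical helper, not the PR's actual code):

import logging
import pandas as pd

def assert_frame_equal_silent(expected: pd.DataFrame, actual: pd.DataFrame,
                              logger: logging.Logger, fail_loudly: bool = False):
    # Verify a batch operation's result; log mismatches instead of raising,
    # unless the caller explicitly asks for a loud failure.
    try:
        pd.testing.assert_frame_equal(expected, actual)
    except AssertionError as err:
        logger.error(f"Batch result mismatch: {err}")
        if fail_loudly:
            raise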

# update and append tests. We want the exact specified `number` of executions between
assert warmup_time == 0, "warm up must be 0"

num_sequential_dataframes = num_sequential_dataframes + 1
Collaborator:

num_sequential_dataframes += 1

Collaborator Author:

But it is also correct in its current form, right?


return cache

def initialize_update_dataframes(self, num_rows: int, num_cols: int, cached_results: LargeAppendDataModifyCache,
Collaborator:

This function is a bit hard to follow; consider refactoring it to something like:

def initialize_update_dataframes(self, num_rows: int, num_cols: int, cached_results: LargeAppendDataModifyCache, 
                                 generator: SequentialDataframesGenerator):
    logger = self.get_logger()
    initial_timestamp, freq = self.get_index_info()
    timestamp_number = TimestampNumber.from_timestamp(initial_timestamp, freq)
    
    def log_time_range(update_type: str, df_key: int):
        # Assumes the cache exposes each per-update-type dict as an attribute,
        # consistent with cached_results.write_and_append_dict used below.
        time_range = generator.get_first_and_last_timestamp([getattr(cached_results, update_type)[df_key]])
        logger.info(f"Time range {update_type.upper()} update {time_range}")

    def generate_and_log(update_type: str, num_rows: int, start_ts: pd.Timestamp):
        df = generator.df_generator.get_dataframe(number_rows=num_rows, number_columns=num_cols, start_timestamp=start_ts, freq=freq)
        getattr(cached_results, update_type)[num_rows] = df
        log_time_range(update_type, num_rows)

    logger.info(f"Frame START-LAST Timestamps {timestamp_number} == {timestamp_number + num_rows}")

    # Full update
    generate_and_log('update_full_dict', num_rows, initial_timestamp)

    # Half update
    half = num_rows // 2
    timestamp_number.inc(half - 3)
    generate_and_log('update_half_dict', half, timestamp_number.to_timestamp())

    # Upsert update
    generate_and_log('update_upsert_dict', num_rows, timestamp_number.to_timestamp())

    # Single update
    timestamp_number.inc(half)
    generate_and_log('update_single_dict', 1, timestamp_number.to_timestamp())

    # Single append
    next_timestamp = generator.get_next_timestamp_number(cached_results.write_and_append_dict[num_rows], freq)
    generate_and_log('append_single_dict', 1, next_timestamp.to_timestamp())

Collaborator Author:

Makes sense!


def get_modifiable_library(self, library_suffix: Union[str, int] = None) -> Library:

class LibraryPopulationPolicy:
Collaborator:

As we have discussed, this can be greatly simplified by decoupling the population configuration from the logic that executes the population.
This can be done with a refactor like:

from dataclasses import dataclass
from typing import Any, Dict, List
import logging
import time

import numpy as np

# DFGenerator, DataFrameGenerator and VariableSizeDataframe come from the
# benchmark utilities already used in this PR.

@dataclass
class LibraryPopulationConfig:
    """Immutable configuration for library population."""
    parameters: List[int]
    parameters_are_rows: bool = True
    fixed_rows: int = 1
    fixed_columns: int = 1
    symbol_prefix: str = ""
    use_auto_increment: bool = False
    with_metadata: bool = False
    versions_count: int = 1
    versions_mean: float = 1.0
    with_snapshots: bool = False

    def symbol_name(self, index: int) -> str:
        """Get the symbol name based on configuration."""
        prefix = f"symbol_{self.symbol_prefix}_" if self.symbol_prefix else "symbol_"
        return f"{prefix}{index}"
    
    def create_metadata(self) -> Dict[str, Any]:
        """Create metadata for symbols and snapshots."""
        if not self.with_metadata:
            return {}
        return DFGenerator.generate_random_dataframe(rows=3, cols=10).to_dict()


class LibraryPopulator:
    """
    Handles the actual population of a library based on a configuration.
    Separates the configuration from the execution.
    """
    def __init__(self, config: LibraryPopulationConfig, logger: logging.Logger, 
                 df_generator: DataFrameGenerator = None):
        self.config = config
        self.logger = logger
        self.df_generator = df_generator or VariableSizeDataframe()
    
    def populate(self, library):
        """Populate the library according to the configuration."""
        start_time = time.time()
        
        for i, param in enumerate(self.config.parameters):
            # Determine symbol index
            symbol_index = i if self.config.use_auto_increment else param
            symbol_name = self.config.symbol_name(symbol_index)
            
            # Determine rows and columns
            rows = param if self.config.parameters_are_rows else self.config.fixed_rows
            columns = self.config.fixed_columns if self.config.parameters_are_rows else param
            
            # Generate dataframe
            df = self.df_generator.generate_dataframe(rows, columns)
            
            # Create symbol
            symbol = library.create_symbol(symbol_name, df)
            
            # Add metadata if configured
            if self.config.with_metadata:
                symbol.set_metadata(self.config.create_metadata())
            
            # Create versions if configured
            if self.config.versions_count > 1:
                versions_list = self._generate_versions_list(len(self.config.parameters))
                for v in range(1, min(versions_list[i], self.config.versions_count) + 1):
                    version_df = self.df_generator.generate_dataframe(rows, columns)
                    version = symbol.create_version(version_df)
                    
                    # Add metadata if configured
                    if self.config.with_metadata:
                        version.set_metadata(self.config.create_metadata())
                    
                    # Create snapshot if configured
                    if self.config.with_snapshots:
                        snapshot = library.create_snapshot(f"snapshot_{symbol_name}_{v}")
                        if self.config.with_metadata:
                            snapshot.set_metadata(self.config.create_metadata())
        
        self.logger.info(f"Population completed in: {time.time() - start_time:.2f}s")
    
    def _generate_versions_list(self, number_symbols: int) -> List[np.int64]:
        """Generate a list of version counts for each symbol."""
        # Implementation would depend on your specific requirements
        # This is a placeholder based on the original code
        versions_list = np.random.poisson(self.config.versions_mean, number_symbols)
        versions_list = np.clip(versions_list, 1, self.config.versions_count)
        return versions_list.astype(np.int64)

The code is just an example that I got from a pass through Claude, and it can be simplified further, e.g.:

  • there are parameters in the Policy that can be removed
  • we probably don't need a LibraryPopulator class, as a few helper functions that take a policy should suffice (see the sketch below)
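
For instance (a sketch assuming the LibraryPopulationConfig above and an arcticdb Library; the function name populate_library is hypothetical):

def populate_library(library, config: LibraryPopulationConfig, df_generator) -> None:
    # Free function replacing the LibraryPopulator class: walk the configured
    # parameters, generate one dataframe per symbol, and write it.
    for i, param in enumerate(config.parameters):
        rows = param if config.parameters_are_rows else config.fixed_rows
        cols = config.fixed_columns if config.parameters_are_rows else param
        name = config.symbol_name(i if config.use_auto_increment else param)
        df = df_generator.generate_dataframe(rows, cols)
        library.write(name, df, metadata=config.create_metadata() or None)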

Collaborator Author:

As we discussed, we also agreed that the current code does not conflict with any of the recommendations. Additionally, there is no real benefit to making that rewrite other than adopting Claude's suggestions. The code is fine in its current form; no need to change it.
