Skip to content

Commit 490eaa9

Browse files
committed
abstract runner logic out + create stubs for new DB's
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
1 parent dd9faad commit 490eaa9

File tree

10 files changed

+534
-368
lines changed

10 files changed

+534
-368
lines changed

benchmarks/worker_coordinator/parsing_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import pytest
3131
import ujson
3232

33-
from osbenchmark.worker_coordinator import runner
33+
from osbenchmark.worker_coordinator import runners as runner
3434

3535
@pytest.mark.benchmark(
3636
group="parse",

benchmarks/worker_coordinator/runner_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
import pytest
2626

27-
from osbenchmark.worker_coordinator import runner
27+
from osbenchmark.worker_coordinator import runners as runner
2828

2929
bulk_index = runner.BulkIndex()
3030

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
#
3+
# The OpenSearch Contributors require contributions made to
4+
# this file be licensed under the Apache-2.0 license or a
5+
# compatible open source license.
6+
# Modifications Copyright OpenSearch Contributors. See
7+
# GitHub history for details.
8+
# Licensed to Elasticsearch B.V. under one or more contributor
9+
# license agreements. See the NOTICE file distributed with
10+
# this work for additional information regarding copyright
11+
# ownership. Elasticsearch B.V. licenses this file to you under
12+
# the Apache License, Version 2.0 (the "License"); you may
13+
# not use this file except in compliance with the License.
14+
# You may obtain a copy of the License at
15+
#
16+
# http://www.apache.org/licenses/LICENSE-2.0
17+
#
18+
# Unless required by applicable law or agreed to in writing,
19+
# software distributed under the License is distributed on an
20+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
21+
# KIND, either express or implied. See the License for the
22+
# specific language governing permissions and limitations
23+
# under the License.
24+
25+
"""
26+
Runner package for OpenSearch Benchmark.
27+
28+
This package contains:
29+
- base.py: Database-agnostic runner infrastructure (base classes, utilities)
30+
- opensearch.py: OpenSearch-specific runner implementations
31+
- vespa.py: Vespa runner implementations (stub for future PR)
32+
33+
The registry functions (register_runner, runner_for, etc.) live here in __init__.py
34+
and are the central coordination point for all runner registration.
35+
"""
36+
37+
import logging
38+
import types
39+
40+
from osbenchmark import exceptions, workload
41+
from osbenchmark.worker_coordinator.runners.base import (
42+
Runner,
43+
Delegator,
44+
time_func,
45+
request_context_holder,
46+
mandatory,
47+
escape,
48+
remove_prefix,
49+
unwrap,
50+
_single_cluster_runner,
51+
_multi_cluster_runner,
52+
_with_assertions,
53+
_with_completion,
54+
MultiClientRunner,
55+
AssertingRunner,
56+
NoCompletion,
57+
WithCompletion,
58+
)
59+
60+
__RUNNERS = {}
61+
62+
63+
def register_runner(operation_type, runner, **kwargs):
64+
logger = logging.getLogger(__name__)
65+
async_runner = kwargs.get("async_runner", False)
66+
if isinstance(operation_type, workload.OperationType):
67+
operation_type = operation_type.to_hyphenated_string()
68+
69+
if not async_runner:
70+
raise exceptions.BenchmarkAssertionError(
71+
"Runner [{}] must be implemented as async runner and registered with async_runner=True.".format(str(runner)))
72+
73+
if getattr(runner, "multi_cluster", False):
74+
if "__aenter__" in dir(runner) and "__aexit__" in dir(runner):
75+
if logger.isEnabledFor(logging.DEBUG):
76+
logger.debug("Registering runner object [%s] for [%s].", str(runner), str(operation_type))
77+
cluster_aware_runner = _multi_cluster_runner(runner, str(runner), context_manager_enabled=True)
78+
else:
79+
if logger.isEnabledFor(logging.DEBUG):
80+
logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner), str(operation_type))
81+
cluster_aware_runner = _multi_cluster_runner(runner, str(runner))
82+
# we'd rather use callable() but this will erroneously also classify a class as callable...
83+
elif isinstance(runner, types.FunctionType):
84+
if logger.isEnabledFor(logging.DEBUG):
85+
logger.debug("Registering runner function [%s] for [%s].", str(runner), str(operation_type))
86+
cluster_aware_runner = _single_cluster_runner(runner, runner.__name__)
87+
elif "__aenter__" in dir(runner) and "__aexit__" in dir(runner):
88+
if logger.isEnabledFor(logging.DEBUG):
89+
logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner), str(operation_type))
90+
cluster_aware_runner = _single_cluster_runner(runner, str(runner), context_manager_enabled=True)
91+
else:
92+
if logger.isEnabledFor(logging.DEBUG):
93+
logger.debug("Registering runner object [%s] for [%s].", str(runner), str(operation_type))
94+
cluster_aware_runner = _single_cluster_runner(runner, str(runner))
95+
96+
__RUNNERS[operation_type] = _with_completion(_with_assertions(cluster_aware_runner))
97+
98+
99+
def runner_for(operation_type):
100+
try:
101+
return __RUNNERS[operation_type]
102+
except KeyError:
103+
raise exceptions.BenchmarkError("No runner available for operation type [%s]" % operation_type)
104+
105+
106+
def enable_assertions(enabled):
107+
"""
108+
Changes whether assertions are enabled. The status changes for all tasks that are executed after this call.
109+
110+
:param enabled: ``True`` to enable assertions, ``False`` to disable them.
111+
"""
112+
AssertingRunner.assertions_enabled = enabled
113+
114+
115+
# Only intended for unit-testing!
116+
def remove_runner(operation_type):
117+
del __RUNNERS[operation_type]
118+
119+
120+
# Re-export OpenSearch runners and register_default_runners for convenience
121+
from osbenchmark.worker_coordinator.runners.opensearch import * # noqa: F401,F403,E402
122+
from osbenchmark.worker_coordinator.runners.opensearch import register_default_runners # noqa: E402

0 commit comments

Comments
 (0)