Skip to content

Commit a967bfd

Browse files
authored
Fix --number-of-docs bug in create-workload (#659)
Signed-off-by: Ian Hoang <[email protected]>
1 parent 7c6e1b7 commit a967bfd

File tree

4 files changed: +58 lines added, -18 lines removed

osbenchmark/utils/opts.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,14 @@ class StoreKeyPairAsDict(argparse.Action):
125125
"""
126126
def __call__(self, parser, namespace, values, option_string=None):
127127
custom_dict = {}
128-
for kv in values:
128+
129+
if len(values) == 1:
130+
# If values contains spaces, user provided 2+ key value pairs
131+
kv_pairs = values[0].split(" ")
132+
else:
133+
kv_pairs = values
134+
135+
for kv in kv_pairs:
129136
try:
130137
k,v = kv.split(":")
131138
custom_dict[k] = v

osbenchmark/workload_generator/extractors.py

+26-5
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
import os
1313
from abc import ABC, abstractmethod
1414

15-
from opensearchpy import OpenSearchException
15+
import opensearchpy.exceptions
1616

17+
from osbenchmark import exceptions
1718
from osbenchmark.utils import console
1819
from osbenchmark.workload_generator.config import CustomWorkload
1920

@@ -41,8 +42,10 @@ def extract_indices(self, workload_path):
4142
try:
4243
for index in self.custom_workload.indices:
4344
extracted_indices += self.extract(workload_path, index.name)
44-
except OpenSearchException:
45-
self.logger("Failed at extracting index [%s]", index)
45+
except opensearchpy.exceptions.NotFoundError:
46+
raise exceptions.SystemSetupError(f"Index [{index.name}] does not exist.")
47+
except opensearchpy.OpenSearchException:
48+
self.logger.error("Failed at extracting index [%s]", index)
4649
failed_indices += index
4750

4851
return extracted_indices, failed_indices
@@ -138,6 +141,9 @@ def extract_documents(self, index, documents_limit=None):
138141

139142

140143
class SequentialCorpusExtractor(CorpusExtractor):
144+
DEFAULT_TEST_MODE_DOC_COUNT = 1000
145+
DEFAULT_TEST_MODE_SUFFIX = "-1k"
146+
141147
def __init__(self, custom_workload, client):
142148
self.custom_workload: CustomWorkload = custom_workload
143149
self.client = client
@@ -173,15 +179,30 @@ def extract_documents(self, index, documents_limit=None):
173179

174180
documents_to_extract = total_documents if not documents_limit else min(total_documents, documents_limit)
175181

182+
if documents_limit:
183+
# Only time when documents-1k.json will be less than 1K documents is
184+
# when the documents_limit is < 1k documents or source index has less than 1k documents
185+
if documents_limit < self.DEFAULT_TEST_MODE_DOC_COUNT:
186+
test_mode_warning_msg = "Due to --number-of-docs set by user, " + \
187+
f"test-mode docs will be less than the default {self.DEFAULT_TEST_MODE_DOC_COUNT} documents."
188+
console.warn(test_mode_warning_msg)
189+
190+
# Notify users when they specified more documents than available in index
191+
if documents_limit > total_documents:
192+
documents_to_extract_warning_msg = f"User requested extraction of {documents_limit} documents " + \
193+
f"but there are only {total_documents} documents in {index}. " + \
194+
f"Will only extract {total_documents} documents from {index}."
195+
console.warn(documents_to_extract_warning_msg)
196+
176197
if documents_to_extract > 0:
177198
logger.info("[%d] total docs in index [%s]. Extracting [%s] docs.", total_documents, index, documents_to_extract)
178199
docs_path = self._get_doc_outpath(self.custom_workload.workload_path, index)
179200
# Create test mode corpora
180201
self.dump_documents(
181202
self.client,
182203
index,
183-
self._get_doc_outpath(self.custom_workload.workload_path, index, "-1k"),
184-
min(documents_to_extract, 1000),
204+
self._get_doc_outpath(self.custom_workload.workload_path, index, self.DEFAULT_TEST_MODE_SUFFIX),
205+
min(documents_to_extract, self.DEFAULT_TEST_MODE_DOC_COUNT),
185206
" for test mode")
186207
# Create full corpora
187208
self.dump_documents(self.client, index, docs_path, documents_to_extract)

osbenchmark/workload_generator/helpers.py

+23-12
Original file line numberDiff line numberDiff line change
@@ -125,18 +125,29 @@ def process_queries(self):
125125

126126
return processed_queries
127127

128-
def process_indices(indices, document_frequency, number_of_docs):
128+
def process_indices(indices, document_frequency, indices_docs_mapping):
129129
processed_indices = []
130130
for index_name in indices:
131-
index = Index(
132-
name=index_name,
133-
document_frequency=document_frequency,
134-
number_of_docs=number_of_docs
135-
)
136-
processed_indices.append(index)
131+
try:
132+
#Setting number_of_docs_for_index to None means OSB will grab all docs available in index
133+
number_of_docs_for_index = None
134+
if indices_docs_mapping and index_name in indices_docs_mapping:
135+
number_of_docs_for_index = int(indices_docs_mapping[index_name])
136+
if number_of_docs_for_index <= 0:
137+
raise exceptions.SystemSetupError(
138+
"Values specified with --number-of-docs must be greater than 0")
139+
140+
index = Index(
141+
name=index_name,
142+
document_frequency=document_frequency,
143+
number_of_docs=number_of_docs_for_index
144+
)
145+
processed_indices.append(index)
137146

138-
return processed_indices
147+
except ValueError as e:
148+
raise exceptions.SystemSetupError("Ensure you are using integers if providing --number-of-docs.", e)
139149

150+
return processed_indices
140151

141152
def validate_index_documents_map(indices, indices_docs_map):
142153
logger = logging.getLogger(__name__)
@@ -147,13 +158,13 @@ def validate_index_documents_map(indices, indices_docs_map):
147158

148159
if len(indices) < len(indices_docs_map):
149160
raise exceptions.SystemSetupError(
150-
"Number of <index>:<doc_count> pairs exceeds number of indices in --indices. " +
151-
"Ensure number of <index>:<doc_count> pairs is less than or equal to number of indices in --indices."
161+
"Number of <index>:<doc_count> pairs in --number-of-docs exceeds number of indices in --indices. " +
162+
"Ensure number of <index>:<doc_count> pairs is less than or equal to number of indices."
152163
)
153164

154165
for index_name in indices_docs_map:
155166
if index_name not in indices:
156167
raise exceptions.SystemSetupError(
157-
"Index from <index>:<doc_count> pair was not found in --indices. " +
158-
"Ensure that indices from all <index>:<doc_count> pairs exist in --indices."
168+
f"Index {index_name} provided in --number-of-docs was not found in --indices. " +
169+
"Ensure that all indices in --number-of-docs are present in --indices."
159170
)

osbenchmark/workload_generator/workload_generator.py

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def create_workload(cfg):
4040
console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger)
4141

4242
processed_indices = process_indices(indices, document_frequency, number_of_docs)
43+
logger.info("Processed Indices: %s", processed_indices)
4344

4445
custom_workload = CustomWorkload(
4546
workload_name=workload_name,

0 commit comments

Comments
 (0)