-
Notifications
You must be signed in to change notification settings - Fork 19
Upgrade ElasticSearch/OpenSearch Client Libraries #999
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -210,6 +210,7 @@ def __init__(self): | |
| self._environment_variables_dict['es_index'] = EnvironmentVariables.get_env('es_index', es_index) | ||
| self._environment_variables_dict['es_doc_type'] = EnvironmentVariables.get_env('es_doc_type', '') | ||
| self._environment_variables_dict['ES_TIMEOUT'] = EnvironmentVariables.get_env('ES_TIMEOUT', 2000) | ||
| self._environment_variables_dict['ES_SERVER_TYPE'] = EnvironmentVariables.get_env('ES_SERVER_TYPE', 'opensearch') | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Normalize At Line 213, normalize case/whitespace so non-exact env values don’t accidentally select the OpenSearch branch. Proposed fix- self._environment_variables_dict['ES_SERVER_TYPE'] = EnvironmentVariables.get_env('ES_SERVER_TYPE', 'opensearch')
+ self._environment_variables_dict['ES_SERVER_TYPE'] = (
+ EnvironmentVariables.get_env('ES_SERVER_TYPE', 'opensearch').strip().lower()
+ )🤖 Prompt for AI Agents |
||
|
|
||
| # GitHub credentials | ||
| self._environment_variables_dict['git_access_token'] = EnvironmentVariables.get_env('git_access_token', '') | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,6 @@ def __init__(self, **kwargs): | |
| self.__es_host = kwargs.get('es_host') | ||
| self.__es_port = kwargs.get('es_port') | ||
| self.__es_index = kwargs.get('es_index') | ||
| self.__es_doc_type = kwargs.get('es_doc_type') | ||
| self.__es_add_items = kwargs.get('es_add_items') | ||
| self.__bucket_name = kwargs.get('bucket') | ||
| self.__s3_file_name = kwargs.get('s3_file_name') | ||
|
|
@@ -95,14 +94,13 @@ def __get_user_cost(self, data): | |
| num += 1 | ||
| return user_cost_results | ||
|
|
||
| def upload_last_policy_to_elasticsearch(self, policy: str, index: str, doc_type: str, s3_json_file: str, | ||
| def upload_last_policy_to_elasticsearch(self, policy: str, index: str, s3_json_file: str, | ||
| es_add_items: dict = None): | ||
|
Comment on lines
+97
to
98
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
With the current flow, Proposed fix- def upload_last_policy_to_elasticsearch(self, policy: str, index: str, s3_json_file: str,
- es_add_items: dict = None):
+ def upload_last_policy_to_elasticsearch(self, policy: str, index: str, s3_json_file: str,
+ es_add_items: dict | None = None):
...
- for key, value in es_add_items.items():
+ for key, value in (es_add_items or {}).items():
data[key] = value
...
def upload_to_es(self, account):
@@
- self.__es_add_items.update({'policy': self.__policy_name, 'region': self.__region_name})
+ self.__es_add_items = self.__es_add_items or {}
+ self.__es_add_items.update({'policy': self.__policy_name, 'region': self.__region_name})Also applies to: 210-212 🧰 Tools🪛 Ruff (0.15.14)[warning] 98-98: PEP 484 prohibits implicit Convert to (RUF013) 🤖 Prompt for AI Agents |
||
| """ | ||
| This method is upload json kubernetes cluster data into elasticsearch | ||
| :param policy: | ||
| :param s3_json_file: | ||
| :param index: | ||
| :param doc_type: | ||
| :param es_add_items: | ||
| :return: | ||
| """ | ||
|
|
@@ -210,5 +208,5 @@ def upload_to_es(self, account): | |
| """ | ||
| self.__es_add_items.update({'policy': self.__policy_name, 'region': self.__region_name}) | ||
| self.upload_last_policy_to_elasticsearch(policy=self.__policy_name, index=self.__es_index, | ||
| doc_type=self.__es_doc_type, s3_json_file=self.__s3_file_name, | ||
| s3_json_file=self.__s3_file_name, | ||
| es_add_items=self.__es_add_items) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,8 @@ | |
| AWS_SECRET_ACCESS_KEY_DELETE_PERF_SCALE = os.environ['AWS_SECRET_ACCESS_KEY_DELETE_PERF_SCALE'] | ||
| ES_HOST = os.environ['ES_HOST'] | ||
| ES_PORT = os.environ['ES_PORT'] | ||
| ES_USER = os.environ.get('ES_USER', '') | ||
| ES_PASSWORD = os.environ.get('ES_PASSWORD', '') | ||
| LDAP_HOST_NAME = os.environ['LDAP_HOST_NAME'] | ||
| JIRA_URL = os.environ['JIRA_URL'] | ||
| JIRA_USERNAME = os.environ['JIRA_USERNAME'] | ||
|
|
@@ -30,7 +32,8 @@ | |
| es_index = CLOUD_RESOURCE_ORCHESTRATION_INDEX | ||
|
|
||
| common_env_vars = { | ||
| 'es_host': ES_HOST, 'es_port': ES_PORT, 'CRO_ES_INDEX': CRO_ES_INDEX, 'log_level': 'INFO', | ||
| 'es_host': ES_HOST, 'es_port': ES_PORT, 'es_user': ES_USER, 'es_password': ES_PASSWORD, | ||
| 'CRO_ES_INDEX': CRO_ES_INDEX, 'log_level': 'INFO', | ||
|
Comment on lines
+35
to
+36
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
set -euo pipefail
# Show the relevant file with line numbers (only parts likely needed)
FILE="jenkins/cloud_resource_orchestration/run_cloud_resource_orchestration.py"
if [ ! -f "$FILE" ]; then
echo "Missing file: $FILE"
exit 1
fi
echo "=== File excerpt around lines 1-140 ==="
nl -ba "$FILE" | sed -n '1,160p'
echo "=== Grep for os.system usage and env dict creation ==="
rg -n "os\.system|subprocess\.run|env\s*=" "$FILE" || true
echo "=== Extract specific lines mentioned in review comment ==="
nl -ba "$FILE" | sed -n '20,120p' | sed -n '1,120p'
# Run flake8 as per instructions (error detection)
echo "=== Running flake8 error detection ==="
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# Run flake8 style validation (exit-zero allowed)
echo "=== Running flake8 style validation ==="
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statisticsRepository: redhat-performance/cloud-governance Length of output: 181 🏁 Script executed: #!/bin/bash
set -euo pipefail
FILE="jenkins/cloud_resource_orchestration/run_cloud_resource_orchestration.py"
echo "=== File existence ==="
ls -l "$FILE" || { echo "Missing $FILE"; exit 1; }
echo "=== Command availability (flake8) ==="
command -v flake8 || echo "flake8 not found"
echo "=== os.system / subprocess.run occurrences (file) ==="
rg -n "os\.system|subprocess\.run" "$FILE" || true
echo "=== Inspect around lines 1-140 (no nl; use sed) ==="
sed -n '1,160p' "$FILE"
echo "=== Inspect around env dict lines (35-60) ==="
sed -n '25,70p' "$FILE"
echo "=== Inspect around later command strings (70-110) ==="
sed -n '65,120p' "$FILE"
echo "=== Inspect around later command strings (90-120) ==="
sed -n '85,130p' "$FILE"
echo "=== Extract specific mentioned snippet lines (35-36, 54-55) with awk ==="
awk 'NR>=30 && NR<=60 {printf "%5d:%s\n", NR, $0}' "$FILE"
awk 'NR>=45 && NR<=65 {printf "%5d:%s\n", NR, $0}' "$FILE"
echo "=== Run flake8 commands only if available ==="
if command -v flake8 >/dev/null 2>&1; then
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics || true
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics || true
else
echo "Skipping flake8 (not installed in sandbox)."
fiRepository: redhat-performance/cloud-governance Length of output: 22289 Avoid interpolating ES credentials into
🤖 Prompt for AI Agents |
||
| 'LDAP_HOST_NAME': LDAP_HOST_NAME, | ||
| 'JIRA_QUEUE': JIRA_QUEUE, 'JIRA_TOKEN': JIRA_TOKEN, 'JIRA_USERNAME': JIRA_USERNAME, 'JIRA_URL': JIRA_URL, | ||
| 'CRO_COST_OVER_USAGE': CRO_COST_OVER_USAGE, 'CRO_PORTAL': CRO_PORTAL, 'CRO_DEFAULT_ADMINS': CRO_DEFAULT_ADMINS, | ||
|
|
@@ -48,7 +51,8 @@ | |
|
|
||
| os.system('echo Run CloudResourceOrchestration in pre active region') | ||
|
|
||
| common_input_vars = {'es_host': ES_HOST, 'es_port': ES_PORT, 'CRO_ES_INDEX': CRO_ES_INDEX, 'log_level': 'INFO', | ||
| common_input_vars = {'es_host': ES_HOST, 'es_port': ES_PORT, 'es_user': ES_USER, 'es_password': ES_PASSWORD, | ||
| 'CRO_ES_INDEX': CRO_ES_INDEX, 'log_level': 'INFO', | ||
| 'LDAP_HOST_NAME': LDAP_HOST_NAME, | ||
| 'JIRA_QUEUE': JIRA_QUEUE, 'JIRA_TOKEN': JIRA_TOKEN, 'JIRA_USERNAME': JIRA_USERNAME, | ||
| 'JIRA_URL': JIRA_URL, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: redhat-performance/cloud-governance
Length of output: 16509
🏁 Script executed:
Repository: redhat-performance/cloud-governance
Length of output: 194
🏁 Script executed:
Repository: redhat-performance/cloud-governance
Length of output: 14638
🏁 Script executed:
Repository: redhat-performance/cloud-governance
Length of output: 13338
🏁 Script executed:
Repository: redhat-performance/cloud-governance
Length of output: 690
🏁 Script executed:
Repository: redhat-performance/cloud-governance
Length of output: 122
🌐 Web query:
elasticsearch.helpers.bulk return value success errors tuple stats_only💡 Result:
The elasticsearch.helpers.bulk function in the Python Elasticsearch client returns a tuple containing two elements: (number_of_successful_actions, error_information) [1][2][3]. The structure of the second element in the returned tuple depends on the value of the stats_only parameter: If stats_only=False (the default): The second element is a list of error responses encountered during the bulk operation [1][2][3]. If stats_only=True: The second element is an integer representing the total number of failed operations [1][2][3]. Important implementation notes: 1. Raise on Error: By default, the bulk helper raises a BulkIndexError if any errors are encountered [1][3]. The stats_only parameter only effectively controls the return value if you set raise_on_error=False [1][3]. 2. Memory Usage: Collecting detailed error information (when stats_only=False) can lead to high memory usage when processing large datasets [1][3]. If you need to handle high volumes of data while tracking errors, the streaming_bulk helper is recommended, as it allows you to process results without storing all errors in memory [1][3].
Citations:
🌐 Web query:
opensearchpy.helpers.bulk return value tuple success errors💡 Result:
The
opensearchpy.helpers.bulkfunction returns a tuple containing two elements: a count of successfully executed actions and a collection of errors [1][2]. The exact structure of the second element in the tuple depends on thestats_onlyparameter: 1. Ifstats_onlyis set toFalse(the default), the second element is a list of error dictionaries describing each failed operation [1][2]. 2. Ifstats_onlyis set toTrue, the second element is an integer representing the total count of failed operations [1][2]. Important Considerations: - By default,helpers.bulkraises aBulkIndexErrorif any operation in the bulk request fails [3]. - To retrieve errors without raising an exception, you must setraise_on_error=Falsein the function arguments [1][2]. - Becausehelpers.bulkbuffers documents to reduce memory usage, if you need to process large datasets and handle errors, it is often more memory-efficient to use thestreaming_bulkhelper, which yields results for each individual operation rather than returning a single summary tuple at the end [1][2][4].Citations:
🏁 Script executed:
Repository: redhat-performance/cloud-governance
Length of output: 1035
Fix
upload_to_elasticsearchtype/runtime mismatch and incorrect bulk failure accountingupload_to_elasticsearch(..., data: dict)is wrapped with@typechecked(), yet the function later includes a JSON-array branch (else: for record in data ...). The body also performs dict-only operations (data.get(...),data.keys(), etc.) before that branch, so passing a list will fail (TypeCheckError and/or AttributeError). Align the type/signature with actual behavior: either remove the list branch or refactor to handle list-of-dicts before any dict operations.upload_data_in_bulkchecksif response:afterresponse = helpers.bulk(...).helpers.bulkreturns a tuple(success_count, errors)and (by default) raisesBulkIndexErroron errors, so this boolean check won’t properly detect partial failures and the code currently ignores the error details. Compute failures from the returned tuple (raise_on_error=False) and/or catchBulkIndexErrorto updatefailed_items.🧰 Tools
🪛 Ruff (0.15.14)
[warning] 139-139: PEP 484 prohibits implicit
OptionalConvert to
T | None(RUF013)
🤖 Prompt for AI Agents