|
9 | 9 |
|
10 | 10 | import requests |
11 | 11 | from requests.exceptions import ConnectionError, HTTPError |
| 12 | +from tenacity import Retrying, stop_after_attempt, wait_fixed, RetryCallState |
| 13 | +import urllib3 |
12 | 14 |
|
13 | 15 | from core.cluster import SUBSTRATES, ClusterState |
14 | 16 | from core.workload import WorkloadBase |
|
18 | 20 | MSG_STATUS_APP_REMOVED, |
19 | 21 | MSG_STATUS_DB_DOWN, |
20 | 22 | MSG_STATUS_DB_MISSING, |
| 23 | + MSG_STATUS_DB_UNHEALTHY, |
21 | 24 | MSG_STATUS_ERROR, |
22 | 25 | MSG_STATUS_HANGING, |
23 | 26 | MSG_STATUS_UNAVAIL, |
@@ -78,41 +81,59 @@ def opensearch_ok(self) -> tuple[bool, str]: |
78 | 81 | ): |
79 | 82 | return False, MSG_STATUS_DB_MISSING |
80 | 83 |
|
| 84 | + def log_retry(retry_state: RetryCallState) -> None: |
| 85 | + """Log retry attempts.""" |
| 86 | + logger.debug( |
| 87 | + f"Retrying... Attempt {retry_state.attempt_number}" |
| 88 | + f"\tException: {retry_state.outcome.exception()}" |
| 89 | + ) |
| 90 | + |
81 | 91 | for endpoint in self.state.opensearch_server.endpoints: |
82 | 92 | full_url = f"https://{endpoint}/{HEALTH_OPENSEARCH_STATUS_URL}" |
83 | 93 |
|
84 | 94 | request_kwargs = { |
85 | 95 | "url": full_url, |
86 | 96 | "method": "GET", |
87 | 97 | "verify": self.workload.paths.opensearch_ca, |
88 | | - "headers": None, |
| 98 | + "headers": { |
| 99 | + "Content-Type": "application/json", |
| 100 | + "Accept": "application/json", |
| 101 | + }, |
89 | 102 | "timeout": REQUEST_TIMEOUT, |
90 | 103 | } |
91 | 104 |
|
92 | | - try: |
93 | | - with requests.Session() as s: |
| 105 | + with requests.Session() as s: |
| 106 | + try: |
94 | 107 | s.auth = ( # type: ignore [reportAttributeAccessIssue] |
95 | 108 | self.state.opensearch_server.username, |
96 | 109 | self.state.opensearch_server.password, |
97 | 110 | ) |
98 | | - resp = s.request(**request_kwargs) |
99 | | - resp.raise_for_status() |
100 | | - except requests.exceptions.RequestException: |
101 | | - continue |
| 111 | + for attempt in Retrying( |
| 112 | + stop=stop_after_attempt(3), |
| 113 | + wait=wait_fixed(1), |
| 114 | + reraise=True, |
| 115 | + before_sleep=log_retry, |
| 116 | + ): |
| 117 | + with attempt: |
| 118 | + resp = s.request(**request_kwargs) |
| 119 | + resp.raise_for_status() |
| 120 | + except (requests.RequestException, urllib3.exceptions.HTTPError): |
| 121 | + logger.error(f"Failed to connect to {full_url}") |
| 122 | + continue |
102 | 123 |
|
103 | 124 | if resp.status_code == 200: |
104 | 125 | try: |
105 | 126 | status = resp.json() |
106 | 127 | except requests.exceptions.JSONDecodeError: |
| 128 | + logger.error(f"Failed to decode JSON from {full_url}") |
107 | 129 | continue |
108 | | - if status.get("status") == "green": |
109 | | - return True, "" |
110 | 130 |
|
111 | | - return False, MSG_STATUS_DB_DOWN |
| 131 | + if status.get("status") == "red": |
| 132 | + return False, MSG_STATUS_DB_UNHEALTHY |
112 | 133 |
|
113 | | - def app_healthy(self) -> tuple[bool, str]: |
114 | | - """Unit-level global healthcheck.""" |
115 | | - return self.opensearch_ok() |
| 134 | + if status.get("status") in {"green", "yellow"}: |
| 135 | + return True, "" |
| 136 | + return False, MSG_STATUS_DB_DOWN |
116 | 137 |
|
117 | 138 | def unit_healthy(self) -> tuple[bool, str]: |
118 | 139 | """Unit-level global healthcheck.""" |
|
0 commit comments