Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
name: Run pre-commit, install test and CI tests
steps:
- name: Check out source repository
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: Install Python
uses: actions/setup-python@v4
with:
Expand Down
130 changes: 108 additions & 22 deletions cerberus/kubernetes/client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import re
import os
import sys
import ssl
import yaml
import time
import logging
import requests
import urllib3
from urllib.parse import urlparse
from urllib3.contrib.socks import SOCKSProxyManager
from collections import defaultdict
from kubernetes import client, config
import cerberus.invoke.command as runcommand
Expand All @@ -14,7 +17,7 @@
pods_tracker = defaultdict(dict)

kubeconfig_path_global = ""

urllib3.disable_warnings()

# Load kubeconfig and initialize kubernetes python client
def initialize_clients(kubeconfig_path, chunk_size, timeout):
Expand All @@ -26,25 +29,96 @@ def initialize_clients(kubeconfig_path, chunk_size, timeout):
global kubeconfig_path_global

"""Initialize object and create clients from specified kubeconfig"""
client_config = client.Configuration()
# Load kubeconfig - this sets up authentication in the default configuration
config.load_kube_config(kubeconfig_path)

# Get the configuration with authentication already loaded
client_config = client.Configuration.get_default_copy()

# Check for HTTP proxy configuration
http_proxy = os.getenv("http_proxy", None)
"""Proxy has auth header"""
if http_proxy and "@" in http_proxy:
proxy_auth = http_proxy.split("@")[0].split("//")[1]
user_pass = proxy_auth.split(":")[0]
client_config.username = user_pass[0]
client_config.password = user_pass[1]
client_config.ssl_ca_cert = False
client_config.verify_ssl = False
config.load_kube_config(config_file=kubeconfig_path, persist_config=True, client_configuration=client_config)
proxy_url = http_proxy
if proxy_url:
client_config.proxy = proxy_url
if proxy_auth:
client_config.proxy_headers = urllib3.util.make_headers(proxy_basic_auth=proxy_auth)

client.Configuration.set_default(client_config)
cli = client.CoreV1Api()


if http_proxy and "socks" in http_proxy:
os.environ["HTTP_PROXY"] = http_proxy
# Configure SOCKS5 proxy
# Parse socks5://host:port format
proxy_parsed = urlparse(http_proxy)
proxy_host = proxy_parsed.hostname
proxy_port = proxy_parsed.port
proxy_username = proxy_parsed.username
proxy_password = proxy_parsed.password

logging.info(f"Configuring SOCKS5 proxy: {proxy_host}:{proxy_port}")

# Create SOCKS proxy URL (socks5h uses remote DNS resolution)
socks_proxy_url = f"socks5h://{proxy_host}:{proxy_port}"
if proxy_username and proxy_password:
socks_proxy_url = f"socks5h://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}"
os.environ["SOCKS_URL"] = socks_proxy_url
# Prepare SSL context with client certificates from kubeconfig
socks_manager_kwargs = {
'proxy_url': socks_proxy_url,
'num_pools': 10,
'maxsize': 10,
'cert_reqs': ssl.CERT_REQUIRED if client_config.verify_ssl else ssl.CERT_NONE,
'ca_certs': client_config.ssl_ca_cert,
'ssl_version': ssl.PROTOCOL_TLS
}

# Add client certificate if present (for mutual TLS auth)
if client_config.cert_file:
socks_manager_kwargs['cert_file'] = client_config.cert_file
socks_manager_kwargs['key_file'] = client_config.key_file

# For development/testing with self-signed certs
if not client_config.verify_ssl:
socks_manager_kwargs['assert_hostname'] = False

# Disable SSL warnings for SOCKS proxy
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Create SOCKS proxy manager with proper settings
socks_proxy_manager = SOCKSProxyManager(**socks_manager_kwargs)

# Create API client with the configuration that has auth loaded
# This preserves all authentication settings from kubeconfig
api_client = client.ApiClient(configuration=client_config)

# Replace the default pool manager with SOCKS proxy manager
# This routes all HTTP(S) requests through the SOCKS proxy
api_client.rest_client.pool_manager = socks_proxy_manager

# Set this as the default configuration
client.Configuration.set_default(client_config)

# Use the custom API client for all APIs
cli = client.CoreV1Api(api_client=api_client)

logging.info("SOCKS5 proxy configuration completed")

elif http_proxy is not None:
https_proxy = os.getenv("https_proxy", None)
os.environ["HTTP_PROXY"] = http_proxy
os.environ["HTTPS_PROXY"] = https_proxy
# Configure HTTP proxy (existing logic)
client_config.proxy = http_proxy
proxy_auth = urlparse(http_proxy)
logging.info(f"Configuring HTTP proxy: {http_proxy}")

if proxy_auth.username is not None and proxy_auth.password is not None:
auth_string = proxy_auth.username + ":" + proxy_auth.password
client_config.proxy_headers = urllib3.util.make_headers(
proxy_basic_auth=auth_string
)
logging.info("HTTP proxy configuration completed")

client.Configuration.set_default(client_config)
cli = client.CoreV1Api()
else:
client.Configuration.set_default(client_config)
cli = client.CoreV1Api()

cmd_timeout = timeout
request_chunk_size = str(chunk_size)
kubeconfig_path_global = kubeconfig_path
Expand Down Expand Up @@ -412,12 +486,20 @@ def process_master_taint(master_nodes, master_label, iteration, iter_track_time)
# See if url is available
def is_url_available(url, header=None):
    """Return True if a GET on *url* answers with HTTP 200, else False.

    Honors the SOCKS proxy configured by initialize_clients (via the
    SOCKS_URL env var) and falls back to the standard HTTP(S)_PROXY
    variables. TLS verification is intentionally disabled (verify=False)
    to tolerate self-signed cluster certificates, so the matching
    urllib3 warning is silenced.

    :param url: URL to probe.
    :param header: optional dict of request headers (e.g. bearer token).
    :return: bool availability flag; any exception is logged and
             reported as unavailable rather than propagated.
    """
    try:
        # Silence the warning emitted because verify=False below.
        # NOTE: disable_warnings expects a Warning subclass; passing
        # exception classes such as SSLError/ProtocolError is a no-op
        # at best, so use InsecureRequestWarning (consistent with the
        # SOCKS setup in initialize_clients).
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        if os.environ.get("SOCKS_URL"):
            # Route both schemes through the SOCKS proxy set up earlier.
            socks_url = os.environ.get("SOCKS_URL", None)
            proxies = {"https": socks_url, "http": socks_url}
        else:
            proxies = {"https": os.environ.get("HTTPS_PROXY", None), "http": os.environ.get("HTTP_PROXY", None)}
        response = requests.get(url, headers=header, proxies=proxies, verify=False)
        return response.status_code == 200
    except Exception as e:
        logging.error('Exception seeing if url is available: %s', (e))
        return False


Expand Down Expand Up @@ -452,7 +534,11 @@ def get_host() -> str:
def get_clusterversion_string() -> str:
"""Returns clusterversion status text on OpenShift, empty string on other distributions"""
try:
custom_objects_api = client.CustomObjectsApi()
# Use the global api_client if available (for SOCKS proxy support)
if 'api_client' in globals() and api_client is not None:
custom_objects_api = client.CustomObjectsApi(api_client=api_client)
else:
custom_objects_api = client.CustomObjectsApi()
cvs = custom_objects_api.list_cluster_custom_object(
"config.openshift.io",
"v1",
Expand Down
5 changes: 0 additions & 5 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ cerberus:
inspect_components: False # Enable it only when OpenShift client is supported to run
# When enabled, cerberus collects logs, events and metrics of failed components

prometheus_url: # The prometheus url/route is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes.
prometheus_bearer_token: # The bearer token is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes. This is needed to authenticate with prometheus.
# This enables Cerberus to query prometheus and alert on observing high Kube API Server latencies.

slack_integration: False # When enabled, cerberus reports the failed iterations in the slack channel
# The following env vars needs to be set: SLACK_API_TOKEN ( Bot User OAuth Access Token ) and SLACK_CHANNEL ( channel to send notifications in case of failures )
# When slack_integration is enabled, a watcher can be assigned for each day. The watcher of the day is tagged while reporting failures in the slack channel. Values are slack member ID's.
watcher_slack_ID: # (NOTE: Defining the watcher id's is optional and when the watcher slack id's are not defined, the slack_team_alias tag is used if it is set else no tag is used while reporting failures in the slack channel.)
Expand Down
4 changes: 0 additions & 4 deletions config/kubernetes_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ cerberus:
inspect_components: False # Enable it only when OpenShift client is supported to run
# When enabled, cerberus collects logs, events and metrics of failed components

prometheus_url: # The prometheus url/route is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes.
prometheus_bearer_token: # The bearer token is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes. This is needed to authenticate with prometheus.
# This enables Cerberus to query prometheus and alert on observing high Kube API Server latencies.

slack_integration: False # When enabled, cerberus reports the failed iterations in the slack channel
# The following env vars needs to be set: SLACK_API_TOKEN ( Bot User OAuth Access Token ) and SLACK_CHANNEL ( channel to send notifications in case of failures )
# When slack_integration is enabled, a watcher can be assigned for each day. The watcher of the day is tagged while reporting failures in the slack channel. Values are slack member ID's.
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ slack_sdk
pyfiglet
prometheus_api_client
coverage
urllib3[socks]
requests[socks]
32 changes: 0 additions & 32 deletions start_cerberus.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import cerberus.invoke.command as runcommand
import cerberus.kubernetes.client as kubecli
import cerberus.slack.slack_client as slackcli
import cerberus.prometheus.client as promcli
import cerberus.database.client as dbcli


Expand Down Expand Up @@ -97,8 +96,6 @@ def main(cfg):
cerberus_publish_status = config["cerberus"].get("cerberus_publish_status", False)
inspect_components = config["cerberus"].get("inspect_components", False)
slack_integration = config["cerberus"].get("slack_integration", False)
prometheus_url = config["cerberus"].get("prometheus_url", "")
prometheus_bearer_token = config["cerberus"].get("prometheus_bearer_token", "")
custom_checks = config["cerberus"].get("custom_checks", [])
iterations = config["tunings"].get("iterations", 0)
sleep_time = config["tunings"].get("sleep_time", 0)
Expand Down Expand Up @@ -201,14 +198,6 @@ def main(cfg):
# Initialize the start iteration to 0
iteration = 0

# Initialize the prometheus client
promcli.initialize_prom_client(distribution, prometheus_url, prometheus_bearer_token)

# Prometheus query to alert on high apiserver latencies
apiserver_latency_query = r"""ALERTS{alertname="KubeAPILatencyHigh", severity="warning"}"""
# Prometheus query to alert when etcd fync duration is high
etcd_leader_changes_query = r"""ALERTS{alertname="etcdHighNumberOfLeaderChanges", severity="warning"}""" # noqa

# Set the number of iterations to loop to infinity if daemon mode is
# enabled or else set it to the provided iterations count in the config

Expand Down Expand Up @@ -459,27 +448,6 @@ def main(cfg):
elif distribution == "kubernetes" and inspect_components:
logging.info("Skipping the failed components inspection as " "it's specific to OpenShift")

# Alert on high latencies
metrics = promcli.process_prom_query(apiserver_latency_query)
if metrics:
logging.warning(
"Kubernetes API server latency is high. "
"More than 99th percentile latency for given requests to the "
"kube-apiserver is above 1 second.\n"
)
logging.info("%s\n" % (metrics))

# Alert on high etcd fync duration
metrics = promcli.process_prom_query(etcd_leader_changes_query)
if metrics:
logging.warning(
"Observed increase in number of etcd leader elections over the last "
"15 minutes. Frequent elections may be a sign of insufficient resources, "
"high network latency, or disruptions by other components and should be "
"investigated.\n"
)
logging.info("%s\n" % (metrics))

# Sleep for the specified duration
logging.info("Sleeping for the specified duration: %s\n" % (sleep_time))
time.sleep(float(sleep_time))
Expand Down