Skip to content

Commit 090ebb1

Browse files
committed
Add network speed check and distributed runner
Change-Id: I9ebec3fbfbe8c0791062497aa6834c56428742b3
1 parent 54cdd5d commit 090ebb1

File tree

4 files changed

+575
-0
lines changed

4 files changed

+575
-0
lines changed

app/check_network_speed.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import logging
2+
import re
3+
4+
from kubernetes import client
5+
from pydantic import BaseModel, Field
6+
7+
from distributed_runner import DistributedRunner
8+
9+
log = logging.getLogger("check.nodespeed")
10+
11+
default_image = "python:3-alpine"
12+
13+
14+
class CheckNodeSpeedParameters(BaseModel):
15+
namespace: str = "cluster-health"
16+
min_download_mbps: float = Field(alias="minDownloadMbps")
17+
min_upload_mbps: float = Field(alias="minUploadMbps")
18+
image: str = default_image
19+
timeout_seconds: int = Field(300, alias="timeoutSeconds")
20+
21+
22+
class CheckNodeSpeed:
23+
"""
24+
Runs a network speed test on each node in the cluster.
25+
The check creates a Pod that runs on each node one at a time. Each pod
26+
executes 'speedtest-cli --secure', and the results are read from the pod logs.
27+
The download and upload speeds are then compared against user-defined
28+
minimums. The Pod is cleaned up after the check.
29+
"""
30+
31+
def __init__(self, parameters: dict) -> None:
32+
params = CheckNodeSpeedParameters(**parameters)
33+
self.namespace = params.namespace
34+
self.min_download_mbps = params.min_download_mbps
35+
self.min_upload_mbps = params.min_upload_mbps
36+
self.image = params.image
37+
self.timeout_seconds = params.timeout_seconds
38+
self.k8s_core_v1 = client.CoreV1Api()
39+
self.k8s_apps_v1 = client.AppsV1Api()
40+
41+
"""
42+
Creates a parsing function that reads log output from each pod and assesses health check pass/fail.
43+
"""
44+
45+
def parse_pod_logs_from_speed_test_func(self) -> callable:
46+
min_download_mbps = self.min_download_mbps
47+
min_upload_mbps = self.min_upload_mbps
48+
49+
def parse_pod_logs_from_speed_test(logs: str, node_name: str) -> bool:
50+
download_match = re.search(r"Download: ([\d\.]+) Mbit/s", logs)
51+
upload_match = re.search(r"Upload: ([\d\.]+) Mbit/s", logs)
52+
53+
if not download_match or not upload_match:
54+
log.error(
55+
f"Could not parse speed test results on node {node_name}. Logs:\n{logs}"
56+
)
57+
return False
58+
59+
download_speed = float(download_match.group(1))
60+
upload_speed = float(upload_match.group(1))
61+
62+
log.info(
63+
f"Node {node_name}: Download={download_speed:.2f} Mbps, Upload={upload_speed:.2f} Mbps"
64+
)
65+
66+
if download_speed < min_download_mbps:
67+
log.error(
68+
f"Node {node_name} download speed {download_speed:.2f} Mbps is below minimum of {min_download_mbps} Mbps."
69+
)
70+
return False
71+
72+
if upload_speed < min_upload_mbps:
73+
log.error(
74+
f"Node {node_name} upload speed {upload_speed:.2f} Mbps is below minimum of {min_upload_mbps} Mbps."
75+
)
76+
return False
77+
78+
return True
79+
80+
return parse_pod_logs_from_speed_test
81+
...
82+
83+
def is_healthy(self):
84+
if self.image == default_image:
85+
container = client.V1Container(
86+
name="speedtest-container",
87+
image=self.image,
88+
command=["sh", "-c"],
89+
args=[
90+
"apk update && pip3 install speedtest-cli && speedtest-cli --secure"
91+
],
92+
)
93+
else:
94+
container = client.V1Container(
95+
name="speedtest-container",
96+
image=self.image,
97+
)
98+
99+
pod_spec = client.V1PodSpec(
100+
containers=[container],
101+
restart_policy="Never",
102+
termination_grace_period_seconds=5,
103+
)
104+
105+
template = client.V1PodTemplateSpec(
106+
metadata=client.V1ObjectMeta(
107+
labels={"app": "speed-test"}, namespace=self.namespace
108+
),
109+
spec=pod_spec,
110+
)
111+
112+
runner = DistributedRunner(
113+
pod_name_prefix="speedtest",
114+
namespace=self.namespace,
115+
pod_template=template,
116+
timeout_seconds=self.timeout_seconds,
117+
log_parse_function=self.parse_pod_logs_from_speed_test_func(),
118+
)
119+
120+
return runner.run_on_all_nodes()

app/distributed_runner.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import logging
2+
import time
3+
import copy
4+
from kubernetes import client
5+
from kubernetes.client.exceptions import ApiException
6+
from typing import Callable
7+
8+
log = logging.getLogger("DistributedRunner")
9+
10+
11+
class DistributedRunner:
12+
"""
13+
A helper class to run commands within Pods on Kubernetes nodes.
14+
It handles pod creation, waiting for completion, log retrieval, and cleanup.
15+
16+
Args:
17+
pod_name_prefix: A string prefix for the name of the pods that will be
18+
created.
19+
namespace: The Kubernetes namespace in which to create the pods.
20+
pod_template: A V1PodTemplateSpec object that defines the pod to be
21+
created on each node. The runner will use this template to create
22+
a pod and schedule it on a specific node.
23+
timeout_seconds: The maximum time in seconds to wait for a pod to
24+
complete its execution.
25+
log_parse_function: A callable that processes the logs of a completed
26+
pod. It should accept two arguments: the log string and the node
27+
name, and return a boolean indicating whether the check passed.
28+
29+
"""
30+
31+
def __init__(
32+
self,
33+
pod_name_prefix: str,
34+
namespace: str,
35+
pod_template: client.V1PodTemplateSpec,
36+
timeout_seconds: int,
37+
log_parse_function: Callable[[str, str], bool],
38+
):
39+
self.pod_name_prefix = pod_name_prefix
40+
self.namespace = namespace
41+
self.pod_template = pod_template
42+
self.timeout_seconds = timeout_seconds
43+
self.log_parse_function = log_parse_function
44+
self.k8s_core_v1 = client.CoreV1Api()
45+
46+
def _get_nodes_on_cluster(self):
47+
nodes = self.k8s_core_v1.list_node()
48+
return [node.metadata.name for node in nodes.items]
49+
50+
def _get_pod_name_for_node(self, node_name):
51+
return f"{self.pod_name_prefix}-{node_name}"
52+
53+
def _create_pod_on_node(self, node_name):
54+
pod_name = self._get_pod_name_for_node(node_name)
55+
56+
# Deepcopy the spec to avoid modifying the original template in-place.
57+
pod_spec = copy.deepcopy(self.pod_template.spec)
58+
pod_spec.node_name = node_name # Schedule the pod on the specific node.
59+
60+
pod = client.V1Pod(
61+
api_version="v1",
62+
kind="Pod",
63+
metadata=client.V1ObjectMeta(
64+
name=pod_name,
65+
namespace=self.namespace,
66+
labels=self.pod_template.metadata.labels,
67+
),
68+
spec=pod_spec,
69+
)
70+
71+
try:
72+
self.k8s_core_v1.create_namespaced_pod(namespace=self.namespace, body=pod)
73+
74+
log.info(f"Pod {pod_name} created on node {node_name} in namespace {self.namespace}")
75+
except ApiException as e:
76+
log.error(
77+
f"Failed to create Pod {pod_name} on node {node_name}: {e}"
78+
)
79+
raise
80+
81+
def _wait_for_pod_to_complete(self, node_name):
82+
pod_name = self._get_pod_name_for_node(node_name)
83+
84+
start_time = time.time()
85+
while time.time() - start_time < self.timeout_seconds:
86+
try:
87+
pod = self.k8s_core_v1.read_namespaced_pod(
88+
namespace=self.namespace,
89+
name=pod_name,
90+
)
91+
except ApiException as e:
92+
log.error(f"Error reading pod: {e}")
93+
return False
94+
95+
if pod.status.phase == "Succeeded":
96+
return True
97+
elif pod.status.phase in ["Failed", "Unknown"]:
98+
log.error(f"Pod {pod_name} on node {pod.spec.node_name} failed.")
99+
return False
100+
101+
log.info(f"Waiting for pod {pod_name} to complete...")
102+
time.sleep(10)
103+
log.error("Timeout waiting for pods to complete.")
104+
return False
105+
106+
def _get_pod_logs_and_parse(self, node_name):
107+
pod_name = self._get_pod_name_for_node(node_name)
108+
109+
log.info(f"Fetching logs for pod {pod_name} on node {node_name}")
110+
try:
111+
logs = self.k8s_core_v1.read_namespaced_pod_log(
112+
name=pod_name,
113+
namespace=self.namespace,
114+
)
115+
return self.log_parse_function(logs, node_name)
116+
except ApiException as e:
117+
log.error(f"Error reading logs for pod {pod_name} on node {node_name}: {e}")
118+
return False
119+
120+
def _cleanup(self, node):
121+
pod_name = self._get_pod_name_for_node(node)
122+
123+
try:
124+
self.k8s_core_v1.delete_namespaced_pod(
125+
name=pod_name,
126+
namespace=self.namespace,
127+
body=client.V1DeleteOptions(propagation_policy="Foreground"),
128+
)
129+
log.info(f"Pod {pod_name} is deleted.")
130+
except ApiException as e:
131+
if e.status == 404:
132+
log.info(
133+
f"Pod {pod_name} was not found for cleanup, might have been deleted already or failed to create."
134+
)
135+
else:
136+
log.error(f"Failed to delete Pod {pod_name}: {e}")
137+
138+
def run_on_all_nodes(self):
139+
overall_success = True
140+
141+
for node in self._get_nodes_on_cluster():
142+
node_success = True
143+
try:
144+
log.info(f"--- Running on node: {node} ---")
145+
self._create_pod_on_node(node)
146+
147+
# A small delay to allow the pod to be scheduled.
148+
time.sleep(5)
149+
150+
if not self._wait_for_pod_to_complete(node):
151+
log.error(f"Pod on node {node} did not complete successfully.")
152+
node_success = False
153+
154+
if node_success and not self._get_pod_logs_and_parse(node):
155+
log.error(f"Check failed on node {node} after parsing logs.")
156+
node_success = False
157+
elif node_success:
158+
log.info(f"Check passed on node {node}.")
159+
160+
except Exception as e:
161+
log.error(
162+
f"An unexpected error occurred during execution on node {node}: {e}",
163+
exc_info=True,
164+
)
165+
node_success = False
166+
finally:
167+
log.info(f"--- Cleaning up for node: {node} ---")
168+
self._cleanup(node)
169+
if not node_success:
170+
overall_success = False
171+
172+
return overall_success

0 commit comments

Comments
 (0)