Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ console-link = {file = "../lib/console_link"}
cluster_tools = {editable = true, path = "."}
argparse = "*"
argcomplete = "*"
kubernetes = ">=30.1.0"

[dev-packages]
pytest = "*"
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ name = "pypi"

[packages]
requests = ">=2.32.3"
kubernetes = ">=30.1.0"
boto3 = "*"
pyyaml = "*"
Click = "*"
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
}


BackfillStatus = Enum("BackfillStatus", ["NOT_STARTED", "STARTING", "RUNNING", "STOPPED", "FAILED"])
BackfillStatus = Enum("BackfillStatus", ["NOT_STARTED", "STARTING", "RUNNING", "TERMINATING", "STOPPED", "FAILED"])


class Backfill(ABC):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def archive(self, *args, archive_dir_path: str = None, archive_file_name: str =
archive_dir_path=archive_dir_path,
archive_file_name=archive_file_name)

def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult:
def get_status(self, deep_check=False, *args, **kwargs) -> CommandResult:
logger.info("Getting status of RFS backfill")
deployment_status = self.kubectl_runner.retrieve_deployment_status()
if not deployment_status:
Expand All @@ -166,6 +166,8 @@ def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult:
shard_status = None
if shard_status:
status_str += f"\n{shard_status}"
if deployment_status.terminating > 0 and deployment_status.desired == 0:
return CommandResult(True, (BackfillStatus.TERMINATING, status_str))
if deployment_status.running > 0:
return CommandResult(True, (BackfillStatus.RUNNING, status_str))
if deployment_status.pending > 0:
Expand Down Expand Up @@ -209,7 +211,7 @@ def archive(self, *args, archive_dir_path: str = None, archive_file_name: str =
archive_dir_path=archive_dir_path,
archive_file_name=archive_file_name)

def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult:
def get_status(self, deep_check=False, *args, **kwargs) -> CommandResult:
logger.info(f"Getting status of RFS backfill, with {deep_check=}")
instance_statuses = self.ecs_client.get_instance_statuses()
if not instance_statuses:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import json
import logging

from kubernetes import client, config
from typing import Optional

from console_link.models.command_result import CommandResult
from console_link.models.command_runner import CommandRunner, CommandRunnerError, FlagOnlyArgument
from console_link.models.utils import DeploymentStatus

logger = logging.getLogger(__name__)
Expand All @@ -14,45 +13,55 @@ class KubectlRunner:
def __init__(self, namespace: str, deployment_name: str):
self.namespace = namespace
self.deployment_name = deployment_name
try:
config.load_incluster_config()
except config.ConfigException:
logger.warning("Unable to load in-cluster config, falling back to local kubeconfig")
config.load_kube_config()
self.k8s_core = client.CoreV1Api()
self.k8s_apps = client.AppsV1Api()

def perform_scale_command(self, replicas: int) -> CommandResult:
command = "kubectl"
args = {
"-n": f"{self.namespace}",
"scale": FlagOnlyArgument,
"deployment": f"{self.deployment_name}",
"--replicas": f"{replicas}"
}
command_runner = CommandRunner(command_root=command, command_args=args)
body = {"spec": {"replicas": replicas}}
try:
command_runner.run(print_to_console=False)
self.k8s_apps.patch_namespaced_deployment_scale(name=self.deployment_name, namespace=self.namespace,
body=body)
return CommandResult(True, f"The {self.deployment_name} deployment has been set "
f"to {replicas} desired count.")
except CommandRunnerError as e:
logger.error(f"Performing kubectl command to set replica count failed: {e}")
return CommandResult(success=False, value=f"Kubernetes command failed: {e}")
except Exception as e:
logger.error(f"Error faced when performing k8s patch_namespaced_deployment_scale(): {e}")
return CommandResult(success=False, value=f"Kubernetes action failed: {e}")

def retrieve_deployment_status(self) -> Optional[DeploymentStatus]:
command = "kubectl"
args = {
"-n": f"{self.namespace}",
"get": FlagOnlyArgument,
"deployment": FlagOnlyArgument,
self.deployment_name: FlagOnlyArgument,
"-o": "json"
}
command_runner = CommandRunner(command_root=command, command_args=args)
try:
cmd_result = command_runner.run(print_to_console=False)
json_output = json.loads(cmd_result.output.stdout)
desired = int(json_output["spec"]["replicas"])
running = int(json_output["status"].get("readyReplicas", "0"))
pending = (desired - running) if desired > running else 0
return DeploymentStatus(
running=running,
pending=pending,
desired=desired
)
pods = self.k8s_core.list_namespaced_pod(self.namespace, label_selector=f"app={self.deployment_name}")
except Exception as e:
logger.error(f"Error faced when performing k8s list_namespaced_pod(): {e}")
return None

terminating_pods = 0
running_pods = 0
pending_pods = 0
for pod in pods.items:
if pod.metadata.deletion_timestamp:
terminating_pods += 1
else:
phase = pod.status.phase
if phase == "Running":
running_pods += 1
elif phase == "Pending":
pending_pods += 1

try:
deployment = self.k8s_apps.read_namespaced_deployment(namespace=self.namespace, name=self.deployment_name)
except Exception as e:
logger.error(f"Performing kubectl command to get deployment status failed: {e}")
logger.error(f"Error faced when performing k8s read_namespaced_deployment(): {e}")
return None

desired_pods = deployment.spec.replicas
return DeploymentStatus(
running=running_pods,
pending=pending_pods,
desired=desired_pods,
terminating=terminating_pods
)
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
}


ReplayStatus = Enum("ReplayStatus", ["NOT_STARTED", "STARTING", "RUNNING", "STOPPED", "FAILED"])
ReplayStatus = Enum("ReplayStatus", ["NOT_STARTED", "STARTING", "RUNNING", "TERMINATING", "STOPPED", "FAILED"])


class Replayer(ABC):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ def get_status(self, *args, **kwargs) -> CommandResult:
if not deployment_status:
return CommandResult(False, "Failed to get deployment status for Replayer")
status_str = str(deployment_status)
if deployment_status.terminating > 0 and deployment_status.desired == 0:
return CommandResult(True, (ReplayStatus.TERMINATING, status_str))
if deployment_status.running > 0:
return CommandResult(True, (ReplayStatus.RUNNING, status_str))
if deployment_status.desired > 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ class DeploymentStatus(NamedTuple):
running: int = 0
pending: int = 0
desired: int = 0
terminating: int = 0

def __str__(self):
return f"Running={self.running}\nPending={self.pending}\nDesired={self.desired}"
return f"Running={self.running}\nPending={self.pending}\nDesired={self.desired}\nTerminating={self.terminating}"


class AWSAPIError(Exception):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
version="1.0.0",
description="A Python module to create a console application from a Python script",
packages=find_packages(exclude=("tests")),
install_requires=["requests", "boto3", "pyyaml", "Click", "cerberus"],
install_requires=["requests", "boto3", "pyyaml", "Click", "cerberus", "kubernetes"],
entry_points={
"console_scripts": [
"console = console_link.cli:cli",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from kubernetes import config
import pathlib

import pytest
Expand All @@ -13,6 +14,13 @@
AWS_REGION = "us-east-1"


@pytest.fixture(autouse=True)
def mock_kube_config(monkeypatch):
# Prevent actual config loading
monkeypatch.setattr(config, "load_incluster_config", lambda: None)
monkeypatch.setattr(config, "load_kube_config", lambda: None)


@pytest.fixture
def docker_rfs_backfill() -> DockerRFSBackfill:
docker_rfs_config = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from kubernetes import config
import os
import pathlib
from unittest.mock import ANY
Expand All @@ -20,6 +21,13 @@
TEST_DATA_DIRECTORY = pathlib.Path(__file__).parent / "data"


@pytest.fixture(autouse=True)
def mock_kube_config(monkeypatch):
# Prevent actual config loading
monkeypatch.setattr(config, "load_incluster_config", lambda: None)
monkeypatch.setattr(config, "load_kube_config", lambda: None)


@pytest.fixture
def k8s_rfs_backfill():
k8s_rfs_config = {
Expand Down
Loading
Loading