Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions krkn_ai/chaos_engines/krkn_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,12 @@ def run(self, scenario: BaseScenario, generation_id: int) -> CommandRunResult:
health_check_watcher.run()

# Run command (show logs when verbose mode is enabled)
# Timeout = wait_duration + 120 seconds buffer for initialization/teardown
execution_timeout = self.config.wait_duration + 120
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should disable it by default and let end-user configure it per their use-case. Krkn Scenarios can take over 240 seconds duration depending on the scenario itself. If we start restricting this here without knowing the user application setup or the cluster details and type of scenarios they plan to run, then we might end up killing Krkn-AI tests half way through the scenario execution itself.

log, returncode = run_shell(
self.process_es_env_string(command, True), do_not_log=not is_verbose()
self.process_es_env_string(command, True),
do_not_log=not is_verbose(),
timeout=execution_timeout,
)

# Extract return code from run log which is part of telemetry data present in the log
Expand All @@ -136,14 +140,22 @@ def run(self, scenario: BaseScenario, generation_id: int) -> CommandRunResult:

# Check if krkn scenario failed due to misconfiguration (non-zero and not status code 2)
# Status code 2 means that SLOs not met per Krkn test (valid failure)
# Return code -1 indicates execution timeout
# Other non-zero status codes indicate misconfiguration errors
if returncode != 0 and returncode != 2:
# Misconfiguration failure - skip fitness calculation and set failure marker
logger.warning(
"Krkn scenario failed with return code %d (misconfiguration). "
"Skipping fitness calculation to avoid data pollution.",
returncode,
)
# Misconfiguration or timeout failure - skip fitness calculation and set failure marker
if returncode == -1:
logger.error(
"Krkn scenario execution timed out. "
"This may indicate Kubernetes API unreachability or network issues. "
"Skipping fitness calculation to avoid data pollution."
)
else:
logger.warning(
"Krkn scenario failed with return code %d (misconfiguration). "
"Skipping fitness calculation to avoid data pollution.",
returncode,
)
if self.config.fitness_function.include_krkn_failure:
fitness_result.krkn_failure_score = -1.0
fitness_result.fitness_score = -1.0
Expand Down
62 changes: 52 additions & 10 deletions krkn_ai/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import shlex
import subprocess
from typing import Iterator
import threading
from typing import Iterator, Optional

from krkn_ai.utils.logger import get_logger

Expand All @@ -14,21 +15,62 @@ def id_generator() -> Iterator[int]:
i += 1


def run_shell(command, do_not_log=False):
def run_shell(command, do_not_log=False, timeout: Optional[int] = None):
"""
Run shell command and get logs and statuscode in output.

Args:
command: Shell command to execute
do_not_log: If True, suppress debug logging of command output
timeout: Maximum execution time in seconds. If None, no timeout is applied.
If the timeout expires, the process is terminated and return code -1 is returned.

Returns:
Tuple of (logs, returncode). On timeout, returncode is -1.
"""
logs = ""
command = shlex.split(command)
command_list = shlex.split(command)
# Let's show the command name being executed
logger.debug("Running command: %s", command[0])
logger.debug("Running command: %s", command_list[0])

process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
command_list, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)
for line in process.stdout:
if not do_not_log:
logger.debug("%s", line.rstrip())
logs += line
process.wait()

# Use a timer to enforce timeout since we're reading stdout line-by-line
timed_out = threading.Event()

def kill_on_timeout():
timed_out.set()
logger.error(
"Command timed out after %d seconds, terminating: %s", timeout, command_list[0]
)
process.terminate()
# Give process time to terminate gracefully, then force kill
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
logger.warning("Process did not terminate gracefully, sending SIGKILL")
process.kill()

timer = None
if timeout is not None:
timer = threading.Timer(timeout, kill_on_timeout)
timer.start()

try:
for line in process.stdout:
if not do_not_log:
logger.debug("%s", line.rstrip())
logs += line
process.wait()
finally:
if timer is not None:
timer.cancel()

if timed_out.is_set():
logger.error("Command execution timed out")
return logs, -1

logger.debug("Run Status: %d", process.returncode)
return logs, process.returncode
118 changes: 118 additions & 0 deletions tests/unit/utils/test_run_shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""
Tests for the run_shell function in krkn_ai.utils
"""

import pytest
import time
from krkn_ai.utils import run_shell


class TestRunShellBasic:
"""Test basic run_shell functionality"""

def test_successful_command(self):
"""Test that a successful command returns output and exit code 0"""
logs, returncode = run_shell("echo 'hello world'")
assert returncode == 0
assert "hello world" in logs

def test_failed_command(self):
"""Test that a failed command returns non-zero exit code"""
logs, returncode = run_shell("ls /nonexistent_directory_12345")
assert returncode != 0

def test_multiline_output(self):
"""Test that multiline output is captured correctly"""
logs, returncode = run_shell("echo -e 'line1\\nline2\\nline3'")
assert returncode == 0
assert "line1" in logs
assert "line2" in logs
assert "line3" in logs


class TestRunShellTimeout:
"""Test run_shell timeout functionality"""

def test_command_completes_before_timeout(self):
"""Test that a fast command completes normally with timeout set"""
logs, returncode = run_shell("echo 'fast'", timeout=10)
assert returncode == 0
assert "fast" in logs

def test_command_times_out(self):
"""Test that a slow command is terminated when timeout expires"""
start_time = time.time()
logs, returncode = run_shell("sleep 30", timeout=2)
elapsed = time.time() - start_time

# Should return -1 on timeout
assert returncode == -1
# Should complete in roughly 2 seconds (with some buffer for termination)
assert elapsed < 10, f"Command took {elapsed} seconds, expected ~2 seconds"

def test_timeout_none_allows_completion(self):
"""Test that timeout=None allows command to complete normally"""
logs, returncode = run_shell("sleep 1", timeout=None)
assert returncode == 0

def test_partial_output_on_timeout(self):
"""Test that partial output is captured even on timeout"""
# Command that continuously outputs then gets killed
# Using a bash script that outputs before sleeping
logs, returncode = run_shell(
"bash -c 'for i in 1 2 3; do echo line$i; done; sleep 30'", timeout=2
)
assert returncode == -1
assert "line1" in logs

def test_zero_timeout_immediately_kills(self):
"""Test that timeout=0 immediately kills the process"""
# Note: timeout=0 means no timeout in the current implementation
# This test documents the behavior
start_time = time.time()
logs, returncode = run_shell("sleep 5", timeout=1)
elapsed = time.time() - start_time

assert returncode == -1
assert elapsed < 5


class TestRunShellLogging:
"""Test run_shell logging behavior"""

def test_do_not_log_suppresses_output(self):
"""Test that do_not_log=True suppresses debug logging"""
# This test mainly verifies the parameter is accepted
logs, returncode = run_shell("echo 'test'", do_not_log=True)
assert returncode == 0
assert "test" in logs

def test_do_not_log_false_allows_output(self):
"""Test that do_not_log=False allows debug logging"""
logs, returncode = run_shell("echo 'test'", do_not_log=False)
assert returncode == 0
assert "test" in logs


class TestRunShellEdgeCases:
"""Test edge cases for run_shell"""

def test_empty_output_command(self):
"""Test command that produces no output"""
logs, returncode = run_shell("true")
assert returncode == 0
assert logs == ""

def test_special_characters_in_output(self):
"""Test that special characters are handled correctly"""
logs, returncode = run_shell("echo 'special: $HOME \"quotes\" `backticks`'")
assert returncode == 0
assert "special:" in logs

def test_large_output(self):
"""Test that large output is captured correctly"""
# Generate 1000 lines of output
logs, returncode = run_shell("seq 1 1000")
assert returncode == 0
assert "1" in logs
assert "1000" in logs