78 changes: 78 additions & 0 deletions krkn_ai/cli/cmd.py
@@ -15,6 +15,8 @@
from krkn_ai.utils.fs import read_config_from_file
from krkn_ai.templates.generator import create_krkn_ai_template
from krkn_ai.utils.cluster_manager import ClusterManager
from krkn_ai.recommendation import ScenarioRecommender
from krkn_ai.utils.prometheus import create_prometheus_client, PrometheusConnectionError  # needed for the except clause below


@click.group(context_settings={"show_default": True})
@@ -201,3 +203,79 @@ def discover(
f.write(template)

logger.info("Saved component configuration to %s", output)
logger.info("Saved component configuration to %s", output)


@main.command(help="Recommend a Krkn chaos scenario based on telemetry")
@click.option(
"--prometheus-url",
help="Prometheus URL to fetch telemetry from.",
envvar="PROMETHEUS_URL",
required=False,
)
@click.option(
"--prometheus-token",
help="Prometheus Token.",
envvar="PROMETHEUS_TOKEN",
required=False,
)
@click.option(
"--kubeconfig",
"-k",
help="Path to cluster kubeconfig file.",
default=os.getenv("KUBECONFIG", None),
)
@click.option(
"--model-path",
"-m",
help="Path to the trained ML model.",
default="krkn_model.pkl",
)
@click.option("-v", "--verbose", count=True, help="Increase verbosity of output.")
@click.pass_context
def recommend(
ctx,
prometheus_url: str,
prometheus_token: str,
kubeconfig: str,
model_path: str,
verbose: int = 0,
):
init_logger(None, verbose >= 2)
logger = get_logger(__name__)

    if not os.path.exists(model_path):
        logger.error(
            f"Model not found at {model_path}. Please train a model first or specify a valid path."
        )
        raise SystemExit(1)

# Set env vars for prometheus client creation if provided explicitly
if prometheus_url:
os.environ["PROMETHEUS_URL"] = prometheus_url
if prometheus_token:
os.environ["PROMETHEUS_TOKEN"] = prometheus_token

try:
# Create prometheus client using existing utility
prom_client = create_prometheus_client(kubeconfig)

recommender = ScenarioRecommender(prom_client, model_path)

# Collect data
telemetry_df = recommender.collect_telemetry()

logger.info("Collected Telemetry:\n%s", telemetry_df.to_string(index=False))

# Predict
recommendation = recommender.recommend(telemetry_df)

click.echo(f"\nRecommended Chaos Scenario: {recommendation}")
logger.info(f"Recommendation: {recommendation}")

    except PrometheusConnectionError as e:
        logger.error(f"Prometheus Connection Error: {e}")
        raise SystemExit(1)
    except Exception as e:
        logger.exception(f"An error occurred: {e}")
        raise SystemExit(1)
3 changes: 3 additions & 0 deletions krkn_ai/recommendation/__init__.py
@@ -0,0 +1,3 @@
from .recommender import ScenarioRecommender

__all__ = ["ScenarioRecommender"]
89 changes: 89 additions & 0 deletions krkn_ai/recommendation/recommender.py
@@ -0,0 +1,89 @@
import os
from typing import Optional

import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

from krkn_ai.utils.logger import get_logger
from krkn_ai.utils.prometheus import KrknPrometheus

logger = get_logger(__name__)

class ScenarioRecommender:
    def __init__(self, prom_client: Optional[KrknPrometheus], model_path: Optional[str] = None):
self.prom_client = prom_client
self.model_path = model_path
self.model = None
self.feature_names = ["cpu_usage", "memory_usage", "network_io"]

if model_path and os.path.exists(model_path):
self.load_model(model_path)

def collect_telemetry(self, duration_minutes: int = 15) -> pd.DataFrame:
"""
Collects telemetry data from Prometheus for the last N minutes.
Returns a DataFrame with aggregated metrics.
"""
logger.info("Collecting telemetry data for recommendation...")

# Define queries for basic cluster health/state
queries = {
"cpu_usage": 'avg(cluster:node:cpu:ratio)',
"memory_usage": 'avg(cluster:node:memory:utilization:ratio)',
# Basic network I/O sum across cluster
"network_io": 'sum(rate(container_network_receive_bytes_total[5m]))'
}

data = {}

# We'll just take the current values for now as a snapshot
# In a real system you might want time-series features
for name, query in queries.items():
try:
# We use process_query to get instant vector
result = self.prom_client.process_query(query)
                if result and 'value' in result[0]:
                    # result[0]['value'] is [timestamp, "value"]
                    val = float(result[0]['value'][1])
data[name] = val
else:
logger.warning(f"No data found for {name}, defaulting to 0")
data[name] = 0.0
except Exception as e:
logger.error(f"Failed to query {name}: {e}")
data[name] = 0.0

return pd.DataFrame([data])

    def train(self, X: pd.DataFrame, y: list, save_path: Optional[str] = None):
"""
Trains the Random Forest model.
"""
logger.info("Training recommendation model...")
self.model = RandomForestClassifier(n_estimators=100, random_state=42)
self.model.fit(X[self.feature_names], y)

if save_path:
self.save_model(save_path)

def recommend(self, telemetry_data: pd.DataFrame) -> str:
"""
Returns a recommended chaos scenario based on telemetry.
"""
        if self.model is None:
            raise RuntimeError("Model not loaded or trained.")

prediction = self.model.predict(telemetry_data[self.feature_names])
return prediction[0]

def save_model(self, path: str):
logger.info(f"Saving model to {path}")
joblib.dump(self.model, path)
self.model_path = path

def load_model(self, path: str):
logger.info(f"Loading model from {path}")
try:
self.model = joblib.load(path)
except Exception as e:
logger.error(f"Failed to load model: {e}")
raise
1 change: 1 addition & 0 deletions requirements.txt
@@ -8,3 +8,4 @@ click
krkn-lib@git+https://github.com/krkn-chaos/krkn-lib
pyyaml
seaborn
scikit-learn
79 changes: 79 additions & 0 deletions scripts/train_model.py

Review comment:

Training the model on randomly generated (rule-based) data feels somewhat meaningless from an ML perspective.
Since the labels are assigned by deterministic rules, the model effectively re-learns a hard-coded rule set rather than learning anything from real behavior.
Additionally, the current feature set includes only three telemetry signals, which makes the model largely blind.

Given the scope of this project, it would be more appropriate to train the recommender on real cluster telemetry, ideally time-series data collected from a live or representative environment.

You should also refer to this comment by @rh-rahulshetty in an earlier PR.
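
For reference, a minimal sketch of what time-series features could look like. This is hypothetical: query_range and its signature are assumed helpers, not the actual KrknPrometheus API.

import numpy as np

def collect_timeseries_features(prom_client, query: str, minutes: int = 60) -> dict:
    # Assumed helper: returns a list of [timestamp, "value"] pairs over the window
    samples = prom_client.query_range(query, start=f"-{minutes}m", step="1m")
    values = np.array([float(v) for _, v in samples])
    if values.size < 2:
        return {"mean": 0.0, "p95": 0.0, "stddev": 0.0, "slope": 0.0}
    return {
        "mean": float(values.mean()),
        "p95": float(np.percentile(values, 95)),
        "stddev": float(values.std()),
        # Least-squares slope over the window: positive means the metric is rising
        "slope": float(np.polyfit(np.arange(values.size), values, 1)[0]),
    }

Statistics like these, computed per metric over a representative window, would give the classifier signal about trend and volatility rather than a single point-in-time snapshot.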

@@ -0,0 +1,79 @@

import os
import sys
import pandas as pd
import numpy as np

# Add parent directory to path to allow importing krkn_ai
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from krkn_ai.recommendation import ScenarioRecommender
from krkn_ai.utils.logger import init_logger, get_logger

init_logger(None, True)
logger = get_logger("train_model")

def generate_synthetic_data(n_samples=1000):
"""
Generates synthetic telemetry data with labeled chaos scenarios.

Logic:
- High CPU, Low Memory -> cpu-hog
- Low CPU, High Memory -> memory-hog
- High Network -> network-chaos
- Balanced/Normal -> random/pod-delete (as a generic fallback)
"""
data = []
labels = []

for _ in range(n_samples):
# Generate random base metrics
cpu = np.random.uniform(0, 1) # 0 to 100% normalized
memory = np.random.uniform(0, 1) # 0 to 100% normalized
network = np.random.uniform(0, 1000) # MB/s roughly

# Rule-based labeling for "ground truth"
if cpu > 0.8 and memory < 0.5:
label = "cpu-hog"
elif memory > 0.8 and cpu < 0.5:
label = "memory-hog"
elif network > 800:
label = "network-chaos"
else:
# If nothing stands out, maybe suggest checking general resilience
label = "pod-delete"

data.append({
"cpu_usage": cpu,
"memory_usage": memory,
"network_io": network
})
labels.append(label)

return pd.DataFrame(data), labels

def main():
logger.info("Generating synthetic data...")
X, y = generate_synthetic_data()

target_path = "krkn_model.pkl"

    # Initialize recommender (no Prometheus client is needed for offline training)
recommender = ScenarioRecommender(prom_client=None)

logger.info(f"Training model on {len(X)} samples...")
recommender.train(X, y, save_path=target_path)

logger.info(f"Model saved to {target_path}")

# Test a prediction
test_sample = pd.DataFrame([{
"cpu_usage": 0.95,
"memory_usage": 0.2,
"network_io": 100
}])
prediction = recommender.recommend(test_sample)
logger.info(f"Test Prediction for High CPU: {prediction} (Expected: cpu-hog)")

if __name__ == "__main__":
main()