diff --git a/kraken/application_outage/actions.py b/kraken/application_outage/actions.py
index 81136970..0bd35a3c 100644
--- a/kraken/application_outage/actions.py
+++ b/kraken/application_outage/actions.py
@@ -1,18 +1,24 @@
 import yaml
 import logging
 import time
+
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
 import kraken.cerberus.setup as cerberus
 from jinja2 import Template
-import kraken.invoke.command as runcommand
-from krkn_lib.k8s import KrknKubernetes
-from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from krkn_lib.utils.functions import get_yaml_item_value, log_exception
 
+from kraken import utils
+
 
 # Reads the scenario config, applies and deletes a network policy to
 # block the traffic for the specified duration
-def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+def run(scenarios_list,
+        config,
+        wait_duration,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
     failed_post_scenarios = ""
     scenario_telemetries: list[ScenarioTelemetry] = []
     failed_scenarios = []
@@ -20,7 +26,7 @@ def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = app_outage_config
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, app_outage_config)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, app_outage_config)
         if len(app_outage_config) > 1:
             try:
                 with open(app_outage_config, "r") as f:
@@ -57,7 +63,7 @@ def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry
 
                     # Block the traffic by creating network policy
                     logging.info("Creating the network policy")
-                    kubecli.create_net_policy(yaml_spec, namespace)
+                    telemetry.kubecli.create_net_policy(yaml_spec, namespace)
 
                     # wait for the specified duration
                     logging.info("Waiting for the specified duration in the config: %s" % (duration))
@@ -65,7 +71,7 @@ def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry
 
                     # unblock the traffic by deleting the network policy
                     logging.info("Deleting the network policy")
-                    kubecli.delete_net_policy("kraken-deny", namespace)
+                    telemetry.kubecli.delete_net_policy("kraken-deny", namespace)
 
                     logging.info("End of scenario. Waiting for the specified duration: %s" % (wait_duration))
                    time.sleep(wait_duration)
 
@@ -79,6 +85,16 @@ def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry
             else:
                 scenario_telemetry.exit_status = 0
             scenario_telemetry.end_timestamp = time.time()
+            utils.collect_and_put_ocp_logs(telemetry,
+                                           parsed_scenario_config,
+                                           telemetry_request_id,
+                                           int(scenario_telemetry.start_timestamp),
+                                           int(scenario_telemetry.end_timestamp))
+            utils.populate_cluster_events(scenario_telemetry,
+                                          parsed_scenario_config,
+                                          telemetry.kubecli,
+                                          int(scenario_telemetry.start_timestamp),
+                                          int(scenario_telemetry.end_timestamp))
             scenario_telemetries.append(scenario_telemetry)
 
     return failed_scenarios, scenario_telemetries
diff --git a/kraken/arcaflow_plugin/arcaflow_plugin.py b/kraken/arcaflow_plugin/arcaflow_plugin.py
index a36f1664..5cd11da8 100644
--- a/kraken/arcaflow_plugin/arcaflow_plugin.py
+++ b/kraken/arcaflow_plugin/arcaflow_plugin.py
@@ -5,23 +5,47 @@
 import logging
 from pathlib import Path
 from typing import List
+
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
 from .context_auth import ContextAuth
-from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 
+from .. import utils
+
 
-def run(scenarios_list: List[str], kubeconfig_path: str, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+def run(scenarios_list: List[str],
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str
+        ) -> (list[str], list[ScenarioTelemetry]):
     scenario_telemetries: list[ScenarioTelemetry] = []
     failed_post_scenarios = []
     for scenario in scenarios_list:
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = scenario
-        scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry,scenario)
+        start_time = time.time()
+        scenario_telemetry.start_timestamp = start_time
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
         engine_args = build_args(scenario)
-        status_code = run_workflow(engine_args, kubeconfig_path)
-        scenario_telemetry.end_timestamp = time.time()
+        status_code = run_workflow(engine_args, telemetry.kubecli.get_kubeconfig_path())
+        end_time = time.time()
+        scenario_telemetry.end_timestamp = end_time
         scenario_telemetry.exit_status = status_code
+
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(start_time),
+                                      int(end_time))
+
+        # Design proposal for the namespaced log collection: see the latest
+        # krkn-lib commit for the corresponding changes.
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(start_time),
+                                       int(end_time))
+
         scenario_telemetries.append(scenario_telemetry)
         if status_code != 0:
             failed_post_scenarios.append(scenario)
diff --git a/kraken/network_chaos/actions.py b/kraken/network_chaos/actions.py
index 58f1bb05..1fc851d1 100644
--- a/kraken/network_chaos/actions.py
+++ b/kraken/network_chaos/actions.py
@@ -3,19 +3,26 @@
 import time
 import os
 import random
+
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
 import kraken.cerberus.setup as cerberus
 import kraken.node_actions.common_node_functions as common_node_functions
 from jinja2 import Environment, FileSystemLoader
 from krkn_lib.k8s import KrknKubernetes
-from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from krkn_lib.utils.functions import get_yaml_item_value, log_exception
 
+from kraken import utils
+
 
 # krkn_lib
 # Reads the scenario config and introduces traffic variations in Node's host network interface.
-def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
-    failed_post_scenarios = ""
+def run(scenarios_list,
+        config,
+        wait_duration,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
     logging.info("Runing the Network Chaos tests")
     failed_post_scenarios = ""
     scenario_telemetries: list[ScenarioTelemetry] = []
@@ -24,7 +31,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = net_config
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, net_config)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, net_config)
         try:
             with open(net_config, "r") as file:
                 param_lst = ["latency", "loss", "bandwidth"]
@@ -56,11 +63,11 @@
             node_name_list = [test_node]
             nodelst = []
             for single_node_name in node_name_list:
-                nodelst.extend(common_node_functions.get_node(single_node_name, test_node_label, test_instance_count, kubecli))
+                nodelst.extend(common_node_functions.get_node(single_node_name, test_node_label, test_instance_count, telemetry.kubecli))
             file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
             env = Environment(loader=file_loader, autoescape=True)
             pod_template = env.get_template("pod.j2")
-            test_interface = verify_interface(test_interface, nodelst, pod_template, kubecli)
+            test_interface = verify_interface(test_interface, nodelst, pod_template, telemetry.kubecli)
             joblst = []
             egress_lst = [i for i in param_lst if i in test_egress]
             chaos_config = {
@@ -86,13 +93,13 @@
                         job_template.render(jobname=i + str(hash(node))[:5], nodename=node, cmd=exec_cmd)
                     )
                     joblst.append(job_body["metadata"]["name"])
-                    api_response = kubecli.create_job(job_body)
+                    api_response = telemetry.kubecli.create_job(job_body)
                     if api_response is None:
                         raise Exception("Error creating job")
                 if test_execution == "serial":
                     logging.info("Waiting for serial job to finish")
                     start_time = int(time.time())
-                    wait_for_job(joblst[:], kubecli, test_duration + 300)
+                    wait_for_job(joblst[:], telemetry.kubecli, test_duration + 300)
                     logging.info("Waiting for wait_duration %s" % wait_duration)
                     time.sleep(wait_duration)
                     end_time = int(time.time())
@@ -102,7 +109,7 @@
                 if test_execution == "parallel":
                     logging.info("Waiting for parallel job to finish")
                     start_time = int(time.time())
-                    wait_for_job(joblst[:], kubecli, test_duration + 300)
+                    wait_for_job(joblst[:], telemetry.kubecli, test_duration + 300)
                     logging.info("Waiting for wait_duration %s" % wait_duration)
                     time.sleep(wait_duration)
                     end_time = int(time.time())
@@ -112,13 +119,24 @@
                 raise RuntimeError()
             finally:
                 logging.info("Deleting jobs")
-                delete_job(joblst[:], kubecli)
+                delete_job(joblst[:], telemetry.kubecli)
         except (RuntimeError, Exception):
             scenario_telemetry.exit_status = 1
             failed_scenarios.append(net_config)
             log_exception(net_config)
         else:
            scenario_telemetry.exit_status = 0
+        scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
         scenario_telemetries.append(scenario_telemetry)
 
     return failed_scenarios, scenario_telemetries
diff --git a/kraken/node_actions/run.py b/kraken/node_actions/run.py
index 42771ac3..50214dd9 100644
--- a/kraken/node_actions/run.py
+++ b/kraken/node_actions/run.py
@@ -2,6 +2,10 @@
 import logging
 import sys
 import time
+
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
+from kraken import utils
 from kraken.node_actions.aws_node_scenarios import aws_node_scenarios
 from kraken.node_actions.general_cloud_node_scenarios import general_node_scenarios
 from kraken.node_actions.az_node_scenarios import azure_node_scenarios
@@ -55,23 +59,27 @@ def get_node_scenario_object(node_scenario, kubecli: KrknKubernetes):
 
 # Run defined scenarios
 # krkn_lib
-def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+def run(scenarios_list,
+        config,
+        wait_duration,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
     scenario_telemetries: list[ScenarioTelemetry] = []
     failed_scenarios = []
     for node_scenario_config in scenarios_list:
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = node_scenario_config
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, node_scenario_config)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, node_scenario_config)
         with open(node_scenario_config, "r") as f:
             node_scenario_config = yaml.full_load(f)
             for node_scenario in node_scenario_config["node_scenarios"]:
-                node_scenario_object = get_node_scenario_object(node_scenario, kubecli)
+                node_scenario_object = get_node_scenario_object(node_scenario, telemetry.kubecli)
                 if node_scenario["actions"]:
                     for action in node_scenario["actions"]:
                         start_time = int(time.time())
                         try:
-                            inject_node_scenario(action, node_scenario, node_scenario_object, kubecli)
+                            inject_node_scenario(action, node_scenario, node_scenario_object, telemetry.kubecli)
                             logging.info("Waiting for the specified duration: %s" % (wait_duration))
                             time.sleep(wait_duration)
                             end_time = int(time.time())
@@ -85,6 +93,16 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
 
         scenario_telemetry.exit_status = 0
         scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
         scenario_telemetries.append(scenario_telemetry)
 
     return failed_scenarios, scenario_telemetries
diff --git a/kraken/plugins/__init__.py b/kraken/plugins/__init__.py
index ea971e53..970deed6 100644
--- a/kraken/plugins/__init__.py
+++ b/kraken/plugins/__init__.py
@@ -9,9 +9,11 @@
 from arcaflow_plugin_kill_pod import kill_pods, wait_for_pods
 from krkn_lib.k8s import KrknKubernetes
 from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
 
 import kraken.plugins.node_scenarios.vmware_plugin as vmware_plugin
 import kraken.plugins.node_scenarios.ibmcloud_plugin as ibmcloud_plugin
+from kraken import utils
 from kraken.plugins.run_python_plugin import run_python_file
 from kraken.plugins.network.ingress_shaping import network_chaos
 from kraken.plugins.pod_network_outage.pod_network_outage_plugin import pod_outage
@@ -249,13 +251,12 @@ def json_schema(self):
 
 
 def run(scenarios: List[str],
-        kubeconfig_path: str,
         kraken_config: str,
         failed_post_scenarios: List[str],
         wait_duration: int,
-        telemetry: KrknTelemetryKubernetes,
-        kubecli: KrknKubernetes,
-        run_uuid: str
+        telemetry: KrknTelemetryOpenshift,
+        run_uuid: str,
+        telemetry_request_id: str,
         ) -> (List[str], list[ScenarioTelemetry]):
 
     scenario_telemetries: list[ScenarioTelemetry] = []
@@ -263,14 +264,14 @@ def run(scenarios: List[str],
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = scenario
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, scenario)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
         logging.info('scenario ' + str(scenario))
-        pool = PodsMonitorPool(kubecli)
+        pool = PodsMonitorPool(telemetry.kubecli)
         kill_scenarios = [kill_scenario for kill_scenario in PLUGINS.unserialize_scenario(scenario) if kill_scenario["id"] == "kill-pods"]
 
         try:
             start_monitoring(pool, kill_scenarios)
-            PLUGINS.run(scenario, kubeconfig_path, kraken_config, run_uuid)
+            PLUGINS.run(scenario, telemetry.kubecli.get_kubeconfig_path(), kraken_config, run_uuid)
             result = pool.join()
             scenario_telemetry.affected_pods = result
             if result.error:
@@ -286,8 +287,19 @@ def run(scenarios: List[str],
             scenario_telemetry.exit_status = 0
             logging.info("Waiting for the specified duration: %s" % (wait_duration))
             time.sleep(wait_duration)
-        scenario_telemetries.append(scenario_telemetry)
         scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
+
+        scenario_telemetries.append(scenario_telemetry)
 
     return failed_post_scenarios, scenario_telemetries
diff --git a/kraken/pod_scenarios/setup.py b/kraken/pod_scenarios/setup.py
index e23e0e5a..38218818 100644
--- a/kraken/pod_scenarios/setup.py
+++ b/kraken/pod_scenarios/setup.py
@@ -7,15 +7,17 @@
 import random
 import arcaflow_plugin_kill_pod
 from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
 
 import kraken.cerberus.setup as cerberus
 import kraken.post_actions.actions as post_actions
 from krkn_lib.k8s import KrknKubernetes
-from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from arcaflow_plugin_sdk import serialization
 from krkn_lib.utils.functions import get_yaml_item_value, log_exception
 
+from kraken import utils
+
 
 # Run pod based scenarios
 def run(kubeconfig_path, scenarios_list, config, failed_post_scenarios, wait_duration):
@@ -73,25 +75,26 @@
 
 
 # krkn_lib
-def container_run(kubeconfig_path,
+def container_run(
                   scenarios_list,
                   config,
                   failed_post_scenarios,
                   wait_duration,
-                  kubecli: KrknKubernetes,
-                  telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+                  telemetry: KrknTelemetryOpenshift,
+                  telemetry_request_id: str
+                  ) -> (list[str], list[ScenarioTelemetry]):
 
     failed_scenarios = []
     scenario_telemetries: list[ScenarioTelemetry] = []
-    pool = PodsMonitorPool(kubecli)
+    pool = PodsMonitorPool(telemetry.kubecli)
 
     for container_scenario_config in scenarios_list:
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = container_scenario_config[0]
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, container_scenario_config[0])
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, container_scenario_config[0])
         if len(container_scenario_config) > 1:
-            pre_action_output = post_actions.run(kubeconfig_path, container_scenario_config[1])
+            pre_action_output = post_actions.run(telemetry.kubecli.get_kubeconfig_path(), container_scenario_config[1])
         else:
             pre_action_output = ""
         with open(container_scenario_config[0], "r") as f:
@@ -101,7 +104,7 @@
             # capture start time
             start_time = int(time.time())
             try:
-                killed_containers = container_killing_in_pod(cont_scenario, kubecli)
+                killed_containers = container_killing_in_pod(cont_scenario, telemetry.kubecli)
                 logging.info(f"killed containers: {str(killed_containers)}")
                 result = pool.join()
                 if result.error:
@@ -125,6 +128,16 @@
             else:
                 scenario_telemetry.exit_status = 0
             scenario_telemetry.end_timestamp = time.time()
+            utils.populate_cluster_events(scenario_telemetry,
+                                          parsed_scenario_config,
+                                          telemetry.kubecli,
+                                          int(scenario_telemetry.start_timestamp),
+                                          int(scenario_telemetry.end_timestamp))
+            utils.collect_and_put_ocp_logs(telemetry,
+                                           parsed_scenario_config,
+                                           telemetry_request_id,
+                                           int(scenario_telemetry.start_timestamp),
+                                           int(scenario_telemetry.end_timestamp))
             scenario_telemetries.append(scenario_telemetry)
 
     return failed_scenarios, scenario_telemetries
diff --git a/kraken/pvc/pvc_scenario.py b/kraken/pvc/pvc_scenario.py
index 9783a304..2b893655 100644
--- a/kraken/pvc/pvc_scenario.py
+++ b/kraken/pvc/pvc_scenario.py
@@ -3,15 +3,21 @@
 import re
 import time
 import yaml
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
+from .. import utils
 from ..cerberus import setup as cerberus
 from krkn_lib.k8s import KrknKubernetes
-from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from krkn_lib.utils.functions import get_yaml_item_value, log_exception
 
 
 # krkn_lib
-def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+def run(scenarios_list,
+        config,
+        wait_duration,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
     """
     Reads the scenario config and creates a temp file to fill up the PVC
     """
@@ -22,7 +28,7 @@
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = app_config
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, app_config)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, app_config)
         try:
             if len(app_config) > 1:
                 with open(app_config, "r") as f:
@@ -85,7 +91,7 @@
                     "pod_name '%s' will be overridden with one of "
                     "the pods mounted in the PVC" % (str(pod_name))
                 )
-                pvc = kubecli.get_pvc_info(pvc_name, namespace)
+                pvc = telemetry.kubecli.get_pvc_info(pvc_name, namespace)
                 try:
                     # random generator not used for
                     # security/cryptographic purposes.
@@ -100,7 +106,7 @@
                 raise RuntimeError()
 
             # Get volume name
-            pod = kubecli.get_pod_info(name=pod_name, namespace=namespace)
+            pod = telemetry.kubecli.get_pod_info(name=pod_name, namespace=namespace)
 
             if pod is None:
                 logging.error(
@@ -117,7 +123,7 @@
                 if volume.pvcName is not None:
                     volume_name = volume.name
                     pvc_name = volume.pvcName
-                    pvc = kubecli.get_pvc_info(pvc_name, namespace)
+                    pvc = telemetry.kubecli.get_pvc_info(pvc_name, namespace)
                     break
             if 'pvc' not in locals():
                 logging.error(
@@ -144,7 +150,7 @@
             # Get PVC capacity and used bytes
             command = "df %s -B 1024 | sed 1d" % (str(mount_path))
             command_output = (
-                kubecli.exec_cmd_in_pod(
+                telemetry.kubecli.exec_cmd_in_pod(
                     command,
                     pod_name,
                     namespace,
@@ -206,7 +212,7 @@
             logging.debug(
                 "Create temp file in the PVC command:\n %s" % command
             )
-            kubecli.exec_cmd_in_pod(
+            telemetry.kubecli.exec_cmd_in_pod(
                 command,
                 pod_name,
                 namespace,
@@ -216,7 +222,7 @@
             # Check if file is created
             command = "ls -lh %s" % (str(mount_path))
             logging.debug("Check file is created command:\n %s" % command)
-            response = kubecli.exec_cmd_in_pod(
+            response = telemetry.kubecli.exec_cmd_in_pod(
                 command, pod_name, namespace, container_name
             )
             logging.info("\n" + str(response))
@@ -238,7 +244,7 @@
                     container_name,
                     mount_path,
                     file_size_kb,
-                    kubecli
+                    telemetry.kubecli
                 )
                 # sys.exit(1)
                 raise RuntimeError()
@@ -275,14 +281,14 @@
                 logging.debug(
                     "Create temp file in the PVC command:\n %s" % command
                 )
-                kubecli.exec_cmd_in_pod(
+                telemetry.kubecli.exec_cmd_in_pod(
                     command, pod_name, namespace, container_name
                 )
                 # Check if file is created
                 command = "ls -lh %s" % (str(mount_path))
                 logging.debug("Check file is created command:\n %s" % command)
-                response = kubecli.exec_cmd_in_pod(
+                response = telemetry.kubecli.exec_cmd_in_pod(
                     command, pod_name, namespace, container_name
                 )
                 logging.info("\n" + str(response))
@@ -303,7 +309,7 @@
                     container_name,
                     mount_path,
                     file_size_kb,
-                    kubecli
+                    telemetry.kubecli
                 )
             logging.info("End of scenario. Waiting for the specified duration: %s" % (wait_duration))
             time.sleep(wait_duration)
@@ -321,6 +327,18 @@
             log_exception(app_config)
         else:
             scenario_telemetry.exit_status = 0
+
+        scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
         scenario_telemetries.append(scenario_telemetry)
 
     return failed_scenarios, scenario_telemetries
diff --git a/kraken/service_disruption/common_service_disruption_functions.py b/kraken/service_disruption/common_service_disruption_functions.py
index 6ca98592..80e9474f 100644
--- a/kraken/service_disruption/common_service_disruption_functions.py
+++ b/kraken/service_disruption/common_service_disruption_functions.py
@@ -1,14 +1,18 @@
 import time
 import random
 import logging
+
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
 import kraken.cerberus.setup as cerberus
 import kraken.post_actions.actions as post_actions
 import yaml
 from krkn_lib.k8s import KrknKubernetes
-from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from krkn_lib.utils.functions import get_yaml_item_value, log_exception
 
+from kraken import utils
+
 
 def delete_objects(kubecli, namespace):
@@ -156,9 +160,8 @@ def run(
     config,
     wait_duration,
     failed_post_scenarios,
-    kubeconfig_path,
-    kubecli: KrknKubernetes,
-    telemetry: KrknTelemetryKubernetes
+    telemetry: KrknTelemetryOpenshift,
+    telemetry_request_id: str
 ) -> (list[str], list[ScenarioTelemetry]):
     scenario_telemetries: list[ScenarioTelemetry] = []
     failed_scenarios = []
@@ -166,10 +169,10 @@ def run(
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = scenario_config[0]
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, scenario_config[0])
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario_config[0])
         try:
             if len(scenario_config) > 1:
-                pre_action_output = post_actions.run(kubeconfig_path, scenario_config[1])
+                pre_action_output = post_actions.run(telemetry.kubecli.get_kubeconfig_path(), scenario_config[1])
             else:
                 pre_action_output = ""
             with open(scenario_config[0], "r") as f:
@@ -206,7 +209,7 @@ def run(
                 start_time = int(time.time())
                 for i in range(run_count):
                     killed_namespaces = {}
-                    namespaces = kubecli.check_namespaces([scenario_namespace], scenario_label)
+                    namespaces = telemetry.kubecli.check_namespaces([scenario_namespace], scenario_label)
                     for j in range(delete_count):
                         if len(namespaces) == 0:
                             logging.error(
@@ -220,7 +223,7 @@ def run(
                        logging.info('Delete objects in selected namespace: ' + selected_namespace )
                        try:
                            # delete all pods in namespace
-                           objects = delete_objects(kubecli,selected_namespace)
+                           objects = delete_objects(telemetry.kubecli,selected_namespace)
                            killed_namespaces[selected_namespace] = objects
                            logging.info("Deleted all objects in namespace %s was successful" % str(selected_namespace))
                        except Exception as e:
@@ -236,7 +239,7 @@ def run(
                 if len(scenario_config) > 1:
                     try:
                         failed_post_scenarios = post_actions.check_recovery(
-                            kubeconfig_path, scenario_config, failed_post_scenarios, pre_action_output
+                            telemetry.kubecli.get_kubeconfig_path(), scenario_config, failed_post_scenarios, pre_action_output
                         )
                     except Exception as e:
                         logging.error("Failed to run post action checks: %s" % e)
                         # sys.exit(1)
                         raise RuntimeError()
                 else:
-                    failed_post_scenarios = check_all_running_deployment(killed_namespaces, wait_time, kubecli)
+                    failed_post_scenarios = check_all_running_deployment(killed_namespaces, wait_time, telemetry.kubecli)
 
             end_time = int(time.time())
             cerberus.publish_kraken_status(config, failed_post_scenarios, start_time, end_time)
@@ -255,6 +258,16 @@ def run(
         else:
             scenario_telemetry.exit_status = 0
         scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
         scenario_telemetries.append(scenario_telemetry)
 
     return failed_scenarios, scenario_telemetries
diff --git a/kraken/service_hijacking/service_hijacking.py b/kraken/service_hijacking/service_hijacking.py
index ecd1e890..3f5ca1ba 100644
--- a/kraken/service_hijacking/service_hijacking.py
+++ b/kraken/service_hijacking/service_hijacking.py
@@ -1,20 +1,26 @@
 import logging
 import time
-
 import yaml
-from krkn_lib.k8s import KrknKubernetes
+
 from krkn_lib.models.telemetry import ScenarioTelemetry
-from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+from krkn_lib.utils import log_exception
+
+from kraken import utils
+
+
+def run(scenarios_list: list[str],
+        wait_duration: int,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
 
-def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
-    scenario_telemetries= list[ScenarioTelemetry]()
+    scenario_telemetries = list[ScenarioTelemetry]()
     failed_post_scenarios = []
     for scenario in scenarios_list:
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = scenario
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, scenario)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
         with open(scenario) as stream:
             scenario_config = yaml.safe_load(stream)
 
@@ -26,9 +32,9 @@
         chaos_duration = scenario_config["chaos_duration"]
 
         logging.info(f"checking service {service_name} in namespace: {service_namespace}")
-        if not krkn_lib.service_exists(service_name, service_namespace):
+        if not telemetry.kubecli.service_exists(service_name, service_namespace):
             logging.error(f"service: {service_name} not found in namespace: {service_namespace}, failed to run scenario.")
-            fail(scenario_telemetry, scenario_telemetries)
+            fail_scenario_telemetry(scenario_telemetry)
             failed_post_scenarios.append(scenario)
             break
         try:
@@ -37,18 +43,18 @@
             # both named ports and port numbers can be used
             if isinstance(target_port, int):
                 logging.info(f"webservice will listen on port {target_port}")
-                webservice = krkn_lib.deploy_service_hijacking(service_namespace, plan, image, port_number=target_port)
+                webservice = telemetry.kubecli.deploy_service_hijacking(service_namespace, plan, image, port_number=target_port)
             else:
                 logging.info(f"traffic will be redirected to named port: {target_port}")
-                webservice = krkn_lib.deploy_service_hijacking(service_namespace, plan, image, port_name=target_port)
+                webservice = telemetry.kubecli.deploy_service_hijacking(service_namespace, plan, image, port_name=target_port)
             logging.info(f"successfully deployed pod: {webservice.pod_name} "
                          f"in namespace:{service_namespace} with selector {webservice.selector}!"
                          )
             logging.info(f"patching service: {service_name} to hijack traffic towards: {webservice.pod_name}")
 
-            original_service = krkn_lib.replace_service_selector([webservice.selector], service_name, service_namespace)
+            original_service = telemetry.kubecli.replace_service_selector([webservice.selector], service_name, service_namespace)
             if original_service is None:
                 logging.error(f"failed to patch service: {service_name}, namespace: {service_namespace} with selector {webservice.selector}")
-                fail(scenario_telemetry, scenario_telemetries)
+                fail_scenario_telemetry(scenario_telemetry)
                 failed_post_scenarios.append(scenario)
                 break
 
@@ -58,33 +64,40 @@
             time.sleep(chaos_duration)
             selectors = ["=".join([key, original_service["spec"]["selector"][key]]) for key in original_service["spec"]["selector"].keys()]
             logging.info(f"restoring the service selectors {selectors}")
-            original_service = krkn_lib.replace_service_selector(selectors, service_name, service_namespace)
+            original_service = telemetry.kubecli.replace_service_selector(selectors, service_name, service_namespace)
             if original_service is None:
                 logging.error(f"failed to restore original service: {service_name}, namespace: {service_namespace} with selectors: {selectors}")
-                fail(scenario_telemetry, scenario_telemetries)
+                fail_scenario_telemetry(scenario_telemetry)
                 failed_post_scenarios.append(scenario)
                 break
             logging.info("selectors successfully restored")
             logging.info("undeploying service-hijacking resources...")
-            krkn_lib.undeploy_service_hijacking(webservice)
+            telemetry.kubecli.undeploy_service_hijacking(webservice)
             logging.info("End of scenario. Waiting for the specified duration: %s" % (wait_duration))
             time.sleep(wait_duration)
-            scenario_telemetry.exit_status = 0
-            scenario_telemetry.end_timestamp = time.time()
-            scenario_telemetries.append(scenario_telemetry)
             logging.info("success")
         except Exception as e:
             logging.error(f"scenario {scenario} failed with exception: {e}")
-            fail(scenario_telemetry, scenario_telemetries)
-            failed_post_scenarios.append(scenario)
+            fail_scenario_telemetry(scenario_telemetry)
+            log_exception(scenario)
 
-    return failed_post_scenarios, scenario_telemetries
+        scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
+        scenario_telemetries.append(scenario_telemetry)
+    return failed_post_scenarios, scenario_telemetries
 
-def fail(scenario_telemetry: ScenarioTelemetry, scenario_telemetries: list[ScenarioTelemetry]):
+def fail_scenario_telemetry(scenario_telemetry: ScenarioTelemetry):
     scenario_telemetry.exit_status = 1
-    scenario_telemetry.end_timestamp = time.time()
-    scenario_telemetries.append(scenario_telemetry)
-
+    scenario_telemetry.end_timestamp = time.time()
\ No newline at end of file
diff --git a/kraken/shut_down/common_shut_down_func.py b/kraken/shut_down/common_shut_down_func.py
index 09e09808..d89f85b7 100644
--- a/kraken/shut_down/common_shut_down_func.py
+++ b/kraken/shut_down/common_shut_down_func.py
@@ -3,6 +3,10 @@
 import logging
 import time
 from multiprocessing.pool import ThreadPool
+
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
+from .. import utils
 from ..cerberus import setup as cerberus
 from ..post_actions import actions as post_actions
 from ..node_actions.aws_node_scenarios import AWS
@@ -10,7 +14,6 @@
 from ..node_actions.az_node_scenarios import Azure
 from ..node_actions.gcp_node_scenarios import GCP
 from krkn_lib.k8s import KrknKubernetes
-from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from krkn_lib.utils.functions import log_exception
 
@@ -134,7 +137,11 @@ def cluster_shut_down(shut_down_config, kubecli: KrknKubernetes):
 
 
 # krkn_lib
-def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+def run(scenarios_list,
+        config,
+        wait_duration,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
     failed_post_scenarios = []
     failed_scenarios = []
     scenario_telemetries: list[ScenarioTelemetry] = []
@@ -153,7 +160,7 @@
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = config_path
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, config_path)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, config_path)
         with open(config_path, "r") as f:
             shut_down_config_yaml = yaml.full_load(f)
@@ -161,7 +168,7 @@
                 shut_down_config_yaml["cluster_shut_down_scenario"]
             start_time = int(time.time())
             try:
-                cluster_shut_down(shut_down_config_scenario, kubecli)
+                cluster_shut_down(shut_down_config_scenario, telemetry.kubecli)
                 logging.info(
                     "Waiting for the specified duration: %s" % (wait_duration)
                 )
@@ -185,6 +192,16 @@
                 scenario_telemetry.exit_status = 0
 
             scenario_telemetry.end_timestamp = time.time()
+            utils.collect_and_put_ocp_logs(telemetry,
+                                           parsed_scenario_config,
+                                           telemetry_request_id,
+                                           int(scenario_telemetry.start_timestamp),
+                                           int(scenario_telemetry.end_timestamp))
+            utils.populate_cluster_events(scenario_telemetry,
+                                          parsed_scenario_config,
+                                          telemetry.kubecli,
+                                          int(scenario_telemetry.start_timestamp),
+                                          int(scenario_telemetry.end_timestamp))
             scenario_telemetries.append(scenario_telemetry)
 
     return failed_scenarios, scenario_telemetries
diff --git a/kraken/syn_flood/syn_flood.py b/kraken/syn_flood/syn_flood.py
index 62c4e339..4036388d 100644
--- a/kraken/syn_flood/syn_flood.py
+++ b/kraken/syn_flood/syn_flood.py
@@ -8,31 +8,37 @@
 from krkn_lib.k8s import KrknKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
 
+from kraken import utils
 
-def run(scenarios_list: list[str], krkn_kubernetes: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+
+def run(scenarios_list: list[str],
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str
+        ) -> (list[str], list[ScenarioTelemetry]):
     scenario_telemetries: list[ScenarioTelemetry] = []
     failed_post_scenarios = []
     for scenario in scenarios_list:
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = scenario
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, scenario)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
         try:
             pod_names = []
             config = parse_config(scenario)
             if config["target-service-label"]:
-                target_services = krkn_kubernetes.select_service_by_label(config["namespace"], config["target-service-label"])
+                target_services = telemetry.kubecli.select_service_by_label(config["namespace"], config["target-service-label"])
             else:
                 target_services = [config["target-service"]]
             for target in target_services:
-                if not krkn_kubernetes.service_exists(target, config["namespace"]):
+                if not telemetry.kubecli.service_exists(target, config["namespace"]):
                     raise Exception(f"{target} service not found")
                 for i in range(config["number-of-pods"]):
                     pod_name = "syn-flood-" + krkn_lib.utils.get_random_string(10)
-                    krkn_kubernetes.deploy_syn_flood(pod_name,
+                    telemetry.kubecli.deploy_syn_flood(pod_name,
                                                      config["namespace"],
                                                      config["image"],
                                                      target,
@@ -49,7 +55,7 @@
             finished_pods = []
             while not did_finish:
                 for pod_name in pod_names:
-                    if not krkn_kubernetes.is_pod_running(pod_name, config["namespace"]):
+                    if not telemetry.kubecli.is_pod_running(pod_name, config["namespace"]):
                         finished_pods.append(pod_name)
                 if set(pod_names) == set(finished_pods):
                     did_finish = True
@@ -62,6 +68,16 @@
         else:
             scenario_telemetry.exit_status = 0
         scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
         scenario_telemetries.append(scenario_telemetry)
 
     return failed_post_scenarios, scenario_telemetries
diff --git a/kraken/time_actions/common_time_functions.py b/kraken/time_actions/common_time_functions.py
index 9b2b7cef..8c9f039d 100644
--- a/kraken/time_actions/common_time_functions.py
+++ b/kraken/time_actions/common_time_functions.py
@@ -6,12 +6,12 @@
 import yaml
 import random
 
-from krkn_lib import utils
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
 from kubernetes.client import ApiException
 
+from .. import utils
 from ..cerberus import setup as cerberus
 from krkn_lib.k8s import KrknKubernetes
-from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from krkn_lib.utils.functions import get_yaml_item_value, log_exception, get_random_string
 
@@ -348,21 +348,25 @@ def check_date_time(object_type, names, kubecli:KrknKubernetes):
 
 
 # krkn_lib
-def run(scenarios_list, config, wait_duration, kubecli:KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+def run(scenarios_list,
+        config,
+        wait_duration,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
     failed_scenarios = []
     scenario_telemetries: list[ScenarioTelemetry] = []
     for time_scenario_config in scenarios_list:
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = time_scenario_config
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, time_scenario_config)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, time_scenario_config)
         try:
             with open(time_scenario_config, "r") as f:
                 scenario_config = yaml.full_load(f)
                 for time_scenario in scenario_config["time_scenarios"]:
                     start_time = int(time.time())
-                    object_type, object_names = skew_time(time_scenario, kubecli)
-                    not_reset = check_date_time(object_type, object_names, kubecli)
+                    object_type, object_names = skew_time(time_scenario, telemetry.kubecli)
+                    not_reset = check_date_time(object_type, object_names, telemetry.kubecli)
                     if len(not_reset) > 0:
                         logging.info("Object times were not reset")
                     logging.info(
@@ -383,6 +387,16 @@
         else:
             scenario_telemetry.exit_status = 0
         scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
         scenario_telemetries.append(scenario_telemetry)
 
     return failed_scenarios, scenario_telemetries
diff --git a/kraken/utils/__init__.py b/kraken/utils/__init__.py
index cd69230f..2d947a54 100644
--- a/kraken/utils/__init__.py
+++ b/kraken/utils/__init__.py
@@ -1 +1,2 @@
 from .TeeLogHandler import TeeLogHandler
+from .functions import *
diff --git a/kraken/utils/functions.py b/kraken/utils/functions.py
new file mode 100644
index 00000000..222283ff
--- /dev/null
+++ b/kraken/utils/functions.py
@@ -0,0 +1,60 @@
+import krkn_lib.utils
+from krkn_lib.k8s import KrknKubernetes
+from krkn_lib.models.telemetry import ScenarioTelemetry
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+from tzlocal.unix import get_localzone
+
+
+def populate_cluster_events(scenario_telemetry: ScenarioTelemetry,
+                            scenario_config: dict,
+                            kubecli: KrknKubernetes,
+                            start_timestamp: int,
+                            end_timestamp: int
+                            ):
+    events = []
+    namespaces = __retrieve_namespaces(scenario_config, kubecli)
+
+    if len(namespaces) == 0:
+        events.extend(kubecli.collect_and_parse_cluster_events(start_timestamp, end_timestamp, str(get_localzone())))
+    else:
+        for namespace in namespaces:
+            events.extend(kubecli.collect_and_parse_cluster_events(start_timestamp, end_timestamp, str(get_localzone()),
+                                                                   namespace=namespace))
+
+    scenario_telemetry.set_cluster_events(events)
+
+
+def collect_and_put_ocp_logs(telemetry_ocp: KrknTelemetryOpenshift,
+                             scenario_config: dict,
+                             request_id: str,
+                             start_timestamp: int,
+                             end_timestamp: int,
+                             ):
+    if (
+        telemetry_ocp.krkn_telemetry_config and
+        telemetry_ocp.krkn_telemetry_config["enabled"] and
+        telemetry_ocp.krkn_telemetry_config["logs_backup"] and
+        not telemetry_ocp.kubecli.is_kubernetes()
+    ):
+        namespaces = __retrieve_namespaces(scenario_config, telemetry_ocp.kubecli)
+        if len(namespaces) > 0:
+            for namespace in namespaces:
+                telemetry_ocp.put_ocp_logs(request_id,
+                                           telemetry_ocp.krkn_telemetry_config,
+                                           start_timestamp,
+                                           end_timestamp,
+                                           namespace)
+        else:
+            telemetry_ocp.put_ocp_logs(request_id,
+                                       telemetry_ocp.krkn_telemetry_config,
+                                       start_timestamp,
+                                       end_timestamp)
+
+
+def __retrieve_namespaces(scenario_config: dict, kubecli: KrknKubernetes) -> set[str]:
+    namespaces = list()
+    namespaces.extend(krkn_lib.utils.deep_get_attribute("namespace", scenario_config))
+    namespace_patterns = krkn_lib.utils.deep_get_attribute("namespace_pattern", scenario_config)
+    for pattern in namespace_patterns:
+        namespaces.extend(kubecli.list_namespaces_by_regex(pattern))
+    return set(namespaces)
diff --git a/kraken/zone_outage/actions.py b/kraken/zone_outage/actions.py
index 13cd3a34..7e42375b 100644
--- a/kraken/zone_outage/actions.py
+++ b/kraken/zone_outage/actions.py
@@ -1,13 +1,20 @@
 import yaml
 import logging
 import time
+
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
+from .. import utils
 from ..node_actions.aws_node_scenarios import AWS
 from ..cerberus import setup as cerberus
-from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from krkn_lib.utils.functions import log_exception
 
 
-def run(scenarios_list, config, wait_duration, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]) :
+def run(scenarios_list,
+        config,
+        wait_duration,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]) :
     """
     filters the subnet of interest and applies the network acl
     to create zone outage
@@ -20,7 +27,7 @@
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = zone_outage_config
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, zone_outage_config)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, zone_outage_config)
         try:
             if len(zone_outage_config) > 1:
                 with open(zone_outage_config, "r") as f:
@@ -116,6 +123,16 @@
             else:
                 scenario_telemetry.exit_status = 0
             scenario_telemetry.end_timestamp = time.time()
+            utils.collect_and_put_ocp_logs(telemetry,
+                                           parsed_scenario_config,
+                                           telemetry_request_id,
+                                           int(scenario_telemetry.start_timestamp),
+                                           int(scenario_telemetry.end_timestamp))
+            utils.populate_cluster_events(scenario_telemetry,
+                                          parsed_scenario_config,
+                                          telemetry.kubecli,
+                                          int(scenario_telemetry.start_timestamp),
+                                          int(scenario_telemetry.end_timestamp))
             scenario_telemetries.append(scenario_telemetry)
 
     return failed_scenarios, scenario_telemetries
diff --git a/requirements.txt b/requirements.txt
index 6aa670c0..b44f8819 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,7 +15,7 @@ google-api-python-client==2.116.0
 ibm_cloud_sdk_core==3.18.0
 ibm_vpc==0.20.0
 jinja2==3.1.4
-krkn-lib==3.0.0
+krkn-lib==3.1.0
 lxml==5.1.0
 kubernetes==28.1.0
 numpy==1.26.4
diff --git a/run_kraken.py b/run_kraken.py
index 5954836f..db8c4626 100644
--- a/run_kraken.py
+++ b/run_kraken.py
@@ -14,6 +14,8 @@
 from krkn_lib.models.elastic import ElasticChaosRunTelemetry
 from krkn_lib.models.krkn import ChaosRunOutput, ChaosRunAlertSummary
 from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
+from tzlocal.unix import get_localzone
+
 import kraken.time_actions.common_time_functions as time_actions
 import kraken.performance_dashboards.setup as performance_dashboards
 import kraken.pod_scenarios.setup as pod_scenarios
@@ -235,8 +237,8 @@ def main(cfg) -> int:
             logging.info("Cluster version CRD not detected, skipping")
 
     # KrknTelemetry init
-    telemetry_k8s = KrknTelemetryKubernetes(safe_logger, kubecli)
-    telemetry_ocp = KrknTelemetryOpenshift(safe_logger, ocpcli)
+    telemetry_k8s = KrknTelemetryKubernetes(safe_logger, kubecli, config["telemetry"])
+    telemetry_ocp = KrknTelemetryOpenshift(safe_logger, ocpcli, config["telemetry"])
     if enable_elastic:
         elastic_search = KrknElastic(safe_logger,
                                      elastic_url,
@@ -315,33 +317,33 @@ def main(cfg) -> int:
                     return 1
             elif scenario_type == "arcaflow_scenarios":
                 failed_post_scenarios, scenario_telemetries = arcaflow_plugin.run(
-                    scenarios_list, kubeconfig_path, telemetry_k8s
+                    scenarios_list,
+                    telemetry_ocp,
+                    telemetry_request_id
                 )
                 chaos_telemetry.scenarios.extend(scenario_telemetries)
 
             elif scenario_type == "plugin_scenarios":
                 failed_post_scenarios, scenario_telemetries = plugins.run(
                     scenarios_list,
-                    kubeconfig_path,
                     kraken_config,
                     failed_post_scenarios,
                     wait_duration,
-                    telemetry_k8s,
-                    kubecli,
-                    run_uuid
+                    telemetry_ocp,
+                    run_uuid,
+                    telemetry_request_id
                 )
                 chaos_telemetry.scenarios.extend(scenario_telemetries)
 
             # krkn_lib
             elif scenario_type == "container_scenarios":
                 logging.info("Running container scenarios")
                 failed_post_scenarios, scenario_telemetries = pod_scenarios.container_run(
-                    kubeconfig_path, scenarios_list, config, failed_post_scenarios, wait_duration,
-                    kubecli,
-                    telemetry_k8s
+                    scenarios_list, config, failed_post_scenarios, wait_duration,
+                    telemetry_ocp,
+                    telemetry_request_id
                 )
                 chaos_telemetry.scenarios.extend(scenario_telemetries)
 
@@ -349,14 +351,21 @@ def main(cfg) -> int:
             # krkn_lib
             elif scenario_type == "node_scenarios":
                 logging.info("Running node scenarios")
-                failed_post_scenarios, scenario_telemetries = nodeaction.run(scenarios_list, config, wait_duration, kubecli, telemetry_k8s)
+                failed_post_scenarios, scenario_telemetries = nodeaction.run(scenarios_list,
+                                                                             config,
+                                                                             wait_duration,
+                                                                             telemetry_ocp,
+                                                                             telemetry_request_id)
                 chaos_telemetry.scenarios.extend(scenario_telemetries)
             # Inject managedcluster chaos scenarios specified in the config
             # krkn_lib
             elif scenario_type == "managedcluster_scenarios":
                 logging.info("Running managedcluster scenarios")
                 managedcluster_scenarios.run(
-                    scenarios_list, config, wait_duration, kubecli
+                    scenarios_list,
+                    config,
+                    wait_duration,
+                    kubecli
                 )
 
             # Inject time skew chaos scenarios specified
@@ -364,12 +373,22 @@ def main(cfg) -> int:
             # krkn_lib
             elif scenario_type == "time_scenarios":
                 logging.info("Running time skew scenarios")
-                failed_post_scenarios, scenario_telemetries = time_actions.run(scenarios_list, config, wait_duration, kubecli, telemetry_k8s)
+                failed_post_scenarios, scenario_telemetries = time_actions.run(scenarios_list,
+                                                                               config,
+                                                                               wait_duration,
+                                                                               telemetry_ocp,
+                                                                               telemetry_request_id
+                                                                               )
                 chaos_telemetry.scenarios.extend(scenario_telemetries)
 
             # Inject cluster shutdown scenarios
             # krkn_lib
             elif scenario_type == "cluster_shut_down_scenarios":
"cluster_shut_down_scenarios": - failed_post_scenarios, scenario_telemetries = shut_down.run(scenarios_list, config, wait_duration, kubecli, telemetry_k8s) + failed_post_scenarios, scenario_telemetries = shut_down.run(scenarios_list, + config, + wait_duration, + telemetry_ocp, + telemetry_request_id + ) chaos_telemetry.scenarios.extend(scenario_telemetries) # Inject namespace chaos scenarios @@ -381,43 +400,69 @@ def main(cfg) -> int: config, wait_duration, failed_post_scenarios, - kubeconfig_path, - kubecli, - telemetry_k8s + telemetry_ocp, + telemetry_request_id ) chaos_telemetry.scenarios.extend(scenario_telemetries) # Inject zone failures elif scenario_type == "zone_outages": logging.info("Inject zone outages") - failed_post_scenarios, scenario_telemetries = zone_outages.run(scenarios_list, config, wait_duration, telemetry_k8s) + failed_post_scenarios, scenario_telemetries = zone_outages.run(scenarios_list, + config, + wait_duration, + telemetry_ocp, + telemetry_request_id + ) chaos_telemetry.scenarios.extend(scenario_telemetries) # Application outages elif scenario_type == "application_outages": logging.info("Injecting application outage") failed_post_scenarios, scenario_telemetries = application_outage.run( - scenarios_list, config, wait_duration, kubecli, telemetry_k8s) + scenarios_list, + config, + wait_duration, + telemetry_ocp, + telemetry_request_id + ) chaos_telemetry.scenarios.extend(scenario_telemetries) # PVC scenarios # krkn_lib elif scenario_type == "pvc_scenarios": logging.info("Running PVC scenario") - failed_post_scenarios, scenario_telemetries = pvc_scenario.run(scenarios_list, config, wait_duration, kubecli, telemetry_k8s) + failed_post_scenarios, scenario_telemetries = pvc_scenario.run(scenarios_list, + config, + wait_duration, + telemetry_ocp, + telemetry_request_id + ) chaos_telemetry.scenarios.extend(scenario_telemetries) # Network scenarios # krkn_lib elif scenario_type == "network_chaos": logging.info("Running Network Chaos") - failed_post_scenarios, scenario_telemetries = network_chaos.run(scenarios_list, config, wait_duration, kubecli, telemetry_k8s) + failed_post_scenarios, scenario_telemetries = network_chaos.run(scenarios_list, + config, + wait_duration, + telemetry_ocp, + telemetry_request_id + ) elif scenario_type == "service_hijacking": logging.info("Running Service Hijacking Chaos") - failed_post_scenarios, scenario_telemetries = service_hijacking_plugin.run(scenarios_list, wait_duration, kubecli, telemetry_k8s) + failed_post_scenarios, scenario_telemetries = service_hijacking_plugin.run(scenarios_list, + wait_duration, + telemetry_ocp, + telemetry_request_id + ) chaos_telemetry.scenarios.extend(scenario_telemetries) elif scenario_type == "syn_flood": logging.info("Running Syn Flood Chaos") - failed_post_scenarios, scenario_telemetries = syn_flood.run(scenarios_list, kubecli, telemetry_k8s) + failed_post_scenarios, scenario_telemetries = syn_flood.run(scenarios_list, + telemetry_ocp, + telemetry_request_id + ) chaos_telemetry.scenarios.extend(scenario_telemetries) # Check for critical alerts when enabled @@ -454,7 +499,8 @@ def main(cfg) -> int: else: telemetry_k8s.collect_cluster_metadata(chaos_telemetry) - decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(chaos_telemetry.to_json())) + telemetry_json = chaos_telemetry.to_json() + decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(telemetry_json)) chaos_output.telemetry = decoded_chaos_run_telemetry logging.info(f"Chaos data:\n{chaos_output.to_json()}") if enable_elastic: @@ -470,7 
+516,6 @@ def main(cfg) -> int: logging.info(f"telemetry upload log: {safe_logger.log_file_name}") try: telemetry_k8s.send_telemetry(config["telemetry"], telemetry_request_id, chaos_telemetry) - telemetry_k8s.put_cluster_events(telemetry_request_id, config["telemetry"], start_time, end_time) telemetry_k8s.put_critical_alerts(telemetry_request_id, config["telemetry"], summary) # prometheus data collection is available only on Openshift if config["telemetry"]["prometheus_backup"]: @@ -499,8 +544,7 @@ def main(cfg) -> int: if prometheus_archive_files: safe_logger.info("starting prometheus archive upload:") telemetry_k8s.put_prometheus_data(config["telemetry"], prometheus_archive_files, telemetry_request_id) - if config["telemetry"]["logs_backup"] and distribution == "openshift": - telemetry_ocp.put_ocp_logs(telemetry_request_id, config["telemetry"], start_time, end_time) + except Exception as e: logging.error(f"failed to send telemetry data: {str(e)}") else:
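
Editor's note (not part of the patch): every scenario runner touched above now repeats the same per-scenario telemetry pattern, so a condensed sketch may help reviewers. The helpers, the telemetry.kubecli accessor, and the set_parameters_base64 return value are taken from the diff itself (krkn-lib 3.1.0); the run_one function and its arguments are hypothetical, purely for illustration.

    import time

    from krkn_lib.models.telemetry import ScenarioTelemetry
    from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift

    from kraken import utils

    def run_one(scenario: str,
                telemetry: KrknTelemetryOpenshift,
                telemetry_request_id: str) -> ScenarioTelemetry:
        scenario_telemetry = ScenarioTelemetry()
        scenario_telemetry.scenario = scenario
        scenario_telemetry.start_timestamp = time.time()
        # set_parameters_base64 now also returns the parsed scenario config,
        # which the utils helpers scan for namespace scoping (see next note)
        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)

        # ... inject the chaos here, using telemetry.kubecli instead of a
        # separately passed kubecli/kubeconfig_path ...

        scenario_telemetry.exit_status = 0
        scenario_telemetry.end_timestamp = time.time()
        # per-scenario log upload (a no-op on plain Kubernetes or when the
        # telemetry enabled/logs_backup flags are off) plus event collection
        utils.collect_and_put_ocp_logs(telemetry,
                                       parsed_scenario_config,
                                       telemetry_request_id,
                                       int(scenario_telemetry.start_timestamp),
                                       int(scenario_telemetry.end_timestamp))
        utils.populate_cluster_events(scenario_telemetry,
                                      parsed_scenario_config,
                                      telemetry.kubecli,
                                      int(scenario_telemetry.start_timestamp),
                                      int(scenario_telemetry.end_timestamp))
        return scenario_telemetry

This replaces the single run-wide put_cluster_events/put_ocp_logs calls removed from run_kraken.py, which is why each scenario's events and logs are now bounded by its own start/end timestamps.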
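Editor's note (not part of the patch): namespace scoping in kraken/utils/functions.py deep-scans the parsed scenario config, so any scenario declaring "namespace" or "namespace_pattern" keys, at any nesting depth, gets namespaced events and logs; otherwise collection falls back to the whole cluster. A minimal sketch of the lookup, with hypothetical config values:

    import krkn_lib.utils

    # shape of a parsed scenario config as returned by set_parameters_base64
    # (the keys are real, the values are made up for this example)
    scenario_config = {
        "scenarios": [
            {"namespace": "payments"},
            {"namespace_pattern": "^openshift-.*$"},
        ]
    }

    # deep_get_attribute collects every value stored under the given key,
    # regardless of nesting depth
    print(krkn_lib.utils.deep_get_attribute("namespace", scenario_config))
    # -> ["payments"]; patterns are then expanded through
    # kubecli.list_namespaces_by_regex(...), as __retrieve_namespaces does above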