diff --git a/artifacts_test.py b/artifacts_test.py index 745e99d248e..07767b01da7 100644 --- a/artifacts_test.py +++ b/artifacts_test.py @@ -13,8 +13,10 @@ import datetime import pprint import re +from contextlib import contextmanager from functools import cached_property import json +from typing import Iterator, Any from unittest import SkipTest import yaml @@ -62,6 +64,13 @@ def tearDown(self) -> None: super().tearDown() + @contextmanager + def logged_subtest(self, action: str, + trace_id: str | None = None, metadata: dict[str, Any] | None = None) -> Iterator[None]: + with self.actions_log.action_scope(action, self.node.name, trace_id, metadata): + with self.subTest(msg=action): + yield + # since this logic id depended on code run by SCT to mark uuid as test, since commit 617026aa, this code it run in the background # and not being waited for, so we need to compensate for it here with retries @retrying(n=15, sleep_time=10, allowed_exceptions=(AssertionError,)) @@ -71,6 +80,7 @@ def check_scylla_version_in_housekeepingdb(self, prev_id: int, expected_status_c Validate reported version prev_id: check if new version is created """ + self.actions_log.info("Validating scylla version in housekeepingdb", target=self.node.name) assert self.node.uuid, "Node UUID wasn't created" row = self.housekeeping.get_most_recent_record(query=f"SELECT id, version, ip, statuscode " @@ -104,7 +114,7 @@ def check_scylla_version_in_housekeepingdb(self, prev_id: int, expected_status_c assert row[0] > prev_id, f"New row wasn't saved in {self.CHECK_VERSION_TABLE}" else: assert row[0] == prev_id, f"New row was saved in {self.CHECK_VERSION_TABLE} unexpectedly" - + self.actions_log.info("Scylla version in housekeepingdb is validated", target=self.node.name) return row[0] if row else 0 @property @@ -134,10 +144,10 @@ def run_cassandra_stress(self, args: str): transport_str = c_s_transport_str( self.params.get('peer_verification'), self.params.get('client_encrypt_mtls')) stress_cmd += f" 
-transport '{transport_str}'" - - result = self.node.remoter.run(stress_cmd) - assert "java.io.IOException" not in result.stdout - assert "java.io.IOException" not in result.stderr + with self.actions_log.action_scope("running cassandra-stress", target=self.node.name, metadata={"stress_cmd": stress_cmd}): + result = self.node.remoter.run(stress_cmd) + assert "java.io.IOException" not in result.stdout + assert "java.io.IOException" not in result.stderr def check_scylla(self): self.node.run_nodetool("status") @@ -223,6 +233,8 @@ def verify_snitch(self, backend_name: str): """ if not self.params.get("use_preinstalled_scylla"): self.log.info("Skipping verifying the snitch due to the 'use_preinstalled_scylla' being set to False") + self.actions_log.info( + "Skipping verifying the snitch due to the 'use_preinstalled_scylla' being set to False", target=self.node.name) return describecluster_snitch = self.get_describecluster_info().snitch @@ -234,13 +246,13 @@ def verify_snitch(self, backend_name: str): snitch_matches_describecluster = [pattern.search(describecluster_snitch) for pattern in snitch_patterns] snitch_matches_scylla_yaml = [pattern.search(scylla_yaml_snitch) for pattern in snitch_patterns] - with self.subTest('verify snitch against describecluster output'): + with self.logged_subtest('verify snitch against describecluster output'): self.assertTrue(any(snitch_matches_describecluster), msg=f"Expected snitch matches for describecluster to not be empty, but was. Snitch " f"matches: {snitch_matches_describecluster}" ) - with self.subTest('verify snitch against scylla.yaml configuration'): + with self.logged_subtest('verify snitch against scylla.yaml configuration'): self.assertTrue(any(snitch_matches_scylla_yaml), msg=f"Expected snitch matches for scylla yaml to not be empty, but was. 
Snitch " f"matches: {snitch_matches_scylla_yaml}" @@ -263,6 +275,7 @@ def verify_docker_latest_match_release(self) -> None: def verify_nvme_write_cache(self) -> None: if self.write_back_cache is None or self.node.parent_cluster.is_additional_data_volume_used(): + self.actions_log.info("Skipped verifying NVMe write cache", target=self.node.name) return expected_write_cache_value = "write back" if self.write_back_cache else "write through" @@ -323,11 +336,11 @@ def test_scylla_service(self): backend = self.params.get("cluster_backend") if backend == "aws": - with self.subTest("check ENA support"): + with self.logged_subtest("check ENA support"): assert self.node.ena_support, "ENA support is not enabled" if backend in ["gce", "aws", "azure"] and self.params.get("use_preinstalled_scylla"): - with self.subTest("check Scylla IO Params"): + with self.logged_subtest("check Scylla IO Params"): try: if self.node.db_node_instance_type in ["t3.micro"]: self.skipTest( @@ -342,57 +355,58 @@ def test_scylla_service(self): ) except SkipTest as exc: self.log.info("Skipping IOTuneValidation due to %s", exc.args) + self.actions_log.info("Skipped IOTuneValidation", target=self.node.name) except Exception: # noqa: BLE001 self.log.error("IOTuneValidator failed", exc_info=True) TestFrameworkEvent(source={self.__class__.__name__}, message="Error during IOTune params validation.", severity=Severity.ERROR).publish() - with self.subTest("verify write cache for NVMe devices"): + with self.logged_subtest("verify write cache for NVMe devices"): self.verify_nvme_write_cache() if (backend != "docker" and not self.params.get("nonroot_offline_install") and self.node.db_node_instance_type != "t3.micro"): - with self.subTest("verify XFS online discard enabled"): + with self.logged_subtest("verify XFS online discard enabled"): self.verify_xfs_online_discard_enabled() if backend == "gce": - with self.subTest("verify users"): + with self.logged_subtest("verify users"): self.verify_users() 
expected_housekeeping_status_code = 'cr' if backend == "docker" else 'r' if self.params.get("use_preinstalled_scylla") and backend != "docker": - with self.subTest("check the cluster name"): + with self.logged_subtest("check the cluster name"): self.check_cluster_name() - with self.subTest('verify snitch'): + with self.logged_subtest('verify snitch'): self.verify_snitch(backend_name=backend) - with self.subTest('verify node health'): + with self.logged_subtest('verify node health'): self.verify_node_health() - with self.subTest("check Scylla server after installation"): + with self.logged_subtest("check Scylla server after installation"): self.check_scylla() - with self.subTest("check cqlsh installation"): + with self.logged_subtest("check cqlsh installation"): self.check_cqlsh() - with self.subTest("check node_exporter liveness"): + with self.logged_subtest("check node_exporter liveness"): node_info_service = NodeLoadInfoServices().get(self.node) assert node_info_service.cpu_load_5 assert node_info_service.get_node_boot_time_seconds() - with self.subTest("check scylla_doctor results"): - if self.params.get("run_scylla_doctor"): + if self.params.get("run_scylla_doctor"): + with self.logged_subtest("check scylla_doctor results"): self.run_scylla_doctor() - else: - self.log.info("Running scylla-doctor is disabled") + else: + self.log.info("Running scylla-doctor is disabled") # We don't install any time sync service in docker, so the test is unnecessary: # https://github.com/scylladb/scylla/tree/master/dist/docker/etc/supervisord.conf.d if backend != "docker": - with self.subTest("check if scylla unnecessarily installed a time synchronization service"): + with self.logged_subtest("check if scylla unnecessarily installed a time synchronization service"): # Checks https://github.com/scylladb/scylla/issues/8339 # If the instance already has systemd-timesyncd is_timesyncd_service_installed = self.check_service_existence(service_name="systemd-timesyncd") @@ -443,24 +457,24 
@@ def test_scylla_service(self): # backend=backend) version_id_after_stop = 0 - with self.subTest("check Scylla server after stop/start"): + with self.logged_subtest("check Scylla server after stop/start"): self.node.stop_scylla(verify_down=True) self.node.start_scylla(verify_up=True) - # Scylla service has been stopped/started after installation and re-configuration. # So we don't need to stop and to start it again self.check_scylla() if not self.node.is_nonroot_install: self.log.info("Validate version after stop/start") - self.check_housekeeping_service_status(backend=backend) - version_id_after_stop = self.check_scylla_version_in_housekeepingdb( - prev_id=0, - expected_status_code=expected_housekeeping_status_code, - new_row_expected=False, - backend=backend) - - with self.subTest("check Scylla server after restart"): + with self.actions_log.action_scope("Validate version after stop/start"): + self.check_housekeeping_service_status(backend=backend) + version_id_after_stop = self.check_scylla_version_in_housekeepingdb( + prev_id=0, + expected_status_code=expected_housekeeping_status_code, + new_row_expected=False, + backend=backend) + + with self.logged_subtest("check Scylla server after restart"): self.node.restart_scylla(verify_up_after=True) self.check_scylla() @@ -473,35 +487,41 @@ def test_scylla_service(self): backend=backend) if backend != 'docker': - with self.subTest("Check the output of perftune.py"): + with self.logged_subtest("Check the output of perftune.py"): perftune_checker = PerftuneOutputChecker(self.node) perftune_checker.compare_perftune_results() if backend == 'docker': - with self.subTest("Check docker latest tags"): + with self.logged_subtest("Check docker latest tags"): self.verify_docker_latest_match_release() def run_scylla_doctor(self): if self.params.get('client_encrypt') and SkipPerIssues("https://github.com/scylladb/field-engineering/issues/2280", self.params): self.log.info("Scylla Doctor test is skipped for encrypted environment 
due to issue field-engineering#2280") + self.actions_log.info( + "Scylla Doctor test is skipped for encrypted environment due to issue field-engineering#2280") return if self.db_cluster.nodes[0].is_nonroot_install and \ SkipPerIssues("https://github.com/scylladb/scylla-cluster-tests/issues/10540", self.params): self.log.info("Scylla Doctor test is skipped for non-root test due to issue field-engineering#2254. ") + self.actions_log.info( + "Scylla Doctor test is skipped for non-root test due to issue field-engineering#2254.") return if self.node.parent_cluster.cluster_backend == "docker": - self.log.info("Scylla Doctor check in SCT isn't yet support for docker backend") + self.log.info("Scylla Doctor check in SCT isn't yet supported for docker backend") + self.actions_log.info("Scylla Doctor check in SCT isn't yet supported for docker backend") return for node in self.db_cluster.nodes: - scylla_doctor = ScyllaDoctor(node, self.test_config, bool(self.params.get('unified_package'))) - scylla_doctor.install_scylla_doctor() - scylla_doctor.argus_collect_sd_package() - scylla_doctor.run_scylla_doctor_and_collect_results() - scylla_doctor.analyze_vitals() - scylla_doctor.analyze_and_verify_results() + with self.actions_log.action_scope("installing and running Scylla Doctor", target=node.name): + scylla_doctor = ScyllaDoctor(node, self.test_config, bool(self.params.get('unified_package'))) + scylla_doctor.install_scylla_doctor() + scylla_doctor.argus_collect_sd_package() + scylla_doctor.run_scylla_doctor_and_collect_results() + scylla_doctor.analyze_vitals() + scylla_doctor.analyze_and_verify_results() def get_email_data(self): self.log.info("Prepare data for email") diff --git a/sdcm/logcollector.py b/sdcm/logcollector.py index b85cc86a207..d7d6c561e36 100644 --- a/sdcm/logcollector.py +++ b/sdcm/logcollector.py @@ -1006,6 +1006,8 @@ class BaseSCTLogCollector(LogCollector): search_locally=True), FileLog(name='argus.log', search_locally=True), + FileLog(name='actions.log', 
+ search_locally=True), FileLog(name=r'*debug.json', search_locally=True), FileLog(name='result_gradual_increase.log'), diff --git a/sdcm/sct_events/base.py b/sdcm/sct_events/base.py index f0f25544f29..02c5eb891e9 100644 --- a/sdcm/sct_events/base.py +++ b/sdcm/sct_events/base.py @@ -35,10 +35,12 @@ from sdcm import sct_abs_path from sdcm.sct_events import Severity, SctEventProtocol from sdcm.sct_events.events_processes import EventsProcessesRegistry +from sdcm.utils.action_logger import get_action_logger DEFAULT_SEVERITIES = sct_abs_path("defaults/severities.yaml") FILTER_EVENT_DECAY_TIME = 600.0 LOGGER = logging.getLogger(__name__) +ACTION_LOGGER = get_action_logger("event") class SctEventTypesRegistry(Dict[str, Type["SctEvent"]]): @@ -245,6 +247,18 @@ def publish(self, warn_not_ready: bool = True) -> None: LOGGER.warning("[SCT internal warning] %s is not ready to be published", self) return get_events_main_device(_registry=self._events_processes_registry).publish_event(self) + if self.severity.value > Severity.WARNING.value: + metadata = {"base": self.base} + if self.type: + metadata["type"] = self.type + if self.subtype: + metadata["subtype"] = self.subtype + ACTION_LOGGER.error( + f"{self.severity.name} Event", + trace_id=self.event_id, + target=getattr(self, 'node', None), + metadata=metadata + ) self._ready_to_publish = False def publish_or_dump(self, default_logger: Optional[logging.Logger] = None, warn_not_ready: bool = True) -> None: diff --git a/sdcm/tester.py b/sdcm/tester.py index 952926e7840..8cd73f31544 100644 --- a/sdcm/tester.py +++ b/sdcm/tester.py @@ -74,6 +74,7 @@ from sdcm.cassandra_harry_thread import CassandraHarryThread from sdcm.teardown_validators import teardown_validators_list from sdcm.tombstone_gc_verification_thread import TombstoneGcVerificationThread +from sdcm.utils.action_logger import get_action_logger from sdcm.utils.alternator.consts import NO_LWT_TABLE_NAME from sdcm.utils.aws_kms import AwsKms from sdcm.utils.aws_region 
import AwsRegion @@ -306,6 +307,7 @@ class ClusterInformation(NamedTuple): class ClusterTester(db_stats.TestStatsMixin, unittest.TestCase): log = None + actions_log = None localhost = None events_processes_registry = None monitors: BaseMonitorSet = None @@ -781,6 +783,7 @@ def _init_params(self): def _init_logging(self): self.log = logging.getLogger(self.__class__.__name__) + self.actions_log = get_action_logger('tester') self.logdir = self.test_config.logdir() def run(self, result=None): @@ -1101,6 +1104,7 @@ def _db_post_validation(): # available yet. Update rack info in Argus for loaders in the end of set up. for loaders in self.loaders_multitenant: loaders.update_rack_info_in_argus() + self.actions_log.info("initialized test") def set_system_auth_rf(self, db_cluster=None): db_cluster = db_cluster or self.db_cluster @@ -2449,6 +2453,7 @@ def create_keyspace(self, keyspace_name, replication_factor, tablets_config: Opt if tablets_config: cmd += ' AND TABLETS = %s' % tablets_config execution_result = session.execute(cmd) + self.actions_log.info("Keyspace created", metadata={"keyspace": keyspace_name, "statement": cmd}) if execution_result: self.log.debug("keyspace creation result: {}".format(execution_result.response_future)) @@ -2507,6 +2512,7 @@ def create_table(self, name, key_type="varchar", # noqa: PLR0913 self.log.debug('CQL query to execute: {}'.format(query)) with self.db_cluster.cql_connection_patient(node=self.db_cluster.nodes[0], keyspace=keyspace_name) as session: session.execute(query) + self.actions_log.info("Created table", metadata={"table_name": name, "query": query}) time.sleep(0.2) def truncate_cf(self, ks_name: str, table_name: str, session: Session, truncate_timeout_sec: int | None = None): diff --git a/sdcm/utils/action_logger.py b/sdcm/utils/action_logger.py new file mode 100644 index 00000000000..03e2d123d31 --- /dev/null +++ b/sdcm/utils/action_logger.py @@ -0,0 +1,65 @@ +import logging +from contextlib import contextmanager +from typing 
import Dict, Optional, Any, Iterator +import json + + +class ActionLogger: + """ + A logger designed for use in test flows to describe the actions taken during a test. + + This logger allows for logging actions in a separate log, making it easier to read + and understand the sequence of steps in a test flow. It supports structured logging + with additional metadata such as source, action, target, trace ID, and custom metadata + for further processing by other tools. + """ + + def __init__(self, logger: logging.Logger, source: str): + self.logger = logger + self.root_logger = logging.getLogger(__name__) + self.source = source + + def _log(self, level: int, action: str, target: Optional[str] = None, trace_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> None: + extra = { + "source": self.source, + "action": action, + } + message = f" source: {self.source}, action: {action}" + if target: + extra["target"] = target + message += f", target: {target}" + if trace_id: + extra["trace_id"] = trace_id + message += f", trace_id: {trace_id}" + if metadata: + extra["metadata"] = metadata + message += f", metadata: {json.dumps(metadata)}" + + self.logger.log(level, "", extra=extra) + self.root_logger.log(level, message) + + def info(self, action: str, target: Optional[str] = None, trace_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> None: + self._log(logging.INFO, action, target, trace_id, metadata) + + def warning(self, action: str, target: Optional[str] = None, trace_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> None: + self._log(logging.WARNING, action, target, trace_id, metadata) + + def error(self, action: str, target: Optional[str] = None, trace_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> None: + self._log(logging.ERROR, action, target, trace_id, metadata) + + @contextmanager + def action_scope(self, action: str, target: Optional[str] = None, + trace_id: Optional[str] = 
None, metadata: Optional[Dict[str, Any]] = None) -> Iterator[None]: + self.info(f"Started - {action}", target, trace_id, metadata) + try: + yield + finally: + self.info(f"Finished - {action}", target, trace_id, metadata) + + +logger = logging.getLogger("action_logger") +logger.setLevel(logging.INFO) + + +def get_action_logger(source: str) -> ActionLogger: + return ActionLogger(logger, source) diff --git a/sdcm/utils/log.py b/sdcm/utils/log.py index 9811b1c1ec6..b3c2e3202d1 100644 --- a/sdcm/utils/log.py +++ b/sdcm/utils/log.py @@ -2,8 +2,10 @@ import logging import logging.config import warnings +from datetime import datetime import urllib3 +import json LOGGER = logging.getLogger(__name__) @@ -66,6 +68,36 @@ def format(self, record): return output +class JSONLFormatter(logging.Formatter): + """ + A custom logging formatter that outputs log records in JSON Lines (JSONL) format. + + Attributes added to the log record: + - datetime: The current UTC timestamp in ISO 8601 format with a 'Z' suffix. + - status: The log level name in lowercase. + - source: Optional source of the log record. + - action: Optional action associated with the log record. + - target: Optional target information, if available. + - trace_id: Optional trace ID for correlation, if available. + - metadata: Optional metadata dictionary, if available. 
+ """ + + def format(self, record: logging.LogRecord) -> str: + log_entry = { + "datetime": datetime.utcnow().isoformat() + "Z", + "status": record.levelname.lower(), + "source": getattr(record, "source", ""), + "action": getattr(record, "action", ""), + } + if target := getattr(record, "target", None): + log_entry["target"] = target + if trace_id := getattr(record, "trace_id", None): + log_entry["trace_id"] = trace_id + if metadata := getattr(record, "metadata", None): + log_entry["metadata"] = metadata + return json.dumps(log_entry, ensure_ascii=False) + + class FilterRemote(logging.Filter): def filter(self, record): return not record.name == 'sdcm.remote' @@ -109,6 +141,10 @@ def configure_logging(exception_handler=None, '()': MultilineMessagesFormatter, 'format': '< t:%(asctime)s f:%(filename)-15s l:%(lineno)-4s c:%(name)-20s p:%(levelname)-5s > %(message)s' }, + 'action_logger': { + '()': JSONLFormatter, + 'format': '%(message)s' + } } if filters is None: filters = { @@ -138,6 +174,13 @@ def configure_logging(exception_handler=None, 'filename': '{log_dir}/argus.log', 'mode': 'a', 'formatter': 'default', + }, + 'actions': { + 'level': 'INFO', + 'class': 'logging.FileHandler', + 'filename': '{log_dir}/actions.log', + 'mode': 'a', + 'formatter': 'action_logger', } } if loggers is None: @@ -180,6 +223,11 @@ def configure_logging(exception_handler=None, 'level': 'DEBUG', 'propagate': False }, + 'action_logger': { + 'handlers': ['actions'], + 'level': 'INFO', + 'propagate': False + }, 'sdcm.argus_test_run': { 'handlers': ['argus'], 'level': 'DEBUG', diff --git a/unit_tests/test_tester.py b/unit_tests/test_tester.py index 4058fdaf17b..a13a2dfa797 100644 --- a/unit_tests/test_tester.py +++ b/unit_tests/test_tester.py @@ -25,7 +25,8 @@ from sdcm.sct_events.health import ClusterHealthValidatorEvent from sdcm.tester import ClusterTester, silence, TestResultEvent from sdcm.sct_config import SCTConfiguration -from sdcm.utils.log import MultilineMessagesFormatter, 
configure_logging +from sdcm.utils.action_logger import get_action_logger +from sdcm.utils.log import MultilineMessagesFormatter, configure_logging, JSONLFormatter from sdcm.sct_events.system import TestFrameworkEvent from sdcm.sct_events.file_logger import get_events_grouped_by_category from sdcm.sct_events.events_processes import EventsProcessesRegistry @@ -56,11 +57,16 @@ def __init__(self, *args): unittest.mock.patch("sdcm.sct_events.base.SctEvent._events_processes_registry", self.events_processes_registry) self.events_processes_registry_patcher.start() + self.actions_log = get_action_logger('tester') configure_logging( formatters={ 'default': { '()': MultilineMessagesFormatter, 'format': '%(message)s' + }, + 'action_logger': { + '()': JSONLFormatter, + 'format': '%(message)s' } }, variables={'log_dir': self.logdir} diff --git a/upgrade_test.py b/upgrade_test.py index 3318d073b1b..23870284f5d 100644 --- a/upgrade_test.py +++ b/upgrade_test.py @@ -72,12 +72,12 @@ def truncate_entries(func): def inner(self, *args, **kwargs): node = args[0] with self.db_cluster.cql_connection_patient(node, keyspace='truncate_ks', connect_timeout=600) as session: - InfoEvent(message="Start truncate simple tables").publish() + self.actions_log.info("Start truncate simple tables") session.default_timeout = 60.0 * 5 session.default_consistency_level = ConsistencyLevel.QUORUM try: self.cql_truncate_simple_tables(session=session, rows=NUMBER_OF_ROWS_FOR_TRUNCATE_TEST) - InfoEvent(message="Finish truncate simple tables").publish() + self.actions_log.info("Finish truncate simple tables") except cassandra.DriverException as details: InfoEvent(message=f"Failed truncate simple tables. Error: {str(details)}. 
Traceback: {traceback.format_exc()}", severity=Severity.ERROR).publish() @@ -181,13 +181,13 @@ def read_data_from_truncated_tables(self, session): truncate_query = 'SELECT COUNT(*) FROM {}' tables_name = self.get_tables_name_of_keyspace(session=session, keyspace_name='truncate_ks') for table_name in tables_name: - InfoEvent(message=f"Start read data from {table_name} tables").publish() + self.actions_log.info("Start read data from tables", target=table_name) try: count = self._query_from_one_table(session=session, query=truncate_query, table_name=table_name) self.assertEqual(str(count[0][0]), '0', msg='Expected that there is no data in the table truncate_ks.{}, but found {} rows' .format(table_name, count[0][0])) - InfoEvent(message=f"Finish read data from {table_name} tables").publish() + self.actions_log.info("Finish read data from tables", target=table_name) except Exception as details: # noqa: BLE001 InfoEvent(message=f"Failed read data from {table_name} tables. Error: {str(details)}. 
Traceback: {traceback.format_exc()}", severity=Severity.ERROR).publish() @@ -221,6 +221,7 @@ def upgrade_node(self, node, upgrade_sstables=True): ignore_raft_topology_cmd_failing]) # https://github.com/scylladb/scylla/issues/10447#issuecomment-1194155163 def _upgrade_node(self, node, upgrade_sstables=True, new_scylla_repo=None, new_version=None): # noqa: PLR0915 + self.actions_log.info("Starting node upgrade", target=node.name) new_scylla_repo = new_scylla_repo or self.params.get('new_scylla_repo') new_version = new_version or self.params.get('new_version') upgrade_node_packages = self.params.get('upgrade_node_packages') @@ -236,58 +237,46 @@ def _upgrade_node(self, node, upgrade_sstables=True, new_scylla_repo=None, new_v if self.params.get("enable_views_with_tablets_on_upgrade"): scylla_yaml_updates.update({"experimental_features": ["views-with-tablets"]}) - InfoEvent(message='Upgrading a Node').publish() + self.actions_log.info('Upgrading a Node', target=node.name) # We assume that if update_db_packages is not empty we install packages from there. # In this case we don't use upgrade based on new_scylla_repo(ignored sudo yum update scylla...) 
result = node.remoter.run('scylla --version') self.orig_ver = result.stdout.strip() if upgrade_node_packages: - InfoEvent(message='upgrade_node - started to "upgrade_node_packages').publish() + self.actions_log.info('Started upgrade_node_packages', target=node.name) # update_scylla_packages - InfoEvent(message='upgrade_node - started sending upgrade packages to node').publish() - node.remoter.send_files(upgrade_node_packages, '/tmp/scylla', verbose=True) - InfoEvent(message='upgrade_node - ended sending upgrade packages to node').publish() + self.actions_log.info("sending upgrade packages to node", target=node.name) + with self.actions_log.action_scope('sending upgrade packages to node', target=node.name): + node.remoter.send_files(upgrade_node_packages, '/tmp/scylla', verbose=True) # node.remoter.run('sudo yum update -y --skip-broken', connect_timeout=900) node.remoter.run('sudo yum install python34-PyYAML -y') # replace the packages node.remoter.run(r'rpm -qa scylla\*') # flush all memtables to SSTables - node.run_nodetool("drain", timeout=15*60, coredump_on_timeout=True, long_running=True, retry=0) - node.run_nodetool("snapshot") - node.stop_scylla_server() - # update *development* packages - node.remoter.run('sudo rpm -UvhR --oldpackage /tmp/scylla/*development*', ignore_status=True) - # and all the rest - node.remoter.run('sudo rpm -URvh --replacefiles /tmp/scylla/*.rpm | true') - node.remoter.run(r'rpm -qa scylla\*') - InfoEvent(message='upgrade_node - ended to "upgrade_node_packages').publish() + with self.actions_log.action_scope("stopping node", target=node.name): + node.run_nodetool("drain", timeout=15*60, coredump_on_timeout=True, long_running=True, retry=0) + node.run_nodetool("snapshot") + node.stop_scylla_server() + with self.actions_log.action_scope("upgrading packages", target=node.name): + node.remoter.run('sudo rpm -UvhR --oldpackage /tmp/scylla/*development*', ignore_status=True) + node.remoter.run('sudo rpm -URvh --replacefiles /tmp/scylla/*.rpm 
| true') + node.remoter.run(r'rpm -qa scylla\*') elif new_scylla_repo: - InfoEvent(message='upgrade_node - started to create "new_scylla_repo"').publish() # backup the data node.remoter.run('sudo cp /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml-backup') if node.distro.is_rhel_like: node.remoter.run('sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup') else: node.remoter.run('sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup') - InfoEvent(message='upgrade_node - ended to create "new_scylla_repo"').publish() - InfoEvent(message='upgrade_node - started to "backup_conf"').publish() backup_conf(node) - InfoEvent(message='upgrade_node - ended to "backup_conf"').publish() assert new_scylla_repo.startswith('http') - InfoEvent(message='upgrade_node - started to "download_scylla_repo"').publish() node.download_scylla_repo(new_scylla_repo) - InfoEvent(message='upgrade_node - ended to "download_scylla_repo"').publish() # flush all memtables to SSTables - InfoEvent(message='upgrade_node - started to "drain"').publish() - node.run_nodetool("drain", timeout=15*60, coredump_on_timeout=True, long_running=True, retry=0) - InfoEvent(message='upgrade_node - ended to "drain"').publish() - InfoEvent(message='upgrade_node - started to "nodetool snapshot"').publish() - node.run_nodetool("snapshot") - InfoEvent(message='upgrade_node - ended to "nodetool snapshot"').publish() - InfoEvent(message='upgrade_node - started to "stop_scylla_server"').publish() - node.stop_scylla_server(verify_down=False) - InfoEvent(message='upgrade_node - ended to "stop_scylla_server"').publish() + with self.actions_log.action_scope("stop node", target=node.name): + node.run_nodetool("drain", timeout=15*60, coredump_on_timeout=True, long_running=True, retry=0) + node.run_nodetool("snapshot") + node.stop_scylla_server(verify_down=False) orig_is_enterprise = node.is_product_enterprise if node.distro.is_rhel_like: @@ -302,7 +291,6 @@ def _upgrade_node(self, node, upgrade_sstables=True, 
new_scylla_repo=None, new_v ver_suffix = r'\*{}'.format(new_version) if new_version else '' scylla_pkg_ver = f"{scylla_pkg}{ver_suffix}" - InfoEvent(message=f'upgrade_node - target version={self.params.scylla_version_upgrade_target}').publish() if (orig_is_enterprise and ComparableScyllaVersion(self.orig_ver) < '2025.1.0~dev' <= ComparableScyllaVersion( self.params.scylla_version_upgrade_target)): @@ -316,67 +304,49 @@ def _upgrade_node(self, node, upgrade_sstables=True, new_scylla_repo=None, new_v if self.params.get('use_preinstalled_scylla'): scylla_pkg_ver += f" {scylla_pkg}-machine-image" + with self.actions_log.action_scope("updating packages", target=node.name): + if node.distro.is_rhel_like: + node.remoter.run(r'sudo yum update {}\* -y'.format(scylla_pkg_ver)) + else: + node.remoter.sudo('apt-get update') + node.remoter.sudo( + f'DEBIAN_FRONTEND=noninteractive apt-get dist-upgrade {scylla_pkg_ver} -y' + f' -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold"' + ) - if node.distro.is_rhel_like: - InfoEvent(message='upgrade_node - starting to "yum update"').publish() - node.remoter.run(r'sudo yum update {}\* -y'.format(scylla_pkg_ver)) - InfoEvent(message='upgrade_node - ended to "yum update"').publish() - else: - InfoEvent(message='upgrade_node - starting to "apt-get update"').publish() - node.remoter.sudo('apt-get update') - node.remoter.sudo( - f'DEBIAN_FRONTEND=noninteractive apt-get dist-upgrade {scylla_pkg_ver} -y' - f' -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold"' - ) - InfoEvent(message='upgrade_node - ended to "apt-get update"').publish() - - InfoEvent(message='upgrade_node - fix /etc/scylla.d/io.conf arguments compatibility').publish() node.remoter.sudo( f"sed -i 's/--num-io-queues/--num-io-groups/' {node.add_install_prefix('/etc/scylla.d/io.conf')}") - InfoEvent(message='upgrade_node - starting to "check_reload_systemd_config"').publish() - check_reload_systemd_config(node) - 
InfoEvent(message='upgrade_node - ended to "check_reload_systemd_config"').publish() - - InfoEvent(message='upgrade_node - starting to "run_scylla_sysconfig_setup"').publish() - node.run_scylla_sysconfig_setup() - InfoEvent(message='upgrade_node - ended to "run_scylla_sysconfig_setup"').publish() + with self.actions_log.action_scope("reload systemd and run scylla sysconfig setup", target=node.name): + check_reload_systemd_config(node) + node.run_scylla_sysconfig_setup() if scylla_yaml_updates: - InfoEvent(message='upgrade_node - starting to "update_scylla_yaml"').publish() - self._update_scylla_yaml_on_node(node_to_update=node, updates=scylla_yaml_updates) - InfoEvent(message='upgrade_node - ended to "update_scylla_yaml"').publish() + with self.actions_log.action_scope("update_scylla_yaml", target=node.name, metadata={"updates": scylla_yaml_updates}): + self._update_scylla_yaml_on_node(node_to_update=node, updates=scylla_yaml_updates) node.forget_scylla_version() node.drop_raft_property() - InfoEvent(message='upgrade_node - starting to "start_scylla_server"').publish() - node.start_scylla_server(verify_up_timeout=500) - InfoEvent(message='upgrade_node - ended to "start_scylla_server"').publish() - InfoEvent(message='upgrade_node - starting to "get_db_nodes_cpu_mode"').publish() + with self.actions_log.action_scope("start_scylla_server", target=node.name): + node.start_scylla_server(verify_up_timeout=500) self.db_cluster.get_db_nodes_cpu_mode() - InfoEvent(message='upgrade_node - ended to "get_db_nodes_cpu_mode"').publish() result = node.remoter.run('scylla --version') new_ver = result.stdout.strip() assert self.orig_ver != new_ver, "scylla-server version didn't change during upgrade" self.new_ver = new_ver - InfoEvent(message='upgrade_node - starting to "_update_argus_upgraded_version"').publish() self._update_argus_upgraded_version(node, new_ver) - InfoEvent(message='upgrade_node - ended to "_update_argus_upgraded_version"').publish() if upgrade_sstables: - 
InfoEvent(message='upgrade_node - starting to "upgradesstables_if_command_available"').publish() self.upgradesstables_if_command_available(node) - InfoEvent(message='upgrade_node - ended to "upgradesstables_if_command_available"').publish() self.db_cluster.wait_all_nodes_un() + self.actions_log.info("Upgrade node completed", target=node.name) def upgrade_os(self, nodes): def upgrade(node): - InfoEvent(message=f'upgrade_node_system - starting to "upgrade_system" of the node {node.name}').publish() node.upgrade_system() - InfoEvent(message=f'upgrade_node_system - ended to "upgrade_system" of the node {node.name}').publish() if self.params.get('upgrade_node_system'): - InfoEvent(message='Upgrading OS on nodes').publish() - parallel_obj = ParallelObject(objects=nodes, timeout=self.system_upgrade_timeout) - parallel_obj.run(upgrade) + with self.actions_log.action_scope("upgrade OS on nodes"): + parallel_obj = ParallelObject(objects=nodes, timeout=self.system_upgrade_timeout) + parallel_obj.run(upgrade) @truncate_entries # https://github.com/scylladb/scylla/issues/10447#issuecomment-1194155163 @@ -388,14 +358,15 @@ def rollback_node(self, node, upgrade_sstables=True): ignore_topology_change_coordinator_errors, ignore_raft_topology_cmd_failing]) def _rollback_node(self, node, upgrade_sstables=True): - InfoEvent(message='Rollbacking a Node').publish() + self.actions_log.info("Starting node rollback", target=node.name) result = node.remoter.run('scylla --version') orig_ver = result.stdout.strip() # flush all memtables to SSTables - node.run_nodetool("drain", timeout=15*60, coredump_on_timeout=True, long_running=True, retry=0) - # backup the data - node.run_nodetool("snapshot") - node.stop_scylla_server(verify_down=False) + with self.actions_log.action_scope("stop node", target=node.name): + node.run_nodetool("drain", timeout=15*60, coredump_on_timeout=True, long_running=True, retry=0) + # backup the data + node.run_nodetool("snapshot") + 
node.stop_scylla_server(verify_down=False) if node.distro.is_rhel_like: node.remoter.run('sudo cp ~/scylla.repo-backup /etc/yum.repos.d/scylla.repo') @@ -409,6 +380,7 @@ def _rollback_node(self, node, upgrade_sstables=True): if re.findall(r'\d+.\d+', self.orig_ver)[0] == re.findall(r'\d+.\d+', self.new_ver)[0]: self.upgrade_rollback_mode = 'minor_release' + self.actions_log.info('downgrading packages', target=node.name) if self.upgrade_rollback_mode == 'reinstall' or not node.distro.is_rhel_like: scylla_pkg_ver = node.scylla_pkg() @@ -427,7 +399,6 @@ def _rollback_node(self, node, upgrade_sstables=True): ) recover_conf(node) node.remoter.run('sudo systemctl daemon-reload') - elif self.upgrade_rollback_mode == 'minor_release': node.remoter.run(r'sudo yum downgrade scylla\*%s-\* -y' % self.orig_ver.split('-')[0]) else: @@ -437,6 +408,7 @@ def _rollback_node(self, node, upgrade_sstables=True): result = node.remoter.run('scylla --version') new_ver = result.stdout.strip() + self.actions_log.info('finished downgrading packages', target=node.name) node.remoter.run('sudo cp /etc/scylla/scylla.yaml-backup /etc/scylla/scylla.yaml') @@ -446,23 +418,22 @@ def _rollback_node(self, node, upgrade_sstables=True): node.run_scylla_sysconfig_setup() node.start_scylla_server(verify_up_timeout=500) - InfoEvent(message='original scylla-server version is %s, latest: %s' % (orig_ver, new_ver)).publish() assert orig_ver != new_ver, "scylla-server version didn't change during rollback" if upgrade_sstables: self.upgradesstables_if_command_available(node) self.db_cluster.wait_all_nodes_un() + self.actions_log.info("Node rollback completed", target=node.name) - @staticmethod - def upgradesstables_if_command_available(node, queue=None): + def upgradesstables_if_command_available(self, node, queue=None): upgradesstables_available = False upgradesstables_supported = node.remoter.run( 'nodetool help | grep -q upgradesstables && echo "yes" || echo "no"') if "yes" in 
upgradesstables_supported.stdout: upgradesstables_available = True - InfoEvent(message="calling upgradesstables").publish() - node.run_nodetool(sub_cmd="upgradesstables") + with self.actions_log.action_scope("Upgrading SSTables", target=node.name): + node.run_nodetool(sub_cmd="upgradesstables") if queue: queue.put(upgradesstables_available) queue.task_done() @@ -509,7 +480,6 @@ def wait_for_node_to_finish(): return True try: - InfoEvent(message="Start waiting for upgardesstables to finish").publish() wait.wait_for(func=wait_for_node_to_finish, step=30, timeout=900, throw_exc=True, text="Waiting until upgardesstables is finished") except tenacity.RetryError: @@ -528,30 +498,30 @@ def test_upgrade_cql_queries(self): """ self.upgrade_os(self.db_cluster.nodes) - InfoEvent(message='Populate DB with many types of tables and data').publish() + self.actions_log.info('Populating DB with tables and data') self.fill_db_data() - InfoEvent(message='Run some Queries to verify data BEFORE UPGRADE').publish() + self.actions_log.info('Running queries to verify data before upgrade') self.verify_db_data() - InfoEvent(message='Starting c-s write workload to pupulate 10M paritions').publish() + self.actions_log.info('Starting cassandra-stress write workload for 10M partitions') # YAML: stress_cmd: cassandra-stress write cl=QUORUM n=10000000 # -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)' # -mode cql3 native -rate threads=1000 -pop seq=1..10000000 stress_cmd = self._cs_add_node_flag(self.params.get('stress_cmd')) self.run_stress_thread(stress_cmd=stress_cmd) - InfoEvent(message='Sleeping for 360s to let cassandra-stress populate ' - 'some data before the mixed workload').publish() + self.actions_log.info('Waiting for cassandra-stress to populate data', metadata={"wait_time_seconds": 600}) time.sleep(600) - InfoEvent(message='Starting c-s read workload for 60m').publish() + self.actions_log.info('Starting cassandra-stress read workload', 
metadata={"duration": "60m"}) # YAML: stress_cmd_1: cassandra-stress read cl=QUORUM duration=60m # -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)' # -mode cql3 native -rate threads=100 -pop seq=1..10000000 stress_cmd_1 = self._cs_add_node_flag(self.params.get('stress_cmd_1')) stress_queue = self.run_stress_thread(stress_cmd=stress_cmd_1) - InfoEvent(message='Sleeping for 300s to let cassandra-stress start before the upgrade...').publish() + self.actions_log.info('Waiting for cassandra-stress to start before upgrade', + metadata={"wait_time_seconds": 300}) time.sleep(300) nodes_num = len(self.db_cluster.nodes) @@ -568,18 +538,18 @@ def test_upgrade_cql_queries(self): time.sleep(300) InfoEvent(message='Upgrade Node %s ended' % self.db_cluster.node_to_upgrade.name).publish() - InfoEvent(message='Run some Queries to verify data AFTER UPGRADE').publish() + self.actions_log.info('Running queries to verify data after upgrade') self.verify_db_data() self.verify_stress_thread(stress_queue) def fill_and_verify_db_data(self, note, pre_fill=False, rewrite_data=True): if pre_fill: - InfoEvent(message='Populate DB with many types of tables and data').publish() + self.actions_log.info('Populating DB with tables and data') self.fill_db_data() - InfoEvent(message='Run some Queries to verify data %s' % note).publish() + self.actions_log.info('Running queries to verify data', metadata={"stage": note}) self.verify_db_data() if rewrite_data: - InfoEvent(message='Re-Populate DB with many types of tables and data').publish() + self.actions_log.info('Re-populating DB with tables and data') self.fill_db_data() # Added to cover the issue #5621: upgrade from 3.1 to 3.2 fails on std::logic_error (Column idx_token doesn't exist @@ -621,17 +591,17 @@ def test_rolling_upgrade(self): # noqa: PLR0914, PLR0915 """ self.upgrade_os(self.db_cluster.nodes) - InfoEvent(message='pre-test - prepare test keyspaces and tables').publish() + self.actions_log.info('Preparing test 
keyspaces and tables') # prepare test keyspaces and tables before upgrade to avoid schema change during mixed cluster. self.prepare_keyspaces_and_tables() - InfoEvent(message='Running s-b to create schemas to avoid #11459').publish() + self.actions_log.info('Running stress-bench to create schemas', metadata={"issue": "#11459"}) large_partition_stress_during_upgrade = self.params.get('stress_before_upgrade') sb_create_schema = self.run_stress_thread(stress_cmd=f'{large_partition_stress_during_upgrade} -duration=1m') self.verify_stress_thread(sb_create_schema) self.fill_and_verify_db_data('BEFORE UPGRADE', pre_fill=True) # write workload during entire test - InfoEvent(message='Starting c-s write workload during entire test').publish() + self.actions_log.info('Starting cassandra-stress write workload for entire test') write_stress_during_entire_test = self.params.get('write_stress_during_entire_test') entire_write_cs_thread_pool = self.run_stress_thread(stress_cmd=write_stress_during_entire_test) @@ -656,29 +626,30 @@ def test_rolling_upgrade(self): # noqa: PLR0914, PLR0915 keyspace = "keyspace_entire_test" table = "standard1" - InfoEvent(message='pre-test - Run stress workload before upgrade').publish() + self.actions_log.info('Running stress workload before upgrade') if self.should_do_complex_profile(): # complex workload: prepare write - InfoEvent(message='Starting c-s complex workload (5M) to prepare data').publish() + self.actions_log.info('Starting complex cassandra-stress workload to prepare data', metadata={"size": "5M"}) stress_cmd_complex_prepare = self.params.get('stress_cmd_complex_prepare') complex_cs_thread_pool = self.run_stress_thread(stress_cmd=stress_cmd_complex_prepare) # wait for the complex workload to finish self.verify_stress_thread(complex_cs_thread_pool) - InfoEvent(message='Will check paged query before upgrading nodes').publish() + self.actions_log.info('Checking paged query before upgrading nodes') self.paged_query(keyspace=keyspace) - 
InfoEvent(message='Done checking paged query before upgrading nodes').publish() + self.actions_log.info('Finished checking paged query before upgrading nodes') # prepare write workload - InfoEvent(message='Starting c-s prepare write workload (n=10000000)').publish() + self.actions_log.info('Starting prepare write workload', metadata={"size": "10000000"}) prepare_write_stress = self.params.get('prepare_write_stress') prepare_write_cs_thread_pool = self.run_stress_thread(stress_cmd=prepare_write_stress) - InfoEvent(message='Sleeping for 60s to let cassandra-stress start before the upgrade...').publish() + self.actions_log.info('Waiting for cassandra-stress to start before upgrade', + metadata={"wait_time_seconds": 60}) self.metric_has_data( metric_query='sct_cassandra_stress_write_gauge{type="ops", keyspace="keyspace1"}', n=5) - InfoEvent(message="Start gemini during upgrade").publish() + self.actions_log.info("Starting gemini during upgrade") gemini_cmd = self.params.get("gemini_cmd") if self.enable_cdc_for_tables: gemini_cmd += " --table-options \"cdc={'enabled': true}\"" @@ -689,7 +660,7 @@ def test_rolling_upgrade(self): # noqa: PLR0914, PLR0915 with ignore_upgrade_schema_errors(): step = 'Step1 - Upgrade First Node ' - InfoEvent(message=step).publish() + self.actions_log.info(step) # upgrade first node self.db_cluster.node_to_upgrade = self.db_cluster.nodes[indexes[0]] InfoEvent(message='Upgrade Node %s begin' % self.db_cluster.node_to_upgrade.name).publish() @@ -701,29 +672,29 @@ def test_rolling_upgrade(self): # noqa: PLR0914, PLR0915 self.verify_stress_thread(prepare_write_cs_thread_pool) # read workload (cl=QUORUM) - InfoEvent(message='Starting c-s read workload (cl=QUORUM n=10000000)').publish() + self.actions_log.info('Starting read workload with QUORUM consistency', metadata={"size": "10000000"}) stress_cmd_read_cl_quorum = self.params.get('stress_cmd_read_cl_quorum') read_stress_queue = self.run_stress_thread(stress_cmd=stress_cmd_read_cl_quorum) # wait 
for the read workload to finish self.verify_stress_thread(read_stress_queue) - InfoEvent(message='after upgraded one node').publish() + self.actions_log.info('Completed first node upgrade') self.search_for_idx_token_error_after_upgrade(node=self.db_cluster.node_to_upgrade, step=step+' - after upgraded one node') # read workload - InfoEvent(message='Starting c-s read workload for 10m').publish() + self.actions_log.info('Starting read workload', metadata={"duration": "10m"}) stress_cmd_read_10m = self.params.get('stress_cmd_read_10m') read_10m_cs_thread_pool = self.run_stress_thread(stress_cmd=stress_cmd_read_10m) - InfoEvent(message='Running s-b large partitions workload before and during upgrade').publish() + self.actions_log.info('Running stress-bench large partitions workload during upgrade') large_partition_stress_during_upgrade = self.params.get('stress_before_upgrade') self.run_stress_thread(stress_cmd=large_partition_stress_during_upgrade) - InfoEvent(message='Sleeping for 60s to let cassandra-stress and s-b start before the upgrade...').publish() + self.actions_log.info('Waiting for workloads to start before upgrade', metadata={"wait_time_seconds": 60}) time.sleep(60) step = 'Step2 - Upgrade Second Node ' - InfoEvent(message=step).publish() + self.actions_log.info(step) # upgrade second node self.db_cluster.node_to_upgrade = self.db_cluster.nodes[indexes[1]] InfoEvent(message='Upgrade Node %s begin' % self.db_cluster.node_to_upgrade.name).publish() @@ -738,13 +709,14 @@ def test_rolling_upgrade(self): # noqa: PLR0914, PLR0915 step=step+' - after upgraded two nodes') # read workload (60m) - InfoEvent(message='Starting c-s read workload for 60m').publish() + self.actions_log.info('Starting read workload', metadata={"duration": "60m"}) stress_cmd_read_60m = self.params.get('stress_cmd_read_60m') read_60m_cs_thread_pool = self.run_stress_thread(stress_cmd=stress_cmd_read_60m) - InfoEvent(message='Sleeping for 60s to let cassandra-stress start before the 
rollback...').publish() + self.actions_log.info('Waiting for cassandra-stress to start before rollback', + metadata={"wait_time_seconds": 60}) time.sleep(60) - InfoEvent(message='Step3 - Rollback Second Node ').publish() + self.actions_log.info('Step3 - Rollback Second Node') # rollback second node InfoEvent(message='Rollback Node %s begin' % self.db_cluster.nodes[indexes[1]].name).publish() self.rollback_node(self.db_cluster.nodes[indexes[1]]) @@ -752,17 +724,17 @@ def test_rolling_upgrade(self): # noqa: PLR0914, PLR0915 self.db_cluster.nodes[indexes[1]].check_node_health() step = 'Step4 - Verify data during mixed cluster mode ' - InfoEvent(message=step).publish() + self.actions_log.info(step) self.fill_and_verify_db_data('after rollback the second node') - InfoEvent(message='Repair the first upgraded Node').publish() + self.actions_log.info('Repair the first upgraded Node') self.db_cluster.nodes[indexes[0]].run_nodetool(sub_cmd='repair', timeout=7200, coredump_on_timeout=True) self.search_for_idx_token_error_after_upgrade(node=self.db_cluster.node_to_upgrade, step=step) with ignore_upgrade_schema_errors(): step = 'Step5 - Upgrade rest of the Nodes ' - InfoEvent(message=step).publish() + self.actions_log.info(step) for i in indexes[1:]: self.db_cluster.node_to_upgrade = self.db_cluster.nodes[i] InfoEvent(message='Upgrade Node %s begin' % self.db_cluster.node_to_upgrade.name).publish() @@ -772,17 +744,17 @@ def test_rolling_upgrade(self): # noqa: PLR0914, PLR0915 self.fill_and_verify_db_data('after upgraded %s' % self.db_cluster.node_to_upgrade.name) self.search_for_idx_token_error_after_upgrade(node=self.db_cluster.node_to_upgrade, step=step) - InfoEvent(message='Step5.1 - run raft topology upgrade procedure').publish() + self.actions_log.info('Step5.1 - run raft topology upgrade procedure') self.run_raft_topology_upgrade_procedure() - InfoEvent(message='Step6 - Verify stress results after upgrade ').publish() - InfoEvent(message='Waiting for stress threads to 
complete after upgrade').publish() + self.actions_log.info('Step6 - Verify stress results after upgrade ') + self.actions_log.info('Waiting for stress threads to complete after upgrade') # wait for the 60m read workload to finish self.verify_stress_thread(read_60m_cs_thread_pool) self.verify_stress_thread(entire_write_cs_thread_pool) - InfoEvent(message='Step7 - Upgrade sstables to latest supported version ').publish() + self.actions_log.info('Step7 - Upgrade sstables to latest supported version ') # figure out what is the last supported sstable version self.expected_sstable_format_version = self.get_highest_supported_sstable_version() @@ -791,12 +763,12 @@ def test_rolling_upgrade(self): # noqa: PLR0914, PLR0915 # only check sstable format version if all nodes had 'nodetool upgradesstables' available if all(upgradesstables): - InfoEvent(message='Upgrading sstables if new version is available').publish() + self.actions_log.info('Upgrading sstables if new version is available') tables_upgraded = self.db_cluster.run_func_parallel(func=self.wait_for_sstable_upgrade) assert all(tables_upgraded), "Failed to upgrade the sstable format {}".format(tables_upgraded) # Verify sstabledump / scylla sstable dump-data - InfoEvent(message='Starting sstabledump to verify correctness of sstables').publish() + self.actions_log.info('Starting sstabledump to verify correctness of sstables') first_node = self.db_cluster.nodes[0] dump_cmd = get_sstable_data_dump_command(first_node, keyspace, table) @@ -806,8 +778,8 @@ def test_rolling_upgrade(self): # noqa: PLR0914, PLR0915 f'sudo {dump_cmd} $i 1>/tmp/sstabledump.output || ' 'exit 1; done', verbose=True) - InfoEvent(message='Step8 - Run stress and verify after upgrading entire cluster').publish() - InfoEvent(message='Starting verification stresses after cluster upgrade').publish() + self.actions_log.info('Step8 - Run stress and verify after upgrading entire cluster') + self.actions_log.info('Starting verification stresses after cluster 
upgrade') stress_after_cluster_upgrade = self.params.get('stress_after_cluster_upgrade') self.run_stress_thread(stress_cmd=stress_after_cluster_upgrade) verify_stress_after_cluster_upgrade = self.params.get( @@ -817,15 +789,15 @@ def test_rolling_upgrade(self): # noqa: PLR0914, PLR0915 if self.should_do_complex_profile(): # complex workload: verify data by simple read cl=ALL - InfoEvent(message='Starting c-s complex workload to verify data by simple read').publish() + self.actions_log.info('Starting c-s complex workload to verify data by simple read') stress_cmd_complex_verify_read = self.params.get('stress_cmd_complex_verify_read') complex_cs_thread_pool = self.run_stress_thread(stress_cmd=stress_cmd_complex_verify_read) # wait for the read complex workload to finish self.verify_stress_thread(complex_cs_thread_pool) - InfoEvent(message='Will check paged query after upgrading all nodes').publish() + self.actions_log.info('Will check paged query after upgrading all nodes') self.paged_query(keyspace=keyspace) - InfoEvent(message='Done checking paged query after upgrading nodes').publish() + self.actions_log.info('Done checking paged query after upgrading nodes') # After adjusted the workloads, there is a entire write workload, and it uses a fixed duration for catching # the data lose. 
@@ -853,8 +825,8 @@ def test_rolling_upgrade(self):  # noqa: PLR0914, PLR0915
         # During the test we filter and ignore some specific errors, but we want to allow only certain amount of them
         step = 'Step9 - Search for errors that we filter during the test '
-        InfoEvent(message=step).publish()
-        InfoEvent(message='Checking how many failed_to_load_schem errors happened during the test').publish()
+        self.actions_log.info(step)
+        self.actions_log.info('Checking how many failed_to_load_schema errors happened during the test')
         error_factor = 3
         schema_load_error_num = self.count_log_errors(search_pattern='Failed to load schema version',
                                                       step=step)
@@ -865,8 +837,8 @@ def test_rolling_upgrade(self):  # noqa: PLR0914, PLR0915
         workload_prioritization_error_num = self.count_log_errors(search_pattern='workload prioritization.*read_failure_exception',
                                                                   step=step, search_for_idx_token_error=False)
-        InfoEvent(message='schema_load_error_num: %s; workload_prioritization_error_num: %s' %
-                  (schema_load_error_num, workload_prioritization_error_num)).publish()
+        self.actions_log.info('schema_load_error_num: %s; workload_prioritization_error_num: %s' %
+                              (schema_load_error_num, workload_prioritization_error_num))

         # Issue #https://github.com/scylladb/scylla-enterprise/issues/1391
         # By Eliran's comment: For 'Failed to load schema version' error which is expected and non offensive is
@@ -877,10 +849,10 @@ def test_rolling_upgrade(self):  # noqa: PLR0914, PLR0915
                 'entire test, actual: %d' % (
             error_factor, schema_load_error_num)

-        InfoEvent(message='Step10 - Verify that gemini did not failed during upgrade').publish()
+        self.actions_log.info('Step10 - Verify that gemini did not fail during upgrade')
         self.verify_gemini_results(queue=gemini_thread)

-        InfoEvent(message='all nodes were upgraded, and last workaround is verified.').publish()
+        self.actions_log.info('all nodes were upgraded, and last workaround is verified.')

     def run_raft_topology_upgrade_procedure(self):
         # wait features is enabled on
nodes after upgrade
@@ -894,35 +866,31 @@ def run_raft_topology_upgrade_procedure(self):
                 node=node)
             feature_state_per_node.append(result)
         if not all(feature_state_per_node):
-            InfoEvent(message="Step5.1 - Consistent topology changes is not supported. Cluster stay with gossip topology mode").publish()
+            self.actions_log.info(
+                "Step5.1 - Consistent topology changes are not supported. Cluster stays in gossip topology mode")
             return
         raft_upgrade = RaftUpgradeProcedure(self.db_cluster.nodes[0])
         raft_upgrade.start_upgrade_procedure()
         for node in self.db_cluster.nodes:
             RaftUpgradeProcedure(node).wait_upgrade_procedure_done()
-        InfoEvent(message="Step5.1 - raft topology upgrade procedure done").publish()
+        self.actions_log.info("Step5.1 - raft topology upgrade procedure done")

     def _start_and_wait_for_node_upgrade(self, node: BaseNode, step: int) -> None:
-        InfoEvent(
-            message=f"Step {step} - Upgrade {node.name} from dc {node.dc_idx}").publish()
-        InfoEvent(message='Upgrade Node %s begins' % node.name).publish()
+        self.actions_log.info(f"Step {step} - Upgrade node", target=node.name, metadata={"dc": node.dc_idx})
         self.upgrade_node(node, upgrade_sstables=self.params.get('upgrade_sstables'))
         InfoEvent(message='Upgrade Node %s ended' % node.name).publish()
         node.check_node_health()

     def _start_and_wait_for_node_rollback(self, node: BaseNode, step: int) -> None:
-        InfoEvent(
-            message=f"Step {step} - "
-            f"Rollback {node.name} from dc {node.dc_idx}"
-        ).publish()
-        InfoEvent(message='Rollback Node %s begin' % node).publish()
+        self.actions_log.info(f"Step {step} - "
+                              f"Rollback node", target=node.name, metadata={"dc": node.dc_idx})
         self.rollback_node(node, upgrade_sstables=self.params.get('upgrade_sstables'))
         InfoEvent(message='Rollback Node %s ended' % node).publish()
         node.check_node_health()

     def _run_stress_workload(self, workload_name: str, wait_for_finish: bool = False) -> [CassandraStressThread]:
         """Runs workload from param name specified in test-case yaml"""
-        
InfoEvent(message=f"Starting {workload_name}").publish() + self.actions_log.info(f"Starting {workload_name}") stress_commands = self.params.get(workload_name) workload_thread_pools = [] if isinstance(stress_commands, str): @@ -934,11 +902,11 @@ def _run_stress_workload(self, workload_name: str, wait_for_finish: bool = False if self.params.get('alternator_port'): self.pre_create_alternator_tables() if wait_for_finish is True: - InfoEvent(message=f"Waiting for {workload_name} to finish").publish() + self.actions_log.info(f"Waiting for {workload_name} to finish") for thread_pool in workload_thread_pools: self.verify_stress_thread(thread_pool) else: - InfoEvent(message='Sleeping for 60s to let the stress command(s) start before the next steps...').publish() + self.actions_log.info('Sleeping for 60s to let the stress command(s) start before the next steps...') time.sleep(60) return workload_thread_pools @@ -997,21 +965,20 @@ def test_generic_cluster_upgrade(self): # Rollback all nodes that where upgraded (not necessarily in the same order) random.shuffle(upgraded_nodes) - InfoEvent(message='Upgraded Nodes to be rollback are: %s' % - ", ".join(node.name for node in upgraded_nodes)).publish() + self.actions_log.info('Rollback nodes') for node in upgraded_nodes: self._start_and_wait_for_node_rollback(node, step=next(step)) # Upgrade all nodes + self.actions_log.info('Upgrade nodes') for node_to_upgrade in nodes_to_upgrade: self._start_and_wait_for_node_upgrade(node_to_upgrade, step=next(step)) + self.actions_log.info("All nodes were upgraded successfully") - InfoEvent(message="All nodes were upgraded successfully").publish() - - InfoEvent(message='Run raft topology upgrade procedure').publish() + self.actions_log.info('Run raft topology upgrade procedure') self.run_raft_topology_upgrade_procedure() - InfoEvent(message="Waiting for stress_during_entire_upgrade to finish").publish() + self.actions_log.info("Waiting for stress_during_entire_upgrade to finish") for 
stress_thread_pool in stress_thread_pools: self.verify_stress_thread(stress_thread_pool)