diff --git a/tests/cephfs/cephfs_metrics/cephfs_metrics_scale.py b/tests/cephfs/cephfs_metrics/cephfs_metrics_scale.py
index e188197f204..db8ebb398ab 100644
--- a/tests/cephfs/cephfs_metrics/cephfs_metrics_scale.py
+++ b/tests/cephfs/cephfs_metrics/cephfs_metrics_scale.py
@@ -1,11 +1,14 @@
+import json
 import random
 import string
 import time
 import traceback
+from json import JSONDecodeError
 
 from ceph.ceph import CommandFailed
 from tests.cephfs.cephfs_utilsV1 import FsUtils
 from utility.log import Log
+from utility.retry import retry
 
 log = Log(__name__)
 
@@ -24,6 +27,109 @@ class Metrics_Value_Not_Matching(Exception):
     pass
 
 
+@retry((JSONDecodeError, CommandFailed), tries=5, delay=5)
+def get_client_id(client, rank=0, mounted_dir="", fs_name="cephfs"):
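+    """
+    Resolve the client session ID for a mounted directory.
+
+    Looks up the MDS daemon holding the given rank, lists its client
+    sessions, and returns the ID of the session whose mount_point
+    contains mounted_dir.
+
+    Returns:
+        tuple: (client_id, rank) of the matching session.
+
+    Raises:
+        CommandFailed: If no matching client session is found.
+    """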
+ """ + client_id = None + client_rank = None + + # Step 1: Get client_id from one of the MDS ranks + for rank in ranks: + try: + client_id, client_rank = get_client_id(client, rank, mount_dir, cephfs) + if client_id: + log.info(f"Found client ID '{client_id}' from rank {client_rank}") + break + except Exception as e: + log.warning( + f"Rank {rank}: Failed to get client ID for mount {mount_dir}: {e}" + ) + continue + + if not client_id: + raise CommandFailed(f"Client not found in any MDS ranks for mount {mount_dir}") + + # Step 2: Use client_id and try to collect metrics from all MDS ranks + for rank in ranks: + try: + mds_metric = get_mds_metrics_for_client( + client, + client_id, + client_rank, + mds_rank=rank, + mounted_dir=mount_dir, + fs_name=cephfs, + ) + if mds_metric and mds_metric != 1: + log.info(f"Successfully got MDS metrics from rank {rank}") + return mds_metric + except Exception as e: + log.warning( + f"Rank {rank}: Failed to fetch metrics for client {client_id}: {e}" + ) + continue + + raise CommandFailed( + f"Metrics not found for client {client_id} in any of the MDS ranks for mount {mount_dir}" + ) + + def run(ceph_cluster, **kw): try: tc = "CEPH-83588355" @@ -104,17 +210,18 @@ def run(ceph_cluster, **kw): fs_util.fuse_mount([client4], fuse_mounting_dir_4) # Get initial MDS metrics # pdb.set_trace() - mds_metric_client1 = fs_util.get_mds_metrics( - client1, 0, fuse_mounting_dir_1, cephfs + + mds_metric_client1 = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs ) - mds_metric_client2 = fs_util.get_mds_metrics( - client2, 0, fuse_mounting_dir_2, cephfs + mds_metric_client2 = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs ) - mds_metric_client3 = fs_util.get_mds_metrics( - client3, 0, fuse_mounting_dir_3, cephfs + mds_metric_client3 = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs ) - mds_metric_client4 = fs_util.get_mds_metrics( - client4, 0, fuse_mounting_dir_4, cephfs + mds_metric_client4 = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs ) log.info(f"mds_metric_client1: {mds_metric_client1}") @@ -132,11 +239,11 @@ def run(ceph_cluster, **kw): inode_list = ["opened_inodes", "pinned_icaps", "total_inodes"] # Get initial inode metrics for client1 and client3 - client1_pre_inode_metrics = fs_util.get_mds_metrics( - client1, 0, fuse_mounting_dir_1, cephfs + client1_pre_inode_metrics = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs ) - client3_pre_inode_metrics = fs_util.get_mds_metrics( - client3, 0, fuse_mounting_dir_3, cephfs + client3_pre_inode_metrics = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs ) log.info(f"client1_pre_inode_metrics: {client1_pre_inode_metrics}") log.info(f"client3_pre_inode_metrics: {client3_pre_inode_metrics}") @@ -165,11 +272,11 @@ def run(ceph_cluster, **kw): log.info("Writing files is done for client1 and client3") log.info("Get metrics only for client1 and client3") - client1_post_inode_metrics = fs_util.get_mds_metrics( - client1, 0, fuse_mounting_dir_1, cephfs + client1_post_inode_metrics = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs ) - client3_post_inode_metrics = fs_util.get_mds_metrics( - client3, 0, fuse_mounting_dir_3, cephfs + client3_post_inode_metrics = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs ) 
log.info(f"client1_post_inode_metrics: {client1_post_inode_metrics}") log.info(f"client3_post_inode_metrics: {client3_post_inode_metrics}") @@ -197,11 +304,11 @@ def run(ceph_cluster, **kw): ) file_paths_client2 = [] file_paths_client4 = [] - pre_opened_files_client2 = fs_util.get_mds_metrics( - client2, 0, fuse_mounting_dir_2, cephfs + pre_opened_files_client2 = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs )["counters"]["opened_files"] - pre_opened_files_client4 = fs_util.get_mds_metrics( - client4, 0, fuse_mounting_dir_4, cephfs + pre_opened_files_client4 = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs )["counters"]["opened_files"] log.info(f"pre_opened_files_client2: {pre_opened_files_client2}") log.info(f"pre_opened_files_client4: {pre_opened_files_client4}") @@ -240,11 +347,11 @@ def run(ceph_cluster, **kw): log.info(f"Number of PID4s from opening files: {pids4}") time.sleep(10) log.info("Get final MDS metrics after opening files") - client2_post_opened_files = fs_util.get_mds_metrics( - client2, 0, fuse_mounting_dir_2, cephfs + client2_post_opened_files = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs )["counters"]["opened_files"] - client4_post_opened_files = fs_util.get_mds_metrics( - client4, 0, fuse_mounting_dir_4, cephfs + client4_post_opened_files = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs )["counters"]["opened_files"] log.info(f"client2_post_opened_files: {client2_post_opened_files}") log.info(f"client4_post_opened_files: {client4_post_opened_files}") @@ -268,11 +375,11 @@ def run(ceph_cluster, **kw): log.error(f"Failed to kill tail processes: {e}") time.sleep(5) log.info("Get final MDS metrics after killing the PIDs") - post_opened_files_client2 = fs_util.get_mds_metrics( - client2, 0, fuse_mounting_dir_2, cephfs + post_opened_files_client2 = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs )["counters"]["opened_files"] - post_opened_files_client4 = fs_util.get_mds_metrics( - client4, 0, fuse_mounting_dir_4, cephfs + post_opened_files_client4 = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs )["counters"]["opened_files"] log.info(f"post_opened_files_client2: {post_opened_files_client2}") log.info(f"post_opened_files_client4: {post_opened_files_client4}") @@ -288,11 +395,11 @@ def run(ceph_cluster, **kw): "Failed to verify opened_files for client4" ) log.info("Verify if other clients opened_files metrics remain same") - post_opened_files_client1 = fs_util.get_mds_metrics( - client1, 0, fuse_mounting_dir_1, cephfs + post_opened_files_client1 = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs )["counters"]["opened_files"] - post_opened_files_client3 = fs_util.get_mds_metrics( - client3, 0, fuse_mounting_dir_3, cephfs + post_opened_files_client3 = get_mds_metrics_from_ranks( + [0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs )["counters"]["opened_files"] log.info(f"post_opened_files_client1: {post_opened_files_client1}") log.info(f"post_opened_files_client3: {post_opened_files_client3}") @@ -315,17 +422,17 @@ def run(ceph_cluster, **kw): log.info( "Increase only Client2 and Client4 dentry metrics and other clients should remain same" ) - pre_dentry_client1 = fs_util.get_mds_metrics( - client1, 0, fuse_mounting_dir_1, cephfs + pre_dentry_client1 = get_mds_metrics_from_ranks( + [0, 1, 2], 
+        mds_metric_client1 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
         )
-        mds_metric_client2 = fs_util.get_mds_metrics(
-            client2, 0, fuse_mounting_dir_2, cephfs
+        mds_metric_client2 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
         )
-        mds_metric_client3 = fs_util.get_mds_metrics(
-            client3, 0, fuse_mounting_dir_3, cephfs
+        mds_metric_client3 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
         )
-        mds_metric_client4 = fs_util.get_mds_metrics(
-            client4, 0, fuse_mounting_dir_4, cephfs
+        mds_metric_client4 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
         )
 
         log.info(f"mds_metric_client1: {mds_metric_client1}")
@@ -132,11 +239,11 @@ def run(ceph_cluster, **kw):
         inode_list = ["opened_inodes", "pinned_icaps", "total_inodes"]
         # Get initial inode metrics for client1 and client3
-        client1_pre_inode_metrics = fs_util.get_mds_metrics(
-            client1, 0, fuse_mounting_dir_1, cephfs
+        client1_pre_inode_metrics = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
         )
-        client3_pre_inode_metrics = fs_util.get_mds_metrics(
-            client3, 0, fuse_mounting_dir_3, cephfs
+        client3_pre_inode_metrics = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
         )
         log.info(f"client1_pre_inode_metrics: {client1_pre_inode_metrics}")
         log.info(f"client3_pre_inode_metrics: {client3_pre_inode_metrics}")
@@ -165,11 +272,11 @@ def run(ceph_cluster, **kw):
         log.info("Writing files is done for client1 and client3")
 
         log.info("Get metrics only for client1 and client3")
-        client1_post_inode_metrics = fs_util.get_mds_metrics(
-            client1, 0, fuse_mounting_dir_1, cephfs
+        client1_post_inode_metrics = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
         )
-        client3_post_inode_metrics = fs_util.get_mds_metrics(
-            client3, 0, fuse_mounting_dir_3, cephfs
+        client3_post_inode_metrics = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
         )
         log.info(f"client1_post_inode_metrics: {client1_post_inode_metrics}")
         log.info(f"client3_post_inode_metrics: {client3_post_inode_metrics}")
@@ -197,11 +304,11 @@ def run(ceph_cluster, **kw):
             )
         file_paths_client2 = []
         file_paths_client4 = []
-        pre_opened_files_client2 = fs_util.get_mds_metrics(
-            client2, 0, fuse_mounting_dir_2, cephfs
+        pre_opened_files_client2 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
         )["counters"]["opened_files"]
-        pre_opened_files_client4 = fs_util.get_mds_metrics(
-            client4, 0, fuse_mounting_dir_4, cephfs
+        pre_opened_files_client4 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
         )["counters"]["opened_files"]
         log.info(f"pre_opened_files_client2: {pre_opened_files_client2}")
         log.info(f"pre_opened_files_client4: {pre_opened_files_client4}")
@@ -240,11 +347,11 @@ def run(ceph_cluster, **kw):
             log.info(f"Number of PID4s from opening files: {pids4}")
         time.sleep(10)
         log.info("Get final MDS metrics after opening files")
-        client2_post_opened_files = fs_util.get_mds_metrics(
-            client2, 0, fuse_mounting_dir_2, cephfs
+        client2_post_opened_files = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
         )["counters"]["opened_files"]
-        client4_post_opened_files = fs_util.get_mds_metrics(
-            client4, 0, fuse_mounting_dir_4, cephfs
+        client4_post_opened_files = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
         )["counters"]["opened_files"]
         log.info(f"client2_post_opened_files: {client2_post_opened_files}")
         log.info(f"client4_post_opened_files: {client4_post_opened_files}")
@@ -268,11 +375,11 @@ def run(ceph_cluster, **kw):
             log.error(f"Failed to kill tail processes: {e}")
         time.sleep(5)
         log.info("Get final MDS metrics after killing the PIDs")
-        post_opened_files_client2 = fs_util.get_mds_metrics(
-            client2, 0, fuse_mounting_dir_2, cephfs
+        post_opened_files_client2 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
         )["counters"]["opened_files"]
-        post_opened_files_client4 = fs_util.get_mds_metrics(
-            client4, 0, fuse_mounting_dir_4, cephfs
+        post_opened_files_client4 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
         )["counters"]["opened_files"]
         log.info(f"post_opened_files_client2: {post_opened_files_client2}")
         log.info(f"post_opened_files_client4: {post_opened_files_client4}")
@@ -288,11 +395,11 @@ def run(ceph_cluster, **kw):
                 "Failed to verify opened_files for client4"
             )
         log.info("Verify if other clients opened_files metrics remain same")
-        post_opened_files_client1 = fs_util.get_mds_metrics(
-            client1, 0, fuse_mounting_dir_1, cephfs
+        post_opened_files_client1 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
         )["counters"]["opened_files"]
-        post_opened_files_client3 = fs_util.get_mds_metrics(
-            client3, 0, fuse_mounting_dir_3, cephfs
+        post_opened_files_client3 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
         )["counters"]["opened_files"]
         log.info(f"post_opened_files_client1: {post_opened_files_client1}")
         log.info(f"post_opened_files_client3: {post_opened_files_client3}")
@@ -315,17 +422,17 @@ def run(ceph_cluster, **kw):
         log.info(
             "Increase only Client2 and Client4 dentry metrics and other clients should remain same"
         )
-        pre_dentry_client1 = fs_util.get_mds_metrics(
-            client1, 0, fuse_mounting_dir_1, cephfs
+        pre_dentry_client1 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
         )["counters"]
-        pre_dentry_client2 = fs_util.get_mds_metrics(
-            client2, 0, fuse_mounting_dir_2, cephfs
+        pre_dentry_client2 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
         )["counters"]
-        pre_dentry_client3 = fs_util.get_mds_metrics(
-            client3, 0, fuse_mounting_dir_3, cephfs
+        pre_dentry_client3 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
         )["counters"]
-        pre_dentry_client4 = fs_util.get_mds_metrics(
-            client4, 0, fuse_mounting_dir_4, cephfs
+        pre_dentry_client4 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
         )["counters"]
         log.info(f"pre_dentry_client2: {pre_dentry_client2}")
         log.info(f"pre_dentry_client4: {pre_dentry_client4}")
@@ -368,11 +475,11 @@ def run(ceph_cluster, **kw):
             client4.exec_command(sudo=True, cmd=f"ls {fuse_mounting_dir_4}/{dir}/")
         time.sleep(5)
         log.info("Get final MDS metrics after creating directories and files")
-        post_dentry_client2 = fs_util.get_mds_metrics(
-            client2, 0, fuse_mounting_dir_2, cephfs
+        post_dentry_client2 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
         )["counters"]
-        post_dentry_client4 = fs_util.get_mds_metrics(
-            client4, 0, fuse_mounting_dir_4, cephfs
+        post_dentry_client4 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
         )["counters"]
         log.info(f"post_dentry_client2: {post_dentry_client2}")
         log.info(f"post_dentry_client4: {post_dentry_client4}")
@@ -388,11 +495,11 @@ def run(ceph_cluster, **kw):
                 f"Failed to verify {dentry} for client4"
             )
         log.info("Verify if other clients dentry metrics remain same")
-        post_dentry_client1 = fs_util.get_mds_metrics(
-            client1, 0, fuse_mounting_dir_1, cephfs
+        post_dentry_client1 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
         )["counters"]
-        post_dentry_client3 = fs_util.get_mds_metrics(
-            client3, 0, fuse_mounting_dir_3, cephfs
+        post_dentry_client3 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
         )["counters"]
         log.info(f"post_dentry_client1: {post_dentry_client1}")
         log.info(f"post_dentry_client3: {post_dentry_client3}")
@@ -415,11 +522,11 @@ def run(ceph_cluster, **kw):
         )
         # Using scp from client1 to client2, it will increase total_read_ops and total_read_size in client1
         # In client2, it will increase total_write_ops and total_write_size
-        pre_read_ops_client1 = fs_util.get_mds_metrics(
-            client1, 0, fuse_mounting_dir_1, cephfs
+        pre_read_ops_client1 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
         )["counters"]
-        pre_write_ops_client2 = fs_util.get_mds_metrics(
-            client2, 0, fuse_mounting_dir_2, cephfs
+        pre_write_ops_client2 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
         )["counters"]
         log.info(f"pre_read_ops_client1: {pre_read_ops_client1}")
         log.info(f"pre_write_ops_client2: {pre_write_ops_client2}")
@@ -456,11 +563,11 @@ def run(ceph_cluster, **kw):
             "total_write_size",
         ]
         log.info("Get final MDS metrics after copying a file")
-        post_read_ops_client1 = fs_util.get_mds_metrics(
-            client1, 0, fuse_mounting_dir_1, cephfs
+        post_read_ops_client1 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
         )["counters"]
-        post_write_ops_client2 = fs_util.get_mds_metrics(
-            client2, 0, fuse_mounting_dir_2, cephfs
+        post_write_ops_client2 = get_mds_metrics_from_ranks(
+            [0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
         )["counters"]
         log.info(f"post_read_ops_client1: {post_read_ops_client1}")
         log.info(f"post_write_ops_client2: {post_write_ops_client2}")
diff --git a/tests/cephfs/lib/cephfs_common_lib.py b/tests/cephfs/lib/cephfs_common_lib.py
index 6e4a5420b5a..db5442a69ca 100644
--- a/tests/cephfs/lib/cephfs_common_lib.py
+++ b/tests/cephfs/lib/cephfs_common_lib.py
@@ -45,13 +45,20 @@ def wait_for_healthy_ceph(self, client, wait_time):
         """
         ceph_healthy = 0
         end_time = datetime.datetime.now() + datetime.timedelta(seconds=wait_time)
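+        # Known benign warnings that alone should not fail the health check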
+        accepted_list = [
+            "experiencing slow operations in BlueStore",
+            "Slow OSD heartbeats",
+        ]
         while ceph_healthy == 0 and (datetime.datetime.now() < end_time):
             if self.check_ceph_status(client, "HEALTH_OK"):
                 ceph_healthy = 1
             else:
                 out, _ = client.exec_command(sudo=True, cmd="ceph health detail")
-                if "experiencing slow operations in BlueStore" in str(out):
-                    log.info("Ignoring the known warning for Bluestore Slow ops")
+                if any(msg in str(out) for msg in accepted_list):
+                    log.info(
+                        "Ignoring known warnings for BlueStore slow ops and slow OSD heartbeats"
+                    )
+                    log.warning("Treating cluster health as OK, current state : %s", out)
                     ceph_healthy = 1
                 else:
                     log.info(
diff --git a/tests/cephfs/snapshot_clone/snap_schedule.py b/tests/cephfs/snapshot_clone/snap_schedule.py
index 769ce552c46..a776728b9b3 100644
--- a/tests/cephfs/snapshot_clone/snap_schedule.py
+++ b/tests/cephfs/snapshot_clone/snap_schedule.py
@@ -1,4 +1,3 @@
-import datetime
 import json
 import os
 import random
@@ -10,6 +9,7 @@
 
 from ceph.ceph import CommandFailed
 from tests.cephfs.cephfs_utilsV1 import FsUtils
+from tests.cephfs.lib.cephfs_common_lib import CephFSCommonUtils
 from tests.cephfs.snapshot_clone.cephfs_snap_utils import SnapUtils
 from utility.log import Log
 from utility.retry import retry
@@ -51,6 +51,7 @@ def run(ceph_cluster, **kw):
     test_data = kw.get("test_data")
     fs_util = FsUtils(ceph_cluster, test_data=test_data)
     snap_util = SnapUtils(ceph_cluster)
+    cephfs_common_utils = CephFSCommonUtils(ceph_cluster)
     erasure = (
         FsUtils.get_custom_config_value(test_data, "erasure")
         if test_data
@@ -143,18 +144,12 @@ def run(ceph_cluster, **kw):
             "m" if LooseVersion(ceph_version) >= LooseVersion("17.2.6") else "M"
         )
         log.info("Verify Ceph Status is healthy before starting test")
-        end_time = datetime.datetime.now() + datetime.timedelta(seconds=300)
-        ceph_healthy = 0
-        while (datetime.datetime.now() < end_time) and (ceph_healthy == 0):
-            try:
-                fs_util.get_ceph_health_status(client1)
-                ceph_healthy = 1
-            except Exception as ex:
-                log.info(ex)
-                log.info("Wait for few secs and recheck ceph status")
-                time.sleep(5)
-        if ceph_healthy == 0:
-            assert False, "Ceph remains unhealthy even after wait for 300secs"
+        wait_time_secs = 300
+        if cephfs_common_utils.wait_for_healthy_ceph(client1, wait_time_secs):
+            raise CommandFailed(
+                f"Cluster health is not OK even after waiting for {wait_time_secs} secs"
+            )
+
         commands = [
             f"ceph fs subvolume ls {default_fs}",
             "ceph config set mgr mgr/snap_schedule/allow_m_granularity true",