219 changes: 163 additions & 56 deletions tests/cephfs/cephfs_metrics/cephfs_metrics_scale.py
@@ -1,11 +1,14 @@
import json
import random
import string
import time
import traceback
from json import JSONDecodeError

from ceph.ceph import CommandFailed
from tests.cephfs.cephfs_utilsV1 import FsUtils
from utility.log import Log
from utility.retry import retry

log = Log(__name__)

@@ -24,6 +27,109 @@ class Metrics_Value_Not_Matching(Exception):
pass


@retry((JSONDecodeError, CommandFailed), tries=5, delay=5)
def get_client_id(client, rank=0, mounted_dir="", fs_name="cephfs"):
ranked_mds, _ = client.exec_command(
sudo=True,
cmd=f"ceph fs status {fs_name} -f json | jq '.mdsmap[] | select(.rank == {rank}) | .name'",
)
log.info("Executing MDS name with rank command: %s", ranked_mds)
ranked_mds = ranked_mds.replace('"', "").replace("\n", "")
client_id_cmd = (
f"ceph tell mds.{ranked_mds} session ls | jq '.[] | select(.client_metadata.mount_point"
f' != null and (.client_metadata.mount_point | contains("{mounted_dir}"))) | .id\''
)
log.info(f"Executing Client ID Command : {client_id_cmd}")
client_id, _ = client.exec_command(sudo=True, cmd=client_id_cmd)
client_id = client_id.replace('"', "").replace("\n", "")
if client_id == "":
log.error(f"Client not found for Mounted Directory : {mounted_dir}")
raise CommandFailed(f"Client not found for Mounted Directory : {mounted_dir}")
log.info(f"Client ID :[{client_id}] for Mounted Directory : [{mounted_dir}]")
return client_id, rank
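
For reference, the jq selection in get_client_id assumes the usual `session ls` payload: a JSON array of sessions, each carrying an `id` and a `client_metadata.mount_point`. A pure-Python equivalent of that filter (a sketch; `session_ls_output` is a hypothetical raw JSON string from `ceph tell mds.<name> session ls`, not part of this PR):

import json

def find_client_id(session_ls_output, mounted_dir):
    # Same predicate as the jq filter: keep sessions whose mount_point is
    # non-null and contains the mounted directory, then return the id
    sessions = json.loads(session_ls_output)
    for session in sessions:
        mount_point = (session.get("client_metadata") or {}).get("mount_point")
        if mount_point and mounted_dir in mount_point:
            return session["id"]
    return None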


@retry((JSONDecodeError, CommandFailed), tries=5, delay=10)
def get_mds_metrics_for_client(
client, client_id, rank, mds_rank=0, mounted_dir="", fs_name="cephfs"
):
ranked_mds, _ = client.exec_command(
sudo=True,
cmd=f"ceph fs status {fs_name} -f json | jq '.mdsmap[] | select(.rank == {mds_rank}) | .name'",
)
log.info(f"Executing MDS name with rank command: {ranked_mds}")
ranked_mds = ranked_mds.replace('"', "").replace("\n", "")
log.info(f"Client ID :[{client_id}] for Mounted Directory : [{mounted_dir}]")
cmd = f""" ceph tell mds.{ranked_mds} counter dump 2>/dev/null | \
jq -r '. | to_entries | map(select(.key | match("mds_client_metrics"))) | \
.[].value[] | select(.labels.client != null and (.labels.client | contains("{client_id}"))
and (.labels.rank == "{rank}"))'
"""
metrics_out, _ = client.exec_command(sudo=True, cmd=cmd)
log.info(
f"Metrics for MDS : {ranked_mds} Mounted Directory: {mounted_dir} and Client : {client_id} is {metrics_out}"
)
if metrics_out == "":
log.error(f"Metrics not found for MDS : {ranked_mds}")
raise CommandFailed(f"Client not found for Mounted Directory : {mounted_dir}")
metrics_out = json.loads(str(metrics_out))
return metrics_out
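
The counter-dump filter above depends on a specific payload shape: a top-level object whose keys match "mds_client_metrics", each holding a list of per-client entries with `labels` and `counters`. An illustrative sketch of that shape (field values invented for the example; only the structure is what the code relies on):

example_counter_dump = {
    "mds_client_metrics-cephfs": [
        {
            "labels": {"client": "client.24152", "rank": "0", "fs_name": "cephfs"},
            "counters": {"opened_files": 0, "opened_inodes": 10, "total_inodes": 10},
        }
    ]
}

The `["counters"][...]` lookups throughout run() below index into one such entry.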


def get_mds_metrics_from_ranks(ranks, fs_util, client, mount_dir, cephfs):
"""
Try fetching MDS metrics for the given client and mount_dir from the list of ranks.

Returns:
dict: MDS metrics if found.

Raises:
CommandFailed: If no metrics are found from any rank.
"""
client_id = None
client_rank = None

# Step 1: Get client_id from one of the MDS ranks
for rank in ranks:
try:
client_id, client_rank = get_client_id(client, rank, mount_dir, cephfs)
if client_id:
log.info(f"Found client ID '{client_id}' from rank {client_rank}")
break
except Exception as e:
log.warning(
f"Rank {rank}: Failed to get client ID for mount {mount_dir}: {e}"
)
continue

if not client_id:
raise CommandFailed(f"Client not found in any MDS ranks for mount {mount_dir}")

# Step 2: Use client_id and try to collect metrics from all MDS ranks
for rank in ranks:
try:
mds_metric = get_mds_metrics_for_client(
client,
client_id,
client_rank,
mds_rank=rank,
mounted_dir=mount_dir,
fs_name=cephfs,
)
if mds_metric and mds_metric != 1:
log.info(f"Successfully got MDS metrics from rank {rank}")
return mds_metric
except Exception as e:
log.warning(
f"Rank {rank}: Failed to fetch metrics for client {client_id}: {e}"
)
continue

raise CommandFailed(
f"Metrics not found for client {client_id} in any of the MDS ranks for mount {mount_dir}"
)
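
The two-step design matters with multiple active MDS daemons: the client's session and its metrics may live on any of the active ranks, so the client id is resolved once and the counter dump is then retried across every rank. A usage sketch matching the calls in run() below:

# Fetch counters for one fuse mount, trying MDS ranks 0-2 in order
metrics = get_mds_metrics_from_ranks(
    [0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
)
opened_files = metrics["counters"]["opened_files"]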


def run(ceph_cluster, **kw):
try:
tc = "CEPH-83588355"
@@ -104,17 +210,18 @@ def run(ceph_cluster, **kw):
fs_util.fuse_mount([client4], fuse_mounting_dir_4)
# Get initial MDS metrics
# pdb.set_trace()
mds_metric_client1 = fs_util.get_mds_metrics(
client1, 0, fuse_mounting_dir_1, cephfs

mds_metric_client1 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
)
mds_metric_client2 = fs_util.get_mds_metrics(
client2, 0, fuse_mounting_dir_2, cephfs
mds_metric_client2 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
)
mds_metric_client3 = fs_util.get_mds_metrics(
client3, 0, fuse_mounting_dir_3, cephfs
mds_metric_client3 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
)
mds_metric_client4 = fs_util.get_mds_metrics(
client4, 0, fuse_mounting_dir_4, cephfs
mds_metric_client4 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
)

log.info(f"mds_metric_client1: {mds_metric_client1}")
@@ -132,11 +239,11 @@ def run(ceph_cluster, **kw):
inode_list = ["opened_inodes", "pinned_icaps", "total_inodes"]

# Get initial inode metrics for client1 and client3
client1_pre_inode_metrics = fs_util.get_mds_metrics(
client1, 0, fuse_mounting_dir_1, cephfs
client1_pre_inode_metrics = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
)
client3_pre_inode_metrics = fs_util.get_mds_metrics(
client3, 0, fuse_mounting_dir_3, cephfs
client3_pre_inode_metrics = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
)
log.info(f"client1_pre_inode_metrics: {client1_pre_inode_metrics}")
log.info(f"client3_pre_inode_metrics: {client3_pre_inode_metrics}")
@@ -165,11 +272,11 @@ def run(ceph_cluster, **kw):

log.info("Writing files is done for client1 and client3")
log.info("Get metrics only for client1 and client3")
client1_post_inode_metrics = fs_util.get_mds_metrics(
client1, 0, fuse_mounting_dir_1, cephfs
client1_post_inode_metrics = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
)
client3_post_inode_metrics = fs_util.get_mds_metrics(
client3, 0, fuse_mounting_dir_3, cephfs
client3_post_inode_metrics = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
)
log.info(f"client1_post_inode_metrics: {client1_post_inode_metrics}")
log.info(f"client3_post_inode_metrics: {client3_post_inode_metrics}")
@@ -197,11 +304,11 @@ def run(ceph_cluster, **kw):
)
file_paths_client2 = []
file_paths_client4 = []
pre_opened_files_client2 = fs_util.get_mds_metrics(
client2, 0, fuse_mounting_dir_2, cephfs
pre_opened_files_client2 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
)["counters"]["opened_files"]
pre_opened_files_client4 = fs_util.get_mds_metrics(
client4, 0, fuse_mounting_dir_4, cephfs
pre_opened_files_client4 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
)["counters"]["opened_files"]
log.info(f"pre_opened_files_client2: {pre_opened_files_client2}")
log.info(f"pre_opened_files_client4: {pre_opened_files_client4}")
@@ -240,11 +347,11 @@ def run(ceph_cluster, **kw):
log.info(f"Number of PID4s from opening files: {pids4}")
time.sleep(10)
log.info("Get final MDS metrics after opening files")
client2_post_opened_files = fs_util.get_mds_metrics(
client2, 0, fuse_mounting_dir_2, cephfs
client2_post_opened_files = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
)["counters"]["opened_files"]
client4_post_opened_files = fs_util.get_mds_metrics(
client4, 0, fuse_mounting_dir_4, cephfs
client4_post_opened_files = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
)["counters"]["opened_files"]
log.info(f"client2_post_opened_files: {client2_post_opened_files}")
log.info(f"client4_post_opened_files: {client4_post_opened_files}")
@@ -268,11 +375,11 @@ def run(ceph_cluster, **kw):
log.error(f"Failed to kill tail processes: {e}")
time.sleep(5)
log.info("Get final MDS metrics after killing the PIDs")
post_opened_files_client2 = fs_util.get_mds_metrics(
client2, 0, fuse_mounting_dir_2, cephfs
post_opened_files_client2 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
)["counters"]["opened_files"]
post_opened_files_client4 = fs_util.get_mds_metrics(
client4, 0, fuse_mounting_dir_4, cephfs
post_opened_files_client4 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
)["counters"]["opened_files"]
log.info(f"post_opened_files_client2: {post_opened_files_client2}")
log.info(f"post_opened_files_client4: {post_opened_files_client4}")
@@ -288,11 +395,11 @@ def run(ceph_cluster, **kw):
"Failed to verify opened_files for client4"
)
log.info("Verify if other clients opened_files metrics remain same")
post_opened_files_client1 = fs_util.get_mds_metrics(
client1, 0, fuse_mounting_dir_1, cephfs
post_opened_files_client1 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
)["counters"]["opened_files"]
post_opened_files_client3 = fs_util.get_mds_metrics(
client3, 0, fuse_mounting_dir_3, cephfs
post_opened_files_client3 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
)["counters"]["opened_files"]
log.info(f"post_opened_files_client1: {post_opened_files_client1}")
log.info(f"post_opened_files_client3: {post_opened_files_client3}")
@@ -315,17 +422,17 @@ def run(ceph_cluster, **kw):
log.info(
"Increase only Client2 and Client4 dentry metrics and other clients should remain same"
)
pre_dentry_client1 = fs_util.get_mds_metrics(
client1, 0, fuse_mounting_dir_1, cephfs
pre_dentry_client1 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
)["counters"]
pre_dentry_client2 = fs_util.get_mds_metrics(
client2, 0, fuse_mounting_dir_2, cephfs
pre_dentry_client2 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
)["counters"]
pre_dentry_client3 = fs_util.get_mds_metrics(
client3, 0, fuse_mounting_dir_3, cephfs
pre_dentry_client3 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
)["counters"]
pre_dentry_client4 = fs_util.get_mds_metrics(
client4, 0, fuse_mounting_dir_4, cephfs
pre_dentry_client4 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
)["counters"]
log.info(f"pre_dentry_client2: {pre_dentry_client2}")
log.info(f"pre_dentry_client4: {pre_dentry_client4}")
@@ -368,11 +475,11 @@ def run(ceph_cluster, **kw):
client4.exec_command(sudo=True, cmd=f"ls {fuse_mounting_dir_4}/{dir}/")
time.sleep(5)
log.info("Get final MDS metrics after creating directories and files")
post_dentry_client2 = fs_util.get_mds_metrics(
client2, 0, fuse_mounting_dir_2, cephfs
post_dentry_client2 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
)["counters"]
post_dentry_client4 = fs_util.get_mds_metrics(
client4, 0, fuse_mounting_dir_4, cephfs
post_dentry_client4 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client4, fuse_mounting_dir_4, cephfs
)["counters"]
log.info(f"post_dentry_client2: {post_dentry_client2}")
log.info(f"post_dentry_client4: {post_dentry_client4}")
@@ -388,11 +495,11 @@ def run(ceph_cluster, **kw):
f"Failed to verify {dentry} for client4"
)
log.info("Verify if other clients dentry metrics remain same")
post_dentry_client1 = fs_util.get_mds_metrics(
client1, 0, fuse_mounting_dir_1, cephfs
post_dentry_client1 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
)["counters"]
post_dentry_client3 = fs_util.get_mds_metrics(
client3, 0, fuse_mounting_dir_3, cephfs
post_dentry_client3 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client3, fuse_mounting_dir_3, cephfs
)["counters"]
log.info(f"post_dentry_client1: {post_dentry_client1}")
log.info(f"post_dentry_client3: {post_dentry_client3}")
@@ -415,11 +522,11 @@ def run(ceph_cluster, **kw):
)
# Using scp from client1 to client2, it will increase total_read_ops and total_read_size in client1
# In client2, it will increase total_write_ops and total_write_size
pre_read_ops_client1 = fs_util.get_mds_metrics(
client1, 0, fuse_mounting_dir_1, cephfs
pre_read_ops_client1 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
)["counters"]
pre_write_ops_client2 = fs_util.get_mds_metrics(
client2, 0, fuse_mounting_dir_2, cephfs
pre_write_ops_client2 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
)["counters"]
log.info(f"pre_read_ops_client1: {pre_read_ops_client1}")
log.info(f"pre_write_ops_client2: {pre_write_ops_client2}")
@@ -456,11 +563,11 @@ def run(ceph_cluster, **kw):
"total_write_size",
]
log.info("Get final MDS metrics after copying a file")
post_read_ops_client1 = fs_util.get_mds_metrics(
client1, 0, fuse_mounting_dir_1, cephfs
post_read_ops_client1 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client1, fuse_mounting_dir_1, cephfs
)["counters"]
post_write_ops_client2 = fs_util.get_mds_metrics(
client2, 0, fuse_mounting_dir_2, cephfs
post_write_ops_client2 = get_mds_metrics_from_ranks(
[0, 1, 2], fs_util, client2, fuse_mounting_dir_2, cephfs
)["counters"]
log.info(f"post_read_ops_client1: {post_read_ops_client1}")
log.info(f"post_write_ops_client2: {post_write_ops_client2}")
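For context on the opened_files hunks above: the test raises the counter by holding files open with background tail -f processes, then lowers it again by killing those PIDs. The mechanism in isolation (a local sketch with a hypothetical helper; the PR itself drives this through exec_command on the clients):

import subprocess

def hold_files_open(paths):
    # One background `tail -f` per file keeps an open file handle alive,
    # which shows up in the client's opened_files counter until killed
    return [subprocess.Popen(["tail", "-f", p]) for p in paths]

procs = hold_files_open(["/mnt/cephfs/f1", "/mnt/cephfs/f2"])
# ... read opened_files here and expect it to have risen ...
for p in procs:
    p.kill()  # the counter should fall back once the handles close
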
11 changes: 9 additions & 2 deletions tests/cephfs/lib/cephfs_common_lib.py
@@ -45,13 +45,20 @@ def wait_for_healthy_ceph(self, client, wait_time):
"""
ceph_healthy = 0
end_time = datetime.datetime.now() + datetime.timedelta(seconds=wait_time)
accepted_list = [
"experiencing slow operations in BlueStore",
"Slow OSD heartbeats",
]
while ceph_healthy == 0 and (datetime.datetime.now() < end_time):
if self.check_ceph_status(client, "HEALTH_OK"):
ceph_healthy = 1
else:
out, _ = client.exec_command(sudo=True, cmd="ceph health detail")
if "experiencing slow operations in BlueStore" in str(out):
log.info("Ignoring the known warning for Bluestore Slow ops")
if any(msg in str(out) for msg in accepted_list):
log.info(
"Ignoring the known warning for Bluestore Slow ops and OSD heartbeats"
)
log.warning("Cluster health can be OK, current state : %s", out)
ceph_healthy = 1
else:
log.info(
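
The cephfs_common_lib.py hunk above generalizes the single hard-coded BlueStore string into an accepted_list of benign warnings. The pattern in isolation (a sketch assuming `ceph health detail` output is available as a plain string):

ACCEPTED_WARNINGS = [
    "experiencing slow operations in BlueStore",
    "Slow OSD heartbeats",
]

def health_is_acceptable(health_detail: str) -> bool:
    # Mirrors the any() check in wait_for_healthy_ceph: the state is treated
    # as OK as soon as one known benign substring appears in the output
    return any(msg in health_detail for msg in ACCEPTED_WARNINGS)

One trade-off of the any() form: it accepts the health state even when an unrelated error is reported alongside a benign warning, so the accepted_list is best kept short and specific.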