Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions controllers/array_action/array_mediator_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1808,7 +1808,11 @@ def _get_ear_replication(self, replication_request):

# Get volume group name from ID
volume_group = self._lsvolumegroup(volume_group_id)
volume_group_name = volume_group.name if volume_group else volume_group_id
if volume_group and hasattr(volume_group, 'name'):
volume_group_name = volume_group.name
else:
logger.warning("Volume group object not found or missing 'name' attribute for ID: {}, using ID as fallback".format(volume_group_id))
volume_group_name = volume_group_id

logger.info("found ear replication: {} in mode: {}".format(volume_group_id,
replication_mode))
Expand All @@ -1827,7 +1831,9 @@ def _get_replication_mode(self, volume_group_id):
replication_local_location = volume_group_replication.local_location
location_attr_name = f"location{replication_local_location}_replication_mode"

return getattr(volume_group_replication, location_attr_name, None)
if hasattr(volume_group_replication, location_attr_name):
return getattr(volume_group_replication, location_attr_name)
return None

@staticmethod
def _getattr_as_str(obj, attr, default=array_settings.UNKNOWN):
Expand Down Expand Up @@ -1941,7 +1947,9 @@ def _get_replication_policy(self, volume_group_id):
volume_group_replication = self._lsvolumegroupreplication(volume_group_id)
if not volume_group_replication:
return None
return volume_group_replication.replication_policy_name
if hasattr(volume_group_replication, 'replication_policy_name'):
return volume_group_replication.replication_policy_name
return None

def _assign_partition_replication_policy(self, volume_group_id, requested_policy_name):
try:
Expand Down
29 changes: 29 additions & 0 deletions controllers/servers/csi/sync_lock.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import threading
import time
from collections import defaultdict

from controllers.common.csi_logger import get_stdout_logger
Expand All @@ -8,19 +9,42 @@

ids_in_use = defaultdict(set)
ids_in_use_lock = threading.Lock()
ids_last_access = {} # Track last access time for each lock entry
STALE_LOCK_TIMEOUT = 600 # 10 minutes in seconds


def _add_to_ids_in_use(lock_key, object_id):
ids_in_use[lock_key].add(object_id)
ids_last_access[(lock_key, object_id)] = time.time()


def _remove_from_ids_in_use(lock_key, object_id):
if object_id in ids_in_use[lock_key]:
ids_in_use[lock_key].remove(object_id)
ids_last_access.pop((lock_key, object_id), None)
else:
logger.error("could not find lock to release for {}: {}".format(lock_key, object_id))


def _cleanup_stale_locks():
"""Remove lock entries that haven't been accessed recently."""
current_time = time.time()
stale_entries = []

for (lock_key, object_id), last_access in ids_last_access.items():
if current_time - last_access > STALE_LOCK_TIMEOUT:
stale_entries.append((lock_key, object_id))

for lock_key, object_id in stale_entries:
logger.warning("Cleaning up stale lock for {}: {} (last accessed {} seconds ago)".format(
lock_key, object_id, current_time - ids_last_access.get((lock_key, object_id), current_time)))
if object_id in ids_in_use[lock_key]:
ids_in_use[lock_key].remove(object_id)
ids_last_access.pop((lock_key, object_id), None)

return len(stale_entries)


class SyncLock:
def __init__(self, lock_key, object_id, action_name):
self.lock_key = lock_key
Expand All @@ -40,6 +64,11 @@ def _add_object_lock(self):
("trying to acquire lock for action {} with {}: {}".format(self.action_name, self.lock_key,
self.object_id)))
with ids_in_use_lock:
# Clean up stale locks before acquiring new one
stale_count = _cleanup_stale_locks()
if stale_count > 0:
logger.info("Cleaned up {} stale lock(s)".format(stale_count))

if self.object_id in ids_in_use[self.lock_key]:
logger.error(
"lock for action {} with {}: {} is already in use by another thread".format(self.action_name,
Expand Down