diff --git a/controllers/array_action/array_action_types.py b/controllers/array_action/array_action_types.py index 17a43773a..7a611e8ca 100644 --- a/controllers/array_action/array_action_types.py +++ b/controllers/array_action/array_action_types.py @@ -1,5 +1,9 @@ from dataclasses import dataclass, field +from enum import IntEnum +from typing import Optional +from datetime import datetime, timedelta +from csi_general import replication_pb2 as pb2 from controllers.common.node_info import Initiators @@ -95,3 +99,19 @@ class VolumeGroupIds: def __bool__(self): return bool(self.internal_id or self.name) + + +class ReplicationStatus(IntEnum): + UNKNOWN = pb2.GetVolumeReplicationInfoResponse.UNKNOWN + HEALTHY = pb2.GetVolumeReplicationInfoResponse.HEALTHY + DEGRADED = pb2.GetVolumeReplicationInfoResponse.DEGRADED + ERROR = pb2.GetVolumeReplicationInfoResponse.ERROR + + +@dataclass +class ReplicationInfo: + last_sync_time: Optional[datetime] = None + last_sync_duration: Optional[timedelta] = None + last_sync_bytes: Optional[int] = None + replication_status: ReplicationStatus = ReplicationStatus.UNKNOWN + status_message: Optional[str] = None diff --git a/controllers/array_action/array_mediator_ds8k.py b/controllers/array_action/array_mediator_ds8k.py index cc83c757c..b808e3070 100644 --- a/controllers/array_action/array_mediator_ds8k.py +++ b/controllers/array_action/array_mediator_ds8k.py @@ -685,6 +685,9 @@ def get_flashcopy_state(self, flashcopy_id): def get_replication(self, replication_request): raise NotImplementedError + def get_last_async_snapshot_info(self, volume_group_id): + raise NotImplementedError + def create_replication(self, replication_request): raise NotImplementedError diff --git a/controllers/array_action/array_mediator_interface.py b/controllers/array_action/array_mediator_interface.py index 3720ee2f4..db32ca54e 100644 --- a/controllers/array_action/array_mediator_interface.py +++ b/controllers/array_action/array_mediator_interface.py @@ -578,6 
+578,22 @@ def demote_replication_volume(self, replication): """ raise NotImplementedError + @abstractmethod + def get_last_async_snapshot_info(self, volume_group_id): + """ + This function will return replication sync info for a volume group. + + Args: + volume_group_id : internal id of the volume group (EAR only) + + Returns: + ReplicationInfo + + Raises: + ObjectNotFoundError + """ + raise NotImplementedError + @abstractmethod def add_io_group_to_host(self, host_name, io_group): """ diff --git a/controllers/array_action/array_mediator_svc.py b/controllers/array_action/array_mediator_svc.py index 7abdcdf95..e328a20ec 100644 --- a/controllers/array_action/array_mediator_svc.py +++ b/controllers/array_action/array_mediator_svc.py @@ -1,7 +1,7 @@ from collections import defaultdict from io import StringIO from random import choice, randint -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import os from packaging.version import Version @@ -18,7 +18,9 @@ from controllers.array_action import svc_messages import controllers.servers.settings as controller_settings from controllers.servers.csi.decorators import register_csi_plugin -from controllers.array_action.array_action_types import Volume, Snapshot, Replication, Host, VolumeGroup, ThinVolume +from controllers.array_action.array_action_types import (Volume, Snapshot, Replication, Host, + VolumeGroup, ThinVolume, ReplicationInfo, + ReplicationStatus) from controllers.array_action.array_mediator_abstract import ArrayMediatorAbstract from controllers.array_action.utils import ClassProperty, convert_scsi_id_to_nguid from controllers.array_action.volume_group_interface import VolumeGroupInterface @@ -1778,6 +1780,79 @@ def _get_replication_mode(self, volume_group_id): return getattr(volume_group_replication, location_attr_name, None) + @staticmethod + def _getattr_as_str(obj, attr, default=array_settings.UNKNOWN): + value = getattr(obj, attr, None) + + if value is None: + return 
default + + value = str(value).strip() + return value or default + + @staticmethod + def _map_dr_link_status_to_proto_status(dr_link_status): + if dr_link_status in array_settings.HEALTHY_DR_LINK_STATES: + return ReplicationStatus.HEALTHY + if dr_link_status in array_settings.DEGRADED_DR_LINK_STATES: + return ReplicationStatus.DEGRADED + if dr_link_status in array_settings.FAILED_DR_LINK_STATES: + return ReplicationStatus.ERROR + return ReplicationStatus.UNKNOWN + + def _get_recovery_location_index(self, vg_replication): + partition_name = self._getattr_as_str(vg_replication, 'partition_name', '') + + if partition_name: + recovery_idx = '3' + logger.info("Partition VG (partition_name='{}') -> " + "DR location = '{}'".format(partition_name, recovery_idx)) + else: + recovery_idx = '2' + logger.info("Non-partition VG -> DR location = '{}'".format(recovery_idx)) + + return recovery_idx + + def _build_replication_status_message(self, vg_replication, recovery_loc_idx, dr_link_status): + remote_name = self._getattr_as_str(vg_replication, 'location{}_system_name'.format(recovery_loc_idx)) + remote_role = self._getattr_as_str(vg_replication, 'location{}_replication_mode'.format(recovery_loc_idx)) + remote_status = self._getattr_as_str(vg_replication, 'location{}_status'.format(recovery_loc_idx)) + within_rpo = self._getattr_as_str(vg_replication, 'location{}_within_rpo'.format(recovery_loc_idx)) + + return svc_messages.REPLICATION_STATUS_MESSAGE.format( + dr_link_status, remote_name, remote_role, remote_status, within_rpo + ) + + def _build_replication_info(self, vg_replication): + recovery_loc_idx = self._get_recovery_location_index(vg_replication) + + # NOTE: The storage field that should populate last_sync_time has not been decided yet; + # until it is, the current UTC time is used as a placeholder. + last_sync_time = datetime.now(timezone.utc) + + dr_link_status = self._getattr_as_str(vg_replication, 'dr_link_status', array_settings.UNKNOWN) + proto_status = 
self._map_dr_link_status_to_proto_status(dr_link_status) + dr_status_message = self._build_replication_status_message(vg_replication, recovery_loc_idx, dr_link_status) + + return ReplicationInfo( + last_sync_time=last_sync_time, + replication_status=proto_status, + status_message=dr_status_message + ) + + def get_last_async_snapshot_info(self, volume_group_id): + logger.info("get_last_async_snapshot_info: called for volume_group_id='{}'".format(volume_group_id)) + + vg_replication = self._lsvolumegroupreplication(volume_group_id) + if not vg_replication: + logger.error( + "No replication record found for " + "volume_group_id='{}'".format(volume_group_id) + ) + raise array_errors.ObjectNotFoundError(volume_group_id) + + return self._build_replication_info(vg_replication) + def _get_replication_policy(self, volume_group_id): volume_group_replication = self._lsvolumegroupreplication(volume_group_id) if not volume_group_replication: diff --git a/controllers/array_action/array_mediator_xiv.py b/controllers/array_action/array_mediator_xiv.py index 6dbcf7341..12cbf4f21 100644 --- a/controllers/array_action/array_mediator_xiv.py +++ b/controllers/array_action/array_mediator_xiv.py @@ -517,6 +517,9 @@ def get_array_fc_wwns(self, host_name): def get_replication(self, replication_request): raise NotImplementedError + def get_last_async_snapshot_info(self, volume_group_id): + raise NotImplementedError + def create_replication(self, replication_request): raise NotImplementedError diff --git a/controllers/array_action/settings.py b/controllers/array_action/settings.py index bbe451629..8cb0d940f 100644 --- a/controllers/array_action/settings.py +++ b/controllers/array_action/settings.py @@ -29,3 +29,24 @@ REGISTRATION_PLUGIN = 'block.csi.ibm.com' ODF_REGISTRATION_PLUGIN = 'odf.ibm.com' MINIMUM_HOURS_BETWEEN_REGISTRATIONS = 2 + +PRODUCTION = ENDPOINT_TYPE_PRODUCTION +RECOVERY = ENDPOINT_TYPE_RECOVERY +UNKNOWN = 'unknown' +NOT_AVAILABLE = 'N/A' + +DR_LINK_STATUS_RUNNING = 'running' 
+DR_LINK_STATUS_SUSPENDED = 'suspended' +DR_LINK_STATUS_RUNNING_MISSING_CONNECTIVITY = 'running_missing_connectivity' +DR_LINK_STATUS_SUSPENDED_MISSING_CONNECTIVITY = 'suspended_missing_connectivity' +DR_LINK_STATUS_INDEPENDENT = 'independent' +DR_LINK_STATUS_DISCONNECTED = 'disconnected' + +HEALTHY_DR_LINK_STATES = {DR_LINK_STATUS_RUNNING} +DEGRADED_DR_LINK_STATES = { + DR_LINK_STATUS_SUSPENDED, + DR_LINK_STATUS_RUNNING_MISSING_CONNECTIVITY, + DR_LINK_STATUS_SUSPENDED_MISSING_CONNECTIVITY, + DR_LINK_STATUS_INDEPENDENT, +} +FAILED_DR_LINK_STATES = {DR_LINK_STATUS_DISCONNECTED} diff --git a/controllers/array_action/svc_messages.py b/controllers/array_action/svc_messages.py index 9e84e4c13..f4e097f90 100644 --- a/controllers/array_action/svc_messages.py +++ b/controllers/array_action/svc_messages.py @@ -10,3 +10,4 @@ CREATE_HOST_WITHOUT_IO_GROUP = 'Created host {} with port {}' CREATE_HOST_WITH_IO_GROUP = 'Created host {} with port [{}] and with io_group [{}]' CHANGE_HOST_PROTOCOL = 'Changed host {} protocol to: {}' +REPLICATION_STATUS_MESSAGE = 'DR Link: {0} | Remote Site: {1} ({2}) | Remote Site Health: {3} | Within RPO: {4}' diff --git a/controllers/scripts/csi_general/csi_pb2.sh b/controllers/scripts/csi_general/csi_pb2.sh index b72ddc6d3..c46c61e0e 100755 --- a/controllers/scripts/csi_general/csi_pb2.sh +++ b/controllers/scripts/csi_general/csi_pb2.sh @@ -2,7 +2,7 @@ set -x CSI_VERSION="v1.12.0" -ADDONS_VERSION="v0.2.0" +ADDONS_VERSION="main" VG_VERSION="main" PB2_DIR="csi_general" diff --git a/controllers/servers/csi/addons_server.py b/controllers/servers/csi/addons_server.py index 86107e177..5c3c8fb82 100644 --- a/controllers/servers/csi/addons_server.py +++ b/controllers/servers/csi/addons_server.py @@ -1,6 +1,7 @@ import grpc from csi_general import replication_pb2 as pb2 from csi_general import replication_pb2_grpc as pb2_grpc +from google.protobuf.timestamp_pb2 import Timestamp import controllers.servers.settings as servers_settings import 
controllers.array_action.settings as array_settings @@ -8,7 +9,7 @@ from controllers.array_action.storage_agent import get_agent from controllers.common.csi_logger import get_stdout_logger from controllers.servers import utils -from controllers.servers.csi.decorators import csi_method +from controllers.servers.csi.decorators import csi_method, csi_replication_method from controllers.servers.csi.exception_handler import build_error_response logger = get_stdout_logger() @@ -195,3 +196,49 @@ def _get_replication_object(object_id_info, object_type, array_connection_info, # TODO function name misleading - checks partition_name attribute, not necessarily volume mediator.verify_volume_partition(replication_object, array_connection_info.partition_name) return replication_object + + @csi_replication_method(error_response_type=pb2.GetVolumeReplicationInfoResponse) + def GetVolumeReplicationInfo(self, request, context): + logger.info("GetVolumeReplicationInfo: called with replication_source='{}'".format(request.replication_source)) + + object_type, object_id_info = utils.get_replication_object_type_and_id_info(request) + object_id = object_id_info.ids.internal_id + + utils.validate_secrets(request.secrets) + + if object_type != servers_settings.VOLUME_GROUP_TYPE_NAME: + logger.warning( + "GetVolumeReplicationInfo is only supported for EAR (VolumeGroup level). 
" + "Returning empty response for object_type='{}'".format(object_type) + ) + return pb2.GetVolumeReplicationInfoResponse() + + connection_info = utils.get_array_connection_info_from_secrets(request.secrets) + + with get_agent(connection_info, object_id_info.array_type).get_mediator() as mediator: + replication_info = mediator.get_last_async_snapshot_info(object_id) + + response = pb2.GetVolumeReplicationInfoResponse() + + if replication_info.last_sync_time is not None: + ts_seconds = int(replication_info.last_sync_time.timestamp()) + response.last_sync_time.CopyFrom(Timestamp(seconds=ts_seconds, nanos=0)) + else: + # NOTE: Timestamp(0, 0) is set intentionally when last_sync_time is not available, + # so that the case where storage returns an empty value is easy to detect while debugging. + # It corresponds to 1970-01-01T00:00:00Z (the Unix epoch). + response.last_sync_time.CopyFrom(Timestamp(seconds=0, nanos=0)) + logger.warning("last_sync_time not available at storage, Setting to default timestamp (0).") + + response.status = replication_info.replication_status + response.status_message = replication_info.status_message or '' + + logger.info( + "GetVolumeReplicationInfo: returning response last_sync_time.seconds={}, " + "status={}, status_message='{}'".format( + response.last_sync_time.seconds, + response.status, + response.status_message + ) + ) + return response diff --git a/controllers/tests/array_action/svc/array_mediator_svc_test.py b/controllers/tests/array_action/svc/array_mediator_svc_test.py index e97a3aa86..2a3927625 100644 --- a/controllers/tests/array_action/svc/array_mediator_svc_test.py +++ b/controllers/tests/array_action/svc/array_mediator_svc_test.py @@ -12,11 +12,12 @@ import controllers.tests.array_action.svc.test_settings as svc_settings import controllers.tests.array_action.test_settings as array_settings import controllers.tests.common.test_settings as common_settings -from 
controllers.array_action.array_action_types import ReplicationRequest +from controllers.array_action.array_action_types import ReplicationRequest, ReplicationInfo, ReplicationStatus from controllers.array_action.array_mediator_svc import SVCArrayMediator, build_kwargs_from_parameters, \ FCMAP_STATUS_DONE, YES -from controllers.array_action.settings import REPLICATION_TYPE_MIRROR, REPLICATION_TYPE_EAR, \ - RCRELATIONSHIP_STATE_READY, ENDPOINT_TYPE_PRODUCTION +from controllers.array_action.settings import (REPLICATION_TYPE_MIRROR, REPLICATION_TYPE_EAR, + RCRELATIONSHIP_STATE_READY, ENDPOINT_TYPE_PRODUCTION, + HEALTHY_DR_LINK_STATES, DEGRADED_DR_LINK_STATES, FAILED_DR_LINK_STATES) from controllers.common.node_info import Initiators from controllers.common.settings import ARRAY_TYPE_SVC, SPACE_EFFICIENCY_THIN, SPACE_EFFICIENCY_COMPRESSED, \ SPACE_EFFICIENCY_DEDUPLICATED_COMPRESSED, SPACE_EFFICIENCY_DEDUPLICATED_THIN, SPACE_EFFICIENCY_DEDUPLICATED, \ @@ -2594,3 +2595,92 @@ def test_register_plugin_with_odf_success(self, mock_cache, mock_odf_version, is version='1.7.0') self.svc.client.svctask.registerplugin.assert_has_calls([call_1, call_2]) + + def _mock_vg_replication(self, partition_name='', local_location='1', dr_link_status='running', + loc2_system_name='', loc3_system_name='', loc2_status='', loc3_status='', + loc2_within_rpo='', loc3_within_rpo='', loc2_running_rp='', loc3_running_rp=''): + return Munch({ + 'partition_name': partition_name, + 'local_location': local_location, + 'dr_link_status': dr_link_status, + 'location2_system_name': loc2_system_name, + 'location2_status': loc2_status, + 'location2_within_rpo': loc2_within_rpo, + 'location2_running_recovery_point': loc2_running_rp, + 'location3_system_name': loc3_system_name, + 'location3_status': loc3_status, + 'location3_within_rpo': loc3_within_rpo, + 'location3_running_recovery_point': loc3_running_rp, + 'replication_policy_name': REPLICATION_NAME, + }) + + # --- _get_recovery_location_index tests --- + 
def test_get_recovery_location_index_non_partition_returns_location2(self): + vg_replication = self._mock_vg_replication(partition_name='') + result = self.svc._get_recovery_location_index(vg_replication) + self.assertEqual('2', result) + + def test_get_recovery_location_index_partition_returns_location3(self): + vg_replication = self._mock_vg_replication(partition_name='SKGPTN0') + result = self.svc._get_recovery_location_index(vg_replication) + self.assertEqual('3', result) + + # --- _map_dr_link_status_to_proto_status tests --- + def test_map_dr_link_status_healthy_states(self): + for status in HEALTHY_DR_LINK_STATES: + with self.subTest(status=status): + result = self.svc._map_dr_link_status_to_proto_status(status) + self.assertEqual(ReplicationStatus.HEALTHY, result) + + def test_map_dr_link_status_degraded_states(self): + for status in DEGRADED_DR_LINK_STATES: + with self.subTest(status=status): + result = self.svc._map_dr_link_status_to_proto_status(status) + self.assertEqual(ReplicationStatus.DEGRADED, result) + + def test_map_dr_link_status_failed_states(self): + for status in FAILED_DR_LINK_STATES: + with self.subTest(status=status): + result = self.svc._map_dr_link_status_to_proto_status(status) + self.assertEqual(ReplicationStatus.ERROR, result) + + def test_map_dr_link_status_unknown_returns_unknown(self): + result = self.svc._map_dr_link_status_to_proto_status('some_unknown_state') + self.assertEqual(ReplicationStatus.UNKNOWN, result) + + # --- _build_replication_info tests --- + def test_build_replication_info(self): + vg_replication = self._mock_vg_replication( + partition_name='', + dr_link_status='running', + loc2_status='healthy', + loc2_within_rpo='yes', + loc2_running_rp='0', + ) + + result = self.svc._build_replication_info(vg_replication) + + self.assertIsInstance(result, ReplicationInfo) + self.assertIsNotNone(result.last_sync_time) + self.assertEqual(ReplicationStatus.HEALTHY, result.replication_status) + 
self.assertIsNotNone(result.status_message) + + # --- get_last_async_snapshot_info tests --- + def test_get_last_async_snapshot_info_returns_replication_info(self): + vg_replication = self._mock_vg_replication(partition_name='', dr_link_status='running') + self.svc.client.svcinfo.lsvolumegroupreplication.return_value = Mock( + as_single_element=vg_replication) + + result = self.svc.get_last_async_snapshot_info(OBJECT_INTERNAL_ID) + + self.svc.client.svcinfo.lsvolumegroupreplication.assert_called_once_with( + object_id=OBJECT_INTERNAL_ID) + self.assertIsInstance(result, ReplicationInfo) + self.assertIsNotNone(result.last_sync_time) + self.assertEqual(ReplicationStatus.HEALTHY, result.replication_status) + + def test_get_last_async_snapshot_info_no_record_raises_object_not_found(self): + self.svc.client.svcinfo.lsvolumegroupreplication.return_value = Mock( + as_single_element=None) + with self.assertRaises(array_errors.ObjectNotFoundError): + self.svc.get_last_async_snapshot_info(OBJECT_INTERNAL_ID) diff --git a/controllers/tests/controller_server/addons_server_test.py b/controllers/tests/controller_server/addons_server_test.py index 5db76d15f..90de69a68 100644 --- a/controllers/tests/controller_server/addons_server_test.py +++ b/controllers/tests/controller_server/addons_server_test.py @@ -1,12 +1,15 @@ import unittest +from datetime import datetime import grpc from csi_general import replication_pb2 as pb2 from mock import Mock, MagicMock from controllers.servers.settings import PARAMETERS_SYSTEM_ID, PARAMETERS_COPY_TYPE, PARAMETERS_REPLICATION_POLICY -from controllers.array_action.settings import REPLICATION_TYPE_MIRROR, REPLICATION_TYPE_EAR, REPLICATION_COPY_TYPE_SYNC -from controllers.array_action.array_action_types import ReplicationRequest +from controllers.array_action import svc_messages +from controllers.array_action.settings import (REPLICATION_TYPE_MIRROR, REPLICATION_TYPE_EAR, + REPLICATION_COPY_TYPE_SYNC, DR_LINK_STATUS_RUNNING, RECOVERY) +from 
controllers.array_action.array_action_types import ReplicationRequest, ReplicationInfo, ReplicationStatus from controllers.servers.csi.addons_server import ReplicationControllerServicer from controllers.tests import utils from controllers.tests.common.test_settings import VOLUME_NAME, VOLUME_UID, OBJECT_INTERNAL_ID, \ @@ -343,3 +346,84 @@ def test_resync_replication_succeeds(self): def test_resync_replication_fails(self): self._prepare_replication_mocks() self._test_resync_replication(REPLICATION_TYPE_MIRROR, grpc.StatusCode.FAILED_PRECONDITION) + + +class TestGetVolumeReplicationInfo(BaseReplicationSetUp, CommonControllerTest): + @property + def tested_method(self): + return self.servicer.GetVolumeReplicationInfo + + @property + def tested_method_response_class(self): + return pb2.GetVolumeReplicationInfoResponse + + def _make_replication_info(self, last_sync_time=None, + replication_status=ReplicationStatus.UNKNOWN, status_message=None): + return ReplicationInfo( + last_sync_time=last_sync_time, + replication_status=replication_status, + status_message=status_message, + ) + + def _prepare_get_replication_info(self, replication_info): + self.mediator.get_last_async_snapshot_info.return_value = replication_info + + def test_get_volume_replication_info_all_fields_populated_succeeds(self): + replication_status_message = svc_messages.REPLICATION_STATUS_MESSAGE.format( + DR_LINK_STATUS_RUNNING, "fab3p-118-c", RECOVERY, "healthy", "yes" + ) + + self._prepare_get_replication_info(self._make_replication_info( + last_sync_time=datetime(2025, 4, 22, 4, 16, 47), + replication_status=ReplicationStatus.HEALTHY, + status_message=replication_status_message, + )) + + response = self.servicer.GetVolumeReplicationInfo(self.request, self.context) + + self.assertEqual(grpc.StatusCode.OK, self.context.code) + self.mediator.get_last_async_snapshot_info.assert_called_once_with(OBJECT_INTERNAL_ID) + self.assertNotEqual(0, response.last_sync_time.seconds) + 
self.assertEqual(ReplicationStatus.HEALTHY, response.status) + self.assertEqual(replication_status_message, response.status_message) + + def test_get_volume_replication_info_all_fields_none_succeeds(self): + self._prepare_get_replication_info(self._make_replication_info()) + + response = self.servicer.GetVolumeReplicationInfo(self.request, self.context) + + self.assertEqual(grpc.StatusCode.OK, self.context.code) + self.mediator.get_last_async_snapshot_info.assert_called_once_with(OBJECT_INTERNAL_ID) + self.assertEqual(0, response.last_sync_time.seconds) + self.assertEqual(ReplicationStatus.UNKNOWN, response.status) + self.assertEqual('', response.status_message) + + def test_get_volume_replication_info_non_volumegroup_source_returns_empty(self): + volume_request = ProtoBufMock() + volume_request.secrets = self.request.secrets + volume_request.volume_id = "{0}:{1};{1}".format("A9000", OBJECT_INTERNAL_ID) + + replication_source = ProtoBufMock(spec=['volume', 'ListFields']) + replication_source.ListFields.return_value = [True] + replication_source.volume.volume_id = volume_request.volume_id + volume_request.replication_source = replication_source + + response = self.servicer.GetVolumeReplicationInfo(volume_request, self.context) + + self.assertEqual(grpc.StatusCode.OK, self.context.code) + self.mediator.get_last_async_snapshot_info.assert_not_called() + self.assertEqual(0, response.last_sync_time.seconds) + self.assertEqual(ReplicationStatus.UNKNOWN, response.status) + self.assertEqual('', response.status_message) + + def test_get_volume_replication_info_already_processing(self): + self._test_request_already_processing( + "replication_source", + self.request.replication_source.volumegroup.volume_group_id + ) + + def test_get_volume_replication_info_with_wrong_secrets(self): + self._test_request_with_wrong_secrets() + + def test_get_volume_replication_info_with_array_connection_exception(self): + self._test_request_with_array_connection_exception()