Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions controllers/array_action/array_action_types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Optional
from datetime import datetime, timedelta

from csi_general import replication_pb2 as pb2
from controllers.common.node_info import Initiators


Expand Down Expand Up @@ -95,3 +99,19 @@ class VolumeGroupIds:

def __bool__(self):
    """Return True when either the internal id or the name is truthy."""
    if self.internal_id:
        return True
    return bool(self.name)


class ReplicationStatus(IntEnum):
    """Replication health states.

    Each member takes its integer value from the same-named constant on
    pb2.GetVolumeReplicationInfoResponse.
    """
    UNKNOWN = pb2.GetVolumeReplicationInfoResponse.UNKNOWN
    HEALTHY = pb2.GetVolumeReplicationInfoResponse.HEALTHY
    DEGRADED = pb2.GetVolumeReplicationInfoResponse.DEGRADED
    ERROR = pb2.GetVolumeReplicationInfoResponse.ERROR


@dataclass
class ReplicationInfo:
    """Replication sync details reported by an array mediator.

    Every field is optional; an instance created without arguments has no sync
    data and a replication_status of ReplicationStatus.UNKNOWN.
    """
    # time of the last sync, or None when not available
    last_sync_time: Optional[datetime] = None
    # duration of the last sync, or None when not available
    last_sync_duration: Optional[timedelta] = None
    # presumably the number of bytes transferred by the last sync - TODO confirm
    last_sync_bytes: Optional[int] = None
    # health of the replication, defaults to UNKNOWN
    replication_status: ReplicationStatus = ReplicationStatus.UNKNOWN
    # free-text description accompanying replication_status
    status_message: Optional[str] = None
3 changes: 3 additions & 0 deletions controllers/array_action/array_mediator_ds8k.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,9 @@ def get_flashcopy_state(self, flashcopy_id):
def get_replication(self, replication_request):
    """Not implemented by this mediator; always raises NotImplementedError."""
    raise NotImplementedError

def get_last_async_snapshot_info(self, volume_group_id):
    """Not implemented by this mediator; always raises NotImplementedError."""
    raise NotImplementedError

def create_replication(self, replication_request):
    """Not implemented by this mediator; always raises NotImplementedError."""
    raise NotImplementedError

Expand Down
16 changes: 16 additions & 0 deletions controllers/array_action/array_mediator_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,22 @@ def demote_replication_volume(self, replication):
"""
raise NotImplementedError

@abstractmethod
def get_last_async_snapshot_info(self, volume_group_id):
    """
    This function will return replication sync info for a volume group.

    Concrete mediators must override it; this base implementation only raises
    NotImplementedError.

    Args:
        volume_group_id : internal id of the volume group (EAR only)

    Returns:
        ReplicationInfo : last sync details, replication status and status message

    Raises:
        ObjectNotFoundError : when the volume group has no replication record
    """
    raise NotImplementedError

@abstractmethod
def add_io_group_to_host(self, host_name, io_group):
"""
Expand Down
79 changes: 77 additions & 2 deletions controllers/array_action/array_mediator_svc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import defaultdict
from io import StringIO
from random import choice, randint
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

import os
from packaging.version import Version
Expand All @@ -18,7 +18,9 @@
from controllers.array_action import svc_messages
import controllers.servers.settings as controller_settings
from controllers.servers.csi.decorators import register_csi_plugin
from controllers.array_action.array_action_types import Volume, Snapshot, Replication, Host, VolumeGroup, ThinVolume
from controllers.array_action.array_action_types import (Volume, Snapshot, Replication, Host,
VolumeGroup, ThinVolume, ReplicationInfo,
ReplicationStatus)
from controllers.array_action.array_mediator_abstract import ArrayMediatorAbstract
from controllers.array_action.utils import ClassProperty, convert_scsi_id_to_nguid
from controllers.array_action.volume_group_interface import VolumeGroupInterface
Expand Down Expand Up @@ -1778,6 +1780,79 @@ def _get_replication_mode(self, volume_group_id):

return getattr(volume_group_replication, location_attr_name, None)

@staticmethod
def _getattr_as_str(obj, attr, default=array_settings.UNKNOWN):
    """Read an attribute as a stripped string, falling back to a default.

    Args:
        obj     : object to read the attribute from
        attr    : attribute name
        default : value returned when the attribute is missing, None, or
                  empty after stripping whitespace

    Returns:
        the stripped string form of the attribute, or default
    """
    raw = getattr(obj, attr, None)
    text = '' if raw is None else str(raw).strip()
    return text if text else default

@staticmethod
def _map_dr_link_status_to_proto_status(dr_link_status):
    """Translate a DR link status string into a ReplicationStatus member.

    The state groups are checked in order (healthy, degraded, failed); a status
    that belongs to none of them maps to ReplicationStatus.UNKNOWN.
    """
    status_by_state_group = (
        (array_settings.HEALTHY_DR_LINK_STATES, ReplicationStatus.HEALTHY),
        (array_settings.DEGRADED_DR_LINK_STATES, ReplicationStatus.DEGRADED),
        (array_settings.FAILED_DR_LINK_STATES, ReplicationStatus.ERROR),
    )
    for state_group, replication_status in status_by_state_group:
        if dr_link_status in state_group:
            return replication_status
    return ReplicationStatus.UNKNOWN

def _get_recovery_location_index(self, vg_replication):
    """Pick the location index that holds the DR (recovery) side of a replication record.

    Args:
        vg_replication : volume group replication record

    Returns:
        '3' when the record has a non-empty partition_name, otherwise '2'
    """
    partition_name = self._getattr_as_str(vg_replication, 'partition_name', '')

    if not partition_name:
        location_index = '2'
        logger.info("Non-partition VG -> DR location = '{}'".format(location_index))
        return location_index

    location_index = '3'
    logger.info("Partition VG (partition_name='{}') -> "
                "DR location = '{}'".format(partition_name, location_index))
    return location_index

def _build_replication_status_message(self, vg_replication, recovery_loc_idx, dr_link_status):
    """Compose the human-readable replication status message.

    Args:
        vg_replication   : volume group replication record
        recovery_loc_idx : location index of the recovery side, used to select
                           the location<idx>_* attributes
        dr_link_status   : DR link status string

    Returns:
        svc_messages.REPLICATION_STATUS_MESSAGE filled with the DR link status
        followed by the remote system name, replication mode, status and
        within-RPO values (each falls back to the _getattr_as_str default)
    """
    # order matters: it matches the positional placeholders {1}..{4} of the message
    attr_templates = (
        'location{}_system_name',
        'location{}_replication_mode',
        'location{}_status',
        'location{}_within_rpo',
    )
    remote_fields = [
        self._getattr_as_str(vg_replication, template.format(recovery_loc_idx))
        for template in attr_templates
    ]
    return svc_messages.REPLICATION_STATUS_MESSAGE.format(dr_link_status, *remote_fields)

def _build_replication_info(self, vg_replication):
    """Build a ReplicationInfo from a volume group replication record.

    Args:
        vg_replication : volume group replication record

    Returns:
        ReplicationInfo with last_sync_time, replication_status and
        status_message populated
    """
    dr_location = self._get_recovery_location_index(vg_replication)

    # NOTE: no storage field has been chosen yet as the source of last_sync_time,
    # so the current UTC time is reported as a placeholder
    sync_time = datetime.now(timezone.utc)

    link_status = self._getattr_as_str(vg_replication, 'dr_link_status', array_settings.UNKNOWN)

    return ReplicationInfo(
        last_sync_time=sync_time,
        replication_status=self._map_dr_link_status_to_proto_status(link_status),
        status_message=self._build_replication_status_message(vg_replication, dr_location, link_status),
    )

def get_last_async_snapshot_info(self, volume_group_id):
    """Return replication sync info for a volume group.

    Args:
        volume_group_id : internal id of the volume group

    Returns:
        ReplicationInfo

    Raises:
        ObjectNotFoundError : when no replication record exists for the volume group
    """
    logger.info("get_last_async_snapshot_info: called for volume_group_id='{}'".format(volume_group_id))

    replication_record = self._lsvolumegroupreplication(volume_group_id)
    if replication_record:
        return self._build_replication_info(replication_record)

    logger.error("No replication record found for volume_group_id='{}'".format(volume_group_id))
    raise array_errors.ObjectNotFoundError(volume_group_id)

def _get_replication_policy(self, volume_group_id):
volume_group_replication = self._lsvolumegroupreplication(volume_group_id)
if not volume_group_replication:
Expand Down
3 changes: 3 additions & 0 deletions controllers/array_action/array_mediator_xiv.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,9 @@ def get_array_fc_wwns(self, host_name):
def get_replication(self, replication_request):
    """Not implemented by this mediator; always raises NotImplementedError."""
    raise NotImplementedError

def get_last_async_snapshot_info(self, volume_group_id):
    """Not implemented by this mediator; always raises NotImplementedError."""
    raise NotImplementedError

def create_replication(self, replication_request):
    """Not implemented by this mediator; always raises NotImplementedError."""
    raise NotImplementedError

Expand Down
21 changes: 21 additions & 0 deletions controllers/array_action/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,24 @@
REGISTRATION_PLUGIN = 'block.csi.ibm.com'
ODF_REGISTRATION_PLUGIN = 'odf.ibm.com'
MINIMUM_HOURS_BETWEEN_REGISTRATIONS = 2

# Short aliases for the endpoint type constants of this module.
PRODUCTION = ENDPOINT_TYPE_PRODUCTION
RECOVERY = ENDPOINT_TYPE_RECOVERY
# Fallback strings used when a value is missing.
UNKNOWN = 'unknown'
NOT_AVAILABLE = 'N/A'

# Values compared against the dr_link_status attribute of a volume group replication record.
DR_LINK_STATUS_RUNNING = 'running'
DR_LINK_STATUS_SUSPENDED = 'suspended'
DR_LINK_STATUS_RUNNING_MISSING_CONNECTIVITY = 'running_missing_connectivity'
DR_LINK_STATUS_SUSPENDED_MISSING_CONNECTIVITY = 'suspended_missing_connectivity'
DR_LINK_STATUS_INDEPENDENT = 'independent'
DR_LINK_STATUS_DISCONNECTED = 'disconnected'

# DR link states grouped by the replication health they are reported as
# (healthy / degraded / error); any state outside these sets is reported as unknown.
HEALTHY_DR_LINK_STATES = {DR_LINK_STATUS_RUNNING}
DEGRADED_DR_LINK_STATES = {
    DR_LINK_STATUS_SUSPENDED,
    DR_LINK_STATUS_RUNNING_MISSING_CONNECTIVITY,
    DR_LINK_STATUS_SUSPENDED_MISSING_CONNECTIVITY,
    DR_LINK_STATUS_INDEPENDENT,
}
FAILED_DR_LINK_STATES = {DR_LINK_STATUS_DISCONNECTED}
1 change: 1 addition & 0 deletions controllers/array_action/svc_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
CREATE_HOST_WITHOUT_IO_GROUP = 'Created host {} with port {}'
CREATE_HOST_WITH_IO_GROUP = 'Created host {} with port [{}] and with io_group [{}]'
CHANGE_HOST_PROTOCOL = 'Changed host {} protocol to: {}'
# Positional fields: {0} DR link status, {1} remote system name, {2} remote replication mode,
# {3} remote status, {4} remote within-RPO value.
REPLICATION_STATUS_MESSAGE = 'DR Link: {0} | Remote Site: {1} ({2}) | Remote Site Health: {3} | Within RPO: {4}'
2 changes: 1 addition & 1 deletion controllers/scripts/csi_general/csi_pb2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set -x

CSI_VERSION="v1.12.0"
ADDONS_VERSION="v0.2.0"
ADDONS_VERSION="main"
VG_VERSION="main"
PB2_DIR="csi_general"

Expand Down
49 changes: 48 additions & 1 deletion controllers/servers/csi/addons_server.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import grpc
from csi_general import replication_pb2 as pb2
from csi_general import replication_pb2_grpc as pb2_grpc
from google.protobuf.timestamp_pb2 import Timestamp

import controllers.servers.settings as servers_settings
import controllers.array_action.settings as array_settings
from controllers.array_action import errors as array_errors
from controllers.array_action.storage_agent import get_agent
from controllers.common.csi_logger import get_stdout_logger
from controllers.servers import utils
from controllers.servers.csi.decorators import csi_method
from controllers.servers.csi.decorators import csi_method, csi_replication_method
from controllers.servers.csi.exception_handler import build_error_response

logger = get_stdout_logger()
Expand Down Expand Up @@ -195,3 +196,49 @@ def _get_replication_object(object_id_info, object_type, array_connection_info,
# TODO function name misleading - checks partition_name attribute, not necessarily volume
mediator.verify_volume_partition(replication_object, array_connection_info.partition_name)
return replication_object

@csi_replication_method(error_response_type=pb2.GetVolumeReplicationInfoResponse)
def GetVolumeReplicationInfo(self, request, context):
    """
    Return replication sync information for the requested replication source.

    Only volume group (EAR) sources are supported. For any other object type a
    warning is logged and an empty GetVolumeReplicationInfoResponse is returned.

    Args:
        request : request carrying replication_source and secrets
        context : gRPC context of the call

    Returns:
        pb2.GetVolumeReplicationInfoResponse populated from the ReplicationInfo
        returned by the mediator
    """
    logger.info("GetVolumeReplicationInfo: called with replication_source='{}'".format(request.replication_source))

    object_type, object_id_info = utils.get_replication_object_type_and_id_info(request)
    object_id = object_id_info.ids.internal_id

    utils.validate_secrets(request.secrets)

    if object_type != servers_settings.VOLUME_GROUP_TYPE_NAME:
        logger.warning(
            "GetVolumeReplicationInfo is only supported for EAR (VolumeGroup level). "
            "Returning empty response for object_type='{}'".format(object_type)
        )
        return pb2.GetVolumeReplicationInfoResponse()

    connection_info = utils.get_array_connection_info_from_secrets(request.secrets)

    with get_agent(connection_info, object_id_info.array_type).get_mediator() as mediator:
        replication_info = mediator.get_last_async_snapshot_info(object_id)

    response = pb2.GetVolumeReplicationInfoResponse()

    if replication_info.last_sync_time is not None:
        # the response carries whole seconds only; sub-second precision is dropped
        ts_seconds = int(replication_info.last_sync_time.timestamp())
        response.last_sync_time.CopyFrom(Timestamp(seconds=ts_seconds, nanos=0))
    else:
        # NOTE: Timestamp(0, 0), i.e. 1970-01-01T00:00:00Z, is set on purpose when the storage
        # reports no last_sync_time, so the missing value is easy to spot while debugging.
        response.last_sync_time.CopyFrom(Timestamp(seconds=0, nanos=0))
        logger.warning("last_sync_time not available at storage, Setting to default timestamp (0).")

    # Forward the optional sync metrics when a mediator provides them, instead of silently
    # dropping them; they stay unset in the response when the mediator leaves them as None.
    if replication_info.last_sync_duration is not None:
        response.last_sync_duration.FromTimedelta(replication_info.last_sync_duration)
    if replication_info.last_sync_bytes is not None:
        response.last_sync_bytes = replication_info.last_sync_bytes

    response.status = replication_info.replication_status
    response.status_message = replication_info.status_message or ''

    logger.info(
        "GetVolumeReplicationInfo: returning response last_sync_time.seconds={}, "
        "status={}, status_message='{}'".format(
            response.last_sync_time.seconds,
            response.status,
            response.status_message
        )
    )
    return response
96 changes: 93 additions & 3 deletions controllers/tests/array_action/svc/array_mediator_svc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
import controllers.tests.array_action.svc.test_settings as svc_settings
import controllers.tests.array_action.test_settings as array_settings
import controllers.tests.common.test_settings as common_settings
from controllers.array_action.array_action_types import ReplicationRequest
from controllers.array_action.array_action_types import ReplicationRequest, ReplicationInfo, ReplicationStatus
from controllers.array_action.array_mediator_svc import SVCArrayMediator, build_kwargs_from_parameters, \
FCMAP_STATUS_DONE, YES
from controllers.array_action.settings import REPLICATION_TYPE_MIRROR, REPLICATION_TYPE_EAR, \
RCRELATIONSHIP_STATE_READY, ENDPOINT_TYPE_PRODUCTION
from controllers.array_action.settings import (REPLICATION_TYPE_MIRROR, REPLICATION_TYPE_EAR,
RCRELATIONSHIP_STATE_READY, ENDPOINT_TYPE_PRODUCTION,
HEALTHY_DR_LINK_STATES, DEGRADED_DR_LINK_STATES, FAILED_DR_LINK_STATES)
from controllers.common.node_info import Initiators
from controllers.common.settings import ARRAY_TYPE_SVC, SPACE_EFFICIENCY_THIN, SPACE_EFFICIENCY_COMPRESSED, \
SPACE_EFFICIENCY_DEDUPLICATED_COMPRESSED, SPACE_EFFICIENCY_DEDUPLICATED_THIN, SPACE_EFFICIENCY_DEDUPLICATED, \
Expand Down Expand Up @@ -2594,3 +2595,92 @@ def test_register_plugin_with_odf_success(self, mock_cache, mock_odf_version, is
version='1.7.0')

self.svc.client.svctask.registerplugin.assert_has_calls([call_1, call_2])

def _mock_vg_replication(self, partition_name='', local_location='1', dr_link_status='running',
                         loc2_system_name='', loc3_system_name='', loc2_status='', loc3_status='',
                         loc2_within_rpo='', loc3_within_rpo='', loc2_running_rp='', loc3_running_rp='',
                         loc2_replication_mode='', loc3_replication_mode=''):
    """Build a fake volume group replication record for the replication info tests.

    Every argument maps to one attribute of the returned Munch. The
    locationN_replication_mode attributes are included because the mediator reads
    them when composing the status message; they default to an empty string.
    """
    return Munch({
        'partition_name': partition_name,
        'local_location': local_location,
        'dr_link_status': dr_link_status,
        'location2_system_name': loc2_system_name,
        'location2_replication_mode': loc2_replication_mode,
        'location2_status': loc2_status,
        'location2_within_rpo': loc2_within_rpo,
        'location2_running_recovery_point': loc2_running_rp,
        'location3_system_name': loc3_system_name,
        'location3_replication_mode': loc3_replication_mode,
        'location3_status': loc3_status,
        'location3_within_rpo': loc3_within_rpo,
        'location3_running_recovery_point': loc3_running_rp,
        'replication_policy_name': REPLICATION_NAME,
    })

# --- _get_recovery_location_index tests ---
def test_get_recovery_location_index_non_partition_returns_location2(self):
    """A record without a partition name resolves to location '2'."""
    non_partition_record = self._mock_vg_replication(partition_name='')
    self.assertEqual('2', self.svc._get_recovery_location_index(non_partition_record))

def test_get_recovery_location_index_partition_returns_location3(self):
    """A record with a partition name resolves to location '3'."""
    partition_record = self._mock_vg_replication(partition_name='SKGPTN0')
    self.assertEqual('3', self.svc._get_recovery_location_index(partition_record))

# --- _map_dr_link_status_to_proto_status tests ---
def test_map_dr_link_status_healthy_states(self):
    """Every healthy DR link state maps to ReplicationStatus.HEALTHY."""
    for dr_link_status in HEALTHY_DR_LINK_STATES:
        with self.subTest(status=dr_link_status):
            self.assertEqual(ReplicationStatus.HEALTHY,
                             self.svc._map_dr_link_status_to_proto_status(dr_link_status))

def test_map_dr_link_status_degraded_states(self):
    """Every degraded DR link state maps to ReplicationStatus.DEGRADED."""
    for dr_link_status in DEGRADED_DR_LINK_STATES:
        with self.subTest(status=dr_link_status):
            self.assertEqual(ReplicationStatus.DEGRADED,
                             self.svc._map_dr_link_status_to_proto_status(dr_link_status))

def test_map_dr_link_status_failed_states(self):
    """Every failed DR link state maps to ReplicationStatus.ERROR."""
    for dr_link_status in FAILED_DR_LINK_STATES:
        with self.subTest(status=dr_link_status):
            self.assertEqual(ReplicationStatus.ERROR,
                             self.svc._map_dr_link_status_to_proto_status(dr_link_status))

def test_map_dr_link_status_unknown_returns_unknown(self):
    """A DR link state outside every known group maps to ReplicationStatus.UNKNOWN."""
    unrecognized_state = 'some_unknown_state'
    self.assertEqual(ReplicationStatus.UNKNOWN,
                     self.svc._map_dr_link_status_to_proto_status(unrecognized_state))

# --- _build_replication_info tests ---
def test_build_replication_info(self):
    """A healthy non-partition record yields a fully populated ReplicationInfo.

    Pins the exact status message rather than only checking it is set, so a
    regression in field order or in the fallback for missing attributes is caught.
    """
    vg_replication = self._mock_vg_replication(
        partition_name='',
        dr_link_status='running',
        loc2_system_name='remote_system',
        loc2_status='healthy',
        loc2_within_rpo='yes',
        loc2_running_rp='0',
    )

    result = self.svc._build_replication_info(vg_replication)

    self.assertIsInstance(result, ReplicationInfo)
    self.assertIsNotNone(result.last_sync_time)
    # the mediator builds the timestamp with timezone.utc, so it must be timezone-aware
    self.assertIsNotNone(result.last_sync_time.tzinfo)
    self.assertEqual(ReplicationStatus.HEALTHY, result.replication_status)
    # no replication mode is set on the record, so the remote role falls back to 'unknown'
    self.assertEqual(
        'DR Link: running | Remote Site: remote_system (unknown) | '
        'Remote Site Health: healthy | Within RPO: yes',
        result.status_message
    )

# --- get_last_async_snapshot_info tests ---
def test_get_last_async_snapshot_info_returns_replication_info(self):
    """An existing replication record is looked up by object id and turned into a ReplicationInfo."""
    lsvolumegroupreplication = self.svc.client.svcinfo.lsvolumegroupreplication
    lsvolumegroupreplication.return_value = Mock(
        as_single_element=self._mock_vg_replication(partition_name='', dr_link_status='running'))

    replication_info = self.svc.get_last_async_snapshot_info(OBJECT_INTERNAL_ID)

    lsvolumegroupreplication.assert_called_once_with(object_id=OBJECT_INTERNAL_ID)
    self.assertIsInstance(replication_info, ReplicationInfo)
    self.assertIsNotNone(replication_info.last_sync_time)
    self.assertEqual(ReplicationStatus.HEALTHY, replication_info.replication_status)

def test_get_last_async_snapshot_info_no_record_raises_object_not_found(self):
    """A missing replication record is reported as ObjectNotFoundError."""
    lsvolumegroupreplication = self.svc.client.svcinfo.lsvolumegroupreplication
    lsvolumegroupreplication.return_value = Mock(as_single_element=None)

    self.assertRaises(array_errors.ObjectNotFoundError,
                      self.svc.get_last_async_snapshot_info, OBJECT_INTERNAL_ID)
Loading