-
Notifications
You must be signed in to change notification settings - Fork 30
Expand file tree
/
Copy patharray_mediator_svc.py
More file actions
2847 lines (2497 loc) · 142 KB
/
Copy patharray_mediator_svc.py
File metadata and controls
2847 lines (2497 loc) · 142 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from collections import defaultdict
from io import StringIO
from random import choice, randint
from datetime import datetime, timedelta, timezone
import time
import os
from packaging.version import Version
from pysvc import errors as svc_errors
from pysvc.unified.client import connect
from pysvc.unified.response import CLIFailureError, SVCResponse
from retry import retry
from controllers.servers.host_definer import settings
from controllers.common.config import config
import controllers.array_action.errors as array_errors
import controllers.array_action.settings as array_settings
from controllers.array_action.registration_cache import SVC_REGISTRATION_CACHE
from controllers.array_action import svc_messages
import controllers.servers.settings as controller_settings
from controllers.servers.csi.decorators import register_csi_plugin
from controllers.array_action.array_action_types import (Volume, Snapshot, Replication, Host,
VolumeGroup, ThinVolume, ReplicationInfo,
ReplicationStatus)
from controllers.array_action.array_mediator_abstract import ArrayMediatorAbstract
from controllers.array_action.utils import ClassProperty, convert_scsi_id_to_nguid
from controllers.array_action.volume_group_interface import VolumeGroupInterface
from controllers.common import settings as common_settings
from controllers.common.csi_logger import get_stdout_logger
from controllers.servers.utils import (get_connectivity_type_ports,
split_string,
is_call_home_enabled,
get_odf_call_home_version,
get_volume_id)
from controllers.servers.settings import UNIQUE_KEY_KEY
from controllers.servers.errors import ValidationException
from controllers.servers import messages
# Module-level dict; not referenced by the code visible in this part of the
# file -- presumably a connection cache, confirm against its users.
array_connections_dict = {}
logger = get_stdout_logger()
# SVC CLI message identifiers. The handlers in this module test for them as
# substrings of an exception's ``my_message`` text.
OBJ_NOT_FOUND = 'CMMVC5753E'
SNAPSHOT_NOT_EXIST = 'CMMVC9755E'
NAME_NOT_EXIST_OR_MEET_RULES = 'CMMVC5754E'
NON_ASCII_CHARS = 'CMMVC6017E'
INVALID_NAME = 'CMMVC6527E'
TOO_MANY_CHARS = 'CMMVC5738E'
VALUE_TOO_LONG = 'CMMVC5703E'
OBJECT_NOT_OF_TYPE_OF_HOST = 'CMMVC9699E'
ENTITY_DOES_NOT_EXIST = 'CMMVC5868E'
INVALID_FILTER_VALUE = 'CMMVC5741E'
SPECIFIED_OBJ_NOT_EXIST = 'CMMVC5804E'
LUN_ALREADY_IN_USE = 'CMMVC5879E'
VOL_ALREADY_UNMAPPED = 'CMMVC5842E'
OBJ_ALREADY_EXIST = 'CMMVC6035E'
FC_PORT_IS_NOT_VALID = 'CMMVC5867E'
ISCSI_PORT_IS_NOT_VALID = 'CMMVC6578E'
NVME_PORT_IS_ALREADY_ASSIGNED = 'CMMVC9328E'
FCMAP_ALREADY_EXIST = 'CMMVC6466E'
FCMAP_ALREADY_COPYING = 'CMMVC5907E'
FCMAP_ALREADY_IN_THE_STOPPED_STATE = 'CMMVC5912E'
VOL_NOT_FOUND = 'CMMVC8957E'
POOL_NOT_MATCH_VOL_SPACE_EFFICIENCY = 'CMMVC9292E'
NOT_CHILD_POOL = 'CMMVC9760E'
NOT_REDUCTION_POOL = 'CMMVC9301E'
NOT_ENOUGH_EXTENTS_IN_POOL_EXPAND = 'CMMVC5860E'
NOT_ENOUGH_EXTENTS_IN_POOL_CREATE = 'CMMVC8710E'
NOT_VALID_IO_GROUP = 'CMMVC5729E'
NOT_SUPPORTED_PARAMETER = 'CMMVC5709E'
CANNOT_CHANGE_HOST_PROTOCOL_BECAUSE_OF_MAPPED_PORTS = 'CMMVC9331E'
COMMAND_NOT_SUPPORTED = 'CMMVC7205E'
LUN_ID_IS_NOT_VALID = 'CMMVC5844E'
EAR_PROMOTE_REMOTE_NOT_READY = 'CMMVC1150E'
EAR_PROMOTE_REMOTE_INTERNAL_ERROR = 'CMMVC9913E'
# NOTE(review): the HOST_* names, the lshost command format and the LUN/FCMAP
# values below are not used in the visible part of the file; their meaning is
# inferred from the names only -- confirm against the code that uses them.
HOST_NQN = 'nqn'
HOST_WWPN = 'WWPN'
HOST_ISCSI_NAME = 'iscsi_name'
HOST_PORTSET_ID = 'portset_id'
LIST_HOSTS_CMD_FORMAT = 'lshost {HOST_ID};echo;'
HOSTS_LIST_ERR_MSG_MAX_LENGTH = 300
LUN_INTERVAL = 128
FCMAP_STATUS_DONE = 'idle_or_copied'
# Literal compared against the *_copy fields of a CLI volume record.
YES = 'yes'
# Endpoint roles; 'source'/'target' are used to build lsfcmap filter values.
ENDPOINT_TYPE_SOURCE = 'source'
ENDPOINT_TYPE_TARGET = 'target'
ENDPOINT_TYPE_MASTER = 'master'
ENDPOINT_TYPE_AUX = 'aux'
def is_warning_message(exception):
    """Return True if the exception message carries a warning message tag.

    The text of ``exception`` (anything accepted by ``str()``) is expected
    to contain a double-quoted section whose first whitespace-separated
    word is the message tag; a tag ending in ``'W'`` marks a warning.

    Text that has no quoted section, or whose quoted section is blank, is
    treated as "not a warning" instead of raising ``IndexError``. This
    function runs inside ``except`` handlers, so an IndexError here would
    hide the original CLI failure from the caller.
    """
    quoted_sections = str(exception).split('"')
    if len(quoted_sections) < 2:
        # No opening quote at all: the format is not the expected one.
        return False
    words = quoted_sections[1].split()
    if not words:
        # Quoted section exists but holds no tag.
        return False
    return words[0].endswith('W')
def _get_space_efficiency_kwargs(space_efficiency):
if space_efficiency:
space_efficiency = space_efficiency.lower()
if space_efficiency == common_settings.SPACE_EFFICIENCY_THIN:
return {'thin': True}
if space_efficiency == common_settings.SPACE_EFFICIENCY_COMPRESSED:
return {'compressed': True}
if space_efficiency == common_settings.SPACE_EFFICIENCY_DEDUPLICATED_THIN:
return {'deduplicated': True, 'thin': True}
if space_efficiency in (common_settings.SPACE_EFFICIENCY_DEDUPLICATED,
common_settings.SPACE_EFFICIENCY_DEDUPLICATED_COMPRESSED):
return {'deduplicated': True, 'compressed': True}
return {}
def _is_space_efficiency_matches_source(parameter_space_efficiency, array_space_efficiency):
return (not parameter_space_efficiency and array_space_efficiency == common_settings.SPACE_EFFICIENCY_THICK) or \
(parameter_space_efficiency and parameter_space_efficiency == array_space_efficiency)
def build_create_volume_in_volume_group_kwargs(pool, io_group, source_id):
    """Build the keyword arguments for cloning a volume group from a snapshot.

    ``iogroup`` is included only when ``io_group`` is truthy.
    """
    kwargs = dict(type='clone', fromsnapshotid=source_id, pool=pool)
    if io_group:
        kwargs['iogroup'] = io_group
    return kwargs
def _add_port_to_command_kwargs(connectivity_type, port, cli_kwargs):
    """Store ``port`` in ``cli_kwargs`` under the key matching the connectivity type.

    Mutates and returns ``cli_kwargs``.

    Raises:
        array_errors.UnsupportedConnectivityTypeError: for any other
            connectivity type.
    """
    if connectivity_type == array_settings.NVME_OVER_FC_CONNECTIVITY_TYPE:
        port_key = 'nqn'
    elif connectivity_type == array_settings.FC_CONNECTIVITY_TYPE:
        port_key = 'fcwwpn'
    elif connectivity_type == array_settings.ISCSI_CONNECTIVITY_TYPE:
        port_key = 'iscsiname'
    else:
        raise array_errors.UnsupportedConnectivityTypeError(connectivity_type)
    cli_kwargs[port_key] = port
    return cli_kwargs
def build_create_host_kwargs(host_name, connectivity_type, port, io_group, partition_name, port_set):
    """Build the keyword arguments for creating a host.

    * ``iogrp`` is the given ``io_group``; when none is given and the host
      is not in a partition, the full I/O group setting is used.
    * ``portset`` is the given ``port_set``, falling back to the
      environment variable named by ``settings.PORT_SET_ENV_VAR``.
    * ``partition`` is added only when ``partition_name`` is truthy.
    """
    host_kwargs = _add_port_to_command_kwargs(connectivity_type, port, {'name': host_name})
    if connectivity_type == array_settings.NVME_OVER_FC_CONNECTIVITY_TYPE:
        host_kwargs['protocol'] = 'fcnvme'

    if io_group:
        host_kwargs['iogrp'] = io_group
    elif not partition_name:
        host_kwargs['iogrp'] = common_settings.FULL_IO_GROUP

    effective_port_set = port_set or os.getenv(settings.PORT_SET_ENV_VAR)
    if effective_port_set:
        logger.info("host {} in partition {} is created with port set {}".format(
            host_name, partition_name, effective_port_set))
        host_kwargs['portset'] = effective_port_set

    if partition_name:
        host_kwargs['partition'] = partition_name
    return host_kwargs
def build_host_port_command_kwargs(host_name, connectivity_type, port):
    """Build keyword arguments naming a host and one of its ports."""
    base_kwargs = dict(host_name=host_name)
    return _add_port_to_command_kwargs(connectivity_type, port, base_kwargs)
def build_kwargs_from_parameters(space_efficiency, pool_name, io_group,
                                 volume_group, volume_name, volume_size):
    """Build the keyword arguments for creating a volume.

    The size is always expressed in bytes (``unit='b'``). Space-efficiency
    flags, ``iogrp`` and ``volumegroup`` are added only when applicable.
    """
    volume_kwargs = {
        'name': volume_name,
        'unit': 'b',
        'size': volume_size,
        'pool': pool_name,
    }
    volume_kwargs.update(_get_space_efficiency_kwargs(space_efficiency))
    optional_kwargs = (('iogrp', io_group), ('volumegroup', volume_group))
    for option, value in optional_kwargs:
        if value:
            volume_kwargs[option] = value
    return volume_kwargs
def build_create_replication_kwargs(master_cli_volume_id, aux_cli_volume_id, other_system_id, copy_type):
    """Build the keyword arguments for creating a replication relationship.

    ``global=True`` is added when ``copy_type`` is the async copy type.
    """
    replication_kwargs = dict(master=master_cli_volume_id,
                              aux=aux_cli_volume_id,
                              cluster=other_system_id)
    if copy_type == array_settings.REPLICATION_COPY_TYPE_ASYNC:
        replication_kwargs['global'] = True
    return replication_kwargs
def build_start_replication_kwargs(rcrelationship_id, primary_endpoint_type, force):
    """Build the keyword arguments for starting a replication relationship.

    ``primary`` and ``force`` are included only when their inputs are truthy.
    """
    start_kwargs = dict(object_id=rcrelationship_id)
    if primary_endpoint_type:
        start_kwargs['primary'] = primary_endpoint_type
    if force:
        start_kwargs['force'] = True
    return start_kwargs
def build_stop_replication_kwargs(rcrelationship_id, add_access):
    """Build the keyword arguments for stopping a replication relationship.

    ``access=True`` is included only when ``add_access`` is truthy.
    """
    stop_kwargs = dict(object_id=rcrelationship_id)
    if add_access:
        stop_kwargs['access'] = True
    return stop_kwargs
def build_change_host_protocol_kwargs(host_name, protocol):
    """Build the keyword arguments for changing a host's protocol."""
    return dict(object_id=host_name, protocol=protocol)
def build_register_plugin_kwargs(unique_key, metadata, version):
    """Build the keyword arguments for registering the plugin.

    The metadata entry is included only when ``metadata`` is truthy.
    """
    register_kwargs = {}
    register_kwargs[UNIQUE_KEY_KEY] = unique_key
    register_kwargs[array_settings.VERSION_KEY] = version
    if metadata:
        register_kwargs[array_settings.METADATA_KEY] = metadata
    return register_kwargs
def _get_cli_volume_space_efficiency_aliases(cli_volume):
    """Return the set of space-efficiency names that describe ``cli_volume``.

    Precedence, highest first: deduplicated (thin or compressed flavour),
    compressed, thin, and finally thick (which also accepts '').
    ``deduplicated_copy`` is optional on the record.
    """
    logger.info("cli_volume {}".format(str(cli_volume)))
    is_thin = cli_volume.se_copy == YES
    is_compressed = cli_volume.compressed_copy == YES
    is_deduplicated = hasattr(cli_volume, "deduplicated_copy") and cli_volume.deduplicated_copy == YES

    if is_deduplicated:
        if is_thin:
            return {common_settings.SPACE_EFFICIENCY_DEDUPLICATED_THIN}
        return {common_settings.SPACE_EFFICIENCY_DEDUPLICATED_COMPRESSED,
                common_settings.SPACE_EFFICIENCY_DEDUPLICATED}
    if is_compressed:
        return {common_settings.SPACE_EFFICIENCY_COMPRESSED}
    if is_thin:
        return {common_settings.SPACE_EFFICIENCY_THIN}
    return {common_settings.SPACE_EFFICIENCY_THICK, ''}
def _get_ssh_port_from_environment():
return int(os.environ.get('SVC_SSH_PORT', '22'))
class SVCArrayMediator(ArrayMediatorAbstract, VolumeGroupInterface):
ARRAY_ACTIONS = {}
BLOCK_SIZE_IN_BYTES = 512
MAX_LUN_NUMBER = 511
MAX_LUN_NUMBER_INCREMENT = 512
MIN_LUN_NUMBER = 0
MIN_SUPPORTED_VERSION = '7.8'
@ClassProperty
def array_type(self):
    """Array type identifier of this mediator (``common_settings.ARRAY_TYPE_SVC``)."""
    return common_settings.ARRAY_TYPE_SVC
@ClassProperty
def port(self):
    """SSH port, read from the environment on every access (see ``_get_ssh_port_from_environment``)."""
    return _get_ssh_port_from_environment()
@classmethod
def is_custom_port(cls):
    """Return True when the environment-configured SSH port is not 22."""
    return _get_ssh_port_from_environment() != 22
@ClassProperty
def max_object_name_length(self):
    """Maximum object name length: 63."""
    return 63
@ClassProperty
def max_object_prefix_length(self):
    """Maximum object name prefix length: 20."""
    return 20
@ClassProperty
def max_connections(self):
    """Maximum number of connections: 2."""
    return 2
@ClassProperty
def minimal_volume_size_in_bytes(self):
    """Smallest volume size, in bytes."""
    return 512  # 512 Bytes
@ClassProperty
def maximal_volume_size_in_bytes(self):
    """Largest volume size, in bytes."""
    return 256 * 1024 * 1024 * 1024 * 1024  # 256 * 1024**4 bytes (256 TiB)
@ClassProperty
def max_lun_retries(self):
    """Maximum number of LUN retries: 10."""
    return 10
@ClassProperty
def default_object_prefix(self):
    """Default object name prefix: "CSI"."""
    return "CSI"
def __init__(self, user, password, endpoint):
    """Validate the endpoint list, initialise state and connect to the array.

    Args:
        user: user name passed to the base class and used for the connection.
        password: password passed to the base class and used for the connection.
        endpoint: sequence of management addresses; must hold exactly one.

    Raises:
        array_errors.StorageManagementIPsNotSupportError: when ``endpoint``
            does not contain exactly one address.
    """
    super().__init__(user, password, endpoint)
    self.client = None

    # SVC only accept one IP address
    if len(endpoint) != 1:
        logger.error("SVC only support one cluster IP")
        raise array_errors.StorageManagementIPsNotSupportError(endpoint)
    self.endpoint = self.endpoint[0]

    self._cluster = None
    # In-memory map to track demote operations: VG_ID -> first_demote_timestamp
    self._demote_state_map = {}

    logger.debug("in init")
    self._connect()
def _connect(self):
    """Open the client connection and verify the array's code level.

    Raises:
        array_errors.UnsupportedStorageVersionError: code level is below
            ``MIN_SUPPORTED_VERSION``.
        array_errors.CredentialsError: the client reported incorrect
            credentials or another storage-array client exception.
    """
    logger.debug(f"Connecting to SVC {self.endpoint}")
    try:
        self.client = connect(self.endpoint,
                              username=self.user,
                              password=self.password,
                              port=self.port)
        current_level = self._code_level
        minimum_level = self.MIN_SUPPORTED_VERSION
        if Version(current_level) < Version(minimum_level):
            raise array_errors.UnsupportedStorageVersionError(current_level, minimum_level)
    except (svc_errors.IncorrectCredentials, svc_errors.StorageArrayClientException):
        raise array_errors.CredentialsError(self.endpoint)
def disconnect(self):
    """Close the client connection, if one was opened."""
    if not self.client:
        return
    self.client.close()
@property
def _system_info(self):
    """Record of the local system from ``lssystem``, cached after first lookup.

    Stays ``None`` when no entry has ``location == 'local'``.
    """
    if self._cluster is not None:
        return self._cluster
    try:
        for system in self.client.svcinfo.lssystem():
            if system.location == 'local':
                self._cluster = system
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        logger.debug("Error running lssystem")
        raise ex
    return self._cluster
@property
def _code_level(self):
return self._system_info.code_level.split(None, 1)[0]
@property
def identifier(self):
    """The ``id_alias`` field of the local system record."""
    return self._system_info.id_alias
def is_active(self):
    """Return the ``is_active()`` result of the client's underlying transport."""
    return self.client.transport.transport.get_transport().is_active()
def _get_partition_name_of_cli_volume(self, cli_volume):
if not cli_volume.volume_group_name:
return None
cli_volume_group = self._lsvolumegroup(cli_volume.volume_group_name)
if cli_volume_group is not None and hasattr(cli_volume_group, "partition_name") \
and cli_volume_group.partition_name:
return cli_volume_group.partition_name
return None
def _generate_volume_response(self, cli_volume, is_virt_snap_func=False):
    """Convert a CLI volume record into a ``Volume``.

    The source id is resolved from ``source_volume_name`` for volumes in
    a partition. Otherwise it comes from the FlashCopy mapping, unless
    ``is_virt_snap_func`` is set, in which case it stays None.
    """
    pool = self._get_volume_pool(cli_volume)
    partition_name = self._get_partition_name_of_cli_volume(cli_volume)

    if partition_name:
        source_id = self._get_wwn_by_volume_name_if_exists(cli_volume.source_volume_name)
    elif is_virt_snap_func:
        source_id = None
    else:
        source_id = self._get_source_volume_wwn_if_exists(cli_volume)

    aliases = _get_cli_volume_space_efficiency_aliases(cli_volume)
    return Volume(capacity_bytes=int(cli_volume.capacity),
                  id=cli_volume.vdisk_UID,
                  internal_id=cli_volume.id,
                  name=cli_volume.name,
                  array_address=self.endpoint,
                  pool=pool,
                  source_id=source_id,
                  array_type=self.array_type,
                  space_efficiency_aliases=aliases,
                  volume_group_id=cli_volume.volume_group_id,
                  volume_group_name=cli_volume.volume_group_name,
                  partition_name=partition_name)
def _generate_snapshot_response_from_cli_volume(self, cli_volume, source_id, partition_name):
    """Build a ``Snapshot`` from a CLI volume record that acts as a snapshot."""
    return self._generate_snapshot_response(capacity=cli_volume.capacity,
                                            name=cli_volume.name,
                                            source_id=source_id,
                                            internal_id=cli_volume.id,
                                            vdisk_uid=cli_volume.vdisk_UID,
                                            partition_name=partition_name)
def _generate_snapshot_response_from_cli_snapshot(self, cli_snapshot, source_cli_volume):
    """Build a ``Snapshot`` from a CLI snapshot record and its source volume.

    Capacity and source id come from the source volume; the snapshot has
    an empty id and no partition.
    """
    return self._generate_snapshot_response(capacity=source_cli_volume.capacity,
                                            name=cli_snapshot.snapshot_name,
                                            source_id=source_cli_volume.vdisk_UID,
                                            internal_id=cli_snapshot.snapshot_id,
                                            vdisk_uid='',
                                            partition_name=None)
def _generate_snapshot_response(self, capacity, name, source_id, internal_id, vdisk_uid='', partition_name=None):
    """Assemble a ``Snapshot`` for this array; it is always marked ready."""
    snapshot_fields = dict(
        capacity_bytes=int(capacity),
        name=name,
        source_id=source_id,
        internal_id=internal_id,
        id=vdisk_uid,
        array_address=self.endpoint,
        is_ready=True,
        array_type=self.array_type,
        partition_name=partition_name,
    )
    return Snapshot(**snapshot_fields)
def _generate_snapshot_response_with_verification(self, cli_object):
    """Build a ``Snapshot`` after checking that ``cli_object`` really is one.

    For volumes in a partition the full volume view is fetched and the
    source is taken from ``source_volume_name``. Otherwise the volume
    must be the target of exactly one FlashCopy mapping with copy rate '0'.

    Raises:
        array_errors.ObjectNotFoundError: the full view of a partition
            volume could not be fetched.
        array_errors.ExpectedSnapshotButFoundVolumeError: the object is a
            plain volume rather than a snapshot.
    """
    partition_name = self._get_partition_name_of_cli_volume(cli_object)
    if partition_name:
        # Convert cli_volume from concise to full view for the source_ fields
        full_view = self._get_cli_volume(cli_object.name)
        if full_view is None:
            raise array_errors.ObjectNotFoundError(cli_object.id)
        source_wwn = self._get_wwn_by_volume_name_if_exists(full_view.source_volume_name)
        return self._generate_snapshot_response_from_cli_volume(full_view, source_wwn, partition_name)

    target_name = cli_object.name
    if not cli_object.FC_id:
        logger.error(f"FlashCopy Mapping not found for target volume: {target_name}")
        raise array_errors.ExpectedSnapshotButFoundVolumeError(target_name, self.endpoint)

    fcmap = self._get_fcmap_as_target_if_exists(target_name)
    is_snapshot_mapping = fcmap is not None and fcmap.copy_rate == '0'
    if not is_snapshot_mapping:
        raise array_errors.ExpectedSnapshotButFoundVolumeError(cli_object.name, self.endpoint)

    source_wwn = self._get_wwn_by_volume_name_if_exists(fcmap.source_vdisk_name)
    return self._generate_snapshot_response_from_cli_volume(cli_object, source_wwn, None)
def _lsvdisk_single_element(self, **kwargs):
    """Run ``lsvdisk`` and return its single element, or None if not found."""
    response = self._lsvdisk(**kwargs)
    return None if response is None else response.as_single_element
def _lsvdisk_list(self, **kwargs):
    """Run ``lsvdisk`` and return the result as a list, or None if not found."""
    response = self._lsvdisk(**kwargs)
    return None if response is None else response.as_list
def _lsvdisk(self, **kwargs):
    """Run ``lsvdisk`` with ``bytes=True`` forced on.

    Returns:
        The client response, or None when the CLI reports the object is
        missing or the name does not exist / meet naming rules.

    Raises:
        array_errors.InvalidArgumentError: non-ASCII characters, a value
            that is too long, or an invalid filter value.
    """
    kwargs['bytes'] = True
    try:
        return self.client.svcinfo.lsvdisk(**kwargs)
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        error_text = ex.my_message
        not_found_ids = (OBJ_NOT_FOUND, NAME_NOT_EXIST_OR_MEET_RULES)
        if any(msg_id in error_text for msg_id in not_found_ids):
            logger.info("volume not found")
            return None
        invalid_argument_ids = (NON_ASCII_CHARS, VALUE_TOO_LONG, INVALID_FILTER_VALUE)
        if any(msg_id in error_text for msg_id in invalid_argument_ids):
            raise array_errors.InvalidArgumentError(ex.my_message)
        raise ex
def _lsvolumegroup(self, id_or_name, not_exist_err=False):
    """Fetch one volume group by id or name.

    Returns:
        The volume group record, or None when it does not exist and
        ``not_exist_err`` is False.

    Raises:
        array_errors.ObjectNotFoundError: missing group and
            ``not_exist_err`` is True.
        array_errors.InvalidArgumentError: non-ASCII characters or a
            value that is too long.
    """
    try:
        response = self.client.svcinfo.lsvolumegroup(object_id=id_or_name)
        return response.as_single_element
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        error_text = ex.my_message
        is_missing = SPECIFIED_OBJ_NOT_EXIST in error_text or NAME_NOT_EXIST_OR_MEET_RULES in error_text
        if is_missing:
            logger.info(f"volume group {id_or_name} was not found")
            if not_exist_err:
                raise array_errors.ObjectNotFoundError(id_or_name)
            return None
        if NON_ASCII_CHARS in error_text or VALUE_TOO_LONG in error_text:
            raise array_errors.InvalidArgumentError(ex.my_message)
        raise ex
def _format_cli_args(self, cli_kwargs):
return ' '.join(f'-{k} {v}' for k, v in cli_kwargs.items())
def _chvolumegroup(self, id_or_name, **cli_kwargs):
    """Run ``chvolumegroup`` on a volume group.

    CLI warnings are logged and swallowed.

    Raises:
        array_errors.VolumeAlreadyExists: the CLI reports the object
            already exists.
    """
    try:
        self.client.svctask.chvolumegroup(object_id=id_or_name, **cli_kwargs)
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        rendered_args = self._format_cli_args(cli_kwargs)
        logger.debug("Error running chvolumegroup -object_id {} {}".format(id_or_name, rendered_args))
        if is_warning_message(ex.my_message):
            logger.warning(
                "exception encountered while changing volume group '{}': {}".format(cli_kwargs, ex.my_message))
            return
        if OBJ_ALREADY_EXIST in ex.my_message:
            raise array_errors.VolumeAlreadyExists(cli_kwargs, self.endpoint)
        raise ex
def _lsvolumegroupreplication(self, id_or_name):
    """Fetch the replication record of a volume group, or None if not found.

    Raises:
        array_errors.InvalidArgumentError: non-ASCII characters or a
            value that is too long.
    """
    try:
        response = self.client.svcinfo.lsvolumegroupreplication(object_id=id_or_name)
        return response.as_single_element
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        error_text = ex.my_message
        if SPECIFIED_OBJ_NOT_EXIST in error_text or NAME_NOT_EXIST_OR_MEET_RULES in error_text:
            logger.info(f"volume group replication {id_or_name} was not found")
            return None
        if NON_ASCII_CHARS in error_text or VALUE_TOO_LONG in error_text:
            raise array_errors.InvalidArgumentError(ex.my_message)
        raise ex
def _chvolumegroupreplication(self, id_or_name, **cli_kwargs):
    """Run ``chvolumegroupreplication`` on a volume group.

    CLI warnings are logged and swallowed.

    Raises:
        array_errors.VolumeAlreadyExists: the CLI reports the object
            already exists.
    """
    try:
        self.client.svctask.chvolumegroupreplication(object_id=id_or_name, **cli_kwargs)
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        rendered_args = self._format_cli_args(cli_kwargs)
        logger.debug("Error running chvolumegroupreplication -object_id {} {}".format(id_or_name, rendered_args))
        if is_warning_message(ex.my_message):
            logger.warning(
                "exception encountered while changing volume parameters '{}': {}".format(cli_kwargs, ex.my_message))
            return
        if OBJ_ALREADY_EXIST in ex.my_message:
            raise array_errors.VolumeAlreadyExists(cli_kwargs, self.endpoint)
        raise ex
def _get_cli_volume(self, volume_name, not_exist_err=True):
    """Fetch a CLI volume record by name.

    Raises:
        array_errors.ObjectNotFoundError: volume missing and
            ``not_exist_err`` is True (the default).
    """
    found_volume = self._lsvdisk_single_element(object_id=volume_name)
    if found_volume or not not_exist_err:
        return found_volume
    raise array_errors.ObjectNotFoundError(volume_name)
def _get_cli_volume_if_exists(self, volume_name):
    """Fetch a CLI volume record by name; a missing volume yields a falsy result."""
    found_volume = self._get_cli_volume(volume_name, not_exist_err=False)
    logger.debug(f"cli volume returned : {found_volume}")
    return found_volume
def _get_fcmap_as_target_if_exists(self, volume_name):
    """Return the single FlashCopy mapping targeting the volume, else None.

    None is returned when there are zero mappings or more than one.
    """
    target_fcmaps = self._get_fcmaps(volume_name, ENDPOINT_TYPE_TARGET)
    if len(target_fcmaps) == 1:
        return target_fcmaps[0]
    return None
def _get_fcmaps_as_source_if_exist(self, volume_name):
    """Return the list of FlashCopy mappings whose source is ``volume_name``."""
    return self._get_fcmaps(volume_name, ENDPOINT_TYPE_SOURCE)
def _get_source_volume_wwn_if_exists(self, target_cli_object):
    """Return the WWN of the FlashCopy source of ``target_cli_object``, or None.

    None is returned when there is no single target mapping, or when the
    mapping belongs to a remote-copy relationship.
    """
    fcmap = self._get_fcmap_as_target_if_exists(target_cli_object.name)
    if not fcmap or self._is_in_remote_copy_relationship(fcmap):
        return None
    return self._get_wwn_by_volume_name_if_exists(fcmap.source_vdisk_name)
def _get_volume_pools(self, cli_volume):
pool = cli_volume.mdisk_grp_name
if isinstance(pool, list):
pool_names = pool[:]
pool_names.remove('many')
return pool_names
return [pool]
def _get_volume_pool(self, cli_volume):
    """Return the volume's pool names joined with ':'."""
    return ':'.join(self._get_volume_pools(cli_volume))
def get_volume(self, name, pool, is_virt_snap_func):
    """Look up a volume by name and return it as a ``Volume``.

    ``pool`` is accepted but not used by this implementation.

    Raises:
        array_errors.ObjectNotFoundError: no volume with that name.
    """
    return self._generate_volume_response(self._get_cli_volume(name), is_virt_snap_func)
def _get_object_fcmaps(self, object_name):
    """Collect the FlashCopy mappings of an object: as target first, then as source."""
    target_fcmap = self._get_fcmap_as_target_if_exists(object_name)
    fcmaps = [target_fcmap] if target_fcmap else []
    fcmaps += self._get_fcmaps_as_source_if_exist(object_name)
    return fcmaps
def _expand_cli_volume(self, cli_volume, increase_in_bytes, is_hyperswap):
    """Grow a volume by ``increase_in_bytes``.

    Uses ``expandvolume`` when ``is_hyperswap`` is set, otherwise
    ``expandvdisksize``. CLI warnings are logged and swallowed.

    Raises:
        array_errors.ObjectNotFoundError: the volume does not exist.
        array_errors.NotEnoughSpaceInPool: the pool lacks extents.
    """
    volume_name = cli_volume.name
    if is_hyperswap:
        command, id_option = "expandvolume", "object_id"
    else:
        command, id_option = "expandvdisksize", "vdisk_id"
    try:
        expand = getattr(self.client.svctask, command)
        expand(**{id_option: volume_name}, unit='b', size=increase_in_bytes)
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        logger.debug("Error running {} -{} {} -unit b -size {}".format(
            command, id_option, volume_name, increase_in_bytes))
        error_text = ex.my_message
        if is_warning_message(error_text):
            logger.warning("exception encountered during volume expansion of {}: {}".format(volume_name,
                                                                                            ex.my_message))
            return
        logger.error(f"Failed to expand volume {volume_name}")
        if OBJ_NOT_FOUND in error_text or VOL_NOT_FOUND in error_text:
            raise array_errors.ObjectNotFoundError(volume_name)
        if NOT_ENOUGH_EXTENTS_IN_POOL_EXPAND in error_text:
            raise array_errors.NotEnoughSpaceInPool(id_or_name=cli_volume.mdisk_grp_name)
        raise ex
def _change_volume_size(self, cli_volume, size_in_bytes):  # This is a partition-only function
    """Set a volume's size to ``size_in_bytes`` via ``chvolume``.

    CLI warnings are logged and swallowed.

    Raises:
        array_errors.ObjectNotFoundError: the volume does not exist.
        array_errors.NotEnoughSpaceInPool: the pool lacks extents.
    """
    volume_name = cli_volume.name
    try:
        self.client.svctask.chvolume(size=size_in_bytes, unit='b', vdisk_id=volume_name)
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        logger.debug(f"Error running chvolume -size {size_in_bytes} -unit b -vdisk_id {volume_name}")
        error_text = ex.my_message
        if is_warning_message(error_text):
            logger.warning("exception encountered during volume expansion of {}: {}".format(volume_name,
                                                                                            ex.my_message))
            return
        logger.error(f"Failed to expand volume {volume_name}")
        if OBJ_NOT_FOUND in error_text or VOL_NOT_FOUND in error_text:
            raise array_errors.ObjectNotFoundError(volume_name)
        if NOT_ENOUGH_EXTENTS_IN_POOL_EXPAND in error_text:
            raise array_errors.NotEnoughSpaceInPool(id_or_name=cli_volume.mdisk_grp_name)
        raise ex
def expand_volume(self, volume_id, required_bytes, partition_name=None):
    """Expand the volume identified by ``volume_id`` to at least ``required_bytes``.

    The target size is rounded up to the block size. Volumes in a
    partition are resized directly. For others, the volume's FlashCopy
    mappings are safely deleted first and the volume is then grown by
    the difference.

    Raises:
        array_errors.ObjectNotFoundError: the volume does not exist.
        array_errors.InvalidArgumentError: the new size is smaller than
            the current one.
    """
    logger.info(f"Expanding volume with id : {volume_id} to {required_bytes} bytes")
    cli_volume = self._get_cli_volume_by_wwn(volume_id, not_exist_err=True)
    volume_name = cli_volume.name
    current_size = int(cli_volume.capacity)
    target_size = self._convert_size_bytes(required_bytes)
    if target_size < current_size:
        raise array_errors.InvalidArgumentError("New volume size smaller than current")
    size_delta = target_size - current_size

    if partition_name:
        self._change_volume_size(cli_volume, target_size)
    else:
        fcmaps = self._get_object_fcmaps(volume_name)
        self._safe_delete_fcmaps(volume_name, fcmaps)
        is_hyperswap = any(self._is_in_remote_copy_relationship(fcmap) for fcmap in fcmaps)
        self._expand_cli_volume(cli_volume, size_delta, is_hyperswap)

    logger.info(f"Finished volume expansion. id : {volume_id}. volume increased by {size_delta} bytes")
def _get_fcmaps(self, volume_name, endpoint_type):
    """List the FlashCopy mappings in which the volume plays the given role.

    Args:
        volume_name: name of the volume to filter on.
        endpoint_type : 'source' or 'target'
    """
    filter_value = f'{endpoint_type}_vdisk_name={volume_name}'
    try:
        response = self.client.svcinfo.lsfcmap(filtervalue=filter_value)
        return response.as_list
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        logger.debug(f"Error running lsfcmap -filtervalue {filter_value}")
        raise ex
def validate_supported_space_efficiency(self, space_efficiency):
    """Check that ``space_efficiency`` is empty or one of the supported names.

    The comparison is case-insensitive.

    Raises:
        array_errors.SpaceEfficiencyNotSupported: for any other value.
    """
    logger.debug(f"validate_supported_space_efficiency for space efficiency : {space_efficiency}")
    supported_names = (
        common_settings.SPACE_EFFICIENCY_THIN,
        common_settings.SPACE_EFFICIENCY_THICK,
        common_settings.SPACE_EFFICIENCY_COMPRESSED,
        common_settings.SPACE_EFFICIENCY_DEDUPLICATED,
        common_settings.SPACE_EFFICIENCY_DEDUPLICATED_THIN,
        common_settings.SPACE_EFFICIENCY_DEDUPLICATED_COMPRESSED,
    )
    if space_efficiency and space_efficiency.lower() not in supported_names:
        logger.error(f"space efficiency value is not supported {space_efficiency}")
        raise array_errors.SpaceEfficiencyNotSupported(space_efficiency)
    logger.info("Finished validate_supported_space_efficiency")
def _convert_size_bytes(self, size_in_bytes):
# SVC volume size must be the multiple of 512 bytes
ret = size_in_bytes % self.BLOCK_SIZE_IN_BYTES
if ret > 0:
return size_in_bytes - ret + 512
return size_in_bytes
def _get_wwn_by_volume_name_if_exists(self, volume_name):
    """Return the ``vdisk_UID`` of the named volume, or None if it does not exist."""
    found_volume = self._get_cli_volume_if_exists(volume_name)
    if found_volume:
        volume_wwn = found_volume.vdisk_UID
        logger.debug(f"found wwn : {volume_wwn}")
        return volume_wwn
    return None
def _lsvdisk_by_uid(self, vdisk_uid):
    """Fetch the single CLI volume record whose ``vdisk_UID`` equals ``vdisk_uid``."""
    return self._lsvdisk_single_element(filtervalue='vdisk_UID=' + vdisk_uid)
def _get_cli_volume_by_wwn(self, volume_id, not_exist_err=False):
    """Fetch a CLI volume record by its id, retrying with the NGUID form.

    The id is first used as-is; if nothing is found it is converted with
    ``convert_scsi_id_to_nguid`` and looked up again.

    Raises:
        array_errors.ObjectNotFoundError: nothing found and
            ``not_exist_err`` is True.
    """
    found_volume = self._lsvdisk_by_uid(volume_id)
    if not found_volume:
        found_volume = self._lsvdisk_by_uid(convert_scsi_id_to_nguid(volume_id))
    if found_volume or not not_exist_err:
        return found_volume
    raise array_errors.ObjectNotFoundError(volume_id)
def _get_volume_name_by_wwn_if_exists(self, volume_id):
    """Return the name of the volume with the given id, or None if not found."""
    found_volume = self._get_cli_volume_by_wwn(volume_id)
    if found_volume:
        volume_name = found_volume.name
        logger.debug(f"found volume name : {volume_name}")
        return volume_name
    return None
def _get_volume_name_by_wwn(self, volume_id):
    """Return the name of the volume with the given id.

    Raises:
        array_errors.ObjectNotFoundError: no such volume (or it has an
            empty name).
    """
    volume_name = self._get_volume_name_by_wwn_if_exists(volume_id)
    if volume_name:
        return volume_name
    raise array_errors.ObjectNotFoundError(volume_id)
def _get_volume_internal_id_from_uid(self, uid):
    """Resolve a volume's internal id from its uid.

    Raises:
        array_errors.ObjectNotFoundError: ``uid`` is empty or no volume
            matches it.
    """
    if not uid:
        raise array_errors.ObjectNotFoundError(uid)
    logger.info(f"internal_id not in volume handle, resolving using uid '{uid}'")
    return self._get_cli_volume_by_wwn(uid, not_exist_err=True).id
def _create_cli_volume(self, name, size_in_bytes, space_efficiency, pool, io_group, volume_group=None):
    """Create a volume with ``mkvolume``.

    The size is rounded up to the block size first. CLI warnings are
    logged and treated as success.

    Raises:
        array_errors.VolumeAlreadyExists: an object with that name exists.
        array_errors.InvalidArgumentError: the name is rejected by the CLI.
        array_errors.PoolDoesNotMatchSpaceEfficiency: the pool cannot host
            the requested space efficiency.
        array_errors.NotEnoughSpaceInPool: the pool lacks extents.
    """
    logger.info("creating volume with name : {}. size : {} . in pool : {} with parameters : {}".format(
        name, size_in_bytes, pool, space_efficiency))
    # Argument preparation does not talk to the array, so it stays outside
    # the try block that handles CLI failures.
    rounded_size = self._convert_size_bytes(size_in_bytes)
    cli_kwargs = build_kwargs_from_parameters(space_efficiency, pool, io_group,
                                              volume_group, name, rounded_size)
    try:
        self.client.svctask.mkvolume(**cli_kwargs)
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        logger.debug("Error running mkvolume {}".format(self._format_cli_args(cli_kwargs)))
        error_text = ex.my_message
        if is_warning_message(error_text):
            logger.warning("exception encountered during creation of volume {0}: {1}".format(name,
                                                                                             ex.my_message))
        else:
            logger.error("Cannot create volume {0}, Reason is: {1}".format(name, ex))
            if OBJ_ALREADY_EXIST in error_text:
                raise array_errors.VolumeAlreadyExists(name, self.endpoint)
            if NAME_NOT_EXIST_OR_MEET_RULES in error_text:
                raise array_errors.InvalidArgumentError(ex.my_message)
            if POOL_NOT_MATCH_VOL_SPACE_EFFICIENCY in error_text or NOT_REDUCTION_POOL in error_text:
                raise array_errors.PoolDoesNotMatchSpaceEfficiency(pool, space_efficiency, ex)
            if NOT_ENOUGH_EXTENTS_IN_POOL_CREATE in error_text:
                raise array_errors.NotEnoughSpaceInPool(id_or_name=pool)
            bad_name_ids = (NON_ASCII_CHARS, INVALID_NAME, TOO_MANY_CHARS)
            if any(msg_id in error_text for msg_id in bad_name_ids):
                raise array_errors.InvalidArgumentError(ex.my_message)
            raise ex
    logger.info(f"finished creating cli volume : {name}")
@retry(svc_errors.StorageArrayClientException, tries=5, delay=1)
def _rollback_copy_to_target_volume(self, target_volume_name):
    """Undo a failed copy by deleting the target's unstarted FlashCopy mapping, if any.

    Retried by the ``retry`` decorator (tries=5, delay=1) on
    ``StorageArrayClientException``.
    """
    self._delete_unstarted_fcmap_if_exists(target_volume_name)
def _copy_to_target_volume(self, target_volume_name, source_volume_name):
    """Copy the source volume's data onto the target via a FlashCopy mapping.

    On a CLI failure the copy is rolled back and the original error is
    re-raised.
    """
    logger.debug(f"copying volume {source_volume_name} data to volume {target_volume_name}.")
    try:
        return self._create_and_start_fcmap(source_volume_name, target_volume_name, is_copy=True)
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        logger.error(f"Failed to copy to '{target_volume_name}': {ex}")
        logger.info(f"rolling back copy to '{target_volume_name}'")
        self._rollback_copy_to_target_volume(target_volume_name)
        raise ex
def copy_to_existing_volume(self, volume_id, source_id, source_capacity_in_bytes,
                            minimum_volume_size_in_bytes):
    """Copy the data of volume ``source_id`` onto existing volume ``volume_id``.

    The two size parameters are accepted but not used here.

    Raises:
        array_errors.ObjectNotFoundError: either volume cannot be found.
    """
    # Resolve the source first, then the target.
    source_volume_name = self._get_volume_name_by_wwn(source_id)
    target_volume_name = self._get_volume_name_by_wwn(volume_id)
    self._copy_to_target_volume(target_volume_name, source_volume_name)
def _create_volume_group(self, name, partition_name):
    """Create a volume group, placing it in ``partition_name`` when one is given."""
    group_kwargs = {'name': name}
    if partition_name:
        group_kwargs['partition'] = partition_name
    return self._mkvolumegroup(**group_kwargs)
def _is_stretch_pool(self, pool):
if ":" not in pool:
return False
return (self._get_pool_site(pool.split(":")[0]) != self._get_pool_site(pool.split(":")[1]))
def _create_volume_in_volume_group(self, name, pool, io_group, source_id):
    """Create volume group ``name`` as a clone of snapshot ``source_id``."""
    self._mkvolumegroup(name, **build_create_volume_in_volume_group_kwargs(pool, io_group, source_id))
def _fix_creation_side_effects(self, name, cli_volume_id, volume_group):
self._change_volume_group(None, cli_volume_id, volume_group)
self._rmvolumegroup(name)
self._rename_volume(cli_volume_id, name)
def _add_vdisk_copies(self, is_stretch, pool, vdisk_id):
if not is_stretch:
return
pools = pool.split(":")
for additional_pool in pools[1:]:
logger.info("adding vdisk copy to pool: {}".format(additional_pool))
self.client.svctask.addvdiskcopy(mdiskgrp=additional_pool, vdisk_id=vdisk_id)
def _create_cli_volume_from_snapshot(self, name, pool, io_group, volume_group, source_id):
    """Create volume ``name`` from a snapshot by way of a temporary volume group.

    A volume group called ``name`` is created from the snapshot; its volume is
    then looked up, the creation side effects are fixed and, for a stretch
    pool, vdisk copies are added. If one of the last two steps fails with
    ``CommandExecutionError``, ``CLIFailureError`` or ``VolumeAlreadyExists``,
    the creation is rolled back and the exception is re-raised.

    :param name: name of the new volume.
    :param pool: pool name, or ``":"``-joined pool names.
    :param io_group: io group forwarded to the volume-group creation.
    :param volume_group: volume group forwarded to ``_fix_creation_side_effects``.
    :param source_id: id of the source snapshot.
    """
    logger.info("creating volume from snapshot. pool: {}".format(pool))
    is_stretch = self._is_stretch_pool(pool)
    if is_stretch:
        # only the first pool is used for the initial creation
        vg_pool = pool.split(":")[0]
    else:
        vg_pool = pool
    self._create_volume_in_volume_group(name, vg_pool, io_group, source_id)
    new_volume_id = self._get_cli_volume_id_from_volume_group("volume_group_name", name)
    try:
        # remove vg and leave volume, with name <name>
        self._fix_creation_side_effects(name, new_volume_id, volume_group)
        self._add_vdisk_copies(is_stretch, pool, new_volume_id)
    except (svc_errors.CommandExecutionError, CLIFailureError, array_errors.VolumeAlreadyExists):
        self._rollback_create_volume_from_snapshot(new_volume_id, name)
        raise
def _create_cli_volume_from_vg_snapshot(self, name, pool, io_group, volume_group, vg_snapshot_name,
                                        vol_name, vol_vg, space_efficiency, use_thin_clone):
    """Run ``mkvolume`` to create ``name`` as a clone or thin clone out of a volume-group snapshot.

    This is a partition-only function.

    :param name: name of the new volume.
    :param pool: value for the ``pool`` argument.
    :param io_group: value for ``iogrp``; left out when falsy.
    :param volume_group: vg where we want the new volume to be in.
    :param vg_snapshot_name: name of the volume-group snapshot to clone from.
    :param vol_name: original volume that the snapshot was created from.
    :param vol_vg: vg where the original volume is in.
    :param space_efficiency: turned into CLI kwargs by ``_get_space_efficiency_kwargs``;
        only applied when ``use_thin_clone`` is falsy.
    :param use_thin_clone: selects type ``thinclone`` instead of ``clone``.
    :raises: re-raises ``CommandExecutionError`` / ``CLIFailureError`` from ``mkvolume``.
    """
    logger.info("creating volume from vg snapshot")
    if use_thin_clone:
        clone_type = 'thinclone'
    else:
        clone_type = 'clone'
    mkvolume_kwargs = {
        'type': clone_type,
        'snapshot': vg_snapshot_name,
        'fromsourcegroup': vol_vg,
        'pool': pool,
        'fromsourcevolume': vol_name,
        'volumegroup': volume_group,
    }
    if io_group:
        mkvolume_kwargs['iogrp'] = io_group
    if not use_thin_clone:
        mkvolume_kwargs.update(_get_space_efficiency_kwargs(space_efficiency))
    try:
        self.client.svctask.mkvolume(name=name, **mkvolume_kwargs)
    except (svc_errors.CommandExecutionError, CLIFailureError):
        logger.debug("Error running mkvolume -name {} {}".format(name, self._format_cli_args(mkvolume_kwargs)))
        raise
def _create_cli_volume_from_volume(self, name, pool, io_group, volume_group, source_id):
    """Create volume ``name`` from another volume through a temporary snapshot.

    A snapshot of the source is added, the volume is created from that
    snapshot, and the snapshot is removed whether or not the creation succeeded.

    :param name: name of the new volume (also passed to ``_add_snapshot``).
    :param pool: pool for the snapshot and the new volume.
    :param io_group: io group forwarded to the creation.
    :param volume_group: volume group forwarded to the creation.
    :param source_id: id of the source volume.
    """
    logger.info("creating volume from volume")
    temp_snapshot = self._add_snapshot(name, source_id, pool)
    try:
        self._create_cli_volume_from_snapshot(name, pool, io_group, volume_group, temp_snapshot.snapshot_id)
    finally:
        # the snapshot only exists to seed the new volume
        self._rmsnapshot(temp_snapshot.snapshot_id)
def _partition_create_cli_volume_from_cli_vol(self, name, pool, io_group, volume_group, cli_volume,
                                              space_efficiency, partition_name):
    """Create volume ``name`` in a partition as a (thin) clone of ``cli_volume``.

    Steps: validate the source volume's volume group against the partition,
    decide between clone and thin clone from the partition's replication
    policy, take a temporary volume-group snapshot, create the volume from it
    and remove the temporary snapshot.

    :param name: name of the new volume (also passed to ``_add_vg_snapshot``).
    :param pool: pool for the new volume.
    :param io_group: io group for the new volume.
    :param volume_group: volume group the new volume should be placed in.
    :param cli_volume: full-view CLI object of the source volume.
    :param space_efficiency: requested space efficiency; when falsy, one alias is
        popped from ``_get_cli_volume_space_efficiency_aliases(cli_volume)``.
    :param partition_name: name of the partition.
    :raises array_errors.InvalidArgumentError: source volume has no volume group,
        the group is not part of the partition, or the partition / its
        replication policy cannot be found.
    :raises: ``CommandExecutionError`` / ``CLIFailureError`` from the CLI calls. If
        both the creation and the snapshot removal fail, the creation error is
        the one that propagates.
    """
    source_volume_group = cli_volume.volume_group_name
    if not source_volume_group:
        raise array_errors.InvalidArgumentError("volume group not specified")
    if self._verify_volume_group_of_partition_name(partition_name, source_volume_group) is False:
        raise array_errors.InvalidArgumentError("volume group not part of partition")

    use_thin_clone = self._is_partition_replicated_2_site_ha(partition_name)

    cli_snapshot = self._add_vg_snapshot(name, source_volume_group)
    creation_succeeded = False
    try:
        if not space_efficiency:
            space_efficiency_aliases = _get_cli_volume_space_efficiency_aliases(cli_volume)
            space_efficiency = space_efficiency_aliases.pop()
        self._create_cli_volume_from_vg_snapshot(name, pool, io_group, volume_group, cli_snapshot.snapshot_name,
                                                 cli_volume.name, source_volume_group,
                                                 space_efficiency, use_thin_clone)
        creation_succeeded = True
    finally:
        try:
            logger.info("Remove temp snapshot")
            self.client.svctask.rmsnapshot(
                snapshot=cli_snapshot.snapshot_name, volumegroup=source_volume_group)
        except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
            logger.debug("Error running rmsnapshot -snapshot {} -volumegroup {}".format(
                cli_snapshot.snapshot_name, source_volume_group))
            if creation_succeeded:
                raise
            # The creation already failed: raising here would replace that exception
            # and hide the root cause, so report the cleanup failure and let the
            # original error keep propagating.
            logger.warning("failed to remove temp snapshot {} after a failed volume creation: {}".format(
                cli_snapshot.snapshot_name, ex))
    logger.info("creating volume from snapshot - success")


def _is_partition_replicated_2_site_ha(self, partition_name):
    """Return True when the partition's replication policy has topology ``2-site-ha``.

    :param partition_name: name of the partition to look up with ``lspartition``.
    :return: ``False`` when the partition has no replication policy, otherwise
        whether the policy's topology equals ``"2-site-ha"``.
    :raises array_errors.InvalidArgumentError: partition or its replication
        policy was not found.
    :raises: re-raises ``CommandExecutionError`` / ``CLIFailureError`` from the lookups.
    """
    logger.info("get partition {}".format(str(partition_name)))
    filter_value = 'name={}'.format(partition_name)
    try:
        cli_partition = self.client.svcinfo.lspartition(filtervalue=filter_value).as_single_element
    except (svc_errors.CommandExecutionError, CLIFailureError):
        logger.debug("Error running lspartition -filtervalue {}".format(filter_value))
        raise
    if not cli_partition:
        raise array_errors.InvalidArgumentError("partition not found")

    replication_policy_name = cli_partition.replication_policy_name
    logger.info("replication_policy {}".format(str(replication_policy_name)))
    if not replication_policy_name:
        return False

    filter_value = 'name={}'.format(replication_policy_name)
    try:
        cli_replication_policy = self.client.svcinfo.lsreplicationpolicy(
            filtervalue=filter_value).as_single_element
    except (svc_errors.CommandExecutionError, CLIFailureError):
        logger.debug("Error running lsreplicationpolicy -filtervalue {}".format(filter_value))
        raise
    if not cli_replication_policy:
        raise array_errors.InvalidArgumentError("partition replication policy not found")
    logger.info("replication_topolgy {}".format(str(cli_replication_policy.topology)))
    return cli_replication_policy.topology == "2-site-ha"
def _partition_create_cli_volume_from_volume(self, name, pool, io_group, volume_group, source_id, space_efficiency,
                                             size_in_bytes, partition_name):
    """Create volume ``name`` in a partition from the source volume with wwn ``source_id``.

    VG snapshot is more compatible than vol snapshot for certain partition types.

    :param name: name of the new volume.
    :param pool: pool for the new volume.
    :param io_group: io group for the new volume.
    :param volume_group: volume group for the new volume.
    :param source_id: wwn of the source volume.
    :param space_efficiency: requested space efficiency (may be falsy).
    :param size_in_bytes: requested size; after ``_convert_size_bytes`` it must
        equal the source volume's capacity.
    :param partition_name: name of the partition.
    :raises array_errors.ObjectNotFoundError: the source volume was not found.
    :raises array_errors.InvalidArgumentError: source and target sizes differ.
    """
    logger.info("creating volume from volume - partition")
    concise_cli_volume = self._get_cli_volume_by_wwn(source_id)
    if concise_cli_volume is None:
        raise array_errors.ObjectNotFoundError(source_id)
    # Convert cli_volume from concise to full view
    source_cli_volume = self._get_cli_volume(concise_cli_volume.name)
    if source_cli_volume is None:
        raise array_errors.ObjectNotFoundError(source_id)
    requested_capacity = self._convert_size_bytes(size_in_bytes)
    if requested_capacity != int(source_cli_volume.capacity):
        raise array_errors.InvalidArgumentError(
            "clone not created because the source and target volumes are different sizes {} {}".format(
                size_in_bytes, source_cli_volume.capacity)
        )
    self._partition_create_cli_volume_from_cli_vol(name, pool, io_group, volume_group, source_cli_volume,
                                                   space_efficiency, partition_name)
def _create_cli_volume_from_source(self, name, pool, io_group, volume_group, source_ids, source_type):
    """Create volume ``name`` from a source, dispatching on ``source_type``.

    :param name: name of the new volume.
    :param pool: pool for the new volume.
    :param io_group: io group for the new volume.
    :param volume_group: volume group for the new volume.
    :param source_ids: object whose ``internal_id`` identifies the source.
    :param source_type: ``controller_settings.SNAPSHOT_TYPE_NAME`` selects the
        snapshot path; any other value selects the volume path.
    """
    if source_type == controller_settings.SNAPSHOT_TYPE_NAME:
        create_from_source = self._create_cli_volume_from_snapshot
    else:
        create_from_source = self._create_cli_volume_from_volume
    create_from_source(name, pool, io_group, volume_group, source_ids.internal_id)
def _is_vdisk_support_addsnapshot(self, vdisk_uid):
return self._is_addsnapshot_supported() and not self._is_vdisk_has_fcmaps(vdisk_uid)
def _partition_create_volume(self, name, size_in_bytes, space_efficiency, pool, io_group, volume_group, source_ids,
                             partition_name, partition_vg):
    """Create a volume that belongs to a partition and return its volume response.

    :param name: name of the new volume.
    :param size_in_bytes: requested size.
    :param space_efficiency: requested space efficiency.
    :param pool: pool for the new volume.
    :param io_group: io group for the new volume.
    :param volume_group: volume group to use; when falsy ``partition_vg`` is used.
    :param source_ids: when truthy, the volume is cloned from the volume whose
        wwn is ``source_ids.uid``; otherwise an empty volume is created.
    :param partition_name: name of the partition.
    :param partition_vg: default volume group of the partition.
    :return: result of ``_generate_volume_response`` for the created volume.
    :raises array_errors.InvalidArgumentError: no volume group could be
        determined, or the volume group is not part of the partition.
    """
    if volume_group:
        logger.info("partition {} use specfied volume group {}".format(partition_name, volume_group))
    else:
        # When default VG is implemented in SVC, look the group up from the partition
        # instead (add "default" to the filter), i.e. log
        # "get corresponding volume group for partition <partition_name>" and call
        # self._get_volume_group_from_partition_name(partition_name).
        volume_group = partition_vg
        logger.info("partition {} use default volume group {}".format(partition_name, volume_group))

    if not volume_group:
        raise array_errors.InvalidArgumentError("volume group not specified")
    if self._verify_volume_group_of_partition_name(partition_name, volume_group) is False:
        raise array_errors.InvalidArgumentError("volume group not part of partition")

    if not source_ids:
        self._create_cli_volume(name, size_in_bytes, space_efficiency, pool, io_group, volume_group)
    else:
        self._partition_create_cli_volume_from_volume(name, pool, io_group, volume_group, source_ids.uid,
                                                      space_efficiency, size_in_bytes, partition_name)
    created_cli_volume = self._get_cli_volume(name)
    return self._generate_volume_response(created_cli_volume)
@register_csi_plugin()
def create_volume(self, name, size_in_bytes, space_efficiency, pool, io_group, volume_group, source_ids,
                  source_type, is_virt_snap_func, partition_name=None, partition_vg=None):
    """Create a volume and return its volume response.

    Three paths exist:

    * ``partition_name`` is truthy: everything is delegated to
      ``_partition_create_volume``.
    * ``is_virt_snap_func`` and ``source_ids`` are truthy: the volume is created
      from the source, provided the source vdisk supports addsnapshot.
    * otherwise: an empty volume is created.

    :param name: name of the new volume.
    :param size_in_bytes: requested size.
    :param space_efficiency: requested space efficiency.
    :param pool: pool for the new volume.
    :param io_group: io group for the new volume.
    :param volume_group: volume group for the new volume.
    :param source_ids: ids of the source object, or a falsy value.
    :param source_type: type of the source (snapshot or volume).
    :param is_virt_snap_func: whether the virt-snapshot function is requested.
    :param partition_name: optional partition name.
    :param partition_vg: optional default volume group of the partition.
    :raises array_errors.VirtSnapshotFunctionNotSupportedMessage: the source
        vdisk does not support addsnapshot.
    """
    if partition_name:
        return self._partition_create_volume(name, size_in_bytes, space_efficiency, pool, io_group, volume_group,
                                             source_ids, partition_name, partition_vg)

    if is_virt_snap_func and source_ids:
        if not self._is_vdisk_support_addsnapshot(source_ids.uid):
            raise array_errors.VirtSnapshotFunctionNotSupportedMessage(name)
        self._create_cli_volume_from_source(name, pool, io_group, volume_group, source_ids,
                                            source_type)
    else:
        # if there's source - the caller to this function copies with flash copy
        # TODO bug? Doesn't check source vol type if space_efficiency not specified
        self._create_cli_volume(name, size_in_bytes, space_efficiency, pool, io_group, volume_group)

    created_cli_volume = self._get_cli_volume(name)
    return self._generate_volume_response(created_cli_volume, is_virt_snap_func)
def _rmvolume(self, volume_id_or_name, not_exist_err=True):
    """Delete a volume with ``rmvolume``.

    A CLI failure whose message is a warning (per ``is_warning_message``) is
    logged and swallowed. Any other failure is logged as an error and raised.

    :param volume_id_or_name: id or name passed as ``vdisk_id``.
    :param not_exist_err: when truthy, an object/volume-not-found failure is
        raised as ``ObjectNotFoundError``; when falsy the original CLI
        exception is raised for that case as well.
    :raises array_errors.ObjectNotFoundError: volume not found and ``not_exist_err``.
    :raises: the original ``CommandExecutionError`` / ``CLIFailureError`` otherwise.
    """
    logger.info("deleting volume with name : {0}".format(volume_id_or_name))
    try:
        self.client.svctask.rmvolume(vdisk_id=volume_id_or_name)
    except (svc_errors.CommandExecutionError, CLIFailureError) as ex:
        logger.debug("Error running rmvolume -vdisk_id {}".format(volume_id_or_name))
        if is_warning_message(ex.my_message):
            logger.warning("exception encountered during deletion of volume {}: {}".format(volume_id_or_name,
                                                                                           ex.my_message))
            return
        logger.error("Failed to delete volume {}".format(volume_id_or_name))
        volume_is_missing = OBJ_NOT_FOUND in ex.my_message or VOL_NOT_FOUND in ex.my_message
        if volume_is_missing and not_exist_err:
            raise array_errors.ObjectNotFoundError(volume_id_or_name)
        raise
@register_csi_plugin()
def delete_volume(self, volume_id, partition_name=None):
    """Delete the volume with id ``volume_id``.

    :param volume_id: id of the volume to delete.
    :param partition_name: optional partition name; any value other than
        ``None`` makes the deletion run with ``is_partition=True``.
    """
    logger.info("Deleting volume with id : {0}".format(volume_id))
    volume_is_in_partition = partition_name is not None
    self._delete_volume(volume_id, is_partition=volume_is_in_partition)
    logger.info("Finished volume deletion. id : {0}".format(volume_id))
def get_snapshot(self, volume_id, snapshot_name, pool, is_virt_snap_func):
    """Look up a snapshot by name and return its snapshot response, or ``None``.

    :param volume_id: wwn of the source volume (used on the virt-snapshot path,
        and as the argument of the not-supported error).
    :param snapshot_name: name of the snapshot to look up.
    :param pool: not used by this implementation.
    :param is_virt_snap_func: selects the virt-snapshot lookup instead of the
        lookup of a volume named ``snapshot_name``.
    :return: the snapshot response, or ``None`` when nothing was found.
    :raises array_errors.VirtSnapshotFunctionNotSupportedMessage: virt-snapshot
        was requested but addsnapshot is not supported.
    """
    logger.debug("Get snapshot : {}".format(snapshot_name))
    if not is_virt_snap_func:
        target_cli_volume = self._get_cli_volume_if_exists(snapshot_name)
        if target_cli_volume:
            return self._generate_snapshot_response_with_verification(target_cli_volume)
        return None

    if not self._is_addsnapshot_supported():
        raise array_errors.VirtSnapshotFunctionNotSupportedMessage(volume_id)
    cli_snapshot = self._get_cli_snapshot_by_name(snapshot_name)
    if not cli_snapshot:
        return None
    source_cli_volume = self._get_cli_volume_by_wwn(volume_id)
    return self._generate_snapshot_response_from_cli_snapshot(cli_snapshot, source_cli_volume)
# Also called with snapshot object
def verify_volume_partition(self, volume, partition_name):
    """Check that ``volume`` belongs to the partition ``partition_name``.

    :param volume: object exposing ``name`` and ``partition_name``.
    :param partition_name: the expected partition name.
    :raises ValidationException: the object's partition name differs from
        ``partition_name``.
    """
    actual_partition_name = volume.partition_name
    if actual_partition_name != partition_name:
        mismatch_message = messages.VOLUME_PARTITION_MISMATCH.format(
            volume.name, actual_partition_name, partition_name)
        raise ValidationException(mismatch_message)
def get_object_by_id(self, object_id, object_type, is_virt_snap_func=False):
if is_virt_snap_func and object_type == controller_settings.SNAPSHOT_TYPE_NAME:
cli_snapshot = self._get_cli_snapshot_by_id(object_id)
if not cli_snapshot:
return None
source_cli_volume = self._get_cli_volume(cli_snapshot.volume_name)
if not source_cli_volume: