32
32
import enum
33
33
from contextlib import ExitStack , contextmanager
34
34
from typing import Any , List , Optional , Tuple , Callable , Dict , Set , Union , Iterable
35
- from functools import wraps , partial
35
+ from functools import wraps , partial , cached_property
36
36
from collections import defaultdict , Counter , namedtuple
37
37
from concurrent .futures import ThreadPoolExecutor
38
38
from threading import Lock
@@ -252,7 +252,6 @@ def __init__(self, tester_obj, termination_event, *args, nemesis_selector=None,
252
252
self .monitoring_set = tester_obj .monitors
253
253
self .target_node : BaseNode = None
254
254
self .disruptions_list = []
255
- self .disruptions_cycle : Iterable [Callable ] | None = None
256
255
self .termination_event = termination_event
257
256
self .operation_log = []
258
257
self .current_disruption = None
@@ -274,7 +273,6 @@ def __init__(self, tester_obj, termination_event, *args, nemesis_selector=None,
274
273
self .task_used_streaming = None
275
274
self .filter_seed = self .cluster .params .get ('nemesis_filter_seeds' )
276
275
self .nemesis_seed = nemesis_seed or random .randint (0 , 1000 )
277
- self ._random_sequence = None
278
276
self ._add_drop_column_max_per_drop = 5
279
277
self ._add_drop_column_max_per_add = 5
280
278
self ._add_drop_column_max_column_name_size = 10
@@ -1818,80 +1816,43 @@ def _deprecated_disrupt_stop_start(self):
1818
1816
self .log .info ('StopStart %s' , self .target_node )
1819
1817
self .target_node .restart ()
1820
1818
1821
- def call_random_disrupt_method (self , disrupt_methods = None , predefined_sequence = False ):
1822
- # pylint: disable=too-many-branches
1823
-
1824
- if disrupt_methods is None :
1825
- disrupt_methods = [attr [1 ] for attr in inspect .getmembers (self ) if
1826
- attr [0 ].startswith ('disrupt_' ) and
1827
- callable (attr [1 ])]
1828
- else :
1829
- disrupt_methods = [attr [1 ] for attr in inspect .getmembers (self ) if
1830
- attr [0 ] in disrupt_methods and
1831
- callable (attr [1 ])]
1832
- if not disrupt_methods :
1833
- self .log .warning ("No monkey to run" )
1834
- return
1835
- if not predefined_sequence :
1836
- disrupt_method = random .choice (disrupt_methods )
1837
- else :
1838
- if not self ._random_sequence :
1839
- # Generate random sequence, every method has same chance to be called.
1840
- # Here we use multiple original methods list, it will increase the chance
1841
- # to call same method continuously.
1842
- #
1843
- # Adjust the rate according to the test duration. Try to call more unique
1844
- # methods and don't wait to long time to meet the balance if the test
1845
- # duration is short.
1846
- test_duration = self .cluster .params .get ('test_duration' )
1847
- if test_duration < 600 : # less than 10 hours
1848
- rate = 1
1849
- elif test_duration < 4320 : # less than 3 days
1850
- rate = 2
1851
- else :
1852
- rate = 3
1853
- multiple_disrupt_methods = disrupt_methods * rate
1854
- random .shuffle (multiple_disrupt_methods )
1855
- self ._random_sequence = multiple_disrupt_methods
1856
- # consume the random sequence
1857
- disrupt_method = self ._random_sequence .pop ()
1819
+ # Nemesis running code
1858
1820
1859
- self .execute_disrupt_method (disrupt_method )
1821
@cached_property
def all_disrupt_methods(self):
    """Disrupt methods returned by the nemesis registry when no selector is given.

    The registry is queried on first access only; ``cached_property`` keeps the
    result on the instance for later reads.
    """
    registered_methods = self.nemesis_registry.get_disrupt_methods()
    return registered_methods
1860
1824
1861
1825
def execute_disrupt_method(self, disrupt_method):
    """Run one disrupt method, bracketed by metrics start/stop events.

    The event label is the method's ``__name__`` with every ``disrupt_``
    occurrence removed. ``disrupt_method`` is called with this nemesis instance
    as its only argument, and the stop event is emitted even when it raises.
    """
    event_label = disrupt_method.__name__.replace('disrupt_', '')
    self.metrics_srv.event_start(event_label)
    try:
        disrupt_method(self)
    finally:
        # Always close the metrics event, whether the disruption succeeded or not.
        self.metrics_srv.event_stop(event_label)
1868
1833
1869
- def build_list_of_disruptions_to_execute (self , nemesis_selector : str | None = None , nemesis_multiply_factor = 1 ):
1870
- """
1871
- Builds the list of disruptions that should be excuted during a test.
1834
def build_disruptions_by_name(self, disrupt_methods: List[str]):
    """Return the known disrupt methods whose ``__name__`` is in ``disrupt_methods``.

    The result follows the order of ``self.all_disrupt_methods``, not the order
    of the requested names, so callers may list names in any order.

    Args:
        disrupt_methods: names of the disrupt methods to select.

    Raises:
        AssertionError: if any requested name matches no known disrupt method.
    """
    requested = set(disrupt_methods)
    filtered = [func for func in self.all_disrupt_methods if func.__name__ in requested]
    # Compare as sets: the previous list equality also failed when every name was
    # found but listed in a different order than the registry returns them.
    missing = requested.difference(func.__name__ for func in filtered)
    if missing:
        # Raised explicitly (not via `assert`) so the check survives `python -O`.
        raise AssertionError(f"Unable to find these disrupt methods: {missing}")
    return filtered
1872
1840
1873
- nemesis_selector: should be retrived from the test yaml by using the "nemesis_selector".
1874
- Here it kept for future usages and unit testing ability.
1875
- more about nemesis_selector behaviour in sct_config.py
1841
+ def build_disruptions_by_selector ( self , nemesis_selector : str | None = None ):
1842
+ """
1843
+ Filter available disruptions according to the logical phrase
1876
1844
1877
- nemesis_multiply_factor: should be retrived from the test yaml by using the "nemesis_multiply_factor" .
1878
- Here it kept for future usages and unit testing ability .
1879
- more about nemesis_selector behaviour in sct_config.py
1845
+ nemesis_selector: Logical phrase selector to filter available disruption methods .
1846
+ To be able to be filtered, method needs to have corresponding class .
1847
+ Usually retrieved from the test yaml by using the "nemesis_selector", more about nemesis_selector behaviour in sct_config.py
1880
1848
"""
1881
- nemesis_selector = nemesis_selector or self .nemesis_selector
1882
1849
if self ._is_it_on_kubernetes ():
1883
1850
if nemesis_selector :
1884
1851
nemesis_selector = nemesis_selector + " and kubernetes"
1885
1852
else :
1886
1853
nemesis_selector = "kubernetes"
1887
- nemesis_multiply_factor = self .cluster .params .get ('nemesis_multiply_factor' ) or nemesis_multiply_factor
1888
1854
disruptions = self .nemesis_registry .get_disrupt_methods (nemesis_selector )
1889
-
1890
- if nemesis_multiply_factor :
1891
- disruptions = disruptions * nemesis_multiply_factor
1892
-
1893
- self .disruptions_list .extend (disruptions )
1894
- return self .disruptions_list
1855
+ return disruptions
1895
1856
1896
1857
@property
1897
1858
def nemesis_selector (self ) -> str :
@@ -1915,22 +1876,34 @@ def nemesis_selector(self, value: str):
1915
1876
1916
1877
@property
def _disruption_list_names(self):
    """Names (``__name__``) of the entries currently in ``self.disruptions_list``."""
    collected = []
    for nemesis in self.disruptions_list:
        collected.append(nemesis.__name__)
    return collected
1919
1881
1920
- def shuffle_list_of_disruptions (self ):
1921
- self .log .debug (f'nemesis_seed to be used is { self .nemesis_seed } ' )
1882
+ def shuffle_list_of_disruptions (self , disruption_list : List , nemesis_multiply_factor : int | None = None ):
1883
+ """
1884
+ Randomizes list of disruptions
1922
1885
1886
+ nemesis_multiply_factor: How many times to multiply the original list before shuffle
1887
+ Useful for increasing probability of the same nemesis twice in a row
1888
+ Usually retrieved from the test yaml by using the "nemesis_selector", more about nemesis_selector behaviour in sct_config.py
1889
+ """
1890
+ self .log .debug (f'nemesis_seed to be used is { self .nemesis_seed } ' )
1923
1891
self .log .debug (f"nemesis stack BEFORE SHUFFLE is { self ._disruption_list_names } " )
1924
- random .Random (self .nemesis_seed ).shuffle (self .disruptions_list )
1892
+ nemesis_multiply_factor = nemesis_multiply_factor or self .cluster .params .get ('nemesis_multiply_factor' )
1893
+ random .Random (self .nemesis_seed ).shuffle (disruption_list * nemesis_multiply_factor )
1925
1894
self .log .info (f"List of Nemesis to execute: { self ._disruption_list_names } " )
1926
1895
1896
@cached_property
def infinite_cycle(self):
    """Endless iterator over ``self.disruptions_list``.

    Created with ``itertools.cycle`` on first access and cached on the instance,
    so every later access continues the same iterator.
    """
    endless = itertools.cycle(self.disruptions_list)
    return endless
1900
+
1927
1901
def call_next_nemesis(self):
    """Execute the next disrupt method taken from ``self.infinite_cycle``.

    Raises:
        AssertionError: if ``self.disruptions_list`` is empty.
    """
    assert self.disruptions_list, "no nemesis were selected"
    upcoming = next(self.infinite_cycle)
    self.execute_disrupt_method(disrupt_method=upcoming)
1933
1905
1906
+ # End of Nemesis running code
1934
1907
@latency_calculator_decorator (legend = "Run repair process with nodetool repair" )
1935
1908
def repair_nodetool_repair (self , node = None , publish_event = True ):
1936
1909
node = node if node else self .target_node
@@ -5629,8 +5602,8 @@ def wrapper(*args, **kwargs): # pylint: disable=too-many-statements # noqa: PL
5629
5602
class SisyphusMonkey(Nemesis):
    # Builds its disruption list through the selector-based builder and then
    # runs the entries one after another.

    def __init__(self, *args, **kwargs):
        """Collect the selector-filtered disruptions and hand them to the shuffler."""
        super().__init__(*args, **kwargs)
        selected = self.build_disruptions_by_selector()
        self.disruptions_list = selected
        self.shuffle_list_of_disruptions(self.disruptions_list)

    def disrupt(self):
        """Run the next disruption in the cycle."""
        self.call_next_nemesis()
@@ -5741,13 +5714,14 @@ class EnableDisableTableEncryptionAwsKmsProviderMonkey(Nemesis):
5741
5714
5742
5715
def __init__(self, *args, **kwargs):
    """Select the two AWS KMS table-encryption disruptions by name and pass them to the shuffler."""
    super().__init__(*args, **kwargs)
    kms_disruption_names = [
        'disrupt_enable_disable_table_encryption_aws_kms_provider_without_rotation',
        'disrupt_enable_disable_table_encryption_aws_kms_provider_with_rotation',
    ]
    self.disruptions_list = self.build_disruptions_by_name(kms_disruption_names)
    self.shuffle_list_of_disruptions(self.disruptions_list)
5748
5722
5749
5723
def disrupt(self):
    """Run the next disruption from this monkey's list via ``call_next_nemesis``."""
    self.call_next_nemesis()
5751
5725
5752
5726
5753
5727
class RestartThenRepairNodeMonkey (Nemesis ):
@@ -5975,12 +5949,6 @@ def disrupt(self):
5975
5949
self .disrupt_delete_overlapping_row_ranges ()
5976
5950
5977
5951
5978
- class ChaosMonkey (Nemesis ):
5979
-
5980
- def disrupt (self ):
5981
- self .call_random_disrupt_method ()
5982
-
5983
-
5984
5952
class CategoricalMonkey (Nemesis ):
5985
5953
"""Randomly picks disruptions to execute using the given categorical distribution.
5986
5954
@@ -6066,34 +6034,39 @@ def _random_disrupt(self):
6066
6034
self .execute_disrupt_method (bound_method )
6067
6035
6068
6036
6069
- CLOUD_LIMITED_CHAOS_MONKEY = ['disrupt_nodetool_cleanup' ,
6070
- 'disrupt_nodetool_drain' , 'disrupt_nodetool_refresh' ,
6071
- 'disrupt_stop_start_scylla_server' , 'disrupt_major_compaction' ,
6072
- 'disrupt_modify_table' , 'disrupt_nodetool_enospc' ,
6073
- 'disrupt_stop_wait_start_scylla_server' ,
6074
- 'disrupt_soft_reboot_node' ,
6075
- 'disrupt_truncate' ]
6076
-
6077
-
6078
6037
class ScyllaCloudLimitedChaosMonkey(Nemesis):
    # Limits the nemesis scope to the disruptions relevant to Scylla Cloud,
    # where we defined we don't have AWS api access.

    def __init__(self, *args, **kwargs):
        """Select the Scylla Cloud disruptions by name and pass them to the shuffler."""
        super().__init__(*args, **kwargs)
        cloud_disruption_names = [
            'disrupt_nodetool_cleanup',
            'disrupt_nodetool_drain',
            'disrupt_nodetool_refresh',
            'disrupt_stop_start_scylla_server',
            'disrupt_major_compaction',
            'disrupt_modify_table',
            'disrupt_nodetool_enospc',
            'disrupt_stop_wait_start_scylla_server',
            'disrupt_soft_reboot_node',
            'disrupt_truncate',
        ]
        self.disruptions_list = self.build_disruptions_by_name(cloud_disruption_names)
        self.shuffle_list_of_disruptions(self.disruptions_list)

    def disrupt(self):
        """Run the next disruption from the cloud-limited list."""
        self.call_next_nemesis()
6089
6055
6090
6056
6091
6057
class MdcChaosMonkey(Nemesis):
    # Cycles through three named disruptions: destroy-data-then-repair,
    # no-corrupt repair and nodetool decommission.

    def __init__(self, *args, **kwargs):
        """Select the three disruptions by name and pass them to the shuffler."""
        super().__init__(*args, **kwargs)
        mdc_disruption_names = [
            'disrupt_destroy_data_then_repair',
            'disrupt_no_corrupt_repair',
            'disrupt_nodetool_decommission',
        ]
        self.disruptions_list = self.build_disruptions_by_name(mdc_disruption_names)
        self.shuffle_list_of_disruptions(self.disruptions_list)

    def disrupt(self):
        """Run the next disruption from the list."""
        self.call_next_nemesis()
6097
6070
6098
6071
6099
6072
class ModifyTableMonkey (Nemesis ):
@@ -6241,13 +6214,14 @@ class DisruptKubernetesNodeThenReplaceScyllaNode(Nemesis):
6241
6214
6242
6215
def __init__(self, *args, **kwargs):
    """Select the drain/terminate-then-replace Kubernetes disruptions by name and pass them to the shuffler."""
    super().__init__(*args, **kwargs)
    replace_disruption_names = [
        'disrupt_drain_kubernetes_node_then_replace_scylla_node',
        'disrupt_terminate_kubernetes_host_then_replace_scylla_node',
    ]
    self.disruptions_list = self.build_disruptions_by_name(replace_disruption_names)
    self.shuffle_list_of_disruptions(self.disruptions_list)
6248
6222
6249
6223
def disrupt(self):
    """Run the next disruption from this monkey's list via ``call_next_nemesis``."""
    self.call_next_nemesis()
6251
6225
6252
6226
6253
6227
class DrainKubernetesNodeThenDecommissionAndAddScyllaNode (Nemesis ):
@@ -6272,13 +6246,14 @@ class DisruptKubernetesNodeThenDecommissionAndAddScyllaNode(Nemesis):
6272
6246
6273
6247
def __init__(self, *args, **kwargs):
    """Select the drain/terminate-then-decommission-and-add Kubernetes disruptions by name and pass them to the shuffler."""
    super().__init__(*args, **kwargs)
    decommission_disruption_names = [
        'disrupt_drain_kubernetes_node_then_decommission_and_add_scylla_node',
        'disrupt_terminate_kubernetes_host_then_decommission_and_add_scylla_node',
    ]
    self.disruptions_list = self.build_disruptions_by_name(decommission_disruption_names)
    self.shuffle_list_of_disruptions(self.disruptions_list)
6279
6254
6280
6255
def disrupt(self):
    """Run the next disruption from this monkey's list via ``call_next_nemesis``."""
    self.call_next_nemesis()
6282
6257
6283
6258
6284
6259
class K8sSetMonkey (Nemesis ):
@@ -6287,16 +6262,16 @@ class K8sSetMonkey(Nemesis):
6287
6262
6288
6263
def __init__(self, *args, **kwargs):
    """Select the four Kubernetes node disruptions by name and pass them to the shuffler."""
    super().__init__(*args, **kwargs)
    k8s_disruption_names = [
        'disrupt_drain_kubernetes_node_then_replace_scylla_node',
        'disrupt_terminate_kubernetes_host_then_replace_scylla_node',
        'disrupt_drain_kubernetes_node_then_decommission_and_add_scylla_node',
        'disrupt_terminate_kubernetes_host_then_decommission_and_add_scylla_node',
    ]
    self.disruptions_list = self.build_disruptions_by_name(k8s_disruption_names)
    self.shuffle_list_of_disruptions(self.disruptions_list)
6296
6272
6297
6273
def disrupt(self):
    """Run the next disruption from this monkey's list via ``call_next_nemesis``."""
    self.call_next_nemesis()
6300
6275
6301
6276
6302
6277
class OperatorNodeReplace (Nemesis ):
@@ -6468,7 +6443,7 @@ class ScyllaOperatorBasicOperationsMonkey(Nemesis):
6468
6443
6469
6444
def __init__ (self , * args , ** kwargs ):
6470
6445
super ().__init__ (* args , ** kwargs )
6471
- self .disrupt_methods_list = [
6446
+ self .disruptions_list = self . build_disruptions_by_name ( [
6472
6447
'disrupt_nodetool_flush_and_reshard_on_kubernetes' ,
6473
6448
'disrupt_rolling_restart_cluster' ,
6474
6449
'disrupt_grow_shrink_cluster' ,
@@ -6483,10 +6458,11 @@ def __init__(self, *args, **kwargs):
6483
6458
'disrupt_mgmt_repair_cli' ,
6484
6459
'disrupt_mgmt_backup_specific_keyspaces' ,
6485
6460
'disrupt_mgmt_backup' ,
6486
- ]
6461
+ ])
6462
+ self .shuffle_list_of_disruptions (self .disruptions_list )
6487
6463
6488
6464
def disrupt(self):
    """Run the next disruption from this monkey's list via ``call_next_nemesis``."""
    self.call_next_nemesis()
6490
6466
6491
6467
6492
6468
class NemesisSequence (Nemesis ):
@@ -6554,8 +6530,8 @@ def disrupt(self):
6554
6530
self .disrupt_repair_streaming_err ()
6555
6531
6556
6532
6557
# Nemesis classes grouped under the "complex" label.
COMPLEX_NEMESIS = [
    NoOpMonkey,
    ScyllaCloudLimitedChaosMonkey,
    MdcChaosMonkey,
    SisyphusMonkey,
    DisruptKubernetesNodeThenReplaceScyllaNode,
    DisruptKubernetesNodeThenDecommissionAndAddScyllaNode,
    CategoricalMonkey,
]
0 commit comments