1+ import threading
2+
13from basetestcase import ClusterSetup
24from cb_tools .cb_cli import CbCli
35from BucketLib .bucket import Bucket
68from cb_tools .cbstats import Cbstats
79from cb_server_rest_util .cluster_nodes .cluster_nodes_api import ClusterRestAPI
810from cb_server_rest_util .buckets .buckets_api import BucketRestApi
11+ from sdk_client3 import SDKClient
12+ from sdk_exceptions import SDKException
13+ from StatsLib .StatsOperations import StatsHelper
14+ from upgrade .upgrade_base import UpgradeBase
915
1016
1117class KVRateLimitingTests (ClusterSetup ):
@@ -17,10 +23,41 @@ def setUp(self):
1723 self .cb_cli = CbCli (self .shell )
1824 self .log .info ("Starting KVRateLimitingTests synchronized with latest framework" )
1925
20- # Create bucket with test-specific parameters from conf file
26+ # Cluster-wide throttle enable; node_capacity kept low to trip throttler
27+ # CE does not support throttling — skip to allow CE-specific tests to run
28+ if self .cluster_util .is_enterprise_edition (self .cluster ):
29+ status , content = self .cluster_rest .manage_global_memcached_setting (
30+ throttle_enabled = "true" ,
31+ node_capacity = self .input .param ("node_capacity" , 500 ))
32+ self .log .info (f"Cluster-wide throttle enable: status={ status } , "
33+ f"content={ content } " )
34+ self .assertTrue (status ,
35+ f"Failed to enable cluster-wide throttle: { content } " )
36+ else :
37+ self .log .info ("Skipping global throttle enable on CE" )
38+
2139 self .create_bucket (self .cluster )
2240 self .bucket = self .cluster .buckets [0 ]
2341
42+ # Apply per-bucket throttle limits so hard-limit rejections and SDK
43+ # AmbiguousTimeoutExceptions are actually triggered during burst tests
44+ throttle_reserved = self .input .param ("bucket_throttle_reserved" , 0 )
45+ throttle_hard_limit = self .input .param ("bucket_throttle_hard_limit" , 0 )
46+ if throttle_reserved or throttle_hard_limit :
47+ edit_params = {}
48+ if throttle_reserved :
49+ edit_params [Bucket .throttleReserved ] = throttle_reserved
50+ if throttle_hard_limit :
51+ edit_params [Bucket .throttleHardLimit ] = throttle_hard_limit
52+ status , content = self .bucket_rest .edit_bucket (
53+ self .bucket .name , edit_params )
54+ self .log .info (
55+ f"Bucket throttle limits set: throttleReserved={ throttle_reserved } , "
56+ f"throttleHardLimit={ throttle_hard_limit } , "
57+ f"status={ status } , content={ content } " )
58+ self .assertTrue (
59+ status , f"Failed to set bucket throttle limits: { content } " )
60+
2461 def tearDown (self ):
2562 if getattr (self , "shell" , None ):
2663 self .shell .disconnect ()
@@ -253,3 +290,285 @@ def test_transaction_rate_limiting(self):
253290 if transaction_completed :
254291 self .assertGreater (final_wu , initial_wu ,
255292 "Write units should increase after transactions" )
293+
294+ def test_sdk_rate_limited_exception (self ):
295+ """
296+ Validate SDK clients receive an AmbiguousTimeoutException (or
297+ throttle/reject counters increase) under a parallel burst that
298+ exceeds the configured throttle limit.
299+ """
300+ bucket_name = self .bucket .name
301+ initial_stats = self .get_throttle_stats (bucket_name )
302+ initial_reject = int (initial_stats .get ("reject_count_total" , 0 ))
303+ initial_throttle = int (initial_stats .get ("throttle_count_total" , 0 ))
304+
305+ sdk_throttle_count = self ._burst_writes (
306+ num_clients = self .input .param ("burst_clients" , 8 ),
307+ ops_per_client = self .input .param ("burst_ops_per_client" , 2000 ))
308+
309+ final_stats = self .get_throttle_stats (bucket_name )
310+ final_reject = int (final_stats .get ("reject_count_total" , 0 ))
311+ final_throttle = int (final_stats .get ("throttle_count_total" , 0 ))
312+ self .log .info (
313+ f"sdk_throttle_count={ sdk_throttle_count } , "
314+ f"throttle { initial_throttle } ->{ final_throttle } , "
315+ f"reject { initial_reject } ->{ final_reject } " )
316+
317+ self .assertTrue (
318+ sdk_throttle_count > 0
319+ or final_reject > initial_reject
320+ or final_throttle > initial_throttle ,
321+ "Expected AmbiguousTimeoutException or throttle/reject count increase; "
322+ f"sdk_throttle_count={ sdk_throttle_count } , "
323+ f"throttle { initial_throttle } ->{ final_throttle } , "
324+ f"reject { initial_reject } ->{ final_reject } " )
325+
326+ def test_prometheus_throttle_metrics (self ):
327+ """
328+ Validate Prometheus exposes throttle/reject/wu/ru metric families
329+ for a rate-limited bucket once load is driven through it.
330+ """
331+ self .load_docs_to_bucket (self .bucket , start = 0 , end = 20000 ,
332+ batch_size = 200 , concurrency = 8 )
333+
334+ # Only kv_throttle_duration_seconds is always emitted; others appear
335+ # only after actual throttling occurs.
336+ expected = ["kv_throttle_duration_seconds" ]
337+ seen = {name : False for name in expected }
338+ total_lines = 0
339+ for server in self .cluster .kv_nodes :
340+ try :
341+ metrics = StatsHelper (server ).get_prometheus_metrics_high (
342+ component = "kv" )
343+ except Exception as e :
344+ self .log .warning (f"Prom fetch failed on { server .ip } : { e } " )
345+ continue
346+ for line in metrics :
347+ total_lines += 1
348+ line_str = line if isinstance (line , str ) else line .decode (
349+ errors = "ignore" )
350+ for name in expected :
351+ if name in line_str :
352+ seen [name ] = True
353+
354+ missing = [m for m , found in seen .items () if not found ]
355+ self .log .info (f"Prom metric families seen: { seen } , "
356+ f"total_lines={ total_lines } " )
357+ self .assertFalse (
358+ missing , f"Missing Prometheus metric families: { missing } " )
359+
360+ def _burst_writes (self , num_clients = 8 , ops_per_client = 5000 ):
361+ """
362+ Drive a high-rate parallel burst of SET ops via multiple SDKClients
363+ to force the engine's rate limiter to trip. Returns number of
364+ throttle-related errors observed across all threads.
365+ """
366+ clients = [SDKClient (self .cluster , self .bucket )
367+ for _ in range (num_clients )]
368+ throttle_counts = [0 ] * num_clients
369+
370+ def worker (c_idx , client ):
371+ for i in range (ops_per_client ):
372+ try :
373+ result = client .crud (
374+ "create" , f"burst_{ c_idx } _{ i } " ,
375+ {"idx" : i , "data" : "x" * 256 }, timeout = 2 )
376+ except Exception as e :
377+ self .log .info (
378+ f"[burst_err] { type (e ).__name__ } : { e } " )
379+ if SDKException .check_if_exception_exists (
380+ SDKException .AmbiguousTimeoutException , str (e )):
381+ throttle_counts [c_idx ] += 1
382+ continue
383+ err = None
384+ if isinstance (result , tuple ) and len (result ) == 2 :
385+ _success , fail = result
386+ if fail :
387+ err = next (iter (fail .values ())).get ("error" )
388+ elif isinstance (result , dict ) and \
389+ result .get ("status" ) is False :
390+ err = result .get ("error" )
391+ if err is not None :
392+ self .log .info (
393+ f"[burst_err] client={ c_idx } op={ i } err={ err !r} " )
394+ if SDKException .check_if_exception_exists (
395+ SDKException .AmbiguousTimeoutException , str (err )):
396+ throttle_counts [c_idx ] += 1
397+
398+ threads = [threading .Thread (target = worker , args = (i , clients [i ]))
399+ for i in range (num_clients )]
400+ for t in threads :
401+ t .start ()
402+ for t in threads :
403+ t .join ()
404+ for c in clients :
405+ try :
406+ c .close ()
407+ except Exception :
408+ pass
409+ return sum (throttle_counts )
410+
411+ def test_couchstore_and_magma_throttle (self ):
412+ """
413+ Validate throttling works on both Couchstore and Magma. Storage
414+ backend is parametrized via bucket_storage in the conf.
415+ """
416+ bucket_name = self .bucket .name
417+ storage = getattr (self .bucket , "storageBackend" , None ) or "unknown"
418+
419+ initial_stats = self .get_throttle_stats (bucket_name )
420+ initial_throttle = int (initial_stats .get ("throttle_count_total" , 0 ))
421+ initial_reject = int (initial_stats .get ("reject_count_total" , 0 ))
422+
423+ sdk_throttle_count = self ._burst_writes ()
424+
425+ final_stats = self .get_throttle_stats (bucket_name )
426+ final_throttle = int (final_stats .get ("throttle_count_total" , 0 ))
427+ final_reject = int (final_stats .get ("reject_count_total" , 0 ))
428+ self .log .info (f"storage={ storage } stats: { final_stats } , "
429+ f"sdk_throttle_count={ sdk_throttle_count } " )
430+
431+ self .assertTrue (
432+ final_throttle > initial_throttle
433+ or final_reject > initial_reject
434+ or sdk_throttle_count > 0 ,
435+ f"Expected throttle/reject increase on storage={ storage } ; "
436+ f"throttle { initial_throttle } ->{ final_throttle } , "
437+ f"reject { initial_reject } ->{ final_reject } , "
438+ f"sdk_throttle_count={ sdk_throttle_count } " )
439+
440+ def test_sync_gateway_backoff_simulation (self ):
441+ """
442+ Simulate Sync Gateway-style sustained burst SET load and verify the
443+ bucket rate-limiter applies backoff without client crash. Real
444+ Sync Gateway end-to-end is a manual test.
445+ """
446+ bucket_name = self .bucket .name
447+ initial_stats = self .get_throttle_stats (bucket_name )
448+ initial_throttle = int (initial_stats .get ("throttle_count_total" , 0 ))
449+ initial_reject = int (initial_stats .get ("reject_count_total" , 0 ))
450+
451+ sdk_throttle_count = self ._burst_writes ()
452+
453+ final_stats = self .get_throttle_stats (bucket_name )
454+ final_throttle = int (final_stats .get ("throttle_count_total" , 0 ))
455+ final_reject = int (final_stats .get ("reject_count_total" , 0 ))
456+ self .log .info (f"SGW sim stats: { final_stats } , "
457+ f"sdk_throttle_count={ sdk_throttle_count } " )
458+
459+ self .assertTrue (
460+ final_throttle > initial_throttle
461+ or final_reject > initial_reject
462+ or sdk_throttle_count > 0 ,
463+ f"Expected throttle/reject increase under SGW load; throttle "
464+ f"{ initial_throttle } ->{ final_throttle } , "
465+ f"reject { initial_reject } ->{ final_reject } , "
466+ f"sdk_throttle_count={ sdk_throttle_count } " )
467+
468+
469+ class RateLimitingUpgradeTests (UpgradeBase ):
470+ """
471+ Upgrade-time tests for KV rate limiting (gated to 8.1).
472+ """
473+ def setUp (self ):
474+ super (RateLimitingUpgradeTests , self ).setUp ()
475+ self .bucket_rest = BucketRestApi (self .cluster .master )
476+ self .target_throttle_reserved = self .input .param (
477+ "bucket_throttle_reserved" , 6000 )
478+ self .target_throttle_hard_limit = self .input .param (
479+ "bucket_throttle_hard_limit" , 12000 )
480+
481+ def tearDown (self ):
482+ # Retry delete on every node — cluster may be transient after upgrade
483+ import time
484+ bucket_names = [b .name for b in
485+ list (getattr (self .cluster , "buckets" , []) or [])]
486+ nodes = list (getattr (self .cluster , "nodes_in_cluster" , []) or [])
487+ self .log .info (
488+ f"Upgrade tearDown: buckets={ bucket_names } , "
489+ f"nodes={ [n .ip for n in nodes ]} " )
490+ for name in bucket_names :
491+ for attempt in range (6 ):
492+ done = False
493+ for node in nodes :
494+ try :
495+ status , content = BucketRestApi (node ).delete_bucket (
496+ name )
497+ self .log .info (
498+ f"Delete { name } on { node .ip } attempt "
499+ f"{ attempt } : status={ status } , content={ content } " )
500+ if status :
501+ done = True
502+ break
503+ except Exception as e :
504+ self .log .warning (
505+ f"Delete { name } on { node .ip } raised: { e } " )
506+ if done :
507+ break
508+ time .sleep (10 )
509+ super (RateLimitingUpgradeTests , self ).tearDown ()
510+
511+ def _set_rate_limit (self , master_node ):
512+ edit_params = {
513+ Bucket .throttleReserved : self .target_throttle_reserved ,
514+ Bucket .throttleHardLimit : self .target_throttle_hard_limit ,
515+ }
516+ return BucketRestApi (master_node ).edit_bucket (self .bucket .name ,
517+ edit_params )
518+
519+ def test_rate_limit_in_mixed_mode_cluster (self ):
520+ """
521+ With one node upgraded to 8.1 and others on the initial pre-8.1
522+ version, rate-limit edits must be rejected.
523+ """
524+ self .upgrade_version = self .upgrade_chain [- 1 ]
525+ nodes = list (self .cluster .nodes_in_cluster )
526+ first_node = nodes [0 ]
527+ self .log .info (f"Upgrading first node { first_node .ip } only" )
528+ self .upgrade_function [self .upgrade_type ](first_node )
529+
530+ status , content = self ._set_rate_limit (first_node )
531+ self .log .info (f"Mixed-mode rate-limit edit: status={ status } , "
532+ f"content={ content } " )
533+ try :
534+ self .assertFalse (
535+ status ,
536+ f"Rate limit edit should be rejected in mixed-mode: { content } " )
537+ finally :
538+ # Finish upgrading remaining nodes so cluster ends uniform 8.1;
539+ # otherwise bucket deletes fail in mixed-mode tearDown.
540+ for node in list (self .cluster .nodes_in_cluster ):
541+ if node .ip == first_node .ip :
542+ continue
543+ try :
544+ self .upgrade_function [self .upgrade_type ](node )
545+ except Exception as e :
546+ self .log .warning (
547+ f"Post-assert upgrade of { node .ip } failed: { e } " )
548+
549+ def test_rate_limit_after_full_upgrade (self ):
550+ """
551+ After full upgrade to 8.1+, rate-limit edits must succeed and
552+ values persist via REST.
553+ """
554+ self .upgrade_version = self .upgrade_chain [- 1 ]
555+ for node in list (self .cluster .nodes_in_cluster ):
556+ self .log .info (f"Upgrading node { node .ip } " )
557+ self .upgrade_function [self .upgrade_type ](node )
558+
559+ status , content = self ._set_rate_limit (self .cluster .master )
560+ self .assertTrue (
561+ status , f"Rate limit edit should succeed post-upgrade: { content } " )
562+
563+ _ , buckets = self .bucket_rest .get_bucket_info ()
564+ bucket_info = next (b for b in buckets if b ['name' ] == self .bucket .name )
565+ self .assertEqual (
566+ bucket_info .get ('throttleReserved' ),
567+ self .target_throttle_reserved ,
568+ f"throttleReserved mismatch post-upgrade: "
569+ f"{ bucket_info .get ('throttleReserved' )} " )
570+ self .assertEqual (
571+ bucket_info .get ('throttleHardLimit' ),
572+ self .target_throttle_hard_limit ,
573+ f"throttleHardLimit mismatch post-upgrade: "
574+ f"{ bucket_info .get ('throttleHardLimit' )} " )
0 commit comments