22
22
import inspect
23
23
import logging
24
24
import math
25
+ from itertools import tee
25
26
27
+ from google .api_core import __version__ as _api_core_version
26
28
from google .api_core import exceptions as api_exceptions
27
29
from google .api_core import retry
28
30
from google .cloud .storage .retry import DEFAULT_RETRY
29
31
from google .cloud .storage .retry import _should_retry # pylint: disable=protected-access
32
+ from packaging import version
30
33
31
34
from apache_beam .metrics .metric import Metrics
32
35
from apache_beam .options .pipeline_options import GoogleCloudOptions
33
36
34
37
_LOGGER = logging .getLogger (__name__ )
35
38
36
39
__all__ = ['DEFAULT_RETRY_WITH_THROTTLING_COUNTER' ]
40
+ _MIN_SLEEP_ARG_SWITCH_VERSION = version .parse ("2.25.0rc0" )
41
+ _LEGACY_SLEEP_ARG_NAME = "next_sleep"
42
+ _CURRENT_SLEEP_ARG_NAME = "sleep_iterator"
37
43
38
44
39
45
class ThrottlingHandler (object ):
40
46
_THROTTLED_SECS = Metrics .counter ('gcsio' , "cumulativeThrottlingSeconds" )
41
47
48
+ def __init__ (self ):
49
+ # decide which arg name google-api-core uses
50
+ try :
51
+ core_ver = version .parse (_api_core_version )
52
+ except Exception :
53
+ core_ver = version .parse ("0" )
54
+ if core_ver < _MIN_SLEEP_ARG_SWITCH_VERSION :
55
+ self ._sleep_arg = _LEGACY_SLEEP_ARG_NAME
56
+ else :
57
+ self ._sleep_arg = _CURRENT_SLEEP_ARG_NAME
58
+
42
59
def __call__ (self , exc ):
43
60
if isinstance (exc , api_exceptions .TooManyRequests ):
44
61
_LOGGER .debug ('Caught GCS quota error (%s), retrying.' , exc .reason )
@@ -54,9 +71,19 @@ def __call__(self, exc):
54
71
_LOGGER .warning ('cannot inspect the caller stack frame' )
55
72
return
56
73
57
- # next_sleep is one of the arguments in the caller
58
- # i.e. _retry_error_helper() in google/api_core/retry/retry_base.py
59
- sleep_seconds = prev_frame .f_locals .get ("next_sleep" , 0 )
74
+ # Determine which retry helper argument to inspect in
75
+ # google/api_core/retry/retry_base.py’s _retry_error_helper():
76
+ # - versions < 2.25.0rc0 use “next_sleep”
77
+ # - versions ≥ 2.25.0rc0 use “sleep_iterator”
78
+ if self ._sleep_arg == _LEGACY_SLEEP_ARG_NAME :
79
+ sleep_seconds = prev_frame .f_locals .get (self ._sleep_arg , 0 )
80
+ else :
81
+ sleep_iterator = prev_frame .f_locals .get (self ._sleep_arg , iter ([]))
82
+ sleep_iterator , sleep_iterator_copy = tee (sleep_iterator )
83
+ try :
84
+ sleep_seconds = next (sleep_iterator_copy )
85
+ except StopIteration :
86
+ sleep_seconds = 0
60
87
ThrottlingHandler ._THROTTLED_SECS .inc (math .ceil (sleep_seconds ))
61
88
62
89
0 commit comments