Skip to content

Commit 9ad9e09

Browse files
authored
fix: (CDK) (AsyncRetriever) - fix availability strategy issues, during check connection (#419)
1 parent b79177b commit 9ad9e09

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

airbyte_cdk/sources/declarative/declarative_stream.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1515
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
1616
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
17+
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
1718
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1819
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
1920
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
@@ -76,11 +77,17 @@ def primary_key(self, value: str) -> None:
7677

7778
@property
7879
def exit_on_rate_limit(self) -> bool:
80+
if isinstance(self.retriever, AsyncRetriever):
81+
return self.retriever.exit_on_rate_limit
82+
7983
return self.retriever.requester.exit_on_rate_limit # type: ignore # abstract Retriever class has not requester attribute
8084

8185
@exit_on_rate_limit.setter
8286
def exit_on_rate_limit(self, value: bool) -> None:
83-
self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined]
87+
if isinstance(self.retriever, AsyncRetriever):
88+
self.retriever.exit_on_rate_limit = value
89+
else:
90+
self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined]
8491

8592
@property # type: ignore
8693
def name(self) -> str:

airbyte_cdk/sources/declarative/retrievers/async_retriever.py

+30
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,36 @@ class AsyncRetriever(Retriever):
3636
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3737
self._parameters = parameters
3838

39+
@property
40+
def exit_on_rate_limit(self) -> bool:
41+
"""
42+
Whether to exit on rate limit. This is a property of the job repository
43+
and not the stream slicer. The stream slicer is responsible for creating
44+
the jobs, but the job repository is responsible for managing the rate
45+
limits and other job-related properties.
46+
47+
Note:
48+
- If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
49+
- If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
50+
to complete the results.
51+
"""
52+
job_orchestrator = self.stream_slicer._job_orchestrator
53+
if job_orchestrator is None:
54+
# Default value when orchestrator is not available
55+
return False
56+
return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit # type: ignore
57+
58+
@exit_on_rate_limit.setter
59+
def exit_on_rate_limit(self, value: bool) -> None:
60+
"""
61+
Sets the `exit_on_rate_limit` property of the job repository > creation_requester,
62+
meaning that the Job cannot be placed / created if the rate limit is reached.
63+
Thus no further work on managing jobs is expected to be done.
64+
"""
65+
job_orchestrator = self.stream_slicer._job_orchestrator
66+
if job_orchestrator is not None:
67+
job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[attr-defined, assignment]
68+
3969
@property
4070
def state(self) -> StreamState:
4171
"""

0 commit comments

Comments
 (0)