Skip to content

Commit 923acc2

Browse files
authored
fix: (CDK) (AsyncRetriever) - Add polling_timeout optional field (#428)
1 parent d4fdd4f commit 923acc2

File tree

5 files changed

+40
-11
lines changed

5 files changed

+40
-11
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+8-1
Original file line numberDiff line numberDiff line change
@@ -3333,7 +3333,7 @@ definitions:
33333333
items:
33343334
type: string
33353335
AsyncRetriever:
3336-
description: "[Experimental - We expect the interface to change shortly and we reserve the right to not consider this a breaking change] Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router."
3336+
description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router."
33373337
type: object
33383338
required:
33393339
- type
@@ -3381,6 +3381,13 @@ definitions:
33813381
anyOf:
33823382
- "$ref": "#/definitions/CustomRequester"
33833383
- "$ref": "#/definitions/HttpRequester"
3384+
polling_job_timeout:
3385+
description: The time in minutes after which the single Async Job should be considered as Timed Out.
3386+
anyOf:
3387+
- type: integer
3388+
- type: string
3389+
interpolation_context:
3390+
- config
33843391
download_target_requester:
33853392
description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.
33863393
anyOf:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -1467,7 +1467,7 @@ class AddFields(BaseModel):
14671467
)
14681468
condition: Optional[str] = Field(
14691469
"",
1470-
description="Fields will be added if expression is evaluated to True.,",
1470+
description="Fields will be added if expression is evaluated to True.",
14711471
examples=[
14721472
"{{ property|string == '' }}",
14731473
"{{ property is integer }}",
@@ -2354,6 +2354,10 @@ class AsyncRetriever(BaseModel):
23542354
...,
23552355
description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.",
23562356
)
2357+
polling_job_timeout: Optional[Union[int, str]] = Field(
2358+
None,
2359+
description="The time in minutes after which the single Async Job should be considered as Timed Out.",
2360+
)
23572361
download_target_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
23582362
None,
23592363
description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@
507507
IncrementingCountStreamStateConverter,
508508
)
509509
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
510-
from airbyte_cdk.sources.types import Config, ConnectionDefinition
510+
from airbyte_cdk.sources.types import Config
511511
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
512512

513513
ComponentDefinition = Mapping[str, Any]
@@ -2939,6 +2939,27 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
29392939
parameters={},
29402940
)
29412941

2942+
def _get_job_timeout() -> datetime.timedelta:
    """Resolve the timeout applied to a single async job.

    The value comes from the optional `polling_job_timeout` field of the
    model, expressed in minutes. The field may be an integer or a string
    that is interpolated against `config` before being cast to `int`.

    When the field is unset (or resolves to a falsy value), the fallback is
    15 minutes for connector-builder test reads and 60 minutes otherwise.

    Returns:
        The job timeout as a `datetime.timedelta`.
    """
    configured_timeout = model.polling_job_timeout

    # Only interpolate when the manifest actually provides a value.
    resolved_minutes: Optional[int] = None
    if configured_timeout:
        timeout_template = InterpolatedString.create(
            str(configured_timeout),
            parameters={},
        )
        resolved_minutes = int(timeout_template.eval(config))

    # Test reads (connector builder) fall back to a shorter window than
    # regular syncs.
    fallback_minutes = 15 if self._emit_connector_builder_messages else 60

    return datetime.timedelta(minutes=resolved_minutes or fallback_minutes)
2962+
29422963
decoder = (
29432964
self._create_component_from_model(model=model.decoder, config=config)
29442965
if model.decoder
@@ -3032,6 +3053,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
30323053
config=config,
30333054
name=name,
30343055
)
3056+
30353057
job_repository: AsyncJobRepository = AsyncHttpJobRepository(
30363058
creation_requester=creation_requester,
30373059
polling_requester=polling_requester,
@@ -3042,6 +3064,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
30423064
status_extractor=status_extractor,
30433065
status_mapping=self._create_async_job_status_mapping(model.status_mapping, config),
30443066
download_target_extractor=download_target_extractor,
3067+
job_timeout=_get_job_timeout(),
30453068
)
30463069

30473070
async_job_partition_router = AsyncJobPartitionRouter(

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ class AsyncHttpJobRepository(AsyncJobRepository):
4545
status_mapping: Mapping[str, AsyncJobStatus]
4646
download_target_extractor: DpathExtractor
4747

48+
# timeout for the job to be completed, passed from `polling_job_timeout`
4849
job_timeout: Optional[timedelta] = None
50+
4951
record_extractor: RecordExtractor = field(
5052
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({})
5153
)
@@ -131,7 +133,7 @@ def _start_job_and_validate_response(self, stream_slice: StreamSlice) -> request
131133
log_formatter=lambda response: format_http_message(
132134
response=response,
133135
title="Async Job -- Create",
134-
description="Create the server-side async job.",
136+
description=f"Create the server-side async job. Timeout after: {self.job_timeout}",
135137
stream_name=None,
136138
is_auxiliary=True,
137139
type="ASYNC_CREATE",

airbyte_cdk/sources/declarative/retrievers/async_retriever.py

-7
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,17 @@
44
from dataclasses import InitVar, dataclass, field
55
from typing import Any, Iterable, Mapping, Optional
66

7-
from typing_extensions import deprecated
8-
97
from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
108
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
119
from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
1210
AsyncJobPartitionRouter,
1311
)
1412
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
15-
from airbyte_cdk.sources.source import ExperimentalClassWarning
1613
from airbyte_cdk.sources.streams.core import StreamData
1714
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
1815
from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger
1916

2017

21-
@deprecated(
22-
"This class is experimental. Use at your own risk.",
23-
category=ExperimentalClassWarning,
24-
)
2518
@dataclass
2619
class AsyncRetriever(Retriever):
2720
config: Config

0 commit comments

Comments
 (0)