diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 57706ac3a..1cb53104b 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2890,6 +2890,15 @@ definitions: type: type: string enum: [ParentStreamConfig] + lazy_read_pointer: + title: Lazy Read Pointer + description: If set, this will enable lazy reading, using the initial read of parent records to extract child records. + type: array + default: [ ] + items: + - type: string + interpolation_context: + - config parent_key: title: Parent Key description: The primary key of records from the parent stream that will be used during the retrieval of records for the current substream. This parent identifier field is typically a characteristic of the child records being extracted from the source API. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index d5291345b..e4fb459ff 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2224,6 +2224,11 @@ class DynamicSchemaLoader(BaseModel): class ParentStreamConfig(BaseModel): type: Literal["ParentStreamConfig"] + lazy_read_pointer: Optional[List[str]] = Field( + [], + description="If set, this will enable lazy reading, using the initial read of parent records to extract child records.", + title="Lazy Read Pointer", + ) parent_key: str = Field( ..., description="The primary key of records from the parent stream that will be used during the retrieval of records for the current substream. 
This parent identifier field is typically a characteristic of the child records being extracted from the source API.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 177e80059..86e880b20 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -438,6 +438,7 @@ ) from airbyte_cdk.sources.declarative.retrievers import ( AsyncRetriever, + LazySimpleRetriever, SimpleRetriever, SimpleRetrieverTestReadDecorator, ) @@ -1750,6 +1751,7 @@ def create_declarative_stream( transformations.append( self._create_component_from_model(model=transformation_model, config=config) ) + retriever = self._create_component_from_model( model=model.retriever, config=config, @@ -1760,6 +1762,7 @@ def create_declarative_stream( stop_condition_on_cursor=stop_condition_on_cursor, client_side_incremental_sync=client_side_incremental_sync, transformations=transformations, + incremental_sync=model.incremental_sync, ) cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None @@ -1905,7 +1908,9 @@ def _merge_stream_slicers( ) -> Optional[StreamSlicer]: retriever_model = model.retriever - stream_slicer = self._build_stream_slicer_from_partition_router(retriever_model, config) + stream_slicer = self._build_stream_slicer_from_partition_router( + retriever_model, config, stream_name=model.name + ) if retriever_model.type == "AsyncRetriever": is_not_datetime_cursor = ( @@ -2530,6 +2535,16 @@ def create_parent_stream_config( if model.request_option else None ) + + if model.lazy_read_pointer and any("*" in pointer for pointer in model.lazy_read_pointer): + raise ValueError( + "The '*' wildcard in 'lazy_read_pointer' is not supported — only direct paths are allowed." + ) + + model_lazy_read_pointer: List[Union[InterpolatedString, str]] = ( + [x for x in model.lazy_read_pointer] if model.lazy_read_pointer else [] + ) + return ParentStreamConfig( parent_key=model.parent_key, request_option=request_option, @@ -2539,6 +2554,7 @@ def create_parent_stream_config( incremental_dependency=model.incremental_dependency or False, parameters=model.parameters or {}, extra_fields=model.extra_fields, + lazy_read_pointer=model_lazy_read_pointer, ) @staticmethod @@ -2681,6 +2697,12 @@ def create_simple_retriever( stop_condition_on_cursor: bool = False, client_side_incremental_sync: Optional[Dict[str, Any]] = None, transformations: List[RecordTransformation], + incremental_sync: Optional[ + Union[ + IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel + ] + ] = None, + **kwargs: Any, ) -> SimpleRetriever: decoder = ( self._create_component_from_model(model=model.decoder, config=config) @@ -2738,6 +2760,45 @@ def create_simple_retriever( model.ignore_stream_slicer_parameters_on_paginated_requests or False ) + if ( + model.partition_router + and isinstance(model.partition_router, SubstreamPartitionRouterModel) + and not bool(self._connector_state_manager.get_stream_state(name, None)) + and any( + parent_stream_config.lazy_read_pointer + for parent_stream_config in model.partition_router.parent_stream_configs + ) + ): + if incremental_sync: + if incremental_sync.type != "DatetimeBasedCursor": + raise ValueError( + f"LazySimpleRetriever only supports DatetimeBasedCursor. Found: {incremental_sync.type}." 
+ ) + + elif incremental_sync.step or incremental_sync.cursor_granularity: + raise ValueError( + f"Found more than one slice per parent. LazySimpleRetriever only supports a single slice read for stream - {name}." + ) + + if model.decoder and model.decoder.type != "JsonDecoder": + raise ValueError( + f"LazySimpleRetriever only supports JsonDecoder. Found: {model.decoder.type}." + ) + + return LazySimpleRetriever( + name=name, + paginator=paginator, + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + stream_slicer=stream_slicer, + request_option_provider=request_options_provider, + cursor=cursor, + config=config, + ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, + parameters=model.parameters or {}, + ) + if self._limit_slices_fetched or self._emit_connector_builder_messages: return SimpleRetrieverTestReadDecorator( name=name,
diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index fa9264843..50fe4f0bc 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -1,12 +1,16 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # + + import copy +import json import logging from dataclasses import InitVar, dataclass from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union import dpath +import requests from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.models import Type as MessageType @@ -46,6 +50,7 @@ class ParentStreamConfig: ) request_option: Optional[RequestOption] = None incremental_dependency: bool = False + lazy_read_pointer: Optional[List[Union[InterpolatedString, str]]] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.parent_key = InterpolatedString.create(self.parent_key, parameters=parameters) @@ -59,6 +64,17 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: for key_path in self.extra_fields ] + self.lazy_read_pointer = ( + [ + InterpolatedString.create(path, parameters=parameters) + if isinstance(path, str) + else path + for path in self.lazy_read_pointer + ] + if self.lazy_read_pointer + else None + ) + @dataclass class SubstreamPartitionRouter(PartitionRouter): @@ -196,6 +212,15 @@ def stream_slices(self) -> Iterable[StreamSlice]: # Add extra fields extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields) + if parent_stream_config.lazy_read_pointer: + extracted_extra_fields = { + "child_response": self._extract_child_response( + parent_record, + parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handled in __post_init__ of parent_stream_config + ), + **extracted_extra_fields, + } + yield StreamSlice( partition={ partition_field: partition_value, @@ -205,6 +230,21 @@ def stream_slices(self) -> Iterable[StreamSlice]: extra_fields=extracted_extra_fields, ) + def _extract_child_response( + self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString] + ) -> requests.Response: + """Extract child records from a parent record based on lazy pointers.""" + + def _create_response(data: MutableMapping[str, Any]) -> SafeResponse: + """Create a SafeResponse with the given data.""" + response = SafeResponse() + response.content = json.dumps(data).encode("utf-8") + response.status_code = 200 + return response + + path = [path.eval(self.config) for path in pointer] + return _create_response(dpath.get(parent_record, path, default=[])) # type: ignore # argument will be a MutableMapping, given input data structure + def _extract_extra_fields( self, parent_record: Mapping[str, Any] | AirbyteMessage, @@ -376,3 +416,22 @@ def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: @property def logger(self) -> logging.Logger: return logging.getLogger("airbyte.SubstreamPartitionRouter") + + +class SafeResponse(requests.Response): + """ + A subclass of requests.Response that acts as an interface to migrate parsed child records + into a response object. This allows seamless interaction with child records as if they + were the original response, ensuring compatibility with methods that expect the requests.Response data type. + """ + + def __getattr__(self, name: str) -> Any: + return getattr(requests.Response, name, None) + + @property + def content(self) -> Optional[bytes]: + return super().content + + @content.setter + def content(self, value: Union[str, bytes]) -> None: + self._content = value.encode() if isinstance(value, str) else value
diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py index e35c84c7c..8df5ce66f 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py @@ -71,7 +71,6 @@ def next_page_token( last_page_token_value: Optional[Any] = None, ) -> Optional[Any]: decoded_response = next(self.decoder.decode(response)) - # The default way that link is presented in requests.Response is a string of various links (last, next, etc).
This # is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links headers: Dict[str, Any] = dict(response.headers) diff --git a/airbyte_cdk/sources/declarative/retrievers/__init__.py b/airbyte_cdk/sources/declarative/retrievers/__init__.py index 2c3bbc7d5..5b26220e0 100644 --- a/airbyte_cdk/sources/declarative/retrievers/__init__.py +++ b/airbyte_cdk/sources/declarative/retrievers/__init__.py @@ -5,6 +5,7 @@ from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.retrievers.simple_retriever import ( + LazySimpleRetriever, SimpleRetriever, SimpleRetrieverTestReadDecorator, ) @@ -14,4 +15,5 @@ "SimpleRetriever", "SimpleRetrieverTestReadDecorator", "AsyncRetriever", + "LazySimpleRetriever", ] diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index df41e14a7..65aa5d406 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -6,9 +6,20 @@ from dataclasses import InitVar, dataclass, field from functools import partial from itertools import islice -from typing import Any, Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union +from typing import ( + Any, + Callable, + Iterable, + List, + Mapping, + Optional, + Set, + Tuple, + Union, +) import requests +from typing_extensions import deprecated from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector @@ -28,6 +39,7 @@ from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.http_logger import format_http_message +from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState from airbyte_cdk.utils.mapping_helpers import combine_mappings @@ -438,8 +450,8 @@ def read_records( most_recent_record_from_slice = None record_generator = partial( self._parse_records, + stream_slice=stream_slice, stream_state=self.state or {}, - stream_slice=_slice, records_schema=records_schema, ) @@ -618,3 +630,73 @@ def _fetch_next_page( self.name, ), ) + + +@deprecated( + "This class is experimental. Use at your own risk.", + category=ExperimentalClassWarning, +) +@dataclass +class LazySimpleRetriever(SimpleRetriever): + """ + A retriever that supports lazy loading from parent streams. 
+ """ + + def _read_pages( + self, + records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], + stream_state: Mapping[str, Any], + stream_slice: StreamSlice, + ) -> Iterable[Record]: + response = stream_slice.extra_fields["child_response"] + if response: + last_page_size, last_record = 0, None + for record in records_generator_fn(response): # type: ignore[call-arg] # only _parse_records expected as a func + last_page_size += 1 + last_record = record + yield record + + next_page_token = self._next_page_token(response, last_page_size, last_record, None) + if next_page_token: + yield from self._paginate( + next_page_token, + records_generator_fn, + stream_state, + stream_slice, + ) + + yield from [] + else: + yield from self._read_pages(records_generator_fn, stream_state, stream_slice) + + def _paginate( + self, + next_page_token: Any, + records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], + stream_state: Mapping[str, Any], + stream_slice: StreamSlice, + ) -> Iterable[Record]: + """Handle pagination by fetching subsequent pages.""" + pagination_complete = False + + while not pagination_complete: + response = self._fetch_next_page(stream_state, stream_slice, next_page_token) + last_page_size, last_record = 0, None + + for record in records_generator_fn(response): # type: ignore[call-arg] # only _parse_records expected as a func + last_page_size += 1 + last_record = record + yield record + + if not response: + pagination_complete = True + else: + last_page_token_value = ( + next_page_token.get("next_page_token") if next_page_token else None + ) + next_page_token = self._next_page_token( + response, last_page_size, last_record, last_page_token_value + ) + + if not next_page_token: + pagination_complete = True diff --git a/unit_tests/sources/declarative/retrievers/test_lazy_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_lazy_simple_retriever.py new file mode 100644 index 000000000..d98f733dc --- /dev/null +++ b/unit_tests/sources/declarative/retrievers/test_lazy_simple_retriever.py @@ -0,0 +1,376 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+# + +import json +from unittest.mock import MagicMock + +import freezegun + +from airbyte_cdk.models import ( + AirbyteStateBlob, + AirbyteStateMessage, + AirbyteStateType, + AirbyteStreamState, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + StreamDescriptor, + Type, +) +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse + +_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"} +_MANIFEST = { + "version": "6.0.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["TestStream"]}, + "definitions": { + "TestStream": { + "type": "DeclarativeStream", + "name": "TestStream", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": {}, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "parent/{{ stream_partition.parent_id }}/items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": ["data"]}, + }, + "paginator": { + "type": "DefaultPaginator", + "page_token_option": { + "type": "RequestOption", + "inject_into": "request_parameter", + "field_name": "starting_after", + }, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": '{{ response["data"][-1]["id"] }}', + "stop_condition": '{{ not response.get("has_more", False) }}', + }, + }, + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "parent_id", + "lazy_read_pointer": ["items"], + "stream": { + "type": "DeclarativeStream", + "name": "parent", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "/parents", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": ["data"], + }, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": {"id": {"type": "integer"}}, + "type": "object", + }, + }, + }, + } + ], + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "start_datetime": { + "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}" + }, + "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"}, + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"], + "cursor_field": "updated_at", + "start_time_option": { + "type": "RequestOption", + "field_name": "start", + "inject_into": "request_parameter", + }, + "end_time_option": { + "type": "RequestOption", + "field_name": "end", + "inject_into": "request_parameter", + }, + }, + }, + }, + "streams": [{"$ref": "#/definitions/TestStream"}], + "spec": { + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": [], + "properties": {}, + 
"additionalProperties": True, + }, + "documentation_url": "https://example.org", + "type": "Spec", + }, +} + + +def to_configured_stream( + stream, + sync_mode=None, + destination_sync_mode=DestinationSyncMode.append, + cursor_field=None, + primary_key=None, +) -> ConfiguredAirbyteStream: + return ConfiguredAirbyteStream( + stream=stream, + sync_mode=sync_mode, + destination_sync_mode=destination_sync_mode, + cursor_field=cursor_field, + primary_key=primary_key, + ) + + +def to_configured_catalog( + configured_streams, +) -> ConfiguredAirbyteCatalog: + return ConfiguredAirbyteCatalog(streams=configured_streams) + + +def create_configured_catalog( + source: ConcurrentDeclarativeSource, config: dict +) -> ConfiguredAirbyteCatalog: + """ + Discovers streams from the source and converts them into a configured catalog. + """ + actual_catalog = source.discover(logger=source.logger, config=config) + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + return to_configured_catalog(configured_streams) + + +def get_records( + source: ConcurrentDeclarativeSource, + config: dict, + catalog: ConfiguredAirbyteCatalog, + state: list = None, +) -> list: + """ + Reads records from the source given a configuration, catalog, and optional state. + Returns a list of record data dictionaries. + """ + return [ + message.record.data + for message in source.read(logger=MagicMock(), config=config, catalog=catalog, state=state) + if message.type == Type.RECORD + ] + + +@freezegun.freeze_time("2024-07-15") +def test_retriever_with_lazy_reading(): + """Test the lazy loading behavior of the SimpleRetriever with paginated substream data.""" + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/parents"), + HttpResponse( + body=json.dumps( + { + "data": [ + { + "id": 1, + "name": "parent_1", + "updated_at": "2024-07-13", + "items": { + "data": [ + {"id": 1, "updated_at": "2024-07-13"}, + {"id": 2, "updated_at": "2024-07-13"}, + ], + "has_more": True, + }, + }, + { + "id": 2, + "name": "parent_2", + "updated_at": "2024-07-13", + "items": { + "data": [ + {"id": 3, "updated_at": "2024-07-13"}, + {"id": 4, "updated_at": "2024-07-13"}, + ], + "has_more": False, + }, + }, + ], + "has_more": False, + } + ) + ), + ) + + http_mocker.get( + HttpRequest( + url="https://api.test.com/parent/1/items?starting_after=2&start=2024-07-01&end=2024-07-15" + ), + HttpResponse( + body=json.dumps( + { + "data": [ + {"id": 5, "updated_at": "2024-07-13"}, + {"id": 6, "updated_at": "2024-07-13"}, + ], + "has_more": True, + } + ) + ), + ) + + http_mocker.get( + HttpRequest( + url="https://api.test.com/parent/1/items?starting_after=6&start=2024-07-01&end=2024-07-15" + ), + HttpResponse( + body=json.dumps( + {"data": [{"id": 7, "updated_at": "2024-07-13"}], "has_more": False} + ) + ), + ) + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + # Test full data retrieval (without state) + full_records = get_records(source, _CONFIG, configured_catalog) + expected_full = [ + {"id": 1, "updated_at": "2024-07-13"}, + {"id": 2, "updated_at": "2024-07-13"}, + {"id": 3, "updated_at": "2024-07-13"}, + {"id": 4, "updated_at": "2024-07-13"}, + {"id": 5, "updated_at": "2024-07-13"}, + {"id": 6, "updated_at": "2024-07-13"}, + {"id": 7, "updated_at": "2024-07-13"}, + ] + + assert all(record in expected_full 
for record in full_records) + + +@freezegun.freeze_time("2024-07-15") +def test_incremental_sync_with_state(): + """Test incremental sync behavior using state to fetch only new records.""" + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/parents"), + HttpResponse( + body=json.dumps( + { + "data": [ + { + "id": 1, + "name": "parent_1", + "updated_at": "2024-07-13", + "items": { + "data": [ + {"id": 1, "updated_at": "2024-07-13"}, + {"id": 2, "updated_at": "2024-07-13"}, + ], + "has_more": False, + }, + }, + { + "id": 2, + "name": "parent_2", + "updated_at": "2024-07-13", + "items": { + "data": [ + {"id": 3, "updated_at": "2024-07-13"}, + {"id": 4, "updated_at": "2024-07-13"}, + ], + "has_more": False, + }, + }, + ], + "has_more": False, + } + ) + ), + ) + + http_mocker.get( + HttpRequest(url="https://api.test.com/parent/1/items?start=2024-07-13&end=2024-07-15"), + HttpResponse( + body=json.dumps( + {"data": [{"id": 10, "updated_at": "2024-07-13"}], "has_more": False} + ) + ), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/parent/2/items?start=2024-07-13&end=2024-07-15"), + HttpResponse( + body=json.dumps( + {"data": [{"id": 11, "updated_at": "2024-07-13"}], "has_more": False} + ) + ), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-07-13"), + ), + ) + ] + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + # Test incremental data retrieval (with state) + incremental_records = get_records(source, _CONFIG, configured_catalog, state) + expected_incremental = [ + {"id": 10, "updated_at": "2024-07-13"}, + {"id": 11, "updated_at": "2024-07-13"}, + ] + assert all(record in expected_incremental for record in incremental_records)
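
Reviewer note: the fragment below is a minimal, hypothetical sketch of how a connector manifest opts into the new lazy-read path, using only fields introduced in this diff. The "items" pointer mirrors the fixtures in test_lazy_simple_retriever.py; the "$ref" target is illustrative.

# Hypothetical manifest fragment (Python dict form, as in the tests above).
# When lazy_read_pointer is set and no prior stream state exists, the factory
# swaps the child stream's SimpleRetriever for a LazySimpleRetriever.
partition_router = {
    "type": "SubstreamPartitionRouter",
    "parent_stream_configs": [
        {
            "type": "ParentStreamConfig",
            "parent_key": "id",
            "partition_field": "parent_id",
            # Direct path into each parent record where the first page of
            # child records is embedded; '*' wildcards are rejected.
            "lazy_read_pointer": ["items"],
            "stream": {"$ref": "#/definitions/parent"},
        }
    ],
}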
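
Reviewer note: a short sketch of what SubstreamPartitionRouter._extract_child_response hands to the child stream, assuming SafeResponse lands as diffed. The parent payload mirrors the first mocked record in test_retriever_with_lazy_reading; assigning _content directly stands in for SafeResponse's content setter.

import json

import requests

# A parent record whose "items" key embeds the first page of child records.
parent_record = {
    "id": 1,
    "items": {"data": [{"id": 1}, {"id": 2}], "has_more": True},
}

# The router dpath-extracts the embedded page and wraps it in a response-like
# object so the record selector and paginator can consume it unchanged.
embedded_page = parent_record["items"]
response = requests.Response()  # SafeResponse subclasses requests.Response
response._content = json.dumps(embedded_page).encode("utf-8")
response.status_code = 200

# The CursorPagination strategy in the test manifest reads has_more and the
# last record id from this synthetic response to decide whether the
# LazySimpleRetriever must keep paginating over HTTP.
assert response.json()["has_more"] is True
assert response.json()["data"][-1]["id"] == 2  # next starting_after cursor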