Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add lazy read to simple retriever #418

Merged
merged 30 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
956ab46
Add lazy read to constructor
lazebnyi Mar 13, 2025
3723a3f
Add implementation
lazebnyi Mar 13, 2025
041632d
Update partition router with process_parent_record
lazebnyi Mar 13, 2025
604dbe8
Re-generate models
lazebnyi Mar 13, 2025
dae0b98
Auto-fix lint and format issues
Mar 13, 2025
40efd79
Add LazySimpleRetriever to retrivers
lazebnyi Mar 13, 2025
65b0546
Add lazy_read_pointer model
lazebnyi Mar 13, 2025
74dbe15
Fix mypy
lazebnyi Mar 13, 2025
b537602
Fix typo
lazebnyi Mar 13, 2025
33b9f83
Auto-fix lint and format issues
Mar 13, 2025
af80b23
Fix mypy after fromating
lazebnyi Mar 13, 2025
2e37296
Merge master to branch
lazebnyi Mar 13, 2025
ed4ea74
Add unit tests
lazebnyi Mar 13, 2025
4f03269
Auto-fix lint and format issues
Mar 13, 2025
f19d048
Add extra_fields support
lazebnyi Mar 13, 2025
ad75c52
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
9664679
Fix child extraction
lazebnyi Mar 13, 2025
2f31d73
Refactor lazy read
lazebnyi Mar 13, 2025
b278dcd
Auto-fix lint and format issues
Mar 13, 2025
b06800f
Update some conditions
lazebnyi Mar 13, 2025
11533d8
Fix mypy
lazebnyi Mar 13, 2025
1a10bb6
Auto-fix lint and format issues
Mar 13, 2025
c903420
Fix unittest
lazebnyi Mar 13, 2025
864f194
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
4687726
Fix typo
lazebnyi Mar 13, 2025
0b49ac4
Merge branch 'main' into lazebnyi/add-lazy-read-to-simple-retriver
lazebnyi Mar 13, 2025
3d31e27
Auto-fix lint and format issues
Mar 13, 2025
0829662
Add docs to SafeResponse
lazebnyi Mar 13, 2025
0a95ed3
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
ad521a0
Auto-fix lint and format issues
Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2873,6 +2873,15 @@ definitions:
type:
type: string
enum: [ParentStreamConfig]
lazy_read_pointer:
title: Lazy Read Pointer
description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
type: array
default: [ ]
items:
- type: string
interpolation_context:
- config
parent_key:
title: Parent Key
description: The primary key of records from the parent stream that will be used during the retrieval of records for the current substream. This parent identifier field is typically a characteristic of the child records being extracted from the source API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,11 @@ class DynamicSchemaLoader(BaseModel):

class ParentStreamConfig(BaseModel):
type: Literal["ParentStreamConfig"]
lazy_read_pointer: Optional[List[str]] = Field(
[],
description="If set, this will enable lazy reading, using the initial read of parent records to extract child records.",
title="Lazy Read Pointer",
)
parent_key: str = Field(
...,
description="The primary key of records from the parent stream that will be used during the retrieval of records for the current substream. This parent identifier field is typically a characteristic of the child records being extracted from the source API.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@
)
from airbyte_cdk.sources.declarative.retrievers import (
AsyncRetriever,
LazySimpleRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
Expand Down Expand Up @@ -1745,6 +1746,7 @@ def create_declarative_stream(
transformations.append(
self._create_component_from_model(model=transformation_model, config=config)
)

retriever = self._create_component_from_model(
model=model.retriever,
config=config,
Expand All @@ -1755,6 +1757,7 @@ def create_declarative_stream(
stop_condition_on_cursor=stop_condition_on_cursor,
client_side_incremental_sync=client_side_incremental_sync,
transformations=transformations,
incremental_sync=model.incremental_sync,
)
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None

Expand Down Expand Up @@ -2525,6 +2528,16 @@ def create_parent_stream_config(
if model.request_option
else None
)

if model.lazy_read_pointer and any("*" in pointer for pointer in model.lazy_read_pointer):
raise ValueError(
"The '*' wildcard in 'lazy_read_pointer' is not supported — only direct paths are allowed."
)

model_lazy_read_pointer: List[Union[InterpolatedString, str]] = (
[x for x in model.lazy_read_pointer] if model.lazy_read_pointer else []
)

return ParentStreamConfig(
parent_key=model.parent_key,
request_option=request_option,
Expand All @@ -2534,6 +2547,7 @@ def create_parent_stream_config(
incremental_dependency=model.incremental_dependency or False,
parameters=model.parameters or {},
extra_fields=model.extra_fields,
lazy_read_pointer=model_lazy_read_pointer,
)

@staticmethod
Expand Down Expand Up @@ -2674,6 +2688,12 @@ def create_simple_retriever(
stop_condition_on_cursor: bool = False,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation],
incremental_sync: Optional[
Union[
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
]
] = None,
**kwargs: Any,
) -> SimpleRetriever:
decoder = (
self._create_component_from_model(model=model.decoder, config=config)
Expand Down Expand Up @@ -2731,6 +2751,45 @@ def create_simple_retriever(
model.ignore_stream_slicer_parameters_on_paginated_requests or False
)

if (
model.partition_router
and model.partition_router.type == "SubstreamPartitionRouter" # type: ignore[union-attr] # 'model' is SimpleRetrieverModel
and not bool(self._connector_state_manager.get_stream_state(name, None))
and any(
parent_stream_config.lazy_read_pointer
for parent_stream_config in model.partition_router.parent_stream_configs # type: ignore[union-attr] # partition_router type guaranteed by a condition earlier
)
):
if incremental_sync:
if incremental_sync.type != "DatetimeBasedCursor":
raise ValueError(
f"LazySimpleRetriever only supports DatetimeBasedCursor. Found: {incremental_sync.type}."
)

elif incremental_sync.step or incremental_sync.cursor_granularity:
raise ValueError(
f"Found more that one slice per parent. LazySimpleRetriever only supports single slice read for stream - {name}."
)

if model.decoder and model.decoder.type != "JsonDecoder":
raise ValueError(
f"LazySimpleRetriever only supports JsonDecoder. Found: {model.decoder.type}."
)

return LazySimpleRetriever(
name=name,
paginator=paginator,
primary_key=primary_key,
requester=requester,
record_selector=record_selector,
stream_slicer=stream_slicer,
request_option_provider=request_options_provider,
cursor=cursor,
config=config,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters=model.parameters or {},
)

if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import copy
import json
import logging
from dataclasses import InitVar, dataclass
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import dpath
import requests

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.models import Type as MessageType
Expand Down Expand Up @@ -46,6 +50,7 @@ class ParentStreamConfig:
)
request_option: Optional[RequestOption] = None
incremental_dependency: bool = False
lazy_read_pointer: Optional[List[Union[InterpolatedString, str]]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.parent_key = InterpolatedString.create(self.parent_key, parameters=parameters)
Expand All @@ -59,6 +64,17 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
for key_path in self.extra_fields
]

self.lazy_read_pointer = (
[
InterpolatedString.create(path, parameters=parameters)
if isinstance(path, str)
else path
for path in self.lazy_read_pointer
]
if self.lazy_read_pointer
else None
)


@dataclass
class SubstreamPartitionRouter(PartitionRouter):
Expand Down Expand Up @@ -196,6 +212,15 @@ def stream_slices(self) -> Iterable[StreamSlice]:
# Add extra fields
extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields)

if parent_stream_config.lazy_read_pointer:
extracted_extra_fields = {
"child_response": self._extract_child_response(
parent_record,
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
),
**extracted_extra_fields,
}

yield StreamSlice(
partition={
partition_field: partition_value,
Expand All @@ -205,6 +230,21 @@ def stream_slices(self) -> Iterable[StreamSlice]:
extra_fields=extracted_extra_fields,
)

def _extract_child_response(
    self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString]
) -> requests.Response:
    """
    Build a synthetic HTTP response containing the child records embedded in a parent record.

    Each pointer segment is interpolated against the connector config, and the resulting
    path is resolved with dpath; a missing path yields an empty list so downstream record
    selection sees an empty page rather than failing.
    """
    resolved_path = []
    for segment in pointer:
        resolved_path.append(segment.eval(self.config))

    extracted = dpath.get(parent_record, resolved_path, default=[])  # type: ignore # argument will be a MutableMapping, given input data structure

    synthetic_response = SafeResponse()
    synthetic_response.content = json.dumps(extracted).encode("utf-8")
    synthetic_response.status_code = 200
    return synthetic_response

def _extract_extra_fields(
self,
parent_record: Mapping[str, Any] | AirbyteMessage,
Expand Down Expand Up @@ -381,3 +421,16 @@ def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
@property
def logger(self) -> logging.Logger:
return logging.getLogger("airbyte.SubstreamPartitionRouter")


class SafeResponse(requests.Response):
    """
    A requests.Response stand-in whose body can be assigned directly from
    already-extracted data, with no real HTTP exchange behind it. Used to feed
    child records (extracted from a parent record) into code paths that expect
    a Response object.
    """

    def __getattr__(self, name: str) -> Any:
        # Only called when normal attribute lookup fails: fall back to the class
        # attribute on requests.Response, or None instead of AttributeError, so a
        # partially-initialized synthetic response degrades gracefully.
        # NOTE(review): class-level lookup returns unbound functions rather than
        # bound methods for callables — presumably acceptable for the attributes
        # actually accessed here; confirm against callers.
        return getattr(requests.Response, name, None)

    @property
    def content(self) -> Optional[bytes]:
        # Delegate to requests.Response.content, which reads self._content.
        return super().content

    @content.setter
    def content(self, value: Union[str, bytes]) -> None:
        # requests.Response exposes no public setter; accept str or bytes and
        # store the byte payload where the parent getter expects it.
        self._content = value.encode() if isinstance(value, str) else value
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ def next_page_token(
last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
decoded_response = next(self.decoder.decode(response))

# The default way that link is presented in requests.Response is a string of various links (last, next, etc). This
# is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links
headers: Dict[str, Any] = dict(response.headers)
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/retrievers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
LazySimpleRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
Expand All @@ -14,4 +15,5 @@
"SimpleRetriever",
"SimpleRetrieverTestReadDecorator",
"AsyncRetriever",
"LazySimpleRetriever",
]
87 changes: 85 additions & 2 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,20 @@
from dataclasses import InitVar, dataclass, field
from functools import partial
from itertools import islice
from typing import Any, Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union
from typing import (
Any,
Callable,
Iterable,
List,
Mapping,
Optional,
Set,
Tuple,
Union,
)

import requests
from typing_extensions import deprecated

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
Expand All @@ -28,6 +39,7 @@
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.http_logger import format_http_message
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import combine_mappings
Expand Down Expand Up @@ -438,8 +450,8 @@ def read_records(
most_recent_record_from_slice = None
record_generator = partial(
self._parse_records,
stream_slice=stream_slice,
stream_state=self.state or {},
stream_slice=_slice,
records_schema=records_schema,
)

Expand Down Expand Up @@ -618,3 +630,74 @@ def _fetch_next_page(
self.name,
),
)


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.

    The first page of child records is read from the parent record's pre-extracted
    ``child_response`` (placed into the slice's extra_fields by the partition router),
    avoiding one HTTP request per slice. Subsequent pages — or slices with no cached
    child response — fall back to regular HTTP reads.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Yield records for the slice, starting from the cached child response when present."""
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_state,
                    stream_slice,
                )
        else:
            # No pre-extracted child response for this slice: delegate to the standard
            # HTTP-based read. (Calling self._read_pages here would recurse forever.)
            yield from super()._read_pages(records_generator_fn, stream_state, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages over HTTP until exhausted."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True
Loading
Loading