Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add lazy read to simple retriever #418

Merged
merged 30 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
956ab46
Add lazy read to constructor
lazebnyi Mar 13, 2025
3723a3f
Add implementation
lazebnyi Mar 13, 2025
041632d
Update partition router with process_parent_record
lazebnyi Mar 13, 2025
604dbe8
Re-generate models
lazebnyi Mar 13, 2025
dae0b98
Auto-fix lint and format issues
Mar 13, 2025
40efd79
Add LazySimpleRetriever to retrievers
lazebnyi Mar 13, 2025
65b0546
Add lazy_read_pointer model
lazebnyi Mar 13, 2025
74dbe15
Fix mypy
lazebnyi Mar 13, 2025
b537602
Fix typo
lazebnyi Mar 13, 2025
33b9f83
Auto-fix lint and format issues
Mar 13, 2025
af80b23
Fix mypy after formatting
lazebnyi Mar 13, 2025
2e37296
Merge master to branch
lazebnyi Mar 13, 2025
ed4ea74
Add unit tests
lazebnyi Mar 13, 2025
4f03269
Auto-fix lint and format issues
Mar 13, 2025
f19d048
Add extra_fields support
lazebnyi Mar 13, 2025
ad75c52
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
9664679
Fix child extraction
lazebnyi Mar 13, 2025
2f31d73
Refactor lazy read
lazebnyi Mar 13, 2025
b278dcd
Auto-fix lint and format issues
Mar 13, 2025
b06800f
Update some conditions
lazebnyi Mar 13, 2025
11533d8
Fix mypy
lazebnyi Mar 13, 2025
1a10bb6
Auto-fix lint and format issues
Mar 13, 2025
c903420
Fix unittest
lazebnyi Mar 13, 2025
864f194
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
4687726
Fix typo
lazebnyi Mar 13, 2025
0b49ac4
Merge branch 'main' into lazebnyi/add-lazy-read-to-simple-retriver
lazebnyi Mar 13, 2025
3d31e27
Auto-fix lint and format issues
Mar 13, 2025
0829662
Add docs to SafeResponse
lazebnyi Mar 13, 2025
0a95ed3
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
ad521a0
Auto-fix lint and format issues
Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@
)
from airbyte_cdk.sources.declarative.retrievers import (
AsyncRetriever,
LazySimpleRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
Expand Down Expand Up @@ -2647,6 +2648,12 @@ def create_simple_retriever(
stop_condition_on_cursor: bool = False,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation],
incremental_sync: Optional[
Union[
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
]
] = None,
**kwargs: Any,
) -> SimpleRetriever:
decoder = (
self._create_component_from_model(model=model.decoder, config=config)
Expand Down Expand Up @@ -2704,6 +2711,38 @@ def create_simple_retriever(
model.ignore_stream_slicer_parameters_on_paginated_requests or False
)

if model.lazy_read_pointer and not bool(
self._connector_state_manager.get_stream_state(name, None)
):
lazy_read_pointer = [
InterpolatedString.create(path, parameters=model.parameters or {})
for path in model.lazy_read_pointer
]
partition_router = self._create_component_from_model(
model=model.partition_router, config=config
)
stream_slicer = (
self._create_component_from_model(model=incremental_sync, config=config)
if incremental_sync
else SinglePartitionRouter(parameters={})
)

return LazySimpleRetriever(
name=name,
paginator=paginator,
primary_key=primary_key,
requester=requester,
record_selector=record_selector,
stream_slicer=stream_slicer,
request_option_provider=request_options_provider,
cursor=cursor,
config=config,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters=model.parameters or {},
partition_router=partition_router,
lazy_read_pointer=lazy_read_pointer,
)

if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import copy
import logging
from dataclasses import InitVar, dataclass
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from typing import (
TYPE_CHECKING,
Any,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
Tuple,
Union,
)

import dpath

Expand Down Expand Up @@ -131,6 +143,42 @@ def _get_request_option(
parent_config.request_option.inject_into_request(params, value, self.config)
return params

def process_parent_record(
    self, parent_record: Union[AirbyteMessage, Record, Mapping], parent_stream_name: str
) -> Tuple[Optional[Mapping], Optional[Mapping]]:
    """
    Normalize a parent record into a (data, partition) pair.

    Depending on the concrete type of *parent_record*, this extracts the
    underlying mapping and, for Record instances, the partition of the
    slice it came from.

    :param parent_record: The parent record to process.
    :param parent_stream_name: Name of the parent stream the record belongs to.
    :return: Tuple of (record data, partition); (None, None) when the record
        should be skipped (e.g. a non-record AirbyteMessage).
    :raises AirbyteTracedException: If the record is of an unsupported type.
    """
    # AirbyteMessage: only RECORD messages carry data; anything else is skipped.
    if isinstance(parent_record, AirbyteMessage):
        self.logger.warning(
            f"Parent stream {parent_stream_name} returns records of type AirbyteMessage. "
            f"This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
        )
        if parent_record.type == MessageType.RECORD:
            return parent_record.record.data, {}
        return None, None  # Skip invalid or non-record data

    # Record: carry over the partition of its associated slice, if any.
    if isinstance(parent_record, Record):
        associated = parent_record.associated_slice
        return parent_record.data, (associated.partition if associated else {})

    # Anything that is not a plain mapping at this point is invalid input.
    if not isinstance(parent_record, Mapping):
        raise AirbyteTracedException(
            message=f"Parent stream returned records as invalid type {type(parent_record)}"
        )

    return parent_record, {}

def stream_slices(self) -> Iterable[StreamSlice]:
"""
Iterate over each parent stream's record and create a StreamSlice for each record.
Expand Down Expand Up @@ -163,28 +211,15 @@ def stream_slices(self) -> Iterable[StreamSlice]:
# read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
# not support either substreams or RFR, but something that needs to be considered once we do
for parent_record in parent_stream.read_only_records():
parent_partition = None
# Skip non-records (eg AirbyteLogMessage)
if isinstance(parent_record, AirbyteMessage):
self.logger.warning(
f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
)
if parent_record.type == MessageType.RECORD:
parent_record = parent_record.record.data # type: ignore[union-attr, assignment] # record is always a Record
else:
continue
elif isinstance(parent_record, Record):
parent_partition = (
parent_record.associated_slice.partition
if parent_record.associated_slice
else {}
)
parent_record = parent_record.data
elif not isinstance(parent_record, Mapping):
# The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
raise AirbyteTracedException(
message=f"Parent stream returned records as invalid type {type(parent_record)}"
)
# Process the parent record
parent_record, parent_partition = self.process_parent_record(
parent_record, parent_stream.name
)

# Skip invalid or non-record data
if parent_record is None:
continue

try:
partition_value = dpath.get(
parent_record, # type: ignore [arg-type]
Expand Down
3 changes: 2 additions & 1 deletion airbyte_cdk/sources/declarative/retrievers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
LazySimpleRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)

__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator", "AsyncRetriever"]
__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator", "AsyncRetriever", "LazySimpleRetriever"]
139 changes: 139 additions & 0 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from itertools import islice
from typing import Any, Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union

import dpath
import requests

from airbyte_cdk.models import AirbyteMessage
Expand All @@ -18,6 +19,9 @@
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
SinglePartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.request_options import (
Expand Down Expand Up @@ -618,3 +622,138 @@ def _fetch_next_page(
self.name,
),
)


class SafeResponse(requests.Response):
    """
    A requests.Response stand-in whose content can be assigned directly.

    requests.Response.content is normally a read-only property backed by the
    raw HTTP stream; this subclass overrides it with a settable property so
    that a synthetic response can be built from already-extracted data and
    handed to code that expects a real Response object.
    """

    def __getattr__(self, name: str) -> Any:
        # Fall back to class-level attributes of requests.Response, returning
        # None instead of raising AttributeError for unknown names.
        # NOTE(review): this yields unbound class attributes and silently
        # returns None on typos — confirm this leniency is intended.
        return getattr(requests.Response, name, None)

    @property
    def content(self) -> Optional[bytes]:
        # Reads delegate to the parent property.
        return super().content

    @content.setter
    def content(self, value: Union[str, bytes]) -> None:
        # Accept str for convenience; requests stores content as bytes.
        self._content = value.encode() if isinstance(value, str) else value


@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.

    Instead of issuing an HTTP request per parent record, it extracts the
    child records embedded in each parent record (located by
    ``lazy_read_pointer``), wraps them in a synthetic response, and only
    performs real HTTP requests for subsequent pages when pagination is
    required.
    """

    # Router yielding the parent records that embed the child data.
    # Optional so the dataclass can be constructed field-by-field, but it must
    # be set (the factory always supplies it) before _read_pages is called.
    partition_router: Optional[SubstreamPartitionRouter] = field(
        init=True, repr=False, default=None
    )
    # Interpolated path segments pointing at the child records inside a parent record.
    lazy_read_pointer: Optional[List[InterpolatedString]] = None

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Yield child records embedded in the last configured parent stream's records."""
        parent_stream_config = self.partition_router.parent_stream_configs[-1]
        parent_stream = parent_stream_config.stream

        for parent_record in parent_stream.read_only_records():
            parent_record, parent_partition = self.partition_router.process_parent_record(
                parent_record, parent_stream.name
            )
            # Skip invalid or non-record data (e.g. non-RECORD AirbyteMessages)
            if parent_record is None:
                continue

            childs = self._extract_child_records(parent_record)
            response = self._create_response(childs)

            yield from self._yield_records_with_pagination(
                response,
                records_generator_fn,
                stream_state,
                stream_slice,
                parent_record,
                parent_stream_config,
            )

    def _extract_child_records(self, parent_record: Mapping) -> Union[Mapping, List[Any]]:
        """
        Extract child records from a parent record based on lazy pointers.

        Returns the parent record unchanged when no pointer is configured; a
        list of matches when the evaluated path contains a wildcard; otherwise
        the value at the concrete path (empty list when absent).
        """
        if not self.lazy_read_pointer:
            return parent_record

        path = [segment.eval(self.config) for segment in self.lazy_read_pointer]
        # dpath.values handles wildcard segments; dpath.get expects a concrete path.
        return (
            dpath.values(parent_record, path)
            if "*" in path
            else dpath.get(parent_record, path, default=[])
        )

    def _create_response(self, data: Union[Mapping, List[Any]]) -> SafeResponse:
        """Wrap already-extracted data in a synthetic HTTP 200 response."""
        response = SafeResponse()
        response.content = json.dumps(data).encode("utf-8")
        response.status_code = 200
        return response

    def _yield_records_with_pagination(
        self,
        response: requests.Response,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        parent_record: Record,
        parent_stream_config: Any,
    ) -> Iterable[Record]:
        """Yield records from the synthetic first page, then paginate over HTTP if needed."""
        last_page_size, last_record = 0, None

        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        next_page_token = self._next_page_token(response, last_page_size, last_record, None)
        if next_page_token:
            yield from self._paginate(
                next_page_token,
                records_generator_fn,
                stream_state,
                stream_slice,
                parent_record,
                parent_stream_config,
            )

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        parent_record: Record,
        parent_stream_config: Any,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages over HTTP."""
        # Rebuild the slice so subsequent requests target this parent's partition.
        partition_field = parent_stream_config.partition_field.eval(self.config)
        partition_value = dpath.get(
            parent_record, parent_stream_config.parent_key.eval(self.config)
        )
        stream_slice = StreamSlice(
            partition={partition_field: partition_value, "parent_slice": {}},
            cursor_slice=stream_slice.cursor_slice,
        )

        while next_page_token:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):
                last_page_size += 1
                last_record = record
                yield record

            # next_page_token is guaranteed truthy here by the loop condition,
            # so no extra None guard is needed.
            last_page_token_value = next_page_token.get("next_page_token")
            next_page_token = self._next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )
Loading