Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add lazy read to simple retriever #418

Merged
merged 30 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
956ab46
Add lazy read to constructor
lazebnyi Mar 13, 2025
3723a3f
Add implementation
lazebnyi Mar 13, 2025
041632d
Update partition router with process_parent_record
lazebnyi Mar 13, 2025
604dbe8
Re-generate models
lazebnyi Mar 13, 2025
dae0b98
Auto-fix lint and format issues
Mar 13, 2025
40efd79
Add LazySimpleRetriever to retrievers
lazebnyi Mar 13, 2025
65b0546
Add lazy_read_pointer model
lazebnyi Mar 13, 2025
74dbe15
Fix mypy
lazebnyi Mar 13, 2025
b537602
Fix typo
lazebnyi Mar 13, 2025
33b9f83
Auto-fix lint and format issues
Mar 13, 2025
af80b23
Fix mypy after formatting
lazebnyi Mar 13, 2025
2e37296
Merge master to branch
lazebnyi Mar 13, 2025
ed4ea74
Add unit tests
lazebnyi Mar 13, 2025
4f03269
Auto-fix lint and format issues
Mar 13, 2025
f19d048
Add extra_fields support
lazebnyi Mar 13, 2025
ad75c52
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
9664679
Fix child extraction
lazebnyi Mar 13, 2025
2f31d73
Refactor lazy read
lazebnyi Mar 13, 2025
b278dcd
Auto-fix lint and format issues
Mar 13, 2025
b06800f
Update some conditions
lazebnyi Mar 13, 2025
11533d8
Fix mypy
lazebnyi Mar 13, 2025
1a10bb6
Auto-fix lint and format issues
Mar 13, 2025
c903420
Fix unittest
lazebnyi Mar 13, 2025
864f194
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
4687726
Fix typo
lazebnyi Mar 13, 2025
0b49ac4
Merge branch 'main' into lazebnyi/add-lazy-read-to-simple-retriver
lazebnyi Mar 13, 2025
3d31e27
Auto-fix lint and format issues
Mar 13, 2025
0829662
Add docs to SafeResponse
lazebnyi Mar 13, 2025
0a95ed3
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
ad521a0
Auto-fix lint and format issues
Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3208,6 +3208,15 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
lazy_read_pointer:
title: Lazy Read Pointer
description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
type: array
default: [ ]
items:
- type: string
interpolation_context:
- config
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2284,6 +2284,11 @@ class SimpleRetriever(BaseModel):
description="Component decoding the response so records can be extracted.",
title="Decoder",
)
lazy_read_pointer: Optional[List[str]] = Field(
[],
description="If set, this will enable lazy reading, using the initial read of parent records to extract child records.",
title="Lazy Read Pointer",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@
)
from airbyte_cdk.sources.declarative.retrievers import (
AsyncRetriever,
LazySimpleRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
Expand Down Expand Up @@ -2647,6 +2648,12 @@ def create_simple_retriever(
stop_condition_on_cursor: bool = False,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation],
incremental_sync: Optional[
Union[
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
]
] = None,
**kwargs: Any,
) -> SimpleRetriever:
decoder = (
self._create_component_from_model(model=model.decoder, config=config)
Expand Down Expand Up @@ -2704,6 +2711,51 @@ def create_simple_retriever(
model.ignore_stream_slicer_parameters_on_paginated_requests or False
)

if model.lazy_read_pointer and not hasattr(model, "partition_router"):
raise ValueError(
"LazySimpleRetriever requires a 'partition_router' when 'lazy_read_pointer' is set. "
"Please either define 'partition_router' or remove 'lazy_read_pointer' from the model."
)

if model.lazy_read_pointer and not bool(
self._connector_state_manager.get_stream_state(name, None)
):
if model.partition_router.type != "SubstreamPartitionRouterModel": # type: ignore[union-attr] # model.partition_router has BaseModel type
raise ValueError(
"LazySimpleRetriever only supports 'SubstreamPartitionRouterModel' as the 'partition_router' type. " # type: ignore[union-attr] # model.partition_router has BaseModel type
f"Found: '{model.partition_router.type}'."
)

lazy_read_pointer = [
InterpolatedString.create(path, parameters=model.parameters or {})
for path in model.lazy_read_pointer
]
partition_router = self._create_component_from_model(
model=model.partition_router, # type: ignore[arg-type] # model.partition_router has BaseModel type
config=config, # type: ignore[arg-type]
)
stream_slicer = (
self._create_component_from_model(model=incremental_sync, config=config)
if incremental_sync
else SinglePartitionRouter(parameters={})
)

return LazySimpleRetriever(
name=name,
paginator=paginator,
primary_key=primary_key,
requester=requester,
record_selector=record_selector,
stream_slicer=stream_slicer,
request_option_provider=request_options_provider,
cursor=cursor,
config=config,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters=model.parameters or {},
partition_router=partition_router,
lazy_read_pointer=lazy_read_pointer,
)

if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import copy
import logging
from dataclasses import InitVar, dataclass
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from typing import (
TYPE_CHECKING,
Any,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
Tuple,
Union,
)

import dpath

Expand Down Expand Up @@ -131,6 +143,42 @@ def _get_request_option(
parent_config.request_option.inject_into_request(params, value, self.config)
return params

def process_parent_record(
    self,
    parent_record: Union[AirbyteMessage, Record, Mapping[str, Any]],
    parent_stream_name: str,
) -> Tuple[Optional[MutableMapping[str, Any]], Optional[MutableMapping[str, Any]]]:
    """
    Normalize a raw parent record into a ``(record_data, partition)`` pair.

    Handling per input type, checked in this order:

    * ``AirbyteMessage``: a warning is logged; if the message type is RECORD its
      ``record.data`` is returned as-is with an empty partition, otherwise
      ``(None, None)`` is returned so the caller can skip the message.
    * ``Record``: a shallow copy of its data is returned together with a shallow
      copy of the associated slice's partition (empty when there is no slice).
    * ``Mapping``: a shallow copy of the mapping is returned with an empty partition.

    :param parent_record: The raw record emitted by the parent stream.
    :param parent_stream_name: Name of the parent stream; used only in the warning text.
    :return: Tuple of (record data, partition), or ``(None, None)`` for a non-record message.
    :raises AirbyteTracedException: If ``parent_record`` is none of the supported types.
    """
    if isinstance(parent_record, AirbyteMessage):
        # The warning is emitted for every AirbyteMessage, record or not.
        self.logger.warning(
            f"Parent stream {parent_stream_name} returns records of type AirbyteMessage. "
            f"This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
        )
        if parent_record.type != MessageType.RECORD:
            # Non-record messages carry no data to partition on.
            return None, None
        return parent_record.record.data, {}  # type: ignore[union-attr] # parent_record.record is always AirbyteRecordMessage

    if isinstance(parent_record, Record):
        associated_slice = parent_record.associated_slice
        slice_partition = associated_slice.partition if associated_slice else {}
        return dict(parent_record.data), dict(slice_partition)

    if isinstance(parent_record, Mapping):
        return dict(parent_record), {}

    # Anything other than AirbyteMessage, Record or Mapping is unsupported.
    raise AirbyteTracedException(
        message=f"Parent stream returned records as invalid type {type(parent_record)}"
    )

def stream_slices(self) -> Iterable[StreamSlice]:
"""
Iterate over each parent stream's record and create a StreamSlice for each record.
Expand Down Expand Up @@ -162,29 +210,16 @@ def stream_slices(self) -> Iterable[StreamSlice]:

# read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
# not support either substreams or RFR, but something that needs to be considered once we do
for parent_record in parent_stream.read_only_records():
parent_partition = None
# Skip non-records (eg AirbyteLogMessage)
if isinstance(parent_record, AirbyteMessage):
self.logger.warning(
f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
)
if parent_record.type == MessageType.RECORD:
parent_record = parent_record.record.data # type: ignore[union-attr, assignment] # record is always a Record
else:
continue
elif isinstance(parent_record, Record):
parent_partition = (
parent_record.associated_slice.partition
if parent_record.associated_slice
else {}
)
parent_record = parent_record.data
elif not isinstance(parent_record, Mapping):
# The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
raise AirbyteTracedException(
message=f"Parent stream returned records as invalid type {type(parent_record)}"
)
for raw_parent_record in parent_stream.read_only_records():
# Process the parent record
parent_record, parent_partition = self.process_parent_record(
raw_parent_record, parent_stream.name
)

# Skip invalid or non-record data
if parent_record is None:
continue

try:
partition_value = dpath.get(
parent_record, # type: ignore [arg-type]
Expand Down
9 changes: 8 additions & 1 deletion airbyte_cdk/sources/declarative/retrievers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
LazySimpleRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)

__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator", "AsyncRetriever"]
__all__ = [
"Retriever",
"SimpleRetriever",
"SimpleRetrieverTestReadDecorator",
"AsyncRetriever",
"LazySimpleRetriever",
]
Loading
Loading