Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add lazy read to simple retriever #418

Merged
merged 30 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
956ab46
Add lazy read to constructor
lazebnyi Mar 13, 2025
3723a3f
Add implementation
lazebnyi Mar 13, 2025
041632d
Update partition router with process_parent_record
lazebnyi Mar 13, 2025
604dbe8
Re-generate models
lazebnyi Mar 13, 2025
dae0b98
Auto-fix lint and format issues
Mar 13, 2025
40efd79
Add LazySimpleRetriever to retrievers
lazebnyi Mar 13, 2025
65b0546
Add lazy_read_pointer model
lazebnyi Mar 13, 2025
74dbe15
Fix mypy
lazebnyi Mar 13, 2025
b537602
Fix typo
lazebnyi Mar 13, 2025
33b9f83
Auto-fix lint and format issues
Mar 13, 2025
af80b23
Fix mypy after formatting
lazebnyi Mar 13, 2025
2e37296
Merge master to branch
lazebnyi Mar 13, 2025
ed4ea74
Add unit tests
lazebnyi Mar 13, 2025
4f03269
Auto-fix lint and format issues
Mar 13, 2025
f19d048
Add extra_fields support
lazebnyi Mar 13, 2025
ad75c52
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
9664679
Fix child extraction
lazebnyi Mar 13, 2025
2f31d73
Refactor lazy read
lazebnyi Mar 13, 2025
b278dcd
Auto-fix lint and format issues
Mar 13, 2025
b06800f
Update some conditions
lazebnyi Mar 13, 2025
11533d8
Fix mypy
lazebnyi Mar 13, 2025
1a10bb6
Auto-fix lint and format issues
Mar 13, 2025
c903420
Fix unittest
lazebnyi Mar 13, 2025
864f194
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
4687726
Fix typo
lazebnyi Mar 13, 2025
0b49ac4
Merge branch 'main' into lazebnyi/add-lazy-read-to-simple-retriver
lazebnyi Mar 13, 2025
3d31e27
Auto-fix lint and format issues
Mar 13, 2025
0829662
Add docs to SafeResponse
lazebnyi Mar 13, 2025
0a95ed3
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.co…
lazebnyi Mar 13, 2025
ad521a0
Auto-fix lint and format issues
Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3208,6 +3208,15 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
lazy_read_pointer:
title: Lazy Read Pointer
description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
type: array
default: [ ]
items:
- type: string
interpolation_context:
- config
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,9 @@ class OAuthAuthenticator(BaseModel):
scopes: Optional[List[str]] = Field(
None,
description="List of scopes that should be granted to the access token.",
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
examples=[
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
],
title="Scopes",
)
token_expiry_date: Optional[str] = Field(
Expand Down Expand Up @@ -1078,24 +1080,28 @@ class OAuthConfigSpecification(BaseModel):
class Config:
extra = Extra.allow

oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
)
)
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
)
)
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
None,
Expand All @@ -1113,7 +1119,9 @@ class Config:
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
examples=[
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
],
title="OAuth input specification",
)
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -1766,7 +1774,9 @@ class RecordSelector(BaseModel):
description="Responsible for filtering records to be emitted by the Source.",
title="Record Filter",
)
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
schema_normalization: Optional[
Union[SchemaNormalization, CustomSchemaNormalization]
] = Field(
SchemaNormalization.None_,
description="Responsible for normalization according to the schema.",
title="Schema Normalization",
Expand Down Expand Up @@ -1987,7 +1997,9 @@ class Config:
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
)
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
name: Optional[str] = Field(
"", description="The stream name.", example=["Users"], title="Name"
)
primary_key: Optional[PrimaryKey] = Field(
"", description="The primary key of the stream.", title="Primary Key"
)
Expand Down Expand Up @@ -2261,7 +2273,11 @@ class SimpleRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
List[
Union[
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
]
],
]
] = Field(
[],
Expand All @@ -2284,6 +2300,11 @@ class SimpleRetriever(BaseModel):
description="Component decoding the response so records can be extracted.",
title="Decoder",
)
lazy_read_pointer: Optional[List[str]] = Field(
[],
description="If set, this will enable lazy reading, using the initial read of parent records to extract child records.",
title="Lazy Read Pointer",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand All @@ -2305,7 +2326,9 @@ class AsyncRetriever(BaseModel):
)
download_extractor: Optional[
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
] = Field(None, description="Responsible for fetching the records from provided urls.")
] = Field(
None, description="Responsible for fetching the records from provided urls."
)
creation_requester: Union[CustomRequester, HttpRequester] = Field(
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
Expand Down Expand Up @@ -2339,7 +2362,11 @@ class AsyncRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
List[
Union[
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
]
],
]
] = Field(
[],
Expand Down Expand Up @@ -2407,10 +2434,12 @@ class DynamicDeclarativeStream(BaseModel):
stream_template: DeclarativeStream = Field(
..., description="Reference to the stream template.", title="Stream Template"
)
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
...,
description="Component resolve and populates stream templates with components values.",
title="Components Resolver",
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
Field(
...,
description="Component resolve and populates stream templates with components values.",
title="Components Resolver",
)
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@
)
from airbyte_cdk.sources.declarative.retrievers import (
AsyncRetriever,
LazySimpleRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
Expand Down Expand Up @@ -2647,6 +2648,12 @@ def create_simple_retriever(
stop_condition_on_cursor: bool = False,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation],
incremental_sync: Optional[
Union[
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
]
] = None,
**kwargs: Any,
) -> SimpleRetriever:
decoder = (
self._create_component_from_model(model=model.decoder, config=config)
Expand Down Expand Up @@ -2704,6 +2711,38 @@ def create_simple_retriever(
model.ignore_stream_slicer_parameters_on_paginated_requests or False
)

if model.lazy_read_pointer and not bool(
self._connector_state_manager.get_stream_state(name, None)
):
lazy_read_pointer = [
InterpolatedString.create(path, parameters=model.parameters or {})
for path in model.lazy_read_pointer
]
partition_router = self._create_component_from_model(
model=model.partition_router, config=config
)
stream_slicer = (
self._create_component_from_model(model=incremental_sync, config=config)
if incremental_sync
else SinglePartitionRouter(parameters={})
)

return LazySimpleRetriever(
name=name,
paginator=paginator,
primary_key=primary_key,
requester=requester,
record_selector=record_selector,
stream_slicer=stream_slicer,
request_option_provider=request_options_provider,
cursor=cursor,
config=config,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters=model.parameters or {},
partition_router=partition_router,
lazy_read_pointer=lazy_read_pointer,
)

if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
Expand Down
Loading
Loading