feat(Low-Code CDK Property Chunking): Allow fetching query properties and property chunking for low-code sources #452

Merged
merged 13 commits into from
Apr 4, 2025
Changes from 8 commits
132 changes: 131 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1044,6 +1044,18 @@ definitions:
       $parameters:
         type: object
         additionalProperties: true
+  EmitPartialRecordMergeStrategy:
+    title: Emit Partial Record
+    description: Record merge strategy for the case where multiple requests are needed to retrieve all properties. Properties are not consolidated back into a single record and are instead emitted as separate groups of properties. This strategy should only be used when records do not have a unique identifier like a primary key.
+    required:
+      - type
+    properties:
+      type:
+        type: string
+        enum: [EmitPartialRecordMergeStrategy]
+      $parameters:
+        type: object
+        additionalProperties: true
   JwtAuthenticator:
     title: JWT Authenticator
     description: Authenticator for requests using JWT authentication flow.
@@ -1752,6 +1764,30 @@ definitions:
       $parameters:
         type: object
         additionalProperties: true
+  GroupByKeyMergeStrategy:
+    title: Group by Key
+    description: Record merge strategy that combines records according to fields on the record.
+    required:
+      - type
+      - key
+    properties:
+      type:
+        type: string
+        enum: [GroupByKeyMergeStrategy]
+      key:
+        title: Key
+        description: The name of the field on the record whose value will be used to group properties that were retrieved through multiple API requests.
+        anyOf:
+          - type: string
+          - type: array
+            items:
+              type: string
+        examples:
+          - "id"
+          - ["parent_id", "end_date"]
+      $parameters:
+        type: object
+        additionalProperties: true
   SessionTokenAuthenticator:
     type: object
     required:
@@ -1971,7 +2007,9 @@ definitions:
           - type: string
           - type: object
             additionalProperties:
-              type: string
+              anyOf:
+                - type: string
+                - "$ref": "#/definitions/QueryProperties"
         interpolation_context:
           - next_page_token
           - stream_interval
@@ -2989,6 +3027,98 @@ definitions:
     examples:
       - id
       - ["code", "type"]
+  PropertiesFromEndpoint:
+    title: Properties from Endpoint
+    description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records.
+    type: object
+    required:
+      - type
+      - property_field_path
+      - retriever
+    properties:
+      type:
+        type: string
+        enum: [PropertiesFromEndpoint]
+      property_field_path:
+        description: Describes the path to the field that should be extracted
+        type: array
+        items:
+          type: string
+        examples:
+          - ["name"]
+        interpolation_context:
+          - config
+          - parameters
+      retriever:
+        description: Retriever component that describes how to fetch the properties to query from a remote API endpoint.
+        anyOf:
+          - "$ref": "#/definitions/CustomRetriever"
+          - "$ref": "#/definitions/SimpleRetriever"
+      $parameters:
+        type: object
+        additionalProperties: true
+  PropertyChunking:
+    title: Property Chunking
+    description: For APIs with restrictions on the number of properties that can be requested per request, property chunking can be applied to make multiple requests, each with a subset of the properties.
+    type: object
+    required:
+      - type
+      - property_limit_type
+    properties:
+      type:
+        type: string
+        enum: [PropertyChunking]
+      property_limit_type:
+        title: Property Limit Type
+        description: The type used to determine the maximum number of properties per chunk
+        enum:
+          - characters
+          - property_count
+      property_limit:
+        title: Property Limit
+        description: The maximum amount of properties that can be retrieved per request according to the limit type.
+        type: integer
+      record_merge_strategy:
+        title: Record Merge Strategy
+        description: Dictates how records that require multiple requests to get all properties should be emitted to the destination
+        anyOf:
+          - "$ref": "#/definitions/EmitPartialRecordMergeStrategy"
+          - "$ref": "#/definitions/GroupByKeyMergeStrategy"
+      $parameters:
+        type: object
+        additionalProperties: true
+  QueryProperties:
+    title: Query Properties
+    description: For APIs that require explicit specification of the properties to query for, this component specifies which property fields are requested and how they are supplied to outbound requests.
+    type: object
+    required:
+      - type
+      - property_list
+    properties:
+      type:
+        type: string
+        enum: [QueryProperties]
+      property_list:
+        title: Property List
+        description: The set of properties that will be queried for in the outbound request. This can either be statically defined or dynamic based on an API endpoint
+        anyOf:
+          - type: array
+            items:
+              type: string
+          - "$ref": "#/definitions/PropertiesFromEndpoint"
+      always_include_properties:
+        title: Always Include Properties
+        description: The list of properties that should be included in every set of properties when multiple chunks of properties are being requested.
+        type: array
+        items:
+          type: string
+      property_chunking:
+        title: Property Chunking
+        description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.
+        "$ref": "#/definitions/PropertyChunking"
+      $parameters:
+        type: object
+        additionalProperties: true
   RecordFilter:
     title: Record Filter
     description: Filter applied on a list of records.
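
The `PropertyChunking` and `QueryProperties` definitions above describe splitting a property list into groups that respect `property_limit`, with `always_include_properties` repeated in every group. The following is a minimal sketch of that idea, not the CDK's implementation: the function name `chunk_properties` is hypothetical, and it assumes that always-included properties count toward the limit and that the `characters` limit type counts only the property names (no delimiters or encoding overhead).

```python
from typing import Iterable, Iterator, List, Optional


def chunk_properties(
    property_list: Iterable[str],
    property_limit_type: str,
    property_limit: Optional[int],
    always_include_properties: Optional[List[str]] = None,
) -> Iterator[List[str]]:
    """Yield groups of properties that each respect the configured limit.

    Hypothetical helper for illustration only; not part of the Airbyte CDK.
    """
    always = list(always_include_properties or [])
    # Avoid requesting an always-included property twice in the same chunk.
    remaining = [p for p in property_list if p not in always]

    if not property_limit:
        # No limit configured: everything fits in a single request.
        yield always + remaining
        return

    def size(prop: str) -> int:
        # Assumption: "characters" measures name length, "property_count" counts names.
        return len(prop) if property_limit_type == "characters" else 1

    base_used = sum(size(p) for p in always)
    chunk, used = list(always), base_used
    for prop in remaining:
        has_own_props = len(chunk) > len(always)
        if has_own_props and used + size(prop) > property_limit:
            # Current chunk is full: emit it and start a new one.
            yield chunk
            chunk, used = list(always), base_used
        chunk.append(prop)
        used += size(prop)
    yield chunk
```

With `property_limit_type="property_count"`, `property_limit=3`, and `always_include_properties=["id"]`, the list `["id", "name", "email", "phone", "city"]` is split into two requests, each of which carries `id` so the partial records can later be grouped by key.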
@@ -1,3 +1,5 @@
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+
 # generated by datamodel-codegen:
 #   filename:  declarative_component_schema.yaml

@@ -345,6 +347,11 @@ class Clamping(BaseModel):
     target_details: Optional[Dict[str, Any]] = None


+class EmitPartialRecordMergeStrategy(BaseModel):
+    type: Literal["EmitPartialRecordMergeStrategy"]
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class Algorithm(Enum):
     HS256 = "HS256"
     HS384 = "HS384"
@@ -718,6 +725,17 @@ class ExponentialBackoffStrategy(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


+class GroupByKeyMergeStrategy(BaseModel):
+    type: Literal["GroupByKeyMergeStrategy"]
+    key: Union[str, List[str]] = Field(
+        ...,
+        description="The name of the field on the record whose value will be used to group properties that were retrieved through multiple API requests.",
+        examples=["id", ["parent_id", "end_date"]],
+        title="Key",
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class SessionTokenRequestBearerAuthenticator(BaseModel):
     type: Literal["Bearer"]

@@ -1189,6 +1207,33 @@ class PrimaryKey(BaseModel):
     )


+class PropertyLimitType(Enum):
+    characters = "characters"
+    property_count = "property_count"
+
+
+class PropertyChunking(BaseModel):
+    type: Literal["PropertyChunking"]
+    property_limit_type: PropertyLimitType = Field(
+        ...,
+        description="The type used to determine the maximum number of properties per chunk",
+        title="Property Limit Type",
+    )
+    property_limit: Optional[int] = Field(
+        None,
+        description="The maximum amount of properties that can be retrieved per request according to the limit type.",
+        title="Property Limit",
+    )
+    record_merge_strategy: Optional[
+        Union[EmitPartialRecordMergeStrategy, GroupByKeyMergeStrategy]
+    ] = Field(
+        None,
+        description="Dictates how records that require multiple requests to get all properties should be emitted to the destination",
+        title="Record Merge Strategy",
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class RecordFilter(BaseModel):
     type: Literal["RecordFilter"]
     condition: Optional[str] = Field(
@@ -2187,7 +2232,7 @@ class HttpRequester(BaseModel):
         examples=[{"Output-Format": "JSON"}, {"Version": "{{ config['version'] }}"}],
         title="Request Headers",
     )
-    request_parameters: Optional[Union[str, Dict[str, str]]] = Field(
+    request_parameters: Optional[Union[str, Dict[str, Union[str, Any]]]] = Field(
         None,
         description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
         examples=[
@@ -2277,6 +2322,40 @@ class ParentStreamConfig(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


+class PropertiesFromEndpoint(BaseModel):
+    type: Literal["PropertiesFromEndpoint"]
+    property_field_path: List[str] = Field(
+        ...,
+        description="Describes the path to the field that should be extracted",
+        examples=[["name"]],
+    )
+    retriever: Union[CustomRetriever, SimpleRetriever] = Field(
+        ...,
+        description="Retriever component that describes how to fetch the properties to query from a remote API endpoint.",
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
+class QueryProperties(BaseModel):
+    type: Literal["QueryProperties"]
+    property_list: Union[List[str], PropertiesFromEndpoint] = Field(
+        ...,
+        description="The set of properties that will be queried for in the outbound request. This can either be statically defined or dynamic based on an API endpoint",
+        title="Property List",
+    )
+    always_include_properties: Optional[List[str]] = Field(
+        None,
+        description="The list of properties that should be included in every set of properties when multiple chunks of properties are being requested.",
+        title="Always Include Properties",
+    )
+    property_chunking: Optional[PropertyChunking] = Field(
+        None,
+        description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.",
+        title="Property Chunking",
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class StateDelegatingStream(BaseModel):
     type: Literal["StateDelegatingStream"]
     name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
@@ -2525,5 +2604,6 @@ class DynamicDeclarativeStream(BaseModel):
 SessionTokenAuthenticator.update_forward_refs()
 DynamicSchemaLoader.update_forward_refs()
 ParentStreamConfig.update_forward_refs()
+PropertiesFromEndpoint.update_forward_refs()
 SimpleRetriever.update_forward_refs()
 AsyncRetriever.update_forward_refs()
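
The two merge strategy models in this diff, `GroupByKeyMergeStrategy` and `EmitPartialRecordMergeStrategy`, differ in what happens to partial records that come back from separate chunked requests. The sketch below illustrates that difference under stated assumptions: `group_by_key` and `emit_partial` are hypothetical helper names, records are plain dictionaries, and a later partial record overwrites any overlapping field from an earlier one. It is not the CDK's implementation.

```python
from typing import Any, Dict, Iterable, List, Mapping, Tuple, Union

Record = Mapping[str, Any]


def group_by_key(
    records: Iterable[Record], key: Union[str, List[str]]
) -> List[Dict[str, Any]]:
    """Merge partial records that share the same key value(s) into one record.

    Hypothetical illustration of the GroupByKeyMergeStrategy idea.
    """
    # `key` may be a single field name or a list of field names (composite key).
    keys = [key] if isinstance(key, str) else list(key)
    merged: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for record in records:
        group = tuple(record.get(k) for k in keys)
        # Assumption: fields from later chunks overwrite earlier duplicates.
        merged.setdefault(group, {}).update(record)
    return list(merged.values())


def emit_partial(records: Iterable[Record]) -> List[Dict[str, Any]]:
    """Emit every partial record as-is, without consolidating them.

    Hypothetical illustration of the EmitPartialRecordMergeStrategy idea.
    """
    return [dict(record) for record in records]


# Two chunked requests returned different properties for the same two records.
chunk_a = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Bob"}]
chunk_b = [{"id": 1, "email": "ada@example.com"}, {"id": 2, "email": "bob@example.com"}]

merged_records = group_by_key(chunk_a + chunk_b, key="id")
partial_records = emit_partial(chunk_a + chunk_b)
```

Grouping by `id` yields two consolidated records, while the emit-partial approach yields four records, one per property group. That matches the schema's guidance to use the latter only when records have no unique identifier to group on.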