Skip to content

fix: (CDK) (Manifest) - Add deprecations support and handle deprecation warnings; deprecate url_base and path for HttpRequester #486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
25 changes: 23 additions & 2 deletions airbyte_cdk/connector_builder/test_reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,16 @@ def run_test_read(
record_limit = self._check_record_limit(record_limit)
# The connector builder currently only supports reading from a single stream at a time
stream = source.streams(config)[0]

# get any deprecation warnings during the component creation
deprecation_warnings: List[AirbyteLogMessage] = source.deprecation_warnings()

schema_inferrer = SchemaInferrer(
self._pk_to_nested_and_composite_field(stream.primary_key),
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
)
datetime_format_inferrer = DatetimeFormatInferrer()

message_group = get_message_groups(
self._read_stream(source, config, configured_catalog, state),
schema_inferrer,
Expand All @@ -125,7 +130,7 @@ def run_test_read(
)

slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
message_group
message_group, deprecation_warnings
)
schema, log_messages = self._get_infered_schema(
configured_catalog, schema_inferrer, log_messages
Expand Down Expand Up @@ -238,7 +243,11 @@ def _check_record_limit(self, record_limit: Optional[int] = None) -> int:

return record_limit

def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES:
def _categorise_groups(
self,
message_groups: MESSAGE_GROUPS,
deprecation_warnings: Optional[List[Any]] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it seems like the deprecation warnings are not necessary for categorizing groups. Should we just do log_messages.append(deprecation_warnings) from run_test_read?

) -> GROUPED_MESSAGES:
"""
Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update.

Expand Down Expand Up @@ -269,6 +278,7 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
auxiliary_requests = []
latest_config_update: Optional[AirbyteControlMessage] = None

# process the message groups first
for message_group in message_groups:
match message_group:
case AirbyteLogMessage():
Expand Down Expand Up @@ -298,6 +308,17 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
case _:
raise ValueError(f"Unknown message group type: {type(message_group)}")

# process deprecation warnings, if present
if deprecation_warnings is not None:
for deprecation in deprecation_warnings:
match deprecation:
case AirbyteLogMessage():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why is the typing here different than from line 117? It seems like we wouldn't have to worry about the type not matching here if we could rely on the type.

log_messages.append(
LogMessage(message=deprecation.message, level=deprecation.level.value)
)
case _:
raise ValueError(f"Unknown message group type: {type(deprecation)}")

return slices, log_messages, auxiliary_requests, latest_config_update

def _get_infered_schema(
Expand Down
29 changes: 25 additions & 4 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1863,14 +1863,15 @@ definitions:
type: object
required:
- type
- url_base
properties:
type:
type: string
enum: [HttpRequester]
url_base:
deprecated: true
deprecation_message: "Use `url` field instead."
title: API Base URL
description: Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
description: Deprecated, use the `url` instead. Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
type: string
interpolation_context:
- config
Expand All @@ -1886,9 +1887,29 @@ definitions:
- "{{ config['base_url'] or 'https://app.posthog.com'}}/api"
- "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups"
- "https://example.com/api/v1/resource/{{ next_page_token['id'] }}"
url:
title: The URL of an API endpoint
description: The URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
type: string
interpolation_context:
- config
- next_page_token
- stream_interval
- stream_partition
- stream_slice
- creation_response
- polling_response
- download_target
examples:
- "https://connect.squareup.com/v2"
- "{{ config['url'] or 'https://app.posthog.com'}}/api"
- "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups"
- "https://example.com/api/v1/resource/{{ next_page_token['id'] }}"
path:
deprecated: true
deprecation_message: "Use `url` field instead."
title: URL Path
description: Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
description: Deprecated, use the `url` instead. Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
type: string
interpolation_context:
- config
Expand Down Expand Up @@ -4158,4 +4179,4 @@ interpolation:
regex: The regular expression to search for. It must include a capture group.
return_type: str
examples:
- '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"'
- '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"'
11 changes: 10 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

import logging
from abc import abstractmethod
from typing import Any, Mapping, Tuple
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.models import (
AirbyteLogMessage,
)
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker

Expand Down Expand Up @@ -34,3 +37,9 @@ def check_connection(
The error object will be cast to string to display the problem to the user.
"""
return self.connection_checker.check_connection(self, logger, config)

def deprecation_warnings(self) -> List[AirbyteLogMessage]:
"""
Returns a list of deprecation warnings for the source.
"""
return []
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteLogMessage,
AirbyteMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
Expand Down Expand Up @@ -123,6 +124,9 @@ def dynamic_streams(self) -> List[Dict[str, Any]]:
manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
)

def deprecation_warnings(self) -> List[AirbyteLogMessage]:
return self._constructor.get_model_deprecations()

@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
Expand Down
131 changes: 131 additions & 0 deletions airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# THIS IS A STATIC CLASS MODEL USED TO DISPLAY DEPRECATION WARNINGS
# WHEN DEPRECATED FIELDS ARE ACCESSED

import warnings
from typing import Any, List

from pydantic.v1 import BaseModel

from airbyte_cdk.models import (
AirbyteLogMessage,
Level,
)

# format the warning message
warnings.formatwarning = (
lambda message, category, *args, **kwargs: f"{category.__name__}: {message}"
)

FIELDS_TAG = "__fields__"
DEPRECATED = "deprecated"
DEPRECATION_MESSAGE = "deprecation_message"
DEPRECATION_LOGS_TAG = "_deprecation_logs"


class BaseModelWithDeprecations(BaseModel):
"""
Pydantic BaseModel that warns when deprecated fields are accessed.
The deprecation message is stored in the field's extra attributes.
This class is used to create models that can have deprecated fields
and show warnings when those fields are accessed or initialized.

The `_deprecation_logs` attribute is stored in the model itself.
The collected deprecation warnings are further propagated to the Airbyte log messages,
during the component creation process, in `model_to_component._collect_model_deprecations()`.

The component implementation is not responsible for handling the deprecation warnings,
since the deprecation warnings are already handled in the model itself.
"""

class Config:
"""
Allow extra fields in the model. In case the model restricts extra fields.
"""

extra = "allow"

def __init__(self, **model_fields: Any) -> None:
"""
Show warnings for deprecated fields during component initialization.
"""
# call the parent constructor first to initialize Pydantic internals
super().__init__(**model_fields)
# set the placeholder for the deprecation logs
self._deprecation_logs: List[AirbyteLogMessage] = []
# process deprecated fields, if present
self._process_fields(model_fields)
# set the deprecation logs attribute to the model
self._set_deprecation_logs_attr_to_model()

def _is_deprecated_field(self, field_name: str) -> bool:
return (
self.__fields__[field_name].field_info.extra.get(DEPRECATED, False)
if field_name in self.__fields__.keys()
else False
)

def _get_deprecation_message(self, field_name: str) -> str:
return (
self.__fields__[field_name].field_info.extra.get(
DEPRECATION_MESSAGE, "<missing_deprecation_message>"
)
if field_name in self.__fields__.keys()
else "<missing_deprecation_message>"
)

def _process_fields(self, model_fields: Any) -> None:
"""
Processes the fields in the provided model data, checking for deprecated fields.

For each field in the input `model_fields`, this method checks if the field exists in the model's defined fields.
If the field is marked as deprecated (using the `DEPRECATED` flag in its metadata), it triggers a deprecation warning
by calling the `_create_warning` method with the field name and an optional deprecation message.

Args:
model_fields (Any): The data containing fields to be processed.

Returns:
None
"""

if hasattr(self, FIELDS_TAG):
for field_name in model_fields.keys():
if self._is_deprecated_field(field_name):
self._create_warning(
field_name,
self._get_deprecation_message(field_name),
)

def _set_deprecation_logs_attr_to_model(self) -> None:
"""
Sets the deprecation logs attribute on the model instance.

This method attaches the current instance's deprecation logs to the model by setting
an attribute named by `DEPRECATION_LOGS_TAG` to the value of `self._deprecation_logs`.
This is typically used to track or log deprecated features or configurations within the model.

Returns:
None
"""
setattr(self, DEPRECATION_LOGS_TAG, self._deprecation_logs)

def _create_warning(self, field_name: str, message: str) -> None:
"""
Show a warning message for deprecated fields (to stdout).
Args:
field_name (str): Name of the deprecated field.
message (str): Warning message to be displayed.
"""

message = f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}"
# Emit a warning message for deprecated fields (to stdout) (Python Default behavior)
warnings.warn(message, DeprecationWarning)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know about the warnings module. Will this be printed in stdout? If so, this does not align with the Airbyte protocol. Should we care about this?

# Create an Airbyte deprecation log message
deprecation_log_message = AirbyteLogMessage(level=Level.WARN, message=message)
# Add the deprecation message to the Airbyte log messages,
# this logs are displayed in the Connector Builder.
if deprecation_log_message not in self._deprecation_logs:
# Avoid duplicates in the deprecation logs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this is instantiate once per model, I assume this will log multiple times even if it is the same error. Should we have _deprecation_logs as a private module variable instead of a private class field to avoid that?

I see that the model_to_component_factory has a way to avoid duplication but is it handled internally by warnings.warn for the print to stdout?

self._deprecation_logs.append(deprecation_log_message)
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand All @@ -10,6 +8,10 @@

from pydantic.v1 import BaseModel, Extra, Field

from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (
BaseModelWithDeprecations,
)


class AuthFlowType(Enum):
oauth2_0 = "oauth2.0"
Expand Down Expand Up @@ -880,20 +882,17 @@ class FlattenFields(BaseModel):


class KeyTransformation(BaseModel):
prefix: Optional[Union[str, None]] = Field(
type: Literal["KeyTransformation"]
prefix: Optional[str] = Field(
Copy link
Contributor Author

@bazarnov bazarnov Apr 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These corrects are auto-generated based on the declarative_component_schema.yaml as of this change and are not the part of the PR!

None,
description="Prefix to add for object keys. If not provided original keys remain unchanged.",
examples=[
"flattened_",
],
examples=["flattened_"],
title="Key Prefix",
)
suffix: Optional[Union[str, None]] = Field(
suffix: Optional[str] = Field(
None,
description="Suffix to add for object keys. If not provided original keys remain unchanged.",
examples=[
"_flattened",
],
examples=["_flattened"],
title="Key Suffix",
)

Expand All @@ -916,7 +915,7 @@ class DpathFlattenFields(BaseModel):
description="Whether to replace the origin record or not. Default is False.",
title="Replace Origin Record",
)
key_transformation: Optional[Union[KeyTransformation, None]] = Field(
key_transformation: Optional[KeyTransformation] = Field(
None,
description="Transformation for object keys. If not provided, original key will be used.",
title="Key transformation",
Expand Down Expand Up @@ -2171,11 +2170,13 @@ class SessionTokenAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class HttpRequester(BaseModel):
class HttpRequester(BaseModelWithDeprecations):
type: Literal["HttpRequester"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are related to the deprecation functionality being added by this PR.

url_base: str = Field(
...,
description="Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
url_base: Optional[str] = Field(
None,
deprecated=True,
deprecation_message="Use `url` field instead.",
description="Deprecated, use the `url` instead. Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
examples=[
"https://connect.squareup.com/v2",
"{{ config['base_url'] or 'https://app.posthog.com'}}/api",
Expand All @@ -2184,9 +2185,22 @@ class HttpRequester(BaseModel):
],
title="API Base URL",
)
url: Optional[str] = Field(
None,
description="The URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
examples=[
"https://connect.squareup.com/v2",
"{{ config['url'] or 'https://app.posthog.com'}}/api",
"https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups",
"https://example.com/api/v1/resource/{{ next_page_token['id'] }}",
],
title="API URL",
)
path: Optional[str] = Field(
None,
description="Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
deprecated=True,
deprecation_message="Use `url` field instead.",
description="Deprecated, use the `url` instead. Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
examples=[
"/products",
"/quotes/{{ stream_partition['id'] }}/quote_line_groups",
Expand Down
Loading
Loading