feat(file-mode-api): add filename extractor component #453

Merged
26 changes: 6 additions & 20 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -207,19 +207,9 @@ def _group_streams(
             # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
             # so we need to treat them as synchronous

-            file_uploader = None
-            if isinstance(declarative_stream, DeclarativeStream):
-                file_uploader = (
-                    self._constructor.create_component(
-                        model_type=FileUploader,
-                        component_definition=name_to_stream_mapping[declarative_stream.name][
-                            "file_uploader"
-                        ],
-                        config=config,
-                    )
-                    if "file_uploader" in name_to_stream_mapping[declarative_stream.name]
-                    else None
-                )
+            supports_file_transfer = (
+                "file_uploader" in name_to_stream_mapping[declarative_stream.name]
+            )

             if (
                 isinstance(declarative_stream, DeclarativeStream)
@@ -288,7 +278,6 @@ def _group_streams(
                         declarative_stream.get_json_schema(),
                         retriever,
                         self.message_repository,
-                        file_uploader,
                     ),
                     stream_slicer=declarative_stream.retriever.stream_slicer,
                 )
@@ -319,7 +308,6 @@ def _group_streams(
                         declarative_stream.get_json_schema(),
                         retriever,
                         self.message_repository,
-                        file_uploader,
                     ),
                     stream_slicer=cursor,
                 )
@@ -339,7 +327,7 @@ def _group_streams(
                         else None,
                         logger=self.logger,
                         cursor=cursor,
-                        supports_file_transfer=bool(file_uploader),
+                        supports_file_transfer=supports_file_transfer,
                     )
                 )
             elif (
@@ -351,7 +339,6 @@
                         declarative_stream.get_json_schema(),
                         declarative_stream.retriever,
                         self.message_repository,
-                        file_uploader,
                     ),
                     declarative_stream.retriever.stream_slicer,
                 )
@@ -372,7 +359,7 @@
                         cursor_field=None,
                         logger=self.logger,
                         cursor=final_state_cursor,
-                        supports_file_transfer=bool(file_uploader),
+                        supports_file_transfer=supports_file_transfer,
                     )
                 )
             elif (
@@ -412,7 +399,6 @@
                         declarative_stream.get_json_schema(),
                         retriever,
                         self.message_repository,
-                        file_uploader,
                     ),
                     perpartition_cursor,
                 )
@@ -427,7 +413,7 @@
                         cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
                         logger=self.logger,
                         cursor=perpartition_cursor,
-                        supports_file_transfer=bool(file_uploader),
+                        supports_file_transfer=supports_file_transfer,
                     )
                 )
             else:
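The net effect in this file: _group_streams no longer builds the FileUploader itself, it only checks whether the manifest declares one, and the component is constructed later by the factory (see the model_to_component_factory diff below). A minimal sketch of the new check, with a hypothetical manifest mapping:

    # Sketch, not part of the diff: file-transfer support is now inferred from
    # the manifest mapping alone; the mapping below is a made-up minimal example.
    name_to_stream_mapping = {"articles": {"file_uploader": {"type": "FileUploader"}}}
    supports_file_transfer = "file_uploader" in name_to_stream_mapping["articles"]
    assert supports_file_transfer is True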
@@ -1449,6 +1449,14 @@ definitions:
       anyOf:
         - "$ref": "#/definitions/CustomRecordExtractor"
         - "$ref": "#/definitions/DpathExtractor"
+    filename_extractor:
+      description: Defines relative path and name to store the file
+      type: string
+      interpolation_context:
+        - config
+        - record
+      examples:
+        - "{{ record.relative_path }}/{{ record.file_name }}/"
     $parameters:
       type: object
       additional_properties: true
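The new filename_extractor is an interpolated string evaluated with config and record in scope. A sketch of how such a template resolves, using the CDK's InterpolatedString (the record values below are hypothetical; Jinja attribute access falls back to dict keys):

    from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
        InterpolatedString,
    )

    # Hypothetical record data for illustration only.
    record = {"relative_path": "hc/article_attachments/123", "file_name": "logo.png"}
    template = InterpolatedString.create(
        "{{ record.relative_path }}/{{ record.file_name }}/", parameters={}
    )
    print(template.eval(config={}, record=record))
    # hc/article_attachments/123/logo.png/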
@@ -18,6 +18,7 @@
 from airbyte_cdk.sources.declarative.transformations import RecordTransformation
 from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
 from airbyte_cdk.sources.utils.transform import TypeTransformer
+from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader


 @dataclass
@@ -42,6 +43,7 @@ class RecordSelector(HttpSelector):
     record_filter: Optional[RecordFilter] = None
     transformations: List[RecordTransformation] = field(default_factory=lambda: [])
     transform_before_filtering: bool = False
+    file_uploader: Optional[FileUploader] = None

     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._parameters = parameters
@@ -117,7 +119,10 @@ def filter_and_transform(
             transformed_filtered_data, schema=records_schema
         )
         for data in normalized_data:
-            yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
+            record = Record(data=data, stream_name=self.name, associated_slice=stream_slice)
+            if self.file_uploader:
+                self.file_uploader.upload(record)
+            yield record

     def _normalize_by_schema(
         self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
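This change makes the file download happen inline, once per record, before the record is emitted. A sketch of that ordering with a stand-in uploader (StubUploader is hypothetical; the real FileUploader sends an HTTP request and writes the file to disk):

    from airbyte_cdk.sources.types import Record

    class StubUploader:
        def __init__(self) -> None:
            self.uploaded: list = []

        def upload(self, record: Record) -> None:
            # The real component downloads the file and sets record.file_reference.
            self.uploaded.append(record.data["file_name"])

    uploader = StubUploader()
    for data in [{"file_name": "a.png"}, {"file_name": "b.png"}]:
        record = Record(data=data, stream_name="articles", associated_slice=None)
        uploader.upload(record)  # runs before the record is yielded downstream
    print(uploader.uploaded)  # ['a.png', 'b.png']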
@@ -1989,6 +1989,30 @@ class Config:
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


+class FileUploader(BaseModel):
+    type: Literal["FileUploader"]
+    requester: Union[CustomRequester, HttpRequester] = Field(
+        ...,
+        description="Requester component that describes how to prepare HTTP requests to send to the source API.",
+    )
+    download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
+        ...,
+        description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response",
+    )
+    file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
+        None,
+        description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
+    )
+    filename_extractor: str = Field(
+        ...,
+        description="File Name extractor.",
+        examples=[
+            "{{ record.relative_path }}/{{ record.file_name }}/",
+        ],
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class DeclarativeStream(BaseModel):
     class Config:
         extra = Extra.allow
@@ -2047,6 +2071,11 @@ class Config:
         description="Array of state migrations to be applied on the input state",
         title="State Migrations",
     )
+    file_uploader: Optional[FileUploader] = Field(
+        None,
+        description="(experimental) Describes how to fetch a file",
+        title="File Uploader",
+    )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

@@ -2278,22 +2307,6 @@ class StateDelegatingStream(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


-class FileUploader(BaseModel):
-    type: Literal["FileUploader"]
-    requester: Union[CustomRequester, HttpRequester] = Field(
-        ...,
-        description="Requester component that describes how to prepare HTTP requests to send to the source API.",
-    )
-    download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
-        ...,
-        description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response",
-    )
-    file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
-        None,
-        description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
-    )
-
-
 class SimpleRetriever(BaseModel):
     type: Literal["SimpleRetriever"]
     record_selector: RecordSelector = Field(
@@ -2324,11 +2337,6 @@
         description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.",
         title="Partition Router",
     )
-    file_uploader: Optional[FileUploader] = Field(
-        None,
-        description="(experimental) Describes how to fetch a file",
-        title="File Uploader",
-    )
     decoder: Optional[
         Union[
             CustomDecoder,
@@ -2485,6 +2493,7 @@ class DynamicDeclarativeStream(BaseModel):
 DeclarativeSource1.update_forward_refs()
 DeclarativeSource2.update_forward_refs()
 SelectiveAuthenticator.update_forward_refs()
+FileUploader.update_forward_refs()
 DeclarativeStream.update_forward_refs()
 SessionTokenAuthenticator.update_forward_refs()
 DynamicSchemaLoader.update_forward_refs()
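With the model moved from SimpleRetriever to the stream level, a stream's manifest entry gains an optional file_uploader block. A hypothetical, minimal example of the shape the new model describes (field names come from the class above; every value here is invented, including the requester wiring through stream_slice.extra_fields):

    # Sketch of a stream-level file_uploader block as a plain dict; this is an
    # assumption about usage, not an excerpt from a real manifest.
    uploader_block = {
        "type": "FileUploader",
        "requester": {
            "type": "HttpRequester",
            "url_base": "https://example.com",
            "path": "{{ stream_slice.extra_fields['download_target'] }}",
        },
        "download_target_extractor": {"type": "DpathExtractor", "field_path": ["content_url"]},
        "filename_extractor": "{{ record.relative_path }}/{{ record.file_name }}/",
    }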
@@ -1755,6 +1755,11 @@ def create_declarative_stream(
             transformations.append(
                 self._create_component_from_model(model=transformation_model, config=config)
             )
+        file_uploader = None
+        if model.file_uploader:
+            file_uploader = self._create_component_from_model(
+                model=model.file_uploader, config=config
+            )

         retriever = self._create_component_from_model(
             model=model.retriever,
@@ -1766,6 +1771,7 @@
             stop_condition_on_cursor=stop_condition_on_cursor,
             client_side_incremental_sync=client_side_incremental_sync,
             transformations=transformations,
+            file_uploader=file_uploader,
             incremental_sync=model.incremental_sync,
         )
         cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
@@ -2607,6 +2613,7 @@ def create_record_selector(
         transformations: List[RecordTransformation] | None = None,
         decoder: Decoder | None = None,
         client_side_incremental_sync: Dict[str, Any] | None = None,
+        file_uploader: Optional[FileUploader] = None,
         **kwargs: Any,
     ) -> RecordSelector:
         extractor = self._create_component_from_model(
@@ -2644,6 +2651,7 @@
             config=config,
             record_filter=record_filter,
             transformations=transformations or [],
+            file_uploader=file_uploader,
             schema_normalization=schema_normalization,
             parameters=model.parameters or {},
             transform_before_filtering=transform_before_filtering,
@@ -2701,6 +2709,7 @@ def create_simple_retriever(
         stop_condition_on_cursor: bool = False,
         client_side_incremental_sync: Optional[Dict[str, Any]] = None,
         transformations: List[RecordTransformation],
+        file_uploader: Optional[FileUploader] = None,
         incremental_sync: Optional[
             Union[
                 IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
@@ -2723,6 +2732,7 @@
             decoder=decoder,
             transformations=transformations,
             client_side_incremental_sync=client_side_incremental_sync,
+            file_uploader=file_uploader,
         )
         url_base = (
             model.requester.url_base
@@ -3338,7 +3348,13 @@ def create_file_uploader(
             name=name,
             **kwargs,
         )
-        return FileUploader(requester, download_target_extractor)
+        return FileUploader(
+            requester=requester,
+            download_target_extractor=download_target_extractor,
+            config=config,
+            parameters=model.parameters or {},
+            filename_extractor=model.filename_extractor,
+        )

     def create_moving_window_call_rate_policy(
         self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
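create_file_uploader now passes config and parameters because the reworked dataclass (next diff) builds an InterpolatedString in __post_init__. A construction sketch with hypothetical stand-ins for the requester and extractor:

    from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader

    class StubRequester:  # hypothetical; the factory passes a real HttpRequester
        pass

    class StubExtractor:  # hypothetical; the factory passes a real DpathExtractor
        pass

    uploader = FileUploader(
        requester=StubRequester(),
        download_target_extractor=StubExtractor(),
        config={},
        parameters={},
        filename_extractor="{{ record.relative_path }}/{{ record.file_name }}",
    )
    # __post_init__ has already wrapped filename_extractor in an InterpolatedString.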
58 changes: 40 additions & 18 deletions airbyte_cdk/sources/declarative/retrievers/file_uploader.py
@@ -1,51 +1,68 @@
 #
 # Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 #

 import json
+import logging
+from dataclasses import InitVar, dataclass, field
 from pathlib import Path
-from typing import Optional
+from typing import Optional, Mapping, Union, Any

+from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
+    InterpolatedString,
+)
 from airbyte_cdk.models import AirbyteRecordMessageFileReference
 from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
 from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
     SafeResponse,
 )
 from airbyte_cdk.sources.declarative.requesters import Requester
 from airbyte_cdk.sources.declarative.types import Record, StreamSlice
+from airbyte_cdk.sources.types import Config
 from airbyte_cdk.sources.utils.files_directory import get_files_directory

+logger = logging.getLogger("airbyte")

+
+@dataclass
 class FileUploader:
-    def __init__(
-        self,
-        requester: Requester,
-        download_target_extractor: RecordExtractor,
-        content_extractor: Optional[RecordExtractor] = None,
-    ) -> None:
-        self._requester = requester
-        self._download_target_extractor = download_target_extractor
-        self._content_extractor = content_extractor
+    requester: Requester
+    download_target_extractor: RecordExtractor
+    config: Config
+    parameters: InitVar[Mapping[str, Any]]
+
+    filename_extractor: Union[InterpolatedString, str]
+    content_extractor: Optional[RecordExtractor] = None
+
+    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
+        self._filename_extractor = InterpolatedString.create(
+            self.filename_extractor,
+            parameters=parameters,
+        )

     def upload(self, record: Record) -> None:
         # TODO validate record shape - is the transformation applied at this point?
         mocked_response = SafeResponse()
         mocked_response.content = json.dumps(record.data).encode("utf-8")
-        download_target = list(self._download_target_extractor.extract_records(mocked_response))[0]
+        download_target = list(self.download_target_extractor.extract_records(mocked_response))[0]
         if not isinstance(download_target, str):
             raise ValueError(
                 f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
             )

-        response = self._requester.send_request(
+        response = self.requester.send_request(
             stream_slice=StreamSlice(
                 partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
             ),
         )

-        if self._content_extractor:
+        if self.content_extractor:
             raise NotImplementedError("TODO")
         else:
             files_directory = Path(get_files_directory())
-            # TODO:: we could either interpolate record data if some relative_path is provided or
-            # use partition_field value in the slice {"partition_field": some_value_id} to create a path
-            file_relative_path = Path(record.stream_name) / record.data["file_name"]
+            relative_path = self._filename_extractor.eval(self.config, record=record)
+            relative_path = relative_path.lstrip("/")
+            file_relative_path = Path(relative_path)

             full_path = files_directory / file_relative_path
[Review thread on `full_path = files_directory / file_relative_path`]

Reviewer: Should we have the stream name somewhere in there? It feels like multiple streams could have a file with the same name. Even more than that, should we have a unique ID per file? There could even be two files in the same stream with the same name...

Author (@aldogonzalez8, Mar 31, 2025): Well, for Zendesk Support we do, e.g.:

    filename_extractor: "{{ record.relative_path }}/{{ record.file_name }}/"

interpolates as:

    hc/article_attachments/<attachment_id>/<name_of_the_file.extension>

This works for this specific endpoint in Zendesk, but I can see it is not guaranteed for every connector in the future. So I guess we can let the user add any extra path but have the component prefix the path with the stream and the attachment/file ID.

Reviewer: OK, so what we are saying is that it is the developer's responsibility to make sure there are no clashes. Could we remove this concern from the developer and handle it ourselves?

Regarding timing: I'm not 100% sure we need this right now, and maybe we can make filename_extractor optional in the future once we find a solution to this. Off the top of my head, I can only see one way, and that is when the stream declares a primary key, which seems to be common from what I checked in Confluence, Jira, and Salesforce, so maybe this is viable in the future.

Author: No, I didn't make myself clear, sorry about that. To reduce the risk of collisions, I will add the stream name plus a unique ID on the backend (CDK).

Reviewer: What is the logic for the unique ID? An autogenerated UUID?

Author: I think we can:
  • Add the stream name to the path ourselves, reducing collision risk
  • Make filename_extractor optional so the developer can include a unique ID; there is a risk they could get it wrong, but we can add some documentation to the component
  • Use an autogenerated UUID if filename_extractor is not present
             full_path.parent.mkdir(parents=True, exist_ok=True)
@@ -54,8 +71,13 @@ def upload(self, record: Record) -> None:
                 f.write(response.content)
             file_size_bytes = full_path.stat().st_size

+            logger.info("File uploaded successfully")
+            logger.info(f"File url: {str(full_path)} ")
+            logger.info(f"File size: {file_size_bytes / 1024} KB")
+            logger.info(f"File relative path: {str(file_relative_path)}")
+
             record.file_reference = AirbyteRecordMessageFileReference(
-                file_url=download_target,
+                file_url=str(full_path),
                 file_relative_path=str(file_relative_path),
                 file_size_bytes=file_size_bytes,
             )
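The thread above converges on prefixing the stream name and falling back to a generated ID when no filename_extractor is provided. That behavior is not part of this diff; a minimal sketch of what it could look like, with all names hypothetical:

    import uuid
    from pathlib import Path
    from typing import Optional

    def resolve_file_relative_path(stream_name: str, interpolated: Optional[str]) -> Path:
        # Prefix the stream name to reduce cross-stream collisions; fall back to
        # a UUID when the manifest does not define a filename_extractor.
        name = interpolated.lstrip("/") if interpolated else str(uuid.uuid4())
        return Path(stream_name) / name

    print(resolve_file_relative_path("articles", "hc/article_attachments/123/logo.png"))
    print(resolve_file_relative_path("articles", None))  # articles/<uuid>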