Skip to content

Commit 7b058c9

Browse files
committed
file-mode-api: add filename_extractor component to define relative path to store the file.
1 parent 2e6d92e commit 7b058c9

File tree

5 files changed

+64
-18
lines changed

5 files changed

+64
-18
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -1449,6 +1449,14 @@ definitions:
14491449
anyOf:
14501450
- "$ref": "#/definitions/CustomRecordExtractor"
14511451
- "$ref": "#/definitions/DpathExtractor"
1452+
filename_extractor:
1453+
description: Defines relative path and name to store the file
1454+
type: string
1455+
interpolation_context:
1456+
- config
1457+
- record
1458+
examples:
1459+
- "{{ record.relative_path }}/{{ record.file_name }}/"
14521460
$parameters:
14531461
type: object
14541462
additional_properties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+8
Original file line numberDiff line numberDiff line change
@@ -2003,6 +2003,14 @@ class FileUploader(BaseModel):
20032003
None,
20042004
description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
20052005
)
2006+
filename_extractor: str = Field(
2007+
...,
2008+
description="File Name extractor.",
2009+
examples=[
2010+
"{{ record.relative_path }}/{{ record.file_name }}/",
2011+
],
2012+
)
2013+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20062014

20072015

20082016
class DeclarativeStream(BaseModel):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -3348,7 +3348,13 @@ def create_file_uploader(
33483348
name=name,
33493349
**kwargs,
33503350
)
3351-
return FileUploader(requester, download_target_extractor)
3351+
return FileUploader(
3352+
requester=requester,
3353+
download_target_extractor=download_target_extractor,
3354+
config=config,
3355+
parameters=model.parameters or {},
3356+
filename_extractor=model.filename_extractor,
3357+
)
33523358

33533359
def create_moving_window_call_rate_policy(
33543360
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,68 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
15
import json
6+
import logging
7+
from dataclasses import InitVar, dataclass, field
28
from pathlib import Path
3-
from typing import Optional
9+
from typing import Optional, Mapping, Union, Any
410

11+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
12+
InterpolatedString,
13+
)
514
from airbyte_cdk.models import AirbyteRecordMessageFileReference
615
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
716
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
817
SafeResponse,
918
)
1019
from airbyte_cdk.sources.declarative.requesters import Requester
1120
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
21+
from airbyte_cdk.sources.types import Config
1222
from airbyte_cdk.sources.utils.files_directory import get_files_directory
1323

24+
logger = logging.getLogger("airbyte")
1425

26+
27+
@dataclass
1528
class FileUploader:
16-
def __init__(
17-
self,
18-
requester: Requester,
19-
download_target_extractor: RecordExtractor,
20-
content_extractor: Optional[RecordExtractor] = None,
21-
) -> None:
22-
self._requester = requester
23-
self._download_target_extractor = download_target_extractor
24-
self._content_extractor = content_extractor
29+
requester: Requester
30+
download_target_extractor: RecordExtractor
31+
config: Config
32+
parameters: InitVar[Mapping[str, Any]]
33+
34+
filename_extractor: Union[InterpolatedString, str]
35+
content_extractor: Optional[RecordExtractor] = None
36+
37+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
38+
self._filename_extractor = InterpolatedString.create(
39+
self.filename_extractor,
40+
parameters=parameters,
41+
)
2542

2643
def upload(self, record: Record) -> None:
2744
mocked_response = SafeResponse()
2845
mocked_response.content = json.dumps(record.data).encode("utf-8")
29-
download_target = list(self._download_target_extractor.extract_records(mocked_response))[0]
46+
download_target = list(self.download_target_extractor.extract_records(mocked_response))[0]
3047
if not isinstance(download_target, str):
3148
raise ValueError(
3249
f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
3350
)
3451

35-
response = self._requester.send_request(
52+
response = self.requester.send_request(
3653
stream_slice=StreamSlice(
3754
partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
3855
),
3956
)
4057

41-
if self._content_extractor:
58+
if self.content_extractor:
4259
raise NotImplementedError("TODO")
4360
else:
4461
files_directory = Path(get_files_directory())
45-
# TODO:: we could either interpolate record data if some relative_path is provided or
46-
# use partition_field value in the slice {"partition_field": some_value_id} to create a path
47-
file_relative_path = Path(record.stream_name) / record.data["file_name"]
62+
63+
relative_path = self._filename_extractor.eval(self.config, record=record)
64+
relative_path = relative_path.lstrip("/")
65+
file_relative_path = Path(relative_path)
4866

4967
full_path = files_directory / file_relative_path
5068
full_path.parent.mkdir(parents=True, exist_ok=True)
@@ -53,8 +71,13 @@ def upload(self, record: Record) -> None:
5371
f.write(response.content)
5472
file_size_bytes = full_path.stat().st_size
5573

74+
logger.info("File uploaded successfully")
75+
logger.info(f"File url: {str(full_path)} ")
76+
logger.info(f"File size: {file_size_bytes / 1024} KB")
77+
logger.info(f"File relative path: {str(file_relative_path)}")
78+
5679
record.file_reference = AirbyteRecordMessageFileReference(
57-
file_url=download_target,
80+
file_url=str(full_path),
5881
file_relative_path=str(file_relative_path),
5982
file_size_bytes=file_size_bytes,
6083
)

unit_tests/sources/declarative/file/file_stream_manifest.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ definitions:
161161
download_target_extractor:
162162
type: DpathExtractor
163163
field_path: [ "content_url" ]
164+
filename_extractor: "{{ record.relative_path }}/{{ record.file_name }}/"
164165

165166

166167
streams:

0 commit comments

Comments
 (0)