Skip to content

Commit 56bbc09

Browse files
feat(iceberg): auto-configure IOConfig for Alibaba Cloud OSS (#6993)
## Changes Made OSS is S3-compatible; it just needs `force_virtual_addressing` plus an `oss`→`s3`protocol alias. `_convert_iceberg_file_io_properties_to_io_config` now applies these when the table `location` is `oss://`, routing it through the existing S3 path — same approach as PyIceberg's `_initialize_oss_fs`. No new backend. Works even with no table properties (env-var creds). Pass an explicit `io_config` to opt out. Scoped to the Iceberg connector.
1 parent d7ccb93 commit 56bbc09

3 files changed

Lines changed: 83 additions & 4 deletions

File tree

daft/dataframe/dataframe.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1341,7 +1341,9 @@ def write_iceberg(
13411341
)
13421342

13431343
io_config = (
1344-
_convert_iceberg_file_io_properties_to_io_config(table.io.properties) if io_config is None else io_config
1344+
_convert_iceberg_file_io_properties_to_io_config(table.io.properties, table.location())
1345+
if io_config is None
1346+
else io_config
13451347
)
13461348
io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config
13471349

daft/io/iceberg/_iceberg.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
# ruff: noqa: I002
22
# isort: dont-add-import: from __future__ import annotations
33

4+
import logging
45
from typing import TYPE_CHECKING, Any, Union
56

67
from daft import context, runners
78
from daft.api_annotations import PublicAPI
89
from daft.daft import IOConfig, ScanOperatorHandle, StorageConfig
910
from daft.dataframe import DataFrame
11+
from daft.filesystem import get_protocol_from_path
1012
from daft.io._checkpoint import attach_checkpoint
1113
from daft.logical.builder import LogicalPlanBuilder
1214

@@ -16,8 +18,18 @@
1618
from daft.checkpoint import CheckpointConfig
1719

1820

19-
def _convert_iceberg_file_io_properties_to_io_config(props: dict[str, Any]) -> IOConfig | None:
20-
"""Property keys defined here: https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/__init__.py."""
21+
logger = logging.getLogger(__name__)
22+
23+
24+
def _convert_iceberg_file_io_properties_to_io_config(
25+
props: dict[str, Any], location: str | None = None
26+
) -> IOConfig | None:
27+
"""Property keys defined here: https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/__init__.py.
28+
29+
For an ``oss://`` ``location`` (Alibaba Cloud OSS, S3-compatible), the IOConfig gets
30+
virtual-hosted addressing and an ``oss``->``s3`` alias so the S3 filesystem resolves
31+
``oss://`` paths -- applied even with no IO properties (e.g. env-var credentials).
32+
"""
2133
from daft.io import AzureConfig, GCSConfig, IOConfig, S3Config
2234

2335
any_props_set = False
@@ -30,13 +42,16 @@ def get_first_property_value(*property_names: str) -> Any | None:
3042
return property_value
3143
return None
3244

45+
is_oss = location is not None and get_protocol_from_path(location) == "oss"
46+
3347
io_config = IOConfig(
3448
s3=S3Config(
3549
endpoint_url=get_first_property_value("s3.endpoint"),
3650
region_name=get_first_property_value("s3.region", "client.region"),
3751
key_id=get_first_property_value("s3.access-key-id", "client.access-key-id"),
3852
access_key=get_first_property_value("s3.secret-access-key", "client.secret-access-key"),
3953
session_token=get_first_property_value("s3.session-token", "client.session-token"),
54+
force_virtual_addressing=True if is_oss else None,
4055
),
4156
azure=AzureConfig(
4257
storage_account=get_first_property_value("adls.account-name", "adlfs.account-name"),
@@ -50,8 +65,12 @@ def get_first_property_value(*property_names: str) -> Any | None:
5065
project_id=get_first_property_value("gcs.project-id"),
5166
token=get_first_property_value("gcs.oauth2.token"),
5267
),
68+
protocol_aliases={"oss": "s3"} if is_oss else None,
5369
)
5470

71+
if is_oss:
72+
logger.debug("oss:// table detected; applying S3-compatible settings to the IOConfig")
73+
return io_config
5574
return io_config if any_props_set else None
5675

5776

@@ -107,7 +126,9 @@ def read_iceberg(
107126
table = StaticTable.from_metadata(metadata_location=table)
108127

109128
io_config = (
110-
_convert_iceberg_file_io_properties_to_io_config(table.io.properties) if io_config is None else io_config
129+
_convert_iceberg_file_io_properties_to_io_config(table.io.properties, table.location())
130+
if io_config is None
131+
else io_config
111132
)
112133
io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
113134

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from __future__ import annotations
2+
3+
from daft.io.iceberg._iceberg import _convert_iceberg_file_io_properties_to_io_config
4+
5+
6+
def test_oss_location_applies_settings():
7+
"""An oss:// table location enables virtual-hosted addressing and the oss->s3 alias."""
8+
props = {"s3.endpoint": "http://oss-cn-hangzhou.aliyuncs.com", "s3.access-key-id": "ak"}
9+
result = _convert_iceberg_file_io_properties_to_io_config(props, "oss://my-bucket/warehouse/db/table")
10+
assert result is not None
11+
assert result.s3.force_virtual_addressing is True
12+
assert result.protocol_aliases == {"oss": "s3"}
13+
# Table properties are still applied.
14+
assert result.s3.endpoint_url == "http://oss-cn-hangzhou.aliyuncs.com"
15+
assert result.s3.key_id == "ak"
16+
17+
18+
def test_oss_location_no_props():
19+
"""An oss:// table with no IO properties still gets the OSS settings.
20+
21+
Covers credentials supplied via the environment rather than table properties.
22+
"""
23+
result = _convert_iceberg_file_io_properties_to_io_config({}, "oss://my-bucket/warehouse/db/table")
24+
assert result is not None
25+
assert result.s3.force_virtual_addressing is True
26+
assert result.protocol_aliases == {"oss": "s3"}
27+
28+
29+
def test_non_oss_location_with_props():
30+
"""A non-oss:// location with properties leaves the OSS settings off."""
31+
props = {"s3.endpoint": "https://s3.us-west-2.amazonaws.com"}
32+
result = _convert_iceberg_file_io_properties_to_io_config(props, "s3://my-bucket/warehouse/db/table")
33+
assert result is not None
34+
assert result.s3.force_virtual_addressing is False
35+
assert result.protocol_aliases == {}
36+
assert result.s3.endpoint_url == "https://s3.us-west-2.amazonaws.com"
37+
38+
39+
def test_non_oss_location_no_props_returns_none():
40+
"""A non-oss:// location with no properties yields no IOConfig."""
41+
assert _convert_iceberg_file_io_properties_to_io_config({}, "s3://my-bucket/warehouse/db/table") is None
42+
43+
44+
def test_no_location_with_props():
45+
"""With no location (default), properties still convert and OSS settings stay off."""
46+
props = {"s3.endpoint": "https://s3.us-west-2.amazonaws.com"}
47+
result = _convert_iceberg_file_io_properties_to_io_config(props)
48+
assert result is not None
49+
assert result.s3.force_virtual_addressing is False
50+
assert result.protocol_aliases == {}
51+
assert result.s3.endpoint_url == "https://s3.us-west-2.amazonaws.com"
52+
53+
54+
def test_no_location_no_props_returns_none():
55+
"""With no location (default) and no properties, no IOConfig is produced."""
56+
assert _convert_iceberg_file_io_properties_to_io_config({}) is None

0 commit comments

Comments
 (0)