Skip to content

Commit 95e9507

Browse files
authored
[HWORKS-2799] Use feature group location to detect HopsFS storage for sink enabled feature groups (#958)
1 parent 9945405 commit 95e9507

2 files changed

Lines changed: 43 additions & 1 deletion

File tree

python/hsfs/feature_group.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
Literal,
2828
TypeVar,
2929
)
30+
from urllib.parse import urlparse
3031

3132
import avro.schema
3233
import hsfs.expectation_suite
@@ -3105,7 +3106,20 @@ def _init_time_travel_and_stream(
31053106
)
31063107

31073108
def _is_hopsfs_storage(self) -> bool:
3108-
"""Return True if storage is HopsFS."""
3109+
"""Return True if the offline storage location is HopsFS.
3110+
3111+
Sink-enabled feature groups can keep the source storage connector
3112+
(for example Redshift) attached while their offline data still lives
3113+
on the default HopsFS warehouse path.
3114+
In that case the location is the reliable signal for how delta-rs
3115+
should talk to storage.
3116+
"""
3117+
location = getattr(self, "location", None)
3118+
if isinstance(location, str):
3119+
scheme = urlparse(location).scheme
3120+
if scheme in {"hopsfs", "hdfs"}:
3121+
return True
3122+
31093123
return self.storage_connector is None or (
31103124
self.storage_connector is not None
31113125
and self.storage_connector.type == sc.StorageConnector.HOPSFS

python/tests/test_feature_group.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,34 @@ def test_is_hopsfs_storage(self, sc, expected):
944944
fg.storage_connector = sc
945945
assert fg._is_hopsfs_storage() is expected
946946

947+
def test_is_hopsfs_storage_uses_location_when_source_connector_is_external(
948+
self,
949+
):
950+
fg = get_test_feature_group()
951+
fg._location = (
952+
"hopsfs://rpc.namenode.service.consul:8020/apps/hive/warehouse/fs.db/fg_1"
953+
)
954+
fg.storage_connector = storage_connector.RedshiftConnector(
955+
id=1,
956+
name="redshift",
957+
featurestore_id=1,
958+
)
959+
960+
assert fg._is_hopsfs_storage() is True
961+
962+
def test_is_hopsfs_storage_uses_single_slash_hopsfs_location(
963+
self,
964+
):
965+
fg = get_test_feature_group()
966+
fg._location = "hopsfs:/apps/hive/warehouse/fs.db/fg_1"
967+
fg.storage_connector = storage_connector.RedshiftConnector(
968+
id=1,
969+
name="redshift",
970+
featurestore_id=1,
971+
)
972+
973+
assert fg._is_hopsfs_storage() is True
974+
947975
def test_init_time_travel_and_stream_uses_resolvers_python(
948976
self, mocker, monkeypatch
949977
):

0 commit comments

Comments
 (0)