Skip to content

Commit ecb45df

Browse files
committed
Merge branch 'main' into fstore-2036-uc-oauth-m2m-sdk
2 parents cb0242c + 9235e44 commit ecb45df

5 files changed

Lines changed: 387 additions & 7 deletions

File tree

java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public StreamFeatureGroup getOrCreateFeatureGroup(FeatureStore featureStore, @No
9999
try {
100100
return getStreamFeatureGroup(featureStore, name, version);
101101
} catch (IOException | FeatureStoreException e) {
102-
if (e.getMessage().contains("\"errorCode\":270009")) {
102+
if (e.getMessage().contains("Error: 404") && e.getMessage().contains("\"errorCode\":270009")) {
103103
return StreamFeatureGroup.builder()
104104
.featureStore(featureStore)
105105
.name(name)

java/hsfs/src/test/java/com/logicalclocks/hsfs/engine/TestFeatureGroupEngine.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ void setUp() throws Exception {
4949
}
5050

5151
@Test
52-
void testGetOrCreateReturnsFgWhenBackendReturns400NotFound() throws Exception {
53-
// Backend returns 400 with errorCode 270009 (feature group not found)
52+
void testGetOrCreateReturnsFgWhenBackendReturns404NotFound() throws Exception {
53+
// Backend returns 404 with errorCode 270009 (feature group not found)
5454
Mockito.when(mockApi.getInternal(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
55-
.thenThrow(new IOException("Error: 400{\"errorCode\":270009,\"errorMsg\":\"Featuregroup wasn't found.\"}"));
55+
.thenThrow(new IOException("Error: 404{\"errorCode\":270009,\"errorMsg\":\"Featuregroup wasn't found.\"}"));
5656

5757
StreamFeatureGroup result = engine.getOrCreateFeatureGroup(
5858
mockFeatureStore, "test_fg", 1, "desc", true,

python/hsfs/core/storage_connector_api.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#
1616
from __future__ import annotations
1717

18+
import json
1819
from typing import TYPE_CHECKING, Any
1920

2021
from hopsworks_common import client
@@ -81,6 +82,79 @@ def refetch(
8182
)
8283
)
8384

85+
def resolve_unity_catalog_spark_options(
    self,
    feature_store_id: int,
    name: str,
    catalog: str,
    schema: str,
    table: str,
) -> dict[str, Any]:
    """Resolve Spark read options for a Unity Catalog table via a storage connector.

    Issues a POST against the connector's `data_source/spark_options` endpoint,
    sending the table coordinates as a JSON body.
    The response carries short-lived AWS credentials, so the EE side sets
    Cache-Control: no-store.

    Parameters:
        feature_store_id: Numeric id of the feature store containing the connector.
        name: Name of the Unity Catalog storage connector.
        catalog: UC catalog name.
        schema: UC schema name within the catalog.
        table: UC table name within the schema.

    Returns:
        The raw JSON dict from the resolver.
        The calling SDK code wraps it into a UnityCatalogSparkOptions dataclass.
    """
    hopsworks_client = client.get_instance()
    # Fully qualified table coordinates the resolver needs to locate the table.
    table_ref = {"catalog": catalog, "schema": schema, "table": table}
    endpoint = [
        "project",
        hopsworks_client._project_id,
        "featurestores",
        feature_store_id,
        "storageconnectors",
        name,
        "data_source",
        "spark_options",
    ]
    return hopsworks_client._send_request(
        "POST",
        endpoint,
        headers={"content-type": "application/json"},
        data=json.dumps(table_ref),
    )
127+
128+
def resolve_feature_group_spark_options(
    self,
    feature_store_id: int,
    feature_group_id: int,
) -> dict[str, Any]:
    """Resolve Spark read options for a feature group (FG-driven reads).

    Counterpart of resolve_unity_catalog_spark_options that addresses the
    feature group instead of a connector + table.
    The FG already knows its (catalog, schema, table) via its data source
    metadata, so the SDK doesn't pass them and no request body is sent;
    EE looks them up server-side.

    Parameters:
        feature_store_id: Numeric id of the feature store containing the feature group.
        feature_group_id: Numeric id of the Unity Catalog-backed external feature group.

    Returns:
        The raw JSON dict from the resolver.
        Same shape as resolve_unity_catalog_spark_options.
    """
    hopsworks_client = client.get_instance()
    endpoint = [
        "project",
        hopsworks_client._project_id,
        "featurestores",
        feature_store_id,
        "featuregroups",
        feature_group_id,
        "spark_options",
    ]
    return hopsworks_client._send_request("POST", endpoint)
157+
84158
def get_online_connector(
85159
self, feature_store_id: int
86160
) -> storage_connector.OnlineStorageConnector:

python/hsfs/feature_group.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5199,6 +5199,47 @@ def insert(
51995199
ge_report.to_ge_type() if ge_report is not None else None,
52005200
)
52015201

5202+
def _maybe_read_unity_catalog_via_spark(
    self, *, force_vended: bool = False
) -> Any | None:
    """Read a Unity Catalog-backed external FG with Spark, if this FG is one.

    Returns None when the FG has no data source, no storage connector, or a
    connector whose type is not UNITY_CATALOG, so the caller can fall through
    to the standard Query path.
    Otherwise, when running on Databricks-hosted Spark (auto-detected) and
    `force_vended` is False, the table is read natively with
    `spark.read.table()`.
    In every other case the FG-level spark-options resolver is called and the
    vended Delta path is read through the resolved options.

    Parameters:
        force_vended: When True, skip the Databricks detection and always use
            the FG-level spark-options resolver.

    Returns:
        A Spark DataFrame for UC-backed external FGs, or None for any other FG.
    """
    from hsfs import storage_connector as storage_connector_mod
    from hsfs.core import storage_connector_api

    # Prefer the private attribute, fall back to the public one when unset/falsy.
    data_source = getattr(self, "_data_source", None) or getattr(
        self, "data_source", None
    )
    if data_source is None:
        return None

    uc_connector = getattr(data_source, "_storage_connector", None)
    if uc_connector is None:
        return None

    connector_type = getattr(uc_connector, "type", None)
    if connector_type != storage_connector_mod.StorageConnector.UNITY_CATALOG:
        return None

    session = engine.get_instance()._spark_session

    # Native read path: only taken when not forced onto the vended path.
    if not force_vended and storage_connector_mod._running_in_databricks(session):
        table_name = storage_connector_mod._quote_uc_identifier(
            data_source.database, data_source.group, data_source.table
        )
        return session.read.table(table_name)

    # Vended path: ask Hopsworks for the read options, then read through them.
    resolver = storage_connector_api.StorageConnectorApi()
    response = resolver.resolve_feature_group_spark_options(
        self._feature_store_id, self._id
    )
    spark_options = storage_connector_mod.UnityCatalogSparkOptions.from_response_json(
        response
    )
    return spark_options.read(session)
5242+
52025243
@public
52035244
def read(
52045245
self,
@@ -5209,6 +5250,8 @@ def read(
52095250
read_options: dict[str, Any] | None = None,
52105251
start_time: str | int | datetime | date | None = None,
52115252
end_time: str | int | datetime | date | None = None,
5253+
*,
5254+
force_vended: bool = False,
52125255
) -> (
52135256
TypeVar("pyspark.sql.DataFrame")
52145257
| TypeVar("pyspark.RDD")
@@ -5283,6 +5326,13 @@ def read(
52835326
`%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`, `%Y-%m-%d %H:%M:%S.%f`,
52845327
or ISO-8601 UTC `%Y-%m-%dT%H:%M:%S.%fZ` (e.g. `2026-01-01T00:00:00.000000Z`).
52855328
Scheduler-injected `HOPS_START_TIME` / `HOPS_END_TIME` use the ISO-8601 form.
5329+
force_vended:
5330+
For Unity Catalog-backed external feature groups read with Spark: skip the
5331+
Databricks-runtime auto-detection and always resolve vended S3 credentials
5332+
via Hopsworks instead of falling through to `spark.read.table()`.
5333+
Use when the Databricks cluster's identity lacks UC grants the connector's
5334+
service principal has, or to force the Hopsworks read path in tests.
5335+
Ignored for non-UC feature groups, for non-Spark dataframe types, and when `online=True` or `start_time`/`end_time` is set (those reads use the standard path).
52865336
52875337
Returns:
52885338
A dataframe in the requested format containing the feature group data.
@@ -5303,6 +5353,22 @@ def read(
53035353
start_time, end_time
53045354
)
53055355

5356+
# Unity Catalog external FG + spark engine: short-circuit through
5357+
# the FG-level spark-options resolver. The standard Query path
5358+
# can't read UC tables (no Spark UC adapter in the Hopsworks
5359+
# cluster's Spark); the resolver vends short-lived S3 credentials
5360+
# and we read the underlying Delta files directly.
5361+
if (
5362+
dataframe_type in ("default", "spark")
5363+
and engine.get_type().startswith("spark")
5364+
and not online
5365+
and start_time is None
5366+
and end_time is None
5367+
):
5368+
uc_df = self._maybe_read_unity_catalog_via_spark(force_vended=force_vended)
5369+
if uc_df is not None:
5370+
return uc_df
5371+
53065372
if (
53075373
engine.get_type() == "python"
53085374
and not online

0 commit comments

Comments
 (0)