Skip to content

Commit d9ea2d0

Browse files
committed
fixing bug
1 parent d887a43 commit d9ea2d0

2 files changed

Lines changed: 72 additions & 6 deletions

File tree

python/hsfs/feature_view.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5111,12 +5111,19 @@ def on_demand_transformations(self) -> dict[str, Callable]:
51115111

51125112
@property
51135113
def _on_demand_transformation_functions(self) -> list[TransformationFunction]:
5114-
"""Get all on-demand transformations in the feature view."""
5115-
return [
5116-
feature.on_demand_transformation_function
5117-
for feature in self.features
5118-
if feature.on_demand_transformation_function
5119-
]
5114+
"""Get all on-demand transformations in the feature view.
5115+
5116+
A multi-output TF appears on every feature it produces, so the raw
5117+
collection contains duplicates. Deduplicate via `__eq__` (which compares the
5118+
serialized TF) and preserve the first occurrence so downstream DAG
5119+
construction sees each TF exactly once.
5120+
"""
5121+
seen: list[TransformationFunction] = []
5122+
for feature in self.features:
5123+
tf = feature.on_demand_transformation_function
5124+
if tf and tf not in seen:
5125+
seen.append(tf)
5126+
return seen
51205127

51215128
@public
51225129
@property

python/tests/test_feature_view.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,65 @@ def test_from_response_json_transformation_function(self, mocker, backend_fixtur
162162
assert len(fv.schema) == 2
163163
assert isinstance(fv.schema[0], training_dataset_feature.TrainingDatasetFeature)
164164

165+
def test_on_demand_transformation_functions_dedup_multi_output_tf(self, mocker):
166+
"""Regression: a multi-output ODT is attached to each feature it produces.
167+
168+
Without dedup, the property returns the TF twice — once per feature — and
169+
TransformationExecutionDAG raises "Output column 'x' is produced by both
170+
'tf' and 'tf'." This was hit at FV creation time when reading the
171+
backend response back, breaking any chained pipeline with a multi-output
172+
TF (e.g. add_two -> odt2_1, odt2_2).
173+
"""
174+
from hsfs.core import transformation_execution_dag
175+
from hsfs.transformation_function import (
176+
TransformationFunction,
177+
TransformationType,
178+
)
179+
180+
mocker.patch("hopsworks_common.client.get_instance")
181+
mocker.patch("hsfs.engine.get_type", return_value="python")
182+
183+
@udf([float, float])
184+
def add_two(feature):
185+
return feature + 2, feature + 2
186+
187+
tf = TransformationFunction(
188+
featurestore_id=99,
189+
hopsworks_udf=add_two,
190+
transformation_type=TransformationType.ON_DEMAND,
191+
)("feature").alias("odt2_1", "odt2_2")
192+
193+
fv = feature_view.FeatureView(
194+
name="fv_dedup",
195+
query=fg1.select_features(),
196+
featurestore_id=99,
197+
featurestore_name="test_fs",
198+
)
199+
# Multi-output TF -> the SAME TF appears on every output feature.
200+
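# The backend response path produces fresh Python instances per feature.
201+
# NOTE: the same `tf` object is passed to both features below, so list membership can match on identity; distinct-but-equal copies (the `__eq__` path) are not exercised here.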
202+
fv.schema = [
203+
training_dataset_feature.TrainingDatasetFeature(
204+
name="odt2_1",
205+
type="float",
206+
transformation_function=tf,
207+
),
208+
training_dataset_feature.TrainingDatasetFeature(
209+
name="odt2_2",
210+
type="float",
211+
transformation_function=tf,
212+
),
213+
]
214+
215+
# Property must dedup
216+
odts = fv._on_demand_transformation_functions
217+
assert len(odts) == 1
218+
assert odts[0].hopsworks_udf.function_name == "add_two"
219+
220+
# DAG construction must succeed (no "produced by both 'add_two' and 'add_two'")
221+
dag = transformation_execution_dag.TransformationExecutionDAG(odts)
222+
assert len(dag.nodes) == 1
223+
165224
def test_from_response_json_basic_info_deprecated(self, mocker, backend_fixtures):
166225
# Arrange
167226
mocker.patch("hsfs.engine.get_type")

0 commit comments

Comments
 (0)