Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions python/hopsworks_common/client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,6 @@ class UnknownSecretStorageError(Exception):
class FeatureStoreException(Exception):
"""Generic feature store exception."""

Comment thread
jimdowling marked this conversation as resolved.
DUPLICATE_RECORD_ERROR_MESSAGE = (
"Duplicate records detected: The dataset contains multiple rows that share identical values "
"across all available columns from primary_key, and if defined: event_time and partition_key. "
"Please remove or deduplicate these records before inserting."
)


class TransformationFunctionException(Exception):
"""Exception raised when a transformation function fails."""
Expand Down
111 changes: 0 additions & 111 deletions python/hsfs/engine/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,108 +1016,6 @@ def _to_arrow_table(self, dataframe: pd.DataFrame | pl.DataFrame):
f"Unsupported dataframe type for arrow conversion: {type(dataframe)}"
)

def _check_duplicate_records(self, dataset, feature_group_instance):
"""Check for duplicate records within primary_key, event_time and partition_key columns.

Raises FeatureStoreException if duplicates are found.

Parameters:
-----------
dataset : Union[pd.DataFrame, pl.DataFrame]
The dataset to check for duplicates
feature_group_instance : FeatureGroup
The feature group instance containing primary_key, event_time and partition_key
"""
# Get the unique key columns to check (primary_key + event_time + partition_key)
key_columns = set(feature_group_instance.primary_key)

if not key_columns:
# No keys to check, skip validation
return

if feature_group_instance.event_time:
key_columns.add(feature_group_instance.event_time)

if feature_group_instance.partition_key:
key_columns.update(feature_group_instance.partition_key)

# Materialize as a sorted list so downstream .select()/.group_by() get a
# deterministic, ordered sequence instead of a set.
key_columns = sorted(key_columns)

# Verify all key columns exist against the original dataframe — no conversion needed.
if isinstance(dataset, pd.DataFrame) or (
HAS_POLARS and isinstance(dataset, pl.DataFrame)
):
available_columns = list(dataset.columns)
else:
available_columns = list(self._to_arrow_table(dataset).column_names)

missing_columns = [col for col in key_columns if col not in available_columns]
if missing_columns:
raise FeatureStoreException(
f"Key columns {missing_columns} are missing from the dataset. "
f"Available columns: {available_columns}"
)

import pyarrow as pa
import pyarrow.compute as pc

# Convert only the key columns to Arrow — avoids transcoding all feature columns
# (including costly numpy-U → UTF-8 re-encoding) for a check that only needs keys.
if isinstance(dataset, pd.DataFrame):
key_table = pa.Table.from_pandas(dataset[key_columns], preserve_index=False)
elif HAS_POLARS and isinstance(dataset, pl.DataFrame):
key_table = dataset.select(key_columns).to_arrow()
else:
key_table = self._to_arrow_table(dataset).select(key_columns)

# Check for duplicates using PyArrow group_by
# Group by key columns and count occurrences
grouped = key_table.group_by(key_columns).aggregate(
[
# The aggregation tuple structure: ([], function_name, FunctionOptions)
([], "count_all", pc.CountOptions(mode="all"))
]
)

# Filter groups with count > 1 (duplicates)
duplicate_groups = grouped.filter(pc.greater(grouped["count_all"], 1))

duplicate_count = len(duplicate_groups)

if duplicate_count > 0:
# Get total number of duplicate rows (sum of counts - 1 for each duplicate group)
# Since count includes the first occurrence, duplicates = count - 1 per group
total_duplicate_rows = (
sum(duplicate_groups["count_all"].to_pylist()) - duplicate_count
)

# Get sample duplicate records for error message
# Take first 10 duplicate groups and get their key values
sample_groups = duplicate_groups.slice(0, min(10, duplicate_count))

# Build sample string showing the duplicate key combinations
sample_rows = []
for i in range(len(sample_groups)):
row_dict = {}
for col in key_columns:
row_dict[col] = sample_groups[col][i].as_py()
row_dict["count_all"] = sample_groups["count_all"][i].as_py()
sample_rows.append(str(row_dict))

sample_str = "\n".join(sample_rows)

raise FeatureStoreException(
FeatureStoreException.DUPLICATE_RECORD_ERROR_MESSAGE
+ f"\nDataset contains {total_duplicate_rows} duplicate record(s) within "
f"primary_key ({feature_group_instance.primary_key}), "
f"event_time ({feature_group_instance.event_time}) and "
f"partition_key ({feature_group_instance.partition_key}). "
f"Found {duplicate_count} duplicate group(s). "
f"Sample duplicate key combinations:\n{sample_str}"
)

def _mark_online_rows(
self,
feature_group: FeatureGroup,
Expand Down Expand Up @@ -1190,15 +1088,6 @@ def save_dataframe(
online_write_options: dict[str, Any],
validation_id: int | None = None,
) -> job.Job | None:
if (
# Only `FeatureGroup` class has time_travel_format property
isinstance(feature_group, FeatureGroup)
and feature_group.time_travel_format == "DELTA"
and storage in [None, "offline"]
):
self._check_duplicate_records(dataframe, feature_group)
_logger.debug("No duplicate records found. Proceeding with Delta write.")

if (
not isinstance(feature_group, fg_mod.ExternalFeatureGroup)
and feature_group.stream
Expand Down
92 changes: 0 additions & 92 deletions python/hsfs/engine/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
array,
col,
concat,
count,
current_timestamp,
from_json,
lit,
Expand Down Expand Up @@ -565,86 +564,6 @@ def convert_pandas_to_spark_dataframe(self, dataframe):
)
return self._spark_session.createDataFrame(dataframe_copy)

def _check_duplicate_records(self, dataframe, feature_group):
"""Check for duplicate records within primary_key, event_time and partition_key columns.

Raises FeatureStoreException if duplicates are found.

Parameters:
-----------
dataframe : pyspark.sql.DataFrame
The Spark DataFrame to check for duplicates
feature_group : FeatureGroup
The feature group instance containing primary_key, event_time and partition_key
"""
# Get the key columns to check (primary_key + partition_key)
key_columns = list(feature_group.primary_key)

if not key_columns:
# No keys to check, skip validation
return

if feature_group.event_time:
key_columns.append(feature_group.event_time)

if feature_group.partition_key:
key_columns.extend(feature_group.partition_key)

# Verify all key columns exist in the dataset
dataframe_columns = dataframe.columns
missing_columns = [
col_name for col_name in key_columns if col_name not in dataframe_columns
]
if missing_columns:
raise FeatureStoreException(
f"Key columns {missing_columns} are missing from the dataset. "
f"Available columns: {dataframe_columns}"
)

# Check for duplicates using Spark groupBy and count
# Group by key columns and count occurrences
grouped = dataframe.groupBy(*key_columns).agg(count("*").alias("count"))

# Filter groups with count > 1 (duplicates)
duplicate_groups = grouped.filter(col("count") > 1)

# Count the number of duplicate groups
duplicate_count = duplicate_groups.count()

if duplicate_count > 0:
# Get total number of duplicate rows (sum of counts - 1 for each duplicate group)
# Since count includes the first occurrence, duplicates = count - 1 per group
duplicate_rows_data = duplicate_groups.select(
col("count").cast("long")
).collect()
total_duplicate_rows = (
sum(row["count"] for row in duplicate_rows_data) - duplicate_count
)

# Get sample duplicate records for error message
# Take first 10 duplicate groups and get their key values
sample_groups = duplicate_groups.limit(10).collect()

# Build sample string showing the duplicate key combinations
sample_rows = []
for row in sample_groups:
row_dict = {}
for col_name in key_columns:
row_dict[col_name] = row[col_name]
row_dict["count"] = row["count"]
sample_rows.append(str(row_dict))

sample_str = "\n".join(sample_rows)

raise FeatureStoreException(
FeatureStoreException.DUPLICATE_RECORD_ERROR_MESSAGE
+ f"\nDataset contains {total_duplicate_rows} duplicate record(s) within "
f"primary_key ({feature_group.primary_key}) and "
f"partition_key ({feature_group.partition_key}). "
f"Found {duplicate_count} duplicate group(s). "
f"Sample duplicate key combinations:\n{sample_str}"
)

def save_dataframe(
self,
feature_group,
Expand All @@ -659,17 +578,6 @@ def save_dataframe(
if self._metrics:
self._metrics.snapshot()
try:
if (
# Only `FeatureGroup class has time_travel_format property
isinstance(feature_group, fg_mod.FeatureGroup)
and feature_group.time_travel_format == "DELTA"
and storage in [None, "offline"]
):
self._check_duplicate_records(dataframe, feature_group)
_logger.debug(
"No duplicate records found. Proceeding with Delta write."
)

# ExternalFeatureGroups have no offline storage, so offline writes are skipped.
# FeatureGroups with stream=True use the same batch insert logic as non-stream
# feature groups in spark; streaming ingestion is handled by save_stream_dataframe.
Expand Down
Loading
Loading