diff --git a/python/hopsworks_common/client/exceptions.py b/python/hopsworks_common/client/exceptions.py index 03c18119f5..1a70851ba0 100644 --- a/python/hopsworks_common/client/exceptions.py +++ b/python/hopsworks_common/client/exceptions.py @@ -90,12 +90,6 @@ class UnknownSecretStorageError(Exception): class FeatureStoreException(Exception): """Generic feature store exception.""" - DUPLICATE_RECORD_ERROR_MESSAGE = ( - "Duplicate records detected: The dataset contains multiple rows that share identical values " - "across all available columns from primary_key, and if defined: event_time and partition_key. " - "Please remove or deduplicate these records before inserting." - ) - class TransformationFunctionException(Exception): """Exception raised when a transformation function fails.""" diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 830b0e90d2..850515dd41 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1016,108 +1016,6 @@ def _to_arrow_table(self, dataframe: pd.DataFrame | pl.DataFrame): f"Unsupported dataframe type for arrow conversion: {type(dataframe)}" ) - def _check_duplicate_records(self, dataset, feature_group_instance): - """Check for duplicate records within primary_key, event_time and partition_key columns. - - Raises FeatureStoreException if duplicates are found. - - Parameters: - ----------- - dataset : Union[pd.DataFrame, pl.DataFrame] - The dataset to check for duplicates - feature_group_instance : FeatureGroup - The feature group instance containing primary_key, event_time and partition_key - """ - # Get the unique key columns to check (primary_key + event_time + partition_key) - key_columns = set(feature_group_instance.primary_key) - - if not key_columns: - # No keys to check, skip validation - return - - if feature_group_instance.event_time: - key_columns.add(feature_group_instance.event_time) - - if feature_group_instance.partition_key: - key_columns.update(feature_group_instance.partition_key) - - # Materialize as a sorted list so downstream .select()/.group_by() get a - # deterministic, ordered sequence instead of a set. - key_columns = sorted(key_columns) - - # Verify all key columns exist against the original dataframe — no conversion needed. - if isinstance(dataset, pd.DataFrame) or ( - HAS_POLARS and isinstance(dataset, pl.DataFrame) - ): - available_columns = list(dataset.columns) - else: - available_columns = list(self._to_arrow_table(dataset).column_names) - - missing_columns = [col for col in key_columns if col not in available_columns] - if missing_columns: - raise FeatureStoreException( - f"Key columns {missing_columns} are missing from the dataset. " - f"Available columns: {available_columns}" - ) - - import pyarrow as pa - import pyarrow.compute as pc - - # Convert only the key columns to Arrow — avoids transcoding all feature columns - # (including costly numpy-U → UTF-8 re-encoding) for a check that only needs keys. - if isinstance(dataset, pd.DataFrame): - key_table = pa.Table.from_pandas(dataset[key_columns], preserve_index=False) - elif HAS_POLARS and isinstance(dataset, pl.DataFrame): - key_table = dataset.select(key_columns).to_arrow() - else: - key_table = self._to_arrow_table(dataset).select(key_columns) - - # Check for duplicates using PyArrow group_by - # Group by key columns and count occurrences - grouped = key_table.group_by(key_columns).aggregate( - [ - # The aggregation tuple structure: ([], function_name, FunctionOptions) - ([], "count_all", pc.CountOptions(mode="all")) - ] - ) - - # Filter groups with count > 1 (duplicates) - duplicate_groups = grouped.filter(pc.greater(grouped["count_all"], 1)) - - duplicate_count = len(duplicate_groups) - - if duplicate_count > 0: - # Get total number of duplicate rows (sum of counts - 1 for each duplicate group) - # Since count includes the first occurrence, duplicates = count - 1 per group - total_duplicate_rows = ( - sum(duplicate_groups["count_all"].to_pylist()) - duplicate_count - ) - - # Get sample duplicate records for error message - # Take first 10 duplicate groups and get their key values - sample_groups = duplicate_groups.slice(0, min(10, duplicate_count)) - - # Build sample string showing the duplicate key combinations - sample_rows = [] - for i in range(len(sample_groups)): - row_dict = {} - for col in key_columns: - row_dict[col] = sample_groups[col][i].as_py() - row_dict["count_all"] = sample_groups["count_all"][i].as_py() - sample_rows.append(str(row_dict)) - - sample_str = "\n".join(sample_rows) - - raise FeatureStoreException( - FeatureStoreException.DUPLICATE_RECORD_ERROR_MESSAGE - + f"\nDataset contains {total_duplicate_rows} duplicate record(s) within " - f"primary_key ({feature_group_instance.primary_key}), " - f"event_time ({feature_group_instance.event_time}) and " - f"partition_key ({feature_group_instance.partition_key}). " - f"Found {duplicate_count} duplicate group(s). " - f"Sample duplicate key combinations:\n{sample_str}" - ) - def _mark_online_rows( self, feature_group: FeatureGroup, @@ -1190,15 +1088,6 @@ def save_dataframe( online_write_options: dict[str, Any], validation_id: int | None = None, ) -> job.Job | None: - if ( - # Only `FeatureGroup` class has time_travel_format property - isinstance(feature_group, FeatureGroup) - and feature_group.time_travel_format == "DELTA" - and storage in [None, "offline"] - ): - self._check_duplicate_records(dataframe, feature_group) - _logger.debug("No duplicate records found. Proceeding with Delta write.") - if ( not isinstance(feature_group, fg_mod.ExternalFeatureGroup) and feature_group.stream diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 455b1b88b5..844855b47a 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -56,7 +56,6 @@ array, col, concat, - count, current_timestamp, from_json, lit, @@ -565,86 +564,6 @@ def convert_pandas_to_spark_dataframe(self, dataframe): ) return self._spark_session.createDataFrame(dataframe_copy) - def _check_duplicate_records(self, dataframe, feature_group): - """Check for duplicate records within primary_key, event_time and partition_key columns. - - Raises FeatureStoreException if duplicates are found. - - Parameters: - ----------- - dataframe : pyspark.sql.DataFrame - The Spark DataFrame to check for duplicates - feature_group : FeatureGroup - The feature group instance containing primary_key, event_time and partition_key - """ - # Get the key columns to check (primary_key + partition_key) - key_columns = list(feature_group.primary_key) - - if not key_columns: - # No keys to check, skip validation - return - - if feature_group.event_time: - key_columns.append(feature_group.event_time) - - if feature_group.partition_key: - key_columns.extend(feature_group.partition_key) - - # Verify all key columns exist in the dataset - dataframe_columns = dataframe.columns - missing_columns = [ - col_name for col_name in key_columns if col_name not in dataframe_columns - ] - if missing_columns: - raise FeatureStoreException( - f"Key columns {missing_columns} are missing from the dataset. " - f"Available columns: {dataframe_columns}" - ) - - # Check for duplicates using Spark groupBy and count - # Group by key columns and count occurrences - grouped = dataframe.groupBy(*key_columns).agg(count("*").alias("count")) - - # Filter groups with count > 1 (duplicates) - duplicate_groups = grouped.filter(col("count") > 1) - - # Count the number of duplicate groups - duplicate_count = duplicate_groups.count() - - if duplicate_count > 0: - # Get total number of duplicate rows (sum of counts - 1 for each duplicate group) - # Since count includes the first occurrence, duplicates = count - 1 per group - duplicate_rows_data = duplicate_groups.select( - col("count").cast("long") - ).collect() - total_duplicate_rows = ( - sum(row["count"] for row in duplicate_rows_data) - duplicate_count - ) - - # Get sample duplicate records for error message - # Take first 10 duplicate groups and get their key values - sample_groups = duplicate_groups.limit(10).collect() - - # Build sample string showing the duplicate key combinations - sample_rows = [] - for row in sample_groups: - row_dict = {} - for col_name in key_columns: - row_dict[col_name] = row[col_name] - row_dict["count"] = row["count"] - sample_rows.append(str(row_dict)) - - sample_str = "\n".join(sample_rows) - - raise FeatureStoreException( - FeatureStoreException.DUPLICATE_RECORD_ERROR_MESSAGE - + f"\nDataset contains {total_duplicate_rows} duplicate record(s) within " - f"primary_key ({feature_group.primary_key}) and " - f"partition_key ({feature_group.partition_key}). " - f"Found {duplicate_count} duplicate group(s). " - f"Sample duplicate key combinations:\n{sample_str}" - ) - def save_dataframe( self, feature_group, @@ -659,17 +578,6 @@ def save_dataframe( if self._metrics: self._metrics.snapshot() try: - if ( - # Only `FeatureGroup class has time_travel_format property - isinstance(feature_group, fg_mod.FeatureGroup) - and feature_group.time_travel_format == "DELTA" - and storage in [None, "offline"] - ): - self._check_duplicate_records(dataframe, feature_group) - _logger.debug( - "No duplicate records found. Proceeding with Delta write." - ) - # ExternalFeatureGroups have no offline storage, so offline writes are skipped. # FeatureGroups with stream=True use the same batch insert logic as non-stream # feature groups in spark; streaming ingestion is handled by save_stream_dataframe. diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index f0004ae199..143d9e0820 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -2226,526 +2226,6 @@ def test_save_dataframe_delta_time_travel_format(self, mocker): test_dataframe, write_options={}, validation_id=None, operation="insert" ) - def test_save_dataframe_delta_calls_check_duplicate_records(self, mocker): - # Arrange - mock_check_duplicate_records = mocker.patch( - "hsfs.engine.python.Engine._check_duplicate_records" - ) - mock_delta_engine = mocker.patch("hsfs.core.delta_engine.DeltaEngine") - mocker.patch("hsfs.engine.get_type", return_value="python") - - python_engine = python.Engine() - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=99, - primary_key=["pk1"], - partition_key=[], - id=99, - stream=False, - time_travel_format="DELTA", - ) - - test_dataframe = pd.DataFrame({"pk1": [1, 2, 3], "col2": [4, 5, 6]}) - - # Act - python_engine.save_dataframe( - feature_group=fg, - dataframe=test_dataframe, - operation="insert", - online_enabled=False, - storage="offline", - offline_write_options={}, - online_write_options={}, - validation_id=None, - ) - - # Assert - assert mock_check_duplicate_records.call_count == 1 - mock_check_duplicate_records.assert_called_once_with(test_dataframe, fg) - assert mock_delta_engine.call_count == 1 - - def test_save_dataframe_non_delta_does_not_call_check_duplicate_records( - self, mocker - ): - # Arrange - mock_check_duplicate_records = mocker.patch( - "hsfs.engine.python.Engine._check_duplicate_records" - ) - mock_legacy_save_dataframe = mocker.patch( - "hsfs.engine.python.Engine.legacy_save_dataframe" - ) - mocker.patch("hsfs.engine.get_type", return_value="python") - - python_engine = python.Engine() - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=99, - primary_key=["pk1"], - partition_key=[], - id=10, - stream=False, - time_travel_format="HUDI", - ) - - test_dataframe = pd.DataFrame({"pk1": [1, 2, 3], "col2": [4, 5, 6]}) - - # Act - python_engine.save_dataframe( - feature_group=fg, - dataframe=test_dataframe, - operation="insert", - online_enabled=False, - storage="offline", - offline_write_options={}, - online_write_options={}, - validation_id=None, - ) - - # Assert - assert mock_check_duplicate_records.call_count == 0 - assert mock_legacy_save_dataframe.call_count == 1 - - @pytest.mark.parametrize( - "test_name,primary_key,partition_key,event_time,data_dict", - [ - ( - "duplicate_primary_key", - ["id"], - [], - None, - {"id": [1, 1, 2], "text": ["a", "a_dup", "b"]}, - ), - ( - "duplicate_primary_key_partition", - ["id"], - ["p"], - None, - {"id": [1, 1, 2], "p": [0, 0, 0], "text": ["a_p0", "a_p0_dup", "b_p0"]}, - ), - ( - "duplicate_primary_key_event_time", - ["id"], - [], - "event_time", - { - "id": [1, 1, 2], - "event_time": [ - pd.Timestamp("2024-01-01"), - pd.Timestamp("2024-01-01"), - pd.Timestamp("2024-01-02"), - ], - "text": ["a_t1", "a_t1_dup", "b_t2"], - }, - ), - ( - "duplicate_event_time_in_primary_key", - ["id", "event_time"], - [], - "event_time", - { - "id": [1, 1, 2], - "event_time": [ - pd.Timestamp("2024-01-01"), - pd.Timestamp("2024-01-01"), - pd.Timestamp("2024-01-02"), - ], - "text": ["a_t1", "a_t1_dup", "b_t2"], - }, - ), - ( - "duplicate_partition_key_in_primary_key", - ["id", "p"], - ["p"], - None, - {"id": [1, 1, 2], "p": [0, 0, 0], "text": ["a", "a_dup", "b"]}, - ), - ( - "duplicate_all_overlapping", - ["id", "event_time", "p"], - ["p"], - "event_time", - { - "id": [1, 1, 2], - "event_time": [ - pd.Timestamp("2024-01-01"), - pd.Timestamp("2024-01-01"), - pd.Timestamp("2024-01-02"), - ], - "p": [0, 0, 0], - "text": ["a", "a_dup", "b"], - }, - ), - ], - ) - def test_save_dataframe_delta_duplicate_should_fail( - self, mocker, test_name, primary_key, partition_key, event_time, data_dict - ): - # Arrange - mocker.patch("hsfs.core.delta_engine.DeltaEngine") - mocker.patch("hsfs.engine.get_type", return_value="python") - mocker.patch( - "hsfs.feature_group.FeatureGroup._has_deltalake", return_value=True - ) - mocker.patch( - "hsfs.engine.python.Engine.convert_to_default_dataframe", - side_effect=lambda x: x, - ) - mocker.patch( - "hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata" - ) - mocker.patch( - "hsfs.core.feature_group_engine.FeatureGroupEngine._verify_schema_compatibility" - ) - mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine") - mocker.patch("hsfs.engine.python.Engine._write_dataframe_kafka") - - python_engine = python.Engine() - - fg = feature_group.FeatureGroup( - name=f"dl_dup_{test_name}", - version=1, - featurestore_id=99, - primary_key=primary_key, - partition_key=partition_key, - event_time=event_time, - stream=False, - time_travel_format="DELTA", - ) - - df = pd.DataFrame(data_dict) - - # Act & Assert - with pytest.raises(exceptions.FeatureStoreException) as exc_info: - python_engine.save_dataframe( - feature_group=fg, - dataframe=df, - operation="insert", - online_enabled=True, - storage="offline", - offline_write_options={}, - online_write_options={}, - validation_id=None, - ) - - assert exceptions.FeatureStoreException.DUPLICATE_RECORD_ERROR_MESSAGE in str( - exc_info.value - ) - - @pytest.mark.parametrize( - "test_name,primary_key,partition_key,event_time,data_dict", - [ - ( - "pk_partition_across", - ["id"], - ["p"], - None, - {"id": [1, 1, 2], "p": [0, 1, 0], "text": ["a_p0", "a_p1", "b_p0"]}, - ), - ( - "pk_event_time_across", - ["id"], - [], - "event_time", - { - "id": [1, 1, 2], - "event_time": [ - pd.Timestamp("2024-01-01"), - pd.Timestamp("2024-01-02"), - pd.Timestamp("2024-01-01"), - ], - "text": ["a_t1", "a_t2", "b_t1"], - }, - ), - ( - "pk_with_no_duplicate", - ["id"], - [], - None, - {"id": [1, 2, 3], "text": ["a", "b", "c"]}, - ), - ( - "no_pk_partition_only", - [], - ["p"], - None, - {"id": [1, 1, 2], "p": [0, 1, 0], "text": ["a_p0", "a_p1", "b_p0"]}, - ), - ( - "no_pk_event_time_only", - [], - [], - "event_time", - { - "id": [1, 1, 2], - "event_time": [ - pd.Timestamp("2024-01-01"), - pd.Timestamp("2024-01-02"), - pd.Timestamp("2024-01-01"), - ], - "text": ["a_t1", "a_t2", "b_t1"], - }, - ), - ( - "no_pk", - [], - [], - None, - {"id": [1, 1, 2], "text": ["a", "a_dup", "b"]}, - ), - ( - "event_time_in_primary_key_no_duplicate", - ["id", "event_time"], - [], - "event_time", - { - "id": [1, 1, 2], - "event_time": [ - pd.Timestamp("2024-01-01"), - pd.Timestamp("2024-01-02"), - pd.Timestamp("2024-01-01"), - ], - "text": ["a_t1", "a_t2", "b_t1"], - }, - ), - ( - "partition_key_in_primary_key_no_duplicate", - ["id", "p"], - ["p"], - None, - {"id": [1, 1, 2], "p": [0, 1, 0], "text": ["a_p0", "a_p1", "b_p0"]}, - ), - ( - "all_overlapping_no_duplicate", - ["id", "event_time", "p"], - ["p"], - "event_time", - { - "id": [1, 1, 2], - "event_time": [ - pd.Timestamp("2024-01-01"), - pd.Timestamp("2024-01-02"), - pd.Timestamp("2024-01-01"), - ], - "p": [0, 0, 0], - "text": ["a", "b", "c"], - }, - ), - ], - ) - def test_save_dataframe_delta_duplicate_should_succeed( - self, mocker, test_name, primary_key, partition_key, event_time, data_dict - ): - # Arrange - mocker.patch("hsfs.core.delta_engine.DeltaEngine") - mocker.patch("hsfs.engine.get_type", return_value="python") - mocker.patch( - "hsfs.feature_group.FeatureGroup._has_deltalake", return_value=True - ) - mocker.patch( - "hsfs.engine.python.Engine.convert_to_default_dataframe", - side_effect=lambda x: x, - ) - mocker.patch( - "hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata" - ) - mocker.patch( - "hsfs.core.feature_group_engine.FeatureGroupEngine._verify_schema_compatibility" - ) - mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine") - mocker.patch("hsfs.engine.python.Engine._write_dataframe_kafka") - - python_engine = python.Engine() - - fg = feature_group.FeatureGroup( - name=f"dl_dup_{test_name}", - version=1, - featurestore_id=99, - primary_key=primary_key, - partition_key=partition_key, - event_time=event_time, - stream=False, - time_travel_format="DELTA", - ) - - df = pd.DataFrame(data_dict) - - # Act - should not raise exception - python_engine.save_dataframe( - feature_group=fg, - dataframe=df, - operation="insert", - online_enabled=True, - storage="offline", - offline_write_options={}, - online_write_options={}, - validation_id=None, - ) - - # Assert - no exception should be raised - - def test_check_duplicate_records_converts_only_key_columns(self, mocker): - # Arrange - DataFrame with many feature columns; only key columns should - # be passed to pa.Table.from_pandas for the duplicate check. - # - # pa.Table is a C extension type whose attributes are immutable, so we - # cannot patch from_pandas directly. Instead we inject a fake pyarrow - # module (via sys.modules) whose Table.from_pandas records its arguments - # and delegates to the real implementation. - import sys - import types - - import pyarrow as real_pa - - captured_columns = [] - - def _spy_from_pandas(df, **kwargs): - captured_columns.append(list(df.columns)) - return real_pa.Table.from_pandas(df, **kwargs) - - class _FakeTable: - from_pandas = staticmethod(_spy_from_pandas) - - class _FakePa(types.ModuleType): - Table = _FakeTable - - def __getattr__(self, name): - return getattr(real_pa, name) - - mocker.patch.dict(sys.modules, {"pyarrow": _FakePa("pyarrow")}) - mocker.patch("hsfs.engine.get_type", return_value="python") - mocker.patch( - "hsfs.feature_group.FeatureGroup._has_deltalake", return_value=True - ) - - python_engine = python.Engine() - - fg = feature_group.FeatureGroup( - name="purchases", - version=1, - featurestore_id=99, - primary_key=["purchase_id"], - partition_key=["purchase_month"], - event_time="ts", - stream=False, - time_travel_format="DELTA", - ) - - df = pd.DataFrame( - { - "purchase_id": [1, 2, 3], - "ts": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]), - "purchase_month": ["2024-01", "2024-01", "2024-01"], - "amount": [9.99, 19.99, 4.99], - "payment_method": ["card", "cash", "card"], - "device_type": ["mobile", "desktop", "mobile"], - "description": ["item_a", "item_b", "item_c"], - } - ) - - # Act - python_engine._check_duplicate_records(df, fg) - - # Assert - from_pandas called exactly once, with only the 3 key columns - assert len(captured_columns) == 1 - assert set(captured_columns[0]) == { - "purchase_id", - "ts", - "purchase_month", - } # Test using a set because order does not matter for duplicate check - - @pytest.mark.skipif( - not HAS_POLARS, - reason="Polars is not installed.", - ) - def test_check_duplicate_records_polars_no_duplicates(self, mocker): - # Arrange - polars branch uses df.select(...).to_arrow() to extract keys. - mocker.patch("hsfs.engine.get_type", return_value="python") - python_engine = python.Engine() - fg = feature_group.FeatureGroup( - name="purchases", - version=1, - featurestore_id=99, - primary_key=["purchase_id"], - partition_key=[], - event_time="ts", - time_travel_format=None, - ) - df = pl.DataFrame( - { - "purchase_id": [1, 2, 3], - "ts": [ - datetime(2024, 1, 1), - datetime(2024, 1, 2), - datetime(2024, 1, 3), - ], - "amount": [9.99, 19.99, 4.99], - } - ) - - # Act + Assert - no exception means no duplicates were found - python_engine._check_duplicate_records(df, fg) - - @pytest.mark.skipif( - not HAS_POLARS, - reason="Polars is not installed.", - ) - def test_check_duplicate_records_polars_finds_duplicates(self, mocker): - # Arrange - same primary_key + event_time row appears twice. - mocker.patch("hsfs.engine.get_type", return_value="python") - python_engine = python.Engine() - fg = feature_group.FeatureGroup( - name="purchases", - version=1, - featurestore_id=99, - primary_key=["purchase_id"], - partition_key=[], - event_time="ts", - time_travel_format=None, - ) - df = pl.DataFrame( - { - "purchase_id": [1, 1, 2], - "ts": [ - datetime(2024, 1, 1), - datetime(2024, 1, 1), - datetime(2024, 1, 2), - ], - "amount": [9.99, 9.99, 4.99], - } - ) - - # Act + Assert - with pytest.raises(exceptions.FeatureStoreException) as exc_info: - python_engine._check_duplicate_records(df, fg) - assert "duplicate" in str(exc_info.value).lower() - - @pytest.mark.skipif( - not HAS_POLARS, - reason="Polars is not installed.", - ) - def test_check_duplicate_records_polars_missing_key_column_raises(self, mocker): - # Arrange - the polars branch must still validate that key columns exist. - mocker.patch("hsfs.engine.get_type", return_value="python") - python_engine = python.Engine() - fg = feature_group.FeatureGroup( - name="purchases", - version=1, - featurestore_id=99, - primary_key=["purchase_id"], - partition_key=[], - time_travel_format=None, - ) - df = pl.DataFrame({"amount": [9.99, 19.99]}) - - # Act + Assert - with pytest.raises(exceptions.FeatureStoreException) as exc_info: - python_engine._check_duplicate_records(df, fg) - assert "purchase_id" in str(exc_info.value) - def test_to_arrow_table_pandas(self): # Arrange python_engine = python.Engine() diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index b5820f7161..ed6941a0ff 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -1213,286 +1213,6 @@ def test_save_dataframe_external_fg( == expected_offline_calls ) - def test_save_dataframe_delta_calls_check_duplicate_records(self, mocker): - # Arrange - mock_check_duplicate_records = mocker.patch( - "hsfs.engine.spark.Engine._check_duplicate_records" - ) - mock_spark_engine_save_offline_dataframe = mocker.patch( - "hsfs.engine.spark.Engine._save_offline_dataframe" - ) - - spark_engine = spark.Engine() - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=99, - primary_key=["pk1"], - partition_key=[], - id=10, - time_travel_format="DELTA", - ) - - mock_dataframe = mocker.Mock(spec=DataFrame) - - # Act - spark_engine.save_dataframe( - feature_group=fg, - dataframe=mock_dataframe, - operation="insert", - online_enabled=False, - storage="offline", - offline_write_options=None, - online_write_options=None, - validation_id=None, - ) - - # Assert - assert mock_check_duplicate_records.call_count == 1 - mock_check_duplicate_records.assert_called_once_with(mock_dataframe, fg) - assert mock_spark_engine_save_offline_dataframe.call_count == 1 - - def test_save_dataframe_non_delta_does_not_call_check_duplicate_records( - self, mocker - ): - # Arrange - mock_check_duplicate_records = mocker.patch( - "hsfs.engine.spark.Engine._check_duplicate_records" - ) - mock_spark_engine_save_offline_dataframe = mocker.patch( - "hsfs.engine.spark.Engine._save_offline_dataframe" - ) - - spark_engine = spark.Engine() - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=99, - primary_key=["pk1"], - partition_key=[], - id=10, - time_travel_format="HUDI", - ) - - mock_dataframe = mocker.Mock(spec=DataFrame) - - # Act - spark_engine.save_dataframe( - feature_group=fg, - dataframe=mock_dataframe, - operation="insert", - online_enabled=False, - storage="offline", - offline_write_options=None, - online_write_options=None, - validation_id=None, - ) - - # Assert - assert mock_check_duplicate_records.call_count == 0 - assert mock_spark_engine_save_offline_dataframe.call_count == 1 - - @pytest.mark.parametrize( - "test_name,primary_key,partition_key,event_time,data", - [ - ( - "duplicate_primary_key", - ["id"], - [], - None, - [ - {"id": 1, "text": "a"}, - {"id": 1, "text": "a_dup"}, - {"id": 2, "text": "b"}, - ], - ), - ( - "duplicate_primary_key_partition", - ["id"], - ["p"], - None, - [ - {"id": 1, "p": 0, "text": "a_p0"}, - {"id": 1, "p": 0, "text": "a_p0_dup"}, - {"id": 2, "p": 0, "text": "b_p0"}, - ], - ), - ( - "duplicate_primary_key_event_time", - ["id"], - [], - "event_time", - [ - {"id": 1, "event_time": "2024-01-01", "text": "a_t1"}, - {"id": 1, "event_time": "2024-01-01", "text": "a_t1_dup"}, - {"id": 2, "event_time": "2024-01-02", "text": "b_t2"}, - ], - ), - ], - ) - def test_save_dataframe_delta_duplicate_should_fail( - self, mocker, test_name, primary_key, partition_key, event_time, data - ): - # Arrange - from datetime import datetime - - mocker.patch("hsfs.engine.get_type", return_value="spark") - mocker.patch( - "hsfs.feature_group.FeatureGroup._has_deltalake", return_value=True - ) - - spark_engine = spark.Engine() - - fg = feature_group.FeatureGroup( - name=f"dl_dup_{test_name}", - version=1, - featurestore_id=99, - primary_key=primary_key, - partition_key=partition_key, - event_time=event_time, - time_travel_format="DELTA", - ) - - # Convert event_time strings to datetime if needed - if event_time and any(isinstance(row.get(event_time), str) for row in data): - for row in data: - if event_time in row and isinstance(row[event_time], str): - row[event_time] = datetime.fromisoformat(row[event_time]) - - df = spark_engine._spark_session.createDataFrame(data) - - # Act & Assert - with pytest.raises(exceptions.FeatureStoreException) as exc_info: - spark_engine.save_dataframe( - feature_group=fg, - dataframe=df, - operation="insert", - online_enabled=True, - storage="offline", - offline_write_options={}, - online_write_options={}, - validation_id=None, - ) - - assert exceptions.FeatureStoreException.DUPLICATE_RECORD_ERROR_MESSAGE in str( - exc_info.value - ) - - @pytest.mark.parametrize( - "test_name,primary_key,partition_key,event_time,data_factory", - [ - ( - "pk_partition_across", - ["id"], - ["p"], - None, - lambda dt: [ - {"id": 1, "p": 0, "text": "a_p0"}, - {"id": 1, "p": 1, "text": "a_p1"}, - {"id": 2, "p": 0, "text": "b_p0"}, - ], - ), - ( - "pk_event_time_across", - ["id"], - [], - "event_time", - lambda dt: [ - {"id": 1, "event_time": dt.datetime(2024, 1, 1), "text": "a_t1"}, - {"id": 1, "event_time": dt.datetime(2024, 1, 2), "text": "a_t2"}, - {"id": 2, "event_time": dt.datetime(2024, 1, 1), "text": "b_t1"}, - ], - ), - ( - "pk_with_no_duplicate", - ["id"], - [], - None, - lambda dt: [ - {"id": 1, "text": "a"}, - {"id": 2, "text": "b"}, - {"id": 3, "text": "c"}, - ], - ), - ( - "no_pk_partition_only", - [], - ["p"], - None, - lambda dt: [ - {"id": 1, "p": 0, "text": "a_p0"}, - {"id": 1, "p": 1, "text": "a_p1"}, - {"id": 2, "p": 0, "text": "b_p0"}, - ], - ), - ( - "no_pk_event_time_only", - [], - [], - "event_time", - lambda dt: [ - {"id": 1, "event_time": dt.datetime(2024, 1, 1), "text": "a_t1"}, - {"id": 1, "event_time": dt.datetime(2024, 1, 2), "text": "a_t2"}, - {"id": 2, "event_time": dt.datetime(2024, 1, 1), "text": "b_t1"}, - ], - ), - ( - "no_pk", - [], - [], - None, - lambda dt: [ - {"id": 1, "text": "a"}, - {"id": 1, "text": "a_dup"}, - {"id": 2, "text": "b"}, - ], - ), - ], - ) - def test_save_dataframe_delta_duplicate_should_succeed( - self, mocker, test_name, primary_key, partition_key, event_time, data_factory - ): - # Arrange - mocker.patch("hsfs.engine.get_type", return_value="spark") - mocker.patch( - "hsfs.feature_group.FeatureGroup._has_deltalake", return_value=True - ) - mock_spark_engine_save_offline_dataframe = mocker.patch( - "hsfs.engine.spark.Engine._save_offline_dataframe" - ) - - spark_engine = spark.Engine() - - fg = feature_group.FeatureGroup( - name=f"dl_dup_{test_name}", - version=1, - featurestore_id=99, - primary_key=primary_key, - partition_key=partition_key, - event_time=event_time, - time_travel_format="DELTA", - ) - - data = data_factory(datetime) - df = spark_engine._spark_session.createDataFrame(data) - - # Act - should not raise exception - spark_engine.save_dataframe( - feature_group=fg, - dataframe=df, - operation="insert", - online_enabled=True, - storage="offline", - offline_write_options={}, - online_write_options={}, - validation_id=None, - ) - - # Assert - no exception should be raised, and save should be called - assert mock_spark_engine_save_offline_dataframe.call_count == 1 - def test_save_stream_dataframe(self, mocker, backend_fixtures): # Arrange mock_common_client_get_instance = mocker.patch(