[HWORKS-2802] Remove client-side duplicate-record check on Delta inserts#963
Open
jimdowling wants to merge 2 commits into
The python and Spark engines used to scan every Delta-offline insert for duplicate (primary_key, event_time, partition_key) rows and raise client-side before the write. The check added a full groupBy/aggregate pass on every insert (extra Spark action; per-batch PyArrow groupBy under the python engine) and duplicated semantics that the Delta writer itself does not enforce. Drop the check entirely; let the storage engine handle deduplication where appropriate, and let user data flow through unchanged.

The removal is non-trivial across hsfs/engine/python.py and hsfs/engine/spark.py: both _check_duplicate_records helpers, both call sites in save_dataframe, the dependent FeatureStoreException.DUPLICATE_RECORD_ERROR_MESSAGE constant, and the matching test suites (call-counting tests, parametrized duplicate fixtures, and the polars-branch tests) are all removed. The Spark engine no longer references pyspark.sql.functions.count, so that import is dropped too.

Signed-off-by: Jim Dowling <jim@logicalclocks.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
This PR removes the client-side duplicate-record detection that previously ran on every Delta offline insert, relying instead on the underlying Delta writer behavior and eliminating an extra full-pass aggregation/groupBy per insert.
Changes:
- Removed `_check_duplicate_records` from both the Python and Spark engines and deleted the corresponding call sites in `save_dataframe`.
- Removed the `FeatureStoreException.DUPLICATE_RECORD_ERROR_MESSAGE` constant and deleted the related test suites/fixtures asserting duplicate-detection behavior.
- Dropped the now-unused `pyspark.sql.functions.count` import in the Spark engine.
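
For readers unfamiliar with the removed behavior, the following is a minimal, engine-agnostic sketch of what a client-side duplicate check of this kind looks like. It is not the hsfs implementation: the function names, the `DuplicateRecordError` class, and the sample column names are hypothetical, and plain dictionaries stand in for DataFrame rows. It groups rows by a composite key and raises if any key occurs more than once, which is why such a check costs one extra full pass over every insert.

```python
from collections import Counter
from typing import Any, Iterable, List, Mapping, Sequence, Tuple


class DuplicateRecordError(ValueError):
    """Hypothetical stand-in for the error a client-side check would raise."""


def find_duplicate_keys(
    rows: Iterable[Mapping[str, Any]], key_columns: Sequence[str]
) -> List[Tuple[Any, ...]]:
    """Return every composite key that appears more than once in `rows`.

    This is the group-by/count step: each row is visited once and one
    counter is kept per distinct key.
    """
    counts = Counter(tuple(row[col] for col in key_columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]


def check_duplicate_records(
    rows: Iterable[Mapping[str, Any]], key_columns: Sequence[str]
) -> None:
    """Raise before the write if any composite key is duplicated."""
    duplicates = find_duplicate_keys(rows, key_columns)
    if duplicates:
        raise DuplicateRecordError(
            f"{len(duplicates)} duplicate key(s) on {list(key_columns)}: {duplicates}"
        )


# Illustrative data; the column names are assumptions, not taken from hsfs.
KEY_COLUMNS = ("id", "event_time", "region")
rows = [
    {"id": 1, "event_time": "2026-01-01", "region": "eu", "value": 10},
    {"id": 1, "event_time": "2026-01-01", "region": "eu", "value": 11},
    {"id": 2, "event_time": "2026-01-01", "region": "eu", "value": 12},
]
```

With the check removed, callers whose pipelines depended on the error can run an equivalent pass themselves before inserting; whether that is worth the extra scan depends on the workload.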
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| python/hsfs/engine/spark.py | Removes Spark-side duplicate-record validation for Delta offline inserts and cleans up an unused import. |
| python/hsfs/engine/python.py | Removes Python-engine duplicate-record validation before Delta offline inserts. |
| python/hopsworks_common/client/exceptions.py | Removes the duplicate-record error message constant from FeatureStoreException. |
| python/tests/engine/test_spark.py | Deletes tests that asserted Spark Delta inserts invoked/failed due to duplicate-record checks. |
| python/tests/engine/test_python.py | Deletes tests covering Python-engine duplicate-record checks (including polars/pyarrow branches). |
manu-sj
approved these changes
May 22, 2026
Summary
- Remove the `_check_duplicate_records` helper that ran on every Delta-offline insert in both the python and Spark engines. The check duplicated semantics the Delta writer doesn't enforce and added a full groupBy/aggregate pass per insert (extra Spark action; per-batch PyArrow groupBy on the python engine).
- Remove both call sites in `save_dataframe`, the dependent `FeatureStoreException.DUPLICATE_RECORD_ERROR_MESSAGE` constant, and the matching test suites (call-counting tests, parametrized duplicate fixtures, polars-branch tests).
- Drop the now-unused `pyspark.sql.functions.count` import from `engine/spark.py`.

Net effect: 1009 lines deleted across 5 files; user data flows through unchanged; the storage engine handles dedup where it applies.
Test plan
- `uv run --project python pytest python/tests/test_feature_group.py python/tests/test_feature_store.py python/tests/engine/test_python.py python/tests/engine/test_spark.py`: all pass; the deleted suites no longer exist.
- `ruff check` clean on touched files.

🤖 Generated with Claude Code