[HWORKS-2802 / -2807] partitioned_by — Python passthrough + Delta CREATE TABLE op + Hudi Transformer + predicate translator#961
Draft
jimdowling wants to merge 6 commits into
Draft
Conversation
3 tasks
Coverage reportClick to see where and how coverage changed
This report was generated by python-coverage-comment-action |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Contributor
Author
|
Architectural redesign in flight. After review feedback, partitioned_by is being re-scoped from a Python-only client-side derivation to a backend-native concept using Delta GENERATED columns / Hudi CustomKeyGenerator, with the partition columns marked offline-only by default. Current commits will be torn out and replaced. Re-flagging as draft until the new design lands. See HWORKS-2802 for the updated spec. |
… client (passthrough) https://hopsworks.atlassian.net/browse/HWORKS-2802 Add the two new feature group creation parameters to the Python client. The client validates basic shape (enum membership, no duplicates, mutual exclusion with partition_key, requires event_time, no event_time/grain name collision) for fast-fail UX, then sends the values to the backend via the REST payload. The storage engine on the backend handles the actual partition column derivation; the client never adds grain columns to the inserted dataframe. FeatureGroup gains partitioned_by and online_partition_columns parameters on __init__, threaded through FeatureStore.create_feature_group and get_or_create_feature_group. Two new read-only properties expose the persisted values. Feature gains an offline_only attribute that round-trips through the DTO; the backend sets it on synthetic grain features when partitioned_by is configured. to_dict serializes both new feature group fields into the REST payload (omitting partitionedBy when None so the backend treats it as unset). This is a passthrough commit: the backend persists the new fields (landed separately as the hopsworks-ee scaffolding commit), but the storage activation (Delta GENERATED ALWAYS AS, synthetic feature creation, Hudi key generator config, offline-only RonDB filter) lands in follow-up commits. Until that work merges, partitioned_by round-trips through create/read but does not yet produce a partitioned table. Signed-off-by: Jim Dowling <jim@logicalclocks.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ned_by activation https://hopsworks.atlassian.net/browse/HWORKS-2802 Add a new Spark SQL op driven by the Hopsworks EE backend's fsJobManagerController.setupCreateDeltaTableJob. When a feature group is created with time_travel_format=DELTA and partitioned_by set, the backend launches this PySpark job to materialise the offline table with Delta GENERATED ALWAYS AS partition columns — the only place where the generation expression can be attached, since Delta forbids adding it via ALTER on an existing column. The job assembles a `CREATE TABLE … USING DELTA … GENERATED ALWAYS AS (…) … PARTITIONED BY (…)` statement from the JSON config the backend writes to HopsFS and executes it via Spark, which registers the table in the Hive metastore and initialises the Delta `_delta_log/` with the generation expressions in one operation. Grain → Spark SQL function map: year/month/week/day/hour → YEAR/MONTH/WEEKOFYEAR/DAYOFMONTH/HOUR. Integer event_time wraps in a CAST(CASE WHEN abs(event_time) <= 9_999_999_999 THEN event_time ELSE event_time / 1000 END AS TIMESTAMP) so the seconds-vs-ms rule already used by the Python client and convert_event_time_to_timestamp holds on every row, including columns with mixed units. source_columns and partitioned_by arrive as JSON strings because the backend's job-configuration map is Map<String, String>; the handler decodes them on read. Signed-off-by: Jim Dowling <jim@logicalclocks.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
3 tasks
https://hopsworks.atlassian.net/browse/HWORKS-2807 A custom org.apache.hudi.utilities.transform.Transformer that materialises the partitioned_by grain columns (year / month / week / day / hour) into Hudi records before write, derived from the feature group's event_time. Packaged in the hsfs-utils jar (jar-with-dependencies) so it's on the classpath when the materialization-job Spark image starts. The hudi-utilities-slim-bundle that ships with the spark-feature-pipeline image provides the Transformer interface at runtime; the pom adds hudi-utilities_2.12 1.0.2.1 as a `provided` dependency for compile-time only. Reads configuration from TypedProperties: hoodie.deltastreamer.transformer.partitionedby.eventtime hoodie.deltastreamer.transformer.partitionedby.grains (csv) (Both set by FsJobManagerController.injectHudiPartitionedByOptions in hopsworks-ee — landed in commit b3ede9443 of HWORKS-2802 branch.) The grain → Spark SQL function mapping matches hsfs_utils.create_delta_table_fg exactly (YEAR / MONTH / WEEKOFYEAR / DAYOFMONTH / HOUR), so a feature group on Hudi and a feature group on Delta with the same partitioned_by spec produce identical grain values for the same event_time input. Integer event_time columns use the same per-row seconds-vs-ms CASE WHEN that the Delta CREATE TABLE statement uses (≤ 9999999999 in absolute magnitude → seconds, otherwise milliseconds), so columns with mixed units convert correctly on both formats. Timestamp and date columns pass through unchanged. Signed-off-by: Jim Dowling <jim@logicalclocks.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
https://hopsworks.atlassian.net/browse/HWORKS-2807 Close the read-pruning asymmetry between Spark+Hudi (which needs derived→event_time translation) and Trino+Delta + Trino+Hudi (which need event_time→derived translation) with a single symmetric translator that runs on Query.read() just before SQL generation. augment_filter(filter, fg, engine_type) returns the input AND'd with an equivalent partition predicate; the original filter is kept so the row-level filter still produces correct results if pruning is off. Translation dispatch: - Spark + Delta: no translation (Delta auto-derives via GENERATED). - Spark + Hudi: derived → event_time. fg.filter(fg.year == 2026) → event_time range [2026-01-01, 2027-01-01). Hierarchical-prefix equality combos (year, year+month, year+month+day, year+month+day+hour) yield narrower ranges; non-prefix predicates fall back to row-level only. - Python engine (Trino) + Delta or Hudi: event_time → derived. fg.read(start_time=ts1, end_time=ts2) adds year >= start_year AND year <= end_year predicates that Trino can partition-prune on. Safety rails: - partitioned_by must be a strict left-prefix of (year, month, day, hour). Non-hierarchical specs like ["month"] alone or ["year","week"] skip translation rather than producing bounds that could be incorrect. - OR'd filters short-circuit (translator can't safely tighten branched expressions). - Original predicate always retained; the augmentation is purely additive. - Online reads (online=True) skip translation entirely — RonDB has no partition concept. Tests cover both directions, the no-op paths (Spark+Delta, non-hierarchical, missing partitioned_by, missing event_time, OR short-circuit), and the edge cases (December rollover for year+month, month-alone falls back). Signed-off-by: Jim Dowling <jim@logicalclocks.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…n columns end-to-end https://hopsworks.atlassian.net/browse/HWORKS-2802 Four review findings on the Python client side of the partitioned_by work, all rooted in the same gap: although the backend appends synthetic grain features to the FG schema, the client was still treating them like user-supplied data and demanding them in the inserted dataframe. Wait on the backend's create_delta_table job inline in save_feature_group_metadata so the first insert sees a ready Delta table. Skip the client-side save_empty_table call when partitioned_by is set on Delta, since the backend Spark CREATE TABLE job is the authoritative path (it must run because GENERATED ALWAYS AS expressions cannot be added via ALTER). Expose the job via a new fg.create_delta_table_job property for introspection. Skip the storage-engine-generated grain columns when populating fg.partition_key from the backend feature list. All client write paths that fan out from partition_key — _check_duplicate_records, the delta-rs partition_by argument, _get_partition_values for DataFusion partition pruning, the Hudi SIMPLE-partition path — now ignore them automatically. The grain columns are still visible through fg.partitioned_by and fg.columns. Filter the same grain columns out of the schema-verification call sites so an insert that omits them passes the check. Add a _columns_for_user_schema helper that drops feat.name in partitioned_by from the list the verifier sees. Wire the Python Hudi direct-write path to use CustomKeyGenerator and TIMESTAMP_DATE_BASED on event_time when partitioned_by is set, with a Hive-style output.dateformat that mirrors what FsJobManagerController.injectHudiPartitionedByOptions emits for DeltaStreamer jobs. Without this the writer would have used the existing year:SIMPLE,month:SIMPLE partition path, which requires the grain columns to be present in the dataset. Signed-off-by: Jim Dowling <jim@logicalclocks.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ature + spark-sql dep https://hopsworks.atlassian.net/browse/HWORKS-2802 Three issues blocked the hsfs-utils JAR from building locally, all from when PartitionedByTransformer was introduced. The pom referenced org.apache.hudi:hudi-utilities_2.12:1.0.2.1, which exists in neither Maven Central nor the Hops Nexus. Downgrade to 1.0.2 (present on Maven Central) — the Transformer interface bytecode is stable across the 1.0.x line so the bundled runtime version still matches. Hudi 1.0.x's Transformer.apply contract returns Dataset<Row> directly; the prior implementation returned Option<Dataset<Row>>, which only matches the older 0.x signature. Drop the Option wrapper and the hudi-common-util Option import. Maven `provided` scope is not transitive, so the spark-sql classes that hsfs-spark-spark3.5 ships with as `provided` were not on the compile classpath of this module. Add an explicit spark-sql_2.12 3.5.5 dependency at `provided` scope so Dataset / Row / Column / SparkSession / functions resolve at compile time without affecting the bundled jar — they continue to come from the Spark image at runtime. JAR builds cleanly now (target/hsfs-utils-5.0.0-SNAPSHOT.jar). The JAR still needs to land in the Spark image at /srv/hops/artifacts/ (per the helm chart's spark_hops_utils_dir) for the PartitionedByTransformer to be visible to materialization-job DeltaStreamer; that's a docker-images rebuild + helm upgrade, not covered by the workspace's deploy_backend.sh / deploy_frontend.sh. Signed-off-by: Jim Dowling <jim@logicalclocks.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Python client side of
partitioned_byend-to-end. Covers:Pairs with the backend PR hopsworks-ee#3034. Full engineering walkthrough: Confluence page.
What changes
Python passthrough (
09e06437)partitioned_by: list[str] | None = Noneparameter onFeatureGroup.__init__,FeatureStore.create_feature_group,FeatureStore.get_or_create_feature_group. Light pre-REST validation (enum / non-empty / no-dup / mutual exclusion / requires event_time / no collision) for fast-fail UX; the authoritative validator lives on the backend.online_partition_columns: bool = Falseon the same surfaces.FeatureGroup.partitioned_byandFeatureGroup.online_partition_columnsread-only properties.Feature.offline_onlyattribute, round-trips through the REST DTO.Delta CREATE TABLE op handler (
e4a47c1a)utils/python/hsfs_utils.pygains acreate_delta_table_fgPySpark op handler that the backend launches viasetupCreateDeltaTableJob. Assembles theCREATE TABLE … USING DELTA … GENERATED ALWAYS AS (<grainfn>(event_time)) … PARTITIONED BY (…)SQL from the JSON config. Integerevent_timewraps inCAST(CASE WHEN abs(et) <= 9999999999 THEN et ELSE et / 1000 END AS TIMESTAMP)so the seconds-vs-ms rule holds per row.Hudi PartitionedByTransformer (
09d168b8)utils/java/.../PartitionedByTransformer.java— Hudiorg.apache.hudi.utilities.transform.Transformerthat materialises grain columns as INT columns in records before write. Plugs into the HudiDeltaStreamermaterialization job viahoodie.deltastreamer.transformer.class(registered server-side by hopsworks-ee#3034). Uses the same grain functions (YEAR / MONTH / WEEKOFYEAR / DAYOFMONTH / HOUR) as the Delta CREATE TABLE so reads see identical schemas on either format. Same seconds-vs-ms CASE WHEN as the Delta path. Hudi 1.0.2.1 added as aprovideddep — the runtime jar ships in the materialization-job Spark image already.Cross-engine predicate translator (
1a097675)hsfs/constructor/partitioned_by_translator.py— symmetric translator that adds equivalent partition predicates so every (engine, format) combination prunes. Hooks intoQuery.read()just before SQL generation. Original filter always retained (defence in depth).Safety rails:
["month"]alone skip translation.online=True) skip translation — RonDB has no partition concept.Test plan
python/tests/test_feature_group.py::TestFeatureGroupPartitionedBy— 10 constructor + property tests.python/tests/constructor/test_partitioned_by_translator.py— 12 cases covering both translation directions + no-op paths + edge cases (December rollover, month-without-year fallback, OR short-circuit).ruff checkclean on touched files.test_feature_group.py+test_feature_store.py+test_feature.py+tests/constructor/(107 + 71 pass).🤖 Generated with Claude Code