
[FSTORE-1938] Support chaining of Transformation Functions using a DAG #785

Draft
manu-sj wants to merge 10 commits into logicalclocks:main from manu-sj:FSTORE-1938

Contributor

@manu-sj manu-sj commented Dec 29, 2025

This PR adds/fixes/changes...

  • please summarize your changes to the code
  • and make sure to include all changes to user-facing APIs

JIRA Issue: https://hopsworks.atlassian.net/browse/FSTORE-1938

Priority for Review: -

Related PRs: -

How Has This Been Tested?

  • Unit Tests
  • Integration Tests
  • Manual Tests on VM

Checklist For The Assigned Reviewer:

- [ ] Checked if merge conflicts with master exist
- [ ] Checked if stylechecks for Java and Python pass
- [ ] Checked if all docstrings were added and/or updated appropriately
- [ ] Ran spellcheck on docstring
- [ ] Checked if guides & concepts need to be updated
- [ ] Checked if naming conventions for parameters and variables were followed
- [ ] Checked if private methods are properly declared and used
- [ ] Checked if hard-to-understand areas of code are commented
- [ ] Checked if tests are effective
- [ ] Built and deployed changes on dev VM and tested manually
- [x] (Checked if all type annotations were added and/or updated appropriately)

manu-sj added 3 commits May 18, 2026 19:33
…ecution DAG

Allow Transformation Functions to be chained — a TF's output column can serve
as another TF's input. Applies to both On-Demand Transformations (ODTs, attached
to Feature Groups) and Model-Dependent Transformations (MDTs, attached to Feature
Views). Execution order is resolved by a topological-sort DAG.

- New TransformationExecutionDAG class for topological scheduling.
- Process-pool parallel execution path with shared-memory Arrow IPC for
  DataFrame inputs and pickle IPC for dict inputs.
- Engine-aware (pandas, polars, Spark).
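
As a rough illustration of how a topological sort can resolve the execution order of chained transformations, the sketch below uses Python's standard `graphlib` module. The `ChainedTF` record, its fields, and the column names are hypothetical; this is not the `TransformationExecutionDAG` implementation, only one way the idea could be expressed.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter


@dataclass(frozen=True)
class ChainedTF:
    """Hypothetical stand-in for a transformation function (not the hsfs class)."""

    name: str
    inputs: tuple[str, ...]
    outputs: tuple[str, ...]


def execution_order(tfs: list[ChainedTF]) -> list[str]:
    # Map every output column to the name of the TF that produces it.
    producer = {col: tf.name for tf in tfs for col in tf.outputs}
    sorter = TopologicalSorter()
    for tf in tfs:
        # A TF depends on whichever TFs produce its input columns;
        # inputs without a producer are treated as plain source features.
        deps = {producer[col] for col in tf.inputs if col in producer}
        sorter.add(tf.name, *deps)
    # static_order() raises graphlib.CycleError if the chain is cyclic.
    return list(sorter.static_order())


tfs = [
    ChainedTF("scale", inputs=("amount_log",), outputs=("amount_scaled",)),
    ChainedTF("log", inputs=("amount",), outputs=("amount_log",)),
]
print(execution_order(tfs))  # ['log', 'scale']
```

Here `scale` consumes the column produced by `log`, so `log` is scheduled first even though it was declared second.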

github-actions Bot commented May 18, 2026

Coverage report


| File | Lines missing |
|---|---|
| **python/hopsworks_common/client** | |
| `__init__.py` | |
| `exceptions.py` | |
| **python/hsfs** | |
| `feature_group.py` | 3148-3149, 4981-5014 |
| `feature_view.py` | 264-265, 2802, 3033, 3230-3231, 3320-3321, 3637-3638, 3691-3699, 3713-3745, 4149-4150, 5156-5157, 5203-5218 |
| `hopsworks_udf.py` | 938, 1462 |
| `transformation_function.py` | |
| **python/hsfs/core** | |
| `feature_group_engine.py` | 131, 218-228, 267-268 |
| `feature_view_engine.py` | 946-956, 1014-1015, 1020 |
| `transformation_execution_dag.py` | 64, 150, 159-170, 212, 294-295, 317-331 |
| `transformation_function_engine.py` | 172-176, 191, 237, 269, 278, 344, 349, 501, 559, 615, 617, 626-627, 650, 655-660, 667-673, 712-725, 744-754, 807, 809, 821, 847-867, 880-896, 939-949, 959, 996 |
| `vector_server.py` | 305-315, 400-411, 430-440, 567-568, 794, 911-912, 1059-1069, 1149-1158, 1519-1520, 1538-1539, 1546-1557 |
| **python/hsfs/engine** | |
| `python.py` | 1909 |
| `spark.py` | 1868-1869, 1893-1894, 1935-1936 |
| **Project Total** | |

This report was generated by python-coverage-comment-action

@manu-sj manu-sj marked this pull request as draft May 21, 2026 13:06
manu-sj and others added 5 commits May 21, 2026 15:08
Strip extra blank line caught by `ruff format --check` on the
hopsworks-api lint workflow. No semantic change; will be folded
into the squashed branch commit at Phase 6 wrap-up.

Signed-off-by: Manu Sathyarajan Joseph <manu.joseph@logicalclocks.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Several n_processes docstrings claimed the default is the DAG's
maximum parallelism. The implementation defaults to sequential on
the dict / list path, and to sequential for DataFrames unless the
chain has at least two transformations and the input has at least
10000 rows. Reword the four affected docstrings to match the
implementation, and fix the recurring "transfromation" typo on the
way through.
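
The default policy described above can be summarized as a small predicate. This is a minimal sketch under stated assumptions: the function name and constant names are hypothetical, any input that is not a dict or list is treated as DataFrame-like, and `len()` is assumed to return the row count (true for pandas and polars DataFrames, not for Spark).

```python
from typing import Any

# Thresholds quoted in the commit message; the constant names are hypothetical.
MIN_CHAIN_LENGTH_FOR_PARALLEL = 2
MIN_ROWS_FOR_PARALLEL = 10_000


def defaults_to_sequential(data: Any, n_transformations: int) -> bool:
    """Return True when the described default would run sequentially."""
    if isinstance(data, (dict, list)):
        # dict / list inputs are always sequential by default.
        return True
    # Everything else is treated as DataFrame-like in this sketch.
    n_rows = len(data)
    eligible_for_parallel = (
        n_transformations >= MIN_CHAIN_LENGTH_FOR_PARALLEL
        and n_rows >= MIN_ROWS_FOR_PARALLEL
    )
    return not eligible_for_parallel
```

Both conditions must hold before a DataFrame input leaves the sequential default; a long chain over a small frame, or a single transformation over a large frame, stays sequential.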

Touches the online entry points on VectorServer
(apply_model_dependent_transformations, compute_on_demand_features),
the offline entry points on the Python engine
(write_training_dataset_dataframe, write_training_dataset_splits),
and the two FeatureGroup.insert overloads. feature_view.py
already described the actual policy.

Signed-off-by: Manu Sathyarajan Joseph <manu.joseph@logicalclocks.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Address findings from the hands-on review:

- Restore __main__ scope for UDF wrapper compilation. The wrapper
  inlines the user UDF source via exec(code, scope); reducing scope
  to only __builtins__ broke any UDF that referenced a notebook
  global (NameError at runtime).
- Document the single-threaded-client contract at the class-level
  pool singleton. Internal Hopsworks subsystems never concurrently
  invoke apply_transformation_functions, so the create / swap /
  shutdown sequence is lock-free under that contract.
- Honor HSFS_TF_POOL_START_METHOD on non-Windows so users can
  switch to forkserver / spawn when fork after threaded libraries
  is unsafe. Default is unchanged (fork) on Linux / macOS.
- Replace three duplicate post-failure cleanup blocks with
  _discard_pool_after_failure(), which actually shuts down the
  pool (wait=False, cancel_futures=True) before nulling the
  singleton. Previously the failed pool stayed alive until GC.
- List-of-dict request_parameter validation now intersects keys
  across every row instead of trusting the first row. A TF input
  resolved via request_parameters must be present on every row,
  matching the per-row execution that follows validation.
- Document the FSTORE-1938 return-shape change on apply_udf_on_dict
  and apply_udf_on_dataframe (now return only transformed columns).
- Document the intentional silent-ignore of unknown columns in
  drop_columns (chained TFs may have already dropped intermediates).
- Spark engine: reject the list-of-dicts request_parameters shape
  with a clear error rather than letting it fail opaquely inside
  withColumn; extract a likely missing-column hint from
  AnalysisException so the wrapped FeatureStoreException matches
  the Python-engine path.
- warmup_online_workers docstring calls out the per-worker engine
  init cost so users tuning n_processes know the memory trade-off.
- Correct n_processes docstrings in vector_server, engine/python,
  and feature_group (previously claimed max_parallelism default
  contradicting the implementation; fix recurring "transfromation"
  typo).
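
The pool-discard item above can be pictured with the standard `concurrent.futures` API. The sketch below is an assumption-laden illustration, not the code in this branch: `PoolHolder`, `get_pool`, and the `factory` parameter are hypothetical names, and only `Executor.shutdown(wait=False, cancel_futures=True)` is taken from the commit message.

```python
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Callable, Optional


class PoolHolder:
    """Hypothetical holder for a class-level executor singleton."""

    _pool: Optional[Executor] = None

    @classmethod
    def get_pool(
        cls, factory: Callable[[], Executor] = ProcessPoolExecutor
    ) -> Executor:
        # Lock-free on purpose: this sketch assumes a single-threaded caller.
        if cls._pool is None:
            cls._pool = factory()
        return cls._pool

    @classmethod
    def discard_pool_after_failure(cls) -> None:
        pool = cls._pool
        if pool is None:
            return
        try:
            # Do not block on in-flight work and drop queued tasks
            # (cancel_futures requires Python 3.9+).
            pool.shutdown(wait=False, cancel_futures=True)
        finally:
            # Null the singleton so the next call builds a fresh pool.
            cls._pool = None
```

Shutting the executor down explicitly releases its workers promptly instead of leaving that to garbage collection.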

Signed-off-by: Manu Sathyarajan Joseph <manu.joseph@logicalclocks.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Address findings from the Codex pre-review pass on the Round-2
commit (433ea25):

- HSFS_TF_POOL_START_METHOD: warn on invalid values rather than
  silently falling through to the default. A typo like
  HSFS_TF_POOL_START_METHOD=forked previously gave no feedback.
- _prepare_transformation_function_scope: document the
  spawn / forkserver caveat. Under fork the worker inherits the
  user's interactive __main__ so notebook globals are visible;
  under spawn / forkserver the worker's __main__ is the bootstrap
  module and UDFs that reference notebook globals will still raise
  NameError there, independent of the scope-build policy. Calling
  this out in the docstring so users tuning
  HSFS_TF_POOL_START_METHOD know the trade-off.
- Spark AnalysisException regex: tighten so a UDF-resolution
  failure like `cannot resolve `MY_UDF`(col1) ...` does not
  produce a misleading "missing column MY_UDF" hint. The trailing
  (?!\s*\() negative lookahead rejects tokens immediately followed
  by `(`, and the prefix is restricted to the three column-specific
  phrasings Spark actually uses. Also removes the redundant
  `import re as _re` from inside the except block; re is already
  imported at module scope.
- _validate_transformation_function_arguments list-of-dict data
  features: intersect keys across every row (mirrors the existing
  request_parameters intersection introduced in this branch). A
  feature key now counts as present only if every row carries it,
  matching the per-row execution semantics that follow validation.
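
To make the `HSFS_TF_POOL_START_METHOD` item concrete, here is a minimal sketch of an environment-variable resolver that warns on unrecognized values instead of silently falling back. The function name, the set of accepted values, and the warning text are assumptions for illustration; the platform check mentioned in the earlier commit is left out.

```python
import logging
import os

logger = logging.getLogger(__name__)

ENV_VAR = "HSFS_TF_POOL_START_METHOD"
# Accepted values in this sketch (the standard multiprocessing start methods).
VALID_START_METHODS = frozenset({"fork", "forkserver", "spawn"})


def resolve_start_method(default: str = "fork") -> str:
    """Return the configured start method, or the default with a warning."""
    raw = os.environ.get(ENV_VAR)
    if not raw:
        # Unset or empty: keep the default without any message.
        return default
    if raw in VALID_START_METHODS:
        return raw
    # A typo such as "forked" lands here and is reported to the user.
    logger.warning(
        "Ignoring %s=%r; expected one of %s. Falling back to %r.",
        ENV_VAR,
        raw,
        sorted(VALID_START_METHODS),
        default,
    )
    return default
```

Building the message from the set itself keeps the list of accepted values in one place.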

Signed-off-by: Manu Sathyarajan Joseph <manu.joseph@logicalclocks.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Address findings from the second Codex review pass:

- D — Per-row union of (data keys, request_parameter keys), then
  intersect across rows. The previous version intersected data
  keys and request_parameter keys independently, which falsely
  rejected mixed batches where row 0 carried a feature via its
  data dict and row 1 carried the same feature via its per-row
  request_parameters. The new logic models what per-row execution
  actually sees: a feature counts as available only when every
  row has at least one source for it. For DataFrame inputs the
  request_parameter list is concatenated by
  _update_request_parameter_data with the union of keys (missing
  values filled as NaN), so on the DataFrame path the available rp set
  is the union of every row's rp keys.

- A — Spark AnalysisException regex gains re.DOTALL so messages
  that split the leading phrase and the quoted column across
  newlines (common for nested-exception payloads) still match.
  The leading-phrase anchor keeps over-matching contained.

- C — HSFS_TF_POOL_START_METHOD warning now fires once per
  session via a class-level latch (_start_method_warned). The
  resolver is called on every pool create / swap, so without the
  latch a persistent bad env value would flood the log of a
  long-running process.

- N — _VALID_START_METHODS frozenset declared at class level so
  the warning message auto-enumerates the accepted values and a
  future contributor adding a method does not need to update the
  message string.
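
The per-row union described in item D can be sketched for the list-of-dict path as follows. Both function names are hypothetical, the two lists are assumed to be non-empty and of equal length, and `independent_intersection` only models the earlier behaviour as the commit message describes it.

```python
from typing import Any


def available_features(
    rows: list[dict[str, Any]],
    request_parameters: list[dict[str, Any]],
) -> set[str]:
    """Keys that every row can resolve from its data or request parameters."""
    # Union per row: a key is usable in a row if either source carries it.
    per_row = [set(row) | set(rp) for row, rp in zip(rows, request_parameters)]
    # Intersect across rows: a key counts only if every row can resolve it.
    return set.intersection(*per_row)


def independent_intersection(
    rows: list[dict[str, Any]],
    request_parameters: list[dict[str, Any]],
) -> set[str]:
    """Earlier behaviour as described: intersect each source separately."""
    data_keys = set.intersection(*(set(row) for row in rows))
    rp_keys = set.intersection(*(set(rp) for rp in request_parameters))
    return data_keys | rp_keys


# Row 0 carries "x" in its data dict, row 1 carries it as a request parameter.
rows = [{"a": 1, "x": 10}, {"a": 2}]
rps = [{}, {"x": 20}]
print(sorted(available_features(rows, rps)))        # ['a', 'x']
print(sorted(independent_intersection(rows, rps)))  # ['a']
```

The mixed batch is accepted by the per-row union but would have lost `x` under the independent intersection, which is the false rejection the commit fixes.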

Signed-off-by: Manu Sathyarajan Joseph <manu.joseph@logicalclocks.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>