[SPARK-56550][SQL] Support source with fewer columns/fields in INSERT INTO WITH SCHEMA EVOLUTION #55427
Conversation
…SERT INTO WITH SCHEMA EVOLUTION

Add support for `INSERT INTO WITH SCHEMA EVOLUTION` to fill missing nested struct fields with null (or column defaults) when the source has fewer fields than the target, mirroring existing `MERGE INTO` behavior.

Changes:
- Add `spark.sql.insertNestedTypeCoercion.enabled` config flag (default false)
- Refactor `TableOutputResolver.resolveOutputColumns` to accept `DefaultValueFillMode` enum directly instead of two overlapping boolean parameters
- Enable RECURSE mode for V2 inserts when both schema evolution and the config flag are active
- Add comprehensive tests for all scenarios
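A hedged sketch of the behavior this enables (table name, columns, and exact clause placement are illustrative, not taken from the PR's test suite):

```scala
// Illustrative spark-shell sketch; assumes a SparkSession `spark` and a V2 catalog table.
spark.conf.set("spark.sql.insertNestedTypeCoercion.enabled", "true")

spark.sql("CREATE TABLE t (id INT, s STRUCT<a: INT, b: STRING>)")

// The source struct supplies only field `a`; with schema evolution plus the
// flag, the missing field `b` is filled with null instead of failing analysis.
spark.sql("""
  INSERT INTO t WITH SCHEMA EVOLUTION
  SELECT 1 AS id, named_struct('a', 42) AS s
""")
```

Without the flag (or without `WITH SCHEMA EVOLUTION`), the same statement would fail schema enforcement for the missing nested field.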
johanl-db
left a comment
The change makes sense, the most important points are:
- It only applies when schema evolution is enabled. Without schema evolution, Spark should fall back to schema enforcement: fail when schemas don't match
- It's disabled by default for now. That'll allow tuning the behavior if we find any case that deviates from the intended behavior.
```scala
TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
  expected = v2Write.table.output, queryOutput = v2Write.query.output)
val defaultValueFillMode =
  if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) RECURSE
```
Worth calling out in a comment: without schema evolution, Spark will enforce that there are no missing columns in the data being written
```scala
}

// ---------------------------------------------------------------------------
// Tests for source with fewer columns/fields than target
```
Another interesting test case I can think of:
insert by name + schema evolution with an extra column and a missing column: column count is the same between source and target, but not the same column names
Same for nested struct
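A hedged sketch of that suggested case (table and column names invented; the exact clause order for by-name inserts with schema evolution is assumed, not verified against the grammar):

```scala
// Target t has columns (id, a, b). The source supplies (id, a, c): same column
// count, different names. Expected under by-name + schema evolution: `c` is
// added to the table by evolution, and the missing `b` is filled with its
// default value or null.
spark.sql("""
  INSERT INTO t WITH SCHEMA EVOLUTION BY NAME
  SELECT 1 AS id, 2 AS a, 3 AS c
""")
```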
cloud-fan
left a comment
Summary
Prior state and problem. INSERT INTO ... WITH SCHEMA EVOLUTION required source columns and nested fields to match the target exactly; sources with fewer struct fields (or fewer trailing top-level columns, under by-position) produced arity / struct-missing-field errors. MERGE INTO already has the equivalent coercion via spark.sql.mergeNestedTypeCoercion.enabled (SPARK-53482), which created an asymmetry for schema-evolution workflows where older sources trail the target.
Design approach. Mirrors the MERGE pattern with a parallel internal flag spark.sql.insertNestedTypeCoercion.enabled (off by default, experimental, v4.2.0). Leverages the existing DefaultValueFillMode enum (NONE / FILL / RECURSE) already threaded through TableOutputResolver and replaces the supportColDefaultValue: Boolean parameter on resolveOutputColumns with it. RECURSE is selected in ResolveOutputRelation only when both conf.coerceInsertNestedTypes and v2Write.schemaEvolutionEnabled are true; otherwise FILL is selected (preserving prior top-level default-fill behavior). RECURSE then (a) relaxes the by-position "not enough columns" gate and fills trailing target columns with defaults / null, and (b) propagates the fill flag through nested struct resolution.
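The mode selection can be condensed as follows (a simplified sketch following the identifiers in the diff; the surrounding `ResolveOutputRelation` plumbing is omitted):

```scala
// Simplified sketch of the choice made in ResolveOutputRelation.
val defaultValueFillMode =
  if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) {
    RECURSE // also fill nested struct fields, and relax the by-position arity gate
  } else {
    FILL    // prior behavior: fill missing top-level columns with defaults only
  }
```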
Key design decisions made by this PR.
- Gating: config × `schemaEvolutionEnabled`. Off by default and explicitly scoped to the WITH SCHEMA EVOLUTION path, so plain INSERT INTO behavior is untouched.
- V2-only scope: `PreprocessTableInsertion` (V1) keeps FILL — V1 doesn't support schema evolution, so this is coherent.
- Enum replacement of the boolean: cleaner signature, and makes the three possible modes explicit at every call site.
Implementation sketch.
- `Analyzer.scala`: `ResolveOutputRelation` picks the mode.
- `TableOutputResolver.resolveOutputColumns` takes the new parameter, derives `fillDefaultValue = defaultValueFillMode == RECURSE`, and loosens the by-position gate.
- `resolveColumnsByPosition` gains `fillDefaultValue`; when true, appends defaults for target columns trailing past the input length.
- `resolveStructType` forwards `fillDefaultValue` into recursive by-position resolution.
- 17 new tests in `InsertIntoSchemaEvolutionTests` cover by-name / by-position, explicit DEFAULT, nested structs (including inside arrays and map values — by-name only), deeply nested, null structs, and negative cases.
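The trailing-fill step can be modeled in isolation like this (a toy, self-contained Scala sketch, not the actual `TableOutputResolver` code; `Col`, the string values, and the error names are stand-ins):

```scala
// Toy model of by-position resolution: target columns trailing past the input
// length are filled with a default (or null) instead of raising an arity error.
case class Col(name: String, default: Option[String])

def resolveByPosition(
    input: Seq[String],
    expected: Seq[Col],
    fillDefaultValue: Boolean): Either[String, Seq[String]] = {
  if (input.size > expected.size) {
    Left("TOO_MANY_DATA_COLUMNS")
  } else if (input.size < expected.size && !fillDefaultValue) {
    Left("NOT_ENOUGH_DATA_COLUMNS") // the gate RECURSE loosens
  } else {
    // Keep the supplied columns, fill the trailing ones.
    Right(input ++ expected.drop(input.size).map(_.default.getOrElse("null")))
  }
}

// resolveByPosition(Seq("1"), Seq(Col("a", None), Col("b", Some("0"))), true)
//   == Right(Seq("1", "0"))
```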
Main open points (see inline comments).
- `resolveArrayType` / `resolveMapType` by-position recursion doesn't propagate `fillDefaultValue`, inconsistent with `resolveStructType`. Concretely, by-position INSERT into `array<struct<...>>` / `map<_, struct<...>>` with missing nested fields still errors even under RECURSE, while the by-name counterpart succeeds. Tests only cover by-name for array / map, so this gap isn't exercised.
- By-position trailing default fill skips `applyColumnMetadata`, inconsistent with the by-name path — likely to break char / varchar write-side handling for trailing filled columns.
- A few doc / comment / test-fidelity issues.
General note on the pre-existing DefaultValueFillMode enum doc (TableOutputResolver.scala:42-48, not in this diff): it was written for the MERGE use case and currently says RECURSE "fill[s] missing top-level columns and also recurse[s] into nested struct fields". After this PR, RECURSE also enables by-position top-level trailing fill. Worth updating the scaladoc as part of this PR so the enum semantics don't drift further.
```diff
  } else {
    resolveColumnsByPosition(
-     tableName, fields, toAttributes(expectedType), conf, addError, colPath)
+     tableName, fields, toAttributes(expectedType), conf, addError, colPath, fillDefaultValue)
```
fillDefaultValue is correctly propagated here, but the sibling paths in resolveArrayType (line 522) and resolveMapType (lines 559, 571) still call resolveColumnsByPosition without the flag. Consequence: INSERT BY POSITION ... WITH SCHEMA EVOLUTION into a column typed array<struct<...>> or map<_, struct<...>> whose nested struct has missing fields still errors — while the BY NAME counterpart succeeds. The PR's test matrix only covers by-name for array / map, so this gap isn't exercised.
Either propagate fillDefaultValue in those two methods (consistent with resolveStructType), or narrow the enum doc / PR description to say array / map nested fill is by-name only — and skip the fillDefaultValue plumbing into those methods entirely. If the former, please add by-position test coverage for array-of-struct and map-of-struct symmetric to the existing by-name tests.
Done in the latest commit: `resolveArrayType` and `resolveMapType` now pass `fillDefaultValue` into `resolveColumnsByPosition` for the by-position branches (matching `resolveStructType`). Added "Insert schema evolution: source missing field in struct nested in array/map value by position" tests in `InsertIntoTests`.
```scala
val defaults = if (fillDefaultValue) {
  actualExpectedCols.drop(inputCols.size).flatMap { expectedCol =>
    getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
      .map(expr => Alias(expr, expectedCol.name)())
```
The by-name path at line 327 routes the default-value expression through applyColumnMetadata(defaultExpr.get, expectedCol), which strips source metadata and pins the target column's required metadata — including CharVarcharUtils.cleanMetadata and the write-side metadata guarantees documented on applyColumnMetadata. This trailing-fill branch just does Alias(expr, expectedCol.name)(), so for a by-position insert where the trailing target column is char / varchar, the filled column won't carry the target-column metadata the way the by-name filled column does. Suggest aligning with the by-name path:
```diff
-      .map(expr => Alias(expr, expectedCol.name)())
+    getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
+      .map(expr => applyColumnMetadata(expr, expectedCol))
```
Done: the trailing-fill branch now uses applyColumnMetadata(expr, expectedCol) like the by-name path.
```scala
// Negative tests: missing columns/fields should fail WITHOUT schema evolution
// ---------------------------------------------------------------------------

test("Insert without evolution: source missing top-level column by name fails") {
```
Test name says "by name" but the call uses doInsert (by-position). The asserted error INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS is only emitted on the by-position path — the by-name path would instead hit incompatibleDataToTableCannotFindDataError (see the nested-field counterpart a few tests below that uses doInsertByName). For symmetry with the positive test that uses byName = true, please switch to doInsertByName and update the expected error accordingly (or rename this test to "… by position fails", but then it duplicates the existing by-position test at line 1791).
Done: switched to doInsertByName and assert INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA for missing salary. Wrapped in withSQLConf(USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES -> false) so FILL mode does not silently insert null under the test session defaults (otherwise the insert succeeds and no exception is thrown).
```scala
// With schema evolution, allow the source to have fewer columns/fields than the target
// and fill missing ones with default values or nulls (RECURSE mode). Without schema
// evolution, only top-level default column values are filled (FILL mode) and any
// missing columns will cause a schema enforcement error.
```
The phrase "only top-level default column values are filled (FILL mode) and any missing columns will cause a schema enforcement error" reads as self-contradictory: FILL mode does fill missing top-level columns (with explicit DEFAULT values, or null when spark.sql.defaultColumn.useNullsForMissingDefaultValues is true). The intended contrast is with nested fields, not top-level. Suggest:
```diff
-    // missing columns will cause a schema enforcement error.
+    // With schema evolution + coercion flag, missing top-level columns AND missing nested
+    // struct fields are filled with defaults/null (RECURSE mode). Otherwise, only missing
+    // top-level columns are filled via FILL mode; missing nested struct fields still cause
+    // schema enforcement errors.
```
Done: updated the comment to your suggested wording (RECURSE vs FILL and nested vs top-level).
```scala
val INSERT_INTO_NESTED_TYPE_COERCION_ENABLED =
  buildConf("spark.sql.insertNestedTypeCoercion.enabled")
    .internal()
    .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " +
```
The doc describes only the nested-struct-field effect, but enabling this flag also loosens by-position trailing top-level fill (see the "Missing top-level column (by position)" row in the PR description, which moves from "error" to "fill trailing"). Users skimming the config doc might reasonably expect the flag's scope to be limited to nested types. Suggest extending, e.g.:
```diff
-    .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " +
+    .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " +
+      "struct fields with null when the source has fewer nested fields than the target " +
+      "table. Also relaxes by-position column-count enforcement so trailing missing " +
+      "top-level columns are filled with their default value (or null). This is " +
+      "experimental and the semantics may change.")
```
Done: extended the config doc to mention by-position trailing top-level fill as well.
…on coercion

Propagate fillDefaultValue through resolveArrayType and resolveMapType by-position paths; use applyColumnMetadata for trailing default fills; clarify Analyzer and SQLConf docs; extend DefaultValueFillMode scaladoc; fix by-name negative test (with USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES disabled) and add by-position array/map nested struct tests.
@cloud-fan thanks for the detailed review. I addressed the inline points in a single follow-up commit (pushed to this branch):
Replies are threaded on each of your line comments.
What changes were proposed in this pull request?
Add support for `INSERT INTO ... WITH SCHEMA EVOLUTION` to fill missing nested struct fields with null (or column defaults) when the source has fewer fields than the target table. This mirrors the existing `MERGE INTO` behavior gated by `spark.sql.mergeNestedTypeCoercion.enabled`.

Specific changes:
- New config flag: `spark.sql.insertNestedTypeCoercion.enabled` (internal, default `false`) — mirrors the existing `spark.sql.mergeNestedTypeCoercion.enabled` for MERGE.
- Refactored `TableOutputResolver.resolveOutputColumns`: replaced two overlapping boolean parameters (`supportColDefaultValue`, `fillNestedDefaults`) with a single `DefaultValueFillMode` enum (`NONE`, `FILL`, `RECURSE`), making the API cleaner and the intent explicit at each call site.
- RECURSE mode for V2 inserts: when both schema evolution and the coercion flag are enabled, `RECURSE` mode fills missing nested struct fields with null, relaxes the by-position arity check, and recurses into structs nested within arrays and maps.

Supported scenarios (source has fewer columns/fields than target, with schema evolution + coercion flag):
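For instance, the by-position trailing-fill scenario could look like this (schematic; names and DDL invented, clause placement per the PR title):

```scala
// Target emp has (id, name, salary DEFAULT 0); the source supplies only the
// first two columns. Under RECURSE the trailing `salary` column is filled
// with its declared default rather than triggering an arity error.
spark.sql("CREATE TABLE emp (id INT, name STRING, salary INT DEFAULT 0)")
spark.sql("INSERT INTO emp WITH SCHEMA EVOLUTION SELECT 1, 'alice'")
```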
Why are the changes needed?
`MERGE INTO` already supports coercing nested types when the source has fewer struct fields than the target (via `spark.sql.mergeNestedTypeCoercion.enabled`). `INSERT INTO WITH SCHEMA EVOLUTION` lacked this capability, causing errors for legitimate use cases where the source schema is a subset of the target schema at the nested level.

This is important for schema evolution workflows where tables accumulate new nested fields over time, but older data sources may not have all fields populated.
Does this PR introduce any user-facing change?
Yes. When `spark.sql.insertNestedTypeCoercion.enabled` is set to `true` (default `false`), `INSERT INTO ... WITH SCHEMA EVOLUTION` will no longer fail when the source has fewer nested struct fields than the target. Instead, missing fields are filled with null. This is gated behind an internal, experimental config flag.

How was this patch tested?
Added 17 new test cases in `InsertIntoSchemaEvolutionTests`:

Positive tests (with schema evolution + coercion flag):

Negative tests (verifying errors when coercion is disabled):
All 64 matched tests pass.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4)