diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala index 878d41e5394..a6ddfebd28d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala @@ -77,6 +77,71 @@ object ResolveDeltaMergeInto { resolveOrFail(resolveExprsFn, Seq(expr), plansToResolveExpr, mergeClauseTypeStr).head } + /** + * Resolves UnresolvedStar (`*`) for `UPDATE SET *` or `INSERT *` actions. + * + * When schema evolution is disabled: expands `*` for all target columns + * + * When schema evolution is enabled: + * - For INSERT clauses: expands `*` for all source columns + * - For UPDATE clauses: expands `*` for all source leaf fields + * + * @param clause the merge clause being resolved (INSERT or UPDATE) + * @param target the target table plan + * @param source the source data plan + * @param canEvolveSchema whether schema evolution is enabled + * @param resolveExprsFn function to resolve expressions + * @param mergeClauseTypeStr string description of the clause type for error messages + * @param conf SQL configuration + * @return sequence of resolved DeltaMergeActions + */ + private def resolveStar( + clause: DeltaMergeIntoClause, + target: LogicalPlan, + source: LogicalPlan, + canEvolveSchema: Boolean, + resolveExprsFn: ResolveExpressionsFn, + mergeClauseTypeStr: String, + conf: SQLConf): Seq[DeltaMergeAction] = { + if (!canEvolveSchema) { + // Expand `*` into seq of [ `columnName = sourceColumnBySameName` ] for every target + // column name. The target columns do not need resolution. The right hand side + // expression (i.e. sourceColumnBySameName) needs to be resolved only by the source plan. + val unresolvedExprs = target.output.map { attr => + UnresolvedAttribute.quotedString(s"`${attr.name}`") + } + val resolvedExprs = resolveOrFail( + resolveExprsFn = resolveExprsFn, + exprs = unresolvedExprs, + plansToResolveExprs = Seq(source), + mergeClauseTypeStr = mergeClauseTypeStr) + (resolvedExprs, target.output.map(_.name)) + .zipped + .map { (resolvedExpr, targetColName) => + DeltaMergeAction(Seq(targetColName), resolvedExpr, targetColNameResolved = true) + } + } else { + clause match { + case _: DeltaMergeIntoNotMatchedInsertClause => + // Expand `*` into seq of [ `columnName = sourceColumnBySameName` ] for every source + // column name. Target columns not present in the source will be filled in + // with null later. + source.output.map { attr => + DeltaMergeAction(Seq(attr.name), attr, targetColNameResolved = true) + } + case _: DeltaMergeIntoMatchedUpdateClause => + // Expand `*` into seq of [ `columnName = sourceColumnBySameName` ] for every source + // column name. Target columns not present in the source will be filled in with + // no-op actions later. + // Nested columns are unfolded to accommodate the case where a source struct has a + // subset of the nested columns in the target. If a source struct (a, b) is writing + // into a target (a, b, c), the final struct after filling in the no-op actions will + // be (s.a, s.b, t.c). + getLeafActionsForSchema(source.schema, Seq.empty, source, conf) + } + } + } + /** * Returns the sequence of [[DeltaMergeActions]] corresponding to * [ `columnName = sourceColumnBySameName` ] for every column name in the schema. Nested @@ -175,43 +240,9 @@ object ResolveDeltaMergeInto { val resolvedOtherExpressions: Seq[DeltaMergeAction] = allOtherExpressions.flatMap { action => action match { // For actions like `UPDATE SET *` or `INSERT *` - case _: UnresolvedStar if !canEvolveSchema => - // Expand `*` into seq of [ `columnName = sourceColumnBySameName` ] for every target - // column name. The target columns do not need resolution. The right hand side - // expression (i.e. sourceColumnBySameName) needs to be resolved only by the source - // plan. - val unresolvedExprs = target.output.map { attr => - UnresolvedAttribute.quotedString(s"`${attr.name}`") - } - val resolvedExprs = resolveOrFail( - resolveExprsFn = resolveExprsFn, - exprs = unresolvedExprs, - plansToResolveExprs = Seq(source), - mergeClauseTypeStr = mergeClauseTypeStr) - (resolvedExprs, target.output.map(_.name)) - .zipped - .map { (resolvedExpr, targetColName) => - DeltaMergeAction(Seq(targetColName), resolvedExpr, targetColNameResolved = true) - } - case _: UnresolvedStar if canEvolveSchema => - clause match { - case _: DeltaMergeIntoNotMatchedInsertClause => - // Expand `*` into seq of [ `columnName = sourceColumnBySameName` ] for every source - // column name. Target columns not present in the source will be filled in - // with null later. - source.output.map { attr => - DeltaMergeAction(Seq(attr.name), attr, targetColNameResolved = true) - } - case _: DeltaMergeIntoMatchedUpdateClause => - // Expand `*` into seq of [ `columnName = sourceColumnBySameName` ] for every source - // column name. Target columns not present in the source will be filled in with - // no-op actions later. - // Nested columns are unfolded to accommodate the case where a source struct has a - // subset of the nested columns in the target. If a source struct (a, b) is writing - // into a target (a, b, c), the final struct after filling in the no-op actions will - // be (s.a, s.b, t.c). - getLeafActionsForSchema(source.schema, Seq.empty, source, conf) - } + case _: UnresolvedStar => + resolveStar( + clause, target, source, canEvolveSchema, resolveExprsFn, mergeClauseTypeStr, conf) case _ =>