diff --git a/project/SparkMimaExcludes.scala b/project/SparkMimaExcludes.scala
index 20af16c8423..98b354eef24 100644
--- a/project/SparkMimaExcludes.scala
+++ b/project/SparkMimaExcludes.scala
@@ -84,12 +84,7 @@ object SparkMimaExcludes {
 
       // Changes in 1.2.0
       ProblemFilters.exclude[MissingClassProblem]("io.delta.storage.LogStore"),
-      ProblemFilters.exclude[MissingClassProblem]("io.delta.storage.CloseableIterator"),
-
-      // Changes in 4.0.0
-      ProblemFilters.exclude[IncompatibleResultTypeProblem]("io.delta.tables.DeltaTable.improveUnsupportedOpError"),
-      ProblemFilters.exclude[IncompatibleResultTypeProblem]("io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError"),
-      ProblemFilters.exclude[IncompatibleResultTypeProblem]("io.delta.tables.DeltaMergeBuilder.execute")
+      ProblemFilters.exclude[MissingClassProblem]("io.delta.storage.CloseableIterator")
 
       // scalastyle:on line.size.limit
     )
diff --git a/python/delta/tables.py b/python/delta/tables.py
index 7ef093047a3..35fdd8b9281 100644
--- a/python/delta/tables.py
+++ b/python/delta/tables.py
@@ -1236,15 +1236,13 @@ def withSchemaEvolution(self) -> "DeltaMergeBuilder":
         return DeltaMergeBuilder(self._spark, new_jbuilder)
 
     @since(0.4)  # type: ignore[arg-type]
-    def execute(self) -> DataFrame:
+    def execute(self) -> None:
         """
         Execute the merge operation based on the built matched and not matched actions.
 
         See :py:class:`~delta.tables.DeltaMergeBuilder` for complete usage details.
         """
-        return DataFrame(
-            self._jbuilder.execute(),
-            getattr(self._spark, "_wrapped", self._spark))  # type: ignore[attr-defined]
+        self._jbuilder.execute()
 
     def __getMatchedBuilder(
         self, condition: OptionalExpressionOrColumn = None
diff --git a/python/delta/tests/test_deltatable.py b/python/delta/tests/test_deltatable.py
index 01d98d9fbde..0576b2eeec9 100644
--- a/python/delta/tests/test_deltatable.py
+++ b/python/delta/tests/test_deltatable.py
@@ -152,27 +152,17 @@ def reset_table() -> None:
 
         # String expressions in merge condition and dicts
         reset_table()
-        merge_output = dt.merge(source, "key = k") \
+        dt.merge(source, "key = k") \
             .whenMatchedUpdate(set={"value": "v + 0"}) \
             .whenNotMatchedInsert(values={"key": "k", "value": "v + 0"}) \
             .whenNotMatchedBySourceUpdate(set={"value": "value + 0"}) \
             .execute()
-        self.__checkAnswer(merge_output,
-                           ([Row(6,  # type: ignore[call-overload]
-                                 4,  # updated rows (a and b in WHEN MATCHED
-                                     # and c and d in WHEN NOT MATCHED BY SOURCE)
-                                 0,  # deleted rows
-                                 2)]),  # inserted rows (e and f)
-                           StructType([StructField('num_affected_rows', LongType(), False),
-                                       StructField('num_updated_rows', LongType(), False),
-                                       StructField('num_deleted_rows', LongType(), False),
-                                       StructField('num_inserted_rows', LongType(), False)]))
         self.__checkAnswer(dt.toDF(),
                            ([('a', -1), ('b', 0), ('c', 3), ('d', 4), ('e', -5), ('f', -6)]))
 
         # Column expressions in merge condition and dicts
         reset_table()
-        merge_output = dt.merge(source, expr("key = k")) \
+        dt.merge(source, expr("key = k")) \
             .whenMatchedUpdate(set={"value": col("v") + 0}) \
             .whenNotMatchedInsert(values={"key": "k", "value": col("v") + 0}) \
             .whenNotMatchedBySourceUpdate(set={"value": col("value") + 0}) \
diff --git a/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala b/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala
index 3ebdac9d94c..d1598b406ce 100644
--- a/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala
+++ b/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala
@@ -291,7 +291,7 @@ class DeltaMergeBuilder private(
    *
    * @since 0.3.0
    */
-  def execute(): DataFrame = improveUnsupportedOpError {
+  def execute(): Unit = improveUnsupportedOpError {
     val sparkSession = targetTable.toDF.sparkSession
     withActiveSession(sparkSession) {
       // Note: We are explicitly resolving DeltaMergeInto plan rather than going to through the
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/AnalysisHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/AnalysisHelper.scala
index b58ac935be6..55105708fdf 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/util/AnalysisHelper.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/AnalysisHelper.scala
@@ -92,7 +92,7 @@ trait AnalysisHelper {
     DataFrameUtils.ofRows(sparkSession, logicalPlan)
   }
 
-  protected def improveUnsupportedOpError[T](f: => T): T = {
+  protected def improveUnsupportedOpError(f: => Unit): Unit = {
     val possibleErrorMsgs = Seq(
       "is only supported with v2 table", // full error: DELETE is only supported with v2 tables
       "is not supported temporarily", // full error: UPDATE TABLE is not supported temporarily
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala
index 225df6f9f0f..5cf464497f0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.delta
 
 import org.apache.spark.sql.delta.MergeIntoMetricsShims._
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.test.SharedSparkSession
@@ -271,9 +271,7 @@ trait MergeIntoMetricsBase
     val sourceDfWithExtraCols = addExtraColumns(sourceDf)
 
     // Run MERGE INTO command
-    val mergeResultDf = mergeCmdFn(targetTable, sourceDfWithExtraCols)
-
-    checkMergeResultMetrics(mergeResultDf, expectedOpMetrics)
+    mergeCmdFn(targetTable, sourceDfWithExtraCols)
 
     // Query the operation metrics from the Delta log history.
     val operationMetrics: Map[String, String] = getOperationMetrics(targetTable.history(1))
@@ -314,24 +312,6 @@ trait MergeIntoMetricsBase
     }
   }
 
-  private def checkMergeResultMetrics(
-      mergeResultDf: DataFrame,
-      metrics: Map[String, Int]): Unit = {
-    val numRowsUpdated = metrics.get("numTargetRowsUpdated").map(_.toLong).getOrElse(0L)
-    val numRowsDeleted = metrics.get("numTargetRowsDeleted").map(_.toLong).getOrElse(0L)
-    val numRowsInserted = metrics.get("numTargetRowsInserted").map(_.toLong).getOrElse(0L)
-    val numRowsTouched =
-      numRowsUpdated +
-      numRowsDeleted +
-      numRowsInserted
-
-    assert(mergeResultDf.collect() ===
-      Array(Row(numRowsTouched,
-        numRowsUpdated,
-        numRowsDeleted,
-        numRowsInserted)))
-  }
-
   /////////////////////////////
   // insert-only merge tests //
   /////////////////////////////
@@ -1388,7 +1368,7 @@ object MergeIntoMetricsBase extends QueryTest with SharedSparkSession {
   // helpful types //
   ///////////////////
 
-  type MergeCmd = (io.delta.tables.DeltaTable, DataFrame) => DataFrame
+  type MergeCmd = (io.delta.tables.DeltaTable, DataFrame) => Unit
 
   /////////////////////
   // helpful methods //
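For callers, the net effect of this diff is that DeltaMergeBuilder.execute (Scala) and DeltaMergeBuilder.execute (Python) no longer return a DataFrame of merge metrics; as the updated MergeIntoMetricsBase test shows, metrics are instead read from the Delta log via table history. Below is a minimal usage sketch under that changed API, not code from the diff: the table paths, schemas, and the "key" join column are hypothetical.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object MergeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("merge-returns-unit-example")
      .getOrCreate()

    // Hypothetical target and source tables; any Delta tables with
    // compatible schemas and a "key" column would work the same way.
    val target = DeltaTable.forPath(spark, "/tmp/delta/target")
    val source = spark.read.format("delta").load("/tmp/delta/source")

    // After this change, execute() returns Unit, so there is no
    // result DataFrame to capture here.
    target.as("t")
      .merge(source.as("s"), "t.key = s.key")
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()

    // Merge metrics are still recorded in the Delta log; read them from
    // the most recent history entry, mirroring what the updated test does
    // with getOperationMetrics(targetTable.history(1)).
    target.history(1).select("operationMetrics").show(truncate = false)

    spark.stop()
  }
}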