7 changes: 1 addition & 6 deletions project/SparkMimaExcludes.scala
@@ -84,12 +84,7 @@ object SparkMimaExcludes {
 
       // Changes in 1.2.0
       ProblemFilters.exclude[MissingClassProblem]("io.delta.storage.LogStore"),
-      ProblemFilters.exclude[MissingClassProblem]("io.delta.storage.CloseableIterator"),
-
-      // Changes in 4.0.0
-      ProblemFilters.exclude[IncompatibleResultTypeProblem]("io.delta.tables.DeltaTable.improveUnsupportedOpError"),
-      ProblemFilters.exclude[IncompatibleResultTypeProblem]("io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError"),
-      ProblemFilters.exclude[IncompatibleResultTypeProblem]("io.delta.tables.DeltaMergeBuilder.execute")
+      ProblemFilters.exclude[MissingClassProblem]("io.delta.storage.CloseableIterator")
 
       // scalastyle:on line.size.limit
   )
6 changes: 2 additions & 4 deletions python/delta/tables.py
@@ -1236,15 +1236,13 @@ def withSchemaEvolution(self) -> "DeltaMergeBuilder":
         return DeltaMergeBuilder(self._spark, new_jbuilder)
 
     @since(0.4)  # type: ignore[arg-type]
-    def execute(self) -> DataFrame:
+    def execute(self) -> None:
         """
         Execute the merge operation based on the built matched and not matched actions.
 
         See :py:class:`~delta.tables.DeltaMergeBuilder` for complete usage details.
         """
-        return DataFrame(
-            self._jbuilder.execute(),
-            getattr(self._spark, "_wrapped", self._spark))  # type: ignore[attr-defined]
+        self._jbuilder.execute()
 
     def __getMatchedBuilder(
         self, condition: OptionalExpressionOrColumn = None
14 changes: 2 additions & 12 deletions python/delta/tests/test_deltatable.py
@@ -152,27 +152,17 @@ def reset_table() -> None:
 
         # String expressions in merge condition and dicts
         reset_table()
-        merge_output = dt.merge(source, "key = k") \
+        dt.merge(source, "key = k") \
             .whenMatchedUpdate(set={"value": "v + 0"}) \
             .whenNotMatchedInsert(values={"key": "k", "value": "v + 0"}) \
             .whenNotMatchedBySourceUpdate(set={"value": "value + 0"}) \
             .execute()
-        self.__checkAnswer(merge_output,
-                           ([Row(6,  # type: ignore[call-overload]
-                                 4,  # updated rows (a and b in WHEN MATCHED
-                                     # and c and d in WHEN NOT MATCHED BY SOURCE)
-                                 0,  # deleted rows
-                                 2)]),  # inserted rows (e and f)
-                           StructType([StructField('num_affected_rows', LongType(), False),
-                                       StructField('num_updated_rows', LongType(), False),
-                                       StructField('num_deleted_rows', LongType(), False),
-                                       StructField('num_inserted_rows', LongType(), False)]))
         self.__checkAnswer(dt.toDF(),
                            ([('a', -1), ('b', 0), ('c', 3), ('d', 4), ('e', -5), ('f', -6)]))
 
         # Column expressions in merge condition and dicts
         reset_table()
-        merge_output = dt.merge(source, expr("key = k")) \
+        dt.merge(source, expr("key = k")) \
             .whenMatchedUpdate(set={"value": col("v") + 0}) \
             .whenNotMatchedInsert(values={"key": "k", "value": col("v") + 0}) \
             .whenNotMatchedBySourceUpdate(set={"value": col("value") + 0}) \
@@ -291,7 +291,7 @@ class DeltaMergeBuilder private(
    *
    * @since 0.3.0
    */
-  def execute(): DataFrame = improveUnsupportedOpError {
+  def execute(): Unit = improveUnsupportedOpError {
     val sparkSession = targetTable.toDF.sparkSession
     withActiveSession(sparkSession) {
       // Note: We are explicitly resolving DeltaMergeInto plan rather than going to through the
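Since `execute()` now returns `Unit`, callers that consumed the returned metrics DataFrame need another way to observe merge statistics. A minimal sketch of the adjusted call pattern, assuming an active `SparkSession` named `spark` and an illustrative table path:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

// Sketch only: the path and source data are illustrative.
val target = DeltaTable.forPath(spark, "/tmp/delta/target")
val source: DataFrame = spark.range(10).toDF("id")

// execute() is now a side-effecting call that returns Unit.
target.as("t")
  .merge(source.as("s"), "t.id = s.id")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

// Merge statistics are still written to the Delta log; read them from the
// operationMetrics map of the latest history entry.
val metrics = target.history(1)
  .select("operationMetrics")
  .head()
  .getMap[String, String](0)
println(metrics.get("numTargetRowsUpdated"))
```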
@@ -92,7 +92,7 @@ trait AnalysisHelper {
     DataFrameUtils.ofRows(sparkSession, logicalPlan)
   }
 
-  protected def improveUnsupportedOpError[T](f: => T): T = {
+  protected def improveUnsupportedOpError(f: => Unit): Unit = {
     val possibleErrorMsgs = Seq(
       "is only supported with v2 table", // full error: DELETE is only supported with v2 tables
       "is not supported temporarily", // full error: UPDATE TABLE is not supported temporarily
@@ -19,7 +19,7 @@ package org.apache.spark.sql.delta
 import org.apache.spark.sql.delta.MergeIntoMetricsShims._
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.test.SharedSparkSession
@@ -271,9 +271,7 @@ trait MergeIntoMetricsBase
     val sourceDfWithExtraCols = addExtraColumns(sourceDf)
 
     // Run MERGE INTO command
-    val mergeResultDf = mergeCmdFn(targetTable, sourceDfWithExtraCols)
-
-    checkMergeResultMetrics(mergeResultDf, expectedOpMetrics)
+    mergeCmdFn(targetTable, sourceDfWithExtraCols)
 
     // Query the operation metrics from the Delta log history.
     val operationMetrics: Map[String, String] = getOperationMetrics(targetTable.history(1))
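`getOperationMetrics` is a test helper whose body is outside this diff; a plausible sketch, assuming it reads the `operationMetrics` map column from the latest table-history row:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical reconstruction of the helper: extract the operationMetrics
// map from the most recent history row as a Scala Map[String, String].
def getOperationMetricsSketch(historyDf: DataFrame): Map[String, String] =
  historyDf
    .select("operationMetrics")
    .head()
    .getMap[String, String](0)
    .toMap
```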
@@ -314,24 +312,6 @@
       }
     }
 
-  private def checkMergeResultMetrics(
-      mergeResultDf: DataFrame,
-      metrics: Map[String, Int]): Unit = {
-    val numRowsUpdated = metrics.get("numTargetRowsUpdated").map(_.toLong).getOrElse(0L)
-    val numRowsDeleted = metrics.get("numTargetRowsDeleted").map(_.toLong).getOrElse(0L)
-    val numRowsInserted = metrics.get("numTargetRowsInserted").map(_.toLong).getOrElse(0L)
-    val numRowsTouched =
-      numRowsUpdated +
-      numRowsDeleted +
-      numRowsInserted
-
-    assert(mergeResultDf.collect() ===
-      Array(Row(numRowsTouched,
-        numRowsUpdated,
-        numRowsDeleted,
-        numRowsInserted)))
-  }
-
   /////////////////////////////
   // insert-only merge tests //
   /////////////////////////////
@@ -1388,7 +1368,7 @@ object MergeIntoMetricsBase extends QueryTest with SharedSparkSession {
   // helpful types //
   ///////////////////
 
-  type MergeCmd = (io.delta.tables.DeltaTable, DataFrame) => DataFrame
+  type MergeCmd = (io.delta.tables.DeltaTable, DataFrame) => Unit
 
   /////////////////////
   // helpful methods //
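A hypothetical `MergeCmd` value under the new alias, to illustrate that merge commands in these tests are now pure side-effecting functions (the names and merge condition are illustrative):

```scala
// Sketch only: performs the merge for its side effects and returns Unit
// instead of a metrics DataFrame, matching the updated MergeCmd signature.
val insertOnlyMergeCmd: MergeCmd = (targetTable, sourceDf) =>
  targetTable.as("t")
    .merge(sourceDf.as("s"), "t.id = s.id")
    .whenNotMatched().insertAll()
    .execute()
```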