Skip to content

Commit 13ff198

Browse files
authored
[UniForm] Support conflict resolution end to end (#6962)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> ## Description Support conflict resolution for UniForm with new Delta-Rest API integration. More specifically, when a Delta transaction retries after a conflict, the catalogTable it carries — which encodes the last converted Iceberg metadataLocation and convertedDeltaVersion — can be stale. Due to external UniForm's concurrency control design, it has to re-fetch latest UniForm information to proceed. ## How was this patch tested? Add UTs
1 parent 4740bcd commit 13ff198

3 files changed

Lines changed: 244 additions & 39 deletions

File tree

iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ import org.apache.spark.sql.delta.{CommittedTransaction, CurrentTransactionInfo,
2828
import org.apache.spark.sql.delta.DeltaOperations.OPTIMIZE_OPERATION_NAME
2929
import org.apache.spark.sql.delta.RowId.RowTrackingMetadataDomain
3030
import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, DomainMetadata, FileAction, InMemoryLogReplay, Metadata, Protocol, RemoveFile}
31+
import org.apache.spark.sql.delta.catalog.DeltaTableV2
3132
import org.apache.spark.sql.delta.hooks.IcebergConverterHook
3233
import org.apache.spark.sql.delta.logging.DeltaLogKeys
3334
import org.apache.spark.sql.delta.metering.DeltaLogging
3435
import org.apache.spark.sql.delta.sources.DeltaSQLConf
3536
import org.apache.spark.sql.delta.util.TransactionHelper
37+
import io.delta.storage.commit.{CommitFailedException => DeltaCommitFailedException}
3638
import org.apache.commons.lang3.exception.ExceptionUtils
3739
import org.apache.hadoop.fs.Path
3840
import shadedForDelta.org.apache.iceberg.{Table => IcebergTable, TableProperties}
@@ -44,6 +46,7 @@ import shadedForDelta.org.apache.iceberg.util.LocationUtil
4446
import org.apache.spark.internal.MDC
4547
import org.apache.spark.sql.{Dataset, SparkSession}
4648
import org.apache.spark.sql.catalyst.catalog.CatalogTable
49+
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, V1Table}
4750

4851
object IcebergConverter {
4952

@@ -217,13 +220,83 @@ class IcebergConverter
217220
(newMetadataPath, lastConvertedInfo.deltaVersionConverted)
218221
}
219222

220-
private def refreshCatalogTableIfNeeded(
223+
protected def refreshCatalogTableIfNeeded(
221224
txnInfo: CurrentTransactionInfo,
222225
deltaAttemptVersion: Long,
223226
catalogTable: CatalogTable): CatalogTable = {
224-
catalogTable
227+
val lastDeltaVersionConvertedOpt =
228+
catalogTable.storage.properties
229+
.get(IcebergConstants.CATALOG_TABLE_ICEBERG_CONVERTED_DELTA_VERSION_PROP)
230+
.map(_.toLong)
231+
val needsRefreshCatalogTable =
232+
deltaAttemptVersion > txnInfo.readSnapshot.version + 1 &&
233+
UniversalFormat.icebergEnabled(txnInfo.metadata) &&
234+
lastDeltaVersionConvertedOpt.nonEmpty
235+
if (!needsRefreshCatalogTable) {
236+
return catalogTable
237+
}
238+
val refreshedTable = reloadCatalogTable(catalogTable)
239+
if (refreshedTable.storage.locationUri != catalogTable.storage.locationUri) {
240+
throw new DeltaCommitFailedException(
241+
false, // retryable
242+
true, // conflict
243+
s"Table location changed when refreshing catalogTable. " +
244+
s"catalogTable location: ${catalogTable.storage.locationUri} " +
245+
s"refreshedTable location: ${refreshedTable.storage.locationUri}")
246+
}
247+
val refreshedLastDeltaVersionConvertedOpt =
248+
refreshedTable.storage.properties
249+
.get(IcebergConstants.CATALOG_TABLE_ICEBERG_CONVERTED_DELTA_VERSION_PROP)
250+
.map(_.toLong)
251+
val oldBaseMetadataLocation =
252+
catalogTable.storage.properties
253+
.get(IcebergConstants.CATALOG_TABLE_ICEBERG_METADATA_LOCATION_PROP)
254+
val newBaseMetadataLocation =
255+
refreshedTable.storage.properties
256+
.get(IcebergConstants.CATALOG_TABLE_ICEBERG_METADATA_LOCATION_PROP)
257+
logInfo(s"refresh CatalogTable for UniForm for table ${catalogTable.identifier}: " +
258+
s"oldLastDeltaVersionConvertedOpt=$lastDeltaVersionConvertedOpt " +
259+
s"refreshedLastDeltaVersionConvertedOpt=$refreshedLastDeltaVersionConvertedOpt " +
260+
s"oldBaseMetadataLocation=$oldBaseMetadataLocation " +
261+
s"newBaseMetadataLocation=$newBaseMetadataLocation")
262+
if (
263+
refreshedLastDeltaVersionConvertedOpt.exists(_ >= deltaAttemptVersion)
264+
) {
265+
throw new DeltaCommitFailedException(
266+
true, // retryable
267+
true, // conflict
268+
s"Attempts to commit Delta version $deltaAttemptVersion while the last converted delta " +
269+
s"version is already $refreshedLastDeltaVersionConvertedOpt")
270+
}
271+
refreshedTable
225272
}
226273

274+
/**
275+
* Reloads a CatalogTable via the V2 catalog API
276+
* The V1 SessionCatalog only reaches HMS and
277+
* fails for UC tables whose schema doesn't exist there
278+
*/
279+
protected def reloadCatalogTable(catalogTable: CatalogTable): CatalogTable = {
280+
val catalog = catalogTable.identifier.catalog
281+
.map(spark.sessionState.catalogManager.catalog)
282+
.getOrElse(spark.sessionState.catalogManager.v2SessionCatalog)
283+
val ident = Identifier.of(
284+
catalogTable.identifier.database.toArray,
285+
catalogTable.identifier.table)
286+
CatalogV2Util.loadTable(catalog, ident)
287+
.flatMap {
288+
case dt: DeltaTableV2 => dt.catalogTable
289+
case other => throw new DeltaCommitFailedException(
290+
false, // retryable
291+
false, // conflict
292+
s"Expected DeltaTableV2 when reloading catalogTable for ${catalogTable.identifier}, " +
293+
s"got ${other.getClass.getName}")
294+
}
295+
.getOrElse(throw new DeltaCommitFailedException(
296+
false, // retryable
297+
false, // conflict
298+
s"Table ${catalogTable.identifier} not found when reloading catalogTable through $catalog"))
299+
}
227300

228301
/**
229302
* Reads the last converted Iceberg state from the catalogTable storage properties.

iceberg/src/test/scala/org/apache/spark/sql/delta/uniform/UniFormConverterSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ import org.apache.spark.sql.delta.util.JsonUtils
3434
import org.apache.spark.sql.test.SharedSparkSession
3535

3636
class IcebergConverterForTest extends IcebergConverter {
37+
/**
38+
* Disable catalogTable refresh for this test as this test is not using
39+
* UC so UniForm metadata won't be stored in catalogTable. This test
40+
* needs to manually inject UniForm metadata into catalogTable
41+
*/
42+
override protected def refreshCatalogTableIfNeeded(
43+
txnInfo: CurrentTransactionInfo,
44+
deltaAttemptVersion: Long,
45+
catalogTable: CatalogTable): CatalogTable = catalogTable
46+
3747
def convertSnapshotAndReturnMetadataPath(
3848
snapshotToConvert: Snapshot,
3949
catalogTable: CatalogTable): String = {

iceberg/src/test/scala/org/apache/spark/sql/delta/uniform/UniFormE2EIcebergSuite.scala

Lines changed: 159 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import java.util.{Collections, Optional, UUID}
2020

2121
import scala.collection.JavaConverters._
2222

23+
import com.databricks.spark.util.Log4jUsageLogger
2324
import io.delta.storage.commit.{CommitCoordinatorClient => JCommitCoordinatorClient}
2425
import io.delta.storage.commit.{TableIdentifier => UCTableIdentifier}
2526
import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
@@ -29,9 +30,9 @@ import org.apache.hadoop.fs.Path
2930
import org.apache.spark.{SparkConf, SparkSessionSwitch}
3031
import org.apache.spark.sql.{Row, SparkSession}
3132
import org.apache.spark.sql.catalyst.TableIdentifier
32-
import org.apache.spark.sql.catalyst.catalog.CatalogTable
33-
import org.apache.spark.sql.connector.catalog.{Identifier, Table}
34-
import org.apache.spark.sql.delta.{DeltaLog, IcebergConstants}
33+
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry}
34+
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
35+
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaTestUtils, IcebergConstants}
3536
import org.apache.spark.sql.delta.DeltaConfigs.{
3637
COORDINATED_COMMITS_COORDINATOR_CONF,
3738
COORDINATED_COMMITS_COORDINATOR_NAME
@@ -45,7 +46,6 @@ import org.apache.spark.sql.delta.coordinatedcommits.{
4546
InMemoryUCCommitCoordinator,
4647
UCCommitCoordinatorBuilder
4748
}
48-
import org.apache.spark.sql.delta.icebergShaded.IcebergConverter
4949
import org.apache.spark.sql.delta.sources.DeltaSQLConf
5050
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
5151
import org.apache.spark.sql.delta.uniform.hms.HMSTest
@@ -95,41 +95,54 @@ trait WriteDeltaHMSReadIceberg extends UniFormE2ETest
9595
}
9696

9797
/**
98-
* A [[DeltaCatalog]] subclass that enriches [[CatalogTable]] with the last converted Iceberg
99-
* metadata from [[InMemoryUCCommitCoordinator]] before loading the table. This simulates what
100-
* the real UC REST catalog does: injecting
101-
* [[IcebergConstants.CATALOG_TABLE_ICEBERG_METADATA_LOCATION_PROP]]
102-
* and [[IcebergConstants.CATALOG_TABLE_ICEBERG_CONVERTED_DELTA_VERSION_PROP]] into
103-
* catalog storage properties, matching the UC REST path in [[UCDeltaCatalogClientImpl]].
98+
* A [[SessionCatalog]] wrapper that overrides [[getTableMetadata]] to inject fresh Iceberg
99+
* metadata from [[UCEnrichedSessionCatalog.currentCoordinator]] on every call. This mirrors what
100+
* [[UCDeltaCatalogClientImpl.toV1Table]] does in production. Installed via reflection into
101+
* both [[org.apache.spark.sql.internal.SessionState.catalog]] (for the V1 path used by
102+
* [[org.apache.spark.sql.delta.icebergShaded.IcebergConverter.refreshCatalogTableIfNeeded]])
103+
* and the [[org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog]] delegate (for
104+
* the V2 write path that goes through [[DeltaCatalog.loadTable]]). Both injections happen in
105+
* [[WriteDeltaUCCCReadIceberg.beforeEach]].
104106
*/
105-
class UCBackedDeltaCatalog extends DeltaCatalog {
106-
override def loadCatalogTable(ident: Identifier, catalogTable: CatalogTable): Table = {
107-
val forbiddenKeys = catalogTable.properties.keys.filter(_.startsWith("deltaUniformIceberg."))
107+
private class UCEnrichedSessionCatalog(
108+
spark: SparkSession,
109+
delegate: SessionCatalog,
110+
fr: FunctionRegistry,
111+
tfr: TableFunctionRegistry)
112+
extends SessionCatalog(delegate.externalCatalog, fr, tfr) {
113+
114+
setCurrentDatabase(delegate.getCurrentDatabase)
115+
116+
override def getTableMetadata(name: TableIdentifier): CatalogTable = {
117+
val base = delegate.getTableMetadata(name)
118+
val forbiddenKeys = base.properties.keys.filter(_.startsWith("deltaUniformIceberg."))
108119
assert(forbiddenKeys.isEmpty,
109120
s"deltaUniformIceberg.* keys must only appear in storage.properties, " +
110121
s"never in catalogTable.properties. Found: ${forbiddenKeys.mkString(", ")}")
111-
val enriched = UCBackedDeltaCatalog.currentCoordinator.flatMap { coordinator =>
112-
val deltaLog = DeltaLog.forTable(spark, new Path(catalogTable.location))
113-
val tableConf = deltaLog.update().metadata.coordinatedCommitsTableConf
114-
tableConf.get(UCCommitCoordinatorClient.UC_TABLE_ID_KEY).flatMap { tableId =>
115-
coordinator.getUniformMetadata(tableId)
116-
.filter(_.getIcebergMetadata.isPresent)
117-
.map { meta =>
118-
val icebergMeta = meta.getIcebergMetadata.get
119-
catalogTable.copy(storage = catalogTable.storage.copy(
120-
properties = catalogTable.storage.properties +
121-
(IcebergConstants.CATALOG_TABLE_ICEBERG_METADATA_LOCATION_PROP ->
122-
icebergMeta.getMetadataLocation) +
123-
(IcebergConstants.CATALOG_TABLE_ICEBERG_CONVERTED_DELTA_VERSION_PROP ->
124-
icebergMeta.getConvertedDeltaVersion.toString)))
125-
}
126-
}
127-
}.getOrElse(catalogTable)
128-
super.loadCatalogTable(ident, enriched)
122+
UCEnrichedSessionCatalog.currentCoordinator.flatMap { coordinator =>
123+
scala.util.Try {
124+
val deltaLog = DeltaLog.forTable(spark, new Path(base.location))
125+
val tableConf = deltaLog.unsafeVolatileSnapshot.metadata.coordinatedCommitsTableConf
126+
tableConf.get(UCCommitCoordinatorClient.UC_TABLE_ID_KEY).flatMap { tableId =>
127+
coordinator.getUniformMetadata(tableId)
128+
.filter(_.getIcebergMetadata.isPresent)
129+
.map { meta =>
130+
val icebergMeta = meta.getIcebergMetadata.get
131+
base.copy(storage = base.storage.copy(
132+
properties = base.storage.properties +
133+
(IcebergConstants.CATALOG_TABLE_ICEBERG_METADATA_LOCATION_PROP ->
134+
icebergMeta.getMetadataLocation) +
135+
(IcebergConstants.CATALOG_TABLE_ICEBERG_CONVERTED_DELTA_VERSION_PROP ->
136+
icebergMeta.getConvertedDeltaVersion.toString)
137+
))
138+
}
139+
}
140+
}.toOption.flatten
141+
}.getOrElse(base)
129142
}
130143
}
131144

132-
object UCBackedDeltaCatalog {
145+
private object UCEnrichedSessionCatalog {
133146
@volatile var currentCoordinator: Option[InMemoryUCCommitCoordinator] = None
134147
}
135148

@@ -154,7 +167,7 @@ trait WriteDeltaUCCCReadIceberg extends UniFormE2ETest
154167
override protected def sparkConf: SparkConf =
155168
super.sparkConf.set(
156169
SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key,
157-
classOf[UCBackedDeltaCatalog].getName)
170+
classOf[DeltaCatalog].getName)
158171

159172
/**
160173
* A [[UCCommitCoordinatorClient]] subclass that overrides [[registerTable]] to auto-assign
@@ -187,13 +200,15 @@ trait WriteDeltaUCCCReadIceberg extends UniFormE2ETest
187200

188201
protected var ucCommitCoordinator: InMemoryUCCommitCoordinator = _
189202
private var testCoordinator: TestUCBackedCommitCoordinator = _
203+
private var origSessionCatalog: SessionCatalog = _
204+
private var origV2Catalog: SessionCatalog = _
190205

191206
abstract override def beforeEach(): Unit = {
192207
super.beforeEach()
193208
DeltaLog.clearCache()
194209
CommitCoordinatorProvider.clearAllBuilders()
195210
ucCommitCoordinator = new InMemoryUCCommitCoordinator()
196-
UCBackedDeltaCatalog.currentCoordinator = Some(ucCommitCoordinator)
211+
UCEnrichedSessionCatalog.currentCoordinator = Some(ucCommitCoordinator)
197212
val ucClient = new InMemoryUCClient("test-metastore", ucCommitCoordinator)
198213
testCoordinator = new TestUCBackedCommitCoordinator(ucClient)
199214
CommitCoordinatorProvider.registerBuilder(new CatalogOwnedCommitCoordinatorBuilder {
@@ -205,15 +220,63 @@ trait WriteDeltaUCCCReadIceberg extends UniFormE2ETest
205220
spark: SparkSession, catalogName: String): JCommitCoordinatorClient =
206221
testCoordinator
207222
})
223+
224+
// Inject UCEnrichedSessionCatalog into two places so all catalog reads:
225+
// whether from the
226+
// V1 path (refreshCatalogTableIfNeeded -> spark.sessionState.catalog.getTableMetadata) or
227+
// V2 write path (DeltaCatalog.loadTable -> V2SessionCatalog.catalog.getTableMetadata)
228+
// see up-to-date coordinator Iceberg metadata,
229+
// mirroring what toV1Table does in production for UCDeltaCatalogClientImpl
230+
origSessionCatalog = spark.sessionState.catalog // force lazy init of the field
231+
val fr = reflectField(origSessionCatalog, "functionRegistry")
232+
.get(origSessionCatalog).asInstanceOf[FunctionRegistry]
233+
val tfr = reflectField(origSessionCatalog, "tableFunctionRegistry")
234+
.get(origSessionCatalog).asInstanceOf[TableFunctionRegistry]
235+
val enriched = new UCEnrichedSessionCatalog(spark, origSessionCatalog, fr, tfr)
236+
237+
// V1 path: spark.sessionState.catalog (used by refreshCatalogTableIfNeeded)
238+
reflectField(spark.sessionState, "catalog").set(spark.sessionState, enriched)
239+
240+
// V2 path: V2SessionCatalog holds a direct reference to SessionCatalog captured at
241+
// session construction; it is unaffected by the above swap, so replace it separately.
242+
val v2Cat = spark.sessionState.catalogManager.catalog("spark_catalog")
243+
val v2Delegate = reflectField(v2Cat, "delegate").get(v2Cat).asInstanceOf[AnyRef]
244+
val v2CatalogField = reflectField(v2Delegate, "catalog")
245+
origV2Catalog = v2CatalogField.get(v2Delegate).asInstanceOf[SessionCatalog]
246+
v2CatalogField.set(v2Delegate, enriched)
208247
}
209248

210249
abstract override def afterEach(): Unit = {
211-
UCBackedDeltaCatalog.currentCoordinator = None
250+
if (origV2Catalog != null) {
251+
val v2Cat = spark.sessionState.catalogManager.catalog("spark_catalog")
252+
val v2Delegate = reflectField(v2Cat, "delegate").get(v2Cat).asInstanceOf[AnyRef]
253+
reflectField(v2Delegate, "catalog").set(v2Delegate, origV2Catalog)
254+
origV2Catalog = null
255+
}
256+
if (origSessionCatalog != null) {
257+
reflectField(spark.sessionState, "catalog").set(spark.sessionState, origSessionCatalog)
258+
origSessionCatalog = null
259+
}
260+
UCEnrichedSessionCatalog.currentCoordinator = None
212261
CommitCoordinatorProvider.clearAllBuilders()
213262
DeltaLog.clearCache()
214263
super.afterEach()
215264
}
216265

266+
private def reflectField(obj: AnyRef, name: String): java.lang.reflect.Field = {
267+
var cls: Class[_] = obj.getClass
268+
while (cls != null) {
269+
try {
270+
val f = cls.getDeclaredField(name)
271+
f.setAccessible(true)
272+
return f
273+
} catch {
274+
case _: NoSuchFieldException => cls = cls.getSuperclass
275+
}
276+
}
277+
throw new NoSuchFieldException(s"Field '$name' not found in ${obj.getClass.getName}")
278+
}
279+
217280
/**
218281
* Returns the TBLPROPERTIES SQL fragment required to enable the UC commit coordinator.
219282
* Concrete suites should append this to their [[extraTableProperties]] override.
@@ -244,8 +307,67 @@ trait WriteDeltaUCCCReadIceberg extends UniFormE2ETest
244307
* by an in-memory UC commit coordinator, reading results via the native Iceberg reader.
245308
*/
246309
class UniFormE2EIcebergUCSuite extends UniFormE2EIcebergSuiteBase
247-
with WriteDeltaUCCCReadIceberg {
248-
// No test should go here. Please add tests in [[UniFormE2EIcebergSuiteBase]]
310+
with WriteDeltaUCCCReadIceberg {
311+
// Tests that don't require CC infrastructure should be added in [[UniFormE2EIcebergSuiteBase]].
312+
// Tests that specifically exercise the UC commit coordinator path may be added here.
249313
override def extraTableProperties(compatVersion: Int): String =
250314
super.extraTableProperties(compatVersion) + requiredTableProperties
315+
316+
test("conflict resolution refreshes catalogTable with fresh uniform metadata " +
317+
"for incremental Iceberg conversion") {
318+
val tableName = "test_cc_conflict_iceberg_refresh"
319+
withTable(tableName) {
320+
// Create a CC + UniForm table.
321+
write(
322+
s"""CREATE TABLE $tableName (id INT) USING DELTA
323+
|TBLPROPERTIES (
324+
| 'delta.columnMapping.mode' = 'name',
325+
| 'delta.enableIcebergCompatV2' = 'true',
326+
| 'delta.universalFormat.enabledFormats' = 'iceberg'
327+
| $requiredTableProperties
328+
|)""".stripMargin)
329+
330+
// v1: insert row 1 - triggers atomic Iceberg conversion at v1.
331+
write(s"INSERT INTO $tableName VALUES (1)")
332+
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName))
333+
val v1Snapshot = deltaLog.update()
334+
val v1CatalogTable =
335+
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
336+
337+
// Start a transaction with v1 snapshot and v1 Iceberg catalog.
338+
val txn = deltaLog.startTransaction(Some(v1CatalogTable), Some(v1Snapshot))
339+
340+
// v2: insert row 2 - the "winning" commit that the stale txn will conflict with.
341+
// Now v1 snapshot and v1 Iceberg catalog becomes stale so txn's commit would hit
342+
// conflict.
343+
write(s"INSERT INTO $tableName VALUES (2)")
344+
345+
// Try to commit the transaction as v2, hit a conflict, then retry as v3.
346+
// During conflict resolution, getCommits returns v2's UniformMetadata, which
347+
// refreshes catalogTable to convertedDeltaVersion=2. The retry commit then converts
348+
// only v3 incrementally (fromVersion=3, toVersion=3) rather than from stale v1.
349+
val events = Log4jUsageLogger.track {
350+
txn.commit(Seq.empty, DeltaOperations.ManualUpdate)
351+
}
352+
353+
// Latest version is now 3. There are two deltaCommitRange events: one from the failed
354+
// first attempt (v2, using stale v1 base) and one from the successful retry (v3).
355+
// The retry must use the refreshed catalog (convertedDeltaVersion=2 from v2's
356+
// UniformMetadata), so conversion covers only v3 (fromVersion=3, toVersion=3).
357+
// Without the fix, the retry would still use the stale v1 base and fromVersion would be 2.
358+
val latestVersion = deltaLog.update().version
359+
val rangeEvents = DeltaTestUtils.filterUsageRecords(
360+
events, "delta.iceberg.conversion.deltaCommitRange")
361+
assert(rangeEvents.size == 2,
362+
s"Expected 2 deltaCommitRange events (failed attempt + retry), got ${rangeEvents.size}")
363+
val retryEventData = JsonUtils.fromJson[Map[String, Any]](rangeEvents.last.blob)
364+
// Jackson deserializes small JSON integers as Int, but latestVersion is Long;
365+
// use Number.longValue for a type-safe comparison.
366+
assert(retryEventData("fromVersion").asInstanceOf[Number].longValue === latestVersion,
367+
s"Expected fromVersion=$latestVersion (fresh v2 base), " +
368+
s"got ${retryEventData("fromVersion")} (stale v1 base would give ${latestVersion - 1})")
369+
assert(retryEventData("toVersion").asInstanceOf[Number].longValue === latestVersion,
370+
s"Expected toVersion=$latestVersion")
371+
}
372+
}
251373
}

0 commit comments

Comments
 (0)