@@ -20,6 +20,7 @@ import java.util.{Collections, Optional, UUID}
2020
2121import scala .collection .JavaConverters ._
2222
23+ import com .databricks .spark .util .Log4jUsageLogger
2324import io .delta .storage .commit .{CommitCoordinatorClient => JCommitCoordinatorClient }
2425import io .delta .storage .commit .{TableIdentifier => UCTableIdentifier }
2526import io .delta .storage .commit .actions .{AbstractMetadata , AbstractProtocol }
@@ -29,9 +30,9 @@ import org.apache.hadoop.fs.Path
2930import org .apache .spark .{SparkConf , SparkSessionSwitch }
3031import org .apache .spark .sql .{Row , SparkSession }
3132import org .apache .spark .sql .catalyst .TableIdentifier
32- import org .apache .spark .sql .catalyst .catalog . CatalogTable
33- import org .apache .spark .sql .connector .catalog .{Identifier , Table }
34- import org .apache .spark .sql .delta .{DeltaLog , IcebergConstants }
33+ import org .apache .spark .sql .catalyst .analysis .{ FunctionRegistry , TableFunctionRegistry }
34+ import org .apache .spark .sql .catalyst .catalog .{CatalogTable , SessionCatalog }
35+ import org .apache .spark .sql .delta .{DeltaLog , DeltaOperations , DeltaTestUtils , IcebergConstants }
3536import org .apache .spark .sql .delta .DeltaConfigs .{
3637 COORDINATED_COMMITS_COORDINATOR_CONF ,
3738 COORDINATED_COMMITS_COORDINATOR_NAME
@@ -45,7 +46,6 @@ import org.apache.spark.sql.delta.coordinatedcommits.{
4546 InMemoryUCCommitCoordinator ,
4647 UCCommitCoordinatorBuilder
4748}
48- import org .apache .spark .sql .delta .icebergShaded .IcebergConverter
4949import org .apache .spark .sql .delta .sources .DeltaSQLConf
5050import org .apache .spark .sql .delta .test .DeltaSQLCommandTest
5151import org .apache .spark .sql .delta .uniform .hms .HMSTest
@@ -95,41 +95,54 @@ trait WriteDeltaHMSReadIceberg extends UniFormE2ETest
9595}
9696
9797/**
98- * A [[DeltaCatalog ]] subclass that enriches [[CatalogTable ]] with the last converted Iceberg
99- * metadata from [[InMemoryUCCommitCoordinator ]] before loading the table. This simulates what
100- * the real UC REST catalog does: injecting
101- * [[IcebergConstants.CATALOG_TABLE_ICEBERG_METADATA_LOCATION_PROP ]]
102- * and [[IcebergConstants.CATALOG_TABLE_ICEBERG_CONVERTED_DELTA_VERSION_PROP ]] into
103- * catalog storage properties, matching the UC REST path in [[UCDeltaCatalogClientImpl ]].
98+ * A [[SessionCatalog ]] wrapper that overrides [[getTableMetadata ]] to inject fresh Iceberg
99+ * metadata from [[UCEnrichedSessionCatalog.currentCoordinator ]] on every call. This mirrors what
100+ * [[UCDeltaCatalogClientImpl.toV1Table ]] does in production. Installed via reflection into
101+ * both [[org.apache.spark.sql.internal.SessionState.catalog ]] (for the V1 path used by
102+ * [[org.apache.spark.sql.delta.icebergShaded.IcebergConverter.refreshCatalogTableIfNeeded ]])
103+ * and the [[org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog ]] delegate (for
104+ * the V2 write path that goes through [[DeltaCatalog.loadTable ]]). Both injections happen in
105+ * [[WriteDeltaUCCCReadIceberg.beforeEach ]].
104106 */
105- class UCBackedDeltaCatalog extends DeltaCatalog {
106- override def loadCatalogTable (ident : Identifier , catalogTable : CatalogTable ): Table = {
107- val forbiddenKeys = catalogTable.properties.keys.filter(_.startsWith(" deltaUniformIceberg." ))
private class UCEnrichedSessionCatalog(
    spark: SparkSession,
    delegate: SessionCatalog,
    fr: FunctionRegistry,
    tfr: TableFunctionRegistry)
  extends SessionCatalog(delegate.externalCatalog, fr, tfr) {

  // Start out in whatever database the wrapped catalog is currently using.
  setCurrentDatabase(delegate.getCurrentDatabase)

  /**
   * Loads the table metadata from the wrapped catalog and, when a coordinator is registered in
   * [[UCEnrichedSessionCatalog.currentCoordinator]] and it holds Iceberg metadata for the table,
   * returns a copy whose storage properties carry the Iceberg metadata location and converted
   * Delta version. In every other case the delegate's metadata is returned unchanged.
   */
  override def getTableMetadata(name: TableIdentifier): CatalogTable = {
    val base = delegate.getTableMetadata(name)

    // Guard: the uniform keys must never show up in the table-level properties.
    val leakedKeys = base.properties.keys.filter(_.startsWith("deltaUniformIceberg."))
    assert(leakedKeys.isEmpty,
      s"deltaUniformIceberg.* keys must only appear in storage.properties, " +
      s"never in catalogTable.properties. Found: ${leakedKeys.mkString(", ")}")

    val enriched = for {
      coordinator <- UCEnrichedSessionCatalog.currentCoordinator
      // Best effort: any non-fatal failure during the lookup falls back to the plain metadata.
      withIceberg <- scala.util.Try(withIcebergProps(base, coordinator)).toOption.flatten
    } yield withIceberg
    enriched.getOrElse(base)
  }

  /**
   * Looks up the UC table id in the table's coordinated-commits configuration (read from the
   * Delta log's `unsafeVolatileSnapshot`) and asks `coordinator` for its uniform metadata.
   * Returns `None` when there is no table id or no Iceberg metadata is present.
   */
  private def withIcebergProps(
      base: CatalogTable,
      coordinator: InMemoryUCCommitCoordinator): Option[CatalogTable] = {
    val snapshot = DeltaLog.forTable(spark, new Path(base.location)).unsafeVolatileSnapshot
    val tableConf = snapshot.metadata.coordinatedCommitsTableConf
    for {
      tableId <- tableConf.get(UCCommitCoordinatorClient.UC_TABLE_ID_KEY)
      uniformMeta <- coordinator.getUniformMetadata(tableId)
        .filter(_.getIcebergMetadata.isPresent)
    } yield {
      val icebergMeta = uniformMeta.getIcebergMetadata.get
      val icebergProps = Seq(
        IcebergConstants.CATALOG_TABLE_ICEBERG_METADATA_LOCATION_PROP ->
          icebergMeta.getMetadataLocation,
        IcebergConstants.CATALOG_TABLE_ICEBERG_CONVERTED_DELTA_VERSION_PROP ->
          icebergMeta.getConvertedDeltaVersion.toString)
      base.copy(storage = base.storage.copy(
        properties = base.storage.properties ++ icebergProps))
    }
  }
}
131144
132- object UCBackedDeltaCatalog {
/**
 * Shared holder for the coordinator that [[UCEnrichedSessionCatalog.getTableMetadata]] consults.
 * `None` means no enrichment is applied.
 */
private object UCEnrichedSessionCatalog {
  @volatile var currentCoordinator: Option[InMemoryUCCommitCoordinator] = Option.empty
}
135148
@@ -154,7 +167,7 @@ trait WriteDeltaUCCCReadIceberg extends UniFormE2ETest
154167 override protected def sparkConf : SparkConf =
155168 super .sparkConf.set(
156169 SQLConf .V2_SESSION_CATALOG_IMPLEMENTATION .key,
157- classOf [UCBackedDeltaCatalog ].getName)
170+ classOf [DeltaCatalog ].getName)
158171
159172 /**
160173 * A [[UCCommitCoordinatorClient ]] subclass that overrides [[registerTable ]] to auto-assign
@@ -187,13 +200,15 @@ trait WriteDeltaUCCCReadIceberg extends UniFormE2ETest
187200
188201 protected var ucCommitCoordinator : InMemoryUCCommitCoordinator = _
189202 private var testCoordinator : TestUCBackedCommitCoordinator = _
203+ private var origSessionCatalog : SessionCatalog = _
204+ private var origV2Catalog : SessionCatalog = _
190205
191206 abstract override def beforeEach (): Unit = {
192207 super .beforeEach()
193208 DeltaLog .clearCache()
194209 CommitCoordinatorProvider .clearAllBuilders()
195210 ucCommitCoordinator = new InMemoryUCCommitCoordinator ()
196- UCBackedDeltaCatalog .currentCoordinator = Some (ucCommitCoordinator)
211+ UCEnrichedSessionCatalog .currentCoordinator = Some (ucCommitCoordinator)
197212 val ucClient = new InMemoryUCClient (" test-metastore" , ucCommitCoordinator)
198213 testCoordinator = new TestUCBackedCommitCoordinator (ucClient)
199214 CommitCoordinatorProvider .registerBuilder(new CatalogOwnedCommitCoordinatorBuilder {
@@ -205,15 +220,63 @@ trait WriteDeltaUCCCReadIceberg extends UniFormE2ETest
205220 spark : SparkSession , catalogName : String ): JCommitCoordinatorClient =
206221 testCoordinator
207222 })
223+
224+ // Inject UCEnrichedSessionCatalog into two places so that all catalog reads,
225+ // whether from the
226+ // V1 path (refreshCatalogTableIfNeeded -> spark.sessionState.catalog.getTableMetadata) or
227+ // V2 write path (DeltaCatalog.loadTable -> V2SessionCatalog.catalog.getTableMetadata),
228+ // see up-to-date coordinator Iceberg metadata,
229+ // mirroring what toV1Table does in production for UCDeltaCatalogClientImpl.
230+ origSessionCatalog = spark.sessionState.catalog // force lazy init of the field
231+ val fr = reflectField(origSessionCatalog, " functionRegistry" )
232+ .get(origSessionCatalog).asInstanceOf [FunctionRegistry ]
233+ val tfr = reflectField(origSessionCatalog, " tableFunctionRegistry" )
234+ .get(origSessionCatalog).asInstanceOf [TableFunctionRegistry ]
235+ val enriched = new UCEnrichedSessionCatalog (spark, origSessionCatalog, fr, tfr)
236+
237+ // V1 path: spark.sessionState.catalog (used by refreshCatalogTableIfNeeded)
238+ reflectField(spark.sessionState, " catalog" ).set(spark.sessionState, enriched)
239+
240+ // V2 path: V2SessionCatalog holds a direct reference to SessionCatalog captured at
241+ // session construction; it is unaffected by the above swap, so replace it separately.
242+ val v2Cat = spark.sessionState.catalogManager.catalog(" spark_catalog" )
243+ val v2Delegate = reflectField(v2Cat, " delegate" ).get(v2Cat).asInstanceOf [AnyRef ]
244+ val v2CatalogField = reflectField(v2Delegate, " catalog" )
245+ origV2Catalog = v2CatalogField.get(v2Delegate).asInstanceOf [SessionCatalog ]
246+ v2CatalogField.set(v2Delegate, enriched)
208247 }
209248
210249 abstract override def afterEach (): Unit = {
211- UCBackedDeltaCatalog .currentCoordinator = None
250+ if (origV2Catalog != null ) {
251+ val v2Cat = spark.sessionState.catalogManager.catalog(" spark_catalog" )
252+ val v2Delegate = reflectField(v2Cat, " delegate" ).get(v2Cat).asInstanceOf [AnyRef ]
253+ reflectField(v2Delegate, " catalog" ).set(v2Delegate, origV2Catalog)
254+ origV2Catalog = null
255+ }
256+ if (origSessionCatalog != null ) {
257+ reflectField(spark.sessionState, " catalog" ).set(spark.sessionState, origSessionCatalog)
258+ origSessionCatalog = null
259+ }
260+ UCEnrichedSessionCatalog .currentCoordinator = None
212261 CommitCoordinatorProvider .clearAllBuilders()
213262 DeltaLog .clearCache()
214263 super .afterEach()
215264 }
216265
266+ private def reflectField (obj : AnyRef , name : String ): java.lang.reflect.Field = {
267+ var cls : Class [_] = obj.getClass
268+ while (cls != null ) {
269+ try {
270+ val f = cls.getDeclaredField(name)
271+ f.setAccessible(true )
272+ return f
273+ } catch {
274+ case _ : NoSuchFieldException => cls = cls.getSuperclass
275+ }
276+ }
277+ throw new NoSuchFieldException (s " Field ' $name' not found in ${obj.getClass.getName}" )
278+ }
279+
217280 /**
218281 * Returns the TBLPROPERTIES SQL fragment required to enable the UC commit coordinator.
219282 * Concrete suites should append this to their [[extraTableProperties ]] override.
@@ -244,8 +307,67 @@ trait WriteDeltaUCCCReadIceberg extends UniFormE2ETest
244307 * by an in-memory UC commit coordinator, reading results via the native Iceberg reader.
245308 */
class UniFormE2EIcebergUCSuite extends UniFormE2EIcebergSuiteBase
  with WriteDeltaUCCCReadIceberg {
  // Tests that do not need the commit-coordinator setup belong in [[UniFormE2EIcebergSuiteBase]];
  // tests that specifically exercise the UC commit coordinator path may live here.
  override def extraTableProperties(compatVersion: Int): String =
    super.extraTableProperties(compatVersion) + requiredTableProperties

  test("conflict resolution refreshes catalogTable with fresh uniform metadata " +
      "for incremental Iceberg conversion") {
    val tableName = "test_cc_conflict_iceberg_refresh"
    withTable(tableName) {
      val ident = TableIdentifier(tableName)

      // Create a coordinated-commits + UniForm (Iceberg) table.
      write(
        s"""CREATE TABLE $tableName (id INT) USING DELTA
           |TBLPROPERTIES (
           | 'delta.columnMapping.mode' = 'name',
           | 'delta.enableIcebergCompatV2' = 'true',
           | 'delta.universalFormat.enabledFormats' = 'iceberg'
           | $requiredTableProperties
           |)""".stripMargin)

      // First insert (v1), then capture the snapshot and catalog table as they look at v1.
      write(s"INSERT INTO $tableName VALUES (1)")
      val log = DeltaLog.forTable(spark, ident)
      val snapshotAtV1 = log.update()
      val catalogTableAtV1 = spark.sessionState.catalog.getTableMetadata(ident)

      // Open a transaction pinned to the v1 snapshot and the v1 catalog table.
      val staleTxn = log.startTransaction(Some(catalogTableAtV1), Some(snapshotAtV1))

      // Second insert (v2): the "winning" commit. From here on the transaction's snapshot and
      // catalog table are stale, so its commit is expected to hit a conflict.
      write(s"INSERT INTO $tableName VALUES (2)")

      // Commit the stale transaction while recording usage events. Per the test's intent the
      // first attempt (as v2) conflicts and the retry lands as v3, with conflict resolution
      // refreshing the catalog table so the retry converts only v3 rather than from stale v1.
      val usageEvents = Log4jUsageLogger.track {
        staleTxn.commit(Seq.empty, DeltaOperations.ManualUpdate)
      }

      // Two deltaCommitRange events are expected: the failed first attempt and the retry.
      // The retry's range must start and end at the latest version; a stale base would make
      // fromVersion one lower.
      val committedVersion = log.update().version
      val conversionRanges = DeltaTestUtils.filterUsageRecords(
        usageEvents, "delta.iceberg.conversion.deltaCommitRange")
      assert(conversionRanges.size == 2,
        s"Expected 2 deltaCommitRange events (failed attempt + retry), " +
        s"got ${conversionRanges.size}")

      val retryRange = JsonUtils.fromJson[Map[String, Any]](conversionRanges.last.blob)
      // JSON integers may deserialize as Int while the Delta version is a Long, so compare
      // through Number.longValue.
      def versionField(key: String): Long = retryRange(key).asInstanceOf[Number].longValue

      assert(versionField("fromVersion") === committedVersion,
        s"Expected fromVersion=$committedVersion (fresh v2 base), " +
        s"got ${retryRange("fromVersion")} (stale v1 base would give ${committedVersion - 1})")
      assert(versionField("toVersion") === committedVersion,
        s"Expected toVersion=$committedVersion")
    }
  }
}
0 commit comments