Commit 12ee152

[Spark][Sharing] Fix Delta Sharing DataFrame not updated for Snapshot Query
Cherry-pick #2574 to branch-3.1. Fix Delta Sharing DataFrame not updated for snapshot query: when the provider updates the table (for example, inserts or deletes rows), re-running the same query on the same table in a delta-sharing-spark session does not return the updated data. A new RPC is issued and the local delta log is refreshed, but the DataFrame still reads the snapshot at version 0.
1 parent 121c1c8 commit 12ee152
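
The core of the fix is in DeltaSharingUtils.getQueryParamsHashId (see the diff below): the shared table version is now folded into the per-query hash, so a refreshed snapshot produces a new custom table path suffix and the query no longer reuses the local delta log prepared for version 0. Below is a minimal, self-contained sketch of that behavior; QueryHashSketch and queryParamsHashId are hypothetical names and the parameter list is simplified (the real method also hashes versionAsOf, timestampAsOf, and the partition/data filter strings):

    // Sketch only: shows why including the table version in the query hash
    // forces a new table path (and hence a fresh read) after the table changes.
    import java.nio.charset.StandardCharsets.UTF_8
    import com.google.common.hash.Hashing

    object QueryHashSketch {
      // Simplified stand-in for DeltaSharingUtils.getQueryParamsHashId.
      def queryParamsHashId(filters: String, jsonPredicateHints: String, version: Long): String = {
        val fullQueryString = s"${filters}_${jsonPredicateHints}_${version}"
        Hashing.sha256().hashString(fullQueryString, UTF_8).toString
      }

      def main(args: Array[String]): Unit = {
        val hashAtV0 = queryParamsHashId("", "", version = 0L) // initial snapshot
        val hashAtV1 = queryParamsHashId("", "", version = 1L) // after provider inserts/deletes rows
        // Different hashes => different table path suffixes => the snapshot query
        // reads the new version instead of the cached version-0 delta log.
        assert(hashAtV0 != hashAtV1)
        println(s"v0: $hashAtV0")
        println(s"v1: $hashAtV1")
      }
    }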

File tree: 6 files changed (+103 -12 lines)

build.sbt (+1 -1)

@@ -224,7 +224,7 @@ lazy val sharing = (project in file("sharing"))
     libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
 
-      "io.delta" %% "delta-sharing-client" % "1.0.3",
+      "io.delta" %% "delta-sharing-client" % "1.0.4",
 
       // Test deps
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",

sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala (+1 -1)

@@ -391,7 +391,7 @@ private[sharing] class DeltaSharingDataSource
     val params = new DeltaSharingFileIndexParams(
       new Path(path),
       spark,
-      deltaSharingTableMetadata.metadata,
+      deltaSharingTableMetadata,
       options
     )
     if (ConfUtils.limitPushdownEnabled(spark.sessionState.conf)) {

sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingFileIndex.scala (+9 -6)

@@ -36,7 +36,7 @@ import org.apache.spark.sql.types.StructType
 private[sharing] case class DeltaSharingFileIndexParams(
     path: Path,
     spark: SparkSession,
-    metadata: model.DeltaSharingMetadata,
+    deltaSharingTableMetadata: DeltaSharingUtils.DeltaSharingTableMetadata,
     options: DeltaSharingOptions)
 
 /**
@@ -62,18 +62,20 @@ case class DeltaSharingFileIndex(
   override def refresh(): Unit = {}
 
   override def sizeInBytes: Long =
-    Option(params.metadata.size).getOrElse {
+    Option(params.deltaSharingTableMetadata.metadata.size).getOrElse {
       // Throw error if metadata.size is not returned, to urge the server to respond a table size.
       throw new IllegalStateException(
         "size is null in the metadata returned from the delta " +
-          s"sharing server: ${params.metadata}."
+          s"sharing server: ${params.deltaSharingTableMetadata.metadata}."
       )
     }
 
-  override def partitionSchema: StructType = params.metadata.partitionSchema
+  override def partitionSchema: StructType =
+    params.deltaSharingTableMetadata.metadata.partitionSchema
 
   // Returns the partition columns of the shared delta table based on the returned metadata.
-  def partitionColumns: Seq[String] = params.metadata.deltaMetadata.partitionColumns
+  def partitionColumns: Seq[String] =
+    params.deltaSharingTableMetadata.metadata.deltaMetadata.partitionColumns
 
   override def rootPaths: Seq[Path] = params.path :: Nil
 
@@ -98,7 +100,8 @@
       // keeps the string the same for the same filters.
       partitionFilters.map(_.sql).mkString(";"),
       dataFilters.map(_.sql).mkString(";"),
-      jsonPredicateHints.getOrElse("")
+      jsonPredicateHints.getOrElse(""),
+      params.deltaSharingTableMetadata.version
     )
     val tablePathWithHashIdSuffix = DeltaSharingUtils.getTablePathWithIdSuffix(
       queryCustomTablePath,

sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala (+3 -2)

@@ -276,9 +276,10 @@ object DeltaSharingUtils extends Logging {
       options: DeltaSharingOptions,
       partitionFiltersString: String,
       dataFiltersString: String,
-      jsonPredicateHints: String): String = {
+      jsonPredicateHints: String,
+      version: Long): String = {
     val fullQueryString = s"${options.versionAsOf}_${options.timestampAsOf}_" +
-      s"${partitionFiltersString}_${dataFiltersString}_${jsonPredicateHints}"
+      s"${partitionFiltersString}_${dataFiltersString}_${jsonPredicateHints}_${version}"
     Hashing.sha256().hashString(fullQueryString, UTF_8).toString
   }

sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala (+81)

@@ -199,6 +199,87 @@ trait DeltaSharingDataSourceDeltaSuiteBase
     }
   }
 
+  test("DeltaSharingDataSource able to read data with changes") {
+    withTempDir { tempDir =>
+      val deltaTableName = "delta_table_change"
+
+      def test(tablePath: String, expectedCount: Int, expectedSchema: StructType): Unit = {
+        assert(
+          expectedSchema == spark.read
+            .format("deltaSharing")
+            .option("responseFormat", "delta")
+            .load(tablePath)
+            .schema
+        )
+
+        val deltaDf = spark.read.format("delta").table(deltaTableName)
+        val sharingDf =
+          spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath)
+        checkAnswer(deltaDf, sharingDf)
+        assert(sharingDf.count() == expectedCount)
+      }
+
+      withTable(deltaTableName) {
+        val sharedTableName = "shared_table_change"
+        createTable(deltaTableName)
+
+        // test 1: insert 2 rows
+        sql(
+          s"INSERT INTO $deltaTableName" +
+          """ VALUES (1, "one", "2023-01-01", "2023-01-01 00:00:00"),
+            |(2, "two", "2023-02-02", "2023-02-02 00:00:00")""".stripMargin
+        )
+        prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)
+        prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
+        val expectedSchema: StructType = new StructType()
+          .add("c1", IntegerType)
+          .add("c2", StringType)
+          .add("c3", DateType)
+          .add("c4", TimestampType)
+        withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+          val profileFile = prepareProfileFile(tempDir)
+          val tableName = s"share1.default.$sharedTableName"
+          test(s"${profileFile.getCanonicalPath}#$tableName", 2, expectedSchema)
+        }
+
+        // test 2: insert 2 more rows, and rename a column
+        spark.sql(
+          s"""ALTER TABLE $deltaTableName SET TBLPROPERTIES('delta.minReaderVersion' = '2',
+             |'delta.minWriterVersion' = '5',
+             |'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true)""".stripMargin
+        )
+        sql(
+          s"INSERT INTO $deltaTableName" +
+          """ VALUES (3, "three", "2023-03-03", "2023-03-03 00:00:00"),
+            |(4, "four", "2023-04-04", "2023-04-04 00:00:00")""".stripMargin
+        )
+        sql(s"""ALTER TABLE $deltaTableName RENAME COLUMN c3 TO c3rename""")
+        prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)
+        prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
+        val expectedNewSchema: StructType = new StructType()
+          .add("c1", IntegerType)
+          .add("c2", StringType)
+          .add("c3rename", DateType)
+          .add("c4", TimestampType)
+        withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+          val profileFile = prepareProfileFile(tempDir)
+          val tableName = s"share1.default.$sharedTableName"
+          test(s"${profileFile.getCanonicalPath}#$tableName", 4, expectedNewSchema)
+        }
+
+        // test 3: delete 1 row
+        sql(s"DELETE FROM $deltaTableName WHERE c1 = 2")
+        prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)
+        prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
+        withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+          val profileFile = prepareProfileFile(tempDir)
+          val tableName = s"share1.default.$sharedTableName"
+          test(s"${profileFile.getCanonicalPath}#$tableName", 3, expectedNewSchema)
+        }
+      }
+    }
+  }
+
   test("DeltaSharingDataSource able to auto resolve responseFormat") {
     withTempDir { tempDir =>
       val deltaTableName = "delta_table_auto"

sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala (+8 -2)

@@ -59,6 +59,8 @@ private object TestUtils {
   }
 
   // scalastyle:off line.size.limit
+  val protocolStr =
+    """{"protocol":{"deltaProtocol":{"minReaderVersion": 1, "minWriterVersion": 1}}}"""
   val metaDataStr =
     """{"metaData":{"size":809,"deltaMetadata":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c2"],"configuration":{},"createdTime":1691734718560}}}"""
   val metaDataWithoutSizeStr =
@@ -148,7 +150,7 @@ class TestDeltaSharingClientForFileIndex(
     DeltaTableFiles(
       version = 0,
       lines = Seq[String](
-        """{"protocol":{"deltaProtocol":{"minReaderVersion": 1, "minWriterVersion": 1}}}""",
+        protocolStr,
        metaDataStr,
        getAddFileStr1(paths(numGetFileCalls.min(1)), urlExpirationMsOpt),
        getAddFileStr2(urlExpirationMsOpt)
@@ -220,7 +222,11 @@ class DeltaSharingFileIndexSuite
     val params = new DeltaSharingFileIndexParams(
       tablePath,
       spark,
-      getMockedDeltaSharingMetadata(metaData),
+      DeltaSharingUtils.DeltaSharingTableMetadata(
+        version = 0,
+        protocol = JsonUtils.fromJson[model.DeltaSharingSingleAction](protocolStr).protocol,
+        metadata = getMockedDeltaSharingMetadata(metaData)
+      ),
       new DeltaSharingOptions(Map("path" -> tablePath.toString))
     )
     val dsTable = Table(share = shareName, schema = schemaName, name = sharedTableName)
