Skip to content

Commit 68fd753

Browse files
committed
fix
1 parent 2f35061 commit 68fd753

File tree

4 files changed

+16
-6
lines changed

4 files changed

+16
-6
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
276276
relativeBucketPath,
277277
SerializationUtils.serializeBinaryRow(partition),
278278
bucket,
279-
filePath,
279+
new Path(filePath).getName,
280280
DeletionVector.serializeToBytes(dv)
281281
)
282282
}

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
308308
}
309309

310310
dvIndexFileMaintainer.notifyNewDeletionVector(
311-
new Path(sdv.dataFilePath).getName,
311+
sdv.dataFileName,
312312
DeletionVector.deserializeFromBytes(sdv.deletionVector))
313313
}
314314
val indexEntries = dvIndexFileMaintainer.persist()

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,25 @@ case class SparkDeletionVector(
3232
partitionAndBucket: String,
3333
partition: Array[Byte],
3434
bucket: Int,
35-
dataFilePath: String,
35+
dataFileName: String,
3636
deletionVector: Array[Byte]
37-
)
37+
) {
38+
def relativePath(pathFactory: FileStorePathFactory): String = {
39+
val prefix = pathFactory
40+
.relativeBucketPath(SerializationUtils.deserializeBinaryRow(partition), bucket)
41+
.toUri
42+
.toString + "/"
43+
prefix + dataFileName
44+
}
45+
}
3846

3947
object SparkDeletionVector {
4048
def toDataSplit(
4149
deletionVector: SparkDeletionVector,
4250
root: Path,
51+
pathFactory: FileStorePathFactory,
4352
dataFilePathToMeta: Map[String, SparkDataFileMeta]): DataSplit = {
44-
val meta = dataFilePathToMeta(deletionVector.dataFilePath)
53+
val meta = dataFilePathToMeta(deletionVector.relativePath(pathFactory))
4554
DataSplit
4655
.builder()
4756
.withBucketPath(root + "/" + deletionVector.partitionAndBucket)

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ case class UpdatePaimonTableCommand(
8282
logDebug("No file need to rewrote. It's an empty Commit.")
8383
Seq.empty[CommitMessage]
8484
} else {
85+
val pathFactory = fileStore.pathFactory()
8586
if (deletionVectorsEnabled) {
8687
// Step2: collect all the deletion vectors that marks the deleted rows.
8788
val deletionVectors = collectDeletionVectors(
@@ -95,7 +96,7 @@ case class UpdatePaimonTableCommand(
9596
try {
9697
// Step3: write these updated data
9798
val touchedDataSplits = deletionVectors.collect().map {
98-
SparkDeletionVector.toDataSplit(_, root, dataFilePathToMeta)
99+
SparkDeletionVector.toDataSplit(_, root, pathFactory, dataFilePathToMeta)
99100
}
100101
val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
101102

0 commit comments

Comments (0)