Skip to content

Commit 303e625

Browse files
committed
fix tests
1 parent 4ec8a52 commit 303e625

File tree

8 files changed

+41
-34
lines changed

8 files changed

+41
-34
lines changed

paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ protected List<String> ddl() {
3939

4040
@Test
4141
public void testCompactionInStreamingMode() throws Exception {
42-
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
43-
batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
42+
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '4')");
4443
batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')");
4544

4645
sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500));

paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ protected List<String> ddl() {
3939

4040
@Test
4141
public void testCompactionInStreamingMode() throws Exception {
42-
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
43-
batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
42+
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '4')");
4443
batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')");
4544

4645
sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500));

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,7 @@ public void testIngestFromSource() {
342342

343343
@Test
344344
public void testAutoCompaction() {
345-
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
346-
batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
345+
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '4')");
347346

348347
assertAutoCompaction(
349348
"INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')",

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,7 @@ public void testIngestFromSource() {
150150

151151
@Test
152152
public void testNoCompactionInBatchMode() {
153-
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
154-
batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
153+
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '4')");
155154

156155
assertExecuteExpected(
157156
"INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')",
@@ -212,8 +211,7 @@ public void testNoCompactionInBatchMode() {
212211

213212
@Test
214213
public void testCompactionInStreamingMode() throws Exception {
215-
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
216-
batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
214+
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '4')");
217215
batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')");
218216

219217
sEnv.getConfig()
@@ -237,8 +235,7 @@ public void testCompactionInStreamingMode() throws Exception {
237235

238236
@Test
239237
public void testCompactionInStreamingModeWithMaxWatermark() throws Exception {
240-
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
241-
batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
238+
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '4')");
242239
batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')");
243240

244241
sEnv.getConfig()

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ case class DeleteFromPaimonTableCommand(
5050
with ExpressionHelper
5151
with SupportsSubquery {
5252

53-
private lazy val writer = PaimonSparkWriter(table)
54-
5553
override def run(sparkSession: SparkSession): Seq[Row] = {
5654

5755
val commit = table.newBatchWriteBuilder().newCommit()
@@ -96,15 +94,15 @@ case class DeleteFromPaimonTableCommand(
9694
if (dropPartitions.nonEmpty) {
9795
commit.truncatePartitions(dropPartitions.asJava)
9896
} else {
99-
writer.commit(Seq.empty)
97+
dvSafeWriter.commit(Seq.empty)
10098
}
10199
} else {
102100
val commitMessages = if (usePrimaryKeyDelete()) {
103101
performPrimaryKeyDelete(sparkSession)
104102
} else {
105103
performNonPrimaryKeyDelete(sparkSession)
106104
}
107-
writer.commit(commitMessages)
105+
dvSafeWriter.commit(commitMessages)
108106
}
109107
}
110108

@@ -118,7 +116,7 @@ case class DeleteFromPaimonTableCommand(
118116
private def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
119117
val df = createDataset(sparkSession, Filter(condition, relation))
120118
.withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
121-
writer.write(df)
119+
dvSafeWriter.write(df)
122120
}
123121

124122
private def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
@@ -136,7 +134,7 @@ case class DeleteFromPaimonTableCommand(
136134
sparkSession)
137135

138136
// Step3: update the touched deletion vectors and index files
139-
writer.persistDeletionVectors(deletionVectors)
137+
dvSafeWriter.persistDeletionVectors(deletionVectors)
140138
} else {
141139
// Step2: extract out the exactly files, which must have at least one record to be updated.
142140
val touchedFilePaths =
@@ -151,7 +149,7 @@ case class DeleteFromPaimonTableCommand(
151149
val data = createDataset(sparkSession, toRewriteScanRelation)
152150

153151
// only write new files, should have no compaction
154-
val addCommitMessage = writer.writeOnly().write(data)
152+
val addCommitMessage = dvSafeWriter.writeOnly().write(data)
155153

156154
// Step5: convert the deleted files that need to be written to commit message.
157155
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ case class MergeIntoPaimonTable(
6262

6363
lazy val tableSchema: StructType = v2Table.schema
6464

65-
private lazy val writer = PaimonSparkWriter(table)
66-
6765
private lazy val (targetOnlyCondition, filteredTargetPlan): (Option[Expression], LogicalPlan) = {
6866
val filtersOnlyTarget = getExpressionOnlyRelated(mergeCondition, targetTable)
6967
(
@@ -81,12 +79,12 @@ case class MergeIntoPaimonTable(
8179
} else {
8280
performMergeForNonPkTable(sparkSession)
8381
}
84-
writer.commit(commitMessages)
82+
dvSafeWriter.commit(commitMessages)
8583
Seq.empty[Row]
8684
}
8785

8886
private def performMergeForPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
89-
writer.write(
87+
dvSafeWriter.write(
9088
constructChangedRows(
9189
sparkSession,
9290
createDataset(sparkSession, filteredTargetPlan),
@@ -128,14 +126,14 @@ case class MergeIntoPaimonTable(
128126
val dvDS = ds.where(
129127
s"$ROW_KIND_COL = ${RowKind.DELETE.toByteValue} or $ROW_KIND_COL = ${RowKind.UPDATE_AFTER.toByteValue}")
130128
val deletionVectors = collectDeletionVectors(dataFilePathToMeta, dvDS, sparkSession)
131-
val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
129+
val indexCommitMsg = dvSafeWriter.persistDeletionVectors(deletionVectors)
132130

133131
// Step4: filter rows that should be written as the inserted/updated data.
134132
val toWriteDS = ds
135133
.where(
136134
s"$ROW_KIND_COL = ${RowKind.INSERT.toByteValue} or $ROW_KIND_COL = ${RowKind.UPDATE_AFTER.toByteValue}")
137135
.drop(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
138-
val addCommitMessage = writer.write(toWriteDS)
136+
val addCommitMessage = dvSafeWriter.write(toWriteDS)
139137

140138
// Step5: commit index and data commit messages
141139
addCommitMessage ++ indexCommitMsg
@@ -192,7 +190,7 @@ case class MergeIntoPaimonTable(
192190

193191
val toWriteDS =
194192
constructChangedRows(sparkSession, targetDSWithFileTouchedCol).drop(ROW_KIND_COL)
195-
val addCommitMessage = writer.write(toWriteDS)
193+
val addCommitMessage = dvSafeWriter.write(toWriteDS)
196194
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
197195

198196
addCommitMessage ++ deletedCommitMessage

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.spark.commands
2020

21+
import org.apache.paimon.CoreOptions
2122
import org.apache.paimon.deletionvectors.BitmapDeletionVector
2223
import org.apache.paimon.fs.Path
2324
import org.apache.paimon.index.IndexFileMeta
@@ -27,7 +28,7 @@ import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
2728
import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
2829
import org.apache.paimon.spark.schema.PaimonMetadataColumn
2930
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
30-
import org.apache.paimon.table.{FileStoreTable, KnownSplitsTable}
31+
import org.apache.paimon.table.{BucketMode, FileStoreTable, KnownSplitsTable}
3132
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
3233
import org.apache.paimon.table.source.DataSplit
3334
import org.apache.paimon.types.RowType
@@ -50,6 +51,24 @@ import scala.collection.JavaConverters._
5051
/** Helper trait for all paimon commands. */
5152
trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLConfHelper {
5253

54+
lazy val dvSafeWriter: PaimonSparkWriter = {
55+
if (table.primaryKeys().isEmpty && table.bucketMode() == BucketMode.HASH_FIXED) {
56+
57+
/**
58+
* Writer without compaction. Note that some operations may generate deletion vectors, and
59+
* a write may run concurrently with deletion-vector generation. If compaction occurred at
60+
* that time, the files that the deletion vectors refer to could be removed before the
61+
* vectors are committed, resulting in an error.
62+
*
63+
* For example: updating a bucketed append table with deletion vectors enabled.
64+
*/
65+
PaimonSparkWriter(table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true")))
66+
} else {
67+
PaimonSparkWriter(table)
68+
}
69+
70+
}
71+
5372
/**
5473
* For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate`
5574
* methods where the `AlwaysTrue` Filter is used.

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ case class UpdatePaimonTableCommand(
4545
with AssignmentAlignmentHelper
4646
with SupportsSubquery {
4747

48-
private lazy val writer = PaimonSparkWriter(table)
49-
5048
private lazy val updateExpressions = {
5149
generateAlignedExpressions(relation.output, assignments).zip(relation.output).map {
5250
case (expr, attr) => Alias(expr, attr.name)()
@@ -60,7 +58,7 @@ case class UpdatePaimonTableCommand(
6058
} else {
6159
performUpdateForNonPkTable(sparkSession)
6260
}
63-
writer.commit(commitMessages)
61+
dvSafeWriter.commit(commitMessages)
6462

6563
Seq.empty[Row]
6664
}
@@ -70,7 +68,7 @@ case class UpdatePaimonTableCommand(
7068
val updatedPlan = Project(updateExpressions, Filter(condition, relation))
7169
val df = createDataset(sparkSession, updatedPlan)
7270
.withColumn(ROW_KIND_COL, lit(RowKind.UPDATE_AFTER.toByteValue))
73-
writer.write(df)
71+
dvSafeWriter.write(df)
7472
}
7573

7674
/** Update for table without primary keys */
@@ -103,7 +101,7 @@ case class UpdatePaimonTableCommand(
103101
val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
104102

105103
// Step4: write these deletion vectors.
106-
val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
104+
val indexCommitMsg = dvSafeWriter.persistDeletionVectors(deletionVectors)
107105

108106
addCommitMessage ++ indexCommitMsg
109107
} finally {
@@ -144,7 +142,7 @@ case class UpdatePaimonTableCommand(
144142
Filter(condition, toUpdateScanRelation)
145143
}
146144
val data = createDataset(sparkSession, newPlan).select(updateColumns: _*)
147-
writer.write(data)
145+
dvSafeWriter.write(data)
148146
}
149147

150148
private def writeUpdatedAndUnchangedData(
@@ -161,6 +159,6 @@ case class UpdatePaimonTableCommand(
161159
}
162160

163161
val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
164-
writer.write(data)
162+
dvSafeWriter.write(data)
165163
}
166164
}

0 commit comments

Comments
 (0)