Skip to content

Commit 8590ff4

Browse files
committed
[core] Validate retract records in PostponeBucketWriter
1 parent 52e9d19 commit 8590ff4

File tree

5 files changed

+55
-0
lines changed

5 files changed

+55
-0
lines changed

paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ public AbstractFileStoreWrite<KeyValue> newWrite(String commitUser, @Nullable In
173173
partitionType,
174174
keyType,
175175
valueType,
176+
mfFactory,
176177
this::pathFactory,
177178
newReaderFactoryBuilder(),
178179
snapshotManager(),

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,6 @@ public interface MergeFunction<T> {
4848
/** Get current merged value. */
4949
T getResult();
5050

51+
/** Require copy input kv, this may cache kv in memory. */
5152
boolean requireCopy();
5253
}

paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.paimon.io.KeyValueFileReaderFactory;
3030
import org.apache.paimon.io.KeyValueFileWriterFactory;
3131
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
32+
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
3233
import org.apache.paimon.operation.FileStoreScan;
3334
import org.apache.paimon.operation.FileStoreWrite;
3435
import org.apache.paimon.operation.MemoryFileStoreWrite;
@@ -71,6 +72,7 @@ public class PostponeBucketFileStoreWrite extends MemoryFileStoreWrite<KeyValue>
7172
private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
7273
private final FileIO fileIO;
7374
private final FileStorePathFactory pathFactory;
75+
private final MergeFunctionFactory<KeyValue> mfFactory;
7476
private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
7577

7678
private boolean forceBufferSpill = false;
@@ -83,6 +85,7 @@ public PostponeBucketFileStoreWrite(
8385
RowType partitionType,
8486
RowType keyType,
8587
RowType valueType,
88+
MergeFunctionFactory<KeyValue> mfFactory,
8689
BiFunction<CoreOptions, String, FileStorePathFactory> formatPathFactory,
8790
KeyValueFileReaderFactory.Builder readerFactoryBuilder,
8891
SnapshotManager snapshotManager,
@@ -93,6 +96,7 @@ public PostponeBucketFileStoreWrite(
9396
super(snapshotManager, scan, options, partitionType, null, null, tableName);
9497
this.fileIO = fileIO;
9598
this.pathFactory = pathFactory;
99+
this.mfFactory = mfFactory;
96100
this.readerFactoryBuilder = readerFactoryBuilder;
97101

98102
Options newOptions = new Options(options.toMap());
@@ -189,6 +193,7 @@ protected PostponeBucketWriter createWriter(
189193
options.spillCompressOptions(),
190194
options.writeBufferSpillDiskSize(),
191195
ioManager,
196+
mfFactory.create(),
192197
writerFactory,
193198
files -> newFileRead(partition, bucket, files),
194199
forceBufferSpill,

paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.paimon.manifest.FileSource;
3434
import org.apache.paimon.memory.MemoryOwner;
3535
import org.apache.paimon.memory.MemorySegmentPool;
36+
import org.apache.paimon.mergetree.compact.MergeFunction;
3637
import org.apache.paimon.options.MemorySize;
3738
import org.apache.paimon.reader.RecordReaderIterator;
3839
import org.apache.paimon.types.RowType;
@@ -55,6 +56,7 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue>, MemoryOwner
5556

5657
private final FileIO fileIO;
5758
private final DataFilePathFactory pathFactory;
59+
private final MergeFunction<KeyValue> mergeFunction;
5860
private final KeyValueFileWriterFactory writerFactory;
5961
private final List<DataFileMeta> files;
6062
private final IOFunction<List<DataFileMeta>, RecordReaderIterator<KeyValue>> fileRead;
@@ -71,12 +73,14 @@ public PostponeBucketWriter(
7173
CompressOptions spillCompression,
7274
MemorySize maxDiskSize,
7375
@Nullable IOManager ioManager,
76+
MergeFunction<KeyValue> mergeFunction,
7477
KeyValueFileWriterFactory writerFactory,
7578
IOFunction<List<DataFileMeta>, RecordReaderIterator<KeyValue>> fileRead,
7679
boolean useWriteBuffer,
7780
boolean spillable,
7881
@Nullable CommitIncrement restoreIncrement) {
7982
this.ioManager = ioManager;
83+
this.mergeFunction = mergeFunction;
8084
this.writerFactory = writerFactory;
8185
this.fileRead = fileRead;
8286
this.fileIO = fileIO;
@@ -99,6 +103,7 @@ private RollingFileWriter<KeyValue, DataFileMeta> createRollingRowWriter() {
99103

100104
@Override
101105
public void write(KeyValue record) throws Exception {
106+
validateRetract(record);
102107
boolean success = sinkWriter.write(record);
103108
if (!success) {
104109
flush();
@@ -112,6 +117,14 @@ public void write(KeyValue record) throws Exception {
112117
}
113118
}
114119

120+
private void validateRetract(KeyValue kv) {
121+
if (kv.valueKind().isRetract()) {
122+
mergeFunction.reset();
123+
mergeFunction.add(kv);
124+
mergeFunction.getResult();
125+
}
126+
}
127+
115128
private void flush() throws Exception {
116129
files.addAll(sinkWriter.flush());
117130
}

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,47 @@
3535
import java.util.concurrent.atomic.AtomicBoolean;
3636

3737
import static org.assertj.core.api.Assertions.assertThat;
38+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3839

3940
/** IT cases for postpone bucket tables. */
4041
public class PostponeBucketTableITCase extends AbstractTestBase {
4142

4243
private static final int TIMEOUT = 120;
4344

45+
@Test
46+
public void testRetractOnPartialUpdate() {
47+
String warehouse = getTempDirPath();
48+
TableEnvironment tEnv =
49+
tableEnvironmentBuilder()
50+
.batchMode()
51+
.setConf(TableConfigOptions.TABLE_DML_SYNC, true)
52+
.build();
53+
54+
tEnv.executeSql(
55+
"CREATE CATALOG mycat WITH (\n"
56+
+ " 'type' = 'paimon',\n"
57+
+ " 'warehouse' = '"
58+
+ warehouse
59+
+ "'\n"
60+
+ ")");
61+
tEnv.executeSql("USE CATALOG mycat");
62+
tEnv.executeSql(
63+
"CREATE TABLE T (\n"
64+
+ " k INT,\n"
65+
+ " v1 INT,\n"
66+
+ " v2 INT,\n"
67+
+ " row_kind_col STRING,\n"
68+
+ " PRIMARY KEY (k) NOT ENFORCED\n"
69+
+ ") WITH (\n"
70+
+ " 'bucket' = '-2',\n"
71+
+ " 'merge-engine' = 'partial-update',\n"
72+
+ " 'rowkind.field' = 'row_kind_col'\n"
73+
+ ")");
74+
assertThatThrownBy(() -> tEnv.executeSql("INSERT INTO T VALUES (1, 1, 1, '-D')").await())
75+
.rootCause()
76+
.hasMessageContaining("By default, Partial update can not accept delete records");
77+
}
78+
4479
@Test
4580
public void testWriteThenCompact() throws Exception {
4681
String warehouse = getTempDirPath();

0 commit comments

Comments
 (0)