Skip to content

Commit 2d31524

Browse files
committed
fix
1 parent 8fac08a commit 2d31524

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import java.util.stream.Collectors;
7070
import java.util.stream.IntStream;
7171

72+
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
7273
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
7374
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
7475
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER;
@@ -149,6 +150,10 @@ public ChangelogMode getChangelogMode() {
149150
return ChangelogMode.all();
150151
}
151152

153+
if (options.get(CHANGELOG_PRODUCER) != CoreOptions.ChangelogProducer.NONE) {
154+
return ChangelogMode.all();
155+
}
156+
152157
return ChangelogMode.upsert();
153158
}
154159

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.paimon.flink;
2020

21-
import org.apache.paimon.CoreOptions;
2221
import org.apache.paimon.flink.sink.FlinkTableSink;
2322
import org.apache.paimon.flink.source.DataTableSource;
2423
import org.apache.paimon.fs.Path;
@@ -40,6 +39,9 @@
4039

4140
import java.util.Collections;
4241

42+
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
43+
import static org.apache.paimon.CoreOptions.ChangelogProducer.INPUT;
44+
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
4345
import static org.assertj.core.api.Assertions.assertThat;
4446

4547
/** Test for changelog mode with flink source and sink. */
@@ -90,7 +92,14 @@ public void testDefault() throws Exception {
9092
@Test
9193
public void testInputChangelogProducer() throws Exception {
9294
Options options = new Options();
93-
options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
95+
options.set(CHANGELOG_PRODUCER, INPUT);
9496
test(options, ChangelogMode.all(), ChangelogMode.all());
9597
}
98+
99+
@Test
100+
public void testLookupChangelogProducer() throws Exception {
101+
Options options = new Options();
102+
options.set(CHANGELOG_PRODUCER, LOOKUP);
103+
test(options, ChangelogMode.all(), ChangelogMode.upsert());
104+
}
96105
}

0 commit comments

Comments
 (0)