Skip to content

Commit 869dcb3

Browse files
committed
Fix compile.
1 parent 4efb48d commit 869dcb3

2 files changed

Lines changed: 3 additions & 19 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.paimon.memory.HeapMemorySegmentPool;
3939
import org.apache.paimon.memory.MemoryPoolFactory;
4040
import org.apache.paimon.options.Options;
41+
import org.apache.paimon.table.BucketMode;
4142
import org.apache.paimon.table.FileStoreTable;
4243
import org.apache.paimon.utils.ExecutorThreadFactory;
4344
import org.slf4j.Logger;
@@ -161,7 +162,7 @@ public void write(InputT event, Context context) throws IOException {
161162
.updateBlobWriteContext(((ChangeEvent) event).tableId(), blobWriteContext);
162163
} catch (Exception e) {
163164
// Table might not exist yet, ignore and continue
164-
LOGGER.debug("Could not get table for BlobWriteContext: {}", e.getMessage());
165+
LOG.debug("Could not get table for BlobWriteContext: {}", e.getMessage());
165166
}
166167
}
167168
if (paimonEvent.getGenericRows() != null) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,8 @@
2929
import org.apache.flink.cdc.common.types.DataType;
3030
import org.apache.flink.cdc.common.types.DataTypes;
3131
import org.apache.flink.cdc.common.types.RowType;
32-
import org.apache.flink.cdc.connectors.paimon.sink.dlf.DlfCatalogUtil;
3332
import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext;
3433
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
35-
import org.apache.flink.configuration.Configuration;
3634
import org.apache.flink.metrics.MetricGroup;
3735
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
3836

@@ -78,7 +76,6 @@ class PaimonWriterTest {
7876
@Mock private CoreOptions mockCoreOptions;
7977

8078
private MockedStatic<FlinkCatalogFactory> mockedFactory;
81-
private MockedStatic<DlfCatalogUtil> mockedDlfUtil;
8279
private MockedStatic<BlobWriteContext> mockedBlobWriteContext;
8380
private MockedConstruction<StoreSinkWriteImpl> mockedConstruction;
8481

@@ -90,17 +87,12 @@ class PaimonWriterTest {
9087
@BeforeEach
9188
void setUp() throws Exception {
9289
mockedFactory = Mockito.mockStatic(FlinkCatalogFactory.class);
93-
mockedDlfUtil = Mockito.mockStatic(DlfCatalogUtil.class);
9490
mockedBlobWriteContext = Mockito.mockStatic(BlobWriteContext.class);
9591

9692
mockedFactory
9793
.when(() -> FlinkCatalogFactory.createPaimonCatalog(any(Options.class)))
9894
.thenReturn(mockCatalog);
9995

100-
mockedDlfUtil
101-
.when(() -> DlfCatalogUtil.convertOptionToDlf(any(Options.class), any()))
102-
.thenCallRealMethod();
103-
10496
BlobWriteContext mockBlobContext = BlobWriteContext.empty();
10597
mockedBlobWriteContext
10698
.when(() -> BlobWriteContext.fromTable(anyBoolean(), any(FileStoreTable.class)))
@@ -127,13 +119,7 @@ void setUp() throws Exception {
127119
.thenReturn(java.util.Collections.emptyList()));
128120

129121
writer =
130-
new PaimonWriter<>(
131-
catalogOptions,
132-
metricGroup,
133-
"test_commit_user",
134-
serializer,
135-
new Configuration(),
136-
0L);
122+
new PaimonWriter<>(catalogOptions, metricGroup, "test_commit_user", serializer, 0L);
137123

138124
injectMockCatalog(writer, mockCatalog);
139125
}
@@ -152,9 +138,6 @@ void tearDown() throws Exception {
152138
if (mockedFactory != null) {
153139
mockedFactory.close();
154140
}
155-
if (mockedDlfUtil != null) {
156-
mockedDlfUtil.close();
157-
}
158141
if (mockedBlobWriteContext != null) {
159142
mockedBlobWriteContext.close();
160143
}

0 commit comments

Comments
 (0)