Skip to content

Commit ff92814

Browse files
committed
fix
1 parent 9206acb commit ff92814

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
package org.apache.paimon.flink.sink.coordinator;
2020

2121
import org.apache.paimon.flink.sink.TableWriteOperator;
22+
import org.apache.paimon.fs.Path;
23+
import org.apache.paimon.options.MemorySize;
2224
import org.apache.paimon.table.FileStoreTable;
25+
import org.apache.paimon.utils.SegmentsCache;
2326

2427
import org.apache.flink.runtime.jobgraph.OperatorID;
2528
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
@@ -32,6 +35,7 @@
3235
import java.util.concurrent.CompletionException;
3336
import java.util.concurrent.ThreadPoolExecutor;
3437

38+
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_MEMORY;
3539
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
3640

3741
/**
@@ -52,6 +56,10 @@ public WriteOperatorCoordinator(FileStoreTable table) {
5256
@Override
5357
public void start() throws Exception {
5458
executor = createCachedThreadPool(1, "WriteCoordinator");
59+
MemorySize cacheMemory =
60+
table.coreOptions().toConfiguration().get(SINK_WRITER_COORDINATOR_CACHE_MEMORY);
61+
SegmentsCache<Path> manifestCache = SegmentsCache.create(cacheMemory, Long.MAX_VALUE);
62+
table.setManifestCache(manifestCache);
5563
coordinator = new TableWriteCoordinator(table);
5664
}
5765

0 commit comments

Comments
 (0)