|
24 | 24 | import org.apache.fluss.metadata.TablePath; |
25 | 25 | import org.apache.fluss.record.LogRecord; |
26 | 26 |
|
| 27 | +import org.apache.paimon.CoreOptions; |
27 | 28 | import org.apache.paimon.catalog.Catalog; |
28 | 29 | import org.apache.paimon.table.FileStoreTable; |
29 | 30 | import org.apache.paimon.table.sink.CommitMessage; |
30 | 31 |
|
31 | 32 | import java.io.IOException; |
| 33 | +import java.util.Collections; |
32 | 34 | import java.util.List; |
| 35 | +import java.util.Map; |
| 36 | +import java.util.Objects; |
33 | 37 |
|
| 38 | +import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION; |
| 39 | +import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.FLUSS_CONF_PREFIX; |
34 | 40 | import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; |
35 | 41 |
|
36 | 42 | /** Implementation of {@link LakeWriter} for Paimon. */ |
@@ -97,7 +103,21 @@ public void close() throws IOException { |
97 | 103 |
|
98 | 104 | private FileStoreTable getTable(TablePath tablePath) throws IOException { |
99 | 105 | try { |
100 | | - return (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath)); |
| 106 | + FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath)); |
| 107 | + Map<String, String> compactionOptions = |
| 108 | + Collections.singletonMap( |
| 109 | + CoreOptions.WRITE_ONLY.key(), |
| 110 | + Objects.equals( |
| 111 | + table.schema() |
| 112 | + .options() |
| 113 | + .get( |
| 114 | + FLUSS_CONF_PREFIX |
| 115 | + + TABLE_DATALAKE_AUTO_COMPACTION |
| 116 | + .key()), |
| 117 | + "true") |
| 118 | + ? Boolean.FALSE.toString() |
| 119 | + : Boolean.TRUE.toString()); |
| 120 | + return table.copy(compactionOptions); |
101 | 121 | } catch (Exception e) { |
102 | 122 | throw new IOException("Failed to get table " + tablePath + " in Paimon.", e); |
103 | 123 | } |
|
0 commit comments