Skip to content

Commit 5af6693

Browse files
committed
[core] Introduce 'metadata.stats-mode.per.level'
1 parent 80128c4 commit 5af6693

File tree

13 files changed

+125
-58
lines changed

13 files changed

+125
-58
lines changed

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,6 +1177,17 @@ public class CoreOptions implements Serializable {
11771177
+ STATS_MODE_SUFFIX))
11781178
.build());
11791179

1180+
public static final ConfigOption<Map<String, String>> METADATA_STATS_MODE_PER_LEVEL =
1181+
key("metadata.stats-mode.per.level")
1182+
.mapType()
1183+
.defaultValue(new HashMap<>())
1184+
.withDescription(
1185+
"Define different 'metadata.stats-mode' for different level, you can add the conf like this:"
1186+
+ " 'metadata.stats-mode.per.level' = '0:none', if the metadata.stats-mode for level is not provided, "
1187+
+ "the default mode which set by `"
1188+
+ METADATA_STATS_MODE.key()
1189+
+ "` will be used.");
1190+
11801191
public static final ConfigOption<Boolean> METADATA_STATS_DENSE_STORE =
11811192
key("metadata.stats-dense-store")
11821193
.booleanType()
@@ -1803,6 +1814,16 @@ public Map<Integer, String> fileFormatPerLevel() {
18031814
e -> normalizeFileFormat(e.getValue())));
18041815
}
18051816

1817+
public String statsMode() {
1818+
return options.get(METADATA_STATS_MODE);
1819+
}
1820+
1821+
public Map<Integer, String> statsModePerLevel() {
1822+
Map<String, String> statsPerLevel = options.get(METADATA_STATS_MODE_PER_LEVEL);
1823+
return statsPerLevel.entrySet().stream()
1824+
.collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), Map.Entry::getValue));
1825+
}
1826+
18061827
private static String normalizeFileFormat(String fileFormat) {
18071828
return fileFormat.toLowerCase();
18081829
}

paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public KeyValueDataFileWriter(
9090
long schemaId,
9191
int level,
9292
String compression,
93+
String statsMode,
9394
CoreOptions options,
9495
FileSource fileSource,
9596
FileIndexOptions fileIndexOptions,
@@ -103,7 +104,7 @@ public KeyValueDataFileWriter(
103104
simpleStatsExtractor,
104105
compression,
105106
StatsCollectorFactories.createStatsFactories(
106-
options, writeRowType.getFieldNames(), keyType.getFieldNames()),
107+
statsMode, options, writeRowType.getFieldNames()),
107108
options.asyncFileWrite());
108109

109110
this.keyType = keyType;

paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public KeyValueDataFileWriterImpl(
5050
long schemaId,
5151
int level,
5252
String compression,
53+
String statsMode,
5354
CoreOptions options,
5455
FileSource fileSource,
5556
FileIndexOptions fileIndexOptions,
@@ -66,6 +67,7 @@ public KeyValueDataFileWriterImpl(
6667
schemaId,
6768
level,
6869
compression,
70+
statsMode,
6971
options,
7072
fileSource,
7173
fileIndexOptions,

paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,10 @@
4141
import javax.annotation.Nullable;
4242

4343
import java.io.IOException;
44-
import java.util.Collections;
4544
import java.util.HashMap;
4645
import java.util.Map;
47-
import java.util.Optional;
4846
import java.util.Set;
49-
import java.util.function.Function;
47+
import java.util.function.IntFunction;
5048
import java.util.stream.Collectors;
5149

5250
/** A factory to create {@link FileWriter}s for writing {@link KeyValue} files. */
@@ -116,7 +114,7 @@ public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWrite
116114

117115
private KeyValueDataFileWriter createDataFileWriter(
118116
Path path, int level, FileSource fileSource, boolean isExternalPath) {
119-
return formatContext.thinModeEnabled()
117+
return formatContext.thinModeEnabled
120118
? new KeyValueThinDataFileWriterImpl(
121119
fileIO,
122120
formatContext.writerFactory(level),
@@ -128,6 +126,7 @@ private KeyValueDataFileWriter createDataFileWriter(
128126
schemaId,
129127
level,
130128
formatContext.compression(level),
129+
formatContext.statsMode(level),
131130
options,
132131
fileSource,
133132
fileIndexOptions,
@@ -143,6 +142,7 @@ private KeyValueDataFileWriter createDataFileWriter(
143142
schemaId,
144143
level,
145144
formatContext.compression(level),
145+
formatContext.statsMode(level),
146146
options,
147147
fileSource,
148148
fileIndexOptions,
@@ -231,10 +231,11 @@ public KeyValueFileWriterFactory build(
231231

232232
private static class WriteFormatContext {
233233

234-
private final Function<Integer, String> level2Format;
235-
private final Function<Integer, String> level2Compress;
234+
private final IntFunction<String> level2Format;
235+
private final IntFunction<String> level2Compress;
236+
private final IntFunction<String> level2Stats;
236237

237-
private final Map<String, Optional<SimpleStatsExtractor>> format2Extractor;
238+
private final Map<String, IntFunction<SimpleStatsExtractor>> format2Extractor;
238239
private final Map<String, DataFilePathFactory> format2PathFactory;
239240
private final Map<String, FormatWriterFactory> format2WriterFactory;
240241

@@ -267,14 +268,13 @@ private WriteFormatContext(
267268
this.level2Compress =
268269
level -> fileCompressionPerLevel.getOrDefault(level, defaultCompress);
269270

271+
String statsMode = options.statsMode();
272+
Map<Integer, String> statsModePerLevel = options.statsModePerLevel();
273+
this.level2Stats = level -> statsModePerLevel.getOrDefault(level, statsMode);
274+
270275
this.format2Extractor = new HashMap<>();
271276
this.format2PathFactory = new HashMap<>();
272277
this.format2WriterFactory = new HashMap<>();
273-
SimpleColStatsCollector.Factory[] statsCollectorFactories =
274-
StatsCollectorFactories.createStatsFactories(
275-
options,
276-
writeRowType.getFieldNames(),
277-
thinModeEnabled ? keyType.getFieldNames() : Collections.emptyList());
278278
for (String format : parentFactories.keySet()) {
279279
format2PathFactory.put(
280280
format,
@@ -285,12 +285,18 @@ private WriteFormatContext(
285285
// In avro format, minValue, maxValue, and nullCount are not counted, set
286286
// StatsExtractor is Optional.empty() and will use SimpleStatsExtractor to collect
287287
// stats
288-
format2Extractor.put(
289-
format,
290-
format.equals("avro")
291-
? Optional.empty()
292-
: fileFormat.createStatsExtractor(
293-
writeRowType, statsCollectorFactories));
288+
IntFunction<SimpleStatsExtractor> extractor =
289+
level -> {
290+
SimpleColStatsCollector.Factory[] statsFactories =
291+
StatsCollectorFactories.createStatsFactories(
292+
level2Stats.apply(level),
293+
options,
294+
writeRowType.getFieldNames());
295+
return fileFormat
296+
.createStatsExtractor(writeRowType, statsFactories)
297+
.orElse(null);
298+
};
299+
format2Extractor.put(format, format.equals("avro") ? level -> null : extractor);
294300
format2WriterFactory.put(format, fileFormat.createWriterFactory(writeRowType));
295301
}
296302
}
@@ -310,13 +316,9 @@ private boolean supportsThinMode(RowType keyType, RowType valueType) {
310316
return true;
311317
}
312318

313-
private boolean thinModeEnabled() {
314-
return thinModeEnabled;
315-
}
316-
317319
@Nullable
318320
private SimpleStatsExtractor extractor(int level) {
319-
return format2Extractor.get(level2Format.apply(level)).orElse(null);
321+
return format2Extractor.get(level2Format.apply(level)).apply(level);
320322
}
321323

322324
private DataFilePathFactory pathFactory(int level) {
@@ -330,5 +332,9 @@ private FormatWriterFactory writerFactory(int level) {
330332
private String compression(int level) {
331333
return level2Compress.apply(level);
332334
}
335+
336+
private String statsMode(int level) {
337+
return level2Stats.apply(level);
338+
}
333339
}
334340
}

paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public KeyValueThinDataFileWriterImpl(
7575
long schemaId,
7676
int level,
7777
String compression,
78+
String statsMode,
7879
CoreOptions options,
7980
FileSource fileSource,
8081
FileIndexOptions fileIndexOptions,
@@ -91,6 +92,7 @@ public KeyValueThinDataFileWriterImpl(
9192
schemaId,
9293
level,
9394
compression,
95+
statsMode,
9496
options,
9597
fileSource,
9698
fileIndexOptions,

paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.migrate;
2020

21+
import org.apache.paimon.CoreOptions;
2122
import org.apache.paimon.data.BinaryRow;
2223
import org.apache.paimon.data.BinaryRowWriter;
2324
import org.apache.paimon.data.BinaryWriter;
@@ -103,15 +104,13 @@ public static DataFileMeta constructFileMeta(
103104
Map<Path, Path> rollback) {
104105

105106
try {
107+
CoreOptions options = ((FileStoreTable) table).coreOptions();
106108
SimpleColStatsCollector.Factory[] factories =
107109
StatsCollectorFactories.createStatsFactories(
108-
((FileStoreTable) table).coreOptions(),
109-
table.rowType().getFieldNames());
110+
options.statsMode(), options, table.rowType().getFieldNames());
110111

111112
SimpleStatsExtractor simpleStatsExtractor =
112-
FileFormat.fromIdentifier(
113-
format,
114-
((FileStoreTable) table).coreOptions().toConfiguration())
113+
FileFormat.fromIdentifier(format, options.toConfiguration())
115114
.createStatsExtractor(table.rowType(), factories)
116115
.orElseThrow(
117116
() ->
@@ -143,15 +142,13 @@ public static DataFileMeta constructFileMeta(
143142
try {
144143
RowType rowTypeWithSchemaId =
145144
((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType();
145+
CoreOptions options = ((FileStoreTable) table).coreOptions();
146146
SimpleColStatsCollector.Factory[] factories =
147147
StatsCollectorFactories.createStatsFactories(
148-
((FileStoreTable) table).coreOptions(),
149-
rowTypeWithSchemaId.getFieldNames());
148+
options.statsMode(), options, rowTypeWithSchemaId.getFieldNames());
150149

151150
SimpleStatsExtractor simpleStatsExtractor =
152-
FileFormat.fromIdentifier(
153-
format,
154-
((FileStoreTable) table).coreOptions().toConfiguration())
151+
FileFormat.fromIdentifier(format, options.toConfiguration())
155152
.createStatsExtractor(rowTypeWithSchemaId, factories)
156153
.orElseThrow(
157154
() ->

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ public AppendOnlyFileStoreWrite(
9696
this.pathFactory = pathFactory;
9797

9898
this.statsCollectors =
99-
StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames());
99+
StatsCollectorFactories.createStatsFactories(
100+
options.statsMode(), options, rowType.getFieldNames());
100101
this.fileIndexOptions = options.indexColumnsOptions();
101102
}
102103

paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.options.Options;
2323
import org.apache.paimon.statistics.SimpleColStatsCollector;
24-
import org.apache.paimon.statistics.TruncateSimpleColStatsCollector;
25-
import org.apache.paimon.table.SpecialFields;
2624

27-
import java.util.Collections;
2825
import java.util.List;
2926

3027
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
@@ -35,12 +32,7 @@
3532
public class StatsCollectorFactories {
3633

3734
public static SimpleColStatsCollector.Factory[] createStatsFactories(
38-
CoreOptions options, List<String> fields) {
39-
return createStatsFactories(options, fields, Collections.emptyList());
40-
}
41-
42-
public static SimpleColStatsCollector.Factory[] createStatsFactories(
43-
CoreOptions options, List<String> fields, List<String> keyNames) {
35+
String statsMode, CoreOptions options, List<String> fields) {
4436
Options cfg = options.toConfiguration();
4537
SimpleColStatsCollector.Factory[] modes =
4638
new SimpleColStatsCollector.Factory[fields.size()];
@@ -51,17 +43,10 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories(
5143
key(String.format("%s.%s.%s", FIELDS_PREFIX, field, STATS_MODE_SUFFIX))
5244
.stringType()
5345
.noDefaultValue());
54-
if (fieldMode != null) {
55-
modes[i] = SimpleColStatsCollector.from(fieldMode);
56-
} else if (SpecialFields.isSystemField(field)
57-
||
58-
// If we config DATA_FILE_THIN_MODE to true, we need to maintain the
59-
// stats for key fields.
60-
keyNames.contains(SpecialFields.KEY_FIELD_PREFIX + field)) {
61-
modes[i] = () -> new TruncateSimpleColStatsCollector(128);
62-
} else {
63-
modes[i] = SimpleColStatsCollector.from(cfg.get(CoreOptions.METADATA_STATS_MODE));
64-
}
46+
modes[i] =
47+
fieldMode != null
48+
? SimpleColStatsCollector.from(fieldMode)
49+
: SimpleColStatsCollector.from(statsMode);
6550
}
6651
return modes;
6752
}

paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,9 @@ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
682682
CoreOptions.FILE_COMPRESSION.defaultValue(),
683683
CompressOptions.defaultOptions(),
684684
StatsCollectorFactories.createStatsFactories(
685-
options, AppendOnlyWriterTest.SCHEMA.getFieldNames()),
685+
"truncate(16)",
686+
options,
687+
AppendOnlyWriterTest.SCHEMA.getFieldNames()),
686688
MemorySize.MAX_VALUE,
687689
new FileIndexOptions(),
688690
true,

paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
9797
CoreOptions.FILE_COMPRESSION.defaultValue(),
9898
CompressOptions.defaultOptions(),
9999
StatsCollectorFactories.createStatsFactories(
100-
options, SCHEMA.getFieldNames()),
100+
"truncate(16)", options, SCHEMA.getFieldNames()),
101101
MemorySize.MAX_VALUE,
102102
new FileIndexOptions(),
103103
true,

0 commit comments

Comments
 (0)