Skip to content

Commit 4f5dd0c

Browse files
committed
fix
1 parent 0e91305 commit 4f5dd0c

File tree

5 files changed

+78
-50
lines changed

5 files changed

+78
-50
lines changed

paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public KeyValueDataFileWriter(
8484
RowType keyType,
8585
RowType valueType,
8686
RowType writeRowType,
87-
Function<RowType, SimpleStatsProducer> statsProducerFactory,
87+
SimpleStatsProducer statsProducer,
8888
long schemaId,
8989
int level,
9090
String compression,
@@ -98,7 +98,7 @@ public KeyValueDataFileWriter(
9898
path,
9999
converter,
100100
writeRowType,
101-
statsProducerFactory.apply(writeRowType),
101+
statsProducer,
102102
compression,
103103
options.asyncFileWrite());
104104

paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public KeyValueDataFileWriterImpl(
4343
Function<KeyValue, InternalRow> converter,
4444
RowType keyType,
4545
RowType valueType,
46-
Function<RowType, SimpleStatsProducer> statsProducerFactory,
46+
SimpleStatsProducer statsProducer,
4747
long schemaId,
4848
int level,
4949
String compression,
@@ -59,7 +59,7 @@ public KeyValueDataFileWriterImpl(
5959
keyType,
6060
valueType,
6161
KeyValue.schema(keyType, valueType),
62-
statsProducerFactory,
62+
statsProducer,
6363
schemaId,
6464
level,
6565
compression,

paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.paimon.types.DataField;
3838
import org.apache.paimon.types.RowType;
3939
import org.apache.paimon.utils.FileStorePathFactory;
40+
import org.apache.paimon.utils.Pair;
4041
import org.apache.paimon.utils.StatsCollectorFactories;
4142

4243
import java.io.IOException;
@@ -122,7 +123,7 @@ private KeyValueDataFileWriter createDataFileWriter(
122123
new KeyValueThinSerializer(keyType, valueType)::toRow,
123124
keyType,
124125
valueType,
125-
type -> formatContext.statsProducer(level, options, type),
126+
formatContext.statsProducer(level, options),
126127
schemaId,
127128
level,
128129
formatContext.compression(level),
@@ -137,7 +138,7 @@ private KeyValueDataFileWriter createDataFileWriter(
137138
new KeyValueSerializer(keyType, valueType)::toRow,
138139
keyType,
139140
valueType,
140-
type -> formatContext.statsProducer(level, options, type),
141+
formatContext.statsProducer(level, options),
141142
schemaId,
142143
level,
143144
formatContext.compression(level),
@@ -233,12 +234,19 @@ private static class WriteFormatContext {
233234
private final IntFunction<String> level2Compress;
234235
private final IntFunction<String> level2Stats;
235236

236-
private final Map<String, IntFunction<SimpleStatsExtractor>> format2Extractor;
237+
private final Map<Pair<String, String>, SimpleStatsExtractor> formatStats2Extractor;
238+
private final Map<String, SimpleColStatsCollector.Factory[]> statsMode2AvroStats;
237239
private final Map<String, DataFilePathFactory> format2PathFactory;
240+
private final Map<String, FileFormat> formatFactory;
238241
private final Map<String, FormatWriterFactory> format2WriterFactory;
239242

243+
private final BinaryRow partition;
244+
private final int bucket;
240245
private final RowType keyType;
241246
private final RowType valueType;
247+
private final RowType writeRowType;
248+
private final Map<String, FileStorePathFactory> parentFactories;
249+
private final CoreOptions options;
242250
private final boolean thinModeEnabled;
243251

244252
private WriteFormatContext(
@@ -249,11 +257,15 @@ private WriteFormatContext(
249257
FileFormat defaultFormat,
250258
Map<String, FileStorePathFactory> parentFactories,
251259
CoreOptions options) {
260+
this.partition = partition;
261+
this.bucket = bucket;
252262
this.keyType = keyType;
253263
this.valueType = valueType;
264+
this.parentFactories = parentFactories;
265+
this.options = options;
254266
this.thinModeEnabled =
255267
options.dataFileThinMode() && supportsThinMode(keyType, valueType);
256-
RowType writeRowType =
268+
this.writeRowType =
257269
KeyValue.schema(thinModeEnabled ? RowType.of() : keyType, valueType);
258270
Map<Integer, String> fileFormatPerLevel = options.fileFormatPerLevel();
259271
this.level2Format =
@@ -270,33 +282,11 @@ private WriteFormatContext(
270282
Map<Integer, String> statsModePerLevel = options.statsModePerLevel();
271283
this.level2Stats = level -> statsModePerLevel.getOrDefault(level, statsMode);
272284

273-
this.format2Extractor = new HashMap<>();
285+
this.formatStats2Extractor = new HashMap<>();
286+
this.statsMode2AvroStats = new HashMap<>();
274287
this.format2PathFactory = new HashMap<>();
275288
this.format2WriterFactory = new HashMap<>();
276-
for (String format : parentFactories.keySet()) {
277-
format2PathFactory.put(
278-
format,
279-
parentFactories.get(format).createDataFilePathFactory(partition, bucket));
280-
281-
FileFormat fileFormat =
282-
FileFormat.fromIdentifier(format, options.toConfiguration());
283-
IntFunction<SimpleStatsExtractor> extractor =
284-
level -> {
285-
SimpleColStatsCollector.Factory[] statsFactories =
286-
StatsCollectorFactories.createStatsFactories(
287-
level2Stats.apply(level),
288-
options,
289-
writeRowType.getFieldNames(),
290-
thinModeEnabled
291-
? keyType.getFieldNames()
292-
: Collections.emptyList());
293-
return fileFormat
294-
.createStatsExtractor(writeRowType, statsFactories)
295-
.orElse(null);
296-
};
297-
format2Extractor.put(format, extractor);
298-
format2WriterFactory.put(format, fileFormat.createWriterFactory(writeRowType));
299-
}
289+
this.formatFactory = new HashMap<>();
300290
}
301291

302292
private boolean supportsThinMode(RowType keyType, RowType valueType) {
@@ -314,34 +304,64 @@ private boolean supportsThinMode(RowType keyType, RowType valueType) {
314304
return true;
315305
}
316306

317-
private SimpleStatsProducer statsProducer(
318-
int level, CoreOptions options, RowType writeRowType) {
307+
private SimpleStatsProducer statsProducer(int level, CoreOptions options) {
319308
String format = level2Format.apply(level);
320309
String statsMode = level2Stats.apply(level);
321310
if (format.equals("avro")) {
322311
// In avro format, minValue, maxValue, and nullCount are not counted, so use
323312
// SimpleStatsExtractor to collect stats
324-
SimpleColStatsCollector.Factory[] collectors =
325-
StatsCollectorFactories.createStatsFactoriesForAvro(
326-
statsMode, options, writeRowType.getFieldNames());
327-
SimpleStatsCollector collector = new SimpleStatsCollector(writeRowType, collectors);
313+
SimpleColStatsCollector.Factory[] factories =
314+
statsMode2AvroStats.computeIfAbsent(
315+
statsMode,
316+
key ->
317+
StatsCollectorFactories.createStatsFactoriesForAvro(
318+
statsMode, options, writeRowType.getFieldNames()));
319+
SimpleStatsCollector collector = new SimpleStatsCollector(writeRowType, factories);
328320
return SimpleStatsProducer.fromCollector(collector);
329321
}
322+
330323
SimpleStatsExtractor extractor =
331-
format2Extractor.get(level2Format.apply(level)).apply(level);
324+
formatStats2Extractor.computeIfAbsent(
325+
Pair.of(format, statsMode),
326+
key -> {
327+
SimpleColStatsCollector.Factory[] statsFactories =
328+
StatsCollectorFactories.createStatsFactories(
329+
statsMode,
330+
options,
331+
writeRowType.getFieldNames(),
332+
thinModeEnabled
333+
? keyType.getFieldNames()
334+
: Collections.emptyList());
335+
return fileFormat(format)
336+
.createStatsExtractor(writeRowType, statsFactories)
337+
.orElse(null);
338+
});
332339
return SimpleStatsProducer.fromExtractor(extractor);
333340
}
334341

335342
private DataFilePathFactory pathFactory(int level) {
336-
return format2PathFactory.get(level2Format.apply(level));
343+
String format = level2Format.apply(level);
344+
return format2PathFactory.computeIfAbsent(
345+
format,
346+
key ->
347+
parentFactories
348+
.get(format)
349+
.createDataFilePathFactory(partition, bucket));
337350
}
338351

339352
private FormatWriterFactory writerFactory(int level) {
340-
return format2WriterFactory.get(level2Format.apply(level));
353+
return format2WriterFactory.computeIfAbsent(
354+
level2Format.apply(level),
355+
format -> fileFormat(format).createWriterFactory(writeRowType));
341356
}
342357

343358
private String compression(int level) {
344359
return level2Compress.apply(level);
345360
}
361+
362+
private FileFormat fileFormat(String format) {
363+
return formatFactory.computeIfAbsent(
364+
format, k -> FileFormat.fromIdentifier(format, options.toConfiguration()));
365+
}
346366
}
347367
}

paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public KeyValueThinDataFileWriterImpl(
5151
Function<KeyValue, InternalRow> converter,
5252
RowType keyType,
5353
RowType valueType,
54-
Function<RowType, SimpleStatsProducer> statsProducerFactory,
54+
SimpleStatsProducer statsProducer,
5555
long schemaId,
5656
int level,
5757
String compression,
@@ -67,7 +67,7 @@ public KeyValueThinDataFileWriterImpl(
6767
keyType,
6868
valueType,
6969
KeyValue.schema(RowType.of(), valueType),
70-
statsProducerFactory,
70+
statsProducer,
7171
schemaId,
7272
level,
7373
compression,

paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.data.InternalRow;
2222
import org.apache.paimon.fileindex.FileIndexOptions;
2323
import org.apache.paimon.format.FileFormat;
24+
import org.apache.paimon.format.SimpleStatsCollector;
2425
import org.apache.paimon.format.avro.AvroFileFormat;
2526
import org.apache.paimon.fs.FileIO;
2627
import org.apache.paimon.manifest.FileSource;
@@ -52,20 +53,27 @@ public RowDataRollingFileWriter(
5253
fileFormat.createWriterFactory(writeSchema),
5354
pathFactory.newPath(),
5455
writeSchema,
55-
fileFormat instanceof AvroFileFormat
56-
? null
57-
: fileFormat
58-
.createStatsExtractor(writeSchema, statsCollectors)
59-
.orElse(null),
56+
createStatsProducer(fileFormat, writeSchema, statsCollectors),
6057
schemaId,
6158
seqNumCounter,
6259
fileCompression,
63-
statsCollectors,
6460
fileIndexOptions,
6561
fileSource,
6662
asyncFileWrite,
6763
statsDenseStore,
6864
pathFactory.isExternalPath()),
6965
targetFileSize);
7066
}
67+
68+
private static SimpleStatsProducer createStatsProducer(
69+
FileFormat fileFormat,
70+
RowType rowType,
71+
SimpleColStatsCollector.Factory[] statsCollectors) {
72+
if (fileFormat instanceof AvroFileFormat) {
73+
SimpleStatsCollector collector = new SimpleStatsCollector(rowType, statsCollectors);
74+
return SimpleStatsProducer.fromCollector(collector);
75+
}
76+
return SimpleStatsProducer.fromExtractor(
77+
fileFormat.createStatsExtractor(rowType, statsCollectors).orElse(null));
78+
}
7179
}

0 commit comments

Comments
 (0)