Skip to content

Commit 0e91305

Browse files
committed
fix
1 parent a1098df commit 0e91305

File tree

8 files changed

+148
-111
lines changed

8 files changed

+148
-111
lines changed

paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@
2626
import org.apache.paimon.fileindex.FileIndexOptions;
2727
import org.apache.paimon.format.FormatWriterFactory;
2828
import org.apache.paimon.format.SimpleColStats;
29-
import org.apache.paimon.format.SimpleStatsExtractor;
3029
import org.apache.paimon.fs.FileIO;
3130
import org.apache.paimon.fs.Path;
3231
import org.apache.paimon.manifest.FileSource;
3332
import org.apache.paimon.stats.SimpleStats;
3433
import org.apache.paimon.stats.SimpleStatsConverter;
3534
import org.apache.paimon.types.RowType;
3635
import org.apache.paimon.utils.Pair;
37-
import org.apache.paimon.utils.StatsCollectorFactories;
3836

3937
import org.slf4j.Logger;
4038
import org.slf4j.LoggerFactory;
@@ -86,11 +84,10 @@ public KeyValueDataFileWriter(
8684
RowType keyType,
8785
RowType valueType,
8886
RowType writeRowType,
89-
@Nullable SimpleStatsExtractor simpleStatsExtractor,
87+
Function<RowType, SimpleStatsProducer> statsProducerFactory,
9088
long schemaId,
9189
int level,
9290
String compression,
93-
String statsMode,
9491
CoreOptions options,
9592
FileSource fileSource,
9693
FileIndexOptions fileIndexOptions,
@@ -101,10 +98,8 @@ public KeyValueDataFileWriter(
10198
path,
10299
converter,
103100
writeRowType,
104-
simpleStatsExtractor,
101+
statsProducerFactory.apply(writeRowType),
105102
compression,
106-
StatsCollectorFactories.createStatsFactoriesForAvro(
107-
statsMode, options, writeRowType.getFieldNames()),
108103
options.asyncFileWrite());
109104

110105
this.keyType = keyType;

paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,12 @@
2424
import org.apache.paimon.fileindex.FileIndexOptions;
2525
import org.apache.paimon.format.FormatWriterFactory;
2626
import org.apache.paimon.format.SimpleColStats;
27-
import org.apache.paimon.format.SimpleStatsExtractor;
2827
import org.apache.paimon.fs.FileIO;
2928
import org.apache.paimon.fs.Path;
3029
import org.apache.paimon.manifest.FileSource;
3130
import org.apache.paimon.types.RowType;
3231
import org.apache.paimon.utils.Pair;
3332

34-
import javax.annotation.Nullable;
35-
3633
import java.util.Arrays;
3734
import java.util.function.Function;
3835

@@ -46,11 +43,10 @@ public KeyValueDataFileWriterImpl(
4643
Function<KeyValue, InternalRow> converter,
4744
RowType keyType,
4845
RowType valueType,
49-
@Nullable SimpleStatsExtractor simpleStatsExtractor,
46+
Function<RowType, SimpleStatsProducer> statsProducerFactory,
5047
long schemaId,
5148
int level,
5249
String compression,
53-
String statsMode,
5450
CoreOptions options,
5551
FileSource fileSource,
5652
FileIndexOptions fileIndexOptions,
@@ -63,11 +59,10 @@ public KeyValueDataFileWriterImpl(
6359
keyType,
6460
valueType,
6561
KeyValue.schema(keyType, valueType),
66-
simpleStatsExtractor,
62+
statsProducerFactory,
6763
schemaId,
6864
level,
6965
compression,
70-
statsMode,
7166
options,
7267
fileSource,
7368
fileIndexOptions,

paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.fileindex.FileIndexOptions;
2828
import org.apache.paimon.format.FileFormat;
2929
import org.apache.paimon.format.FormatWriterFactory;
30+
import org.apache.paimon.format.SimpleStatsCollector;
3031
import org.apache.paimon.format.SimpleStatsExtractor;
3132
import org.apache.paimon.fs.FileIO;
3233
import org.apache.paimon.fs.Path;
@@ -38,8 +39,6 @@
3839
import org.apache.paimon.utils.FileStorePathFactory;
3940
import org.apache.paimon.utils.StatsCollectorFactories;
4041

41-
import javax.annotation.Nullable;
42-
4342
import java.io.IOException;
4443
import java.util.Collections;
4544
import java.util.HashMap;
@@ -123,11 +122,10 @@ private KeyValueDataFileWriter createDataFileWriter(
123122
new KeyValueThinSerializer(keyType, valueType)::toRow,
124123
keyType,
125124
valueType,
126-
formatContext.extractor(level),
125+
type -> formatContext.statsProducer(level, options, type),
127126
schemaId,
128127
level,
129128
formatContext.compression(level),
130-
formatContext.statsMode(level),
131129
options,
132130
fileSource,
133131
fileIndexOptions,
@@ -139,11 +137,10 @@ private KeyValueDataFileWriter createDataFileWriter(
139137
new KeyValueSerializer(keyType, valueType)::toRow,
140138
keyType,
141139
valueType,
142-
formatContext.extractor(level),
140+
type -> formatContext.statsProducer(level, options, type),
143141
schemaId,
144142
level,
145143
formatContext.compression(level),
146-
formatContext.statsMode(level),
147144
options,
148145
fileSource,
149146
fileIndexOptions,
@@ -283,9 +280,6 @@ private WriteFormatContext(
283280

284281
FileFormat fileFormat =
285282
FileFormat.fromIdentifier(format, options.toConfiguration());
286-
// In avro format, minValue, maxValue, and nullCount are not counted, set
287-
// StatsExtractor is Optional.empty() and will use SimpleStatsExtractor to collect
288-
// stats
289283
IntFunction<SimpleStatsExtractor> extractor =
290284
level -> {
291285
SimpleColStatsCollector.Factory[] statsFactories =
@@ -300,7 +294,7 @@ private WriteFormatContext(
300294
.createStatsExtractor(writeRowType, statsFactories)
301295
.orElse(null);
302296
};
303-
format2Extractor.put(format, format.equals("avro") ? level -> null : extractor);
297+
format2Extractor.put(format, extractor);
304298
format2WriterFactory.put(format, fileFormat.createWriterFactory(writeRowType));
305299
}
306300
}
@@ -320,9 +314,22 @@ private boolean supportsThinMode(RowType keyType, RowType valueType) {
320314
return true;
321315
}
322316

323-
@Nullable
324-
private SimpleStatsExtractor extractor(int level) {
325-
return format2Extractor.get(level2Format.apply(level)).apply(level);
317+
private SimpleStatsProducer statsProducer(
318+
int level, CoreOptions options, RowType writeRowType) {
319+
String format = level2Format.apply(level);
320+
String statsMode = level2Stats.apply(level);
321+
if (format.equals("avro")) {
322+
// In avro format, minValue, maxValue, and nullCount are not counted, so use
323+
// SimpleStatsExtractor to collect stats
324+
SimpleColStatsCollector.Factory[] collectors =
325+
StatsCollectorFactories.createStatsFactoriesForAvro(
326+
statsMode, options, writeRowType.getFieldNames());
327+
SimpleStatsCollector collector = new SimpleStatsCollector(writeRowType, collectors);
328+
return SimpleStatsProducer.fromCollector(collector);
329+
}
330+
SimpleStatsExtractor extractor =
331+
format2Extractor.get(level2Format.apply(level)).apply(level);
332+
return SimpleStatsProducer.fromExtractor(extractor);
326333
}
327334

328335
private DataFilePathFactory pathFactory(int level) {
@@ -336,9 +343,5 @@ private FormatWriterFactory writerFactory(int level) {
336343
private String compression(int level) {
337344
return level2Compress.apply(level);
338345
}
339-
340-
private String statsMode(int level) {
341-
return level2Stats.apply(level);
342-
}
343346
}
344347
}

paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,13 @@
2424
import org.apache.paimon.fileindex.FileIndexOptions;
2525
import org.apache.paimon.format.FormatWriterFactory;
2626
import org.apache.paimon.format.SimpleColStats;
27-
import org.apache.paimon.format.SimpleStatsExtractor;
2827
import org.apache.paimon.fs.FileIO;
2928
import org.apache.paimon.fs.Path;
3029
import org.apache.paimon.manifest.FileSource;
3130
import org.apache.paimon.table.SpecialFields;
3231
import org.apache.paimon.types.RowType;
3332
import org.apache.paimon.utils.Pair;
3433

35-
import javax.annotation.Nullable;
36-
3734
import java.util.Arrays;
3835
import java.util.HashMap;
3936
import java.util.Map;
@@ -47,35 +44,17 @@ public class KeyValueThinDataFileWriterImpl extends KeyValueDataFileWriter {
4744

4845
private final int[] keyStatMapping;
4946

50-
/**
51-
* Constructs a KeyValueThinDataFileWriterImpl.
52-
*
53-
* @param fileIO The file IO interface.
54-
* @param factory The format writer factory.
55-
* @param path The path to the file.
56-
* @param converter The function to convert KeyValue to InternalRow.
57-
* @param keyType The row type of the key.
58-
* @param valueType The row type of the value.
59-
* @param simpleStatsExtractor The simple stats extractor, can be null.
60-
* @param schemaId The schema ID.
61-
* @param level The level.
62-
* @param compression The compression type.
63-
* @param options The core options.
64-
* @param fileSource The file source.
65-
* @param fileIndexOptions The file index options.
66-
*/
6747
public KeyValueThinDataFileWriterImpl(
6848
FileIO fileIO,
6949
FormatWriterFactory factory,
7050
Path path,
7151
Function<KeyValue, InternalRow> converter,
7252
RowType keyType,
7353
RowType valueType,
74-
@Nullable SimpleStatsExtractor simpleStatsExtractor,
54+
Function<RowType, SimpleStatsProducer> statsProducerFactory,
7555
long schemaId,
7656
int level,
7757
String compression,
78-
String statsMode,
7958
CoreOptions options,
8059
FileSource fileSource,
8160
FileIndexOptions fileIndexOptions,
@@ -88,11 +67,10 @@ public KeyValueThinDataFileWriterImpl(
8867
keyType,
8968
valueType,
9069
KeyValue.schema(RowType.of(), valueType),
91-
simpleStatsExtractor,
70+
statsProducerFactory,
9271
schemaId,
9372
level,
9473
compression,
95-
statsMode,
9674
options,
9775
fileSource,
9876
fileIndexOptions,

paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@
2121
import org.apache.paimon.data.InternalRow;
2222
import org.apache.paimon.fileindex.FileIndexOptions;
2323
import org.apache.paimon.format.FormatWriterFactory;
24-
import org.apache.paimon.format.SimpleStatsExtractor;
2524
import org.apache.paimon.fs.FileIO;
2625
import org.apache.paimon.fs.Path;
2726
import org.apache.paimon.manifest.FileSource;
28-
import org.apache.paimon.statistics.SimpleColStatsCollector;
2927
import org.apache.paimon.stats.SimpleStats;
3028
import org.apache.paimon.stats.SimpleStatsConverter;
3129
import org.apache.paimon.types.RowType;
@@ -59,11 +57,10 @@ public RowDataFileWriter(
5957
FormatWriterFactory factory,
6058
Path path,
6159
RowType writeSchema,
62-
@Nullable SimpleStatsExtractor simpleStatsExtractor,
60+
SimpleStatsProducer statsProducer,
6361
long schemaId,
6462
LongCounter seqNumCounter,
6563
String fileCompression,
66-
SimpleColStatsCollector.Factory[] statsCollectors,
6764
FileIndexOptions fileIndexOptions,
6865
FileSource fileSource,
6966
boolean asyncFileWrite,
@@ -75,9 +72,8 @@ public RowDataFileWriter(
7572
path,
7673
Function.identity(),
7774
writeSchema,
78-
simpleStatsExtractor,
75+
statsProducer,
7976
fileCompression,
80-
statsCollectors,
8177
asyncFileWrite);
8278
this.schemaId = schemaId;
8379
this.seqNumCounter = seqNumCounter;
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.io;
20+
21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.format.SimpleColStats;
23+
import org.apache.paimon.format.SimpleStatsCollector;
24+
import org.apache.paimon.format.SimpleStatsExtractor;
25+
import org.apache.paimon.fs.FileIO;
26+
import org.apache.paimon.fs.Path;
27+
28+
import java.io.IOException;
29+
30+
/** Produce {@link SimpleColStats} for fields. */
31+
public interface SimpleStatsProducer {
32+
33+
boolean isStatsDisabled();
34+
35+
boolean requirePerRecord();
36+
37+
void collect(InternalRow row);
38+
39+
SimpleColStats[] extract(FileIO fileIO, Path path, long length) throws IOException;
40+
41+
static SimpleStatsProducer fromExtractor(SimpleStatsExtractor extractor) {
42+
return new SimpleStatsProducer() {
43+
44+
@Override
45+
public boolean isStatsDisabled() {
46+
return extractor.isStatsDisabled();
47+
}
48+
49+
@Override
50+
public boolean requirePerRecord() {
51+
return false;
52+
}
53+
54+
@Override
55+
public void collect(InternalRow row) {
56+
throw new IllegalStateException();
57+
}
58+
59+
@Override
60+
public SimpleColStats[] extract(FileIO fileIO, Path path, long length)
61+
throws IOException {
62+
return extractor.extract(fileIO, path, length);
63+
}
64+
};
65+
}
66+
67+
static SimpleStatsProducer fromCollector(SimpleStatsCollector collector) {
68+
return new SimpleStatsProducer() {
69+
70+
@Override
71+
public boolean isStatsDisabled() {
72+
return collector.isDisabled();
73+
}
74+
75+
@Override
76+
public boolean requirePerRecord() {
77+
return true;
78+
}
79+
80+
@Override
81+
public void collect(InternalRow row) {
82+
collector.collect(row);
83+
}
84+
85+
@Override
86+
public SimpleColStats[] extract(FileIO fileIO, Path path, long length) {
87+
return collector.extract();
88+
}
89+
};
90+
}
91+
}

0 commit comments

Comments
 (0)