Skip to content

Commit 855a451

Browse files
committed
Add Parquet realted stats recording
1 parent 01812d5 commit 855a451

10 files changed

Lines changed: 195 additions & 21 deletions

presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
1717
import com.facebook.presto.hive.orc.DwrfPageSourceFactory;
1818
import com.facebook.presto.hive.orc.OrcPageSourceFactory;
19+
import com.facebook.presto.hive.parquet.ParquetMetadataStats;
1920
import com.facebook.presto.hive.parquet.ParquetPageSourceFactory;
2021
import com.facebook.presto.hive.parquet.ParquetRecordCursorProvider;
2122
import com.facebook.presto.hive.rcfile.RcFilePageSourceFactory;
@@ -102,6 +103,9 @@ public void configure(Binder binder)
102103
binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
103104
newExporter(binder).export(FileFormatDataSourceStats.class).as(generatedNameOf(FileFormatDataSourceStats.class, connectorId));
104105

106+
binder.bind(ParquetMetadataStats.class).in(Scopes.SINGLETON);
107+
newExporter(binder).export(ParquetMetadataStats.class).as(generatedNameOf(ParquetMetadataStats.class, connectorId));
108+
105109
Multibinder<HivePageSourceFactory> pageSourceFactoryBinder = newSetBinder(binder, HivePageSourceFactory.class);
106110
pageSourceFactoryBinder.addBinding().to(OrcPageSourceFactory.class).in(Scopes.SINGLETON);
107111
pageSourceFactoryBinder.addBinding().to(DwrfPageSourceFactory.class).in(Scopes.SINGLETON);

presto-hive/src/main/java/com/facebook/presto/hive/parquet/HdfsParquetDataSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ private void readInternal(long position, byte[] buffer, int bufferOffset, int bu
8181
try {
8282
long readStart = System.nanoTime();
8383
inputStream.readFully(position, buffer, bufferOffset, bufferLength);
84-
stats.readDataBytesPerSecond(bufferLength, System.nanoTime() - readStart);
84+
stats.readDataBytesPerSecond((long) bufferLength, System.nanoTime() - readStart);
8585
}
8686
catch (PrestoException e) {
8787
// just in case there is a Presto wrapper or hook

presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public class ParquetHiveRecordCursor
119119
private boolean closed;
120120

121121
private final FileFormatDataSourceStats stats;
122+
private final ParquetMetadataStats metadataStats;
122123

123124
public ParquetHiveRecordCursor(
124125
HdfsEnvironment hdfsEnvironment,
@@ -134,13 +135,15 @@ public ParquetHiveRecordCursor(
134135
TypeManager typeManager,
135136
boolean predicatePushdownEnabled,
136137
TupleDomain<HiveColumnHandle> effectivePredicate,
137-
FileFormatDataSourceStats stats)
138+
FileFormatDataSourceStats stats,
139+
ParquetMetadataStats metadataStats)
138140
{
139141
requireNonNull(path, "path is null");
140142
checkArgument(length >= 0, "length is negative");
141143
requireNonNull(splitSchema, "splitSchema is null");
142144
requireNonNull(columns, "columns is null");
143145
this.stats = requireNonNull(stats, "stats is null");
146+
this.metadataStats = requireNonNull(metadataStats);
144147

145148
this.totalBytes = length;
146149

@@ -353,7 +356,7 @@ private ParquetRecordReader<FakeParquetRecord> createParquetRecordReader(
353356
Map<List<String>, RichColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
354357
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
355358
ParquetPredicate parquetPredicate = buildParquetPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);
356-
if (predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain)) {
359+
if (predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, metadataStats)) {
357360
offsets.add(block.getStartingPos());
358361
}
359362
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.hive.parquet;
15+
16+
import com.fasterxml.jackson.databind.ObjectMapper;
17+
import io.airlift.stats.DistributionStat;
18+
import org.weakref.jmx.Managed;
19+
import org.weakref.jmx.Nested;
20+
21+
import javax.inject.Inject;
22+
23+
import java.util.HashMap;
24+
import java.util.LinkedHashMap;
25+
import java.util.Map;
26+
27+
public class ParquetMetadataStats
28+
{
29+
private final DistributionStat metadataLength;
30+
private final DistributionStat readSize;
31+
private final Map<String, DistributionStat> dataReadSize;
32+
private final Map<String, DistributionStat> dictionaryReadSize;
33+
private final Map<String, DistributionStat> pageSize;
34+
35+
private static ObjectMapper objectMapper = new ObjectMapper();
36+
37+
@Inject
38+
public ParquetMetadataStats()
39+
{
40+
this.metadataLength = new DistributionStat();
41+
this.readSize = new DistributionStat();
42+
this.dataReadSize = new HashMap<>();
43+
this.dictionaryReadSize = new HashMap<>();
44+
this.pageSize = new HashMap<>();
45+
}
46+
47+
@Managed
48+
@Nested
49+
public DistributionStat getMetadataLength()
50+
{
51+
return metadataLength;
52+
}
53+
54+
@Managed
55+
@Nested
56+
public DistributionStat getReadSize()
57+
{
58+
return readSize;
59+
}
60+
61+
@Managed
62+
public String getDataReadSize()
63+
{
64+
return getSnapshot(dataReadSize);
65+
}
66+
67+
@Managed
68+
public String getDictionaryReadSize()
69+
{
70+
return getSnapshot(dictionaryReadSize);
71+
}
72+
73+
@Managed
74+
public String getPageSize()
75+
{
76+
return getSnapshot(pageSize);
77+
}
78+
79+
private static String getSnapshot(Map<String, DistributionStat> stats)
80+
{
81+
Map<String, DistributionStat.DistributionStatSnapshot> result = new LinkedHashMap<>(stats.size());
82+
for (Map.Entry<String, DistributionStat> entry : stats.entrySet()) {
83+
result.put(entry.getKey(), entry.getValue().snapshot());
84+
}
85+
return toJson(result);
86+
}
87+
88+
private static String toJson(Map<String, DistributionStat.DistributionStatSnapshot> snapshot)
89+
{
90+
try {
91+
return objectMapper.writeValueAsString(snapshot);
92+
}
93+
catch (Exception ignore) {
94+
return snapshot.toString();
95+
}
96+
}
97+
98+
public void addDataReadSize(String name, long size)
99+
{
100+
addSize(dataReadSize, name, size);
101+
readSize.add(size);
102+
}
103+
104+
public void addDictionaryReadSize(String name, long size)
105+
{
106+
addSize(dictionaryReadSize, name, size);
107+
readSize.add(size);
108+
}
109+
110+
public void addPageSize(String name, long size)
111+
{
112+
addSize(pageSize, name, size);
113+
}
114+
115+
private static void addSize(Map<String, DistributionStat> stats, String name, long size)
116+
{
117+
if (!stats.containsKey(name)) {
118+
stats.put(name, new DistributionStat());
119+
}
120+
121+
stats.get(name).add(size);
122+
}
123+
}

presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,19 +83,31 @@ public class ParquetPageSourceFactory
8383
private final boolean useParquetColumnNames;
8484
private final HdfsEnvironment hdfsEnvironment;
8585
private final FileFormatDataSourceStats stats;
86+
private final ParquetMetadataStats metadataStats;
8687

8788
@Inject
89+
public ParquetPageSourceFactory(TypeManager typeManager, HiveClientConfig config, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats, ParquetMetadataStats metadataStats)
90+
{
91+
this(typeManager, requireNonNull(config, "hiveClientConfig is null").isUseParquetColumnNames(), hdfsEnvironment, stats, metadataStats);
92+
}
93+
8894
public ParquetPageSourceFactory(TypeManager typeManager, HiveClientConfig config, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
8995
{
90-
this(typeManager, requireNonNull(config, "hiveClientConfig is null").isUseParquetColumnNames(), hdfsEnvironment, stats);
96+
this(typeManager, config, hdfsEnvironment, stats, new ParquetMetadataStats());
9197
}
9298

9399
public ParquetPageSourceFactory(TypeManager typeManager, boolean useParquetColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
100+
{
101+
this(typeManager, useParquetColumnNames, hdfsEnvironment, stats, new ParquetMetadataStats());
102+
}
103+
104+
public ParquetPageSourceFactory(TypeManager typeManager, boolean useParquetColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats, ParquetMetadataStats metadataStats)
94105
{
95106
this.typeManager = requireNonNull(typeManager, "typeManager is null");
96107
this.useParquetColumnNames = useParquetColumnNames;
97108
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
98109
this.stats = requireNonNull(stats, "stats is null");
110+
this.metadataStats = requireNonNull(metadataStats);
99111
}
100112

101113
@Override
@@ -133,7 +145,9 @@ public Optional<? extends ConnectorPageSource> createPageSource(
133145
typeManager,
134146
isParquetPredicatePushdownEnabled(session),
135147
effectivePredicate,
136-
stats));
148+
stats,
149+
metadataStats,
150+
session));
137151
}
138152

139153
public static ParquetPageSource createParquetPageSource(
@@ -150,15 +164,17 @@ public static ParquetPageSource createParquetPageSource(
150164
TypeManager typeManager,
151165
boolean predicatePushdownEnabled,
152166
TupleDomain<HiveColumnHandle> effectivePredicate,
153-
FileFormatDataSourceStats stats)
167+
FileFormatDataSourceStats stats,
168+
ParquetMetadataStats metadataStats,
169+
ConnectorSession session)
154170
{
155171
AggregatedMemoryContext systemMemoryContext = newSimpleAggregatedMemoryContext();
156172

157173
ParquetDataSource dataSource = null;
158174
try {
159175
FileSystem fileSystem = hdfsEnvironment.getFileSystem(user, path, configuration);
160176
FSDataInputStream inputStream = fileSystem.open(path);
161-
ParquetMetadata parquetMetadata = ParquetMetadataReader.readFooter(inputStream, path, fileSize);
177+
ParquetMetadata parquetMetadata = ParquetMetadataReader.readFooter(inputStream, path, fileSize, metadataStats);
162178
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
163179
MessageType fileSchema = fileMetaData.getSchema();
164180
dataSource = buildHdfsParquetDataSource(inputStream, path, fileSize, stats);
@@ -169,6 +185,9 @@ public static ParquetPageSource createParquetPageSource(
169185
.filter(Objects::nonNull)
170186
.collect(toList());
171187

188+
System.err.println(columns);
189+
System.err.println(effectivePredicate.toString(session));
190+
172191
MessageType requestedSchema = new MessageType(fileSchema.getName(), fields);
173192

174193
List<BlockMetaData> blocks = new ArrayList<>();
@@ -185,15 +204,16 @@ public static ParquetPageSource createParquetPageSource(
185204
ParquetPredicate parquetPredicate = buildParquetPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);
186205
final ParquetDataSource finalDataSource = dataSource;
187206
blocks = blocks.stream()
188-
.filter(block -> predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain))
207+
.filter(block -> predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain, metadataStats))
189208
.collect(toList());
190209
}
191210
MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
192211
ParquetReader parquetReader = new ParquetReader(
193212
messageColumnIO,
194213
blocks,
195214
dataSource,
196-
systemMemoryContext);
215+
systemMemoryContext,
216+
metadataStats);
197217

198218
return new ParquetPageSource(
199219
parquetReader,

presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetRecordCursorProvider.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,30 @@ public class ParquetRecordCursorProvider
4949
private final boolean useParquetColumnNames;
5050
private final HdfsEnvironment hdfsEnvironment;
5151
private final FileFormatDataSourceStats stats;
52+
private final ParquetMetadataStats metadataStats;
5253

53-
@Inject
5454
public ParquetRecordCursorProvider(HiveClientConfig hiveClientConfig, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
5555
{
56-
this(requireNonNull(hiveClientConfig, "hiveClientConfig is null").isUseParquetColumnNames(), hdfsEnvironment, stats);
56+
this(hiveClientConfig, hdfsEnvironment, stats, new ParquetMetadataStats());
5757
}
5858

5959
public ParquetRecordCursorProvider(boolean useParquetColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
60+
{
61+
this(useParquetColumnNames, hdfsEnvironment, stats, new ParquetMetadataStats());
62+
}
63+
64+
@Inject
65+
public ParquetRecordCursorProvider(HiveClientConfig hiveClientConfig, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats, ParquetMetadataStats metadataStats)
66+
{
67+
this(requireNonNull(hiveClientConfig, "hiveClientConfig is null").isUseParquetColumnNames(), hdfsEnvironment, stats, metadataStats);
68+
}
69+
70+
public ParquetRecordCursorProvider(boolean useParquetColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats, ParquetMetadataStats metadataStats)
6071
{
6172
this.useParquetColumnNames = useParquetColumnNames;
6273
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
6374
this.stats = requireNonNull(stats, "stats is null");
75+
this.metadataStats = requireNonNull(metadataStats);
6476
}
6577

6678
@Override
@@ -95,6 +107,7 @@ public Optional<RecordCursor> createRecordCursor(
95107
typeManager,
96108
isParquetPredicatePushdownEnabled(session),
97109
effectivePredicate,
98-
stats));
110+
stats,
111+
metadataStats));
99112
}
100113
}

presto-hive/src/main/java/com/facebook/presto/hive/parquet/predicate/ParquetPredicateUtils.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.presto.hive.parquet.ParquetDataSource;
1818
import com.facebook.presto.hive.parquet.ParquetDictionaryPage;
1919
import com.facebook.presto.hive.parquet.ParquetEncoding;
20+
import com.facebook.presto.hive.parquet.ParquetMetadataStats;
2021
import com.facebook.presto.hive.parquet.RichColumnDescriptor;
2122
import com.facebook.presto.spi.predicate.Domain;
2223
import com.facebook.presto.spi.predicate.TupleDomain;
@@ -110,14 +111,14 @@ public static ParquetPredicate buildParquetPredicate(MessageType requestedSchema
110111
return new TupleDomainParquetPredicate(parquetTupleDomain, columnReferences.build());
111112
}
112113

113-
public static boolean predicateMatches(ParquetPredicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
114+
public static boolean predicateMatches(ParquetPredicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain, ParquetMetadataStats metadataStats)
114115
{
115116
Map<ColumnDescriptor, Statistics<?>> columnStatistics = getStatistics(block, descriptorsByPath);
116117
if (!parquetPredicate.matches(block.getRowCount(), columnStatistics)) {
117118
return false;
118119
}
119120

120-
Map<ColumnDescriptor, ParquetDictionaryDescriptor> dictionaries = getDictionaries(block, dataSource, descriptorsByPath, parquetTupleDomain);
121+
Map<ColumnDescriptor, ParquetDictionaryDescriptor> dictionaries = getDictionaries(block, dataSource, descriptorsByPath, parquetTupleDomain, metadataStats);
121122
return parquetPredicate.matches(dictionaries);
122123
}
123124

@@ -136,7 +137,7 @@ private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData
136137
return statistics.build();
137138
}
138139

139-
private static Map<ColumnDescriptor, ParquetDictionaryDescriptor> getDictionaries(BlockMetaData blockMetadata, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
140+
private static Map<ColumnDescriptor, ParquetDictionaryDescriptor> getDictionaries(BlockMetaData blockMetadata, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain, ParquetMetadataStats metadataStats)
140141
{
141142
ImmutableMap.Builder<ColumnDescriptor, ParquetDictionaryDescriptor> dictionaries = ImmutableMap.builder();
142143
for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) {
@@ -146,6 +147,7 @@ private static Map<ColumnDescriptor, ParquetDictionaryDescriptor> getDictionarie
146147
int totalSize = toIntExact(columnMetaData.getTotalSize());
147148
byte[] buffer = new byte[totalSize];
148149
dataSource.readFully(columnMetaData.getStartingPos(), buffer);
150+
metadataStats.addDictionaryReadSize(columnMetaData.getPath().toDotString(), totalSize);
149151
Optional<ParquetDictionaryPage> dictionaryPage = readDictionaryPage(buffer, columnMetaData.getCodec());
150152
dictionaries.put(descriptor, new ParquetDictionaryDescriptor(descriptor, dictionaryPage));
151153
break;

presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetColumnChunk.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.presto.hive.parquet.ParquetDataPageV1;
1919
import com.facebook.presto.hive.parquet.ParquetDataPageV2;
2020
import com.facebook.presto.hive.parquet.ParquetDictionaryPage;
21+
import com.facebook.presto.hive.parquet.ParquetMetadataStats;
2122
import io.airlift.slice.Slice;
2223
import parquet.column.Encoding;
2324
import parquet.format.DataPageHeader;
@@ -60,7 +61,7 @@ protected PageHeader readPageHeader()
6061
return Util.readPageHeader(this);
6162
}
6263

63-
public ParquetPageReader readAllPages()
64+
public ParquetPageReader readAllPages(ParquetMetadataStats metadataStats)
6465
throws IOException
6566
{
6667
List<ParquetDataPage> pages = new ArrayList<>();
@@ -70,6 +71,7 @@ public ParquetPageReader readAllPages()
7071
PageHeader pageHeader = readPageHeader();
7172
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
7273
int compressedPageSize = pageHeader.getCompressed_page_size();
74+
metadataStats.addPageSize(descriptor.getColumnChunkMetaData().getPath().toDotString(), compressedPageSize);
7375
switch (pageHeader.type) {
7476
case DICTIONARY_PAGE:
7577
if (dictionaryPage != null) {

0 commit comments

Comments
 (0)