
Commit 941d35d

Address comments
1 parent 7fc2fbd commit 941d35d

File tree: 14 files changed (+78 -143 lines)


Diff for: docs/src/main/sphinx/connector/hudi.md

+3
@@ -101,6 +101,9 @@ Additionally, following configuration properties can be set depending on the use
     failing the query. This skips data that may be expected to be part of the
     table.
   - `false`
+* - `hudi.metadata-enabled`
+  - Fetch the list of file names and sizes from Hudi metadata table rather than storage.
+  - `false`
 
 :::
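
Note: to try the new property end to end, it goes in the Hudi catalog configuration. A minimal sketch, assuming a catalog file named etc/catalog/hudi.properties (the file name and metastore URI are illustrative, not part of this change):

    connector.name=hudi
    hive.metastore.uri=thrift://example.net:9083
    # Property documented above; defaults to false
    hudi.metadata-enabled=true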

Diff for: plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java

+6 -6

@@ -65,19 +65,19 @@ public HudiConfig setColumnsToHide(List<String> columnsToHide)
         return this;
     }
 
+    public boolean isMetadataEnabled()
+    {
+        return this.metadataEnabled;
+    }
+
     @Config("hudi.metadata-enabled")
-    @ConfigDescription("Fetch the list of file names and sizes from Hudi metadata table rather than storage.")
+    @ConfigDescription("Fetch the list of file names and sizes from Hudi metadata table rather than storage")
     public HudiConfig setMetadataEnabled(boolean metadataEnabled)
     {
         this.metadataEnabled = metadataEnabled;
         return this;
     }
 
-    public boolean isMetadataEnabled()
-    {
-        return this.metadataEnabled;
-    }
-
     @Config("hudi.parquet.use-column-names")
     @ConfigDescription("Access Parquet columns using names from the file. If disabled, then columns are accessed using index."
             + "Only applicable to Parquet file format.")

Diff for: plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java

+7 -12

@@ -14,7 +14,6 @@
 package io.trino.plugin.hudi;
 
 import io.trino.plugin.hive.HiveColumnHandle;
-import io.trino.plugin.hudi.reader.TrinoHudiReaderContext;
 import io.trino.plugin.hudi.util.HudiAvroSerializer;
 import io.trino.plugin.hudi.util.SynthesizedColumnHandler;
 import io.trino.spi.Page;
@@ -31,29 +30,25 @@
 import java.util.concurrent.CompletableFuture;
 
 import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
 
 public class HudiPageSource
         implements ConnectorPageSource
 {
-    HoodieFileGroupReader<IndexedRecord> fileGroupReader;
-    ConnectorPageSource pageSource;
-    TrinoHudiReaderContext readerContext;
-    PageBuilder pageBuilder;
-    HudiAvroSerializer avroSerializer;
-    List<HiveColumnHandle> columnHandles;
+    private final HoodieFileGroupReader<IndexedRecord> fileGroupReader;
+    private final ConnectorPageSource pageSource;
+    private final PageBuilder pageBuilder;
+    private final HudiAvroSerializer avroSerializer;
 
     public HudiPageSource(
             ConnectorPageSource pageSource,
             HoodieFileGroupReader<IndexedRecord> fileGroupReader,
-            TrinoHudiReaderContext readerContext,
             List<HiveColumnHandle> columnHandles,
             SynthesizedColumnHandler synthesizedColumnHandler)
     {
-        this.pageSource = pageSource;
-        this.fileGroupReader = fileGroupReader;
+        this.pageSource = requireNonNull(pageSource, "pageSource is null");
+        this.fileGroupReader = requireNonNull(fileGroupReader, "fileGroupReader is null");
         this.initFileGroupReader();
-        this.readerContext = readerContext;
-        this.columnHandles = columnHandles;
         this.pageBuilder = new PageBuilder(columnHandles.stream().map(HiveColumnHandle::getType).toList());
         this.avroSerializer = new HudiAvroSerializer(columnHandles, synthesizedColumnHandler);
     }

Diff for: plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java

+8 -9

@@ -124,11 +124,11 @@ public ConnectorPageSource createPageSource(
     {
         HudiTableHandle hudiTableHandle = (HudiTableHandle) connectorTable;
         HudiSplit hudiSplit = (HudiSplit) connectorSplit;
-        Optional<HudiBaseFile> hudiBaseFileOpt = hudiSplit.getBaseFile();
+        Optional<HudiBaseFile> hudiBaseFileOpt = hudiSplit.baseFile();
 
         String dataFilePath = hudiBaseFileOpt.isPresent()
                 ? hudiBaseFileOpt.get().getPath()
-                : hudiSplit.getLogFiles().getFirst().getPath();
+                : hudiSplit.logFiles().getFirst().getPath();
         // Filter out metadata table splits
         if (dataFilePath.contains(new StoragePath(
                 ((HudiTableHandle) connectorTable).getBasePath()).toUri().getPath() + "/.hoodie/metadata")) {
@@ -202,7 +202,6 @@ public ConnectorPageSource createPageSource(
         return new HudiPageSource(
                 dataPageSource,
                 fileGroupReader,
-                readerContext,
                 hiveColumns,
                 synthesizedColumnHandler);
     }
@@ -218,7 +217,7 @@ private static ConnectorPageSource createPageSource(
     {
         ParquetDataSource dataSource = null;
         boolean useColumnNames = shouldUseParquetColumnNames(session);
-        HudiBaseFile baseFile = hudiSplit.getBaseFile().get();
+        HudiBaseFile baseFile = hudiSplit.baseFile().get();
         String path = baseFile.getPath();
         long start = baseFile.getStart();
         long length = baseFile.getLength();
@@ -237,7 +236,7 @@ private static ConnectorPageSource createPageSource(
         Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
         TupleDomain<ColumnDescriptor> parquetTupleDomain = options.isIgnoreStatistics()
                 ? TupleDomain.all()
-                : getParquetTupleDomain(descriptorsByPath, hudiSplit.getPredicate(), fileSchema, useColumnNames);
+                : getParquetTupleDomain(descriptorsByPath, hudiSplit.predicate(), fileSchema, useColumnNames);
 
         TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone);
 
@@ -277,8 +276,8 @@ private static ConnectorPageSource createPageSource(
         }
         catch (IOException _) {
         }
-        if (e instanceof TrinoException) {
-            throw (TrinoException) e;
+        if (e instanceof TrinoException trinoException) {
+            throw trinoException;
         }
         if (e instanceof ParquetCorruptionException) {
             throw new TrinoException(HUDI_BAD_DATA, e);
@@ -290,8 +289,8 @@ private static ConnectorPageSource createPageSource(
 
     private static TrinoException handleException(ParquetDataSourceId dataSourceId, Exception exception)
     {
-        if (exception instanceof TrinoException) {
-            return (TrinoException) exception;
+        if (exception instanceof TrinoException trinoException) {
+            return trinoException;
         }
         if (exception instanceof ParquetCorruptionException) {
             return new TrinoException(HUDI_BAD_DATA, exception);
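
Note: the cast cleanups above switch to pattern matching for instanceof (standardized in Java 16), which binds a typed variable when the test succeeds instead of requiring a separate cast. A self-contained sketch of the idiom, with illustrative names:

    public class PatternMatchDemo
    {
        public static void main(String[] args)
        {
            Exception e = new IllegalStateException("boom");
            // The pattern variable is in scope only where the test is true,
            // so the explicit (RuntimeException) cast disappears.
            if (e instanceof RuntimeException runtimeException) {
                System.out.println(runtimeException.getMessage());
            }
        }
    }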

Diff for: plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java

+11 -11

@@ -41,7 +41,6 @@
 public class HudiSessionProperties
         implements SessionPropertiesProvider
 {
-    static final String METADATA_TABLE_ENABLED = "metadata_enabled";
     private static final String COLUMNS_TO_HIDE = "columns_to_hide";
     private static final String USE_PARQUET_COLUMN_NAMES = "use_parquet_column_names";
     private static final String PARQUET_SMALL_FILE_THRESHOLD = "parquet_small_file_threshold";
@@ -54,6 +53,7 @@ public class HudiSessionProperties
     private static final String SPLIT_GENERATOR_PARALLELISM = "split_generator_parallelism";
     private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
     private static final String IGNORE_ABSENT_PARTITIONS = "ignore_absent_partitions";
+    static final String METADATA_TABLE_ENABLED = "metadata_enabled";
 
     private final List<PropertyMetadata<?>> sessionProperties;
 
@@ -72,11 +72,6 @@ public HudiSessionProperties(HudiConfig hudiConfig, ParquetReaderConfig parquetR
                                 .map(name -> ((String) name).toLowerCase(ENGLISH))
                                 .collect(toImmutableList()),
                         value -> value),
-                booleanProperty(
-                        METADATA_TABLE_ENABLED,
-                        "For Hudi tables prefer to fetch the list of files from its metadata table",
-                        hudiConfig.isMetadataEnabled(),
-                        false),
                 booleanProperty(
                         USE_PARQUET_COLUMN_NAMES,
                         "Access parquet columns using names from the file. If disabled, then columns are accessed using index.",
@@ -137,6 +132,11 @@ public HudiSessionProperties(HudiConfig hudiConfig, ParquetReaderConfig parquetR
                         IGNORE_ABSENT_PARTITIONS,
                         "Ignore absent partitions",
                         hudiConfig.isIgnoreAbsentPartitions(),
+                        false),
+                booleanProperty(
+                        METADATA_TABLE_ENABLED,
+                        "For Hudi tables prefer to fetch the list of files from its metadata table",
+                        hudiConfig.isMetadataEnabled(),
                         false));
     }
 
@@ -152,11 +152,6 @@ public static List<String> getColumnsToHide(ConnectorSession session)
         return (List<String>) session.getProperty(COLUMNS_TO_HIDE, List.class);
     }
 
-    public static boolean isHudiMetadataTableEnabled(ConnectorSession session)
-    {
-        return session.getProperty(METADATA_TABLE_ENABLED, Boolean.class);
-    }
-
     public static boolean shouldUseParquetColumnNames(ConnectorSession session)
     {
         return session.getProperty(USE_PARQUET_COLUMN_NAMES, Boolean.class);
@@ -211,4 +206,9 @@ public static boolean isIgnoreAbsentPartitions(ConnectorSession session)
     {
         return session.getProperty(IGNORE_ABSENT_PARTITIONS, Boolean.class);
     }
+
+    public static boolean isHudiMetadataTableEnabled(ConnectorSession session)
+    {
+        return session.getProperty(METADATA_TABLE_ENABLED, Boolean.class);
+    }
 }
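
Note: the session property is only reordered here, not renamed, so toggling it per session is unchanged. An illustrative example, assuming the catalog is named hudi:

    SET SESSION hudi.metadata_enabled = true;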

Diff for: plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java

+14 -61

@@ -13,8 +13,6 @@
  */
 package io.trino.plugin.hudi;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.trino.plugin.hive.HiveColumnHandle;
@@ -35,33 +33,25 @@
 import static java.lang.Math.toIntExact;
 import static java.util.Objects.requireNonNull;
 
-public class HudiSplit
+public record HudiSplit(
+        Optional<HudiBaseFile> baseFile,
+        List<HudiLogFile> logFiles,
+        String commitTime,
+        TupleDomain<HiveColumnHandle> predicate,
+        List<HivePartitionKey> partitionKeys,
+        SplitWeight splitWeight)
         implements ConnectorSplit
 {
     private static final int INSTANCE_SIZE = toIntExact(instanceSize(HudiSplit.class));
 
-    private final Optional<HudiBaseFile> baseFile;
-    private final List<HudiLogFile> logFiles;
-    private final String commitTime;
-    private final TupleDomain<HiveColumnHandle> predicate;
-    private final List<HivePartitionKey> partitionKeys;
-    private final SplitWeight splitWeight;
-
-    @JsonCreator
-    public HudiSplit(
-            @JsonProperty("baseFile") HudiBaseFile baseFile,
-            @JsonProperty("logFiles") List<HudiLogFile> logFiles,
-            @JsonProperty("commitTime") String commitTime,
-            @JsonProperty("predicate") TupleDomain<HiveColumnHandle> predicate,
-            @JsonProperty("partitionKeys") List<HivePartitionKey> partitionKeys,
-            @JsonProperty("splitWeight") SplitWeight splitWeight)
+    public HudiSplit
     {
-        this.baseFile = Optional.ofNullable(baseFile);
-        this.logFiles = requireNonNull(logFiles, "logFiles is null");
-        this.commitTime = requireNonNull(commitTime, "commitTime is null");
-        this.predicate = requireNonNull(predicate, "predicate is null");
-        this.partitionKeys = ImmutableList.copyOf(requireNonNull(partitionKeys, "partitionKeys is null"));
-        this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
+        requireNonNull(baseFile, "baseFile is null");
+        requireNonNull(logFiles, "logFiles is null");
+        requireNonNull(commitTime, "commitTime is null");
+        requireNonNull(predicate, "predicate is null");
+        partitionKeys = ImmutableList.copyOf(partitionKeys);
+        requireNonNull(splitWeight, "splitWeight is null");
     }
 
     @Override
@@ -74,43 +64,6 @@ public Map<String, String> getSplitInfo()
                 .buildOrThrow();
     }
 
-    @JsonProperty
-    public Optional<HudiBaseFile> getBaseFile()
-    {
-        return baseFile;
-    }
-
-    @JsonProperty
-    public List<HudiLogFile> getLogFiles()
-    {
-        return logFiles;
-    }
-
-    @JsonProperty
-    public String getCommitTime()
-    {
-        return commitTime;
-    }
-
-    @JsonProperty
-    @Override
-    public SplitWeight getSplitWeight()
-    {
-        return splitWeight;
-    }
-
-    @JsonProperty
-    public TupleDomain<HiveColumnHandle> getPredicate()
-    {
-        return predicate;
-    }
-
-    @JsonProperty
-    public List<HivePartitionKey> getPartitionKeys()
-    {
-        return partitionKeys;
-    }
-
     @Override
     public long getRetainedSizeInBytes()
     {
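
Note: two details of the record conversion are easy to miss. The @JsonCreator/@JsonProperty wiring can be dropped, presumably because Jackson's record support serializes through the canonical constructor and component accessors, and the compact constructor reassigns partitionKeys before the implicit field assignment, so the record stores the defensive copy. A self-contained sketch of that second point, using JDK List.copyOf in place of Guava's ImmutableList:

    import java.util.ArrayList;
    import java.util.List;

    public record Example(List<String> items)
    {
        public Example
        {
            // Runs before the implicit field assignment, so the stored value
            // is the immutable copy; copyOf also throws NullPointerException
            // on a null argument, covering the requireNonNull check.
            items = List.copyOf(items);
        }

        public static void main(String[] args)
        {
            Example e = new Example(new ArrayList<>(List.of("a", "b")));
            System.out.println(e.items()); // accessor is items(), not getItems()
        }
    }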

Diff for: plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java

+7 -7

@@ -179,18 +179,18 @@ public static List<HiveColumnHandle> prependHudiMetaColumns(List<HiveColumnHandl
 
     public static FileSlice convertToFileSlice(HudiSplit split, String basePath)
     {
-        String dataFilePath = split.getBaseFile().isPresent()
-                ? split.getBaseFile().get().getPath()
-                : split.getLogFiles().getFirst().getPath();
+        String dataFilePath = split.baseFile().isPresent()
+                ? split.baseFile().get().getPath()
+                : split.logFiles().getFirst().getPath();
         String fileId = FSUtils.getFileIdFromFileName(new StoragePath(dataFilePath).getName());
-        HoodieBaseFile baseFile = split.getBaseFile().isPresent()
-                ? new HoodieBaseFile(dataFilePath, fileId, split.getCommitTime(), null)
+        HoodieBaseFile baseFile = split.baseFile().isPresent()
+                ? new HoodieBaseFile(dataFilePath, fileId, split.commitTime(), null)
                 : null;
 
         return new FileSlice(
                 new HoodieFileGroupId(FSUtils.getRelativePartitionPath(new StoragePath(basePath), new StoragePath(dataFilePath)), fileId),
-                split.getCommitTime(),
+                split.commitTime(),
                 baseFile,
-                split.getLogFiles().stream().map(lf -> new HoodieLogFile(lf.getPath())).toList());
+                split.logFiles().stream().map(lf -> new HoodieLogFile(lf.getPath())).toList());
     }
 }

Diff for: plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiBaseFile.java

+2 -11

@@ -23,7 +23,6 @@ public class HudiBaseFile
         implements HudiFile
 {
     private final String path;
-    private final String fileName;
     private final long fileSize;
     private final long modificationTime;
     private final long start;
@@ -40,15 +39,14 @@ public static HudiBaseFile of(HoodieBaseFile baseFile, long start, long length)
     }
 
     @JsonCreator
-    public HudiBaseFile(@JsonProperty("path") String path,
-            @JsonProperty("fileName") String fileName,
+    public HudiBaseFile(
+            @JsonProperty("path") String path,
             @JsonProperty("fileSize") long fileSize,
             @JsonProperty("modificationTime") long modificationTime,
             @JsonProperty("start") long start,
             @JsonProperty("length") long length)
     {
         this.path = path;
-        this.fileName = fileName;
         this.fileSize = fileSize;
         this.modificationTime = modificationTime;
         this.start = start;
@@ -62,7 +60,6 @@ private HudiBaseFile(HoodieBaseFile baseFile, long start, long length)
         checkArgument(length >= 0, "length must be positive");
         checkArgument(start + length <= baseFile.getFileSize(), "fileSize must be at least start + length");
         this.path = baseFile.getPath();
-        this.fileName = baseFile.getFileName();
         this.fileSize = baseFile.getFileSize();
         this.modificationTime = baseFile.getPathInfo().getModificationTime();
         this.start = start;
@@ -75,12 +72,6 @@ public String getPath()
         return path;
     }
 
-    @JsonProperty
-    public String getFileName()
-    {
-        return fileName;
-    }
-
     @JsonProperty
     public long getFileSize()
     {
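
Note: fileName appears to be removable because it is derivable from path on demand; elsewhere in this commit the name is obtained as new StoragePath(dataFilePath).getName(). A JDK-only sketch of the derivation (names illustrative):

    public class FileNameFromPath
    {
        public static void main(String[] args)
        {
            String path = "s3://bucket/warehouse/table/p=1/file.parquet";
            // Take the component after the last path separator
            String fileName = path.substring(path.lastIndexOf('/') + 1);
            System.out.println(fileName); // prints: file.parquet
        }
    }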

Diff for: plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiFile.java

-2

@@ -17,8 +17,6 @@ public interface HudiFile
 {
     String getPath();
 
-    String getFileName();
-
     long getFileSize();
 
     long getModificationTime();
