Skip to content

Commit 105e9ae

Browse files
committed
Ensure that FileWriterFactory Checks Hive Feature Flag and Unimplemented Props
Before this change the FileWriterFactory was not checking if hive.ion.nativetrino is True, nor if there are non-default values for unimplemented SerDe properties. This change fixes that. I combined IonReaderOptions and IonWriterOptions into a single IonSerDeProperties to avoid code duplication around the unimplemented check.
1 parent 5af2d2e commit 105e9ae

File tree

10 files changed

+214
-236
lines changed

10 files changed

+214
-236
lines changed

Diff for: lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoderFactory.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ private void encodeStruct(IonWriter writer, IntFunction<Block> blockSelector, in
148148
{
149149
writer.stepIn(IonType.STRUCT);
150150
for (int i = 0; i < fieldEncoders.size(); i++) {
151-
// Omit the filed when the field is null
151+
// fields are omitted by default, as was true in the hive serde.
152+
// there is an unimplemented hive legacy property of `ion.serialize_null`
153+
// that could be used to specify typed or untyped ion nulls instead.
152154
Block block = blockSelector.apply(i);
153155
if (block.isNull(position)) {
154156
continue;

Diff for: plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public IonFileWriter(
5151
Closeable rollbackAction,
5252
TypeManager typeManager,
5353
Optional<CompressionKind> compressionKind,
54-
IonWriterOptions.IonEncoding ionEncoding,
54+
IonSerDeProperties.IonEncoding ionEncoding,
5555
List<Column> columns)
5656
throws IOException
5757
{

Diff for: plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriterFactory.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.trino.metastore.StorageFormat;
2424
import io.trino.plugin.hive.FileWriter;
2525
import io.trino.plugin.hive.HiveCompressionCodec;
26+
import io.trino.plugin.hive.HiveConfig;
2627
import io.trino.plugin.hive.HiveFileWriterFactory;
2728
import io.trino.plugin.hive.WriterKind;
2829
import io.trino.plugin.hive.acid.AcidTransaction;
@@ -42,7 +43,6 @@
4243
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
4344
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
4445
import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
45-
import static io.trino.plugin.hive.ion.IonWriterOptions.getIonEncoding;
4646
import static io.trino.plugin.hive.util.HiveTypeUtil.getType;
4747
import static io.trino.plugin.hive.util.HiveUtil.getColumnNames;
4848
import static io.trino.plugin.hive.util.HiveUtil.getColumnTypes;
@@ -52,14 +52,17 @@ public class IonFileWriterFactory
5252
{
5353
private final TrinoFileSystemFactory fileSystemFactory;
5454
private final TypeManager typeManager;
55+
private final boolean nativeTrinoEnabled;
5556

5657
@Inject
5758
public IonFileWriterFactory(
5859
TrinoFileSystemFactory fileSystemFactory,
59-
TypeManager typeManager)
60+
TypeManager typeManager,
61+
HiveConfig hiveConfig)
6062
{
6163
this.fileSystemFactory = fileSystemFactory;
6264
this.typeManager = typeManager;
65+
this.nativeTrinoEnabled = hiveConfig.getIonNativeTrinoEnabled();
6366
}
6467

6568
@Override
@@ -75,9 +78,12 @@ public Optional<FileWriter> createFileWriter(
7578
boolean useAcidSchema,
7679
WriterKind writerKind)
7780
{
78-
if (!ION_OUTPUT_FORMAT.equals(storageFormat.getOutputFormat())) {
81+
if (!nativeTrinoEnabled
82+
|| !ION_OUTPUT_FORMAT.equals(storageFormat.getOutputFormat())
83+
|| IonSerDeProperties.hasUnsupportedProperty(schema)) {
7984
return Optional.empty();
8085
}
86+
8187
try {
8288
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
8389
TrinoOutputFile outputFile = fileSystem.newOutputFile(location);
@@ -100,7 +106,7 @@ public Optional<FileWriter> createFileWriter(
100106
rollbackAction,
101107
typeManager,
102108
compressionCodec.getHiveCompressionKind(),
103-
getIonEncoding(schema),
109+
IonSerDeProperties.getIonEncoding(schema),
104110
columns));
105111
}
106112
catch (Exception e) {

Diff for: plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSourceFactory.java

+4-53
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515

1616
import com.amazon.ion.IonReader;
1717
import com.amazon.ion.system.IonReaderBuilder;
18-
import com.google.common.collect.ImmutableMap;
19-
import com.google.common.collect.ImmutableSet;
2018
import com.google.inject.Inject;
2119
import io.trino.filesystem.Location;
2220
import io.trino.filesystem.TrinoFileSystem;
@@ -46,47 +44,23 @@
4644
import java.io.IOException;
4745
import java.io.InputStream;
4846
import java.util.List;
49-
import java.util.Map;
5047
import java.util.Optional;
5148
import java.util.OptionalInt;
52-
import java.util.Set;
53-
import java.util.regex.Pattern;
5449

5550
import static com.google.common.base.Preconditions.checkArgument;
5651
import static com.google.common.collect.ImmutableList.toImmutableList;
5752
import static io.trino.hive.formats.HiveClassNames.ION_SERDE_CLASS;
5853
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
5954
import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
6055
import static io.trino.plugin.hive.ReaderPageSource.noProjectionAdaptation;
61-
import static io.trino.plugin.hive.ion.IonReaderOptions.FAIL_ON_OVERFLOW_PROPERTY;
62-
import static io.trino.plugin.hive.ion.IonReaderOptions.FAIL_ON_OVERFLOW_PROPERTY_DEFAULT;
63-
import static io.trino.plugin.hive.ion.IonReaderOptions.FAIL_ON_OVERFLOW_PROPERTY_WITH_COLUMN;
64-
import static io.trino.plugin.hive.ion.IonReaderOptions.IGNORE_MALFORMED;
65-
import static io.trino.plugin.hive.ion.IonReaderOptions.IGNORE_MALFORMED_DEFAULT;
66-
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_SERIALIZATION_AS_NULL_DEFAULT;
67-
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_SERIALIZATION_AS_NULL_PROPERTY;
68-
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_SERIALIZATION_AS_PROPERTY;
69-
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_TIMESTAMP_OFFSET_DEFAULT;
70-
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_TIMESTAMP_OFFSET_PROPERTY;
7156
import static io.trino.plugin.hive.util.HiveUtil.splitError;
7257

7358
public class IonPageSourceFactory
7459
implements HivePageSourceFactory
7560
{
7661
private final TrinoFileSystemFactory trinoFileSystemFactory;
77-
// this is used as a feature flag to enable Ion native trino integration
7862
private final boolean nativeTrinoEnabled;
7963

80-
private static final Map<String, String> TABLE_PROPERTIES = ImmutableMap.of(
81-
FAIL_ON_OVERFLOW_PROPERTY, FAIL_ON_OVERFLOW_PROPERTY_DEFAULT,
82-
IGNORE_MALFORMED, IGNORE_MALFORMED_DEFAULT,
83-
ION_TIMESTAMP_OFFSET_PROPERTY, ION_TIMESTAMP_OFFSET_DEFAULT,
84-
ION_SERIALIZATION_AS_NULL_PROPERTY, ION_SERIALIZATION_AS_NULL_DEFAULT);
85-
86-
private static final Set<Pattern> COLUMN_PROPERTIES = ImmutableSet.of(
87-
Pattern.compile(FAIL_ON_OVERFLOW_PROPERTY_WITH_COLUMN),
88-
Pattern.compile(ION_SERIALIZATION_AS_PROPERTY));
89-
9064
@Inject
9165
public IonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig hiveConfig)
9266
{
@@ -110,19 +84,12 @@ public Optional<ReaderPageSource> createPageSource(
11084
boolean originalFile,
11185
AcidTransaction transaction)
11286
{
113-
if (!this.nativeTrinoEnabled) {
114-
// this allows user to defer to a legacy hive implementation(like ion-hive-serde) or throw an error based
115-
// on their use case
87+
if (!nativeTrinoEnabled
88+
|| !ION_SERDE_CLASS.equals(schema.serializationLibraryName())
89+
|| IonSerDeProperties.hasUnsupportedProperty(schema.serdeProperties())) {
11690
return Optional.empty();
11791
}
11892

119-
if (schema.serdeProperties().entrySet().stream().filter(entry -> entry.getKey().startsWith("ion.")).anyMatch(this::isUnsupportedProperty)) {
120-
return Optional.empty();
121-
}
122-
123-
if (!ION_SERDE_CLASS.equals(schema.serializationLibraryName())) {
124-
return Optional.empty();
125-
}
12693
checkArgument(acidInfo.isEmpty(), "Acid is not supported for Ion files");
12794

12895
// Skip empty inputs
@@ -170,7 +137,7 @@ public Optional<ReaderPageSource> createPageSource(
170137
.map(hc -> new Column(hc.getName(), hc.getType(), hc.getBaseHiveColumnIndex()))
171138
.toList();
172139

173-
IonDecoderConfig decoderConfig = IonReaderOptions.decoderConfigFor(schema.serdeProperties());
140+
IonDecoderConfig decoderConfig = IonSerDeProperties.decoderConfigFor(schema.serdeProperties());
174141
IonDecoder decoder = IonDecoderFactory.buildDecoder(decoderColumns, decoderConfig, pageBuilder);
175142
IonPageSource pageSource = new IonPageSource(ionReader, trinoInputStream, decoder, pageBuilder);
176143

@@ -180,20 +147,4 @@ public Optional<ReaderPageSource> createPageSource(
180147
throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, splitError(e, path, start, length), e);
181148
}
182149
}
183-
184-
private boolean isUnsupportedProperty(Map.Entry<String, String> entry)
185-
{
186-
String key = entry.getKey();
187-
String value = entry.getValue();
188-
189-
String propertyDefault = TABLE_PROPERTIES.get(key);
190-
if (propertyDefault != null) {
191-
return !propertyDefault.equals(value);
192-
}
193-
194-
// For now, any column-specific properties result in an empty PageSource
195-
// since they have no default values for comparison.
196-
return COLUMN_PROPERTIES.stream()
197-
.anyMatch(pattern -> pattern.matcher(key).matches());
198-
}
199150
}

Diff for: plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonReaderOptions.java

-59
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.hive.ion;
15+
16+
import com.amazon.ion.IonWriter;
17+
import com.amazon.ion.system.IonBinaryWriterBuilder;
18+
import com.amazon.ion.system.IonTextWriterBuilder;
19+
import com.google.common.collect.ImmutableMap;
20+
import io.trino.hive.formats.ion.IonDecoderConfig;
21+
import io.trino.spi.TrinoException;
22+
23+
import java.io.OutputStream;
24+
import java.util.Locale;
25+
import java.util.Map;
26+
import java.util.regex.Matcher;
27+
import java.util.regex.Pattern;
28+
29+
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
30+
31+
public final class IonSerDeProperties
32+
{
33+
// Reader properties
34+
public static final String STRICT_PATH_TYPING_PROPERTY = "ion.path_extractor.strict";
35+
public static final String STRICT_PATH_TYPING_DEFAULT = "false";
36+
public static final String PATH_EXTRACTOR_PROPERTY = "ion.(\\w+).path_extractor";
37+
public static final String PATH_EXTRACTION_CASE_SENSITIVITY = "ion.path_extractor.case_sensitive";
38+
public static final String PATH_EXTRACTION_CASE_SENSITIVITY_DEFAULT = "false";
39+
private static final Pattern pathExtractorPattern = Pattern.compile(PATH_EXTRACTOR_PROPERTY);
40+
41+
// unimplemented reader properties
42+
public static final String FAIL_ON_OVERFLOW_PROPERTY = "ion.fail_on_overflow";
43+
public static final String FAIL_ON_OVERFLOW_PROPERTY_DEFAULT = "true";
44+
public static final String COLUMN_FAIL_ON_OVERFLOW_PROPERTY = "ion.\\w+.fail_on_overflow";
45+
public static final String IGNORE_MALFORMED = "ion.ignore_malformed";
46+
public static final String IGNORE_MALFORMED_DEFAULT = "false";
47+
48+
// Writer properties
49+
public static final String ION_ENCODING_PROPERTY = "ion.encoding";
50+
public static final String TEXT_ENCODING = "text";
51+
public static final String BINARY_ENCODING = "binary";
52+
53+
// unimplemented writer properties
54+
public static final String ION_TIMESTAMP_OFFSET_PROPERTY = "ion.timestamp.serialization_offset";
55+
public static final String ION_TIMESTAMP_OFFSET_DEFAULT = "Z";
56+
public static final String ION_SERIALIZE_NULL_AS_PROPERTY = "ion.serialize_null";
57+
public static final String ION_SERIALIZE_NULL_AS_DEFAULT = "OMIT";
58+
public static final String ION_SERIALIZE_AS_PROPERTY = "ion.\\w+.serialize_as";
59+
60+
private static final Pattern unsupportedPropertiesRegex = Pattern.compile(
61+
ION_SERIALIZE_AS_PROPERTY + "|" + COLUMN_FAIL_ON_OVERFLOW_PROPERTY);
62+
63+
private static final Map<String, String> defaultOnlyProperties = Map.of(
64+
// reader properties
65+
FAIL_ON_OVERFLOW_PROPERTY, FAIL_ON_OVERFLOW_PROPERTY_DEFAULT,
66+
IGNORE_MALFORMED, IGNORE_MALFORMED_DEFAULT,
67+
68+
// writer properties
69+
ION_TIMESTAMP_OFFSET_PROPERTY, ION_TIMESTAMP_OFFSET_DEFAULT,
70+
ION_SERIALIZE_NULL_AS_PROPERTY, ION_SERIALIZE_NULL_AS_DEFAULT);
71+
72+
private IonSerDeProperties() {}
73+
74+
public static IonDecoderConfig decoderConfigFor(Map<String, String> propertiesMap)
75+
{
76+
ImmutableMap.Builder<String, String> extractionsBuilder = ImmutableMap.builder();
77+
78+
for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
79+
Matcher matcher = pathExtractorPattern.matcher(property.getKey());
80+
if (matcher.matches()) {
81+
extractionsBuilder.put(matcher.group(1), property.getValue());
82+
}
83+
}
84+
85+
Boolean strictTyping = Boolean.parseBoolean(
86+
propertiesMap.getOrDefault(STRICT_PATH_TYPING_PROPERTY, STRICT_PATH_TYPING_DEFAULT));
87+
Boolean caseSensitive = Boolean.parseBoolean(
88+
propertiesMap.getOrDefault(PATH_EXTRACTION_CASE_SENSITIVITY, PATH_EXTRACTION_CASE_SENSITIVITY_DEFAULT));
89+
90+
return new IonDecoderConfig(extractionsBuilder.buildOrThrow(), strictTyping, caseSensitive);
91+
}
92+
93+
public enum IonEncoding
94+
{
95+
BINARY {
96+
@Override
97+
public IonWriter createWriter(OutputStream outputStream)
98+
{
99+
return IonBinaryWriterBuilder.standard().build(outputStream);
100+
}
101+
},
102+
103+
TEXT {
104+
@Override
105+
public IonWriter createWriter(OutputStream outputStream)
106+
{
107+
return IonTextWriterBuilder.minimal().build(outputStream);
108+
}
109+
};
110+
111+
public abstract IonWriter createWriter(OutputStream outputStream);
112+
}
113+
114+
public static IonEncoding getIonEncoding(Map<String, String> schema)
115+
{
116+
String encodingStr = schema.getOrDefault(ION_ENCODING_PROPERTY, BINARY_ENCODING);
117+
return switch (encodingStr.toLowerCase(Locale.ROOT)) {
118+
case TEXT_ENCODING -> IonEncoding.TEXT;
119+
case BINARY_ENCODING -> IonEncoding.BINARY;
120+
default -> throw new TrinoException(HIVE_UNSUPPORTED_FORMAT,
121+
"Unsupported Ion encoding format: " + encodingStr);
122+
};
123+
}
124+
125+
public static boolean hasUnsupportedProperty(Map<String, String> properties)
126+
{
127+
return properties.entrySet().stream()
128+
.anyMatch((entry) -> {
129+
String key = entry.getKey();
130+
String value = entry.getValue();
131+
if (!key.startsWith("ion.")) {
132+
return false;
133+
}
134+
135+
if (!defaultOnlyProperties.getOrDefault(key, value).equals(value)) {
136+
return true;
137+
}
138+
139+
return unsupportedPropertiesRegex.matcher(key).matches();
140+
});
141+
}
142+
}

0 commit comments

Comments
 (0)