Skip to content

Commit 56bf620

Browse files
authored
Minor Fixups
This commit fixes up a few things I noticed when previewing the PR to Trino.

* Column/Field name casing should be preserved when writing
* Some missing operational/metrics calls in IonPageSource
* Throw clearer Exception for errors in IonFileWriter
* Move some tests from TestHiveFileFormats to IonPageSourceSmokeTest
* Add test for Timestamp Encoding
1 parent 0d593c2 commit 56bf620

File tree

7 files changed

+50
-74
lines changed

7 files changed

+50
-74
lines changed

Diff for: lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoderFactory.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.time.ZoneId;
5252
import java.util.Date;
5353
import java.util.List;
54-
import java.util.Locale;
5554
import java.util.Optional;
5655
import java.util.function.IntFunction;
5756

@@ -64,7 +63,7 @@ private IonEncoderFactory() {}
6463
public static IonEncoder buildEncoder(List<Column> columns)
6564
{
6665
return RowEncoder.forFields(columns.stream()
67-
.map(c -> new RowType.Field(Optional.of(c.name().toLowerCase(Locale.ROOT)), c.type()))
66+
.map(c -> new RowType.Field(Optional.of(c.name()), c.type()))
6867
.toList());
6968
}
7069

@@ -89,8 +88,7 @@ private static BlockEncoder encoderForType(Type type)
8988
case DecimalType t -> decimalEncoder(t);
9089
case DateType _ -> dateEncoder;
9190
case TimestampType t -> timestampEncoder(t);
92-
case MapType t -> new MapEncoder(t, t.getKeyType(),
93-
encoderForType(t.getValueType()));
91+
case MapType t -> new MapEncoder(t, t.getKeyType(), encoderForType(t.getValueType()));
9492
case RowType t -> RowEncoder.forFields(t.getFields());
9593
case ArrayType t -> new ArrayEncoder(wrapEncoder(encoderForType(t.getElementType())));
9694
default -> throw new IllegalArgumentException(String.format("Unsupported type: %s", type));
@@ -119,7 +117,7 @@ private static RowEncoder forFields(List<RowType.Field> fields)
119117
ImmutableList.Builder<BlockEncoder> fieldEncodersBuilder = ImmutableList.builder();
120118

121119
for (RowType.Field field : fields) {
122-
fieldNamesBuilder.add(field.getName().get().toLowerCase(Locale.ROOT));
120+
fieldNamesBuilder.add(field.getName().get());
123121
fieldEncodersBuilder.add(wrapEncoder(encoderForType(field.getType())));
124122
}
125123

Diff for: lib/trino-hive-formats/src/test/java/io/trino/hive/formats/ion/TestIonFormat.java

+21
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,27 @@ public void testEncode()
435435
assertIonEquivalence(TEST_COLUMNS, page, ionText);
436436
}
437437

438+
@Test
439+
public void testEncodeTimestamp()
440+
throws IOException
441+
{
442+
List<Column> timestampColumn = List.of(new Column("my_ts", TimestampType.TIMESTAMP_NANOS, 0));
443+
Page page = toPage(timestampColumn, List.of(
444+
toSqlTimestamp(TimestampType.TIMESTAMP_NANOS, LocalDateTime.of(2024, 11, 23, 1, 23, 45, 666777888))));
445+
assertIonEquivalence(timestampColumn, page, "{ my_ts: 2024-11-23T01:23:45.666777888Z }");
446+
}
447+
448+
@Test
449+
public void testEncodeMixedCaseColumn()
450+
throws IOException
451+
{
452+
List<Column> casedColumn = List.of(
453+
new Column("TheAnswer", INTEGER, 0));
454+
455+
Page page = toPage(casedColumn, List.of(42));
456+
assertIonEquivalence(casedColumn, page, "{ TheAnswer: 42 }");
457+
}
458+
438459
@Test
439460
public void testEncodeWithNullField()
440461
throws IOException

Diff for: plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriter.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.function.LongSupplier;
3434

3535
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
36+
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
3637

3738
public class IonFileWriter
3839
implements FileWriter
@@ -106,7 +107,7 @@ public void rollback()
106107
writer.close();
107108
}
108109
catch (IOException e) {
109-
throw new RuntimeException(e);
110+
throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", e);
110111
}
111112
}
112113

@@ -123,7 +124,7 @@ public void appendRows(Page page)
123124
pageEncoder.encode(writer, page);
124125
}
125126
catch (IOException e) {
126-
throw new RuntimeException(e);
127+
throw new TrinoException(HIVE_WRITER_DATA_ERROR, e);
127128
}
128129
}
129130
}

Diff for: plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriterFactory.java

-2
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,6 @@ public Optional<FileWriter> createFileWriter(
8585

8686
Closeable rollbackAction = () -> fileSystem.deleteFile(location);
8787

88-
// we take the column names from the schema, not what was input
89-
// this is what the LineWriterFactory does, I don't understand why
9088
List<String> fileColumnNames = getColumnNames(schema);
9189
List<Type> fileColumnTypes = getColumnTypes(schema).stream()
9290
.map(hiveType -> getType(hiveType, typeManager, getTimestampPrecision(session)))

Diff for: plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSource.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.trino.plugin.hive.ion;
1515

16+
import com.amazon.ion.IonBufferConfiguration;
1617
import com.amazon.ion.IonReader;
1718
import com.amazon.ion.IonType;
1819
import io.trino.hive.formats.ion.IonDecoder;
@@ -24,9 +25,13 @@
2425
import java.util.OptionalLong;
2526
import java.util.function.LongSupplier;
2627

28+
import static io.airlift.slice.SizeOf.instanceSize;
29+
2730
public class IonPageSource
2831
implements ConnectorPageSource
2932
{
33+
private static final int INSTANCE_SIZE = instanceSize(IonPageSource.class);
34+
3035
private final IonReader ionReader;
3136
private final PageBuilder pageBuilder;
3237
private final IonDecoder decoder;
@@ -86,7 +91,10 @@ public Page getNextPage()
8691
@Override
8792
public long getMemoryUsage()
8893
{
89-
return 4096;
94+
// we don't have the ability to ask an IonReader how many bytes it has buffered
95+
// it will buffer as much as is needed for each top-level-value.
96+
int assumedIonBufferSize = IonBufferConfiguration.DEFAULT.getInitialBufferSize() * 4;
97+
return INSTANCE_SIZE + assumedIonBufferSize + pageBuilder.getRetainedSizeInBytes();
9098
}
9199

92100
@Override

Diff for: plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java

+4-64
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@
139139
import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
140140
import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
141141
import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
142-
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
143142
import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings;
144143
import static io.trino.plugin.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION;
145144
import static io.trino.plugin.hive.HiveStorageFormat.AVRO;
@@ -157,8 +156,6 @@
157156
import static io.trino.plugin.hive.HiveTestUtils.getHiveSession;
158157
import static io.trino.plugin.hive.HiveTestUtils.mapType;
159158
import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION;
160-
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_ENCODING_PROPERTY;
161-
import static io.trino.plugin.hive.ion.IonWriterOptions.TEXT_ENCODING;
162159
import static io.trino.plugin.hive.util.HiveTypeTranslator.toHiveType;
163160
import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMNS;
164161
import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMN_TYPES;
@@ -234,7 +231,6 @@ public final class TestHiveFileFormats
234231
private static final FileFormatDataSourceStats STATS = new FileFormatDataSourceStats();
235232
private static final ConnectorSession PARQUET_SESSION = getHiveSession(createParquetHiveConfig(false));
236233
private static final ConnectorSession PARQUET_SESSION_USE_NAME = getHiveSession(createParquetHiveConfig(true));
237-
private static final String ERROR_ENCODING = "error_encoding";
238234

239235
@DataProvider(name = "rowCount")
240236
public static Object[][] rowCountProvider()
@@ -377,7 +373,8 @@ public void testIonWithBinaryEncoding(int rowCount, long fileSizePadding)
377373
throws Exception
378374
{
379375
List<TestColumn> testColumns = TEST_COLUMNS.stream()
380-
// todo: add support for maps to trino impl
376+
// even though maps with text keys work with the native trino impl
377+
// there is an error when testing against the hive serde
381378
.filter(tc -> !(tc.type instanceof MapType))
382379
.collect(toList());
383380

@@ -394,54 +391,6 @@ public void testIonWithBinaryEncoding(int rowCount, long fileSizePadding)
394391
.isReadableByPageSource(fileSystemFactory -> new IonPageSourceFactory(fileSystemFactory, hiveConfig));
395392
}
396393

397-
@Test(dataProvider = "validRowAndFileSizePadding")
398-
public void testIonWithTextEncoding(int rowCount, long fileSizePadding)
399-
throws Exception
400-
{
401-
List<TestColumn> testColumns = TEST_COLUMNS.stream()
402-
// todo: add support for maps to trino impl
403-
.filter(tc -> !(tc.type instanceof MapType))
404-
.collect(toList());
405-
406-
HiveConfig hiveConfig = new HiveConfig();
407-
// enable Ion native trino integration for testing while the implementation is in progress
408-
// TODO: In future this flag should change to `true` as default and then the following statement can be removed.
409-
hiveConfig.setIonNativeTrinoEnabled(true);
410-
411-
assertThatFileFormat(ION)
412-
.withColumns(testColumns)
413-
.withRowsCount(rowCount)
414-
.withFileSizePadding(fileSizePadding)
415-
.withTableProperties(ImmutableMap.of(ION_ENCODING_PROPERTY, TEXT_ENCODING))
416-
.withFileWriterFactory(fileSystemFactory -> new IonFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER))
417-
.isReadableByPageSource(fileSystemFactory -> new IonPageSourceFactory(fileSystemFactory, hiveConfig));
418-
}
419-
420-
@Test(dataProvider = "validRowAndFileSizePadding")
421-
public void testInvalidIonEncoding(int rowCount, long fileSizePadding)
422-
throws Exception
423-
{
424-
List<TestColumn> testColumns = TEST_COLUMNS.stream()
425-
// todo: add support for maps to trino impl
426-
.filter(tc -> !(tc.type instanceof MapType))
427-
.collect(toList());
428-
429-
HiveConfig hiveConfig = new HiveConfig();
430-
// enable Ion native trino integration for testing while the implementation is in progress
431-
// TODO: In future this flag should change to `true` as default and then the following statement can be removed.
432-
hiveConfig.setIonNativeTrinoEnabled(true);
433-
434-
assertTrinoExceptionThrownBy(() -> assertThatFileFormat(ION)
435-
.withColumns(testColumns)
436-
.withRowsCount(rowCount)
437-
.withFileSizePadding(fileSizePadding)
438-
.withTableProperties(ImmutableMap.of(ION_ENCODING_PROPERTY, ERROR_ENCODING))
439-
.withFileWriterFactory(fileSystemFactory -> new IonFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER))
440-
.isReadableByPageSource(fileSystemFactory -> new IonPageSourceFactory(fileSystemFactory, hiveConfig)))
441-
.hasErrorCode(HIVE_WRITER_OPEN_ERROR)
442-
.hasMessage("Error creating Ion Output");
443-
}
444-
445394
@Test(dataProvider = "validRowAndFileSizePadding")
446395
public void testRcTextPageSource(int rowCount, long fileSizePadding)
447396
throws Exception
@@ -1275,7 +1224,6 @@ private static class FileFormatAssertion
12751224
private boolean skipGenericWrite;
12761225
private HiveFileWriterFactory fileWriterFactory;
12771226
private long fileSizePadding;
1278-
private Map<String, String> customTableProperties = ImmutableMap.of();
12791227

12801228
private final TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory();
12811229

@@ -1333,12 +1281,6 @@ public FileFormatAssertion withRowsCount(int rowsCount)
13331281
return this;
13341282
}
13351283

1336-
public FileFormatAssertion withTableProperties(Map<String, String> tableProperties)
1337-
{
1338-
this.customTableProperties = requireNonNull(tableProperties, "customTableProperties is null");
1339-
return this;
1340-
}
1341-
13421284
public FileFormatAssertion withSession(ConnectorSession session)
13431285
{
13441286
this.session = requireNonNull(session, "session is null");
@@ -1397,7 +1339,7 @@ private void assertRead(HivePageSourceFactory pageSourceFactory)
13971339
if (fileWriterFactory == null) {
13981340
continue;
13991341
}
1400-
createTestFileTrino(location, storageFormat, compressionCodec, writeColumns, session, rowsCount, fileWriterFactory, customTableProperties);
1342+
createTestFileTrino(location, storageFormat, compressionCodec, writeColumns, session, rowsCount, fileWriterFactory);
14011343
}
14021344
else {
14031345
if (skipGenericWrite) {
@@ -1427,8 +1369,7 @@ private static void createTestFileTrino(
14271369
List<TestColumn> testColumns,
14281370
ConnectorSession session,
14291371
int numRows,
1430-
HiveFileWriterFactory fileWriterFactory,
1431-
Map<String, String> customTableProperties)
1372+
HiveFileWriterFactory fileWriterFactory)
14321373
{
14331374
// filter out partition keys, which are not written to the file
14341375
testColumns = testColumns.stream()
@@ -1453,7 +1394,6 @@ private static void createTestFileTrino(
14531394
Map<String, String> tableProperties = ImmutableMap.<String, String>builder()
14541395
.put(LIST_COLUMNS, testColumns.stream().map(TestColumn::name).collect(Collectors.joining(",")))
14551396
.put(LIST_COLUMN_TYPES, testColumns.stream().map(TestColumn::type).map(HiveTypeTranslator::toHiveType).map(HiveType::toString).collect(Collectors.joining(",")))
1456-
.putAll(customTableProperties)
14571397
.buildOrThrow();
14581398

14591399
Optional<FileWriter> fileWriter = fileWriterFactory.createFileWriter(

Diff for: plugin/trino-hive/src/test/java/io/trino/plugin/hive/ion/IonPageSourceSmokeTest.java

+10
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,16 @@ public void testBinaryEncoding()
251251
assertEncoding(tableColumns, BINARY_ENCODING);
252252
}
253253

254+
@Test
255+
public void testBadEncodingName()
256+
throws IOException
257+
{
258+
TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS)
259+
.withEncoding("unknown_encoding_name");
260+
261+
Assertions.assertThrows(TrinoException.class, fixture::getFileWriter);
262+
}
263+
254264
private void assertEncoding(List<HiveColumnHandle> tableColumns,
255265
String encoding)
256266
throws IOException

0 commit comments

Comments (0)