Add hive configs for supported read and write formats #25147

Open · wants to merge 1 commit into master
12 changes: 12 additions & 0 deletions presto-docs/src/main/sphinx/connector/hive.rst
@@ -208,6 +208,18 @@ Property Name Description
``hive.file-status-cache.max-retained-size`` Maximum size in bytes of the directory listing cache ``0KB``

``hive.metastore.catalog.name`` Specifies the catalog name to be passed to the metastore.

``hive.read-formats`` Comma-separated list of file formats supported for reads
from tables. This config can be used to fail fast at the
coordinator for unsupported file readers in Presto C++.
When this config is not set, the default behavior is to
support all registered file readers.

``hive.write-formats`` Comma-separated list of file formats supported for writes
to tables. This config can be used to fail fast at the
coordinator for unsupported file writers in Presto C++.
When this config is not set, the default behavior is to
support all registered file writers.
================================================== ============================================================ ============
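
A sketch of how these properties could be combined in a Hive catalog
configuration; the format names are illustrative and mirror the values
exercised by the tests in this change:

.. code-block:: none

    hive.read-formats=ORC,PARQUET
    hive.write-formats=DWRF,ORC

With these settings, reads of DWRF tables and writes to Parquet tables fail
fast at the coordinator.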

Metastore Configuration Properties
@@ -199,7 +199,8 @@ public S3SelectTestHelper(String host,
config.getRecursiveDirWalkerEnabled(),
new ConfigBasedCacheQuotaRequirementProvider(cacheConfig),
new HiveEncryptionInformationProvider(ImmutableSet.of()),
new HivePartitionSkippabilityChecker());
new HivePartitionSkippabilityChecker(),
ImmutableList.of());
pageSourceProvider = new HivePageSourceProvider(
config,
hdfsEnvironment,
@@ -222,6 +222,8 @@ public class HiveClientConfig
private int parquetQuickStatsMaxConcurrentCalls = 500;
private int quickStatsMaxConcurrentCalls = 100;
private boolean legacyTimestampBucketing;
private List<String> readFormats = ImmutableList.of();
private String writeFormats;

@Min(0)
public int getMaxInitialSplits()
@@ -464,7 +466,7 @@ public List<String> getResourceConfigFiles()
@Config("hive.config.resources")
public HiveClientConfig setResourceConfigFiles(String files)
{
this.resourceConfigFiles = Splitter.on(',').trimResults().omitEmptyStrings().splitToList(files);
this.resourceConfigFiles = SPLITTER.splitToList(files);
return this;
}

@@ -1831,4 +1833,30 @@ public HiveClientConfig setLegacyTimestampBucketing(boolean legacyTimestampBucke
this.legacyTimestampBucketing = legacyTimestampBucketing;
return this;
}

@Config("hive.read-formats")
@ConfigDescription("File formats supported for read operation.")
public HiveClientConfig setReadFormats(String formats)
{
this.readFormats = SPLITTER.splitToList(formats);
return this;
}

public List<String> getReadFormats()
{
return readFormats;
}

@Config("hive.write-formats")
@ConfigDescription("File formats supported for write operation.")
public HiveClientConfig setWriteFormats(String formats)
{
this.writeFormats = formats;
return this;
}

public String getWriteFormats()
{
return writeFormats;
}
}
@@ -217,6 +217,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.getTemporaryTableSchema;
import static com.facebook.presto.hive.HiveSessionProperties.getTemporaryTableStorageFormat;
import static com.facebook.presto.hive.HiveSessionProperties.getVirtualBucketCount;
import static com.facebook.presto.hive.HiveSessionProperties.getWriteFormats;
import static com.facebook.presto.hive.HiveSessionProperties.isBucketExecutionEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isCollectColumnStatisticsOnWrite;
import static com.facebook.presto.hive.HiveSessionProperties.isCreateEmptyBucketFiles;
@@ -1982,6 +1983,12 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn
SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName();
Table table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));
HiveStorageFormat tableStorageFormat = extractHiveStorageFormat(table);
A reviewer (Contributor) commented:
Even this code is called during scheduling stage executions... It's worthwhile only if the check is done sooner in logical planning or BasePlanFragmenter.

I think extractHiveStorageFormat from a table can be called much sooner. Can you investigate further?
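
A rough sketch of the earlier check being suggested; the helper name and the idea of calling it from planning rather than from beginInsertInternal are assumptions, not part of this PR:

    // Hypothetical helper: reject an unsupported write format before any fragment
    // is scheduled. Mirrors the inline check added below in beginInsertInternal.
    private static void checkWriteFormatSupported(HiveStorageFormat storageFormat, List<String> writeFormats)
    {
        if (!writeFormats.isEmpty() && !writeFormats.contains(storageFormat.toString())) {
            throw new PrestoException(NOT_SUPPORTED,
                    format("File format %s not supported for write operation.", storageFormat));
        }
    }

Because extractHiveStorageFormat only needs the metastore Table, such a helper could in principle run as soon as the table is resolved.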

List<String> writeFormats = getWriteFormats(session);
if (!writeFormats.isEmpty() && !writeFormats.contains(tableStorageFormat.toString())) {
throw new PrestoException(NOT_SUPPORTED,
format("File format %s not supported for write operation.", tableStorageFormat.toString()));
}

tableWritabilityChecker.checkTableWritable(table);

@@ -2002,7 +2009,6 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn
.filter(columnHandle -> !columnHandle.isHidden())
.collect(toList());

HiveStorageFormat tableStorageFormat = extractHiveStorageFormat(table);
LocationHandle locationHandle;
boolean isTemporaryTable = table.getTableType().equals(TEMPORARY_TABLE);
boolean tempPathRequired = isTempPathRequired(
@@ -17,6 +17,7 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
@@ -134,6 +135,7 @@ public final class HiveSessionProperties
public static final String DYNAMIC_SPLIT_SIZES_ENABLED = "dynamic_split_sizes_enabled";
public static final String SKIP_EMPTY_FILES = "skip_empty_files";
public static final String LEGACY_TIMESTAMP_BUCKETING = "legacy_timestamp_bucketing";
public static final String WRITE_FORMATS = "write_formats";

public static final String NATIVE_STATS_BASED_FILTER_REORDER_DISABLED = "native_stats_based_filter_reorder_disabled";

@@ -660,6 +662,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
NATIVE_STATS_BASED_FILTER_REORDER_DISABLED,
"Native Execution only. Disable stats based filter reordering.",
false,
true),
stringProperty(
WRITE_FORMATS,
"File formats supported for write operation.",
hiveClientConfig.getWriteFormats(),
true));
}

@@ -1148,4 +1155,10 @@ public static boolean isLegacyTimestampBucketing(ConnectorSession session)
{
return session.getProperty(LEGACY_TIMESTAMP_BUCKETING, Boolean.class);
}

public static List<String> getWriteFormats(ConnectorSession session)
{
String formats = session.getProperty(WRITE_FORMATS, String.class);
return (formats == null) ? ImmutableList.of() : Splitter.on(',').trimResults().omitEmptyStrings().splitToList(formats);
}
}
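
As a usage sketch, the write restriction can also be adjusted per session through the write_formats property registered above; the catalog name hive is an assumption here and the value is illustrative:

    SET SESSION hive.write_formats = 'DWRF,ORC';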
@@ -151,6 +151,7 @@ public class HiveSplitManager
private final CacheQuotaRequirementProvider cacheQuotaRequirementProvider;
private final HiveEncryptionInformationProvider encryptionInformationProvider;
private final PartitionSkippabilityChecker partitionSkippabilityChecker;
private final List<String> readFormats;

@Inject
public HiveSplitManager(
@@ -181,7 +182,8 @@ public HiveSplitManager(
hiveClientConfig.getRecursiveDirWalkerEnabled(),
cacheQuotaRequirementProvider,
encryptionInformationProvider,
partitionSkippabilityChecker);
partitionSkippabilityChecker,
hiveClientConfig.getReadFormats());
}

public HiveSplitManager(
@@ -200,7 +202,8 @@ public HiveSplitManager(
boolean recursiveDfsWalkerEnabled,
CacheQuotaRequirementProvider cacheQuotaRequirementProvider,
HiveEncryptionInformationProvider encryptionInformationProvider,
PartitionSkippabilityChecker partitionSkippabilityChecker)
PartitionSkippabilityChecker partitionSkippabilityChecker,
List<String> readFormats)
{
this.hiveTransactionManager = requireNonNull(hiveTransactionManager, "hiveTransactionManager is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
@@ -219,6 +222,7 @@ public HiveSplitManager(
this.cacheQuotaRequirementProvider = requireNonNull(cacheQuotaRequirementProvider, "cacheQuotaRequirementProvider is null");
this.encryptionInformationProvider = requireNonNull(encryptionInformationProvider, "encryptionInformationProvider is null");
this.partitionSkippabilityChecker = requireNonNull(partitionSkippabilityChecker, "partitionSkippabilityChecker is null");
this.readFormats = requireNonNull(readFormats, "readFormats is null");
}

@Override
@@ -250,6 +254,20 @@ public ConnectorSplitSource getSplits(
session.getRuntimeStats());
Table table = layout.getTable(metastore, metastoreContext);

if (!readFormats.isEmpty()) {
A reviewer (Contributor) commented:
Don't think we should delay checking the TableLayout for the storage format until split scheduling. We want to avoid scheduling the fragment at all. Is it possible to do this earlier in BasePlanFragmenter
https://github.com/prestodb/presto/blob/master/presto-main-base/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java#L249 ? The TableLayout is available at this point.

@tdcmeehan : wdyt ?

StorageFormat storageFormat = table.getStorage().getStorageFormat();
Optional<HiveStorageFormat> hiveStorageFormat = getHiveStorageFormat(storageFormat);
if (hiveStorageFormat.isPresent()) {
if (!readFormats.contains(hiveStorageFormat.get().toString())) {
throw new HiveNotReadableException(tableName, Optional.empty(),
format("File format %s not supported for read operation.", hiveStorageFormat.get().toString()));
}
}
else {
throw new HiveNotReadableException(tableName, Optional.empty(), "Storage format is null.");
}
}

if (!isOfflineDataDebugModeEnabled(session)) {
// verify table is not marked as non-readable
String tableNotReadable = table.getParameters().get(OBJECT_NOT_READABLE);
@@ -1067,7 +1067,8 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
false,
new ConfigBasedCacheQuotaRequirementProvider(cacheConfig),
encryptionInformationProvider,
new HivePartitionSkippabilityChecker());
new HivePartitionSkippabilityChecker(),
ImmutableList.of());
pageSinkProvider = new HivePageSinkProvider(
getDefaultHiveFileWriterFactories(hiveClientConfig, metastoreClientConfig),
hdfsEnvironment,
@@ -247,7 +247,8 @@ protected void setup(String host, int port, String databaseName, BiFunction<Hive
config.getRecursiveDirWalkerEnabled(),
new ConfigBasedCacheQuotaRequirementProvider(cacheConfig),
new HiveEncryptionInformationProvider(ImmutableSet.of()),
new HivePartitionSkippabilityChecker());
new HivePartitionSkippabilityChecker(),
ImmutableList.of());
pageSinkProvider = new HivePageSinkProvider(
getDefaultHiveFileWriterFactories(config, metastoreClientConfig),
hdfsEnvironment,
@@ -166,7 +166,9 @@ public void testDefaults()
.setMaxConcurrentParquetQuickStatsCalls(500)
.setCteVirtualBucketCount(128)
.setSkipEmptyFilesEnabled(false)
.setLegacyTimestampBucketing(false));
.setLegacyTimestampBucketing(false)
.setReadFormats("")
.setWriteFormats(null));
}

@Test
@@ -293,6 +295,8 @@ public void testExplicitPropertyMappings()
.put("hive.cte-virtual-bucket-count", "256")
.put("hive.skip-empty-files", "true")
.put("hive.legacy-timestamp-bucketing", "true")
.put("hive.read-formats", "DWRF,ORC,PARQUET")
.put("hive.write-formats", "DWRF,PARQUET")
.build();

HiveClientConfig expected = new HiveClientConfig()
@@ -414,7 +418,9 @@ public void testExplicitPropertyMappings()
.setMaxConcurrentQuickStatsCalls(101)
.setSkipEmptyFilesEnabled(true)
.setCteVirtualBucketCount(256)
.setLegacyTimestampBucketing(true);
.setLegacyTimestampBucketing(true)
.setReadFormats("DWRF,ORC,PARQUET")
.setWriteFormats("DWRF,PARQUET");

ConfigAssertions.assertFullMapping(properties, expected);
}
@@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.RowType;
@@ -38,6 +39,7 @@
import com.facebook.presto.spi.RecordPageSource;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -107,6 +109,7 @@
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
import static com.facebook.presto.hive.HiveTestUtils.getTypes;
import static com.facebook.presto.tests.QueryAssertions.assertQueryFails;
import static com.facebook.presto.tests.StructuralTestUtil.arrayBlockOf;
import static com.facebook.presto.tests.StructuralTestUtil.mapBlockOf;
import static com.facebook.presto.tests.StructuralTestUtil.rowBlockOf;
@@ -963,6 +966,36 @@ public void testSchemaMismatchOnNestedStruct()
.isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageRowLongNest);
}

@Test
public void testReadWriteFormats() throws Exception
{
QueryRunner queryRunner = HiveQueryRunner.createQueryRunner(
ImmutableList.of(),
ImmutableMap.of(),
"sql-standard",
ImmutableMap.of("hive.read-formats", "ORC,PARQUET",
"hive.write-formats", "DWRF,ORC"),
Optional.empty());
Session session = queryRunner.getDefaultSession();

// Writes to PARQUET table should fail, reads should succeed.
queryRunner.execute("DROP TABLE IF EXISTS t0");
queryRunner.execute("CREATE TABLE t0(a BIGINT, b DOUBLE) WITH (format = 'PARQUET')");
assertQueryFails(queryRunner, session, "INSERT INTO t0 VALUES (1, 0.5)", ".*File format PARQUET not supported for write operation.*");
queryRunner.execute("SELECT a FROM t0");

// Reads from DWRF table should fail, writes should succeed.
queryRunner.execute("DROP TABLE IF EXISTS t1");
queryRunner.execute("CREATE TABLE t1(a BIGINT, b DOUBLE) WITH (format = 'DWRF')");
queryRunner.execute("INSERT INTO t1 VALUES (1, 0.5)");
assertQueryFails(queryRunner, session, "SELECT b from t1", ".*File format DWRF not supported for read operation.*");

// Writes and reads to ORC table should succeed.
queryRunner.execute("DROP TABLE IF EXISTS t2");
queryRunner.execute(session, "CREATE TABLE t2(a BIGINT, b DOUBLE) WITH (format = 'ORC')");
queryRunner.execute(session, "INSERT INTO t2 VALUES (1, 0.5)");
queryRunner.execute(session, "SELECT * from t2");
}

private void testCursorProvider(HiveRecordCursorProvider cursorProvider,
FileSplit split,
HiveStorageFormat storageFormat,
@@ -552,7 +552,8 @@ private void assertRedundantColumnDomains(Range predicateRange, PartitionStatist
false,
new ConfigBasedCacheQuotaRequirementProvider(new CacheConfig()),
new HiveEncryptionInformationProvider(ImmutableList.of()),
new HivePartitionSkippabilityChecker());
new HivePartitionSkippabilityChecker(),
ImmutableList.of());

HiveColumnHandle partitionColumn = new HiveColumnHandle(
"ds",
@@ -700,7 +701,8 @@ public void testEncryptionInformation()
false,
new ConfigBasedCacheQuotaRequirementProvider(new CacheConfig()),
encryptionInformationProvider,
new HivePartitionSkippabilityChecker());
new HivePartitionSkippabilityChecker(),
ImmutableList.of());

HiveColumnHandle partitionColumn = new HiveColumnHandle(
"ds",
@@ -340,7 +340,7 @@ protected static void assertQueryFailsEventually(QueryRunner queryRunner, Sessio
}
}

protected static void assertQueryFails(QueryRunner queryRunner, Session session, @Language("SQL") String sql, @Language("RegExp") String expectedMessageRegExp)
public static void assertQueryFails(QueryRunner queryRunner, Session session, @Language("SQL") String sql, @Language("RegExp") String expectedMessageRegExp)
{
try {
queryRunner.execute(session, sql);