diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java index 4ac892cb21678..80cf127159699 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java @@ -476,9 +476,6 @@ private WriterParameters getWriterParametersForExistingUnpartitionedTable(Option { // Note: temporary table is always empty at this step if (!table.getTableType().equals(TEMPORARY_TABLE)) { - if (bucketNumber.isPresent()) { - throw new PrestoException(HIVE_PARTITION_READ_ONLY, "Cannot insert into bucketed unpartitioned Hive table"); - } if (immutablePartitions) { throw new PrestoException(HIVE_PARTITION_READ_ONLY, "Unpartitioned Hive tables are immutable"); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java index 0ca908038c861..d60fb4a2a6a57 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java @@ -863,6 +863,117 @@ public void testCreateTableNonSupportedVarcharColumn() assertUpdate("CREATE TABLE test_create_table_non_supported_varchar_column (apple varchar(65536))"); } + @Test + public void testEmptyBucketedTable() + { + // go through all storage formats, with create_empty_bucket_files both enabled and disabled, to make sure INSERT into a still-empty bucketed table works + testWithAllStorageFormats(this::testEmptyBucketedTable); + } + + private void testEmptyBucketedTable(Session session, HiveStorageFormat storageFormat) + { + testEmptyBucketedTable(session, storageFormat, true, true); + testEmptyBucketedTable(session, storageFormat, true, false); + testEmptyBucketedTable(session, storageFormat, false, true); + testEmptyBucketedTable(session, storageFormat, false, false); + } + + 
private void testEmptyBucketedTable(Session session, HiveStorageFormat storageFormat, boolean optimizedPartitionUpdateSerializationEnabled, boolean createEmpty) + { + String tableName = "test_empty_bucketed_table"; + + @Language("SQL") String createTable = "" + + "CREATE TABLE " + tableName + " " + + "(bucket_key VARCHAR, col_1 VARCHAR, col2 VARCHAR) " + + "WITH (" + + "format = '" + storageFormat + "', " + + "bucketed_by = ARRAY[ 'bucket_key' ], " + + "bucket_count = 11 " + + ") "; + + assertUpdate(createTable); + + TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, tableName); + assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat); + + assertNull(tableMetadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY)); + assertEquals(tableMetadata.getMetadata().getProperties().get(BUCKETED_BY_PROPERTY), ImmutableList.of("bucket_key")); + assertEquals(tableMetadata.getMetadata().getProperties().get(BUCKET_COUNT_PROPERTY), 11); + + assertEquals(computeActual("SELECT * from " + tableName).getRowCount(), 0); + + // make sure that we will get one file per bucket regardless of writer count configured; NOTE(review): presumably this only holds when create_empty_bucket_files is true, confirm + Session parallelWriter = Session.builder(getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled)) + .setCatalogSessionProperty(catalog, "create_empty_bucket_files", String.valueOf(createEmpty)) + .build(); + assertUpdate(parallelWriter, "INSERT INTO " + tableName + " VALUES ('a0', 'b0', 'c0')", 1); + assertUpdate(parallelWriter, "INSERT INTO " + tableName + " VALUES ('a1', 'b1', 'c1')", 1); + + assertQuery("SELECT * from " + tableName, "VALUES ('a0', 'b0', 'c0'), ('a1', 'b1', 'c1')"); + + assertUpdate(session, "DROP TABLE " + tableName); + assertFalse(getQueryRunner().tableExists(session, tableName)); + } + + @Test + public void testBucketedTable() + { + // go through all storage formats, with create_empty_bucket_files both enabled and disabled, to make sure bucketed CREATE TABLE AS followed by INSERT works + 
testWithAllStorageFormats(this::testBucketedTable); + } + + private void testBucketedTable(Session session, HiveStorageFormat storageFormat) + { + testBucketedTable(session, storageFormat, true, true); + testBucketedTable(session, storageFormat, true, false); + testBucketedTable(session, storageFormat, false, true); + testBucketedTable(session, storageFormat, false, false); + } + + private void testBucketedTable(Session session, HiveStorageFormat storageFormat, boolean optimizedPartitionUpdateSerializationEnabled, boolean createEmpty) + { + String tableName = "test_bucketed_table"; + + @Language("SQL") String createTable = "" + + "CREATE TABLE " + tableName + " " + + "WITH (" + + "format = '" + storageFormat + "', " + + "bucketed_by = ARRAY[ 'bucket_key' ], " + + "bucket_count = 11 " + + ") " + + "AS " + + "SELECT * " + + "FROM (" + + "VALUES " + + " (VARCHAR 'a', VARCHAR 'b', VARCHAR 'c'), " + + " ('aa', 'bb', 'cc'), " + + " ('aaa', 'bbb', 'ccc')" + + ") t (bucket_key, col_1, col_2)"; + + // make sure that we will get one file per bucket regardless of writer count configured; NOTE(review): presumably this only holds when create_empty_bucket_files is true, confirm + Session parallelWriter = Session.builder(getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled)) + .setCatalogSessionProperty(catalog, "create_empty_bucket_files", String.valueOf(createEmpty)) + .build(); + assertUpdate(parallelWriter, createTable, 3); + + TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, tableName); + assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat); + + assertNull(tableMetadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY)); + assertEquals(tableMetadata.getMetadata().getProperties().get(BUCKETED_BY_PROPERTY), ImmutableList.of("bucket_key")); + assertEquals(tableMetadata.getMetadata().getProperties().get(BUCKET_COUNT_PROPERTY), 11); + + assertQuery("SELECT * from " + tableName, "VALUES ('a', 'b', 'c'), ('aa', 'bb', 'cc'), ('aaa', 'bbb', 'ccc')"); + + 
assertUpdate(parallelWriter, "INSERT INTO " + tableName + " VALUES ('a0', 'b0', 'c0')", 1); + assertUpdate(parallelWriter, "INSERT INTO " + tableName + " VALUES ('a1', 'b1', 'c1')", 1); + + assertQuery("SELECT * from " + tableName, "VALUES ('a', 'b', 'c'), ('aa', 'bb', 'cc'), ('aaa', 'bbb', 'ccc'), ('a0', 'b0', 'c0'), ('a1', 'b1', 'c1')"); + + assertUpdate(session, "DROP TABLE " + tableName); + assertFalse(getQueryRunner().tableExists(session, tableName)); + } + @Test public void testCreatePartitionedBucketedTableAsFewRows() { diff --git a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBasicTableStatistics.java b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBasicTableStatistics.java index e9f033422e15b..b891e28e3dd6d 100644 --- a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBasicTableStatistics.java +++ b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBasicTableStatistics.java @@ -278,13 +278,18 @@ public void testInsertBucketed() assertThat(statisticsAfterCreate.getNumRows().getAsLong()).isEqualTo(25); assertThat(statisticsAfterCreate.getNumFiles().getAsLong()).isEqualTo(50); - // Insert into bucketed unpartitioned table is unsupported - assertThatThrownBy(() -> insertNationData(onPresto(), tableName)) - .hasMessageContaining("Cannot insert into bucketed unpartitioned Hive table"); + insertNationData(onPresto(), tableName); BasicStatistics statisticsAfterInsert = getBasicStatisticsForTable(onHive(), tableName); - assertThat(statisticsAfterInsert.getNumRows().getAsLong()).isEqualTo(25); - assertThat(statisticsAfterCreate.getNumFiles().getAsLong()).isEqualTo(50); + + assertThat(statisticsAfterInsert.getNumRows().getAsLong()).isEqualTo(50); + assertThat(statisticsAfterInsert.getNumFiles().getAsLong()).isEqualTo(100); + + insertNationData(onPresto(), tableName); + + BasicStatistics statisticsAfterInsert2 = getBasicStatisticsForTable(onHive(), tableName); + 
assertThat(statisticsAfterInsert2.getNumRows().getAsLong()).isEqualTo(75); + assertThat(statisticsAfterInsert2.getNumFiles().getAsLong()).isEqualTo(150); } finally { onPresto().executeQuery(format("DROP TABLE IF EXISTS %s", tableName)); diff --git a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBucketedTables.java b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBucketedTables.java index 18842446f8b74..7c04e3fea59f4 100644 --- a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBucketedTables.java +++ b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBucketedTables.java @@ -44,35 +44,35 @@ public class TestHiveBucketedTables implements RequirementsProvider { @TableDefinitionsRepository.RepositoryTableDefinition - public static final HiveTableDefinition BUCKETED_PARTITIONED_NATION = HiveTableDefinition.builder("bucket_partition_nation") - .setCreateTableDDLTemplate("CREATE TABLE %NAME%(" + - "n_nationkey BIGINT," + - "n_name STRING," + - "n_regionkey BIGINT," + - "n_comment STRING) " + - "PARTITIONED BY (part_key STRING) " + - "CLUSTERED BY (n_regionkey) " + - "INTO 2 BUCKETS " + - "ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'") - .setNoData() - .build(); + public static final HiveTableDefinition BUCKETED_NATION = bucketTableDefinition("bucket_nation", false, true); @TableDefinitionsRepository.RepositoryTableDefinition - public static final HiveTableDefinition PARTITIONED_NATION = HiveTableDefinition.builder("partitioned_nation") - .setCreateTableDDLTemplate("CREATE TABLE %NAME%(" + - "n_nationkey BIGINT," + - "n_name STRING," + - "n_regionkey BIGINT," + - "n_comment STRING) " + - "PARTITIONED BY (part_key STRING) " + - "ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'") - .setNoData() - .build(); + public static final HiveTableDefinition BUCKETED_PARTITIONED_NATION = bucketTableDefinition("bucket_partitioned_nation", true, true); + + 
@TableDefinitionsRepository.RepositoryTableDefinition + public static final HiveTableDefinition PARTITIONED_NATION = bucketTableDefinition("partitioned_nation", true, false); + + private static HiveTableDefinition bucketTableDefinition(String tableName, boolean partitioned, boolean bucketed) + { + return HiveTableDefinition.builder(tableName) + .setCreateTableDDLTemplate("CREATE TABLE %NAME%(" + + "n_nationkey BIGINT," + + "n_name STRING," + + "n_regionkey BIGINT," + + "n_comment STRING) " + + (partitioned ? "PARTITIONED BY (part_key STRING) " : " ") + + /* CLUSTERED BY needs INTO n BUCKETS, so emit both or neither */ (bucketed ? "CLUSTERED BY (n_regionkey) " + + "INTO 2 BUCKETS " : " ") + + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'") + .setNoData() + .build(); + } @Override public Requirement getRequirements(Configuration configuration) { return Requirements.compose( + MutableTableRequirement.builder(BUCKETED_NATION).withState(CREATED).build(), MutableTableRequirement.builder(BUCKETED_PARTITIONED_NATION).withState(CREATED).build(), immutableTable(NATION)); } @@ -167,4 +167,17 @@ private static void disableBucketedExecution() throw new RuntimeException(e); } } + + @Test + public void testInsertIntoBucketedTables() + { + String tableName = mutableTablesState().get(BUCKETED_NATION).getNameInDatabase(); + + query(format("INSERT INTO %s SELECT * FROM %s", tableName, NATION.getName())); + // make sure that insert will not overwrite existing data + query(format("INSERT INTO %s SELECT * FROM %s", tableName, NATION.getName())); + + assertThat(query(format("SELECT count(*) FROM %s", tableName))).containsExactly(row(50)); + assertThat(query(format("SELECT count(*) FROM %s WHERE n_regionkey=0", tableName))).containsExactly(row(10)); + } }