Skip to content

Commit 38805a8

Browse files
Insert into bucketed but unpartitioned Hive table
1 parent 5bbdf93 commit 38805a8

File tree

4 files changed

+156
-30
lines changed

4 files changed

+156
-30
lines changed

presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -476,9 +476,6 @@ private WriterParameters getWriterParametersForExistingUnpartitionedTable(Option
476476
{
477477
// Note: temporary table is always empty at this step
478478
if (!table.getTableType().equals(TEMPORARY_TABLE)) {
479-
if (bucketNumber.isPresent()) {
480-
throw new PrestoException(HIVE_PARTITION_READ_ONLY, "Cannot insert into bucketed unpartitioned Hive table");
481-
}
482479
if (immutablePartitions) {
483480
throw new PrestoException(HIVE_PARTITION_READ_ONLY, "Unpartitioned Hive tables are immutable");
484481
}

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,117 @@ public void testCreateTableNonSupportedVarcharColumn()
863863
assertUpdate("CREATE TABLE test_create_table_non_supported_varchar_column (apple varchar(65536))");
864864
}
865865

866+
@Test
867+
public void testEmptyBucketedTable()
868+
{
869+
// go through all storage formats to make sure the empty buckets are correctly created
870+
testWithAllStorageFormats(this::testEmptyBucketedTable);
871+
}
872+
873+
private void testEmptyBucketedTable(Session session, HiveStorageFormat storageFormat)
874+
{
875+
testEmptyBucketedTable(session, storageFormat, true, true);
876+
testEmptyBucketedTable(session, storageFormat, true, false);
877+
testEmptyBucketedTable(session, storageFormat, false, true);
878+
testEmptyBucketedTable(session, storageFormat, false, false);
879+
}
880+
881+
private void testEmptyBucketedTable(Session session, HiveStorageFormat storageFormat, boolean optimizedPartitionUpdateSerializationEnabled, boolean createEmpty)
882+
{
883+
String tableName = "test_empty_bucketed_table";
884+
885+
@Language("SQL") String createTable = "" +
886+
"CREATE TABLE " + tableName + " " +
887+
"(bucket_key VARCHAR, col_1 VARCHAR, col2 VARCHAR) " +
888+
"WITH (" +
889+
"format = '" + storageFormat + "', " +
890+
"bucketed_by = ARRAY[ 'bucket_key' ], " +
891+
"bucket_count = 11 " +
892+
") ";
893+
894+
assertUpdate(createTable);
895+
896+
TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, tableName);
897+
assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat);
898+
899+
assertNull(tableMetadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY));
900+
assertEquals(tableMetadata.getMetadata().getProperties().get(BUCKETED_BY_PROPERTY), ImmutableList.of("bucket_key"));
901+
assertEquals(tableMetadata.getMetadata().getProperties().get(BUCKET_COUNT_PROPERTY), 11);
902+
903+
assertEquals(computeActual("SELECT * from " + tableName).getRowCount(), 0);
904+
905+
// make sure that we will get one file per bucket regardless of writer count configured
906+
Session parallelWriter = Session.builder(getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled))
907+
.setCatalogSessionProperty(catalog, "create_empty_bucket_files", String.valueOf(createEmpty))
908+
.build();
909+
assertUpdate(parallelWriter, "INSERT INTO " + tableName + " VALUES ('a0', 'b0', 'c0')", 1);
910+
assertUpdate(parallelWriter, "INSERT INTO " + tableName + " VALUES ('a1', 'b1', 'c1')", 1);
911+
912+
assertQuery("SELECT * from " + tableName, "VALUES ('a0', 'b0', 'c0'), ('a1', 'b1', 'c1')");
913+
914+
assertUpdate(session, "DROP TABLE " + tableName);
915+
assertFalse(getQueryRunner().tableExists(session, tableName));
916+
}
917+
918+
@Test
919+
public void testBucketedTable()
920+
{
921+
// go through all storage formats to make sure inserts into bucketed tables work correctly
922+
testWithAllStorageFormats(this::testBucketedTable);
923+
}
924+
925+
private void testBucketedTable(Session session, HiveStorageFormat storageFormat)
926+
{
927+
testBucketedTable(session, storageFormat, true, true);
928+
testBucketedTable(session, storageFormat, true, false);
929+
testBucketedTable(session, storageFormat, false, true);
930+
testBucketedTable(session, storageFormat, false, false);
931+
}
932+
933+
private void testBucketedTable(Session session, HiveStorageFormat storageFormat, boolean optimizedPartitionUpdateSerializationEnabled, boolean createEmpty)
934+
{
935+
String tableName = "test_bucketed_table";
936+
937+
@Language("SQL") String createTable = "" +
938+
"CREATE TABLE " + tableName + " " +
939+
"WITH (" +
940+
"format = '" + storageFormat + "', " +
941+
"bucketed_by = ARRAY[ 'bucket_key' ], " +
942+
"bucket_count = 11 " +
943+
") " +
944+
"AS " +
945+
"SELECT * " +
946+
"FROM (" +
947+
"VALUES " +
948+
" (VARCHAR 'a', VARCHAR 'b', VARCHAR 'c'), " +
949+
" ('aa', 'bb', 'cc'), " +
950+
" ('aaa', 'bbb', 'ccc')" +
951+
") t (bucket_key, col_1, col_2)";
952+
953+
// make sure that we will get one file per bucket regardless of writer count configured
954+
Session parallelWriter = Session.builder(getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled))
955+
.setCatalogSessionProperty(catalog, "create_empty_bucket_files", String.valueOf(createEmpty))
956+
.build();
957+
assertUpdate(parallelWriter, createTable, 3);
958+
959+
TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, tableName);
960+
assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat);
961+
962+
assertNull(tableMetadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY));
963+
assertEquals(tableMetadata.getMetadata().getProperties().get(BUCKETED_BY_PROPERTY), ImmutableList.of("bucket_key"));
964+
assertEquals(tableMetadata.getMetadata().getProperties().get(BUCKET_COUNT_PROPERTY), 11);
965+
966+
assertQuery("SELECT * from " + tableName, "VALUES ('a', 'b', 'c'), ('aa', 'bb', 'cc'), ('aaa', 'bbb', 'ccc')");
967+
968+
assertUpdate(parallelWriter, "INSERT INTO " + tableName + " VALUES ('a0', 'b0', 'c0')", 1);
969+
assertUpdate(parallelWriter, "INSERT INTO " + tableName + " VALUES ('a1', 'b1', 'c1')", 1);
970+
971+
assertQuery("SELECT * from " + tableName, "VALUES ('a', 'b', 'c'), ('aa', 'bb', 'cc'), ('aaa', 'bbb', 'ccc'), ('a0', 'b0', 'c0'), ('a1', 'b1', 'c1')");
972+
973+
assertUpdate(session, "DROP TABLE " + tableName);
974+
assertFalse(getQueryRunner().tableExists(session, tableName));
975+
}
976+
866977
@Test
867978
public void testCreatePartitionedBucketedTableAsFewRows()
868979
{

presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBasicTableStatistics.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,13 +278,18 @@ public void testInsertBucketed()
278278
assertThat(statisticsAfterCreate.getNumRows().getAsLong()).isEqualTo(25);
279279
assertThat(statisticsAfterCreate.getNumFiles().getAsLong()).isEqualTo(50);
280280

281-
// Insert into bucketed unpartitioned table is unsupported
282-
assertThatThrownBy(() -> insertNationData(onPresto(), tableName))
283-
.hasMessageContaining("Cannot insert into bucketed unpartitioned Hive table");
281+
insertNationData(onPresto(), tableName);
284282

285283
BasicStatistics statisticsAfterInsert = getBasicStatisticsForTable(onHive(), tableName);
286-
assertThat(statisticsAfterInsert.getNumRows().getAsLong()).isEqualTo(25);
287-
assertThat(statisticsAfterCreate.getNumFiles().getAsLong()).isEqualTo(50);
284+
285+
assertThat(statisticsAfterInsert.getNumRows().getAsLong()).isEqualTo(50);
286+
assertThat(statisticsAfterInsert.getNumFiles().getAsLong()).isEqualTo(100);
287+
288+
insertNationData(onPresto(), tableName);
289+
290+
BasicStatistics statisticsAfterInsert2 = getBasicStatisticsForTable(onHive(), tableName);
291+
assertThat(statisticsAfterInsert2.getNumRows().getAsLong()).isEqualTo(75);
292+
assertThat(statisticsAfterInsert2.getNumFiles().getAsLong()).isEqualTo(150);
288293
}
289294
finally {
290295
onPresto().executeQuery(format("DROP TABLE IF EXISTS %s", tableName));

presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBucketedTables.java

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,35 +44,35 @@ public class TestHiveBucketedTables
4444
implements RequirementsProvider
4545
{
4646
@TableDefinitionsRepository.RepositoryTableDefinition
47-
public static final HiveTableDefinition BUCKETED_PARTITIONED_NATION = HiveTableDefinition.builder("bucket_partition_nation")
48-
.setCreateTableDDLTemplate("CREATE TABLE %NAME%(" +
49-
"n_nationkey BIGINT," +
50-
"n_name STRING," +
51-
"n_regionkey BIGINT," +
52-
"n_comment STRING) " +
53-
"PARTITIONED BY (part_key STRING) " +
54-
"CLUSTERED BY (n_regionkey) " +
55-
"INTO 2 BUCKETS " +
56-
"ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'")
57-
.setNoData()
58-
.build();
47+
public static final HiveTableDefinition BUCKETED_NATION = bucketTableDefinition("bucket_nation", false, true);
5948

6049
@TableDefinitionsRepository.RepositoryTableDefinition
61-
public static final HiveTableDefinition PARTITIONED_NATION = HiveTableDefinition.builder("partitioned_nation")
62-
.setCreateTableDDLTemplate("CREATE TABLE %NAME%(" +
63-
"n_nationkey BIGINT," +
64-
"n_name STRING," +
65-
"n_regionkey BIGINT," +
66-
"n_comment STRING) " +
67-
"PARTITIONED BY (part_key STRING) " +
68-
"ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'")
69-
.setNoData()
70-
.build();
50+
public static final HiveTableDefinition BUCKETED_PARTITIONED_NATION = bucketTableDefinition("bucket_partitioned_nation", true, true);
51+
52+
@TableDefinitionsRepository.RepositoryTableDefinition
53+
public static final HiveTableDefinition PARTITIONED_NATION = bucketTableDefinition("partitioned_nation", true, false);
54+
55+
private static HiveTableDefinition bucketTableDefinition(String tableName, boolean partitioned, boolean bucketed)
56+
{
57+
return HiveTableDefinition.builder(tableName)
58+
.setCreateTableDDLTemplate("CREATE TABLE %NAME%(" +
59+
"n_nationkey BIGINT," +
60+
"n_name STRING," +
61+
"n_regionkey BIGINT," +
62+
"n_comment STRING) " +
63+
(partitioned ? "PARTITIONED BY (part_key STRING) " : " ") +
64+
"CLUSTERED BY (n_regionkey) " +
65+
(bucketed ? "INTO 2 BUCKETS " : " ") +
66+
"ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'")
67+
.setNoData()
68+
.build();
69+
}
7170

7271
@Override
7372
public Requirement getRequirements(Configuration configuration)
7473
{
7574
return Requirements.compose(
75+
MutableTableRequirement.builder(BUCKETED_NATION).withState(CREATED).build(),
7676
MutableTableRequirement.builder(BUCKETED_PARTITIONED_NATION).withState(CREATED).build(),
7777
immutableTable(NATION));
7878
}
@@ -167,4 +167,17 @@ private static void disableBucketedExecution()
167167
throw new RuntimeException(e);
168168
}
169169
}
170+
171+
@Test
172+
public void testInsertIntoBucketedTables()
173+
{
174+
String tableName = mutableTablesState().get(BUCKETED_NATION).getNameInDatabase();
175+
176+
query(format("INSERT INTO %s SELECT * FROM %s", tableName, NATION.getName()));
177+
// make sure that insert will not overwrite existing data
178+
query(format("INSERT INTO %s SELECT * FROM %s", tableName, NATION.getName()));
179+
180+
assertThat(query(format("SELECT count(*) FROM %s", tableName))).containsExactly(row(50));
181+
assertThat(query(format("SELECT count(*) FROM %s WHERE n_regionkey=0", tableName))).containsExactly(row(10));
182+
}
170183
}

0 commit comments

Comments (0)