diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java index 09c2249046f4..0635b56a7fba 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java @@ -49,20 +49,58 @@ public enum CommitStatus { * were attempting to set. This is used as a last resort when we are dealing with exceptions that * may indicate the commit has failed but don't have proof that this is the case. Note that all * the previous locations must also be searched on the chance that a second committer was able to - * successfully commit on top of our commit. + * successfully commit on top of our commit. When the {@code newMetadataLocation} is not in the + * history or the {@code commitStatusSupplier} fails repeatedly the method returns {@link + * CommitStatus#UNKNOWN}, because possible pending retries might still commit the change. * * @param tableOrViewName full name of the Table/View * @param newMetadataLocation the path of the new commit file * @param properties properties for retry * @param commitStatusSupplier check if the latest metadata presents or not using metadata * location for table. 
- * @return Commit Status of Success, Failure or Unknown + * @return Commit Status of Success or Unknown */ protected CommitStatus checkCommitStatus( String tableOrViewName, String newMetadataLocation, Map properties, Supplier commitStatusSupplier) { + CommitStatus strictStatus = + checkCommitStatusStrict( + tableOrViewName, newMetadataLocation, properties, commitStatusSupplier); + if (strictStatus == CommitStatus.FAILURE) { + LOG.warn( + "Commit status check: Commit to {} of {} unknown, new metadata location is not current " + + "or in history", + tableOrViewName, + newMetadataLocation); + return CommitStatus.UNKNOWN; + } + return strictStatus; + } + + /** + * Attempt to load the content and see if any current or past metadata location matches the one we + * were attempting to set. This is used as a last resort when we are dealing with exceptions that + * may indicate the commit has failed and don't have proof that this is the case, but we can be + * sure that no retry attempts for the commit will be successful later. Note that all the previous + * locations must also be searched on the chance that a second committer was able to successfully + * commit on top of our commit. When the {@code newMetadataLocation} is not in the history the + * method returns {@link CommitStatus#FAILURE}, when the {@code commitStatusSupplier} fails + * repeatedly the method returns {@link CommitStatus#UNKNOWN}. + * + * @param tableOrViewName full name of the Table/View + * @param newMetadataLocation the path of the new commit file + * @param properties properties for retry + * @param commitStatusSupplier check if the latest metadata presents or not using metadata + * location for table. 
+ * @return Commit Status of Success, Failure or Unknown + */ + protected CommitStatus checkCommitStatusStrict( + String tableOrViewName, + String newMetadataLocation, + Map properties, + Supplier commitStatusSupplier) { int maxAttempts = PropertyUtil.propertyAsInt( properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT); @@ -98,11 +136,7 @@ protected CommitStatus checkCommitStatus( newMetadataLocation); status.set(CommitStatus.SUCCESS); } else { - LOG.warn( - "Commit status check: Commit to {} of {} unknown, new metadata location is not current " - + "or in history", - tableOrViewName, - newMetadataLocation); + status.set(CommitStatus.FAILURE); } }); diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index dbab9e813966..9fa52d52ea5d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import org.apache.iceberg.BaseMetastoreOperations.CommitStatus; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -286,20 +287,39 @@ public long newSnapshotId() { * were attempting to set. This is used as a last resort when we are dealing with exceptions that * may indicate the commit has failed but are not proof that this is the case. Past locations must * also be searched on the chance that a second committer was able to successfully commit on top - * of our commit. + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#UNKNOWN}. 
* * @param newMetadataLocation the path of the new commit file * @param config metadata to use for configuration - * @return Commit Status of Success, Failure or Unknown + * @return Commit Status of Success or Unknown */ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { - return CommitStatus.valueOf( - checkCommitStatus( - tableName(), - newMetadataLocation, - config.properties(), - () -> checkCurrentMetadataLocation(newMetadataLocation)) - .name()); + return checkCommitStatus( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); + } + + /** + * Attempt to load the table and see if any current or past metadata location matches the one we + * were attempting to set. This is used as a last resort when we are dealing with exceptions that + * may indicate the commit has failed but are not proof that this is the case. Past locations must + * also be searched on the chance that a second committer was able to successfully commit on top + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#FAILURE}. 
+ * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { + return checkCommitStatusStrict( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); } /** diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 619f20ab87a3..0e801b57e5eb 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -268,16 +268,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { throw e; } catch (Throwable e) { - if (e.getMessage() != null - && e.getMessage() - .contains( - "The table has been modified. The parameter value for key '" - + HiveTableOperations.METADATA_LOCATION_PROP - + "' is")) { - throw new CommitFailedException( - e, "The table %s.%s has been modified concurrently", database, tableName); - } - if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) { throw new RuntimeException( @@ -287,15 +277,31 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { e); } - LOG.error( - "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", - database, - tableName, - e); commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; - commitStatus = - BaseMetastoreOperations.CommitStatus.valueOf( - checkCommitStatus(newMetadataLocation, metadata).name()); + if (e.getMessage() != null + && e.getMessage() + .contains( + "The table has been modified. 
The parameter value for key '" + + HiveTableOperations.METADATA_LOCATION_PROP + + "' is")) { + // It's possible the HMS client incorrectly retries a successful operation, due to network + // issue for example, and triggers this exception. So we need double-check to make sure + // this is really a concurrent modification. Hitting this exception means no pending + // requests, if any, can succeed later, so it's safe to check status in strict mode + commitStatus = checkCommitStatusStrict(newMetadataLocation, metadata); + if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) { + throw new CommitFailedException( + e, "The table %s.%s has been modified concurrently", database, tableName); + } + } else { + LOG.error( + "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", + database, + tableName, + e); + commitStatus = checkCommitStatus(newMetadataLocation, metadata); + } + switch (commitStatus) { case SUCCESS: break; diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java index c750ff4de62e..fe37223423fa 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java @@ -48,7 +48,7 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception { } } - metastore.start(hiveConfWithOverrides); + metastore.start(hiveConfWithOverrides, 5, true); metastoreClient = new HiveMetaStoreClient(hiveConfWithOverrides); if (null != databaseName) { String dbPath = metastore.getDatabasePath(databaseName); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java index d12a8503313b..0ffcb057095f 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java +++ 
b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; @@ -63,6 +64,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.hadoop.ConfigProperties; @@ -205,6 +207,37 @@ public static void cleanup() { } } + @Test + public void testMultipleAlterTableForNoLock() throws Exception { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + table.updateProperties().set(TableProperties.HIVE_LOCK_ENABLED, "false").commit(); + spyOps.refresh(); + TableMetadata metadataV3 = spyOps.current(); + AtomicReference alterTableException = new AtomicReference<>(null); + doAnswer( + i -> { + try { + // mock a situation where alter table is unexpectedly invoked more than once + i.callRealMethod(); + return i.callRealMethod(); + } catch (Throwable e) { + alterTableException.compareAndSet(null, e); + throw e; + } + }) + .when(spyClient) + .alter_table_with_environmentContext(anyString(), anyString(), any(), any()); + spyOps.commit(metadataV3, metadataV1); + verify(spyClient, times(1)) + .alter_table_with_environmentContext(anyString(), anyString(), any(), any()); + assertThat(alterTableException) + .as("Expecting to trigger an exception indicating table has been modified") + .hasValueMatching( + t -> + t.getMessage() + .contains("The table has been modified. 
The parameter value for key '")); + } + @Test public void testLockAcquisitionAtFirstTime() throws TException, InterruptedException { doReturn(acquiredLockResponse).when(spyClient).lock(any()); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java index c141f0cced02..9736b32e8727 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java @@ -153,10 +153,21 @@ public void start(HiveConf conf) { * @param poolSize The number of threads in the executor pool */ public void start(HiveConf conf, int poolSize) { + start(conf, poolSize, false); + } + + /** + * Starts a TestHiveMetastore with a provided connection pool size and HiveConf. + * + * @param conf The hive configuration to use + * @param poolSize The number of threads in the executor pool + * @param directSql Used to turn on directSql + */ + public void start(HiveConf conf, int poolSize, boolean directSql) { try { TServerSocket socket = new TServerSocket(0); int port = socket.getServerSocket().getLocalPort(); - initConf(conf, port); + initConf(conf, port, directSql); this.hiveConf = conf; this.server = newThriftServer(socket, poolSize, hiveConf); @@ -261,11 +272,11 @@ private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf con return new TThreadPoolServer(args); } - private void initConf(HiveConf conf, int port) { + private void initConf(HiveConf conf, int port, boolean directSql) { conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port); conf.set( HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + HIVE_LOCAL_DIR.getAbsolutePath()); - conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false"); + conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, String.valueOf(directSql)); 
conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false"); conf.set("iceberg.hive.client-pool-size", "2"); // Setting this to avoid thrift exception during running Iceberg tests outside Iceberg. diff --git a/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql b/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql index 55097d6639f2..b7b095c81ac1 100644 --- a/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql +++ b/hive-metastore/src/test/resources/hive-schema-3.1.0.derby.sql @@ -52,9 +52,9 @@ CREATE TABLE "APP"."DATABASE_PARAMS" ("DB_ID" BIGINT NOT NULL, "PARAM_KEY" VARCH CREATE TABLE "APP"."TBL_COL_PRIVS" ("TBL_COLUMN_GRANT_ID" BIGINT NOT NULL, "COLUMN_NAME" VARCHAR(767), "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "TBL_COL_PRIV" VARCHAR(128), "TBL_ID" BIGINT, "AUTHORIZER" VARCHAR(128)); -CREATE TABLE "APP"."SERDE_PARAMS" ("SERDE_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB); +CREATE TABLE "APP"."SERDE_PARAMS" ("SERDE_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" VARCHAR(32672)); -CREATE TABLE "APP"."COLUMNS_V2" ("CD_ID" BIGINT NOT NULL, "COMMENT" VARCHAR(4000), "COLUMN_NAME" VARCHAR(767) NOT NULL, "TYPE_NAME" CLOB, "INTEGER_IDX" INTEGER NOT NULL); +CREATE TABLE "APP"."COLUMNS_V2" ("CD_ID" BIGINT NOT NULL, "COMMENT" VARCHAR(4000), "COLUMN_NAME" VARCHAR(767) NOT NULL, "TYPE_NAME" VARCHAR(32672), "INTEGER_IDX" INTEGER NOT NULL); CREATE TABLE "APP"."SORT_COLS" ("SD_ID" BIGINT NOT NULL, "COLUMN_NAME" VARCHAR(767), "ORDER" INTEGER NOT NULL, "INTEGER_IDX" INTEGER NOT NULL); @@ -130,7 +130,7 @@ CREATE TABLE "APP"."TAB_COL_STATS"( "BIT_VECTOR" BLOB ); -CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB); +CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT 
NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" VARCHAR(32672)); CREATE TABLE "APP"."BUCKETING_COLS" ("SD_ID" BIGINT NOT NULL, "BUCKET_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL); @@ -138,7 +138,7 @@ CREATE TABLE "APP"."TYPE_FIELDS" ("TYPE_NAME" BIGINT NOT NULL, "COMMENT" VARCHAR CREATE TABLE "APP"."NUCLEUS_TABLES" ("CLASS_NAME" VARCHAR(128) NOT NULL, "TABLE_NAME" VARCHAR(128) NOT NULL, "TYPE" VARCHAR(4) NOT NULL, "OWNER" VARCHAR(2) NOT NULL, "VERSION" VARCHAR(20) NOT NULL, "INTERFACE_NAME" VARCHAR(256) DEFAULT NULL); -CREATE TABLE "APP"."SD_PARAMS" ("SD_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB); +CREATE TABLE "APP"."SD_PARAMS" ("SD_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" VARCHAR(32672)); CREATE TABLE "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID" BIGINT NOT NULL); @@ -218,7 +218,7 @@ CREATE TABLE "APP"."MV_CREATION_METADATA" ( "CAT_NAME" VARCHAR(256) NOT NULL, "DB_NAME" VARCHAR(128) NOT NULL, "TBL_NAME" VARCHAR(256) NOT NULL, - "TXN_LIST" CLOB, + "TXN_LIST" VARCHAR(32672), "MATERIALIZATION_TIME" BIGINT NOT NULL );