Core, Hive: Double check commit status in case of commit conflict for NoLock #12637

Open · wants to merge 6 commits into main · Changes from 5 commits
48 changes: 41 additions & 7 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java
@@ -49,20 +49,58 @@ public enum CommitStatus {
* were attempting to set. This is used as a last resort when we are dealing with exceptions that
* may indicate the commit has failed but don't have proof that this is the case. Note that all
* the previous locations must also be searched on the chance that a second committer was able to
* successfully commit on top of our commit.
successfully commit on top of our commit. When the {@code newMetadataLocation} is not in the
history, or the {@code commitStatusSupplier} fails repeatedly, the method returns {@link
CommitStatus#UNKNOWN}, because a pending retry might still commit the change.
*
* @param tableOrViewName full name of the Table/View
* @param newMetadataLocation the path of the new commit file
* @param properties properties for retry
@param commitStatusSupplier checks whether the latest metadata is present, using the metadata
location of the table
* @return Commit Status of Success, Failure or Unknown
* @return Commit Status of Success or Unknown
*/
protected CommitStatus checkCommitStatus(
String tableOrViewName,
String newMetadataLocation,
Map<String, String> properties,
Supplier<Boolean> commitStatusSupplier) {
CommitStatus strictStatus =
checkCommitStatusStrict(
tableOrViewName, newMetadataLocation, properties, commitStatusSupplier);
if (strictStatus == CommitStatus.FAILURE) {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
tableOrViewName,
newMetadataLocation);
return CommitStatus.UNKNOWN;
}
return strictStatus;
}

/**
Attempt to load the content and see if any current or past metadata location matches the one we
were attempting to set. This is used as a last resort when we are dealing with exceptions that
may indicate the commit has failed without proof that this is the case, while we can be sure
that no retry attempt for the commit will succeed later. Note that all the previous locations
must also be searched on the chance that a second committer was able to successfully commit on
top of our commit. When the {@code newMetadataLocation} is not in the history, the method
returns {@link CommitStatus#FAILURE}; when the {@code commitStatusSupplier} fails repeatedly,
it returns {@link CommitStatus#UNKNOWN}.
*
* @param tableOrViewName full name of the Table/View
* @param newMetadataLocation the path of the new commit file
* @param properties properties for retry
@param commitStatusSupplier checks whether the latest metadata is present, using the metadata
location of the table
* @return Commit Status of Success, Failure or Unknown
*/
protected CommitStatus checkCommitStatusStrict(
String tableOrViewName,
String newMetadataLocation,
Map<String, String> properties,
Supplier<Boolean> commitStatusSupplier) {
int maxAttempts =
PropertyUtil.propertyAsInt(
properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
@@ -98,11 +136,7 @@ protected CommitStatus checkCommitStatus(
newMetadataLocation);
status.set(CommitStatus.SUCCESS);
} else {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
tableOrViewName,
newMetadataLocation);
status.set(CommitStatus.FAILURE);
}
});

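For reference, a commitStatusSupplier typically boils down to checking the current metadata location plus the metadata log. A minimal sketch of such a check for tables (assuming the standard TableMetadata accessors; this helper is illustrative, not code from this PR):

import org.apache.iceberg.TableMetadata;

final class CommitChecks {
  private CommitChecks() {}

  // Sketch: true if newMetadataLocation is the table's current metadata file or
  // appears anywhere in the metadata log of previously committed files.
  static boolean locationInHistory(TableMetadata current, String newMetadataLocation) {
    return newMetadataLocation.equals(current.metadataFileLocation())
        || current.previousFiles().stream()
            .anyMatch(logEntry -> newMetadataLocation.equals(logEntry.file()));
  }
}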
core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.BaseMetastoreOperations.CommitStatus;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
@@ -286,11 +287,12 @@ public long newSnapshotId() {
* were attempting to set. This is used as a last resort when we are dealing with exceptions that
* may indicate the commit has failed but are not proof that this is the case. Past locations must
* also be searched on the chance that a second committer was able to successfully commit on top
* of our commit.
* of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link
* CommitStatus#UNKNOWN}.
*
* @param newMetadataLocation the path of the new commit file
* @param config metadata to use for configuration
* @return Commit Status of Success, Failure or Unknown
@return Commit Status of Success or Unknown
*/
protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
return CommitStatus.valueOf(
@@ -302,6 +304,28 @@ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata
.name());
}

/**
* Attempt to load the table and see if any current or past metadata location matches the one we
* were attempting to set. This is used as a last resort when we are dealing with exceptions that
may indicate the commit has failed but are not proof that this is the case, and no pending
retry of the commit can succeed later. Past locations must
* also be searched on the chance that a second committer was able to successfully commit on top
* of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link
* CommitStatus#FAILURE}.
*
* @param newMetadataLocation the path of the new commit file
* @param config metadata to use for configuration
* @return Commit Status of Success, Failure or Unknown
*/
protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) {
return CommitStatus.valueOf(
checkCommitStatusStrict(
tableName(),
newMetadataLocation,
config.properties(),
() -> checkCurrentMetadataLocation(newMetadataLocation))
.name());
}

/**
* Validate if the new metadata location is the current metadata location or present within
* previous metadata files.
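Taken together, the two overloads suggest a doCommit pattern like the following minimal sketch (hypothetical subclass; persistToMetastore and isTerminalConflict are placeholder names, not Iceberg API):

import org.apache.iceberg.BaseMetastoreOperations;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;

// Sketch only: how a metastore-backed TableOperations could split the error
// space between the relaxed and the strict status check.
abstract class SketchTableOperations extends BaseMetastoreTableOperations {

  @Override
  protected void doCommit(TableMetadata base, TableMetadata metadata) {
    String newMetadataLocation = writeNewMetadataIfRequired(base == null, metadata);
    BaseMetastoreOperations.CommitStatus commitStatus;
    try {
      persistToMetastore(newMetadataLocation); // placeholder for the metastore write
      commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS;
    } catch (RuntimeException e) {
      if (isTerminalConflict(e)) {
        // The error proves no in-flight retry of our request can still land,
        // so a FAILURE verdict from the strict check can be trusted.
        commitStatus = checkCommitStatusStrict(newMetadataLocation, metadata);
        if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
          throw new CommitFailedException(e, "Commit conflict for %s", tableName());
        }
      } else {
        // A retry of our request may still be pending, so only SUCCESS or
        // UNKNOWN are safe verdicts here; the relaxed check guarantees that.
        commitStatus =
            BaseMetastoreOperations.CommitStatus.valueOf(
                checkCommitStatus(newMetadataLocation, metadata).name());
      }
      // On UNKNOWN the new metadata file must be kept: a later refresh may
      // reveal that the commit actually went through.
    }
  }

  // Placeholders, assumed for the sketch only:
  abstract void persistToMetastore(String newMetadataLocation);

  abstract boolean isTerminalConflict(RuntimeException e);
}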
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -268,16 +268,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw e;

} catch (Throwable e) {
if (e.getMessage() != null
&& e.getMessage()
.contains(
"The table has been modified. The parameter value for key '"
+ HiveTableOperations.METADATA_LOCATION_PROP
+ "' is")) {
throw new CommitFailedException(
e, "The table %s.%s has been modified concurrently", database, tableName);
}

if (e.getMessage() != null
&& e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
throw new RuntimeException(
@@ -287,15 +277,33 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
e);
}

LOG.error(
"Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
database,
tableName,
e);
commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
commitStatus =
BaseMetastoreOperations.CommitStatus.valueOf(
checkCommitStatus(newMetadataLocation, metadata).name());
if (e.getMessage() != null
&& e.getMessage()
.contains(
"The table has been modified. The parameter value for key '"
+ HiveTableOperations.METADATA_LOCATION_PROP
+ "' is")) {
// It's possible the HMS client incorrectly retries a successful operation, due to a network
// issue for example, and triggers this exception. So we need to double-check to make sure
// this is really a concurrent modification. Hitting this exception means that no pending
// request, if any, can succeed later, so it's safe to check the status in strict mode
commitStatus = checkCommitStatusStrict(newMetadataLocation, metadata);
if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
throw new CommitFailedException(
e, "The table %s.%s has been modified concurrently", database, tableName);
}
} else {
LOG.error(
"Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
database,
tableName,
e);
commitStatus =
deniskuzZ (Member) commented on Mar 31, 2025:

why do we need this weird casting?

Contributor Author replied:
I believe it was introduced in #10001 when we had two enums: BaseMetastoreTableOperations::CommitStatus and BaseMetastoreOperations::CommitStatus, and is not needed anymore. I'd be happy to remove it in this PR if you think that's better.

BaseMetastoreOperations.CommitStatus.valueOf(
checkCommitStatus(newMetadataLocation, metadata).name());
}

switch (commitStatus) {
case SUCCESS:
break;
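For readers puzzled by the valueOf call questioned in the thread above: it bridges two enums by constant name, an idiom dating back to #10001 that only works while both enums declare identical constants. A standalone sketch of the idiom with hypothetical enums (not the Iceberg classes):

// Sketch: name-based bridging between two structurally identical enums.
enum OldStatus { SUCCESS, FAILURE, UNKNOWN }

enum NewStatus { SUCCESS, FAILURE, UNKNOWN }

final class EnumBridge {
  private EnumBridge() {}

  // valueOf(name()) re-resolves the constant in the target enum and throws
  // IllegalArgumentException at runtime if the constant sets ever diverge.
  static NewStatus bridge(OldStatus status) {
    return NewStatus.valueOf(status.name());
  }
}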
hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java
@@ -40,16 +40,10 @@ private HiveMetastoreExtension(String databaseName, Map<String, String> hiveConfOverride) {

@Override
public void beforeAll(ExtensionContext extensionContext) throws Exception {
metastore = new TestHiveMetastore();
HiveConf hiveConfWithOverrides = new HiveConf(TestHiveMetastore.class);
if (hiveConfOverride != null) {
for (Map.Entry<String, String> kv : hiveConfOverride.entrySet()) {
hiveConfWithOverrides.set(kv.getKey(), kv.getValue());
}
}
metastore = new TestHiveMetastore(hiveConfOverride);

metastore.start(hiveConfWithOverrides);
metastoreClient = new HiveMetaStoreClient(hiveConfWithOverrides);
metastore.start(new HiveConf(TestHiveMetastore.class));
metastoreClient = new HiveMetaStoreClient(metastore.hiveConf());
Contributor commented:
Why is this change?

Contributor Author replied:
METASTORE_TRY_DIRECT_SQL is hard-coded to false in TestHiveMetastore::initConf. This change makes it possible to override the config in our test.

Contributor:
Could you please try to set this globally, and see if there is an issue with it? This is just test code, and I would like to try to keep it as simple as possible.

Contributor Author:
OK, I'll try setting the default value to true; if everything is fine, I'll revert this change.

Contributor Author:
Enabling direct SQL globally seems to fix the weird test case. I did some debugging and believe it's because direct SQL gets the partition filter pushdown right and retrieves the corresponding partitions. Should I update the test cases here, or leave it to another PR?

pvary (Contributor) commented on Mar 26, 2025:
I think the Spark issue still persists when the directSql is turned off, so we don't want to change that before talking to the owners of the test.

Just create this method, and the old methods should use this:

  /**
   * Starts a TestHiveMetastore with a provided connection pool size and HiveConf.
   *
   * @param conf The hive configuration to use
   * @param poolSize The number of threads in the executor pool
   * @param directSql Used to turn on directSql
   */
  public void start(HiveConf conf, int poolSize, boolean directSql) {
[..]
     initConf(conf, port, directSql);
[..]
  }

  /**
   * Starts a TestHiveMetastore with a provided connection pool size and HiveConf.
   *
   * @param conf The hive configuration to use
   * @param poolSize The number of threads in the executor pool
   */
  public void start(HiveConf conf, int poolSize) {
    start(conf, poolSize, false);
  }
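If that overload were adopted, a suite could opt in per call with a one-liner like this sketch (5 is the documented default pool size; the boolean is the proposed directSql flag):

// Sketch: opting in to direct SQL through the proposed overload.
metastore.start(new HiveConf(TestHiveMetastore.class), 5, true);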

if (null != databaseName) {
String dbPath = metastore.getDatabasePath(databaseName);
Database db = new Database(databaseName, "description", dbPath, Maps.newHashMap());
hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -22,6 +22,7 @@
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
@@ -63,6 +64,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.hadoop.ConfigProperties;
@@ -109,7 +111,12 @@ public class TestHiveCommitLocks {
private static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION =
HiveMetastoreExtension.builder()
.withDatabase(DB_NAME)
.withConfig(ImmutableMap.of(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname, "1s"))
.withConfig(
ImmutableMap.of(
HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname,
"1s",
HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname,
"true"))
.build();

private static HiveCatalog catalog;
@@ -205,6 +212,37 @@ public static void cleanup() {
}
}

@Test
public void testMultipleAlterTableForNoLock() throws Exception {
Table table = catalog.loadTable(TABLE_IDENTIFIER);
table.updateProperties().set(TableProperties.HIVE_LOCK_ENABLED, "false").commit();
spyOps.refresh();
TableMetadata metadataV3 = spyOps.current();
AtomicReference<Throwable> alterTableException = new AtomicReference<>(null);
doAnswer(
i -> {
try {
// mock a situation where alter table is unexpectedly invoked more than once
i.callRealMethod();
return i.callRealMethod();
} catch (Throwable e) {
alterTableException.compareAndSet(null, e);
throw e;
}
})
.when(spyClient)
.alter_table_with_environmentContext(anyString(), anyString(), any(), any());
spyOps.commit(metadataV3, metadataV1);
verify(spyClient, times(1))
.alter_table_with_environmentContext(anyString(), anyString(), any(), any());
assertThat(alterTableException)
.as("Expecting to trigger an exception indicating table has been modified")
.hasValueMatching(
t ->
t.getMessage()
.contains("The table has been modified. The parameter value for key '"));
}

@Test
public void testLockAcquisitionAtFirstTime() throws TException, InterruptedException {
doReturn(acquiredLockResponse).when(spyClient).lock(any());
hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
@@ -31,6 +31,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
@@ -128,6 +129,15 @@ public class TestHiveMetastore {
private TServer server;
private HiveMetaStore.HMSHandler baseHandler;
private HiveClientPool clientPool;
private final Map<String, String> hiveConfOverride;

public TestHiveMetastore() {
this(null);
}

public TestHiveMetastore(Map<String, String> hiveConfOverride) {
this.hiveConfOverride = hiveConfOverride;
}

/**
* Starts a TestHiveMetastore with the default connection pool size (5) and the default HiveConf.
@@ -265,12 +275,17 @@ private void initConf(HiveConf conf, int port) {
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
conf.set(
HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + HIVE_LOCAL_DIR.getAbsolutePath());
conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "true");
conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false");
conf.set("iceberg.hive.client-pool-size", "2");
// Setting this to avoid thrift exception during running Iceberg tests outside Iceberg.
conf.set(
HiveConf.ConfVars.HIVE_IN_TEST.varname, HiveConf.ConfVars.HIVE_IN_TEST.getDefaultValue());
if (hiveConfOverride != null) {
for (Map.Entry<String, String> kv : hiveConfOverride.entrySet()) {
conf.set(kv.getKey(), kv.getValue());
}
}
}

private static void setupMetastoreDB(String dbURL) throws SQLException, IOException {
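A usage sketch of the new override hook (test-code assumption, mirroring how HiveMetastoreExtension now wires it up): construct the metastore with the overrides, start it, then build a client from the resulting conf.

import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

final class MetastoreOverrideSketch {
  private MetastoreOverrideSketch() {}

  // Sketch: start a TestHiveMetastore with direct SQL turned back off for one suite.
  static HiveMetaStoreClient startWithOverride() throws Exception {
    TestHiveMetastore metastore =
        new TestHiveMetastore(
            Map.of(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false"));
    metastore.start(new HiveConf(TestHiveMetastore.class));
    return new HiveMetaStoreClient(metastore.hiveConf());
  }
}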
Derby Hive metastore schema script (hive-schema-*.derby.sql)
@@ -130,7 +130,7 @@ CREATE TABLE "APP"."TAB_COL_STATS"(
"BIT_VECTOR" BLOB
);

CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
Contributor commented:
Do we want to change the other CLOBs in this file as well?

Contributor Author replied:
Added more changes to be consistent with HIVE-16667 and HIVE-25574.

Contributor Author:
Good to know the direct SQL is made more robust. Maybe we can revisit this change when we upgrade our Hive dependency.

CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" VARCHAR(32672));

CREATE TABLE "APP"."BUCKETING_COLS" ("SD_ID" BIGINT NOT NULL, "BUCKET_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
