Core, Hive: Double check commit status in case of commit conflict for NoLock #12637
File: BaseMetastoreOperations.java (core)
@@ -28,6 +28,7 @@
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.iceberg.util.PropertyUtil;

@@ -63,6 +64,31 @@ protected CommitStatus checkCommitStatus(
      String newMetadataLocation,
      Map<String, String> properties,
      Supplier<Boolean> commitStatusSupplier) {
    if (metadataLocationCommitted(
            tableOrViewName, newMetadataLocation, properties, commitStatusSupplier)
        .orElse(false)) {
      return CommitStatus.SUCCESS;
    }
    return CommitStatus.UNKNOWN;
  }

  /**
   * Attempts to load the content and checks whether any current or past metadata location matches
   * the one we were attempting to set.
   *
   * @param tableOrViewName full name of the table or view
   * @param newMetadataLocation the path of the new commit file
   * @param properties properties used to configure the retry behavior
   * @param commitStatusSupplier supplier that checks whether the new metadata location is present
   *     (current or in history) for the table
   * @return empty if the locations cannot be checked (e.g. the refresh fails), true if the new
   *     location is committed, false otherwise
   */
  protected Optional<Boolean> metadataLocationCommitted(
Review thread:
- Instead of returning an … And we need to update the javadoc for the …
- Yeah but I would like …
- Or could we just call the …
- Oh yes, good idea!
      String tableOrViewName,
      String newMetadataLocation,
      Map<String, String> properties,
      Supplier<Boolean> commitStatusSupplier) {
    int maxAttempts =
        PropertyUtil.propertyAsInt(
            properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
@@ -78,7 +104,7 @@ protected CommitStatus checkCommitStatus(
            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
    AtomicReference<Boolean> res = new AtomicReference<>(null);

    Tasks.foreach(newMetadataLocation)
        .retry(maxAttempts)
@@ -96,23 +122,24 @@ protected CommitStatus checkCommitStatus(
                "Commit status check: Commit to {} of {} succeeded",
                tableOrViewName,
                newMetadataLocation);
            status.set(CommitStatus.SUCCESS);
            res.set(true);
          } else {
            LOG.warn(
                "Commit status check: Commit to {} of {} unknown, new metadata location is not current "
                    + "or in history",
                tableOrViewName,
                newMetadataLocation);
            res.set(false);
          }
        });

    if (status.get() == CommitStatus.UNKNOWN) {
    if (res.get() == null) {
      LOG.error(
          "Cannot determine commit state to {}. Failed during checking {} times. "
              + "Treating commit state as unknown.",
          tableOrViewName,
          maxAttempts);
    }
    return status.get();
    return Optional.ofNullable(res.get());
  }
}
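
Since the review thread above asks about the Optional return and its documentation, here is a minimal, self-contained sketch (illustrative class and method names, not Iceberg code) of how a caller can fold the three states of the Optional<Boolean> returned by metadataLocationCommitted into a commit status. It mirrors the Hive-side handler further down: empty stays UNKNOWN, true becomes SUCCESS, false becomes FAILURE.

import java.util.Optional;

// Illustrative only: CommitStatusSketch and toCommitStatus are hypothetical names.
public class CommitStatusSketch {
  enum CommitStatus { SUCCESS, FAILURE, UNKNOWN }

  // empty = the check itself kept failing (e.g. refresh errors), so the outcome stays unknown
  // true  = the new metadata location is current or in history, so the commit landed
  // false = the location is definitely absent, so the commit did not land
  static CommitStatus toCommitStatus(Optional<Boolean> locationCommitted) {
    return locationCommitted
        .map(committed -> committed ? CommitStatus.SUCCESS : CommitStatus.FAILURE)
        .orElse(CommitStatus.UNKNOWN);
  }

  public static void main(String[] args) {
    System.out.println(toCommitStatus(Optional.of(true)));   // SUCCESS
    System.out.println(toCommitStatus(Optional.of(false)));  // FAILURE
    System.out.println(toCommitStatus(Optional.empty()));    // UNKNOWN
  }
}

Note that the refactored checkCommitStatus above is stricter: it promotes only true to SUCCESS and maps both false and empty to UNKNOWN.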
File: HiveTableOperations.java (hive-metastore)
@@ -268,16 +268,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
      throw e;

    } catch (Throwable e) {
      if (e.getMessage() != null
          && e.getMessage()
              .contains(
                  "The table has been modified. The parameter value for key '"
                      + HiveTableOperations.METADATA_LOCATION_PROP
                      + "' is")) {
        throw new CommitFailedException(
            e, "The table %s.%s has been modified concurrently", database, tableName);
      }

      if (e.getMessage() != null
          && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
        throw new RuntimeException(

@@ -287,15 +277,32 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
            e);
      }

      LOG.error(
          "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
          database,
          tableName,
          e);
      commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
      commitStatus =
          BaseMetastoreOperations.CommitStatus.valueOf(
              checkCommitStatus(newMetadataLocation, metadata).name());
      if (e.getMessage() != null
          && e.getMessage()
              .contains(
                  "The table has been modified. The parameter value for key '"
                      + HiveTableOperations.METADATA_LOCATION_PROP
                      + "' is")) {
        // It's possible that the HMS client incorrectly retries a successful operation, due to a
        // network issue for example, and triggers this exception. So we need to double-check to
        // make sure this is really a concurrent modification.
        commitStatus = handlePossibleConcurrentModification(newMetadataLocation, metadata);
        if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
          throw new CommitFailedException(
              e, "The table %s.%s has been modified concurrently", database, tableName);
        }
      } else {
        LOG.error(
            "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
            database,
            tableName,
            e);
        commitStatus =
Review thread:
- why do we need this weird casting?
- I believe it was introduced in #10001 when we had two enums: …
- I updated to address this issue. At least I think the new …
            BaseMetastoreOperations.CommitStatus.valueOf(
                checkCommitStatus(newMetadataLocation, metadata).name());
      }

      switch (commitStatus) {
        case SUCCESS:
          break;

@@ -324,6 +331,24 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
        "Committed to table {} with the new metadata location {}", fullName, newMetadataLocation);
  }

  private BaseMetastoreOperations.CommitStatus handlePossibleConcurrentModification(
      String newMetadataLocation, TableMetadata metadata) {
    Optional<Boolean> locationCommitted =
        metadataLocationCommitted(
            tableName(),
            newMetadataLocation,
            metadata.properties(),
            () -> checkCurrentMetadataLocation(newMetadataLocation));
    if (locationCommitted.isPresent()) {
      if (locationCommitted.get()) {
        return BaseMetastoreOperations.CommitStatus.SUCCESS;
      } else {
        return BaseMetastoreOperations.CommitStatus.FAILURE;
      }
    }
    return BaseMetastoreOperations.CommitStatus.UNKNOWN;
  }

  private void setHmsTableParameters(
      String newMetadataLocation,
      Table tbl,
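
On the "weird casting" question in the thread above: the BaseMetastoreOperations.CommitStatus.valueOf(checkCommitStatus(...).name()) call works only because the two CommitStatus enums declare the same constant names. Here is a minimal, self-contained sketch of that by-name bridge; the class and enum names are placeholders, not Iceberg's actual types.

// Placeholder enums standing in for the two CommitStatus types discussed in the review thread.
public class EnumBridgeSketch {
  enum OldCommitStatus { SUCCESS, FAILURE, UNKNOWN }
  enum NewCommitStatus { SUCCESS, FAILURE, UNKNOWN }

  // valueOf(name()) maps a constant of one enum to the same-named constant of the other;
  // it throws IllegalArgumentException if the constant names ever diverge.
  static NewCommitStatus bridge(OldCommitStatus status) {
    return NewCommitStatus.valueOf(status.name());
  }

  public static void main(String[] args) {
    System.out.println(bridge(OldCommitStatus.UNKNOWN)); // UNKNOWN
  }
}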
File: HiveMetastoreExtension.java (hive-metastore tests)
@@ -40,16 +40,10 @@ private HiveMetastoreExtension(String databaseName, Map<String, String> hiveConf

  @Override
  public void beforeAll(ExtensionContext extensionContext) throws Exception {
    metastore = new TestHiveMetastore();
    HiveConf hiveConfWithOverrides = new HiveConf(TestHiveMetastore.class);
    if (hiveConfOverride != null) {
      for (Map.Entry<String, String> kv : hiveConfOverride.entrySet()) {
        hiveConfWithOverrides.set(kv.getKey(), kv.getValue());
      }
    }
    metastore = new TestHiveMetastore(hiveConfOverride);

    metastore.start(hiveConfWithOverrides);
    metastoreClient = new HiveMetaStoreClient(hiveConfWithOverrides);
    metastore.start(new HiveConf(TestHiveMetastore.class));
    metastoreClient = new HiveMetaStoreClient(metastore.hiveConf());
Review thread:
- Why is this change?
- …
- Could you please try to set this globally, and see if there is an issue with it? This is just test code, and I would like to try to keep it as simple as possible.
- OK, I'll try setting the default value to true; if everything is fine, I'll revert this change.
- Enabling direct SQL globally seems to fix the weird test case. I did some debugging and believe it's because direct SQL gets the partition filter pushdown correct and retrieves the corresponding partitions. Should I update the test cases here, or leave it to another PR?
- I think the Spark issue still persists when directSql is turned off, so we don't want to change that before talking to the owners of the test. Just create this method, and the old methods should use this: …

    if (null != databaseName) {
      String dbPath = metastore.getDatabasePath(databaseName);
      Database db = new Database(databaseName, "description", dbPath, Maps.newHashMap());
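
Tying into the direct SQL discussion above, this is a rough sketch of how a test could pass a Hive configuration override through the Map-based TestHiveMetastore constructor shown in this diff. Assumptions: the Iceberg hive-metastore test classpath is available, the wrapper class and method are hypothetical, and hive.metastore.try.direct.sql is assumed to be the property the thread refers to.

import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

// Hypothetical wrapper; TestHiveMetastore is the Iceberg test helper whose Map-based
// constructor and hiveConf() accessor appear in the diff above.
public class MetastoreOverrideSketch {
  void startMetastoreWithOverride() throws Exception {
    Map<String, String> overrides = Map.of("hive.metastore.try.direct.sql", "true");
    TestHiveMetastore metastore = new TestHiveMetastore(overrides);
    metastore.start(new HiveConf(TestHiveMetastore.class));
    HiveMetaStoreClient client = new HiveMetaStoreClient(metastore.hiveConf());
    // ... run test setup against client, then clean up the client and the metastore
  }
}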
File: Derby Hive metastore schema script (test resources)
@@ -130,7 +130,7 @@ CREATE TABLE "APP"."TAB_COL_STATS"(
  "BIT_VECTOR" BLOB
);

CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
Review thread:
- Do we want to change the other …
- Add more changes to be consistent with HIVE-16667 and HIVE-25574.
- FYI, CLOB handling was fixed in https://github.com/apache/hive/pull/5386/files#diff-bcca13f6cc251df321e8fe80568ef0334a1d44f7e5e7ff2fcaa06ab4f05bbdf9R3387
- Good to know the direct SQL is made more robust. Maybe we can revisit this change when we upgrade our hive dependency.
CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" VARCHAR(32672)); | ||
|
||
CREATE TABLE "APP"."BUCKETING_COLS" ("SD_ID" BIGINT NOT NULL, "BUCKET_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL); | ||
|
||
|