Spark: Improve table existence verification logic #14457
Conversation
LGTM, thanks @jerqi!
Shall we also apply this change in v3.5?
LGTM, shall we also add this to `SparkCatalog`?
Added.
Applied to Spark 3.5 and Spark 3.4.
```java
  return true;
}

// if the original load didn't work, try using the namespace as an identifier because
```
I kept this consistent with the original logic, which reuses the `loadTable` method. Could we simplify the code? We don't need to consider the cases where the identifier includes a snapshot selector or points to the changelog: the upper layer usually verifies table existence before creating, altering, or dropping tables, and those identifiers don't contain a snapshot selector or a changelog pointer.
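For illustration, a hedged sketch of the identifier shapes in question (the selector spellings are assumptions based on the `BRANCH`/`SNAPSHOT_ID` matchers and the changelog table name quoted later in this thread):

```java
import org.apache.spark.sql.connector.catalog.Identifier;

class IdentifierShapes {
  public static void main(String[] args) {
    // Plain identifier: the shape CREATE/ALTER/DROP statements resolve.
    Identifier plain = Identifier.of(new String[] {"db"}, "tbl");

    // Selector-style identifiers: the real table name shifts into the
    // namespace and the name part encodes a selector or the changelog.
    Identifier branch = Identifier.of(new String[] {"db", "tbl"}, "branch_main");
    Identifier snapshot = Identifier.of(new String[] {"db", "tbl"}, "snapshot_id_12345");
    Identifier changelog = Identifier.of(new String[] {"db", "tbl"}, "changes");

    System.out.printf("%s vs %s, %s, %s%n", plain, branch, snapshot, changelog);
  }
}
```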
could we reuse the load logic here?
```java
@Override
public boolean tableExists(Identifier ident) {
  if (isPathIdentifier(ident)) {
    try {
      tables.load(((PathIdentifier) ident).location());
      return true;
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      return false;
    }
  } else {
    return icebergCatalog.tableExists(buildIdentifier(ident));
  }
}
```
If this gets too complicated, feel free to ignore my comment and let's just move forward with only the SparkSessionCatalog change.
OK for me. I can reuse the load logic. Thanks.
Thanks. I have modified it.
It may be better to use the `exists` method of the `HadoopTables` class.
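As a minimal sketch of that suggestion (the warehouse path below is hypothetical):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopTables;

class HadoopTablesExistsSketch {
  public static void main(String[] args) {
    HadoopTables tables = new HadoopTables(new Configuration());
    // exists(...) checks for valid table metadata at the location, avoiding
    // a full table load followed by catching NoSuchTableException.
    boolean present = tables.exists("/tmp/warehouse/db/tbl"); // hypothetical location
    System.out.println(present);
  }
}
```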
Thanks for adding this for all the Spark versions. Left a minor comment.
@singhpk234 could you double check the new logic for SparkCatalog.java?
```java
@Override
public boolean tableExists(Identifier ident) {
  if (isPathIdentifier(ident)) {
    return tables.exists(((PathIdentifier) ident).location());
```
Do we need to parse the ident and use the base location?
It depends on the existence semantics. Do we need to consider snapshot and changelog tables? There are two ways:

- Way 1: for snapshot and changelog tables, return false directly. The current implementation follows this way.
- Way 2: for snapshot and changelog tables, parse them and verify the existence. I had this in a previous commit:
```java
public boolean tableExists(Identifier ident) {
  try {
    if (isPathIdentifier(ident)) {
      loadFromPathIdentifier((PathIdentifier) ident);
      return true;
    } else {
      boolean isExists = icebergCatalog.tableExists(buildIdentifier(ident));
      if (isExists) {
        return true;
      }
      if (ident.namespace().length == 0) {
        return false;
      }
      // if the original load didn't work, try using the namespace as an identifier because
      // the original identifier may include a snapshot selector or may point to the changelog
      TableIdentifier namespaceAsIdent =
          buildIdentifier(namespaceToIdentifier(ident.namespace()));
      Matcher tag = TAG.matcher(ident.name());
      if (tag.matches()) {
        org.apache.iceberg.Table table = icebergCatalog.loadTable(namespaceAsIdent);
        Snapshot tagSnapshot = table.snapshot(tag.group(1));
        return tagSnapshot != null;
      }
      if (icebergCatalog.tableExists(namespaceAsIdent)) {
        if (ident.name().equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {
          return true;
        }
        Matcher at = AT_TIMESTAMP.matcher(ident.name());
        if (at.matches()) {
          return true;
        }
        Matcher id = SNAPSHOT_ID.matcher(ident.name());
        if (id.matches()) {
          return true;
        }
        Matcher branch = BRANCH.matcher(ident.name());
        if (branch.matches()) {
          return true;
        }
        if (ident.name().equalsIgnoreCase(REWRITE)) {
          return true;
        }
      }
      return false;
    }
  } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
    return false;
  }
}
```
Way 1 is clearer. Way 2 follows the original code's semantics. Do you have any suggestions?
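For contrast, a minimal sketch of way 1, assembled from the snippets quoted earlier in this thread:

```java
@Override
public boolean tableExists(Identifier ident) {
  if (isPathIdentifier(ident)) {
    return tables.exists(((PathIdentifier) ident).location());
  }
  // Selector/changelog identifiers fall through here and report "does not
  // exist", since no catalog table matches the selector-suffixed name.
  return icebergCatalog.tableExists(buildIdentifier(ident));
}
```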
I agree way 1 is clearer.
The default implementation loads the table to determine whether it exists. It's better to use the `tableExists` method in the `SparkCatalog` and `SessionCatalog` classes directly, because the underlying catalog may have a more efficient implementation, like `RESTCatalog`.
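As a sketch of why delegating matters (the `RestTransport` helper below is hypothetical, but the Iceberg REST catalog spec does expose a HEAD route for table existence checks):

```java
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical transport interface, for illustration only.
interface RestTransport {
  /** Returns true when a HEAD request to the path gets a 2xx response. */
  boolean head(String path);
}

class RestExistenceSketch {
  private final RestTransport transport;

  RestExistenceSketch(RestTransport transport) {
    this.transport = transport;
  }

  // A catalog that overrides tableExists can answer with one lightweight
  // request instead of loading full table metadata and catching
  // NoSuchTableException, which is what the default implementation does.
  boolean tableExists(TableIdentifier ident) {
    return transport.head(
        "/v1/namespaces/" + ident.namespace() + "/tables/" + ident.name());
  }
}
```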