
Conversation

rdblue commented Sep 19, 2021

This makes an update I requested to the Spark 2 reader. It doesn't update Spark 3 because that refactor is much larger. I'll open a PR for that once apache#1508 is in.

wypoon and others added 9 commits September 6, 2021 22:00
According to Edwin Choi, in order to get the schema for a snapshot,
the only safe option is to scan the metadata files to find the one
where the current-snapshot-id matches the target snapshot id.
The changes are mostly in spark3. They are necessitated by the catalog
support introduced in apache#1783.
As the spark3 IcebergSource now implements SupportsCatalogOptions,
DataFrameReader#load no longer calls IcebergSource#getTable but calls
SparkCatalog#loadTable directly. In order for the SparkTable returned by
SparkCatalog#loadTable(Identifier) to be aware of the snapshot, the
information about the snapshot needs to be present in the Identifier.
For this reason, we introduce a SnapshotAwareIdentifier interface
extending Identifier (a sketch of this interface follows the commit list).
As SupportsCatalogOptions does not allow a schema to be specified
(requested), SparkTable no longer needs a requestedSchema field, so
some dead code is removed from it.
Rebased on master.
Use constants from SparkReadOptions.
Implement snapshotSchema() in SparkFilesScan as it extends SparkBatchScan.
Avoid introducing new methods to BaseTable.
Add helper methods to SnapshotUtil instead (a sketch of such a helper follows the commit list).
Move recovery of the schema from previous metadata files, in the event
that a snapshot does not have an associated schema id, to a new PR.
Remove the snapshotSchema method from SparkBatchScan and its subclasses,
as it is not needed.
Adjust schema in BaseTableScan when useSnapshot is called.
Use the existing CatalogAndIdentifier and swap out the Identifier for a
snapshot-aware TableIdentifier if snapshotId or asOfTimestamp is set.
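The "add helper methods to SnapshotUtil" commit above comes down to looking up the schema id recorded on a snapshot and falling back to the table's current schema when none was recorded. A minimal sketch of such a helper; the wrapper class name SnapshotSchemas and the exact fallback behavior are assumptions, and the actual SnapshotUtil additions in the PR may differ:

import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class SnapshotSchemas {
  private SnapshotSchemas() {
  }

  // Return the schema that was current when the given snapshot was written,
  // falling back to the table's current schema when no schema id was recorded.
  public static Schema schemaFor(Table table, long snapshotId) {
    Snapshot snapshot = table.snapshot(snapshotId);
    Preconditions.checkArgument(snapshot != null, "Cannot find snapshot with id %s", snapshotId);

    Integer schemaId = snapshot.schemaId();
    return schemaId != null ? table.schemas().get(schemaId) : table.schema();
  }
}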
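Similarly, a minimal sketch of what a snapshot-aware Identifier could look like; the interface name comes from the commit message above, but its methods are assumptions:

import org.apache.spark.sql.connector.catalog.Identifier;

// Carries the snapshot selection from the read options through to
// SparkCatalog#loadTable, which only receives an Identifier.
public interface SnapshotAwareIdentifier extends Identifier {
  // Snapshot selected by snapshot-id, or null if not set.
  Long snapshotId();

  // Snapshot selected by as-of-timestamp (in millis), or null if not set.
  Long asOfTimestamp();
}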
github-actions bot added the SPARK label Sep 19, 2021
wypoon (Owner) commented Sep 20, 2021

@rdblue there are 4 unit test failures in the spark2 module with this change:
TestRemoveOrphanFilesAction24.testAllValidFilesAreKept:

org.apache.spark.sql.AnalysisException: cannot resolve '`file_path`' given input columns: [c1, c2, c3];;
'Project ['file_path]
+- RelationV2 iceberg[c1#151, c2#152, c3#153] (Options: [snapshot-id=5825767402861535956,path=file:/var/folders/7x/vgfk2h155v9gjtw55z6ncgc00000gp/T/junit...)

...
	at org.apache.iceberg.actions.TestRemoveOrphanFilesAction.snapshotFiles(TestRemoveOrphanFilesAction.java:478)

TestDataSourceOptions24.testIncrementalScanOptions:

java.lang.AssertionError: [Check both start-snapshot-id and snapshot-id are configured] 
Expecting actual throwable to be an instance of:
  java.lang.IllegalArgumentException
but was:
  java.lang.IllegalStateException: Cannot enable incremental scan, scan-snapshot set to id=2818097334335637343
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:589)
	at org.apache.iceberg.DataTableScan.appendsBetween(DataTableScan.java:51)
...
	at org.apache.iceberg.spark.source.TestDataSourceOptions.testIncrementalScanOptions(TestDataSourceOptions.java:244)

TestIcebergSourceHadoopTables24.testPartitionsTable and TestIcebergSourceHiveTables24.testPartitionsTable:

org.apache.spark.sql.AnalysisException: cannot resolve '`partition.id`' given input columns: [id, data];;
'Sort ['partition.id ASC NULLS FIRST], true
+- RelationV2 iceberg[id#2841, data#2842] (Options: [snapshot-id=6214857628483147750,path=db.partitions_test.partitions,paths=[]])

...
	at org.apache.iceberg.spark.source.TestIcebergSourceTablesBase.testPartitionsTable(TestIcebergSourceTablesBase.java:1103)

Comment on lines -122 to -132
this.startSnapshotId = options.get("start-snapshot-id").map(Long::parseLong).orElse(null);
this.endSnapshotId = options.get("end-snapshot-id").map(Long::parseLong).orElse(null);
if (snapshotId != null || asOfTimestamp != null) {
  if (startSnapshotId != null || endSnapshotId != null) {
    throw new IllegalArgumentException(
        "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
        "as-of-timestamp is specified");
  }
} else {
  if (startSnapshotId == null && endSnapshotId != null) {
    throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
Owner (wypoon):
These checks are tested in TestDataSourceOptions.testIncrementalScanOptions. We need to retain them (somewhere).

Author (rdblue):

These are checked in the table scan itself rather than in multiple places. I think we just need to update the tests to match the error messages from the scan.
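For example, the failing assertion in TestDataSourceOptions could be pointed at the exception the scan itself now throws. A rough sketch, assuming AssertJ (which the failure output above already uses) and the test's existing spark session and snapshotId, startSnapshotId, and tableLocation variables; the expected message should match whatever the scan actually produces:

import static org.assertj.core.api.Assertions.assertThatThrownBy;

assertThatThrownBy(() ->
        spark.read()
            .format("iceberg")
            .option("snapshot-id", snapshotId)
            .option("start-snapshot-id", startSnapshotId)
            .load(tableLocation))
    .as("Check both start-snapshot-id and snapshot-id are configured")
    .isInstanceOf(IllegalStateException.class)
    .hasMessageStartingWith("Cannot enable incremental scan");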

Owner (wypoon):

Let us move this cleanup (of checks in multiple places) to a separate PR after apache#1508 is merged. It needs to be done for spark3 as well, and is really orthogonal to the main issue.

wypoon (Owner) commented Sep 21, 2021

@rdblue thanks for the PR showing me what you had in mind. I won't merge it but will incorporate it into my next update of apache#1508.
Your change exposed a bug in my last update where I changed BaseTableScan#useSnapshot to use SnapshotUtil.schemaFor(table, scanSnapshotId) instead of the schema in the BaseTableScan. We should stick with using the schema of the BaseTableScan if we're scanning a metadata table (such as partitions or files). I'll fix that in my next update (a rough sketch of the fix follows this comment).
Putting all the setup for the base scan in the constructor of Reader plus putting back the validation of the scan options made the cyclomatic complexity too high, so I have to refactor your change.
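A rough sketch of the metadata-table fix described above, assuming a private helper inside BaseTableScan and an instanceof check against BaseMetadataTable; the actual change may be shaped differently:

// Metadata tables (partitions, files, ...) define their own fixed schema,
// so only data tables should switch to the schema of the requested snapshot.
private Schema schemaFor(long scanSnapshotId) {
  if (table() instanceof BaseMetadataTable) {
    return schema();  // keep the metadata table's own schema
  }

  return SnapshotUtil.schemaFor(table(), scanSnapshotId);
}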

wypoon force-pushed the schema-for-snapshot branch from 1390ce5 to 0ef74ff on September 28, 2021 at 23:07
wypoon force-pushed the schema-for-snapshot branch 3 times, most recently from 290c4a0 to afec25c on October 21, 2021 at 19:51
wypoon pushed a commit that referenced this pull request Jan 26, 2023
…flake-managed Iceberg tables (apache#6428)

* Initial read-only Snowflake Catalog implementation by @sfc-gh-mparmar (#1)

Initial read-only Snowflake Catalog implementation built on top of the Snowflake JDBC driver,
providing support for basic listing of namespaces, listing of tables, and loading/reads of tables.

Auth options are passthrough to the JDBC driver.

Co-authored-by: Maninder Parmar <[email protected]>
Co-authored-by: Maninder Parmar <[email protected]>
Co-authored-by: Dennis Huo <[email protected]>

* Add JdbcSnowflakeClientTest using mocks (apache#2)

Add JdbcSnowflakeClientTest using mocks; provides full coverage of JdbcSnowflakeClient
and entities' ResultSetHandler logic.

Also update target Spark runtime versions to be included.

* Add test { useJUnitPlatform() } block to iceberg-snowflake for
consistency and future interoperability with inheriting from abstract
unittest base classes.

* Extract versions into versions.props per PR review

* Misc test-related refactors per review suggestions
-Convert unittests to all use assertj/Assertions for "fluent assertions"
-Refactor test injection into overloaded initialize() method
-Add test cases for close() propagation
-Use CloseableGroup.

* Fix unsupported behaviors of loadNamespaceMetadata and defaultWarehouseLocation

* Move TableIdentifier checks out of newTableOps into the
SnowflakeTableOperations class itself, add test case.

* Refactor out any Namespace-related business logic from the lower
SnowflakeClient/JdbcSnowflakeClient layers and merge SnowflakeTable
and SnowflakeSchema into a single SnowflakeIdentifier that also
encompasses ROOT and DATABASE level identifiers.

A SnowflakeIdentifier thus functions like a type-checked/constrained
Iceberg TableIdentifier, and eliminates any tight coupling between
a SnowflakeClient and Catalog business logic (a sketch of this
identifier shape follows this commit message).

Parsing of Namespace numerical levels into a SnowflakeIdentifier
is now fully encapsulated in NamespaceHelpers so that callsites
don't duplicate namespace-handling/validation logic.

* Finish migrating JdbcSnowflakeClientTest off any usage of org.junit.Assert
in favor of assertj's Assertions.

* Style refactorings from review comments, expanded and moved InMemoryFileIO into core
with its own unittest.

* Fix behavior of getNamespaceMetadata to throw when the namespace doesn't
exist.

Refactor for naming conventions and consolidating identifier
handling into NamespaceHelpers.

Make FileIO instantiated fresh for each newTableOps call.

* Move private constructor to top, add assertion to test case.

* Define minimal ResultSetParser/QueryHarness classes to fully replace
any use of commons-dbutils; refactor ResultSet handling fully into
JdbcSnowflakeClient.java (a sketch of these classes follows this commit message).

* Update snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java

Co-authored-by: Eduard Tudenhöfner <[email protected]>

* Refactor style suggestions; remove debug-level logging, arguments in exceptions,
private members if not accessed outside, move precondition checks, add test for
NamespaceHelpers.

* Fix precondition messages, remove getConf()

* Clean up varargs.

* Make data members final, include rawJsonVal in toString for debuggability.

* Combine some small test cases into roundtrip test cases, misc cleanup

* Add comment for why a factory class is exposed for testing purposes.

Co-authored-by: Dennis Huo <[email protected]>
Co-authored-by: Maninder Parmar <[email protected]>
Co-authored-by: Maninder Parmar <[email protected]>
Co-authored-by: Eduard Tudenhöfner <[email protected]>
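For reference, a minimal sketch of the kind of type-constrained identifier described in the refactoring notes above; the field names and factory methods are assumptions, not the actual SnowflakeIdentifier API:

// A type-constrained identifier covering the levels the Snowflake catalog can
// address, replacing separate SnowflakeTable/SnowflakeSchema classes.
public class SnowflakeIdentifier {
  public enum Type {
    ROOT,
    DATABASE,
    SCHEMA,
    TABLE
  }

  private final Type type;
  private final String databaseName;
  private final String schemaName;
  private final String tableName;

  private SnowflakeIdentifier(Type type, String databaseName, String schemaName, String tableName) {
    this.type = type;
    this.databaseName = databaseName;
    this.schemaName = schemaName;
    this.tableName = tableName;
  }

  public static SnowflakeIdentifier ofRoot() {
    return new SnowflakeIdentifier(Type.ROOT, null, null, null);
  }

  public static SnowflakeIdentifier ofDatabase(String databaseName) {
    return new SnowflakeIdentifier(Type.DATABASE, databaseName, null, null);
  }

  public static SnowflakeIdentifier ofTable(String databaseName, String schemaName, String tableName) {
    return new SnowflakeIdentifier(Type.TABLE, databaseName, schemaName, tableName);
  }

  public Type type() {
    return type;
  }
}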
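And a minimal sketch of the kind of commons-dbutils replacement mentioned above; the interface and class shapes here are assumptions:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Parses a JDBC ResultSet into a value of type T.
@FunctionalInterface
interface ResultSetParser<T> {
  T parse(ResultSet rs) throws SQLException;
}

// Runs a query against a connection and hands the ResultSet to a parser,
// closing the statement and result set afterwards.
class QueryHarness {
  public <T> T query(Connection conn, String sql, ResultSetParser<T> parser, String... args)
      throws SQLException {
    try (PreparedStatement statement = conn.prepareStatement(sql)) {
      for (int i = 0; i < args.length; i++) {
        statement.setString(i + 1, args[i]);
      }

      try (ResultSet rs = statement.executeQuery()) {
        return parser.parse(rs);
      }
    }
  }
}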