Description
Environment
Delta-rs version: python-0.16.0+ (the example works up to 0.15.3; the failure also reproduces with 0.19.2)
Binding: Python
Environment:
- Cloud provider:
- OS: macOS Monterey
- Other:
- pyspark: 3.5.0
- pandas: 2.2.2
- pyarrow: 17.0.0
Bug
What happened:
Beginning with version python-0.16.0, I receive a DeltaTableFeatureException when reading a Delta table written by deltalake with a local Spark session. This is related to the newly introduced handling of timezones. With 0.15.3 the write operation creates the following protocol:
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
With 0.16.0 and onward it creates:
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
Reading that table with a local Spark session then fails with the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: org.apache.spark.sql.delta.DeltaTableFeatureException: [DELTA_FEATURES_PROTOCOL_METADATA_MISMATCH] Unable to operate on this table because the following table features are enabled in metadata but not listed in protocol: invariants.
at org.apache.spark.sql.delta.DeltaErrorsBase.tableFeatureMismatchException(DeltaErrors.scala:2281)
at org.apache.spark.sql.delta.DeltaErrorsBase.tableFeatureMismatchException$(DeltaErrors.scala:2278)
at org.apache.spark.sql.delta.DeltaErrors$.tableFeatureMismatchException(DeltaErrors.scala:3382)
at org.apache.spark.sql.delta.DeltaLog.assertTableFeaturesMatchMetadata(DeltaLog.scala:436)
at org.apache.spark.sql.delta.Snapshot.init(Snapshot.scala:247)
at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:531)
at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshot$2(SnapshotManagement.scala:634)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment(SnapshotManagement.scala:796)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment$(SnapshotManagement.scala:782)
at org.apache.spark.sql.delta.DeltaLog.createSnapshotFromGivenOrEquivalentLogSegment(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:627)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:618)
at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotForLogSegmentInternal$1(SnapshotManagement.scala:1043)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotForLogSegmentInternal(SnapshotManagement.scala:1036)
at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotForLogSegmentInternal$(SnapshotManagement.scala:1031)
at org.apache.spark.sql.delta.DeltaLog.getSnapshotForLogSegmentInternal(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.getUpdatedSnapshot(SnapshotManagement.scala:1012)
at org.apache.spark.sql.delta.SnapshotManagement.getUpdatedSnapshot$(SnapshotManagement.scala:1003)
at org.apache.spark.sql.delta.DeltaLog.getUpdatedSnapshot(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$2(SnapshotManagement.scala:583)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:573)
at org.apache.spark.sql.delta.SnapshotManagement.withSnapshotLockInterruptibly(SnapshotManagement.scala:78)
at org.apache.spark.sql.delta.SnapshotManagement.withSnapshotLockInterruptibly$(SnapshotManagement.scala:75)
at org.apache.spark.sql.delta.DeltaLog.withSnapshotLockInterruptibly(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:573)
at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:572)
at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:69)
at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:80)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:853)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:848)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:651)
at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:651)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115)
at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:651)
at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:847)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:866)
at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:865)
at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:875)
at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:751)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$deltaLog$1(DeltaTableV2.scala:92)
at org.apache.spark.sql.delta.catalog.DeltaTableV2$.withEnrichedUnsupportedTableException(DeltaTableV2.scala:367)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:92)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog(DeltaTableV2.scala:90)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$initialSnapshot$4(DeltaTableV2.scala:145)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$initialSnapshot$1(DeltaTableV2.scala:145)
at org.apache.spark.sql.delta.catalog.DeltaTableV2$.withEnrichedUnsupportedTableException(DeltaTableV2.scala:367)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.initialSnapshot$lzycompute(DeltaTableV2.scala:144)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.initialSnapshot(DeltaTableV2.scala:124)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation$lzycompute(DeltaTableV2.scala:236)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation(DeltaTableV2.scala:234)
at org.apache.spark.sql.delta.sources.DeltaDataSource.$anonfun$createRelation$5(DeltaDataSource.scala:250)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
at org.apache.spark.sql.delta.sources.DeltaDataSource.recordFrameProfile(DeltaDataSource.scala:49)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:209)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Manually adding 'invariants' to the writerFeatures in the protocol resolves the issue. So far I haven't found a way to add the feature explicitly through the Python bindings, e.g. by providing table configuration such as "delta.minReaderVersion" or "delta.writerFeatures"; these options seem to be ignored or are not the correct ones.
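For reference, this is roughly how I patch the protocol by hand as a workaround (a minimal sketch; it assumes a fresh table whose only commit is _delta_log/00000000000000000000.json and rewrites that commit file in place, so use with care):

import json
from pathlib import Path

# Patch the protocol action of the first commit so that "invariants" is
# listed as an explicit writer feature. Assumes the table from the example
# below at "delta-table" with a single commit file.
log_file = Path("delta-table/_delta_log/00000000000000000000.json")

patched_lines = []
for line in log_file.read_text().splitlines():
    action = json.loads(line)
    if "protocol" in action:
        features = set(action["protocol"].get("writerFeatures", []))
        features.add("invariants")
        action["protocol"]["writerFeatures"] = sorted(features)
    patched_lines.append(json.dumps(action))

log_file.write_text("\n".join(patched_lines) + "\n")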
What you expected to happen:
I can write timestamp data with deltalake and read it back with Spark without any additional configuration (the 0.15.3 behavior).
How to reproduce it:
import pyarrow as pa
from deltalake import write_deltalake
from pandas import DataFrame
from pyspark.sql import SparkSession

delta_table_path = "delta-table"

schema = pa.schema(
    [
        pa.field("id", pa.int16(), False),
        pa.field("values", pa.float64(), False),
        pa.field("date", pa.timestamp("us"), True),
    ]
)

# Write the table with the deltalake Python bindings.
write_deltalake(
    delta_table_path,
    data=DataFrame({"id": [1, 2], "values": [2, 1], "date": [None, None]}),
    schema=schema,
    configuration={"delta.minReaderVersion": "1", "delta.minWriterVersion": "2"},  # silently ignored
)

# Read the table back with a local Spark session; this raises the
# DeltaTableFeatureException shown above.
spark = (
    SparkSession.builder.appName("failing-delta-load")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars", "delta-spark_2.12-3.2.0.jar,delta-storage-3.2.0.jar")
    .getOrCreate()
)

df = spark.read.format("delta").load(delta_table_path)
for row in df.collect():
    print(row)
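To confirm which protocol was actually written, I inspect the protocol action of the first commit directly (a minimal sketch, assuming the table from the example above at "delta-table"):

import json
from pathlib import Path

# Print the protocol action of the first commit; with 0.15.3 this shows
# minReaderVersion 1 / minWriterVersion 2, with 0.16.0+ it shows the
# timestampNtz reader/writer features.
log_file = Path("delta-table/_delta_log/00000000000000000000.json")
for line in log_file.read_text().splitlines():
    action = json.loads(line)
    if "protocol" in action:
        print(action["protocol"])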
Thanks for any help or guidance with this.