catalog-owned-preview/_delta_log/00000000000000000000.json
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1749830855992,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[\"part1\"]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/4.0.0 Delta-Lake/4.0.0","txnId":"d108f896-9662-4eda-b4de-444a99850aa8"}}
{"metaData":{"id":"64dcd182-b3b4-4ee0-88e0-63c159a4121c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["part1"],"configuration":{},"createdTime":1749830855646}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["catalogOwned-preview"],"writerFeatures":["catalogOwned-preview","inCommitTimestamp","invariants","appendOnly"]}}
catalog-owned-preview/_delta_log/_staged_commits/00000000000000000001.4cb9708e-b478-44de-b203-53f9ba9b2876.json
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1749830871084,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"889"},"engineInfo":"Apache-Spark/4.0.0 Delta-Lake/4.0.0","txnId":"4cb9708e-b478-44de-b203-53f9ba9b2876"}}
{"add":{"path":"part1=0/part-00000-13fefaba-8ec2-4762-b17e-aeda657451c5.c000.snappy.parquet","partitionValues":{"part1":"0"},"size":889,"modificationTime":1749830870833,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"col1\":0},\"maxValues\":{\"col1\":99},\"nullCount\":{\"col1\":0}}"}}
catalog-owned-preview/_delta_log/_staged_commits/00000000000000000002.5b9bba4a-0085-430d-a65e-b0d38c1afbe9.json
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1749830881798,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"891"},"engineInfo":"Apache-Spark/4.0.0 Delta-Lake/4.0.0","txnId":"5b9bba4a-0085-430d-a65e-b0d38c1afbe9"}}
{"add":{"path":"part1=1/part-00000-8afb1c56-2018-4af2-aa4f-4336c1b39efd.c000.snappy.parquet","partitionValues":{"part1":"1"},"size":891,"modificationTime":1749830881779,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"col1\":100},\"maxValues\":{\"col1\":199},\"nullCount\":{\"col1\":0}}"}}
@@ -0,0 +1,51 @@
# Below are the commands and instructions to create the `catalog-owned-preview` table.

# Note that delta-spark:4.0.0 does not yet support *creating* catalogManaged tables.
# So, for now, we create a normal table with ICT enabled and then
# (a) manually add the `catalogOwned-preview` table feature to the protocol, and
# (b) manually move and rename the published delta files into the _staged_commits directory.

pyspark --packages io.delta:delta-spark_2.13:4.0.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

table_path = "<table_path>"  # directory where the table will be created

# Commit 0: Create the table

spark.sql(f"""
CREATE TABLE delta.`{table_path}` (
part1 INT,
col1 INT
)
USING DELTA
PARTITIONED BY (part1)
""")

# Commit 1: Insert 100 rows into part1=0

spark.sql(f"""
INSERT INTO delta.`{table_path}`
SELECT
col1 DIV 100 as part1,
col1
FROM (
SELECT explode(sequence(0, 99)) as col1
)
""")

# Commit 2: Insert 100 rows into part1=1

spark.sql(f"""
INSERT INTO delta.`{table_path}`
SELECT
col1 DIV 100 as part1,
col1
FROM (
SELECT explode(sequence(100, 199)) as col1
)
""")

# Then, add `catalogOwned-preview` to the protocol's `readerFeatures` and `writerFeatures`
# in _delta_log/00000000000000000000.json (the protocol action is written by commit 0).
# Then, for each commit version $v in [1, 2], move _delta_log/$v.json to
# _delta_log/_staged_commits/$v.$uuid.json, where $uuid is taken from the commitInfo.txnId
# in $v.json. A sketch of both steps follows.
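
# Below is a minimal sketch of these two manual steps (a hypothetical helper, not part
# of the original instructions). It assumes `table_path` from above and a local
# filesystem; names such as `log_dir` and `staged_dir` are illustrative.

import json
import os
import shutil

log_dir = os.path.join(table_path, "_delta_log")
staged_dir = os.path.join(log_dir, "_staged_commits")
os.makedirs(staged_dir, exist_ok=True)

# (a) Add `catalogOwned-preview` to the protocol in the version-0 commit file.
v0_path = os.path.join(log_dir, f"{0:020d}.json")
with open(v0_path) as f:
    actions = [json.loads(line) for line in f if line.strip()]
for action in actions:
    if "protocol" in action:
        for key in ("readerFeatures", "writerFeatures"):
            features = action["protocol"].setdefault(key, [])
            if "catalogOwned-preview" not in features:
                features.append("catalogOwned-preview")
with open(v0_path, "w") as f:
    f.writelines(json.dumps(a, separators=(",", ":")) + "\n" for a in actions)

# (b) Move commits 1 and 2 into _staged_commits/$v.$uuid.json, taking $uuid
#     from the commitInfo.txnId of each commit.
for v in (1, 2):
    published = os.path.join(log_dir, f"{v:020d}.json")
    with open(published) as f:
        commit_actions = [json.loads(line) for line in f if line.strip()]
    txn_id = next(a["commitInfo"]["txnId"] for a in commit_actions if "commitInfo" in a)
    shutil.move(published, os.path.join(staged_dir, f"{v:020d}.{txn_id}.json"))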
part1=0/part-00000-13fefaba-8ec2-4762-b17e-aeda657451c5.c000.snappy.parquet (binary file not shown)
part1=1/part-00000-8afb1c56-2018-4af2-aa4f-4336c1b39efd.c000.snappy.parquet (binary file not shown)
CatalogManagedE2EReadSuite.scala
@@ -0,0 +1,85 @@
/*
 * Copyright (2025) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.delta.kernel.defaults.catalogManaged

import scala.collection.JavaConverters._

import io.delta.kernel.TableManager
import io.delta.kernel.defaults.test.ResolvedTableAdapterImplicits._
import io.delta.kernel.defaults.utils.{TestRow, TestUtilsWithTableManagerAPIs}
import io.delta.kernel.internal.files.ParsedLogData
import io.delta.kernel.internal.table.ResolvedTableInternal
import io.delta.kernel.internal.tablefeatures.TableFeatures.{CATALOG_MANAGED_R_W_FEATURE_PREVIEW, TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION}
import io.delta.kernel.utils.FileStatus

import org.scalatest.funsuite.AnyFunSuite

/**
 * Test suite for end-to-end reads of catalog-managed Delta tables.
 *
 * The goal of this suite is to simulate how a real "Catalog-Managed-Client" would read a
 * catalog-managed Delta table, without introducing a full, or even partial (e.g. in-memory),
 * catalog client implementation.
 *
 * The catalog boundary is simulated by tests manually providing [[ParsedLogData]]. For example,
 * there can be X commits in the _staged_commits directory, and a given test can decide that Y
 * commits (a subset of X) are in fact "ratified". The test can then turn those commits into
 * [[ParsedLogData]] and inject them into the [[io.delta.kernel.ResolvedTableBuilder]]. This is,
 * in essence, doing exactly what we would expect a "Catalog-Managed-Client" to do.
 */
class CatalogManagedE2EReadSuite extends AnyFunSuite with TestUtilsWithTableManagerAPIs {

  test("simple e2e read of catalogOwned-preview table with staged ratified commits") {
    val tablePath = getTestResourceFilePath("catalog-owned-preview")

    // Note: We need to *resolve* each test resource file path, because the table root file path
    // will itself be resolved when we create a ResolvedTable. If we resolved some paths but
    // not others, we would get an error like `File <commit-file> doesn't belong in the
    // transaction log at <log-path>`.

    val parsedLogData = Seq(
        // scalastyle:off line.size.limit
        getTestResourceFilePath("catalog-owned-preview/_delta_log/_staged_commits/00000000000000000001.4cb9708e-b478-44de-b203-53f9ba9b2876.json"),
        getTestResourceFilePath("catalog-owned-preview/_delta_log/_staged_commits/00000000000000000002.5b9bba4a-0085-430d-a65e-b0d38c1afbe9.json"))
      // scalastyle:on line.size.limit
      .map { path => defaultEngine.getFileSystemClient.resolvePath(path) }
      .map { p => FileStatus.of(p) }
      .map { fs => ParsedLogData.forFileStatus(fs) }
      .toList
      .asJava

    val resolvedTable = TableManager
      .loadTable(tablePath)
      .atVersion(2)
      .withLogData(parsedLogData)
      .build(defaultEngine)
      .asInstanceOf[ResolvedTableInternal]

    assert(resolvedTable.getVersion === 2)
    // Three deltas: the published version 0 plus the two injected staged commits (1 and 2).
    assert(resolvedTable.getLogSegment.getDeltas.size() === 3)

    val protocol = resolvedTable.getProtocol
    assert(protocol.getMinReaderVersion === TABLE_FEATURES_MIN_READER_VERSION)
    assert(protocol.getMinWriterVersion === TABLE_FEATURES_MIN_WRITER_VERSION)
    assert(protocol.getReaderFeatures.contains(CATALOG_MANAGED_R_W_FEATURE_PREVIEW.featureName()))
    assert(protocol.getWriterFeatures.contains(CATALOG_MANAGED_R_W_FEATURE_PREVIEW.featureName()))

    // Rows 0-99 land in part1=0 (commit 1) and rows 100-199 land in part1=1 (commit 2).
    val actualResult = readResolvedTableAdapter(resolvedTable.toTestAdapter)
    val expectedResult = (0 to 199).map { x => TestRow(x / 100, x) }
    checkAnswer(actualResult, expectedResult)
  }
}