Conversation

vaibhavk1992
Contributor

Important Read

  • Please ensure the GitHub issue is mentioned at the beginning of the PR

What is the purpose of the pull request

(For example: This pull request implements the sync for delta format.)

Brief change log

(for example:)

  • Fixed JSON parsing error when persisting state
  • Added unit tests for schema evolution

Verify this pull request

(Please pick one of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added TestConversionController to verify the change.
  • Manually verified the change by running a job locally.

@vaibhavk1992 vaibhavk1992 marked this pull request as draft June 30, 2025 10:42
@vaibhavk1992 vaibhavk1992 marked this pull request as ready for review June 30, 2025 15:40
<module>xtable-aws</module>
<module>xtable-hive-metastore</module>
<module>xtable-service</module>
<!-- <module>xtable-service</module>-->
Contributor

This should be added back, any reason why you had to comment this out?

Contributor Author

Yes, xtable-service was giving build failures. Since xtable-service is independent of the delta kernel changes, can we review just the delta kernel changes for now? I want to get my changes validated first; the final version of the delta kernel changes will include the xtable-service module as well.


Contributor

@vaibhavk1992 I think if you rebase with latest main branch you shouldn't see those failures.

<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-api</artifactId>
<version>4.0.0</version>
Contributor

Can you add a property in the root pom called <delta.kernel.version>4.0.0</delta.kernel.version>, instead of using the hardcoded value?

Also curious how you ended up choosing the delta kernel version. Is there a specific version that needs to align with the delta lake version we have in the repo?
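The suggested change sketched out (assuming the root pom already has a `<properties>` section, as Maven parent poms typically do):

```xml
<!-- In the root pom's <properties> section: -->
<properties>
  <delta.kernel.version>4.0.0</delta.kernel.version>
</properties>

<!-- The module's dependency then references the property
     instead of hardcoding the version: -->
<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-kernel-api</artifactId>
  <version>${delta.kernel.version}</version>
</dependency>
```

This keeps all Delta Kernel artifacts on one version and makes future bumps a one-line change.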

public class DeltaKernelConversionSourceProvider extends ConversionSourceProvider<Long> {
@Override
public DeltaKernelConversionSource getConversionSourceInstance(SourceTable sourceTable) {
Configuration hadoopConf = new Configuration();
Contributor

Is there any reason why you are creating a new hadoopConf? Can you instead use the hadoopConf from the parent class, similar to what DeltaConversionSourceProvider does?
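For illustration, the pattern being suggested (reuse the configuration the parent class was initialized with, rather than constructing a fresh one) looks roughly like this. The types below are simplified stand-ins modeled on the real xtable/Hadoop classes, not the actual implementations:

```java
import java.util.Map;

// Stand-in for ConversionSourceProvider: the base class owns the configuration.
abstract class ProviderBase<T> {
    // Stand-in for org.apache.hadoop.conf.Configuration
    protected Map<String, String> hadoopConf;

    void init(Map<String, String> conf) {
        this.hadoopConf = conf;
    }
}

class KernelProviderSketch extends ProviderBase<Long> {
    String describeConf() {
        // Reuse the inherited hadoopConf rather than `new Configuration()`,
        // so caller-supplied settings (e.g. filesystem credentials) are honored.
        return "fs.impl=" + hadoopConf.getOrDefault("fs.impl", "<unset>");
    }
}

public class ProviderSketch {
    public static void main(String[] args) {
        KernelProviderSketch p = new KernelProviderSketch();
        p.init(Map.of("fs.impl", "s3a"));
        System.out.println(p.describeConf()); // prints "fs.impl=s3a"
    }
}
```

Constructing a fresh `Configuration` inside the provider silently drops whatever the caller configured, which is why the reviewer points at the `DeltaConversionSourceProvider` approach.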

return INSTANCE;
}

public InternalSchema toInternalSchema_v2(StructType structType) {
Contributor

I think we can just call this toInternalSchema, since it's in its own distinct class, right?

// Get schema from Delta Kernel's snapshot
io.delta.kernel.types.StructType schema = snapshot.getSchema();

System.out.println("Kernelschema: " + schema);
Contributor

@rahil-c rahil-c Jul 1, 2025

[nit] We'll need to remove these in the final version of the PR.

* Converts between Delta and InternalTable schemas. Some items to be aware of:
*
* <ul>
* <li>Delta schemas are represented as Spark StructTypes which do not have enums so the enum
Contributor

I think we can leave this file as is, right?

Contributor

@vaibhavk1992 let's remove the changes to this file. They don't seem necessary

import org.apache.xtable.spi.extractor.ConversionSource;

@Builder
public class DeltaKernelConversionSource implements ConversionSource<Long> {
Contributor

We will need a full implementation of all the interface methods; otherwise this will fail during the table format sync. Can you refer to the impl of DeltaConversionSource for these methods?

Contributor Author

All the methods have been added; only the commit backlog one is not fully resolved.

<module>xtable-aws</module>
<module>xtable-hive-metastore</module>
<module>xtable-service</module>
<!-- <module>xtable-service</module>-->
Contributor

Why comment this out?

# limitations under the License.
#
junit.jupiter.execution.parallel.enabled=true
junit.jupiter.execution.parallel.enabled=false
Contributor

I have run the tests locally with this set to true and they pass. Can we revert this config and check the GH build?

Contributor

@vinishjail97 vinishjail97 left a comment

This is great progress @vaibhavk1992, added some comments.

<module>xtable-aws</module>
<module>xtable-hive-metastore</module>
<module>xtable-service</module>
<!-- <module>xtable-service</module>-->
Contributor

@vaibhavk1992 I think if you rebase with latest main branch you shouldn't see those failures.

</executions>
<configuration>
<skip>${skipUTs}</skip>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
Contributor

Why revert this?

-
tableBasePath: /Desktop/opensource/iceberg/warehouse/demo/nyc/taxis
tableDataPath: /Desktop/opensource/iceberg/warehouse/demo/nyc/taxis/data
tableBasePath: /Users/vaibhakumar/Desktop/opensource/iceberg/warehouse/demo/nyc/taxis
Contributor

This one can be reverted too?

* @param versionToStartFrom The version to start from.
*/
@Builder
public DeltaKernelIncrementalChangesState(
Contributor Author

@vinishjail97 This is where the implementation needs to happen for the response conversions from the kernel.


long versionNumberAtLastSyncInstant = snapshot.getVersion();
System.out.println("versionNumberAtLastSyncInstant: " + versionNumberAtLastSyncInstant);
// resetState(0, engine,table);
Contributor Author

This is the place I have commented out, @vinishjail97; the call happens inside this method.

@the-other-tim-brown
Contributor

@vaibhavk1992 can you write up a summary of next steps and blockers for this feature?

@vaibhavk1992
Contributor Author

Below is a summary of the differences between the two schema responses (Delta vs Kernel), along with what remains to be reconciled between them.

Comparison of Schema Responses: Delta Kernel vs Delta Log

This document outlines the differences in schema responses when using Delta Kernel and Delta Log APIs to retrieve changes in a Delta table. The comparison highlights the structure and format of the responses, providing insights into how the two approaches differ.

Delta Kernel Schema Response

When using the DeltaKernelIncrementalChangesState class to retrieve changes, the response is a row backed by a columnar batch. Each row is an object implementing the io.delta.kernel.data.Row interface, which provides methods to access individual fields. The response is minimalistic and focuses on the raw data representation.

Sample Output

1 row is an object ==> io.delta.kernel.internal.data.ColumnarBatchRow@20c03e47

Key Characteristics
  1. Row Representation: Each row is an instance of ColumnarBatchRow, which provides methods to access fields like getLong, getString, etc.
  2. Minimal Metadata: The response contains only the essential fields (e.g., version, timestamp, commitInfo).
  3. Raw Data: The schema is not enriched with additional metadata or actions; it is a direct representation of the data in the columnar batch.
Use Case

This format is suitable for low-level data processing where the focus is on performance and accessing raw data.

Delta Log Schema Response

When using the DeltaLog.getChanges method, the response is a tuple containing the version number and a list of actions. The actions include detailed metadata about the changes, such as CommitInfo and AddFile.
Sample Output from delta table changes
(2,
Vector(
CommitInfo(None, 2025-08-15 15:00:46.05, None, None, WRITE, Map(mode -> Append, partitionBy -> []), None, None, None, Some(1), Some(Serializable), Some(true), Some(Map(numFiles -> 1, numOutputRows -> 50, numOutputBytes -> 10226)), None, None, Some(Apache-Spark/3.4.2 Delta-Lake/2.4.0), Some(cf7b1472-4c68-4f89-aa97-c8f16512ecfc)),
AddFile(part-00000-e8eeadc8-4e26-46a7-8c61-0bf60e5e7ada-c000.snappy.parquet, Map(), 10226, 1755250246045, true, {"numRecords":50,"minValues":{"id":51,"firstName":"0WI98","lastName":"08VkW","gender":"Female","birthDate":"2013-02-16T21:18:43.000+05:30","level":"ERROR","date_field":"2025-08-15","timestamp_field":"2025-08-15T15:00:45.884+05:30","double_field":0.018425752795049544,"float_field":0.109567106,"long_field":-8844008067348082419,"record_field":{"nested_int":-2060061976}},"maxValues":{"id":100,"firstName":"xZnER","lastName":"ymLQw","gender":"Male","birthDate":"2023-08-07T15:06:55.000+05:30","level":"WARN","date_field":"2025-08-15","timestamp_field":"2025-08-15T15:00:45.885+05:30","double_field":0.9914942463945434,"float_field":0.9841615,"long_field":8775924211265194460,"record_field":{"nested_int":1923869027}},"nullCount":{"id":0,"firstName":25,"lastName":23,"gender":0,"birthDate":0,"level":0,"boolean_field":28,"date_field":24,"timestamp_field":26,"double_field":25,"float_field":28,"long_field":25,"binary_field":32,"primitive_map":22,"record_map":25,"primitive_list":28,"record_list":29,"record_field":{"nested_int":28}}}, null, null)
))
Key Characteristics

  1. Rich Metadata: The response includes detailed metadata such as CommitInfo (e.g., operation type, timestamp, and user metadata) and AddFile (e.g., file path, size, and statistics).
  2. Structured Actions: Each action is represented as a specific object (e.g., CommitInfo, AddFile), making it easier to interpret the changes.
  3. Verbose Output: The response is more verbose, providing a comprehensive view of the changes.

This issue is currently blocked. I have raised it with the Delta team quite a few times but have received no response:
https://delta-users.slack.com/archives/C04TRPG3LHZ/p1758730559515289

@the-other-tim-brown
Contributor

@vaibhavk1992 I pushed some changes in the latest commit to extract the add and remove files per version.

import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.schema.SchemaUtils;

public class DeltaKernelSchemaExtractor {
Contributor

Can we have a unit test for this class?

Contributor Author

I have checked that in the current running test cases, wherever we use getCurrentSnapshot we call this class and its methods. So the checks are already in place. @the-other-tim-brown

Contributor

I'm not asking for a functional test; I am asking for unit tests. We want high coverage from the testing of these key components.
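To illustrate the distinction being drawn here (a hypothetical example, not the actual xtable test code): a unit test feeds the extractor's mapping logic hand-built inputs and asserts on its outputs directly, rather than relying on the logic being reached indirectly through a snapshot-based integration path. Sketched with a stand-in mapping function in place of the real DeltaKernelSchemaExtractor:

```java
import java.util.Map;

// Stand-in for the kind of pure type-mapping logic the schema extractor
// performs (the real class maps Delta Kernel DataTypes to InternalSchema types).
public class TypeMappingSketch {
    static final Map<String, String> DELTA_TO_INTERNAL =
        Map.of(
            "integer", "INT",
            "string", "STRING",
            "timestamp", "TIMESTAMP");

    static String toInternalType(String deltaType) {
        String mapped = DELTA_TO_INTERNAL.get(deltaType.toLowerCase());
        if (mapped == null) {
            throw new IllegalArgumentException("Unsupported Delta type: " + deltaType);
        }
        return mapped;
    }

    public static void main(String[] args) {
        // Unit-style checks: hand-built inputs, direct assertions on outputs.
        if (!toInternalType("integer").equals("INT")) throw new AssertionError();
        if (!toInternalType("String").equals("STRING")) throw new AssertionError();
        System.out.println("mapping checks passed");
    }
}
```

A functional test only proves the happy path was traversed; unit tests like this pin down each mapping (including the error branch for unsupported types) in isolation.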

Contributor Author

@the-other-tim-brown I have added one of the unit tests, testConvertFromDeltaPartitionSinglePartition. Please confirm it looks fine and I will add the others too.


@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaKernelPartitionExtractor {
Contributor

Can we have a unit test for this class?

Contributor Author

I have checked that in the current running test cases, wherever we use getCurrentSnapshot and build an InternalTable, we use this class and its methods. So the checks are already in place. @the-other-tim-brown

*/
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaKernelStatsExtractor {
Contributor

Similarly here, we could use some unit testing.

Contributor Author

I have checked that the current test testInsertsUpsertsAndDeletes uses getTableChangeForCommit, so this class and its methods are already exercised. @the-other-tim-brown

Comment on lines +116 to +117
;
// String tableBasePath = snapshot.dataPath().toUri().toString();
Contributor

Can this be cleaned up?

* Converts between Delta and InternalTable schemas. Some items to be aware of:
*
* <ul>
* <li>Delta schemas are represented as Spark StructTypes which do not have enums so the enum
Contributor

@vaibhavk1992 let's remove the changes to this file. They don't seem necessary

this.fields = schema.getFields();

StructType fullSchema = snapshot.getSchema(); // The full table schema
List<String> partitionColumns = snapshot.getPartitionColumnNames(); // List<String>
Contributor

Remove the comment // List<String>?

StructType fullSchema = snapshot.getSchema(); // The full table schema
List<String> partitionColumns = snapshot.getPartitionColumnNames(); // List<String>

List<StructField> partitionFields_strfld =
Contributor

Nitpick: Use camelCase for variable names

Comment on lines +111 to +113
this.dataFilesIterator =
Collections
.emptyIterator(); // Initialize the dataFilesIterator by iterating over the scan files
Contributor

This assignment does not seem necessary since we assign at line 139

Instant deltaCommitInstant = Instant.ofEpochMilli(snapshot.getTimestamp(engine));
return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
} catch (Exception e) {
System.err.println(
Contributor

Use logging instead of print lines
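Incidentally, the two-clause timestamp check in the snippet above (`equals(...) || isBefore(...)`) collapses to a single `!isAfter(...)` test on `java.time.Instant`. A self-contained sketch of the equivalence:

```java
import java.time.Instant;

public class InstantCheckSketch {
    // "at or before target": equals(...) || isBefore(...) is the same as !isAfter(...)
    static boolean atOrBefore(Instant commit, Instant target) {
        return !commit.isAfter(target);
    }

    public static void main(String[] args) {
        Instant t = Instant.ofEpochMilli(1_000L);
        System.out.println(atOrBefore(Instant.ofEpochMilli(999L), t));   // true  (before)
        System.out.println(atOrBefore(Instant.ofEpochMilli(1_000L), t)); // true  (equal)
        System.out.println(atOrBefore(Instant.ofEpochMilli(1_001L), t)); // false (after)
    }
}
```

This reads closer to the intent ("commit is not after the sync instant") and avoids the double comparison.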

}

@Test
void testConvertFromDeltaPartitionSinglePartition() {
Contributor Author

@the-other-tim-brown I have added this unit test. Please confirm it looks fine and I will add the others too.
