[Enhancement] Add IcebergDeleteSink to support delete operations on Iceberg tables #67277
Conversation
[Java-Extensions Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
[FE Incremental Coverage Report] ✅ pass : 54 / 59 (91.53%)
[BE Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
```java
tIcebergTableSink.setCompression_type(compression);
tIcebergTableSink.setTarget_max_file_size(targetMaxFileSize);
com.starrocks.thrift.TCloudConfiguration tCloudConfiguration = new com.starrocks.thrift.TCloudConfiguration();
cloudConfiguration.toThrift(tCloudConfiguration);
```
NullPointerException when toThrift() called before init()
The cloudConfiguration field is only initialized in the init() method, not in the constructor. However, toThrift() uses cloudConfiguration.toThrift(tCloudConfiguration) without any null check. If toThrift() is called before init(), this will throw a NullPointerException. This is inconsistent with the similar IcebergTableSink class, which initializes cloudConfiguration directly in its constructor as a final field, making it safe to call toThrift() immediately after construction.
Additional Locations (2)
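The difference between the two initialization patterns can be sketched as follows. `LazySink`, `EagerSink`, and `CloudConfig` are hypothetical stand-ins for the StarRocks classes, not the actual implementation; the point is only the ordering hazard the comment describes.

```java
// Stand-in for CloudConfiguration: serializes itself into an output target.
class CloudConfig {
    void toThrift(StringBuilder out) {
        out.append("cloud-config");
    }
}

// Lazy pattern (the reported bug): the field is null until init() runs,
// so toThrift() throws NullPointerException if called first.
class LazySink {
    private CloudConfig cloudConfiguration; // null until init()

    void init() {
        cloudConfiguration = new CloudConfig();
    }

    String toThrift() {
        StringBuilder out = new StringBuilder();
        cloudConfiguration.toThrift(out); // NPE when init() was never called
        return out.toString();
    }
}

// Eager pattern (what IcebergTableSink does): a final field initialized at
// construction time is safe to use from any method, in any call order.
class EagerSink {
    private final CloudConfig cloudConfiguration = new CloudConfig();

    String toThrift() {
        StringBuilder out = new StringBuilder();
        cloudConfiguration.toThrift(out);
        return out.toString();
    }
}

public class InitOrderDemo {
    public static void main(String[] args) {
        System.out.println(new EagerSink().toThrift()); // safe immediately
        try {
            new LazySink().toThrift(); // init() never called
        } catch (NullPointerException e) {
            System.out.println("NPE: toThrift() called before init()");
        }
    }
}
```

Making the field `final` also lets the compiler enforce that every constructor initializes it, which is why the eager variant cannot exhibit this bug.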
```java
if (IcebergTable.FILE_PATH.equals(colName)) {
    hasFilePathColumn = true;
    if (!slot.getType().equals(VarcharType.VARCHAR)) {
        throw new StarRocksConnectorException("_file column must be type of VARCHAR");
```
Type validation too strict, rejects valid VARCHAR lengths
The type validation uses equals() to compare types, but ScalarType.equals() for VARCHAR types also compares the length field. Since VarcharType.VARCHAR has len=-1 (wildcard), any column with a specific length like VARCHAR(255) would fail validation with the error "_file column must be type of VARCHAR" even though it is a valid VARCHAR. The codebase provides isVarchar() and matchesType() methods for flexible type checking that ignore length differences. The same issue applies to the BIGINT check, though BIGINT doesn't have length variants so it's less likely to manifest there.
Additional Locations (1)
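A minimal sketch of the mismatch, assuming a simplified `ScalarType` stand-in for the StarRocks type class (the `-1` wildcard length models `VarcharType.VARCHAR`; the real class has more fields and methods):

```java
// Simplified model of a scalar type whose equals() also compares length.
public class TypeCheckDemo {
    enum Kind { VARCHAR, BIGINT }

    static final class ScalarType {
        final Kind kind;
        final int len; // -1 means "any length" (wildcard)

        ScalarType(Kind kind, int len) {
            this.kind = kind;
            this.len = len;
        }

        // Wildcard VARCHAR, analogous to VarcharType.VARCHAR with len = -1.
        static final ScalarType VARCHAR = new ScalarType(Kind.VARCHAR, -1);

        // Strict comparison: kind AND length must match, like the
        // equals()-based check the review comment flags.
        @Override public boolean equals(Object o) {
            return o instanceof ScalarType
                    && ((ScalarType) o).kind == kind
                    && ((ScalarType) o).len == len;
        }

        @Override public int hashCode() {
            return kind.ordinal() * 31 + len;
        }

        // Length-agnostic check, analogous to isVarchar()/matchesType().
        boolean isVarchar() {
            return kind == Kind.VARCHAR;
        }
    }

    public static void main(String[] args) {
        ScalarType column = new ScalarType(Kind.VARCHAR, 255); // VARCHAR(255)

        // Strict equals(): VARCHAR(255) != wildcard VARCHAR, so a valid
        // column would be rejected by the validation shown in the diff.
        System.out.println(column.equals(ScalarType.VARCHAR)); // false

        // Length-agnostic check accepts any VARCHAR column.
        System.out.println(column.isVarchar()); // true
    }
}
```

Using the kind-only check accepts `VARCHAR` columns of any declared length while still rejecting non-VARCHAR types, which is the behavior the error message implies.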
```java
boolean hasPosColumn = false;

for (SlotDescriptor slot : desc.getSlots()) {
    if (slot.getColumn() != null) {
```
Suggested change:

```diff
-if (slot.getColumn() != null) {
+if (slot.getColumn() == null) {
+    continue;
+}
```

Inverting the check into an early `continue` reduces nesting and makes the loop body more readable.
| } | ||
| } else if (IcebergTable.ROW_POSITION.equals(colName)) { | ||
| hasPosColumn = true; |
Suggested change:

```diff
-    }
-} else if (IcebergTable.ROW_POSITION.equals(colName)) {
-    hasPosColumn = true;
+    }
+    continue;
+}
+if (IcebergTable.ROW_POSITION.equals(colName)) {
+    hasPosColumn = true;
```


Why I'm doing:
#66944

What I'm doing:
This pull request introduces support for delete operations on Iceberg tables by adding a new IcebergDeleteSink class, its associated serialization logic, and comprehensive unit tests. The main focus is enabling the planner to write position delete files to Iceberg tables, with correct tuple validation and integration with the existing data sink infrastructure.

Iceberg Delete Sink Implementation:
- Added the IcebergDeleteSink class in fe/fe-core/src/main/java/com/starrocks/planner/IcebergDeleteSink.java to support delete operations on Iceberg tables, including tuple validation, configuration handling, and Thrift serialization.
- Extended the TDataSinkType enum in gensrc/thrift/DataSinks.thrift with the new ICEBERG_DELETE_SINK type for proper Thrift serialization and planner integration.

Testing and Validation:
- Added fe/fe-core/src/test/java/com/starrocks/planner/IcebergDeleteSinkTest.java to verify tuple validation, Thrift serialization, and explain string output for the new sink.

Fixes #issue
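The tuple-validation step described above (the delete sink requires a `_file` VARCHAR slot and a `_pos` BIGINT slot) can be sketched as follows. `Slot`, the string-based types, and `DeleteTupleCheckDemo` are simplified hypothetical stand-ins for StarRocks' slot descriptors, not the actual implementation.

```java
import java.util.List;

// Sketch of position-delete tuple validation: exactly the columns a
// position delete file needs (_file and _pos) must be present and typed.
public class DeleteTupleCheckDemo {
    record Slot(String name, String type) {}

    static void validate(List<Slot> slots) {
        boolean hasFilePathColumn = false;
        boolean hasPosColumn = false;
        for (Slot slot : slots) {
            if (slot.name() == null) {
                continue; // early continue keeps the loop body flat
            }
            if ("_file".equals(slot.name())) {
                if (!"VARCHAR".equals(slot.type())) {
                    throw new IllegalStateException("_file column must be type of VARCHAR");
                }
                hasFilePathColumn = true;
                continue;
            }
            if ("_pos".equals(slot.name())) {
                if (!"BIGINT".equals(slot.type())) {
                    throw new IllegalStateException("_pos column must be type of BIGINT");
                }
                hasPosColumn = true;
            }
        }
        if (!hasFilePathColumn || !hasPosColumn) {
            throw new IllegalStateException("delete tuple must contain _file and _pos columns");
        }
    }

    // Convenience wrapper so callers can branch instead of catching.
    static boolean isValid(List<Slot> slots) {
        try {
            validate(slots);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(List.of(
                new Slot("_file", "VARCHAR"), new Slot("_pos", "BIGINT")))); // true
        System.out.println(isValid(List.of(
                new Slot("_file", "INT")))); // false: wrong type, _pos missing
    }
}
```

The flat structure (null guard, then one `if` per required column, each ending in `continue`) mirrors the early-continue refactor suggested in the review comments above.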
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check:
Note

Iceberg delete sink
- Adds IcebergDeleteSink (fe/fe-core/.../IcebergDeleteSink.java) to write Iceberg position delete files; validates that the tuple has _file (VARCHAR) and _pos (BIGINT), sets locations, compression, target file size, and cloud config; provides explain output and Thrift serialization via TDataSinkType.ICEBERG_DELETE_SINK into TIcebergTableSink (uses data_location, file_format=parquet).

Thrift and tests
- Extends gensrc/thrift/DataSinks.thrift with ICEBERG_DELETE_SINK.
- Adds unit tests (IcebergDeleteSinkTest) covering tuple validation errors, Thrift serialization fields, and getExplainString().

Written by Cursor Bugbot for commit f63c4c7. This will update automatically on new commits.